diff --git a/socketserver/cmd/ffzsocketserver/socketserver.go b/socketserver/cmd/ffzsocketserver/socketserver.go index fedcdf80..8b56d707 100644 --- a/socketserver/cmd/ffzsocketserver/socketserver.go +++ b/socketserver/cmd/ffzsocketserver/socketserver.go @@ -11,6 +11,8 @@ import ( "os" ) +import _ "net/http/pprof" + var configFilename = flag.String("config", "config.json", "Configuration file, including the keypairs for the NaCl crypto library, for communicating with the backend.") var flagGenerateKeys = flag.Bool("genkeys", false, "Generate NaCl keys instead of serving requests.\nArguments: [int serverId] [base64 backendPublic]\nThe backend public key can either be specified in base64 on the command line, or put in the json file later.") diff --git a/socketserver/cmd/statsweb/config.go b/socketserver/cmd/statsweb/config.go index 6ddca585..04591f44 100644 --- a/socketserver/cmd/statsweb/config.go +++ b/socketserver/cmd/statsweb/config.go @@ -18,11 +18,11 @@ func makeConfig() { if ok { config.DatabaseLocation = fmt.Sprintf("%s/.ffzstatsweb/database.sqlite", home) config.GobFilesLocation = fmt.Sprintf("%s/.ffzstatsweb/gobcache", home) - os.MkdirAll(config.GobFilesLocation, 0644) + os.MkdirAll(config.GobFilesLocation, 0755) } else { config.DatabaseLocation = "./database.sqlite" config.GobFilesLocation = "./gobcache" - os.MkdirAll(config.GobFilesLocation, 0644) + os.MkdirAll(config.GobFilesLocation, 0755) } file, err := os.Create(*configLocation) if err != nil { diff --git a/socketserver/cmd/statsweb/html.go b/socketserver/cmd/statsweb/html.go index 303c4ddf..a2bca5d4 100644 --- a/socketserver/cmd/statsweb/html.go +++ b/socketserver/cmd/statsweb/html.go @@ -33,8 +33,9 @@ type CalendarMonthInfo struct { func GetMonthInfo(at time.Time) CalendarMonthInfo { year, month, _ := at.Date() - // 1 (start of month) - weekday of start of month = day offset of start of week at start of month - monthWeekStartDay := 1 - time.Date(year, month, 1, 0, 0, 0, 0, server.CounterLocation).Weekday() + monthStartWeekday := time.Date(year, month, 1, 0, 0, 0, 0, server.CounterLocation).Weekday() + // 1 (start of month) - weekday of start of month = day offset of start of week at start of mont + monthWeekStartDay := 1 - int(monthStartWeekday) // first day on calendar + 6 weeks < end of month? sixthSundayDay := monthWeekStartDay + 5*7 sixthSundayDate := time.Date(year, month, sixthSundayDay, 0, 0, 0, 0, server.CounterLocation) @@ -55,5 +56,6 @@ func renderCalendar(w http.ResponseWriter, at time.Time) { layout, err := template.ParseFiles("./webroot/layout.template.html", "./webroot/cal_entry.hbs", "./webroot/calendar.hbs") data := CalendarData{} data.Weeks = make([]CalWeekData, 6) - + _ = layout + _ = err } diff --git a/socketserver/cmd/statsweb/servers.go b/socketserver/cmd/statsweb/servers.go index 83f9bffc..659b430b 100644 --- a/socketserver/cmd/statsweb/servers.go +++ b/socketserver/cmd/statsweb/servers.go @@ -1,17 +1,32 @@ package main +import ( + "time" + "io" + "fmt" + "os" + "net/http" + "errors" + "sync" + "github.com/hashicorp/golang-lru" + "github.com/clarkduvall/hyperloglog" + "bitbucket.org/stendec/frankerfacez/socketserver/server" + "encoding/gob" +) + type serverFilter struct { // Mode is false for blacklist, true for whitelist - Mode bool - Special string[] + Mode bool + Special []string } const serverFilterModeBlacklist = false const serverFilterModeWhitelist = true -func (sf *serverFilter) IsServerAllowed(server string) { +func (sf *serverFilter) IsServerAllowed(server *serverInfo) bool { + name := server.subdomain for _, v := range sf.Special { - if server == v { + if name == v { return sf.Mode } } @@ -28,7 +43,7 @@ func (sf *serverFilter) Remove(server string) { } } if idx != -1 { - var lenMinusOne = len(sf.Special)-1 + var lenMinusOne = len(sf.Special) - 1 sf.Special[idx] = sf.Special[lenMinusOne] sf.Special = sf.Special[:lenMinusOne] } @@ -52,7 +67,7 @@ func (sf *serverFilter) Add(server string) { } } if idx != -1 { - var lenMinusOne = len(sf.Special)-1 + var lenMinusOne = len(sf.Special) - 1 sf.Special[idx] = sf.Special[lenMinusOne] sf.Special = sf.Special[:lenMinusOne] } @@ -66,5 +81,250 @@ func (sf *serverFilter) Add(server string) { } } -const serverFilterAll serverFilter = serverFilter{Mode: serverFilterModeBlacklist} -const serverFilterNone serverFilter = serverFilter{Mode: serverFilterModeWhitelist} +var serverFilterAll serverFilter = serverFilter{Mode: serverFilterModeBlacklist} +var serverFilterNone serverFilter = serverFilter{Mode: serverFilterModeWhitelist} + +func cannotCacheHLL(at time.Time) bool { + now := time.Now() + now.Add(-25 * time.Hour) + return now.Before(at) +} + +var ServerNames = []string{ + "catbag", + "andknuckles", + "tuturu", +} + +var httpClient http.Client + +const serverNameSuffix = ".frankerfacez.com" + +const failedStateThreshold = 4 + +var ErrServerInFailedState = errors.New("server has been down recently and not recovered") +var ErrServerHasNoData = errors.New("no data for specified date") + +type errServerNot200 struct { + StatusCode int + StatusText string +} + +func (e *errServerNot200) Error() string { + return fmt.Sprintf("The server responded with %d %s", e.StatusCode, e.StatusText) +} +func Not200Error(resp *http.Response) *errServerNot200 { + return &errServerNot200{ + StatusCode: resp.StatusCode, + StatusText: resp.Status, + } +} + +func getHLLCacheKey(at time.Time) string { + year, month, day := at.Date() + return fmt.Sprintf("%d-%d-%d", year, month, day) +} + +type serverInfo struct { + subdomain string + + memcache *lru.TwoQueueCache + + FailedState bool + FailureErr error + failureCount int + + lock sync.Mutex +} + +func (si *serverInfo) Setup(subdomain string) { + si.subdomain = subdomain + tq, err := lru.New2Q(60) + if err != nil { + panic(err) + } + si.memcache = tq +} + +// GetHLL gets the HLL from +func (si *serverInfo) GetHLL(at time.Time) (*hyperloglog.HyperLogLogPlus, error) { + if cannotCacheHLL(at) { + err := si.ForceWrite() + if err != nil { + return nil, err + } + reader, err := si.DownloadHLL(at) + if err != nil { + return nil, err + } + fmt.Printf("downloaded hll %s:%s\n", si.subdomain, getHLLCacheKey(at)) + return loadHLLFromStream(reader) + } + + hll, ok := si.PeekHLL(at) + if ok { + fmt.Printf("got cached hll %s:%s\n", si.subdomain, getHLLCacheKey(at)) + return hll, nil + } + + reader, err := si.OpenHLL(at) + if err != nil { + // continue to download + } else { + fmt.Printf("opened hll %s:%s\n", si.subdomain, getHLLCacheKey(at)) + return loadHLLFromStream(reader) + } + + reader, err = si.DownloadHLL(at) + if err != nil { + if err == ErrServerHasNoData { + return hyperloglog.NewPlus(server.CounterPrecision) + } + return nil, err + } + fmt.Printf("downloaded hll %s:%s\n", si.subdomain, getHLLCacheKey(at)) + return loadHLLFromStream(reader) +} + +func loadHLLFromStream(reader io.ReadCloser) (*hyperloglog.HyperLogLogPlus, error) { + defer reader.Close() + hll, _ := hyperloglog.NewPlus(server.CounterPrecision) + dec := gob.NewDecoder(reader) + err := dec.Decode(hll) + if err != nil { + return nil, err + } + return hll, nil +} + +// PeekHLL tries to grab a HLL from the memcache without downloading it or hitting the disk. +func (si *serverInfo) PeekHLL(at time.Time) (*hyperloglog.HyperLogLogPlus, bool) { + if cannotCacheHLL(at) { + return nil, false + } + + key := getHLLCacheKey(at) + hll, ok := si.memcache.Get(key) + if ok { + return hll.(*hyperloglog.HyperLogLogPlus), true + } + + return nil, false +} + +func (si *serverInfo) OpenHLL(at time.Time) (io.ReadCloser, error) { + year, month, day := at.Date() + filename := fmt.Sprintf("%s/%s/%d-%d-%d.gob", config.GobFilesLocation, si.subdomain, year, month, day) + + file, err := os.Open(filename) + if err == nil { + return file, nil + } + // file is nil + if !os.IsNotExist(err) { + return nil, err + } + + return nil, os.ErrNotExist +} + +func (si *serverInfo) DownloadHLL(at time.Time) (io.ReadCloser, error) { + if si.FailedState { + return nil, ErrServerInFailedState + } + si.lock.Lock() + defer si.lock.Unlock() + + year, month, day := at.Date() + url := fmt.Sprintf("https://%s/hll/daily-%d-%d-%d.gob", si.Domain(), day, month, year) + resp, err := httpClient.Get(url) + if err != nil { + si.ServerFailed(err) + return nil, err + } + if resp.StatusCode == 404 { + return nil, ErrServerHasNoData + } + if resp.StatusCode != 200 { + err = Not200Error(resp) + si.ServerFailed(err) + return nil, err + } + + filename := fmt.Sprintf("%s/%s/%d-%d-%d.gob", config.GobFilesLocation, si.subdomain, year, month, day) + file, err := os.OpenFile(filename, os.O_CREATE | os.O_EXCL | os.O_RDWR, 0644) + if os.IsNotExist(err) { + os.MkdirAll(fmt.Sprintf("%s/%s", config.GobFilesLocation, si.subdomain), 0755) + file, err = os.OpenFile(filename, os.O_CREATE | os.O_EXCL | os.O_RDWR, 0644) + } + if err != nil { + resp.Body.Close() + return nil, fmt.Errorf("downloadhll: error opening file for writing: %v", err) + } + + return &teeReadCloser{r: resp.Body, w: file}, nil +} + +func (si *serverInfo) ForceWrite() error { + if si.FailedState { + return ErrServerInFailedState + } + + url := fmt.Sprintf("https://%s/hll_force_write", si.Domain()) + resp, err := httpClient.Get(url) + if err != nil { + si.ServerFailed(err) + return err + } + if resp.StatusCode != 200 { + err = Not200Error(resp) + si.ServerFailed(err) + return err + } + resp.Body.Close() + return nil +} + +func (si *serverInfo) Domain() string { + return fmt.Sprintf("%s%s", si.subdomain, serverNameSuffix) +} + +func (si *serverInfo) ServerFailed(err error) { + si.lock.Lock() + defer si.lock.Unlock() + si.failureCount++ + if si.failureCount > failedStateThreshold { + fmt.Printf("Server %s entering failed state\n", si.subdomain) + si.FailedState = true + si.FailureErr = err + go recoveryCheck(si) + } +} + +func recoveryCheck(si *serverInfo) { + // TODO check for server recovery +} + +type teeReadCloser struct { + r io.ReadCloser + w io.WriteCloser +} + +func (t *teeReadCloser) Read(p []byte) (n int, err error) { + n, err = t.r.Read(p) + if n > 0 { + if n, err := t.w.Write(p[:n]); err != nil { + return n, err + } + } + return +} + +func (t *teeReadCloser) Close() error { + err1 := t.r.Close() + err2 := t.w.Close() + if err1 != nil { + return err1 + } + return err2 +} diff --git a/socketserver/cmd/statsweb/statsweb.go b/socketserver/cmd/statsweb/statsweb.go index 284eb1d5..9fce5f6e 100644 --- a/socketserver/cmd/statsweb/statsweb.go +++ b/socketserver/cmd/statsweb/statsweb.go @@ -10,7 +10,8 @@ import ( "fmt" "strings" "errors" - "github.com/dustin/gojson" + "encoding/json" + "sync" ) var configLocation = flag.String("config", "./config.json", "Location of the configuration file. Defaults to ./config.json") @@ -20,6 +21,8 @@ var config ConfigFile const ExitCodeBadConfig = 2 +var allServers []*serverInfo + func main() { flag.Parse() @@ -30,6 +33,12 @@ func main() { loadConfig() + allServers = make([]*serverInfo, len(ServerNames)) + for i, v := range ServerNames { + allServers[i] = &serverInfo{} + allServers[i].Setup(v) + } + http.HandleFunc("/api/get", ServeAPIGet) http.ListenAndServe(config.ListenAddr, http.DefaultServeMux) } @@ -43,10 +52,12 @@ const jsonErrBlankRequest = `{"status":"error","error":"no queries given"}` const statusError = "error" const statusPartial = "partial" const statusOk = "ok" + type apiResponse struct { Status string `json:"status"` Responses []requestResponse `json:"resp"` } + type requestResponse struct { Status string `json:"status"` Request string `json:"req"` @@ -81,7 +92,7 @@ func ServeAPIGet(w http.ResponseWriter, r *http.Request) { resp.Responses[i] = ProcessSingleGetRequest(v) } for _, v := range resp.Responses { - if v.Status == statusError { + if v.Status != statusOk { resp.Status = statusPartial break } @@ -92,7 +103,7 @@ func ServeAPIGet(w http.ResponseWriter, r *http.Request) { enc.Encode(resp) } -const errRangeFormatIncorrect = errors.New("incorrect range format, must be yyyy-mm-dd~yyyy-mm-dd") +var errRangeFormatIncorrect = errors.New("incorrect range format, must be yyyy-mm-dd~yyyy-mm-dd") // ProcessSingleGetRequest takes a request string and pulls the unique user data for the given dates and filters. // @@ -122,14 +133,18 @@ const errRangeFormatIncorrect = errors.New("incorrect range format, must be yyyy // // It does not matter if a date is specified multiple times, due to the data format used. func ProcessSingleGetRequest(req string) (result requestResponse) { - var hll hyperloglog.HyperLogLogPlus, _ = hyperloglog.NewPlus(server.CounterPrecision) + fmt.Println("processing request:", req) + hll, _ := hyperloglog.NewPlus(server.CounterPrecision) result.Request = req result.Status = statusOk filter := serverFilterAll collectError := func(err error) bool { - if err != nil { + if err == ErrServerInFailedState { + result.Status = statusPartial + return false + } else if err != nil { result.Status = statusError result.Error = err.Error() return true @@ -161,7 +176,7 @@ func ProcessSingleGetRequest(req string) (result requestResponse) { break outerLoop } - err = addSingleDate(at, filter, &hll) + err = addSingleDate(at, filter, hll) if collectError(err) { break outerLoop } @@ -175,7 +190,7 @@ func ProcessSingleGetRequest(req string) (result requestResponse) { break outerLoop } - err = addRange(from, to, filter, &hll) + err = addRange(from, to, filter, hll) if collectError(err) { break outerLoop } @@ -200,26 +215,73 @@ func parseDateFromRequest(dateStr string) (time.Time, error) { if err != nil || n != 3 { return zeroTime, errBadDate } - return time.Date(year, month, day, 0, 0, 0, 0, server.CounterLocation) + return time.Date(year, time.Month(month), day, 0, 0, 0, 0, server.CounterLocation), nil +} + +type hllAndError struct { + hll *hyperloglog.HyperLogLogPlus + err error } func addSingleDate(at time.Time, filter serverFilter, dest *hyperloglog.HyperLogLogPlus) error { - // TODO - return nil + var partialErr error + for _, si := range allServers { + if filter.IsServerAllowed(si) { + hll, err2 := si.GetHLL(at) + if err2 == ErrServerInFailedState { + partialErr = err2 + } else if err2 != nil { + return err2 + } else { + dest.Merge(hll) + } + } + } + return partialErr } func addRange(start time.Time, end time.Time, filter serverFilter, dest *hyperloglog.HyperLogLogPlus) error { + end = server.TruncateToMidnight(end) + year, month, day := start.Date() + var partialErr error + var myAllServers = make([]*serverInfo, 0, len(allServers)) + for _, si := range allServers { + if filter.IsServerAllowed(si) { + myAllServers = append(myAllServers, si) + } + } - return nil + var ch = make(chan hllAndError) + var wg sync.WaitGroup + for current := start; current.Before(end); day = day + 1 { + current = time.Date(year, month, day, 0, 0, 0, 0, server.CounterLocation) + for _, si := range myAllServers { + wg.Add(1) + go getHLL(ch, si, current) + } + } + + go func() { + wg.Wait() + close(ch) + }() + + for pair := range ch { + wg.Done() + hll, err := pair.hll, pair.err + if err != nil { + if partialErr == nil || partialErr == ErrServerInFailedState { + partialErr = err + } + } else { + dest.Merge(hll) + } + } + + return partialErr } -func combineDateRange(from time.Time, to time.Time, dest *hyperloglog.HyperLogLogPlus) error { - from = server.TruncateToMidnight(from) - to = server.TruncateToMidnight(to) - year, month, day := from.Date() - for current := from; current.Before(to); day = day + 1 { - current = time.Date(year, month, day, 0, 0, 0, 0, server.CounterLocation) - - } - return nil +func getHLL(ch chan hllAndError, si *serverInfo, at time.Time) { + hll, err := si.GetHLL(at) + ch <- hllAndError{hll: hll, err: err} } \ No newline at end of file diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index ed0ae1f5..ec705987 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -177,7 +177,7 @@ var SocketUpgrader = websocket.Upgrader{ // Memes go here. var BannerHTML []byte -// StopAcceptingConnections is closed while the server is shutting down. +// StopAcceptingConnectionsCh is closed while the server is shutting down. var StopAcceptingConnectionsCh = make(chan struct{}) var StopAcceptingConnections = false @@ -283,12 +283,12 @@ const sendMessageBufferLength = 125 const sendMessageAbortLength = 50 // RunSocketConnection contains the main run loop of a websocket connection. - +// // First, it sets up the channels, the ClientInfo object, and the pong frame handler. // It starts the reader goroutine pointing at the newly created channels. // The function then enters the run loop (a `for{select{}}`). // The run loop is broken when an object is received on errorChan, or if `hello` is not the first C2S Command. - +// // After the run loop stops, the function launches a goroutine to drain // client.MessageChannel, signals the reader goroutine to stop, unsubscribes // from all pub/sub channels, waits on MsgChannelKeepalive (remember, the