diff --git a/socketserver/cmd/mergecounts/mergecounts.go b/socketserver/cmd/mergecounts/mergecounts.go index 4eff882f..af3ec83b 100644 --- a/socketserver/cmd/mergecounts/mergecounts.go +++ b/socketserver/cmd/mergecounts/mergecounts.go @@ -11,7 +11,7 @@ import ( ) var SERVERS = []string{ - "https://catbag.frankerfacez.com", +// "https://catbag.frankerfacez.com", "https://andknuckles.frankerfacez.com", "https://tuturu.frankerfacez.com", } diff --git a/socketserver/server/backend.go b/socketserver/server/backend.go index 5cad2f58..1ec4751c 100644 --- a/socketserver/server/backend.go +++ b/socketserver/server/backend.go @@ -120,7 +120,7 @@ type ErrForwardedFromBackend struct { } func (bfe ErrForwardedFromBackend) Error() string { - bytes, _ := json.Marshal(bfe) + bytes, _ := json.Marshal(bfe.JSONError) return string(bytes) } diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 49d73c53..f068e142 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -139,7 +139,7 @@ func shutdownHandler() { var wg sync.WaitGroup wg.Add(1) go func() { - writeAllHLLs() + writeHLL() wg.Done() }() diff --git a/socketserver/server/stats.go b/socketserver/server/stats.go index 0275d4d7..24fd52dc 100644 --- a/socketserver/server/stats.go +++ b/socketserver/server/stats.go @@ -165,10 +165,10 @@ func updateSysMem() { if err == nil { Statistics.SysMemTotalKB = memInfo.MemTotal Statistics.SysMemFreeKB = memInfo.MemAvailable + } - if memInfo.MemAvailable > 0 && memInfo.MemAvailable < Configuration.MinMemoryKBytes { - writeAllHLLs() - } + { + writeHLL() } } diff --git a/socketserver/server/subscriptions_test.go b/socketserver/server/subscriptions_test.go index d2e561d2..c0c04720 100644 --- a/socketserver/server/subscriptions_test.go +++ b/socketserver/server/subscriptions_test.go @@ -229,11 +229,9 @@ func TestSubscriptionAndPublish(t *testing.T) { doneWg.Wait() server.Close() - for _, period := range periods { - clientCount := readHLL(period) - if clientCount < 3 || clientCount > 5 { - t.Error("clientCount outside acceptable range: expected 4, got ", clientCount) - } + clientCount := readCurrentHLL() + if clientCount < 3 || clientCount > 5 { + t.Error("clientCount outside acceptable range: expected 4, got ", clientCount) } } diff --git a/socketserver/server/testinfra_test.go b/socketserver/server/testinfra_test.go index 9c03fa78..bc7c5e0b 100644 --- a/socketserver/server/testinfra_test.go +++ b/socketserver/server/testinfra_test.go @@ -12,6 +12,7 @@ import ( "sync" "testing" "time" + "strconv" ) const ( @@ -272,7 +273,7 @@ func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments in if deleteMode { form.Set("delete", "1") } - form.Set("time", time.Now().Format(time.UnixDate)) + form.Set("time", strconv.FormatInt(time.Now().Unix(), 10)) sealed, err := SealRequest(form) if err != nil { diff --git a/socketserver/server/usercount.go b/socketserver/server/usercount.go index 7cdbed67..74360172 100644 --- a/socketserver/server/usercount.go +++ b/socketserver/server/usercount.go @@ -14,6 +14,7 @@ import ( "github.com/clarkduvall/hyperloglog" "github.com/satori/go.uuid" + "io" ) // uuidHash implements a hash for uuid.UUID by XORing the random bits. @@ -34,67 +35,37 @@ type PeriodUniqueUsers struct { type usageToken struct{} -const ( - periodDaily = iota - periodWeekly - periodMonthly -) - -var periods [3]int = [3]int{periodDaily, periodWeekly, periodMonthly} - const uniqCountDir = "./uniques" -const usersDailyFmt = "daily-%d-%d-%d.gob" // d-m-y -const usersWeeklyFmt = "weekly-%d-%d.gob" // w-y -const usersMonthlyFmt = "monthly-%d-%d.gob" // m-y +const usersDailyFmt = "daily-%d-%d-%d.gob" // d-m-y const CounterPrecision uint8 = 12 -var uniqueCounters [3]PeriodUniqueUsers +var uniqueCounter PeriodUniqueUsers var uniqueUserChannel chan uuid.UUID var uniqueCtrWritingToken chan usageToken var counterLocation *time.Location = time.FixedZone("UTC-5", int((time.Hour*-5)/time.Second)) // getCounterPeriod calculates the start and end timestamps for the HLL measurement period that includes the 'at' timestamp. -func getCounterPeriod(which int, at time.Time) (start time.Time, end time.Time) { +func getCounterPeriod(at time.Time) (start time.Time, end time.Time) { year, month, day := at.Date() - - switch which { - case periodDaily: - start = time.Date(year, month, day, 0, 0, 0, 0, counterLocation) - end = time.Date(year, month, day+1, 0, 0, 0, 0, counterLocation) - case periodWeekly: - dayOffset := at.Weekday() - time.Sunday - start = time.Date(year, month, day-int(dayOffset), 0, 0, 0, 0, counterLocation) - end = time.Date(year, month, day-int(dayOffset)+7, 0, 0, 0, 0, counterLocation) - case periodMonthly: - start = time.Date(year, month, 1, 0, 0, 0, 0, counterLocation) - end = time.Date(year, month+1, 1, 0, 0, 0, 0, counterLocation) - } + start = time.Date(year, month, day, 0, 0, 0, 0, counterLocation) + end = time.Date(year, month, day+1, 0, 0, 0, 0, counterLocation) return start, end } // getHLLFilename returns the filename for the saved HLL whose measurement period covers the given time. -func getHLLFilename(which int, at time.Time) string { +func getHLLFilename(at time.Time) string { var filename string - switch which { - case periodDaily: - year, month, day := at.Date() - filename = fmt.Sprintf(usersDailyFmt, day, month, year) - case periodWeekly: - year, week := at.ISOWeek() - filename = fmt.Sprintf(usersWeeklyFmt, week, year) - case periodMonthly: - year, month, _ := at.Date() - filename = fmt.Sprintf(usersMonthlyFmt, month, year) - } + year, month, day := at.Date() + filename = fmt.Sprintf(usersDailyFmt, day, month, year) return fmt.Sprintf("%s/%s", uniqCountDir, filename) } // loadHLL loads a HLL from disk and stores the result in dest.Counter. // If dest.Counter is nil, it will be initialized. (This is a useful side-effect.) // If dest is one of the uniqueCounters, the usageToken must be held. -func loadHLL(which int, at time.Time, dest *PeriodUniqueUsers) error { - fileBytes, err := ioutil.ReadFile(getHLLFilename(which, at)) +func loadHLL(at time.Time, dest *PeriodUniqueUsers) error { + fileBytes, err := ioutil.ReadFile(getHLLFilename(at)) if err != nil { return err } @@ -107,6 +78,7 @@ func loadHLL(which int, at time.Time, dest *PeriodUniqueUsers) error { err = dec.Decode(dest.Counter) if err != nil { log.Panicln(err) + return err } return nil @@ -114,51 +86,42 @@ func loadHLL(which int, at time.Time, dest *PeriodUniqueUsers) error { // writeHLL writes the indicated HLL to disk. // The function takes the usageToken. -func writeHLL(which int) error { +func writeHLL() error { token := <-uniqueCtrWritingToken - result := writeHLL_do(which) + result := writeHLL_do(&uniqueCounter) uniqueCtrWritingToken <- token return result } // writeHLL_do writes out the HLL indicated by `which` to disk. // The usageToken must be held when calling this function. -func writeHLL_do(which int) error { - counter := uniqueCounters[which] - filename := getHLLFilename(which, counter.Start) +func writeHLL_do(hll *PeriodUniqueUsers) (err error) { + filename := getHLLFilename(hll.Start) file, err := os.Create(filename) if err != nil { return err } + + defer func(file io.Closer) { + fileErr := file.Close() + if err == nil { + err = fileErr + } + }(file) + enc := gob.NewEncoder(file) - enc.Encode(counter.Counter) - return file.Close() + return enc.Encode(hll.Counter) } -// readHLL reads the current value of the indicated HLL counter. +// readCurrentHLL reads the current value of the active HLL counter. // The function takes the usageToken. -func readHLL(which int) uint64 { +func readCurrentHLL() uint64 { token := <-uniqueCtrWritingToken - result := uniqueCounters[which].Counter.Count() + result := uniqueCounter.Counter.Count() uniqueCtrWritingToken <- token return result } -// writeAllHLLs writes out all in-memory HLLs to disk. -// The function takes the usageToken. -func writeAllHLLs() error { - var err, err2 error - token := <-uniqueCtrWritingToken - for _, period := range periods { - err2 = writeHLL_do(period) - if err == nil { - err = err2 - } - } - uniqueCtrWritingToken <- token - return err -} - var hllFileServer = http.StripPrefix("/hll", http.FileServer(http.Dir(uniqCountDir))) func HTTPShowHLL(w http.ResponseWriter, r *http.Request) { @@ -166,7 +129,7 @@ func HTTPShowHLL(w http.ResponseWriter, r *http.Request) { } func HTTPWriteHLL(w http.ResponseWriter, r *http.Request) { - writeAllHLLs() + writeHLL() w.WriteHeader(200) w.Write([]byte("ok")) } @@ -181,15 +144,18 @@ func loadUniqueUsers() { } now := time.Now().In(counterLocation) - for _, period := range periods { - uniqueCounters[period].Start, uniqueCounters[period].End = getCounterPeriod(period, now) - err := loadHLL(period, now, &uniqueCounters[period]) - if err != nil && os.IsNotExist(err) { - // errors are bad precisions - uniqueCounters[period].Counter, _ = hyperloglog.NewPlus(CounterPrecision) - } else if err != nil && !os.IsNotExist(err) { - log.Panicln("failed to load unique users data:", err) - } + uniqueCounter.Start, uniqueCounter.End = getCounterPeriod(now) + err = loadHLL(now, &uniqueCounter) + isIgnorableError := err != nil && (false || + (os.IsNotExist(err)) || + (err == io.EOF)) + + if isIgnorableError { + // file didn't finish writing + // errors in NewPlus are bad precisions + uniqueCounter.Counter, _ = hyperloglog.NewPlus(CounterPrecision) + } else if err != nil { + log.Panicln("failed to load unique users data:", err) } uniqueUserChannel = make(chan uuid.UUID) @@ -203,9 +169,7 @@ func loadUniqueUsers() { func dumpUniqueUsers() { token := <-uniqueCtrWritingToken - for _, period := range periods { - uniqueCounters[period].Counter.Clear() - } + uniqueCounter.Counter.Clear() uniqueCtrWritingToken <- token } @@ -220,9 +184,7 @@ func processNewUsers() { select { case u := <-uniqueUserChannel: hashed := UuidHash(u) - for _, period := range periods { - uniqueCounters[period].Counter.Add(hashed) - } + uniqueCounter.Counter.Add(hashed) case uniqueCtrWritingToken <- token: // relinquish token. important that there is only one of this going on // otherwise we thrash @@ -253,29 +215,25 @@ func rolloverCounters_do() { token = <-uniqueCtrWritingToken now = time.Now().In(counterLocation) - for _, period := range periods { - if now.After(uniqueCounters[period].End) { - // Cycle for period - err := writeHLL_do(period) - if err != nil { - log.Println("could not cycle unique user counter:", err) + // Cycle for period + err := writeHLL_do(&uniqueCounter) + if err != nil { + log.Println("could not cycle unique user counter:", err) - // Attempt to rescue the data into the log - var buf bytes.Buffer - bytes, err := uniqueCounters[period].Counter.GobEncode() - if err == nil { - enc := base64.NewEncoder(base64.StdEncoding, &buf) - enc.Write(bytes) - enc.Close() - log.Print("data for ", getHLLFilename(period, now), ":", buf.String()) - } - } - - uniqueCounters[period].Start, uniqueCounters[period].End = getCounterPeriod(period, now) - // errors are bad precisions, so we can ignore - uniqueCounters[period].Counter, _ = hyperloglog.NewPlus(CounterPrecision) + // Attempt to rescue the data into the log + var buf bytes.Buffer + bytes, err := uniqueCounter.Counter.GobEncode() + if err == nil { + enc := base64.NewEncoder(base64.StdEncoding, &buf) + enc.Write(bytes) + enc.Close() + log.Print("data for ", getHLLFilename(uniqueCounter.Start), ":", buf.String()) } } + uniqueCounter.Start, uniqueCounter.End = getCounterPeriod(now) + // errors are bad precisions, so we can ignore + uniqueCounter.Counter, _ = hyperloglog.NewPlus(CounterPrecision) + uniqueCtrWritingToken <- token } diff --git a/socketserver/server/usercount_test.go b/socketserver/server/usercount_test.go index 4b436d49..c6ed2050 100644 --- a/socketserver/server/usercount_test.go +++ b/socketserver/server/usercount_test.go @@ -32,12 +32,10 @@ func TestUniqueConnections(t *testing.T) { uniqueUserChannel <- uuid } - TCheckHLLValue(t, TestExpectedCount, readHLL(periodDaily)) - TCheckHLLValue(t, TestExpectedCount, readHLL(periodWeekly)) - TCheckHLLValue(t, TestExpectedCount, readHLL(periodMonthly)) + TCheckHLLValue(t, TestExpectedCount, readCurrentHLL()) token := <-uniqueCtrWritingToken - uniqueCounters[periodDaily].End = time.Now().In(counterLocation).Add(-1 * time.Second) + uniqueCounter.End = time.Now().In(counterLocation).Add(-1 * time.Second) uniqueCtrWritingToken <- token rolloverCounters_do() @@ -48,17 +46,16 @@ func TestUniqueConnections(t *testing.T) { uniqueUserChannel <- uuid } - TCheckHLLValue(t, TestExpectedCount, readHLL(periodDaily)) - TCheckHLLValue(t, TestExpectedCount*2, readHLL(periodWeekly)) - TCheckHLLValue(t, TestExpectedCount*2, readHLL(periodMonthly)) + TCheckHLLValue(t, TestExpectedCount, readCurrentHLL()) // Check: Merging the two days results in 2000 // note: rolloverCounters_do() wrote out a file, and loadHLL() is reading it back + // TODO need to rewrite some of the test to make this work var loadDest PeriodUniqueUsers - loadHLL(periodDaily, testStart, &loadDest) + loadHLL(testStart, &loadDest) token = <-uniqueCtrWritingToken - loadDest.Counter.Merge(uniqueCounters[periodDaily].Counter) + loadDest.Counter.Merge(uniqueCounter.Counter) uniqueCtrWritingToken <- token TCheckHLLValue(t, TestExpectedCount*2, loadDest.Counter.Count())