mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-08-03 08:28:31 +00:00
Rip out the weekly/monthly HLLs - they're redundant
The weekly data can be constructed much more flexibly by just using the daily data.
This commit is contained in:
parent
9ca9b6b213
commit
e117766eb6
8 changed files with 76 additions and 122 deletions
|
@ -11,7 +11,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var SERVERS = []string{
|
var SERVERS = []string{
|
||||||
"https://catbag.frankerfacez.com",
|
// "https://catbag.frankerfacez.com",
|
||||||
"https://andknuckles.frankerfacez.com",
|
"https://andknuckles.frankerfacez.com",
|
||||||
"https://tuturu.frankerfacez.com",
|
"https://tuturu.frankerfacez.com",
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,7 +120,7 @@ type ErrForwardedFromBackend struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bfe ErrForwardedFromBackend) Error() string {
|
func (bfe ErrForwardedFromBackend) Error() string {
|
||||||
bytes, _ := json.Marshal(bfe)
|
bytes, _ := json.Marshal(bfe.JSONError)
|
||||||
return string(bytes)
|
return string(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,7 @@ func shutdownHandler() {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
writeAllHLLs()
|
writeHLL()
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
|
@ -165,10 +165,10 @@ func updateSysMem() {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
Statistics.SysMemTotalKB = memInfo.MemTotal
|
Statistics.SysMemTotalKB = memInfo.MemTotal
|
||||||
Statistics.SysMemFreeKB = memInfo.MemAvailable
|
Statistics.SysMemFreeKB = memInfo.MemAvailable
|
||||||
|
|
||||||
if memInfo.MemAvailable > 0 && memInfo.MemAvailable < Configuration.MinMemoryKBytes {
|
|
||||||
writeAllHLLs()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
writeHLL()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -229,13 +229,11 @@ func TestSubscriptionAndPublish(t *testing.T) {
|
||||||
doneWg.Wait()
|
doneWg.Wait()
|
||||||
server.Close()
|
server.Close()
|
||||||
|
|
||||||
for _, period := range periods {
|
clientCount := readCurrentHLL()
|
||||||
clientCount := readHLL(period)
|
|
||||||
if clientCount < 3 || clientCount > 5 {
|
if clientCount < 3 || clientCount > 5 {
|
||||||
t.Error("clientCount outside acceptable range: expected 4, got ", clientCount)
|
t.Error("clientCount outside acceptable range: expected 4, got ", clientCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func TestRestrictedCommands(t *testing.T) {
|
func TestRestrictedCommands(t *testing.T) {
|
||||||
var doneWg sync.WaitGroup
|
var doneWg sync.WaitGroup
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -272,7 +273,7 @@ func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments in
|
||||||
if deleteMode {
|
if deleteMode {
|
||||||
form.Set("delete", "1")
|
form.Set("delete", "1")
|
||||||
}
|
}
|
||||||
form.Set("time", time.Now().Format(time.UnixDate))
|
form.Set("time", strconv.FormatInt(time.Now().Unix(), 10))
|
||||||
|
|
||||||
sealed, err := SealRequest(form)
|
sealed, err := SealRequest(form)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
|
|
||||||
"github.com/clarkduvall/hyperloglog"
|
"github.com/clarkduvall/hyperloglog"
|
||||||
"github.com/satori/go.uuid"
|
"github.com/satori/go.uuid"
|
||||||
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
// uuidHash implements a hash for uuid.UUID by XORing the random bits.
|
// uuidHash implements a hash for uuid.UUID by XORing the random bits.
|
||||||
|
@ -34,67 +35,37 @@ type PeriodUniqueUsers struct {
|
||||||
|
|
||||||
type usageToken struct{}
|
type usageToken struct{}
|
||||||
|
|
||||||
const (
|
|
||||||
periodDaily = iota
|
|
||||||
periodWeekly
|
|
||||||
periodMonthly
|
|
||||||
)
|
|
||||||
|
|
||||||
var periods [3]int = [3]int{periodDaily, periodWeekly, periodMonthly}
|
|
||||||
|
|
||||||
const uniqCountDir = "./uniques"
|
const uniqCountDir = "./uniques"
|
||||||
const usersDailyFmt = "daily-%d-%d-%d.gob" // d-m-y
|
const usersDailyFmt = "daily-%d-%d-%d.gob" // d-m-y
|
||||||
const usersWeeklyFmt = "weekly-%d-%d.gob" // w-y
|
|
||||||
const usersMonthlyFmt = "monthly-%d-%d.gob" // m-y
|
|
||||||
const CounterPrecision uint8 = 12
|
const CounterPrecision uint8 = 12
|
||||||
|
|
||||||
var uniqueCounters [3]PeriodUniqueUsers
|
var uniqueCounter PeriodUniqueUsers
|
||||||
var uniqueUserChannel chan uuid.UUID
|
var uniqueUserChannel chan uuid.UUID
|
||||||
var uniqueCtrWritingToken chan usageToken
|
var uniqueCtrWritingToken chan usageToken
|
||||||
|
|
||||||
var counterLocation *time.Location = time.FixedZone("UTC-5", int((time.Hour*-5)/time.Second))
|
var counterLocation *time.Location = time.FixedZone("UTC-5", int((time.Hour*-5)/time.Second))
|
||||||
|
|
||||||
// getCounterPeriod calculates the start and end timestamps for the HLL measurement period that includes the 'at' timestamp.
|
// getCounterPeriod calculates the start and end timestamps for the HLL measurement period that includes the 'at' timestamp.
|
||||||
func getCounterPeriod(which int, at time.Time) (start time.Time, end time.Time) {
|
func getCounterPeriod(at time.Time) (start time.Time, end time.Time) {
|
||||||
year, month, day := at.Date()
|
year, month, day := at.Date()
|
||||||
|
|
||||||
switch which {
|
|
||||||
case periodDaily:
|
|
||||||
start = time.Date(year, month, day, 0, 0, 0, 0, counterLocation)
|
start = time.Date(year, month, day, 0, 0, 0, 0, counterLocation)
|
||||||
end = time.Date(year, month, day+1, 0, 0, 0, 0, counterLocation)
|
end = time.Date(year, month, day+1, 0, 0, 0, 0, counterLocation)
|
||||||
case periodWeekly:
|
|
||||||
dayOffset := at.Weekday() - time.Sunday
|
|
||||||
start = time.Date(year, month, day-int(dayOffset), 0, 0, 0, 0, counterLocation)
|
|
||||||
end = time.Date(year, month, day-int(dayOffset)+7, 0, 0, 0, 0, counterLocation)
|
|
||||||
case periodMonthly:
|
|
||||||
start = time.Date(year, month, 1, 0, 0, 0, 0, counterLocation)
|
|
||||||
end = time.Date(year, month+1, 1, 0, 0, 0, 0, counterLocation)
|
|
||||||
}
|
|
||||||
return start, end
|
return start, end
|
||||||
}
|
}
|
||||||
|
|
||||||
// getHLLFilename returns the filename for the saved HLL whose measurement period covers the given time.
|
// getHLLFilename returns the filename for the saved HLL whose measurement period covers the given time.
|
||||||
func getHLLFilename(which int, at time.Time) string {
|
func getHLLFilename(at time.Time) string {
|
||||||
var filename string
|
var filename string
|
||||||
switch which {
|
|
||||||
case periodDaily:
|
|
||||||
year, month, day := at.Date()
|
year, month, day := at.Date()
|
||||||
filename = fmt.Sprintf(usersDailyFmt, day, month, year)
|
filename = fmt.Sprintf(usersDailyFmt, day, month, year)
|
||||||
case periodWeekly:
|
|
||||||
year, week := at.ISOWeek()
|
|
||||||
filename = fmt.Sprintf(usersWeeklyFmt, week, year)
|
|
||||||
case periodMonthly:
|
|
||||||
year, month, _ := at.Date()
|
|
||||||
filename = fmt.Sprintf(usersMonthlyFmt, month, year)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("%s/%s", uniqCountDir, filename)
|
return fmt.Sprintf("%s/%s", uniqCountDir, filename)
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadHLL loads a HLL from disk and stores the result in dest.Counter.
|
// loadHLL loads a HLL from disk and stores the result in dest.Counter.
|
||||||
// If dest.Counter is nil, it will be initialized. (This is a useful side-effect.)
|
// If dest.Counter is nil, it will be initialized. (This is a useful side-effect.)
|
||||||
// If dest is one of the uniqueCounters, the usageToken must be held.
|
// If dest is uniqueCounter, the usageToken must be held.
|
||||||
func loadHLL(which int, at time.Time, dest *PeriodUniqueUsers) error {
|
func loadHLL(at time.Time, dest *PeriodUniqueUsers) error {
|
||||||
fileBytes, err := ioutil.ReadFile(getHLLFilename(which, at))
|
fileBytes, err := ioutil.ReadFile(getHLLFilename(at))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -107,6 +78,7 @@ func loadHLL(which int, at time.Time, dest *PeriodUniqueUsers) error {
|
||||||
err = dec.Decode(dest.Counter)
|
err = dec.Decode(dest.Counter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicln(err)
|
log.Panicln(err)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -114,51 +86,42 @@ func loadHLL(which int, at time.Time, dest *PeriodUniqueUsers) error {
|
||||||
|
|
||||||
// writeHLL writes the indicated HLL to disk.
|
// writeHLL writes the active HLL (uniqueCounter) to disk.
|
||||||
// The function takes the usageToken.
|
// The function takes the usageToken.
|
||||||
func writeHLL(which int) error {
|
func writeHLL() error {
|
||||||
token := <-uniqueCtrWritingToken
|
token := <-uniqueCtrWritingToken
|
||||||
result := writeHLL_do(which)
|
result := writeHLL_do(&uniqueCounter)
|
||||||
uniqueCtrWritingToken <- token
|
uniqueCtrWritingToken <- token
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeHLL_do writes out the HLL indicated by `which` to disk.
|
// writeHLL_do writes out the HLL passed as `hll` to disk.
|
||||||
// The usageToken must be held when calling this function.
|
// The usageToken must be held when calling this function.
|
||||||
func writeHLL_do(which int) error {
|
func writeHLL_do(hll *PeriodUniqueUsers) (err error) {
|
||||||
counter := uniqueCounters[which]
|
filename := getHLLFilename(hll.Start)
|
||||||
filename := getHLLFilename(which, counter.Start)
|
|
||||||
file, err := os.Create(filename)
|
file, err := os.Create(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer func(file io.Closer) {
|
||||||
|
fileErr := file.Close()
|
||||||
|
if err == nil {
|
||||||
|
err = fileErr
|
||||||
|
}
|
||||||
|
}(file)
|
||||||
|
|
||||||
enc := gob.NewEncoder(file)
|
enc := gob.NewEncoder(file)
|
||||||
enc.Encode(counter.Counter)
|
return enc.Encode(hll.Counter)
|
||||||
return file.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// readHLL reads the current value of the indicated HLL counter.
|
// readCurrentHLL reads the current value of the active HLL counter.
|
||||||
// The function takes the usageToken.
|
// The function takes the usageToken.
|
||||||
func readHLL(which int) uint64 {
|
func readCurrentHLL() uint64 {
|
||||||
token := <-uniqueCtrWritingToken
|
token := <-uniqueCtrWritingToken
|
||||||
result := uniqueCounters[which].Counter.Count()
|
result := uniqueCounter.Counter.Count()
|
||||||
uniqueCtrWritingToken <- token
|
uniqueCtrWritingToken <- token
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeAllHLLs writes out all in-memory HLLs to disk.
|
|
||||||
// The function takes the usageToken.
|
|
||||||
func writeAllHLLs() error {
|
|
||||||
var err, err2 error
|
|
||||||
token := <-uniqueCtrWritingToken
|
|
||||||
for _, period := range periods {
|
|
||||||
err2 = writeHLL_do(period)
|
|
||||||
if err == nil {
|
|
||||||
err = err2
|
|
||||||
}
|
|
||||||
}
|
|
||||||
uniqueCtrWritingToken <- token
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var hllFileServer = http.StripPrefix("/hll", http.FileServer(http.Dir(uniqCountDir)))
|
var hllFileServer = http.StripPrefix("/hll", http.FileServer(http.Dir(uniqCountDir)))
|
||||||
|
|
||||||
func HTTPShowHLL(w http.ResponseWriter, r *http.Request) {
|
func HTTPShowHLL(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -166,7 +129,7 @@ func HTTPShowHLL(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func HTTPWriteHLL(w http.ResponseWriter, r *http.Request) {
|
func HTTPWriteHLL(w http.ResponseWriter, r *http.Request) {
|
||||||
writeAllHLLs()
|
writeHLL()
|
||||||
w.WriteHeader(200)
|
w.WriteHeader(200)
|
||||||
w.Write([]byte("ok"))
|
w.Write([]byte("ok"))
|
||||||
}
|
}
|
||||||
|
@ -181,16 +144,19 @@ func loadUniqueUsers() {
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now().In(counterLocation)
|
now := time.Now().In(counterLocation)
|
||||||
for _, period := range periods {
|
uniqueCounter.Start, uniqueCounter.End = getCounterPeriod(now)
|
||||||
uniqueCounters[period].Start, uniqueCounters[period].End = getCounterPeriod(period, now)
|
err = loadHLL(now, &uniqueCounter)
|
||||||
err := loadHLL(period, now, &uniqueCounters[period])
|
isIgnorableError := err != nil && (false ||
|
||||||
if err != nil && os.IsNotExist(err) {
|
(os.IsNotExist(err)) ||
|
||||||
// errors are bad precisions
|
(err == io.EOF))
|
||||||
uniqueCounters[period].Counter, _ = hyperloglog.NewPlus(CounterPrecision)
|
|
||||||
} else if err != nil && !os.IsNotExist(err) {
|
if isIgnorableError {
|
||||||
|
// file didn't finish writing
|
||||||
|
// errors in NewPlus are bad precisions
|
||||||
|
uniqueCounter.Counter, _ = hyperloglog.NewPlus(CounterPrecision)
|
||||||
|
} else if err != nil {
|
||||||
log.Panicln("failed to load unique users data:", err)
|
log.Panicln("failed to load unique users data:", err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
uniqueUserChannel = make(chan uuid.UUID)
|
uniqueUserChannel = make(chan uuid.UUID)
|
||||||
uniqueCtrWritingToken = make(chan usageToken)
|
uniqueCtrWritingToken = make(chan usageToken)
|
||||||
|
@ -203,9 +169,7 @@ func loadUniqueUsers() {
|
||||||
func dumpUniqueUsers() {
|
func dumpUniqueUsers() {
|
||||||
token := <-uniqueCtrWritingToken
|
token := <-uniqueCtrWritingToken
|
||||||
|
|
||||||
for _, period := range periods {
|
uniqueCounter.Counter.Clear()
|
||||||
uniqueCounters[period].Counter.Clear()
|
|
||||||
}
|
|
||||||
|
|
||||||
uniqueCtrWritingToken <- token
|
uniqueCtrWritingToken <- token
|
||||||
}
|
}
|
||||||
|
@ -220,9 +184,7 @@ func processNewUsers() {
|
||||||
select {
|
select {
|
||||||
case u := <-uniqueUserChannel:
|
case u := <-uniqueUserChannel:
|
||||||
hashed := UuidHash(u)
|
hashed := UuidHash(u)
|
||||||
for _, period := range periods {
|
uniqueCounter.Counter.Add(hashed)
|
||||||
uniqueCounters[period].Counter.Add(hashed)
|
|
||||||
}
|
|
||||||
case uniqueCtrWritingToken <- token:
|
case uniqueCtrWritingToken <- token:
|
||||||
// relinquish token. important that there is only one of this going on
|
// relinquish token. important that there is only one of this going on
|
||||||
// otherwise we thrash
|
// otherwise we thrash
|
||||||
|
@ -253,29 +215,25 @@ func rolloverCounters_do() {
|
||||||
|
|
||||||
token = <-uniqueCtrWritingToken
|
token = <-uniqueCtrWritingToken
|
||||||
now = time.Now().In(counterLocation)
|
now = time.Now().In(counterLocation)
|
||||||
for _, period := range periods {
|
|
||||||
if now.After(uniqueCounters[period].End) {
|
|
||||||
// Cycle for period
|
// Cycle for period
|
||||||
err := writeHLL_do(period)
|
err := writeHLL_do(&uniqueCounter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("could not cycle unique user counter:", err)
|
log.Println("could not cycle unique user counter:", err)
|
||||||
|
|
||||||
// Attempt to rescue the data into the log
|
// Attempt to rescue the data into the log
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
bytes, err := uniqueCounters[period].Counter.GobEncode()
|
bytes, err := uniqueCounter.Counter.GobEncode()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
enc := base64.NewEncoder(base64.StdEncoding, &buf)
|
enc := base64.NewEncoder(base64.StdEncoding, &buf)
|
||||||
enc.Write(bytes)
|
enc.Write(bytes)
|
||||||
enc.Close()
|
enc.Close()
|
||||||
log.Print("data for ", getHLLFilename(period, now), ":", buf.String())
|
log.Print("data for ", getHLLFilename(uniqueCounter.Start), ":", buf.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uniqueCounters[period].Start, uniqueCounters[period].End = getCounterPeriod(period, now)
|
uniqueCounter.Start, uniqueCounter.End = getCounterPeriod(now)
|
||||||
// errors are bad precisions, so we can ignore
|
// errors are bad precisions, so we can ignore
|
||||||
uniqueCounters[period].Counter, _ = hyperloglog.NewPlus(CounterPrecision)
|
uniqueCounter.Counter, _ = hyperloglog.NewPlus(CounterPrecision)
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
uniqueCtrWritingToken <- token
|
uniqueCtrWritingToken <- token
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,12 +32,10 @@ func TestUniqueConnections(t *testing.T) {
|
||||||
uniqueUserChannel <- uuid
|
uniqueUserChannel <- uuid
|
||||||
}
|
}
|
||||||
|
|
||||||
TCheckHLLValue(t, TestExpectedCount, readHLL(periodDaily))
|
TCheckHLLValue(t, TestExpectedCount, readCurrentHLL())
|
||||||
TCheckHLLValue(t, TestExpectedCount, readHLL(periodWeekly))
|
|
||||||
TCheckHLLValue(t, TestExpectedCount, readHLL(periodMonthly))
|
|
||||||
|
|
||||||
token := <-uniqueCtrWritingToken
|
token := <-uniqueCtrWritingToken
|
||||||
uniqueCounters[periodDaily].End = time.Now().In(counterLocation).Add(-1 * time.Second)
|
uniqueCounter.End = time.Now().In(counterLocation).Add(-1 * time.Second)
|
||||||
uniqueCtrWritingToken <- token
|
uniqueCtrWritingToken <- token
|
||||||
|
|
||||||
rolloverCounters_do()
|
rolloverCounters_do()
|
||||||
|
@ -48,17 +46,16 @@ func TestUniqueConnections(t *testing.T) {
|
||||||
uniqueUserChannel <- uuid
|
uniqueUserChannel <- uuid
|
||||||
}
|
}
|
||||||
|
|
||||||
TCheckHLLValue(t, TestExpectedCount, readHLL(periodDaily))
|
TCheckHLLValue(t, TestExpectedCount, readCurrentHLL())
|
||||||
TCheckHLLValue(t, TestExpectedCount*2, readHLL(periodWeekly))
|
|
||||||
TCheckHLLValue(t, TestExpectedCount*2, readHLL(periodMonthly))
|
|
||||||
|
|
||||||
// Check: Merging the two days results in 2000
|
// Check: Merging the two days results in 2000
|
||||||
// note: rolloverCounters_do() wrote out a file, and loadHLL() is reading it back
|
// note: rolloverCounters_do() wrote out a file, and loadHLL() is reading it back
|
||||||
|
// TODO need to rewrite some of the test to make this work
|
||||||
var loadDest PeriodUniqueUsers
|
var loadDest PeriodUniqueUsers
|
||||||
loadHLL(periodDaily, testStart, &loadDest)
|
loadHLL(testStart, &loadDest)
|
||||||
|
|
||||||
token = <-uniqueCtrWritingToken
|
token = <-uniqueCtrWritingToken
|
||||||
loadDest.Counter.Merge(uniqueCounters[periodDaily].Counter)
|
loadDest.Counter.Merge(uniqueCounter.Counter)
|
||||||
uniqueCtrWritingToken <- token
|
uniqueCtrWritingToken <- token
|
||||||
|
|
||||||
TCheckHLLValue(t, TestExpectedCount*2, loadDest.Counter.Count())
|
TCheckHLLValue(t, TestExpectedCount*2, loadDest.Counter.Count())
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue