mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-06-27 21:05:53 +00:00
237 lines
6.2 KiB
Go
237 lines
6.2 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/base64"
|
|
"encoding/binary"
|
|
"encoding/gob"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
"io"
|
|
|
|
"github.com/clarkduvall/hyperloglog"
|
|
"github.com/satori/go.uuid"
|
|
)
|
|
|
|
// UuidHash implements a hash for uuid.UUID by XORing the random bits.
|
|
type UuidHash uuid.UUID
|
|
|
|
func (u UuidHash) Sum64() uint64 {
|
|
var valLow, valHigh uint64
|
|
valLow = binary.LittleEndian.Uint64(u[0:8])
|
|
valHigh = binary.LittleEndian.Uint64(u[8:16])
|
|
return valLow ^ valHigh
|
|
}
|
|
|
|
type PeriodUniqueUsers struct {
|
|
Start time.Time
|
|
End time.Time
|
|
Counter *hyperloglog.HyperLogLogPlus
|
|
}
|
|
|
|
type usageToken struct{}
|
|
|
|
const uniqCountDir = "./uniques"
|
|
const UsersDailyFmt = "daily-%d-%d-%d.gob" // d-m-y
|
|
const CounterPrecision uint8 = 12
|
|
|
|
var uniqueCounter PeriodUniqueUsers
|
|
var uniqueUserChannel chan uuid.UUID
|
|
var uniqueCtrWritingToken chan usageToken
|
|
|
|
// CounterLocation is the fixed UTC-5 zone in which measurement periods begin
// and end.
var CounterLocation = time.FixedZone("UTC-5", int((time.Hour*-5)/time.Second))

// GetCounterPeriod calculates the start and end timestamps for the HLL
// measurement period that includes the 'at' timestamp.
//
// The period is the calendar day of 'at' as seen in CounterLocation: start is
// that day's midnight and end is the following midnight. 'at' may be expressed
// in any zone; it is converted to CounterLocation before its date is taken, so
// that the returned period always contains the instant 'at'.
func GetCounterPeriod(at time.Time) (start time.Time, end time.Time) {
	year, month, day := at.In(CounterLocation).Date()
	start = time.Date(year, month, day, 0, 0, 0, 0, CounterLocation)
	// time.Date normalizes day+1 past the end of a month into the next month.
	end = time.Date(year, month, day+1, 0, 0, 0, 0, CounterLocation)
	return start, end
}
|
|
|
|
// GetHLLFilename returns the filename for the saved HLL whose measurement period covers the given time.
|
|
func GetHLLFilename(at time.Time) string {
|
|
var filename string
|
|
year, month, day := at.Date()
|
|
filename = fmt.Sprintf(UsersDailyFmt, day, month, year)
|
|
return fmt.Sprintf("%s/%s", uniqCountDir, filename)
|
|
}
|
|
|
|
// loadHLL loads a HLL from disk and stores the result in dest.Counter.
|
|
// If dest.Counter is nil, it will be initialized. (This is a useful side-effect.)
|
|
// If dest is one of the uniqueCounters, the usageToken must be held.
|
|
func loadHLL(at time.Time, dest *PeriodUniqueUsers) error {
|
|
fileBytes, err := ioutil.ReadFile(GetHLLFilename(at))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if dest.Counter == nil {
|
|
dest.Counter, _ = hyperloglog.NewPlus(CounterPrecision)
|
|
}
|
|
|
|
dec := gob.NewDecoder(bytes.NewReader(fileBytes))
|
|
err = dec.Decode(dest.Counter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// writeHLL writes the indicated HLL to disk.
|
|
// The function takes the usageToken.
|
|
func writeHLL() error {
|
|
token := <-uniqueCtrWritingToken
|
|
result := writeHLL_do(&uniqueCounter)
|
|
uniqueCtrWritingToken <- token
|
|
return result
|
|
}
|
|
|
|
// writeHLL_do writes out the HLL indicated by `which` to disk.
|
|
// The usageToken must be held when calling this function.
|
|
func writeHLL_do(hll *PeriodUniqueUsers) (err error) {
|
|
filename := GetHLLFilename(hll.Start)
|
|
file, err := os.Create(filename)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func(file io.Closer) {
|
|
fileErr := file.Close()
|
|
if err == nil {
|
|
err = fileErr
|
|
}
|
|
}(file)
|
|
|
|
enc := gob.NewEncoder(file)
|
|
return enc.Encode(hll.Counter)
|
|
}
|
|
|
|
// readCurrentHLL reads the current value of the active HLL counter.
|
|
// The function takes the usageToken.
|
|
func readCurrentHLL() uint64 {
|
|
token := <-uniqueCtrWritingToken
|
|
result := uniqueCounter.Counter.Count()
|
|
uniqueCtrWritingToken <- token
|
|
return result
|
|
}
|
|
|
|
var hllFileServer = http.StripPrefix("/hll", http.FileServer(http.Dir(uniqCountDir)))
|
|
|
|
func HTTPShowHLL(w http.ResponseWriter, r *http.Request) {
|
|
hllFileServer.ServeHTTP(w, r)
|
|
}
|
|
|
|
func HTTPWriteHLL(w http.ResponseWriter, _ *http.Request) {
|
|
writeHLL()
|
|
w.WriteHeader(200)
|
|
w.Write([]byte("ok"))
|
|
}
|
|
|
|
// loadUniqueUsers loads the previous HLLs into memory.
|
|
// is_init_func
|
|
func loadUniqueUsers() {
|
|
gob.RegisterName("hyperloglog", hyperloglog.HyperLogLogPlus{})
|
|
err := os.MkdirAll(uniqCountDir, 0755)
|
|
if err != nil {
|
|
log.Panicln("could not make unique users data dir:", err)
|
|
}
|
|
|
|
now := time.Now().In(CounterLocation)
|
|
uniqueCounter.Start, uniqueCounter.End = GetCounterPeriod(now)
|
|
err = loadHLL(now, &uniqueCounter)
|
|
isIgnorableError := err != nil && (os.IsNotExist(err) || err == io.EOF)
|
|
|
|
if isIgnorableError {
|
|
// file didn't finish writing
|
|
// errors in NewPlus are bad precisions
|
|
uniqueCounter.Counter, _ = hyperloglog.NewPlus(CounterPrecision)
|
|
log.Println("failed to load unique users data:", err)
|
|
} else if err != nil {
|
|
log.Panicln("failed to load unique users data:", err)
|
|
}
|
|
|
|
uniqueUserChannel = make(chan uuid.UUID)
|
|
uniqueCtrWritingToken = make(chan usageToken)
|
|
go processNewUsers()
|
|
go rolloverCounters()
|
|
uniqueCtrWritingToken <- usageToken{}
|
|
}
|
|
|
|
// dumpUniqueUsers dumps all the data in uniqueCounters.
|
|
func dumpUniqueUsers() {
|
|
token := <-uniqueCtrWritingToken
|
|
|
|
uniqueCounter.Counter.Clear()
|
|
|
|
uniqueCtrWritingToken <- token
|
|
}
|
|
|
|
// processNewUsers reads uniqueUserChannel, and also dispatches the writing token.
|
|
// This function is the primary writer of uniqueCounters, so it makes sense for it to hold the token.
|
|
// is_init_func
|
|
func processNewUsers() {
|
|
token := <-uniqueCtrWritingToken
|
|
|
|
for {
|
|
select {
|
|
case u := <-uniqueUserChannel:
|
|
hashed := UuidHash(u)
|
|
uniqueCounter.Counter.Add(hashed)
|
|
case uniqueCtrWritingToken <- token:
|
|
// relinquish token. important that there is only one of this going on
|
|
// otherwise we thrash
|
|
token = <-uniqueCtrWritingToken
|
|
}
|
|
}
|
|
}
|
|
|
|
func getNextMidnight() time.Time {
|
|
now := time.Now().In(CounterLocation)
|
|
year, month, day := now.Date()
|
|
return time.Date(year, month, day+1, 0, 0, 1, 0, CounterLocation)
|
|
}
|
|
|
|
// is_init_func
|
|
func rolloverCounters() {
|
|
for {
|
|
duration := getNextMidnight().Sub(time.Now())
|
|
// fmt.Println(duration)
|
|
time.Sleep(duration)
|
|
rolloverCounters_do()
|
|
}
|
|
}
|
|
|
|
func rolloverCounters_do() {
|
|
var token usageToken
|
|
var now time.Time
|
|
|
|
token = <-uniqueCtrWritingToken
|
|
now = time.Now().In(CounterLocation)
|
|
// Cycle for period
|
|
err := writeHLL_do(&uniqueCounter)
|
|
if err != nil {
|
|
log.Println("could not cycle unique user counter:", err)
|
|
|
|
// Attempt to rescue the data into the log
|
|
var buf bytes.Buffer
|
|
by, err := uniqueCounter.Counter.GobEncode()
|
|
if err == nil {
|
|
enc := base64.NewEncoder(base64.StdEncoding, &buf)
|
|
enc.Write(by)
|
|
enc.Close()
|
|
log.Print("data for ", GetHLLFilename(uniqueCounter.Start), ":", buf.String())
|
|
}
|
|
}
|
|
|
|
uniqueCounter.Start, uniqueCounter.End = GetCounterPeriod(now)
|
|
// errors are bad precisions, so we can ignore
|
|
uniqueCounter.Counter, _ = hyperloglog.NewPlus(CounterPrecision)
|
|
|
|
uniqueCtrWritingToken <- token
|
|
}
|