Mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git, synced 2025-08-04 00:48:32 +00:00

Commit 1023b6ed11 (parent aca50d9de5): implement addRange, add pprof to server
6 changed files with 362 additions and 36 deletions
@@ -11,6 +11,8 @@ import (
 	"os"
 )
 
+import _ "net/http/pprof"
+
 var configFilename = flag.String("config", "config.json", "Configuration file, including the keypairs for the NaCl crypto library, for communicating with the backend.")
 var flagGenerateKeys = flag.Bool("genkeys", false, "Generate NaCl keys instead of serving requests.\nArguments: [int serverId] [base64 backendPublic]\nThe backend public key can either be specified in base64 on the command line, or put in the json file later.")
 
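The pprof half of this commit is the blank import added above: importing net/http/pprof only for its side effects registers the /debug/pprof/* handlers on http.DefaultServeMux, so any HTTP server started with the default mux exposes the profiling endpoints. A minimal standalone sketch of the same mechanism (the listen address is illustrative):

    package main

    import (
        "net/http"

        _ "net/http/pprof" // side effect: registers /debug/pprof/* on http.DefaultServeMux
    )

    func main() {
        // Passing nil uses DefaultServeMux, so /debug/pprof/heap,
        // /debug/pprof/profile, etc. are served on this listener.
        http.ListenAndServe("localhost:6060", nil)
    }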
@@ -18,11 +18,11 @@ func makeConfig() {
 	if ok {
 		config.DatabaseLocation = fmt.Sprintf("%s/.ffzstatsweb/database.sqlite", home)
 		config.GobFilesLocation = fmt.Sprintf("%s/.ffzstatsweb/gobcache", home)
-		os.MkdirAll(config.GobFilesLocation, 0644)
+		os.MkdirAll(config.GobFilesLocation, 0755)
 	} else {
 		config.DatabaseLocation = "./database.sqlite"
 		config.GobFilesLocation = "./gobcache"
-		os.MkdirAll(config.GobFilesLocation, 0644)
+		os.MkdirAll(config.GobFilesLocation, 0755)
 	}
 	file, err := os.Create(*configLocation)
 	if err != nil {
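The only change in this hunk is the directory mode passed to os.MkdirAll, from 0644 to 0755. The distinction matters because these are directories rather than files: 0644 has no execute (search) bit, so even the owner could not enter the gob cache directory or create files inside it. A tiny sketch with an illustrative path:

    package main

    import (
        "log"
        "os"
    )

    func main() {
        // 0755 (rwxr-xr-x) keeps the search bit set, so the cache directory can
        // actually be entered and written into; 0644 would leave it untraversable.
        if err := os.MkdirAll("/tmp/ffz-gobcache-example/sub", 0755); err != nil {
            log.Fatal(err)
        }
    }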
@@ -33,8 +33,9 @@ type CalendarMonthInfo struct {
 
 func GetMonthInfo(at time.Time) CalendarMonthInfo {
 	year, month, _ := at.Date()
-	// 1 (start of month) - weekday of start of month = day offset of start of week at start of month
-	monthWeekStartDay := 1 - time.Date(year, month, 1, 0, 0, 0, 0, server.CounterLocation).Weekday()
+	monthStartWeekday := time.Date(year, month, 1, 0, 0, 0, 0, server.CounterLocation).Weekday()
+	// 1 (start of month) - weekday of start of month = day offset of start of week at start of month
+	monthWeekStartDay := 1 - int(monthStartWeekday)
 	// first day on calendar + 6 weeks < end of month?
 	sixthSundayDay := monthWeekStartDay + 5*7
 	sixthSundayDate := time.Date(year, month, sixthSundayDay, 0, 0, 0, 0, server.CounterLocation)
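The new intermediate variable makes the offset arithmetic explicit: Weekday() runs from 0 (Sunday) to 6 (Saturday), so 1 - int(weekday of the 1st) is the day-of-month, possibly zero or negative, of the Sunday that starts the calendar grid, and time.Date normalizes out-of-range day values back into the previous month. A worked example (not part of the commit, and using UTC instead of server.CounterLocation):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // July 1, 2015 is a Wednesday, so Weekday() == 3 and the calendar grid
        // should start on day 1-3 = -2, which normalizes to Sunday, June 28.
        weekday := time.Date(2015, time.July, 1, 0, 0, 0, 0, time.UTC).Weekday()
        start := time.Date(2015, time.July, 1-int(weekday), 0, 0, 0, 0, time.UTC)
        fmt.Println(start.Format("Mon 2006-01-02")) // Sun 2015-06-28
    }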
@@ -55,5 +56,6 @@ func renderCalendar(w http.ResponseWriter, at time.Time) {
 	layout, err := template.ParseFiles("./webroot/layout.template.html", "./webroot/cal_entry.hbs", "./webroot/calendar.hbs")
 	data := CalendarData{}
 	data.Weeks = make([]CalWeekData, 6)
+	_ = layout
+	_ = err
 }
@@ -1,17 +1,32 @@
 package main
 
+import (
+	"time"
+	"io"
+	"fmt"
+	"os"
+	"net/http"
+	"errors"
+	"sync"
+	"github.com/hashicorp/golang-lru"
+	"github.com/clarkduvall/hyperloglog"
+	"bitbucket.org/stendec/frankerfacez/socketserver/server"
+	"encoding/gob"
+)
+
 type serverFilter struct {
 	// Mode is false for blacklist, true for whitelist
 	Mode bool
-	Special string[]
+	Special []string
 }
 
 const serverFilterModeBlacklist = false
 const serverFilterModeWhitelist = true
 
-func (sf *serverFilter) IsServerAllowed(server string) {
+func (sf *serverFilter) IsServerAllowed(server *serverInfo) bool {
+	name := server.subdomain
 	for _, v := range sf.Special {
-		if server == v {
+		if name == v {
 			return sf.Mode
 		}
 	}
@@ -28,7 +43,7 @@ func (sf *serverFilter) Remove(server string) {
 		}
 	}
 	if idx != -1 {
-		var lenMinusOne = len(sf.Special)-1
+		var lenMinusOne = len(sf.Special) - 1
 		sf.Special[idx] = sf.Special[lenMinusOne]
 		sf.Special = sf.Special[:lenMinusOne]
 	}
@@ -52,7 +67,7 @@ func (sf *serverFilter) Add(server string) {
 		}
 	}
 	if idx != -1 {
-		var lenMinusOne = len(sf.Special)-1
+		var lenMinusOne = len(sf.Special) - 1
 		sf.Special[idx] = sf.Special[lenMinusOne]
 		sf.Special = sf.Special[:lenMinusOne]
 	}
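Remove (and the matching cleanup in Add) deletes from sf.Special with Go's swap-with-last idiom for unordered slices: move the final element into the slot being vacated, then shrink the slice by one. In isolation, with illustrative names, the idiom looks like:

    // removeUnordered deletes s[idx] in O(1) without preserving element order.
    func removeUnordered(s []string, idx int) []string {
        last := len(s) - 1
        s[idx] = s[last] // move the final element into the hole
        return s[:last]  // drop the now-duplicated last element
    }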
@@ -66,5 +81,250 @@ func (sf *serverFilter) Add(server string) {
 	}
 }
 
-const serverFilterAll serverFilter = serverFilter{Mode: serverFilterModeBlacklist}
-const serverFilterNone serverFilter = serverFilter{Mode: serverFilterModeWhitelist}
+var serverFilterAll serverFilter = serverFilter{Mode: serverFilterModeBlacklist}
+var serverFilterNone serverFilter = serverFilter{Mode: serverFilterModeWhitelist}
+
+func cannotCacheHLL(at time.Time) bool {
+	now := time.Now()
+	now.Add(-25 * time.Hour)
+	return now.Before(at)
+}
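One caveat about cannotCacheHLL as added here: time.Time.Add returns a new Time and does not modify its receiver, so the result of now.Add(-25 * time.Hour) is discarded and the comparison is effectively time.Now().Before(at). If the intent is that dates within the last 25 hours are too fresh to cache, the cutoff has to be kept, along the lines of this sketch (not part of the commit):

    // cannotCacheHLL reports whether at falls within the last 25 hours and is
    // therefore still changing on the origin server. Sketch of the presumed intent.
    func cannotCacheHLL(at time.Time) bool {
        cutoff := time.Now().Add(-25 * time.Hour)
        return cutoff.Before(at)
    }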
+
+var ServerNames = []string{
+	"catbag",
+	"andknuckles",
+	"tuturu",
+}
+
+var httpClient http.Client
+
+const serverNameSuffix = ".frankerfacez.com"
+
+const failedStateThreshold = 4
+
+var ErrServerInFailedState = errors.New("server has been down recently and not recovered")
+var ErrServerHasNoData = errors.New("no data for specified date")
+
+type errServerNot200 struct {
+	StatusCode int
+	StatusText string
+}
+
+func (e *errServerNot200) Error() string {
+	return fmt.Sprintf("The server responded with %d %s", e.StatusCode, e.StatusText)
+}
+func Not200Error(resp *http.Response) *errServerNot200 {
+	return &errServerNot200{
+		StatusCode: resp.StatusCode,
+		StatusText: resp.Status,
+	}
+}
+
+func getHLLCacheKey(at time.Time) string {
+	year, month, day := at.Date()
+	return fmt.Sprintf("%d-%d-%d", year, month, day)
+}
+
+type serverInfo struct {
+	subdomain string
+
+	memcache *lru.TwoQueueCache
+
+	FailedState bool
+	FailureErr error
+	failureCount int
+
+	lock sync.Mutex
+}
+
+func (si *serverInfo) Setup(subdomain string) {
+	si.subdomain = subdomain
+	tq, err := lru.New2Q(60)
+	if err != nil {
+		panic(err)
+	}
+	si.memcache = tq
+}
+
+// GetHLL gets the HLL from
+func (si *serverInfo) GetHLL(at time.Time) (*hyperloglog.HyperLogLogPlus, error) {
+	if cannotCacheHLL(at) {
+		err := si.ForceWrite()
+		if err != nil {
+			return nil, err
+		}
+		reader, err := si.DownloadHLL(at)
+		if err != nil {
+			return nil, err
+		}
+		fmt.Printf("downloaded hll %s:%s\n", si.subdomain, getHLLCacheKey(at))
+		return loadHLLFromStream(reader)
+	}
+
+	hll, ok := si.PeekHLL(at)
+	if ok {
+		fmt.Printf("got cached hll %s:%s\n", si.subdomain, getHLLCacheKey(at))
+		return hll, nil
+	}
+
+	reader, err := si.OpenHLL(at)
+	if err != nil {
+		// continue to download
+	} else {
+		fmt.Printf("opened hll %s:%s\n", si.subdomain, getHLLCacheKey(at))
+		return loadHLLFromStream(reader)
+	}
+
+	reader, err = si.DownloadHLL(at)
+	if err != nil {
+		if err == ErrServerHasNoData {
+			return hyperloglog.NewPlus(server.CounterPrecision)
+		}
+		return nil, err
+	}
+	fmt.Printf("downloaded hll %s:%s\n", si.subdomain, getHLLCacheKey(at))
+	return loadHLLFromStream(reader)
+}
+
+func loadHLLFromStream(reader io.ReadCloser) (*hyperloglog.HyperLogLogPlus, error) {
+	defer reader.Close()
+	hll, _ := hyperloglog.NewPlus(server.CounterPrecision)
+	dec := gob.NewDecoder(reader)
+	err := dec.Decode(hll)
+	if err != nil {
+		return nil, err
+	}
+	return hll, nil
+}
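The on-disk cache files read by loadHLLFromStream are plain gob-encoded HyperLogLogPlus values, so the writing side is the mirror image of the decode path. A sketch of what that encoder could look like, assuming the same gob and hyperloglog usage as above (saveHLLToStream is an illustrative name, not part of the commit):

    // saveHLLToStream gob-encodes the HLL into w, e.g. a cache file on disk.
    func saveHLLToStream(w io.Writer, hll *hyperloglog.HyperLogLogPlus) error {
        return gob.NewEncoder(w).Encode(hll)
    }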
+
+// PeekHLL tries to grab a HLL from the memcache without downloading it or hitting the disk.
+func (si *serverInfo) PeekHLL(at time.Time) (*hyperloglog.HyperLogLogPlus, bool) {
+	if cannotCacheHLL(at) {
+		return nil, false
+	}
+
+	key := getHLLCacheKey(at)
+	hll, ok := si.memcache.Get(key)
+	if ok {
+		return hll.(*hyperloglog.HyperLogLogPlus), true
+	}
+
+	return nil, false
+}
+
+func (si *serverInfo) OpenHLL(at time.Time) (io.ReadCloser, error) {
+	year, month, day := at.Date()
+	filename := fmt.Sprintf("%s/%s/%d-%d-%d.gob", config.GobFilesLocation, si.subdomain, year, month, day)
+
+	file, err := os.Open(filename)
+	if err == nil {
+		return file, nil
+	}
+	// file is nil
+	if !os.IsNotExist(err) {
+		return nil, err
+	}
+
+	return nil, os.ErrNotExist
+}
+
+func (si *serverInfo) DownloadHLL(at time.Time) (io.ReadCloser, error) {
+	if si.FailedState {
+		return nil, ErrServerInFailedState
+	}
+	si.lock.Lock()
+	defer si.lock.Unlock()
+
+	year, month, day := at.Date()
+	url := fmt.Sprintf("https://%s/hll/daily-%d-%d-%d.gob", si.Domain(), day, month, year)
+	resp, err := httpClient.Get(url)
+	if err != nil {
+		si.ServerFailed(err)
+		return nil, err
+	}
+	if resp.StatusCode == 404 {
+		return nil, ErrServerHasNoData
+	}
+	if resp.StatusCode != 200 {
+		err = Not200Error(resp)
+		si.ServerFailed(err)
+		return nil, err
+	}
+
+	filename := fmt.Sprintf("%s/%s/%d-%d-%d.gob", config.GobFilesLocation, si.subdomain, year, month, day)
+	file, err := os.OpenFile(filename, os.O_CREATE | os.O_EXCL | os.O_RDWR, 0644)
+	if os.IsNotExist(err) {
+		os.MkdirAll(fmt.Sprintf("%s/%s", config.GobFilesLocation, si.subdomain), 0755)
+		file, err = os.OpenFile(filename, os.O_CREATE | os.O_EXCL | os.O_RDWR, 0644)
+	}
+	if err != nil {
+		resp.Body.Close()
+		return nil, fmt.Errorf("downloadhll: error opening file for writing: %v", err)
+	}
+
+	return &teeReadCloser{r: resp.Body, w: file}, nil
+}
+
+func (si *serverInfo) ForceWrite() error {
+	if si.FailedState {
+		return ErrServerInFailedState
+	}
+
+	url := fmt.Sprintf("https://%s/hll_force_write", si.Domain())
+	resp, err := httpClient.Get(url)
+	if err != nil {
+		si.ServerFailed(err)
+		return err
+	}
+	if resp.StatusCode != 200 {
+		err = Not200Error(resp)
+		si.ServerFailed(err)
+		return err
+	}
+	resp.Body.Close()
+	return nil
+}
+
+func (si *serverInfo) Domain() string {
+	return fmt.Sprintf("%s%s", si.subdomain, serverNameSuffix)
+}
+
+func (si *serverInfo) ServerFailed(err error) {
+	si.lock.Lock()
+	defer si.lock.Unlock()
+	si.failureCount++
+	if si.failureCount > failedStateThreshold {
+		fmt.Printf("Server %s entering failed state\n", si.subdomain)
+		si.FailedState = true
+		si.FailureErr = err
+		go recoveryCheck(si)
+	}
+}
+
+func recoveryCheck(si *serverInfo) {
+	// TODO check for server recovery
+}
+
+type teeReadCloser struct {
+	r io.ReadCloser
+	w io.WriteCloser
+}
+
+func (t *teeReadCloser) Read(p []byte) (n int, err error) {
+	n, err = t.r.Read(p)
+	if n > 0 {
+		if n, err := t.w.Write(p[:n]); err != nil {
+			return n, err
+		}
+	}
+	return
+}
+
+func (t *teeReadCloser) Close() error {
+	err1 := t.r.Close()
+	err2 := t.w.Close()
+	if err1 != nil {
+		return err1
+	}
+	return err2
+}
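teeReadCloser mirrors the HTTP body into the cache file while the caller reads it; the Read method is the same logic io.TeeReader uses internally, and what the type adds is closing both ends. A rough standard-library-based equivalent, with illustrative names and assuming it is acceptable to close source and sink back to back:

    // teeStream wraps io.TeeReader so both the source (HTTP body) and the sink
    // (cache file) are closed once the caller is done reading.
    type teeStream struct {
        io.Reader
        src io.ReadCloser
        dst io.WriteCloser
    }

    func newTeeStream(src io.ReadCloser, dst io.WriteCloser) *teeStream {
        return &teeStream{Reader: io.TeeReader(src, dst), src: src, dst: dst}
    }

    func (t *teeStream) Close() error {
        errSrc := t.src.Close()
        errDst := t.dst.Close()
        if errSrc != nil {
            return errSrc
        }
        return errDst
    }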
@@ -10,7 +10,8 @@ import (
 	"fmt"
 	"strings"
 	"errors"
-	"github.com/dustin/gojson"
+	"encoding/json"
+	"sync"
 )
 
 var configLocation = flag.String("config", "./config.json", "Location of the configuration file. Defaults to ./config.json")
@@ -20,6 +21,8 @@ var config ConfigFile
 
 const ExitCodeBadConfig = 2
 
+var allServers []*serverInfo
+
 func main() {
 	flag.Parse()
 
@@ -30,6 +33,12 @@ func main() {
 
 	loadConfig()
 
+	allServers = make([]*serverInfo, len(ServerNames))
+	for i, v := range ServerNames {
+		allServers[i] = &serverInfo{}
+		allServers[i].Setup(v)
+	}
+
 	http.HandleFunc("/api/get", ServeAPIGet)
 	http.ListenAndServe(config.ListenAddr, http.DefaultServeMux)
 }
@@ -43,10 +52,12 @@ const jsonErrBlankRequest = `{"status":"error","error":"no queries given"}`
 const statusError = "error"
 const statusPartial = "partial"
 const statusOk = "ok"
+
 type apiResponse struct {
 	Status string `json:"status"`
 	Responses []requestResponse `json:"resp"`
 }
+
 type requestResponse struct {
 	Status string `json:"status"`
 	Request string `json:"req"`
@@ -81,7 +92,7 @@ func ServeAPIGet(w http.ResponseWriter, r *http.Request) {
 		resp.Responses[i] = ProcessSingleGetRequest(v)
 	}
 	for _, v := range resp.Responses {
-		if v.Status == statusError {
+		if v.Status != statusOk {
 			resp.Status = statusPartial
 			break
 		}
@@ -92,7 +103,7 @@ func ServeAPIGet(w http.ResponseWriter, r *http.Request) {
 	enc.Encode(resp)
 }
 
-const errRangeFormatIncorrect = errors.New("incorrect range format, must be yyyy-mm-dd~yyyy-mm-dd")
+var errRangeFormatIncorrect = errors.New("incorrect range format, must be yyyy-mm-dd~yyyy-mm-dd")
 
 // ProcessSingleGetRequest takes a request string and pulls the unique user data for the given dates and filters.
 //
@@ -122,14 +133,18 @@ const errRangeFormatIncorrect = errors.New("incorrect range format, must be yyyy
 //
 // It does not matter if a date is specified multiple times, due to the data format used.
 func ProcessSingleGetRequest(req string) (result requestResponse) {
-	var hll hyperloglog.HyperLogLogPlus, _ = hyperloglog.NewPlus(server.CounterPrecision)
+	fmt.Println("processing request:", req)
+	hll, _ := hyperloglog.NewPlus(server.CounterPrecision)
 
 	result.Request = req
 	result.Status = statusOk
 	filter := serverFilterAll
 
 	collectError := func(err error) bool {
-		if err != nil {
+		if err == ErrServerInFailedState {
+			result.Status = statusPartial
+			return false
+		} else if err != nil {
 			result.Status = statusError
 			result.Error = err.Error()
 			return true
@@ -161,7 +176,7 @@ func ProcessSingleGetRequest(req string) (result requestResponse) {
 			break outerLoop
 		}
 
-		err = addSingleDate(at, filter, &hll)
+		err = addSingleDate(at, filter, hll)
 		if collectError(err) {
 			break outerLoop
 		}
@@ -175,7 +190,7 @@ func ProcessSingleGetRequest(req string) (result requestResponse) {
 			break outerLoop
 		}
 
-		err = addRange(from, to, filter, &hll)
+		err = addRange(from, to, filter, hll)
 		if collectError(err) {
 			break outerLoop
 		}
@@ -200,26 +215,73 @@ func parseDateFromRequest(dateStr string) (time.Time, error) {
 	if err != nil || n != 3 {
 		return zeroTime, errBadDate
 	}
-	return time.Date(year, month, day, 0, 0, 0, 0, server.CounterLocation)
+	return time.Date(year, time.Month(month), day, 0, 0, 0, 0, server.CounterLocation), nil
 }
 
+type hllAndError struct {
+	hll *hyperloglog.HyperLogLogPlus
+	err error
+}
+
 func addSingleDate(at time.Time, filter serverFilter, dest *hyperloglog.HyperLogLogPlus) error {
-	// TODO
-	return nil
+	var partialErr error
+	for _, si := range allServers {
+		if filter.IsServerAllowed(si) {
+			hll, err2 := si.GetHLL(at)
+			if err2 == ErrServerInFailedState {
+				partialErr = err2
+			} else if err2 != nil {
+				return err2
+			} else {
+				dest.Merge(hll)
+			}
+		}
+	}
+	return partialErr
 }
 
 func addRange(start time.Time, end time.Time, filter serverFilter, dest *hyperloglog.HyperLogLogPlus) error {
-	return nil
+	end = server.TruncateToMidnight(end)
+	year, month, day := start.Date()
+	var partialErr error
+	var myAllServers = make([]*serverInfo, 0, len(allServers))
+	for _, si := range allServers {
+		if filter.IsServerAllowed(si) {
+			myAllServers = append(myAllServers, si)
+		}
+	}
+
+	var ch = make(chan hllAndError)
+	var wg sync.WaitGroup
+	for current := start; current.Before(end); day = day + 1 {
+		current = time.Date(year, month, day, 0, 0, 0, 0, server.CounterLocation)
+		for _, si := range myAllServers {
+			wg.Add(1)
+			go getHLL(ch, si, current)
+		}
+	}
+
+	go func() {
+		wg.Wait()
+		close(ch)
+	}()
+
+	for pair := range ch {
+		wg.Done()
+		hll, err := pair.hll, pair.err
+		if err != nil {
+			if partialErr == nil || partialErr == ErrServerInFailedState {
+				partialErr = err
+			}
+		} else {
+			dest.Merge(hll)
+		}
+	}
+
+	return partialErr
 }
 
-func combineDateRange(from time.Time, to time.Time, dest *hyperloglog.HyperLogLogPlus) error {
-	from = server.TruncateToMidnight(from)
-	to = server.TruncateToMidnight(to)
-	year, month, day := from.Date()
-	for current := from; current.Before(to); day = day + 1 {
-		current = time.Date(year, month, day, 0, 0, 0, 0, server.CounterLocation)
-
-	}
-	return nil
-}
+func getHLL(ch chan hllAndError, si *serverInfo, at time.Time) {
+	hll, err := si.GetHLL(at)
+	ch <- hllAndError{hll: hll, err: err}
+}
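addRange fans out one getHLL goroutine per (server, day) pair over an unbuffered channel and merges results as they arrive. The WaitGroup is decremented by the consumer for each value received, and a separate goroutine closes the channel once every result has been consumed, which is what ends the range loop. The same shape, stripped down to illustrative names and integer work:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        work := []int{1, 2, 3, 4}
        ch := make(chan int)
        var wg sync.WaitGroup

        for _, job := range work {
            wg.Add(1)
            go func(j int) { ch <- j * j }(job) // each worker sends exactly one result
        }

        go func() {
            wg.Wait() // wait until the consumer has acknowledged every result...
            close(ch) // ...then close, which terminates the range loop below
        }()

        total := 0
        for r := range ch {
            wg.Done() // acknowledge one result per receive
            total += r
        }
        fmt.Println(total) // 30
    }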
@@ -177,7 +177,7 @@ var SocketUpgrader = websocket.Upgrader{
 // Memes go here.
 var BannerHTML []byte
 
-// StopAcceptingConnections is closed while the server is shutting down.
+// StopAcceptingConnectionsCh is closed while the server is shutting down.
 var StopAcceptingConnectionsCh = make(chan struct{})
 var StopAcceptingConnections = false
 
@@ -283,12 +283,12 @@ const sendMessageBufferLength = 125
 const sendMessageAbortLength = 50
 
 // RunSocketConnection contains the main run loop of a websocket connection.
+//
 // First, it sets up the channels, the ClientInfo object, and the pong frame handler.
 // It starts the reader goroutine pointing at the newly created channels.
 // The function then enters the run loop (a `for{select{}}`).
 // The run loop is broken when an object is received on errorChan, or if `hello` is not the first C2S Command.
+//
 // After the run loop stops, the function launches a goroutine to drain
 // client.MessageChannel, signals the reader goroutine to stop, unsubscribes
 // from all pub/sub channels, waits on MsgChannelKeepalive (remember, the