diff --git a/socketserver/server/backend.go b/socketserver/server/backend.go index 9b8c5995..409f1f06 100644 --- a/socketserver/server/backend.go +++ b/socketserver/server/backend.go @@ -16,6 +16,7 @@ import ( "github.com/pmylund/go-cache" "golang.org/x/crypto/nacl/box" + "sync" ) const bPathAnnounceStartup = "/startup" @@ -24,24 +25,23 @@ const bPathAggStats = "/stats" const bPathOtherCommand = "/cmd/" type backendInfo struct { - HTTPClient http.Client - baseURL string + HTTPClient http.Client + baseURL string responseCache *cache.Cache - postStatsURL string - addTopicURL string + postStatsURL string + addTopicURL string announceStartupURL string sharedKey [32]byte - serverID int + serverID int - lastSuccess map[string]time.Time + lastSuccess map[string]time.Time + lastSuccessLock sync.Mutex } var Backend *backendInfo -var responseCache *cache.Cache - var postStatisticsURL string var addTopicURL string var announceStartupURL string @@ -57,14 +57,13 @@ func setupBackend(config *ConfigFile) *backendInfo { b.HTTPClient.Timeout = 60 * time.Second b.baseURL = config.BackendURL - if responseCache != nil { - responseCache.Flush() - } - responseCache = cache.New(60*time.Second, 120*time.Second) + b.responseCache = cache.New(60*time.Second, 120*time.Second) announceStartupURL = fmt.Sprintf("%s%s", b.baseURL, bPathAnnounceStartup) addTopicURL = fmt.Sprintf("%s%s", b.baseURL, bPathAddTopic) postStatisticsURL = fmt.Sprintf("%s%s", b.baseURL, bPathAggStats) + + epochTime := time.Unix(0, 0).UTC() lastBackendSuccess = map[string]time.Time{ bPathAnnounceStartup: epochTime, @@ -72,7 +71,7 @@ func setupBackend(config *ConfigFile) *backendInfo { bPathAggStats: epochTime, bPathOtherCommand: epochTime, } - Statistics.Health.Backend = lastBackendSuccess + b.lastSuccess = lastBackendSuccess var theirPublic, ourPrivate [32]byte copy(theirPublic[:], config.BackendPublicKey) @@ -156,7 +155,7 @@ var ErrAuthorizationNeeded = errors.New("Must authenticate Twitch username to us // SendRemoteCommandCached performs a RPC call on the backend, but caches responses. func (backend *backendInfo) SendRemoteCommandCached(remoteCommand, data string, auth AuthInfo) (string, error) { - cached, ok := responseCache.Get(getCacheKey(remoteCommand, data)) + cached, ok := backend.responseCache.Get(getCacheKey(remoteCommand, data)) if ok { return cached.(string), nil } @@ -169,6 +168,7 @@ func (backend *backendInfo) SendRemoteCommandCached(remoteCommand, data string, // `usernameClaimed` depending on whether AuthInfo.UsernameValidates is true is AuthInfo.TwitchUsername. func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth AuthInfo) (responseStr string, err error) { destURL := fmt.Sprintf("%s/cmd/%s", backend.baseURL, remoteCommand) + healthBucket := fmt.Sprintf("/cmd/%s", remoteCommand) formData := url.Values{ "clientData": []string{data}, @@ -219,10 +219,14 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A return "", fmt.Errorf("The RPC server returned a non-integer cache duration: %v", err) } duration := time.Duration(durSecs) * time.Second - responseCache.Set(getCacheKey(remoteCommand, data), responseStr, duration) + backend.responseCache.Set(getCacheKey(remoteCommand, data), responseStr, duration) } - lastBackendSuccess[bPathOtherCommand] = time.Now().UTC() + now := time.Now().UTC() + backend.lastSuccessLock.Lock() + defer backend.lastSuccessLock.Unlock() + backend.lastSuccess[bPathOtherCommand] = now + backend.lastSuccess[healthBucket] = now return } @@ -243,7 +247,9 @@ func (backend *backendInfo) SendAggregatedData(form url.Values) error { return httpError(resp.StatusCode) } - lastBackendSuccess[bPathAggStats] = time.Now().UTC() + backend.lastSuccessLock.Lock() + defer backend.lastSuccessLock.Unlock() + backend.lastSuccess[bPathAggStats] = time.Now().UTC() return resp.Body.Close() } @@ -305,7 +311,9 @@ func (backend *backendInfo) sendTopicNotice(topic string, added bool) error { return ErrBackendNotOK{Code: resp.StatusCode, Response: respStr} } - lastBackendSuccess[bPathAddTopic] = time.Now().UTC() + backend.lastSuccessLock.Lock() + defer backend.lastSuccessLock.Unlock() + backend.lastSuccess[bPathAddTopic] = time.Now().UTC() return nil } diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index da3c7a3d..66547586 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -114,11 +114,9 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { log.Println("could not announce startup to backend:", err) } else { resp.Body.Close() - lastBackendSuccess[bPathAnnounceStartup] = time.Now().UTC() - } - - if Configuration.UseESLogStashing { - // logstasher.Setup(Configuration.ESServer, Configuration.ESIndexPrefix, Configuration.ESHostName) + Backend.lastSuccessLock.Lock() + Backend.lastSuccess[bPathAnnounceStartup] = time.Now().UTC() + Backend.lastSuccessLock.Unlock() } janitorsOnce.Do(startJanitors) diff --git a/socketserver/server/stats.go b/socketserver/server/stats.go index a330297d..a9401248 100644 --- a/socketserver/server/stats.go +++ b/socketserver/server/stats.go @@ -97,6 +97,12 @@ func newStatsData() *StatsData { DisconnectReasons: make(map[string]uint64), ClientVersions: make(map[string]uint64), StatsDataVersion: StatsDataVersion, + Health: struct { + IRC bool + Backend map[string]time.Time + }{ + Backend: make(map[string]time.Time), + }, } } @@ -162,6 +168,11 @@ func updatePeriodicStats() { { Statistics.Health.IRC = authIrcConnection.Connected() + Backend.lastSuccessLock.Lock() + for k, v := range Backend.lastSuccess { + Statistics.Health.Backend[k] = v + } + Backend.lastSuccessLock.Unlock() } }