From bdd5b5416da541b142e85f2dd57c614e4d6b3b79 Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 25 Sep 2017 14:28:21 -0700 Subject: [PATCH 1/4] add visibility for how many items in response cache --- socketserver/server/stats.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/socketserver/server/stats.go b/socketserver/server/stats.go index 6eec5e93..3b413f0d 100644 --- a/socketserver/server/stats.go +++ b/socketserver/server/stats.go @@ -38,9 +38,8 @@ type StatsData struct { MemoryInUseKB uint64 MemoryRSSKB uint64 - LowMemDroppedConnections uint64 - - MemPerClientBytes uint64 + ResponseCacheItems uint64 + MemPerClientBytes uint64 CpuUsagePct float64 @@ -84,7 +83,7 @@ func commandCounter() { } // StatsDataVersion is the version of the StatsData struct. -const StatsDataVersion = 7 +const StatsDataVersion = 8 const pageSize = 4096 var cpuUsage struct { @@ -170,6 +169,7 @@ func updatePeriodicStats() { { Statistics.Uptime = nowUpdate.Sub(Statistics.StartTime).String() + Statistics.ResponseCacheItems = Backend.responseCache.ItemCount() } { From 75e1a67e9a71442d070ca370bc9566b8adb94bb4 Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 25 Sep 2017 14:37:48 -0700 Subject: [PATCH 2/4] Split bunched command list out to own array --- socketserver/server/commands.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/socketserver/server/commands.go b/socketserver/server/commands.go index 01d621da..3108f061 100644 --- a/socketserver/server/commands.go +++ b/socketserver/server/commands.go @@ -32,14 +32,17 @@ var commandHandlers = map[Command]CommandHandler{ "track_follow": C2STrackFollow, "emoticon_uses": C2SEmoticonUses, "survey": C2SSurvey, +} - "twitch_emote": C2SHandleBunchedCommand, - "get_link": C2SHandleBunchedCommand, - "get_display_name": C2SHandleBunchedCommand, - "get_emote": C2SHandleBunchedCommand, - "get_emote_set": C2SHandleBunchedCommand, - "has_logs": C2SHandleBunchedCommand, - "update_follow_buttons": C2SHandleRemoteCommand, +var bunchedCommands = []string{ + "get_display_name", + "get_emote", + "get_emote_set", + "get_link", + "get_itad_plain", + "get_itad_prices", + "get_name_history", + "has_logs", } func setupInterning() { @@ -71,6 +74,12 @@ func DispatchC2SCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMess handler, ok := commandHandlers[msg.Command] if !ok { handler = C2SHandleRemoteCommand + + for _, v := range bunchedCommands { + if msg.Command == v { + handler = C2SHandleBunchedCommand + } + } } CommandCounter <- msg.Command From 7657357164d995ac5f2c7e3747b4c55ad8cdcdda Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 25 Sep 2017 15:08:22 -0700 Subject: [PATCH 3/4] Compile fixes, switch cache implementation --- socketserver/server/backend.go | 64 +++++++++++++++++++++++----- socketserver/server/commands.go | 2 +- socketserver/server/handlecore.go | 1 - socketserver/server/stats.go | 1 - socketserver/server/subscriptions.go | 7 ++- 5 files changed, 60 insertions(+), 15 deletions(-) diff --git a/socketserver/server/backend.go b/socketserver/server/backend.go index 6b4643e8..7f15b310 100644 --- a/socketserver/server/backend.go +++ b/socketserver/server/backend.go @@ -16,8 +16,9 @@ import ( "time" "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/naclform" - cache "github.com/patrickmn/go-cache" + "github.com/karlseguin/ccache" "golang.org/x/crypto/nacl/box" + "golang.org/x/sync/singleflight" ) const bPathAnnounceStartup = "/startup" @@ -28,7 +29,8 @@ const bPathOtherCommand = "/cmd/" type backendInfo struct { HTTPClient http.Client baseURL string - responseCache *cache.Cache + responseCache *ccache.Cache + reloadGroup singleflight.Group postStatisticsURL string addTopicURL string @@ -49,7 +51,8 @@ func setupBackend(config *ConfigFile) *backendInfo { b.HTTPClient.Timeout = 60 * time.Second b.baseURL = config.BackendURL - b.responseCache = cache.New(60*time.Second, 120*time.Second) + // size in bytes of string payload + b.responseCache = ccache.New(ccache.Configure().MaxSize(250 * 1000 * 1024)) b.announceStartupURL = fmt.Sprintf("%s%s", b.baseURL, bPathAnnounceStartup) b.addTopicURL = fmt.Sprintf("%s%s", b.baseURL, bPathAddTopic) @@ -77,6 +80,18 @@ func getCacheKey(remoteCommand, data string) string { return fmt.Sprintf("%s/%s", remoteCommand, data) } +type cachedResponseStr string + +// implements ccache.Sized +func (c cachedResponseStr) Size() int64 { + return int64(len(string(c))) +} + +// implements Stringer +func (c cachedResponseStr) String() string { + return string(c) +} + // ErrForwardedFromBackend is an error returned by the backend server. type ErrForwardedFromBackend struct { JSONError interface{} @@ -88,22 +103,47 @@ func (bfe ErrForwardedFromBackend) Error() string { } // ErrAuthorizationNeeded is emitted when the backend replies with HTTP 401. +// // Indicates that an attempt to validate `ClientInfo.TwitchUsername` should be attempted. var ErrAuthorizationNeeded = errors.New("Must authenticate Twitch username to use this command") -// SendRemoteCommandCached performs a RPC call on the backend, but caches responses. +// SendRemoteCommandCached performs a RPC call on the backend, checking for a +// cached response first. +// +// If a cached, but expired, response is found, the existing value is returned +// and the cache is updated in the background. func (backend *backendInfo) SendRemoteCommandCached(remoteCommand, data string, auth AuthInfo) (string, error) { - cached, ok := backend.responseCache.Get(getCacheKey(remoteCommand, data)) - if ok { - return cached.(string), nil + cacheKey := getCacheKey(remoteCommand, data) + item := backend.responseCache.Get(cacheKey) + if item != nil { + if item.Expired() { + // reload in background + go backend.reloadGroup.Do(cacheKey, func() (interface{}, error) { + backend.SendRemoteCommand(remoteCommand, data, auth) + return nil, nil + }) + } + return item.Value().(cachedResponseStr).String(), nil } return backend.SendRemoteCommand(remoteCommand, data, auth) } // SendRemoteCommand performs a RPC call on the backend by POSTing to `/cmd/$remoteCommand`. +// // The form data is as follows: `clientData` is the JSON in the `data` parameter -// (should be retrieved from ClientMessage.Arguments), and either `username` or -// `usernameClaimed` depending on whether AuthInfo.UsernameValidates is true is AuthInfo.TwitchUsername. +// (should be retrieved from ClientMessage.Arguments), `username` is AuthInfo.TwitchUsername, +// and `authenticated` is 1 or 0 depending on AuthInfo.UsernameValidated. +// +// 401 responses return an ErrAuthorizationNeeded. +// +// Non-2xx responses return the response body as an error to the client (application/json +// responses are sent as-is, non-json are sent as a JSON string). +// +// If a 2xx response has the FFZ-Cache header, its value is used as a minimum number of +// seconds to cache the response for. (Responses may be cached for longer, see +// SendRemoteCommandCached and the cache implementation.) +// +// A successful response updates the Statistics.Health.Backend map. func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth AuthInfo) (responseStr string, err error) { destURL := fmt.Sprintf("%s/cmd/%s", backend.baseURL, remoteCommand) healthBucket := fmt.Sprintf("/cmd/%s", remoteCommand) @@ -166,7 +206,11 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A return "", fmt.Errorf("The RPC server returned a non-integer cache duration: %v", err) } duration := time.Duration(durSecs) * time.Second - backend.responseCache.Set(getCacheKey(remoteCommand, data), responseStr, duration) + backend.responseCache.Set( + getCacheKey(remoteCommand, data), + cachedResponseStr(responseStr), + duration, + ) } now := time.Now().UTC() diff --git a/socketserver/server/commands.go b/socketserver/server/commands.go index 3108f061..8a8108be 100644 --- a/socketserver/server/commands.go +++ b/socketserver/server/commands.go @@ -34,7 +34,7 @@ var commandHandlers = map[Command]CommandHandler{ "survey": C2SSurvey, } -var bunchedCommands = []string{ +var bunchedCommands = []Command{ "get_display_name", "get_emote", "get_emote_set", diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 15179638..07d095db 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -205,7 +205,6 @@ func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) { updateSysMem() if Statistics.SysMemFreeKB > 0 && Statistics.SysMemFreeKB < Configuration.MinMemoryKBytes { - atomic.AddUint64(&Statistics.LowMemDroppedConnections, 1) w.WriteHeader(503) fmt.Fprint(w, "error: low memory") return diff --git a/socketserver/server/stats.go b/socketserver/server/stats.go index 3b413f0d..10dad36b 100644 --- a/socketserver/server/stats.go +++ b/socketserver/server/stats.go @@ -169,7 +169,6 @@ func updatePeriodicStats() { { Statistics.Uptime = nowUpdate.Sub(Statistics.StartTime).String() - Statistics.ResponseCacheItems = Backend.responseCache.ItemCount() } { diff --git a/socketserver/server/subscriptions.go b/socketserver/server/subscriptions.go index 007aea03..136fa603 100644 --- a/socketserver/server/subscriptions.go +++ b/socketserver/server/subscriptions.go @@ -156,8 +156,11 @@ func UnsubscribeSingleChat(client *ClientInfo, channelName string) { // - write lock to SubscriptionInfos // - write lock to ClientInfo func UnsubscribeAll(client *ClientInfo) { - if StopAcceptingConnections { - return // no need to remove from a high-contention list when the server is closing + select { + case <-StopAcceptingConnectionsCh: + // Skip high-contention client removal operations while server shutting down + return + default: } GlobalSubscriptionLock.Lock() From 87672061058aaef71a5c874f74b6e5c113e44045 Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 25 Sep 2017 15:24:58 -0700 Subject: [PATCH 4/4] Revert switching cache implementation --- socketserver/server/backend.go | 33 +++++++-------------------------- socketserver/server/stats.go | 3 ++- 2 files changed, 9 insertions(+), 27 deletions(-) diff --git a/socketserver/server/backend.go b/socketserver/server/backend.go index 7f15b310..b3761795 100644 --- a/socketserver/server/backend.go +++ b/socketserver/server/backend.go @@ -16,7 +16,7 @@ import ( "time" "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/naclform" - "github.com/karlseguin/ccache" + cache "github.com/patrickmn/go-cache" "golang.org/x/crypto/nacl/box" "golang.org/x/sync/singleflight" ) @@ -29,7 +29,7 @@ const bPathOtherCommand = "/cmd/" type backendInfo struct { HTTPClient http.Client baseURL string - responseCache *ccache.Cache + responseCache *cache.Cache reloadGroup singleflight.Group postStatisticsURL string @@ -52,7 +52,7 @@ func setupBackend(config *ConfigFile) *backendInfo { b.HTTPClient.Timeout = 60 * time.Second b.baseURL = config.BackendURL // size in bytes of string payload - b.responseCache = ccache.New(ccache.Configure().MaxSize(250 * 1000 * 1024)) + b.responseCache = cache.New(60*time.Second, 10*time.Minute) b.announceStartupURL = fmt.Sprintf("%s%s", b.baseURL, bPathAnnounceStartup) b.addTopicURL = fmt.Sprintf("%s%s", b.baseURL, bPathAddTopic) @@ -80,18 +80,6 @@ func getCacheKey(remoteCommand, data string) string { return fmt.Sprintf("%s/%s", remoteCommand, data) } -type cachedResponseStr string - -// implements ccache.Sized -func (c cachedResponseStr) Size() int64 { - return int64(len(string(c))) -} - -// implements Stringer -func (c cachedResponseStr) String() string { - return string(c) -} - // ErrForwardedFromBackend is an error returned by the backend server. type ErrForwardedFromBackend struct { JSONError interface{} @@ -114,16 +102,9 @@ var ErrAuthorizationNeeded = errors.New("Must authenticate Twitch username to us // and the cache is updated in the background. func (backend *backendInfo) SendRemoteCommandCached(remoteCommand, data string, auth AuthInfo) (string, error) { cacheKey := getCacheKey(remoteCommand, data) - item := backend.responseCache.Get(cacheKey) - if item != nil { - if item.Expired() { - // reload in background - go backend.reloadGroup.Do(cacheKey, func() (interface{}, error) { - backend.SendRemoteCommand(remoteCommand, data, auth) - return nil, nil - }) - } - return item.Value().(cachedResponseStr).String(), nil + cached, ok := backend.responseCache.Get(cacheKey) + if ok { + return cached.(string), nil } return backend.SendRemoteCommand(remoteCommand, data, auth) } @@ -208,7 +189,7 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A duration := time.Duration(durSecs) * time.Second backend.responseCache.Set( getCacheKey(remoteCommand, data), - cachedResponseStr(responseStr), + responseStr, duration, ) } diff --git a/socketserver/server/stats.go b/socketserver/server/stats.go index 10dad36b..da5debb1 100644 --- a/socketserver/server/stats.go +++ b/socketserver/server/stats.go @@ -38,7 +38,7 @@ type StatsData struct { MemoryInUseKB uint64 MemoryRSSKB uint64 - ResponseCacheItems uint64 + ResponseCacheItems int MemPerClientBytes uint64 CpuUsagePct float64 @@ -169,6 +169,7 @@ func updatePeriodicStats() { { Statistics.Uptime = nowUpdate.Sub(Statistics.StartTime).String() + Statistics.ResponseCacheItems = Backend.responseCache.ItemCount() } {