diff --git a/socketserver/cmd/ffzsocketserver/console.go b/socketserver/cmd/ffzsocketserver/console.go index 55579f8f..f0f83a8d 100644 --- a/socketserver/cmd/ffzsocketserver/console.go +++ b/socketserver/cmd/ffzsocketserver/console.go @@ -8,8 +8,8 @@ import ( "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server" "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/rate" - "gopkg.in/abiosoft/ishell.v1" "github.com/gorilla/websocket" + "gopkg.in/abiosoft/ishell.v1" ) func commandLineConsole() { diff --git a/socketserver/cmd/statsweb/servers.go b/socketserver/cmd/statsweb/servers.go index d17e590f..53530cb9 100644 --- a/socketserver/cmd/statsweb/servers.go +++ b/socketserver/cmd/statsweb/servers.go @@ -12,6 +12,7 @@ import ( "bitbucket.org/stendec/frankerfacez/socketserver/server" "github.com/clarkduvall/hyperloglog" + lru "github.com/hashicorp/golang-lru" ) type serverFilter struct { diff --git a/socketserver/server/commands.go b/socketserver/server/commands.go index 0ee61107..7e7d490e 100644 --- a/socketserver/server/commands.go +++ b/socketserver/server/commands.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/websocket" "github.com/satori/go.uuid" + "golang.org/x/sync/singleflight" ) // Command is a string indicating which RPC is requested. @@ -426,137 +427,34 @@ const ( CacheStatusExpired ) -var pendingBunchedRequests = make(map[bunchedRequest]*bunchSubscriberList) -var pendingBunchLock sync.Mutex -var bunchCache = make(map[bunchedRequest]cachedBunchedResponse) -var bunchCacheLock sync.RWMutex -var bunchCacheCleanupSignal = sync.NewCond(&bunchCacheLock) -var bunchCacheLastCleanup time.Time - -func bunchedRequestFromCM(msg *ClientMessage) bunchedRequest { - return bunchedRequest{Command: msg.Command, Param: copyString(msg.origArguments)} -} - -// is_init_func -func bunchCacheJanitor() { - go func() { - for { - time.Sleep(30 * time.Minute) - bunchCacheCleanupSignal.Signal() - } - }() - - bunchCacheLock.Lock() - for { - // Unlocks CachedBunchLock, waits for signal, re-locks - bunchCacheCleanupSignal.Wait() - - if bunchCacheLastCleanup.After(time.Now().Add(-1 * time.Second)) { - // skip if it's been less than 1 second - continue - } - - // CachedBunchLock is held here - keepIfAfter := time.Now().Add(-5 * time.Minute) - for req, resp := range bunchCache { - if !resp.Timestamp.After(keepIfAfter) { - delete(bunchCache, req) - } - } - bunchCacheLastCleanup = time.Now() - // Loop and Wait(), which re-locks - } -} - -var emptyCachedBunchedResponse cachedBunchedResponse - -func bunchGetCacheStatus(br bunchedRequest, client *ClientInfo) (cacheStatus, cachedBunchedResponse) { - bunchCacheLock.RLock() - defer bunchCacheLock.RUnlock() - cachedResponse, ok := bunchCache[br] - if ok && cachedResponse.Timestamp.After(time.Now().Add(-5*time.Minute)) { - return CacheStatusFound, cachedResponse - } else if ok { - return CacheStatusExpired, emptyCachedBunchedResponse - } - return CacheStatusNotFound, emptyCachedBunchedResponse -} - -func normalizeBunchedRequest(br bunchedRequest) bunchedRequest { - if br.Command == "get_link" { - // TODO - } - return br -} +var bunchGroup singleflight.Group // C2SHandleBunchedCommand handles C2S Commands such as `get_link`. // It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one. // Additionally, results are cached. func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { - // FIXME(riking): Function is too complex + key := fmt.Sprintf("%s:%s", msg.Command, msg.origArguments) - br := bunchedRequestFromCM(&msg) - br = normalizeBunchedRequest(br) - - cacheStatus, cachedResponse := bunchGetCacheStatus(br, client) - - if cacheStatus == CacheStatusFound { - var response ClientMessage - response.Command = SuccessCommand - response.MessageID = msg.MessageID - response.origArguments = cachedResponse.Response - response.parseOrigArguments() - - return response, nil - } else if cacheStatus == CacheStatusExpired { - // Wake up the lazy janitor - bunchCacheCleanupSignal.Signal() - } - - pendingBunchLock.Lock() - defer pendingBunchLock.Unlock() - list, ok := pendingBunchedRequests[br] - if ok { - list.Lock() - AddToSliceB(&list.Members, client, msg.MessageID) - list.Unlock() - - return ClientMessage{Command: AsyncResponseCommand}, nil - } - - pendingBunchedRequests[br] = &bunchSubscriberList{Members: []bunchSubscriber{{Client: client, MessageID: msg.MessageID}}} - - go func(request bunchedRequest) { - respStr, err := Backend.SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{}) + resultCh := bunchGroup.DoChan(key, func() (interface{}, error) { + return Backend.SendRemoteCommandCached(msg.Command, msg.origArguments, AuthInfo{}) + }) + client.MsgChannelKeepalive.Add(1) + go func() { + result := <-resultCh var msg ClientMessage - if err == nil { - msg.Command = SuccessCommand - msg.origArguments = respStr - msg.parseOrigArguments() - } else { + if result.Err != nil { msg.Command = ErrorCommand msg.Arguments = err.Error() + } else { + msg.Command = SuccessCommand + msg.origArguments = result.Val.(string) + msg.parseOrigArguments() } - if err == nil { - bunchCacheLock.Lock() - bunchCache[request] = cachedBunchedResponse{Response: respStr, Timestamp: time.Now()} - bunchCacheLock.Unlock() - } - - pendingBunchLock.Lock() - bsl := pendingBunchedRequests[request] - delete(pendingBunchedRequests, request) - pendingBunchLock.Unlock() - - bsl.Lock() - for _, member := range bsl.Members { - msg.MessageID = member.MessageID - member.Client.Send(msg) - } - bsl.Unlock() - }(br) + client.Send(msg) + client.MsgChannelKeepalive.Done() + }() return ClientMessage{Command: AsyncResponseCommand}, nil }