From d518759fa012bcec4979ef9c0e770d4497e6c3c4 Mon Sep 17 00:00:00 2001 From: Kane York Date: Sun, 15 Nov 2015 15:52:37 -0800 Subject: [PATCH] Re-add bunched request caching with a lazy janitor And this time, only one lock is held at a time during the response. --- socketserver/internal/server/commands.go | 80 +++++++++++++++++++++- socketserver/internal/server/handlecore.go | 5 +- 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/socketserver/internal/server/commands.go b/socketserver/internal/server/commands.go index 24f60284..68b29d5c 100644 --- a/socketserver/internal/server/commands.go +++ b/socketserver/internal/server/commands.go @@ -379,12 +379,82 @@ type BunchSubscriberList struct { Members []BunchSubscriber } +type CacheStatus byte +const ( + CacheStatusNotFound = iota + CacheStatusFound + CacheStatusExpired +) + var PendingBunchedRequests map[BunchedRequest]*BunchSubscriberList = make(map[BunchedRequest]*BunchSubscriberList) var PendingBunchLock sync.Mutex +var CachedBunchedRequests map[BunchedRequest]BunchedResponse = make(map[BunchedRequest]BunchedResponse) +var CachedBunchLock sync.RWMutex +var BunchCacheCleanupSignal *sync.Cond = sync.NewCond(&CachedBunchLock) +var BunchCacheLastCleanup time.Time + +func bunchCacheJanitor() { + go func() { + for { + time.Sleep(30*time.Minute) + BunchCacheCleanupSignal.Signal() + } + }() + + CachedBunchLock.Lock() + for { + // Unlocks CachedBunchLock, waits for signal, re-locks + BunchCacheCleanupSignal.Wait() + + if BunchCacheLastCleanup.After(time.Now().Add(-1*time.Second)) { + // skip if it's been less than 1 second + continue + } + + // CachedBunchLock is held here + keepIfAfter := time.Now().Add(-5*time.Minute) + for req, resp := range CachedBunchedRequests { + if !resp.Timestamp.After(keepIfAfter) { + delete(CachedBunchedRequests, req) + } + } + BunchCacheLastCleanup = time.Now() + // Loop and Wait(), which re-locks + } +} func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { br := BunchedRequestFromCM(&msg) + cacheStatus := func() byte { + CachedBunchLock.RLock() + defer CachedBunchLock.RUnlock() + bresp, ok := CachedBunchedRequests[br] + if ok && bresp.Timestamp.After(time.Now().Add(-5*time.Minute)) { + client.MsgChannelKeepalive.Add(1) + go func() { + var rmsg ClientMessage + rmsg.Command = SuccessCommand + rmsg.MessageID = msg.MessageID + rmsg.origArguments = bresp.Response + rmsg.parseOrigArguments() + client.MessageChannel <- rmsg + client.MsgChannelKeepalive.Done() + }() + return CacheStatusFound + } else if ok { + return CacheStatusExpired + } + return CacheStatusNotFound + }() + + if cacheStatus == CacheStatusFound { + return ClientMessage{Command: AsyncResponseCommand}, nil + } else if cacheStatus == CacheStatusExpired { + // Wake up the lazy janitor + BunchCacheCleanupSignal.Signal() + } + PendingBunchLock.Lock() defer PendingBunchLock.Unlock() list, ok := PendingBunchedRequests[br] @@ -399,18 +469,24 @@ func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg Cl PendingBunchedRequests[br] = &BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}} go func(request BunchedRequest) { - resp, err := SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{}) + respStr, err := SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{}) var msg ClientMessage if err == nil { msg.Command = SuccessCommand - msg.origArguments = resp + msg.origArguments = respStr msg.parseOrigArguments() } else { msg.Command = ErrorCommand msg.Arguments = err.Error() } + if err == nil { + CachedBunchLock.Lock() + CachedBunchedRequests[request] = BunchedResponse{Response: respStr, Timestamp: time.Now()} + CachedBunchLock.Unlock() + } + PendingBunchLock.Lock() bsl := PendingBunchedRequests[request] delete(PendingBunchedRequests, request) diff --git a/socketserver/internal/server/handlecore.go b/socketserver/internal/server/handlecore.go index d0d0c694..6ecb1d28 100644 --- a/socketserver/internal/server/handlecore.go +++ b/socketserver/internal/server/handlecore.go @@ -73,9 +73,10 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { resp.Body.Close() } - go pubsubJanitor() - go backlogJanitor() go authorizationJanitor() + go backlogJanitor() + go bunchCacheJanitor() + go pubsubJanitor() go sendAggregateData() go ircConnection()