1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-06-27 21:05:53 +00:00

Use groupcache for bunched backend commands

This commit is contained in:
Kane York 2017-09-15 12:21:41 -07:00
parent d97a44326a
commit 14417c708e
3 changed files with 19 additions and 120 deletions

View file

@ -12,6 +12,7 @@ import (
"github.com/gorilla/websocket"
"github.com/satori/go.uuid"
"golang.org/x/sync/singleflight"
)
// Command is a string indicating which RPC is requested.
@ -426,137 +427,34 @@ const (
CacheStatusExpired
)
var pendingBunchedRequests = make(map[bunchedRequest]*bunchSubscriberList)
var pendingBunchLock sync.Mutex
var bunchCache = make(map[bunchedRequest]cachedBunchedResponse)
var bunchCacheLock sync.RWMutex
var bunchCacheCleanupSignal = sync.NewCond(&bunchCacheLock)
var bunchCacheLastCleanup time.Time
func bunchedRequestFromCM(msg *ClientMessage) bunchedRequest {
return bunchedRequest{Command: msg.Command, Param: copyString(msg.origArguments)}
}
// is_init_func
func bunchCacheJanitor() {
go func() {
for {
time.Sleep(30 * time.Minute)
bunchCacheCleanupSignal.Signal()
}
}()
bunchCacheLock.Lock()
for {
// Unlocks CachedBunchLock, waits for signal, re-locks
bunchCacheCleanupSignal.Wait()
if bunchCacheLastCleanup.After(time.Now().Add(-1 * time.Second)) {
// skip if it's been less than 1 second
continue
}
// CachedBunchLock is held here
keepIfAfter := time.Now().Add(-5 * time.Minute)
for req, resp := range bunchCache {
if !resp.Timestamp.After(keepIfAfter) {
delete(bunchCache, req)
}
}
bunchCacheLastCleanup = time.Now()
// Loop and Wait(), which re-locks
}
}
var emptyCachedBunchedResponse cachedBunchedResponse
func bunchGetCacheStatus(br bunchedRequest, client *ClientInfo) (cacheStatus, cachedBunchedResponse) {
bunchCacheLock.RLock()
defer bunchCacheLock.RUnlock()
cachedResponse, ok := bunchCache[br]
if ok && cachedResponse.Timestamp.After(time.Now().Add(-5*time.Minute)) {
return CacheStatusFound, cachedResponse
} else if ok {
return CacheStatusExpired, emptyCachedBunchedResponse
}
return CacheStatusNotFound, emptyCachedBunchedResponse
}
func normalizeBunchedRequest(br bunchedRequest) bunchedRequest {
if br.Command == "get_link" {
// TODO
}
return br
}
var bunchGroup singleflight.Group
// C2SHandleBunchedCommand handles C2S Commands such as `get_link`.
// It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one.
// Additionally, results are cached.
func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
// FIXME(riking): Function is too complex
key := fmt.Sprintf("%s:%s", msg.Command, msg.origArguments)
br := bunchedRequestFromCM(&msg)
br = normalizeBunchedRequest(br)
cacheStatus, cachedResponse := bunchGetCacheStatus(br, client)
if cacheStatus == CacheStatusFound {
var response ClientMessage
response.Command = SuccessCommand
response.MessageID = msg.MessageID
response.origArguments = cachedResponse.Response
response.parseOrigArguments()
return response, nil
} else if cacheStatus == CacheStatusExpired {
// Wake up the lazy janitor
bunchCacheCleanupSignal.Signal()
}
pendingBunchLock.Lock()
defer pendingBunchLock.Unlock()
list, ok := pendingBunchedRequests[br]
if ok {
list.Lock()
AddToSliceB(&list.Members, client, msg.MessageID)
list.Unlock()
return ClientMessage{Command: AsyncResponseCommand}, nil
}
pendingBunchedRequests[br] = &bunchSubscriberList{Members: []bunchSubscriber{{Client: client, MessageID: msg.MessageID}}}
go func(request bunchedRequest) {
respStr, err := Backend.SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{})
resultCh := bunchGroup.DoChan(key, func() (interface{}, error) {
return Backend.SendRemoteCommandCached(msg.Command, msg.origArguments, AuthInfo{})
})
client.MsgChannelKeepalive.Add(1)
go func() {
result := <-resultCh
var msg ClientMessage
if err == nil {
msg.Command = SuccessCommand
msg.origArguments = respStr
msg.parseOrigArguments()
} else {
if result.Err != nil {
msg.Command = ErrorCommand
msg.Arguments = err.Error()
} else {
msg.Command = SuccessCommand
msg.origArguments = result.Val.(string)
msg.parseOrigArguments()
}
if err == nil {
bunchCacheLock.Lock()
bunchCache[request] = cachedBunchedResponse{Response: respStr, Timestamp: time.Now()}
bunchCacheLock.Unlock()
}
pendingBunchLock.Lock()
bsl := pendingBunchedRequests[request]
delete(pendingBunchedRequests, request)
pendingBunchLock.Unlock()
bsl.Lock()
for _, member := range bsl.Members {
msg.MessageID = member.MessageID
member.Client.Send(msg)
}
bsl.Unlock()
}(br)
client.Send(msg)
client.MsgChannelKeepalive.Done()
}()
return ClientMessage{Command: AsyncResponseCommand}, nil
}