diff --git a/socketserver/internal/server/commands.go b/socketserver/internal/server/commands.go index 018c8c97..cd735d35 100644 --- a/socketserver/internal/server/commands.go +++ b/socketserver/internal/server/commands.go @@ -357,77 +357,41 @@ type BunchSubscriberList struct { var PendingBunchedRequests map[BunchedRequest]*BunchSubscriberList = make(map[BunchedRequest]*BunchSubscriberList) var PendingBunchLock sync.Mutex -var CompletedBunchedRequests map[BunchedRequest]BunchedResponse = make(map[BunchedRequest]BunchedResponse) -var CompletedBunchLock sync.RWMutex - -func bunchingJanitor() { - for { - time.Sleep(5 * time.Minute) - keepIfAfter := time.Now().Add(-5 * time.Minute) - CompletedBunchLock.Lock() - for req, resp := range CompletedBunchedRequests { - if !resp.Timestamp.After(keepIfAfter) { - delete(CompletedBunchedRequests, req) - } - } - CompletedBunchLock.Unlock() - } -} func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { br := BunchedRequestFromCM(&msg) - CompletedBunchLock.RLock() - resp, ok := CompletedBunchedRequests[br] - CompletedBunchLock.RUnlock() - - if ok && resp.Timestamp.After(time.Now().Add(-5*time.Minute)) { - return SuccessMessageFromString(resp.Response), nil - } else if ok { - - // Entry expired, let's remove it... - CompletedBunchLock.Lock() - // recheck condition - resp, ok = CompletedBunchedRequests[br] - if ok && !resp.Timestamp.After(time.Now().Add(-5*time.Minute)) { - delete(CompletedBunchedRequests, br) - } - CompletedBunchLock.Unlock() - } - PendingBunchLock.Lock() + defer PendingBunchLock.Unlock() list, ok := PendingBunchedRequests[br] if ok { list.Lock() AddToSliceB(&list.Members, client, msg.MessageID) list.Unlock() - PendingBunchLock.Unlock() - client.MsgChannelKeepalive.Add(1) return ClientMessage{Command: AsyncResponseCommand}, nil } PendingBunchedRequests[br] = &BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}} - PendingBunchLock.Unlock() - client.MsgChannelKeepalive.Add(1) go func(request BunchedRequest) { resp, err := RequestRemoteDataCached(string(request.Command), request.Param, AuthInfo{}) - PendingBunchLock.Lock() // Prevent new signups var msg ClientMessage if err == nil { - CompletedBunchLock.Lock() // mutex on map - CompletedBunchedRequests[request] = BunchedResponse{Response: resp, Timestamp: time.Now()} - CompletedBunchLock.Unlock() - - msg = SuccessMessageFromString(resp) + msg.Command = SuccessCommand + msg.origArguments = resp + msg.parseOrigArguments() } else { msg.Command = ErrorCommand msg.Arguments = err.Error() } + PendingBunchLock.Lock() bsl := PendingBunchedRequests[request] + delete(PendingBunchedRequests, request) + PendingBunchLock.Unlock() + bsl.Lock() for _, member := range bsl.Members { msg.MessageID = member.MessageID @@ -435,12 +399,8 @@ func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg Cl case member.Client.MessageChannel <- msg: case <-member.Client.MsgChannelIsDone: } - member.Client.MsgChannelKeepalive.Done() } bsl.Unlock() - - delete(PendingBunchedRequests, request) - PendingBunchLock.Unlock() }(br) return ClientMessage{Command: AsyncResponseCommand}, nil