diff --git a/socketserver/internal/server/commands.go b/socketserver/internal/server/commands.go index 0bb44efa..7d029b82 100644 --- a/socketserver/internal/server/commands.go +++ b/socketserver/internal/server/commands.go @@ -316,6 +316,122 @@ func DoSendAggregateData() { // done } +type BunchedRequest struct { + Command Command + Param string +} +func BunchedRequestFromCM(msg *ClientMessage) BunchedRequest { + return BunchedRequest{Command: msg.Command, Param: msg.origArguments} +} +type BunchedResponse struct { + Response string + Timestamp time.Time +} +type BunchSubscriber struct { + Client *ClientInfo + MessageID int +} + +type BunchSubscriberList struct { + sync.Mutex + Members []BunchSubscriber +} + +var PendingBunchedRequests map[BunchedRequest]BunchSubscriberList = make(map[BunchedRequest]BunchSubscriberList) +var PendingBunchLock sync.RWMutex +var CompletedBunchedRequests map[BunchedRequest]BunchedResponse +var CompletedBunchLock sync.RWMutex + +func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { + br := BunchedRequestFromCM(&msg) + + CompletedBunchLock.RLock() + resp, ok := CompletedBunchedRequests[br] + if ok && !resp.Timestamp.After(time.Now().Add(5 * time.Minute)) { + CompletedBunchLock.RUnlock() + return SuccessMessageFromString(resp.Response), nil + } else if ok { + CompletedBunchLock.RUnlock() + + // Entry expired, let's remove it... + CompletedBunchLock.Lock() + // recheck condition + resp, ok = CompletedBunchedRequests[br] + if ok && resp.Timestamp.After(time.Now().Add(5 * time.Minute)) { + delete(CompletedBunchedRequests, br) + } + CompletedBunchLock.Unlock() + } else { + CompletedBunchLock.RUnlock() + } + + // !!! unlocked on reply + client.MsgChannelKeepalive.RLock() + + PendingBunchLock.RLock() + list, ok := PendingBunchedRequests[br] + var needToStart bool + if ok { + list.Lock() + AddToSliceB(&list.Members, client, msg.MessageID) + list.Unlock() + PendingBunchLock.RUnlock() + + return ClientMessage{Command: AsyncResponseCommand}, nil + } else { + PendingBunchLock.RUnlock() + PendingBunchLock.Lock() + // RECHECK because someone else might have added it + list, ok = PendingBunchedRequests[br] + if ok { + list.Lock() + AddToSliceB(&list.Members, client, msg.MessageID) + list.Unlock() + PendingBunchLock.Unlock() + return ClientMessage{Command: AsyncResponseCommand}, nil + } else { + PendingBunchedRequests[br] = BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}} + needToStart = true + PendingBunchLock.Unlock() + } + } + + if needToStart { + go func(request BunchedRequest) { + resp, err := RequestRemoteDataCached(string(request.Command), request.Param, AuthInfo{}) + + PendingBunchLock.Lock() // Prevent new signups + var msg ClientMessage + if err != nil { + CompletedBunchLock.Lock() // mutex on map + CompletedBunchedRequests[request] = BunchedResponse{Response: resp, Timestamp: time.Now()} + CompletedBunchLock.Unlock() + + msg = SuccessMessageFromString(resp) + } else { + msg.Command = ErrorCommand + msg.Arguments = err.Error() + } + + bsl := PendingBunchedRequests[request] + bsl.Lock() + for _, member := range bsl.Members { + msg.MessageID = member.MessageID + member.Client.MessageChannel <- msg + member.Client.MsgChannelKeepalive.RUnlock() + } + bsl.Unlock() + + delete(PendingBunchedRequests, request) + PendingBunchLock.Unlock() + }(br) + + return ClientMessage{Command: AsyncResponseCommand}, nil + } else { + panic("logic error") + } +} + func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { go func(conn *websocket.Conn, msg ClientMessage, authInfo AuthInfo) { resp, err := RequestRemoteDataCached(string(msg.Command), msg.origArguments, authInfo) @@ -325,9 +441,7 @@ func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMes if err != nil { client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()} } else { - cm := ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp} - cm.parseOrigArguments() - client.MessageChannel <- cm + client.MessageChannel <- SuccessMessageFromString(resp) } } client.MsgChannelKeepalive.RUnlock() diff --git a/socketserver/internal/server/handlecore.go b/socketserver/internal/server/handlecore.go index f22bf842..be28f511 100644 --- a/socketserver/internal/server/handlecore.go +++ b/socketserver/internal/server/handlecore.go @@ -387,12 +387,14 @@ func MarshalClientMessage(clientMessage interface{}) (payloadType int, data []by } // Command handlers should use this to construct responses. -func NewClientMessage(arguments interface{}) ClientMessage { - return ClientMessage{ - MessageID: 0, // filled by the select loop +func SuccessMessageFromString(arguments string) ClientMessage { + cm := ClientMessage{ + MessageID: -1, // filled by the select loop Command: SuccessCommand, - Arguments: arguments, + origArguments: arguments, } + cm.parseOrigArguments() + return cm } // Convenience method: Parse the arguments of the ClientMessage as a single string. diff --git a/socketserver/internal/server/utils.go b/socketserver/internal/server/utils.go index 8dbff0f4..48f8749f 100644 --- a/socketserver/internal/server/utils.go +++ b/socketserver/internal/server/utils.go @@ -159,3 +159,17 @@ func RemoveFromSliceC(ary *[]chan<- ClientMessage, val chan<- ClientMessage) boo *ary = slice return true } + +func AddToSliceB(ary *[]BunchSubscriber, client *ClientInfo, mid int) bool { + newSub := BunchSubscriber{Client: client, MessageID: mid} + slice := *ary + for _, v := range slice { + if v == newSub { + return false + } + } + + slice = append(slice, newSub) + *ary = slice + return true +}