mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-07-03 17:48:30 +00:00
Add bunching get_link, without enabling it
This commit is contained in:
parent
d6f5b28ef5
commit
40e26b5535
3 changed files with 137 additions and 7 deletions
|
@ -316,6 +316,122 @@ func DoSendAggregateData() {
|
||||||
// done
|
// done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type BunchedRequest struct {
|
||||||
|
Command Command
|
||||||
|
Param string
|
||||||
|
}
|
||||||
|
func BunchedRequestFromCM(msg *ClientMessage) BunchedRequest {
|
||||||
|
return BunchedRequest{Command: msg.Command, Param: msg.origArguments}
|
||||||
|
}
|
||||||
|
type BunchedResponse struct {
|
||||||
|
Response string
|
||||||
|
Timestamp time.Time
|
||||||
|
}
|
||||||
|
type BunchSubscriber struct {
|
||||||
|
Client *ClientInfo
|
||||||
|
MessageID int
|
||||||
|
}
|
||||||
|
|
||||||
|
type BunchSubscriberList struct {
|
||||||
|
sync.Mutex
|
||||||
|
Members []BunchSubscriber
|
||||||
|
}
|
||||||
|
|
||||||
|
var PendingBunchedRequests map[BunchedRequest]BunchSubscriberList = make(map[BunchedRequest]BunchSubscriberList)
|
||||||
|
var PendingBunchLock sync.RWMutex
|
||||||
|
var CompletedBunchedRequests map[BunchedRequest]BunchedResponse
|
||||||
|
var CompletedBunchLock sync.RWMutex
|
||||||
|
|
||||||
|
func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||||
|
br := BunchedRequestFromCM(&msg)
|
||||||
|
|
||||||
|
CompletedBunchLock.RLock()
|
||||||
|
resp, ok := CompletedBunchedRequests[br]
|
||||||
|
if ok && !resp.Timestamp.After(time.Now().Add(5 * time.Minute)) {
|
||||||
|
CompletedBunchLock.RUnlock()
|
||||||
|
return SuccessMessageFromString(resp.Response), nil
|
||||||
|
} else if ok {
|
||||||
|
CompletedBunchLock.RUnlock()
|
||||||
|
|
||||||
|
// Entry expired, let's remove it...
|
||||||
|
CompletedBunchLock.Lock()
|
||||||
|
// recheck condition
|
||||||
|
resp, ok = CompletedBunchedRequests[br]
|
||||||
|
if ok && resp.Timestamp.After(time.Now().Add(5 * time.Minute)) {
|
||||||
|
delete(CompletedBunchedRequests, br)
|
||||||
|
}
|
||||||
|
CompletedBunchLock.Unlock()
|
||||||
|
} else {
|
||||||
|
CompletedBunchLock.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// !!! unlocked on reply
|
||||||
|
client.MsgChannelKeepalive.RLock()
|
||||||
|
|
||||||
|
PendingBunchLock.RLock()
|
||||||
|
list, ok := PendingBunchedRequests[br]
|
||||||
|
var needToStart bool
|
||||||
|
if ok {
|
||||||
|
list.Lock()
|
||||||
|
AddToSliceB(&list.Members, client, msg.MessageID)
|
||||||
|
list.Unlock()
|
||||||
|
PendingBunchLock.RUnlock()
|
||||||
|
|
||||||
|
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||||
|
} else {
|
||||||
|
PendingBunchLock.RUnlock()
|
||||||
|
PendingBunchLock.Lock()
|
||||||
|
// RECHECK because someone else might have added it
|
||||||
|
list, ok = PendingBunchedRequests[br]
|
||||||
|
if ok {
|
||||||
|
list.Lock()
|
||||||
|
AddToSliceB(&list.Members, client, msg.MessageID)
|
||||||
|
list.Unlock()
|
||||||
|
PendingBunchLock.Unlock()
|
||||||
|
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||||
|
} else {
|
||||||
|
PendingBunchedRequests[br] = BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}}
|
||||||
|
needToStart = true
|
||||||
|
PendingBunchLock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if needToStart {
|
||||||
|
go func(request BunchedRequest) {
|
||||||
|
resp, err := RequestRemoteDataCached(string(request.Command), request.Param, AuthInfo{})
|
||||||
|
|
||||||
|
PendingBunchLock.Lock() // Prevent new signups
|
||||||
|
var msg ClientMessage
|
||||||
|
if err != nil {
|
||||||
|
CompletedBunchLock.Lock() // mutex on map
|
||||||
|
CompletedBunchedRequests[request] = BunchedResponse{Response: resp, Timestamp: time.Now()}
|
||||||
|
CompletedBunchLock.Unlock()
|
||||||
|
|
||||||
|
msg = SuccessMessageFromString(resp)
|
||||||
|
} else {
|
||||||
|
msg.Command = ErrorCommand
|
||||||
|
msg.Arguments = err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
bsl := PendingBunchedRequests[request]
|
||||||
|
bsl.Lock()
|
||||||
|
for _, member := range bsl.Members {
|
||||||
|
msg.MessageID = member.MessageID
|
||||||
|
member.Client.MessageChannel <- msg
|
||||||
|
member.Client.MsgChannelKeepalive.RUnlock()
|
||||||
|
}
|
||||||
|
bsl.Unlock()
|
||||||
|
|
||||||
|
delete(PendingBunchedRequests, request)
|
||||||
|
PendingBunchLock.Unlock()
|
||||||
|
}(br)
|
||||||
|
|
||||||
|
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||||
|
} else {
|
||||||
|
panic("logic error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||||
go func(conn *websocket.Conn, msg ClientMessage, authInfo AuthInfo) {
|
go func(conn *websocket.Conn, msg ClientMessage, authInfo AuthInfo) {
|
||||||
resp, err := RequestRemoteDataCached(string(msg.Command), msg.origArguments, authInfo)
|
resp, err := RequestRemoteDataCached(string(msg.Command), msg.origArguments, authInfo)
|
||||||
|
@ -325,9 +441,7 @@ func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMes
|
||||||
if err != nil {
|
if err != nil {
|
||||||
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()}
|
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()}
|
||||||
} else {
|
} else {
|
||||||
cm := ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp}
|
client.MessageChannel <- SuccessMessageFromString(resp)
|
||||||
cm.parseOrigArguments()
|
|
||||||
client.MessageChannel <- cm
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
client.MsgChannelKeepalive.RUnlock()
|
client.MsgChannelKeepalive.RUnlock()
|
||||||
|
|
|
@ -387,12 +387,14 @@ func MarshalClientMessage(clientMessage interface{}) (payloadType int, data []by
|
||||||
}
|
}
|
||||||
|
|
||||||
// Command handlers should use this to construct responses.
|
// Command handlers should use this to construct responses.
|
||||||
func NewClientMessage(arguments interface{}) ClientMessage {
|
func SuccessMessageFromString(arguments string) ClientMessage {
|
||||||
return ClientMessage{
|
cm := ClientMessage{
|
||||||
MessageID: 0, // filled by the select loop
|
MessageID: -1, // filled by the select loop
|
||||||
Command: SuccessCommand,
|
Command: SuccessCommand,
|
||||||
Arguments: arguments,
|
origArguments: arguments,
|
||||||
}
|
}
|
||||||
|
cm.parseOrigArguments()
|
||||||
|
return cm
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convenience method: Parse the arguments of the ClientMessage as a single string.
|
// Convenience method: Parse the arguments of the ClientMessage as a single string.
|
||||||
|
|
|
@ -159,3 +159,17 @@ func RemoveFromSliceC(ary *[]chan<- ClientMessage, val chan<- ClientMessage) boo
|
||||||
*ary = slice
|
*ary = slice
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func AddToSliceB(ary *[]BunchSubscriber, client *ClientInfo, mid int) bool {
|
||||||
|
newSub := BunchSubscriber{Client: client, MessageID: mid}
|
||||||
|
slice := *ary
|
||||||
|
for _, v := range slice {
|
||||||
|
if v == newSub {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
slice = append(slice, newSub)
|
||||||
|
*ary = slice
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue