1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-07-04 01:58:31 +00:00

Refactor bunched remote command

This commit is contained in:
Kane York 2015-11-02 22:59:38 -08:00
parent 013e49e2c5
commit db486e4eba
2 changed files with 40 additions and 60 deletions

View file

@ -356,7 +356,7 @@ type BunchSubscriberList struct {
} }
var PendingBunchedRequests map[BunchedRequest]*BunchSubscriberList = make(map[BunchedRequest]*BunchSubscriberList) var PendingBunchedRequests map[BunchedRequest]*BunchSubscriberList = make(map[BunchedRequest]*BunchSubscriberList)
var PendingBunchLock sync.RWMutex var PendingBunchLock sync.Mutex
var CompletedBunchedRequests map[BunchedRequest]BunchedResponse = make(map[BunchedRequest]BunchedResponse) var CompletedBunchedRequests map[BunchedRequest]BunchedResponse = make(map[BunchedRequest]BunchedResponse)
var CompletedBunchLock sync.RWMutex var CompletedBunchLock sync.RWMutex
@ -374,16 +374,16 @@ func bunchingJanitor() {
} }
} }
func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
br := BunchedRequestFromCM(&msg) br := BunchedRequestFromCM(&msg)
CompletedBunchLock.RLock() CompletedBunchLock.RLock()
resp, ok := CompletedBunchedRequests[br] resp, ok := CompletedBunchedRequests[br]
CompletedBunchLock.RUnlock()
if ok && resp.Timestamp.After(time.Now().Add(-5*time.Minute)) { if ok && resp.Timestamp.After(time.Now().Add(-5*time.Minute)) {
CompletedBunchLock.RUnlock()
return SuccessMessageFromString(resp.Response), nil return SuccessMessageFromString(resp.Response), nil
} else if ok { } else if ok {
CompletedBunchLock.RUnlock()
// Entry expired, let's remove it... // Entry expired, let's remove it...
CompletedBunchLock.Lock() CompletedBunchLock.Lock()
@ -393,74 +393,54 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl
delete(CompletedBunchedRequests, br) delete(CompletedBunchedRequests, br)
} }
CompletedBunchLock.Unlock() CompletedBunchLock.Unlock()
} else {
CompletedBunchLock.RUnlock()
} }
client.MsgChannelKeepalive.Add(1) PendingBunchLock.Lock()
PendingBunchLock.RLock()
list, ok := PendingBunchedRequests[br] list, ok := PendingBunchedRequests[br]
var needToStart bool
if ok { if ok {
list.Lock() list.Lock()
AddToSliceB(&list.Members, client, msg.MessageID) AddToSliceB(&list.Members, client, msg.MessageID)
list.Unlock() list.Unlock()
PendingBunchLock.RUnlock() PendingBunchLock.Unlock()
client.MsgChannelKeepalive.Add(1)
return ClientMessage{Command: AsyncResponseCommand}, nil return ClientMessage{Command: AsyncResponseCommand}, nil
} else { }
PendingBunchLock.RUnlock()
PendingBunchLock.Lock() PendingBunchedRequests[br] = &BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}}
// RECHECK because someone else might have added it PendingBunchLock.Unlock()
list, ok = PendingBunchedRequests[br] client.MsgChannelKeepalive.Add(1)
if ok {
list.Lock() go func(request BunchedRequest) {
AddToSliceB(&list.Members, client, msg.MessageID) resp, err := RequestRemoteDataCached(string(request.Command), request.Param, AuthInfo{})
list.Unlock()
PendingBunchLock.Unlock() PendingBunchLock.Lock() // Prevent new signups
return ClientMessage{Command: AsyncResponseCommand}, nil var msg ClientMessage
if err == nil {
CompletedBunchLock.Lock() // mutex on map
CompletedBunchedRequests[request] = BunchedResponse{Response: resp, Timestamp: time.Now()}
CompletedBunchLock.Unlock()
msg = SuccessMessageFromString(resp)
} else { } else {
PendingBunchedRequests[br] = &BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}} msg.Command = ErrorCommand
needToStart = true msg.Arguments = err.Error()
PendingBunchLock.Unlock()
} }
}
if needToStart { bsl := PendingBunchedRequests[request]
go func(request BunchedRequest) { bsl.Lock()
resp, err := RequestRemoteDataCached(string(request.Command), request.Param, AuthInfo{}) for _, member := range bsl.Members {
msg.MessageID = member.MessageID
member.Client.MessageChannel <- msg
member.Client.MsgChannelKeepalive.Done()
}
bsl.Unlock()
PendingBunchLock.Lock() // Prevent new signups delete(PendingBunchedRequests, request)
var msg ClientMessage PendingBunchLock.Unlock()
if err == nil { }(br)
CompletedBunchLock.Lock() // mutex on map
CompletedBunchedRequests[request] = BunchedResponse{Response: resp, Timestamp: time.Now()}
CompletedBunchLock.Unlock()
msg = SuccessMessageFromString(resp) return ClientMessage{Command: AsyncResponseCommand}, nil
} else {
msg.Command = ErrorCommand
msg.Arguments = err.Error()
}
bsl := PendingBunchedRequests[request]
bsl.Lock()
for _, member := range bsl.Members {
msg.MessageID = member.MessageID
member.Client.MessageChannel <- msg
member.Client.MsgChannelKeepalive.Done()
}
bsl.Unlock()
delete(PendingBunchedRequests, request)
PendingBunchLock.Unlock()
}(br)
return ClientMessage{Command: AsyncResponseCommand}, nil
} else {
panic("logic error")
}
} }
func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {

View file

@ -36,8 +36,8 @@ var CommandHandlers = map[Command]CommandHandler{
"survey": HandleSurvey, "survey": HandleSurvey,
"twitch_emote": HandleRemoteCommand, "twitch_emote": HandleRemoteCommand,
"get_link": HandleBunchedRemotecommand, "get_link": HandleBunchedRemoteCommand,
"get_display_name": HandleBunchedRemotecommand, "get_display_name": HandleBunchedRemoteCommand,
"update_follow_buttons": HandleRemoteCommand, "update_follow_buttons": HandleRemoteCommand,
"chat_history": HandleRemoteCommand, "chat_history": HandleRemoteCommand,
} }