1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-07-28 05:28:30 +00:00

Change MsgChannelKeepalive to a sync.WaitGroup

This commit is contained in:
Kane York 2015-11-02 22:54:53 -08:00
parent 46887cdb5d
commit 013e49e2c5
3 changed files with 19 additions and 26 deletions

View file

@ -86,18 +86,14 @@ func HandleReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r
client.MakePendingRequests = nil client.MakePendingRequests = nil
client.Mutex.Unlock() client.Mutex.Unlock()
client.MsgChannelKeepalive.Add(1)
go func() { go func() {
client.MsgChannelKeepalive.RLock()
if client.MessageChannel == nil {
return
}
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand} client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand}
SendBacklogForNewClient(client) SendBacklogForNewClient(client)
if disconnectAt != 0 { if disconnectAt != 0 {
SendTimedBacklogMessages(client, time.Unix(disconnectAt, 0)) SendTimedBacklogMessages(client, time.Unix(disconnectAt, 0))
} }
client.MsgChannelKeepalive.RUnlock() client.MsgChannelKeepalive.Done()
}() }()
return ClientMessage{Command: AsyncResponseCommand}, nil return ClientMessage{Command: AsyncResponseCommand}, nil
} }
@ -192,13 +188,13 @@ func GetSubscriptionBacklog(conn *websocket.Conn, client *ClientInfo) {
} }
// Deliver to client // Deliver to client
client.MsgChannelKeepalive.RLock() client.MsgChannelKeepalive.Add(1)
if client.MessageChannel != nil { if client.MessageChannel != nil {
for _, msg := range messages { for _, msg := range messages {
client.MessageChannel <- msg client.MessageChannel <- msg
} }
} }
client.MsgChannelKeepalive.RUnlock() client.MsgChannelKeepalive.Done()
} }
func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
@ -401,8 +397,7 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl
CompletedBunchLock.RUnlock() CompletedBunchLock.RUnlock()
} }
// !!! unlocked on reply client.MsgChannelKeepalive.Add(1)
client.MsgChannelKeepalive.RLock()
PendingBunchLock.RLock() PendingBunchLock.RLock()
list, ok := PendingBunchedRequests[br] list, ok := PendingBunchedRequests[br]
@ -454,7 +449,7 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl
for _, member := range bsl.Members { for _, member := range bsl.Members {
msg.MessageID = member.MessageID msg.MessageID = member.MessageID
member.Client.MessageChannel <- msg member.Client.MessageChannel <- msg
member.Client.MsgChannelKeepalive.RUnlock() member.Client.MsgChannelKeepalive.Done()
} }
bsl.Unlock() bsl.Unlock()
@ -469,11 +464,10 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl
} }
func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
client.MsgChannelKeepalive.Add(1)
go func(conn *websocket.Conn, msg ClientMessage, authInfo AuthInfo) { go func(conn *websocket.Conn, msg ClientMessage, authInfo AuthInfo) {
resp, err := RequestRemoteDataCached(string(msg.Command), msg.origArguments, authInfo) resp, err := RequestRemoteDataCached(string(msg.Command), msg.origArguments, authInfo)
client.MsgChannelKeepalive.RLock()
if client.MessageChannel != nil {
if err != nil { if err != nil {
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()} client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()}
} else { } else {
@ -481,8 +475,7 @@ func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMes
msg.MessageID = msg.MessageID msg.MessageID = msg.MessageID
client.MessageChannel <- msg client.MessageChannel <- msg
} }
} client.MsgChannelKeepalive.Done()
client.MsgChannelKeepalive.RUnlock()
}(conn, msg, client.AuthInfo) }(conn, msg, client.AuthInfo)
return ClientMessage{Command: AsyncResponseCommand}, nil return ClientMessage{Command: AsyncResponseCommand}, nil

View file

@ -257,11 +257,11 @@ RunLoop:
// Stop getting messages... // Stop getting messages...
UnsubscribeAll(&client) UnsubscribeAll(&client)
client.MsgChannelKeepalive.Lock() // Wait for pending jobs to finish...
client.MsgChannelKeepalive.Wait()
client.MessageChannel = nil client.MessageChannel = nil
client.MsgChannelKeepalive.Unlock()
// And finished. // And done.
// Close the channel so the draining goroutine can finish, too. // Close the channel so the draining goroutine can finish, too.
close(_serverMessageChan) close(_serverMessageChan)

View file

@ -91,8 +91,8 @@ type ClientInfo struct {
// This field will be nil before it is closed. // This field will be nil before it is closed.
MessageChannel chan<- ClientMessage MessageChannel chan<- ClientMessage
// Take a read-lock on this before checking whether MessageChannel is nil. // Take out an Add() on this during a command if you need to use the MessageChannel later.
MsgChannelKeepalive sync.RWMutex MsgChannelKeepalive sync.WaitGroup
// The number of pings sent without a response // The number of pings sent without a response
pingCount int pingCount int