diff --git a/socketserver/cmd/ffzsocketserver/console.go b/socketserver/cmd/ffzsocketserver/console.go index a6638d39..a82904bd 100644 --- a/socketserver/cmd/ffzsocketserver/console.go +++ b/socketserver/cmd/ffzsocketserver/console.go @@ -61,11 +61,18 @@ func commandLineConsole() { if val.Mallocs == 0 { continue } - shell.Print(fmt.Sprintf("%5d: %6d outstanding (%d total)\n", val.Size, val.Mallocs - val.Frees, val.Mallocs)) + shell.Print(fmt.Sprintf("%5d: %6d outstanding (%d total)\n", val.Size, val.Mallocs-val.Frees, val.Mallocs)) } shell.Println(m.NumGC, "collections occurred") return "", nil }) + shell.Register("panic", func(args ...string) (string, error) { + go func() { + panic("requested panic") + }() + return "", nil + }) + shell.Start() } diff --git a/socketserver/internal/server/commands.go b/socketserver/internal/server/commands.go index f1f0cf9f..8d157722 100644 --- a/socketserver/internal/server/commands.go +++ b/socketserver/internal/server/commands.go @@ -318,17 +318,19 @@ func DoSendAggregateData() { type BunchedRequest struct { Command Command - Param string + Param string } + func BunchedRequestFromCM(msg *ClientMessage) BunchedRequest { return BunchedRequest{Command: msg.Command, Param: msg.origArguments} } + type BunchedResponse struct { - Response string + Response string Timestamp time.Time } type BunchSubscriber struct { - Client *ClientInfo + Client *ClientInfo MessageID int } @@ -337,17 +339,31 @@ type BunchSubscriberList struct { Members []BunchSubscriber } -var PendingBunchedRequests map[BunchedRequest]BunchSubscriberList = make(map[BunchedRequest]BunchSubscriberList) +var PendingBunchedRequests map[BunchedRequest]*BunchSubscriberList = make(map[BunchedRequest]*BunchSubscriberList) var PendingBunchLock sync.RWMutex -var CompletedBunchedRequests map[BunchedRequest]BunchedResponse +var CompletedBunchedRequests map[BunchedRequest]BunchedResponse = make(map[BunchedRequest]BunchedResponse) var CompletedBunchLock sync.RWMutex +func bunchingJanitor() { + for { + time.Sleep(5 * time.Minute) + keepIfAfter := time.Now().Add(-5 * time.Minute) + CompletedBunchLock.Lock() + for req, resp := range CompletedBunchedRequests { + if !resp.Timestamp.After(keepIfAfter) { + delete(CompletedBunchedRequests, req) + } + } + CompletedBunchLock.Unlock() + } +} + func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { br := BunchedRequestFromCM(&msg) CompletedBunchLock.RLock() resp, ok := CompletedBunchedRequests[br] - if ok && !resp.Timestamp.After(time.Now().Add(5 * time.Minute)) { + if ok && resp.Timestamp.After(time.Now().Add(-5*time.Minute)) { CompletedBunchLock.RUnlock() return SuccessMessageFromString(resp.Response), nil } else if ok { @@ -357,7 +373,7 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl CompletedBunchLock.Lock() // recheck condition resp, ok = CompletedBunchedRequests[br] - if ok && resp.Timestamp.After(time.Now().Add(5 * time.Minute)) { + if ok && !resp.Timestamp.After(time.Now().Add(-5*time.Minute)) { delete(CompletedBunchedRequests, br) } CompletedBunchLock.Unlock() @@ -378,7 +394,7 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl PendingBunchLock.RUnlock() return ClientMessage{Command: AsyncResponseCommand}, nil - } else { + } else { PendingBunchLock.RUnlock() PendingBunchLock.Lock() // RECHECK because someone else might have added it @@ -390,7 +406,7 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl PendingBunchLock.Unlock() return ClientMessage{Command: AsyncResponseCommand}, nil } else { - PendingBunchedRequests[br] = BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}} + PendingBunchedRequests[br] = &BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}} needToStart = true PendingBunchLock.Unlock() } @@ -402,7 +418,7 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl PendingBunchLock.Lock() // Prevent new signups var msg ClientMessage - if err != nil { + if err == nil { CompletedBunchLock.Lock() // mutex on map CompletedBunchedRequests[request] = BunchedResponse{Response: resp, Timestamp: time.Now()} CompletedBunchLock.Unlock() diff --git a/socketserver/internal/server/handlecore.go b/socketserver/internal/server/handlecore.go index 9fd110f1..06ae7a93 100644 --- a/socketserver/internal/server/handlecore.go +++ b/socketserver/internal/server/handlecore.go @@ -36,8 +36,8 @@ var CommandHandlers = map[Command]CommandHandler{ "survey": HandleSurvey, "twitch_emote": HandleRemoteCommand, - "get_link": HandleRemoteCommand, - "get_display_name": HandleRemoteCommand, + "get_link": HandleBunchedRemotecommand, + "get_display_name": HandleBunchedRemotecommand, "update_follow_buttons": HandleRemoteCommand, "chat_history": HandleRemoteCommand, } @@ -151,6 +151,7 @@ func HandleSocketConnection(conn *websocket.Conn) { var client ClientInfo client.MessageChannel = _serverMessageChan + client.RemoteAddr = conn.RemoteAddr() // Launch receiver goroutine go func(errorChan chan<- error, clientChan chan<- ClientMessage, stoppedChan <-chan struct{}) { @@ -232,7 +233,7 @@ RunLoop: case smsg := <-serverMessageChan: SendMessage(conn, smsg) - case <- time.After(1 * time.Minute): + case <-time.After(1 * time.Minute): client.pingCount++ if client.pingCount == 5 { CloseConnection(conn, &CloseTimedOut) @@ -389,8 +390,8 @@ func MarshalClientMessage(clientMessage interface{}) (payloadType int, data []by // Command handlers should use this to construct responses. func SuccessMessageFromString(arguments string) ClientMessage { cm := ClientMessage{ - MessageID: -1, // filled by the select loop - Command: SuccessCommand, + MessageID: -1, // filled by the select loop + Command: SuccessCommand, origArguments: arguments, } cm.parseOrigArguments() diff --git a/socketserver/internal/server/types.go b/socketserver/internal/server/types.go index 0fd79a39..1420401d 100644 --- a/socketserver/internal/server/types.go +++ b/socketserver/internal/server/types.go @@ -3,6 +3,7 @@ package server import ( "encoding/json" "github.com/satori/go.uuid" + "net" "sync" "time" ) @@ -68,6 +69,8 @@ type ClientInfo struct { // TODO(riking) - does this need to be protected cross-thread? AuthInfo + RemoteAddr net.Addr + // Username validation nonce. ValidationNonce string