1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-07-04 10:08:31 +00:00

Fix bunching commands and enable

This commit is contained in:
Kane York 2015-11-01 13:17:35 -08:00
parent 7c89ed98e3
commit f9323413aa
4 changed files with 43 additions and 16 deletions

View file

@ -61,11 +61,18 @@ func commandLineConsole() {
if val.Mallocs == 0 { if val.Mallocs == 0 {
continue continue
} }
shell.Print(fmt.Sprintf("%5d: %6d outstanding (%d total)\n", val.Size, val.Mallocs - val.Frees, val.Mallocs)) shell.Print(fmt.Sprintf("%5d: %6d outstanding (%d total)\n", val.Size, val.Mallocs-val.Frees, val.Mallocs))
} }
shell.Println(m.NumGC, "collections occurred") shell.Println(m.NumGC, "collections occurred")
return "", nil return "", nil
}) })
shell.Register("panic", func(args ...string) (string, error) {
go func() {
panic("requested panic")
}()
return "", nil
})
shell.Start() shell.Start()
} }

View file

@ -318,17 +318,19 @@ func DoSendAggregateData() {
type BunchedRequest struct { type BunchedRequest struct {
Command Command Command Command
Param string Param string
} }
func BunchedRequestFromCM(msg *ClientMessage) BunchedRequest { func BunchedRequestFromCM(msg *ClientMessage) BunchedRequest {
return BunchedRequest{Command: msg.Command, Param: msg.origArguments} return BunchedRequest{Command: msg.Command, Param: msg.origArguments}
} }
type BunchedResponse struct { type BunchedResponse struct {
Response string Response string
Timestamp time.Time Timestamp time.Time
} }
type BunchSubscriber struct { type BunchSubscriber struct {
Client *ClientInfo Client *ClientInfo
MessageID int MessageID int
} }
@ -337,17 +339,31 @@ type BunchSubscriberList struct {
Members []BunchSubscriber Members []BunchSubscriber
} }
var PendingBunchedRequests map[BunchedRequest]BunchSubscriberList = make(map[BunchedRequest]BunchSubscriberList) var PendingBunchedRequests map[BunchedRequest]*BunchSubscriberList = make(map[BunchedRequest]*BunchSubscriberList)
var PendingBunchLock sync.RWMutex var PendingBunchLock sync.RWMutex
var CompletedBunchedRequests map[BunchedRequest]BunchedResponse var CompletedBunchedRequests map[BunchedRequest]BunchedResponse = make(map[BunchedRequest]BunchedResponse)
var CompletedBunchLock sync.RWMutex var CompletedBunchLock sync.RWMutex
func bunchingJanitor() {
for {
time.Sleep(5 * time.Minute)
keepIfAfter := time.Now().Add(-5 * time.Minute)
CompletedBunchLock.Lock()
for req, resp := range CompletedBunchedRequests {
if !resp.Timestamp.After(keepIfAfter) {
delete(CompletedBunchedRequests, req)
}
}
CompletedBunchLock.Unlock()
}
}
func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
br := BunchedRequestFromCM(&msg) br := BunchedRequestFromCM(&msg)
CompletedBunchLock.RLock() CompletedBunchLock.RLock()
resp, ok := CompletedBunchedRequests[br] resp, ok := CompletedBunchedRequests[br]
if ok && !resp.Timestamp.After(time.Now().Add(5 * time.Minute)) { if ok && resp.Timestamp.After(time.Now().Add(-5*time.Minute)) {
CompletedBunchLock.RUnlock() CompletedBunchLock.RUnlock()
return SuccessMessageFromString(resp.Response), nil return SuccessMessageFromString(resp.Response), nil
} else if ok { } else if ok {
@ -357,7 +373,7 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl
CompletedBunchLock.Lock() CompletedBunchLock.Lock()
// recheck condition // recheck condition
resp, ok = CompletedBunchedRequests[br] resp, ok = CompletedBunchedRequests[br]
if ok && resp.Timestamp.After(time.Now().Add(5 * time.Minute)) { if ok && !resp.Timestamp.After(time.Now().Add(-5*time.Minute)) {
delete(CompletedBunchedRequests, br) delete(CompletedBunchedRequests, br)
} }
CompletedBunchLock.Unlock() CompletedBunchLock.Unlock()
@ -378,7 +394,7 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl
PendingBunchLock.RUnlock() PendingBunchLock.RUnlock()
return ClientMessage{Command: AsyncResponseCommand}, nil return ClientMessage{Command: AsyncResponseCommand}, nil
} else { } else {
PendingBunchLock.RUnlock() PendingBunchLock.RUnlock()
PendingBunchLock.Lock() PendingBunchLock.Lock()
// RECHECK because someone else might have added it // RECHECK because someone else might have added it
@ -390,7 +406,7 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl
PendingBunchLock.Unlock() PendingBunchLock.Unlock()
return ClientMessage{Command: AsyncResponseCommand}, nil return ClientMessage{Command: AsyncResponseCommand}, nil
} else { } else {
PendingBunchedRequests[br] = BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}} PendingBunchedRequests[br] = &BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}}
needToStart = true needToStart = true
PendingBunchLock.Unlock() PendingBunchLock.Unlock()
} }
@ -402,7 +418,7 @@ func HandleBunchedRemotecommand(conn *websocket.Conn, client *ClientInfo, msg Cl
PendingBunchLock.Lock() // Prevent new signups PendingBunchLock.Lock() // Prevent new signups
var msg ClientMessage var msg ClientMessage
if err != nil { if err == nil {
CompletedBunchLock.Lock() // mutex on map CompletedBunchLock.Lock() // mutex on map
CompletedBunchedRequests[request] = BunchedResponse{Response: resp, Timestamp: time.Now()} CompletedBunchedRequests[request] = BunchedResponse{Response: resp, Timestamp: time.Now()}
CompletedBunchLock.Unlock() CompletedBunchLock.Unlock()

View file

@ -36,8 +36,8 @@ var CommandHandlers = map[Command]CommandHandler{
"survey": HandleSurvey, "survey": HandleSurvey,
"twitch_emote": HandleRemoteCommand, "twitch_emote": HandleRemoteCommand,
"get_link": HandleRemoteCommand, "get_link": HandleBunchedRemotecommand,
"get_display_name": HandleRemoteCommand, "get_display_name": HandleBunchedRemotecommand,
"update_follow_buttons": HandleRemoteCommand, "update_follow_buttons": HandleRemoteCommand,
"chat_history": HandleRemoteCommand, "chat_history": HandleRemoteCommand,
} }
@ -151,6 +151,7 @@ func HandleSocketConnection(conn *websocket.Conn) {
var client ClientInfo var client ClientInfo
client.MessageChannel = _serverMessageChan client.MessageChannel = _serverMessageChan
client.RemoteAddr = conn.RemoteAddr()
// Launch receiver goroutine // Launch receiver goroutine
go func(errorChan chan<- error, clientChan chan<- ClientMessage, stoppedChan <-chan struct{}) { go func(errorChan chan<- error, clientChan chan<- ClientMessage, stoppedChan <-chan struct{}) {
@ -232,7 +233,7 @@ RunLoop:
case smsg := <-serverMessageChan: case smsg := <-serverMessageChan:
SendMessage(conn, smsg) SendMessage(conn, smsg)
case <- time.After(1 * time.Minute): case <-time.After(1 * time.Minute):
client.pingCount++ client.pingCount++
if client.pingCount == 5 { if client.pingCount == 5 {
CloseConnection(conn, &CloseTimedOut) CloseConnection(conn, &CloseTimedOut)
@ -389,8 +390,8 @@ func MarshalClientMessage(clientMessage interface{}) (payloadType int, data []by
// Command handlers should use this to construct responses. // Command handlers should use this to construct responses.
func SuccessMessageFromString(arguments string) ClientMessage { func SuccessMessageFromString(arguments string) ClientMessage {
cm := ClientMessage{ cm := ClientMessage{
MessageID: -1, // filled by the select loop MessageID: -1, // filled by the select loop
Command: SuccessCommand, Command: SuccessCommand,
origArguments: arguments, origArguments: arguments,
} }
cm.parseOrigArguments() cm.parseOrigArguments()

View file

@ -3,6 +3,7 @@ package server
import ( import (
"encoding/json" "encoding/json"
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"net"
"sync" "sync"
"time" "time"
) )
@ -68,6 +69,8 @@ type ClientInfo struct {
// TODO(riking) - does this need to be protected cross-thread? // TODO(riking) - does this need to be protected cross-thread?
AuthInfo AuthInfo
RemoteAddr net.Addr
// Username validation nonce. // Username validation nonce.
ValidationNonce string ValidationNonce string