diff --git a/socketserver/cmd/ffzsocketserver/console.go b/socketserver/cmd/ffzsocketserver/console.go index 88cc4b44..7da8e6fa 100644 --- a/socketserver/cmd/ffzsocketserver/console.go +++ b/socketserver/cmd/ffzsocketserver/console.go @@ -1,7 +1,7 @@ package main import ( - "../../internal/server" + "bitbucket.org/stendec/frankerfacez/socketserver/internal/server" "fmt" "github.com/abiosoft/ishell" "github.com/gorilla/websocket" diff --git a/socketserver/cmd/ffzsocketserver/socketserver.go b/socketserver/cmd/ffzsocketserver/socketserver.go index c280218f..67f982c5 100644 --- a/socketserver/cmd/ffzsocketserver/socketserver.go +++ b/socketserver/cmd/ffzsocketserver/socketserver.go @@ -1,7 +1,7 @@ -package main +package main // import "bitbucket.org/stendec/frankerfacez/socketserver/cmd/ffzsocketserver" import ( - "../../internal/server" + "bitbucket.org/stendec/frankerfacez/socketserver/internal/server" "encoding/json" "flag" "fmt" diff --git a/socketserver/internal/server/backend.go b/socketserver/internal/server/backend.go index d4cc5bf2..c1241752 100644 --- a/socketserver/internal/server/backend.go +++ b/socketserver/internal/server/backend.go @@ -190,6 +190,7 @@ func SendRemoteCommand(remoteCommand, data string, auth AuthInfo) (responseStr s return } +// SendAggregatedData sends aggregated emote usage and following data to the backend server. func SendAggregatedData(sealedForm url.Values) error { resp, err := backendHTTPClient.PostForm(postStatisticsURL, sealedForm) if err != nil { @@ -203,6 +204,8 @@ func SendAggregatedData(sealedForm url.Values) error { return resp.Body.Close() } +// FetchBacklogData makes a request to the backend for backlog data on a set of pub/sub topics. +// TODO scrap this, replaced by /cached_pub func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) { formData := url.Values{ "subs": chatSubs, @@ -247,10 +250,18 @@ func (noe ErrBackendNotOK) Error() string { return fmt.Sprintf("backend returned %d: %s", noe.Code, noe.Response) } +// SendNewTopicNotice notifies the backend that a client has performed the first subscription to a pub/sub topic. +// POST data: +// channels=room.trihex +// added=t func SendNewTopicNotice(topic string) error { return sendTopicNotice(topic, true) } +// SendCleanupTopicsNotice notifies the backend that pub/sub topics have no subscribers anymore. +// POST data: +// channels=room.sirstendec,room.bobross,feature.foo +// added=f func SendCleanupTopicsNotice(topics []string) error { return sendTopicNotice(strings.Join(topics, ","), false) } @@ -292,6 +303,7 @@ func httpError(statusCode int) error { return fmt.Errorf("backend http error: %d", statusCode) } +// GenerateKeys generates a new NaCl keypair for the server and writes out the default configuration file. func GenerateKeys(outputFile, serverID, theirPublicStr string) { var err error output := ConfigFile{ diff --git a/socketserver/internal/server/commands.go b/socketserver/internal/server/commands.go index 2f26b72e..8552f768 100644 --- a/socketserver/internal/server/commands.go +++ b/socketserver/internal/server/commands.go @@ -3,6 +3,7 @@ package server import ( "encoding/json" "errors" + "fmt" "github.com/gorilla/websocket" "github.com/satori/go.uuid" "log" @@ -20,33 +21,34 @@ type Command string type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) var commandHandlers = map[Command]CommandHandler{ - HelloCommand: HandleHello, - "setuser": HandleSetUser, - "ready": HandleReady, + HelloCommand: C2SHello, + "setuser": C2SSetUser, + "ready": C2SReady, - "sub": HandleSub, - "unsub": HandleUnsub, + "sub": C2SSubscribe, + "unsub": C2SUnsubscribe, - "track_follow": HandleTrackFollow, - "emoticon_uses": HandleEmoticonUses, - "survey": HandleSurvey, + "track_follow": C2STrackFollow, + "emoticon_uses": C2SEmoticonUses, + "survey": C2SSurvey, - "twitch_emote": HandleRemoteCommand, - "get_link": HandleBunchedRemoteCommand, - "get_display_name": HandleBunchedRemoteCommand, - "update_follow_buttons": HandleRemoteCommand, - "chat_history": HandleRemoteCommand, + "twitch_emote": C2SHandleRemoteCommand, + "get_link": C2SHandleBunchedCommand, + "get_display_name": C2SHandleBunchedCommand, + "update_follow_buttons": C2SHandleRemoteCommand, + "chat_history": C2SHandleRemoteCommand, } -const ChannelInfoDelay = 2 * time.Second - -func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) { +// DispatchC2SCommand handles a C2S Command in the provided ClientMessage. +// It calls the correct CommandHandler function, catching panics. +// It sends either the returned Reply ClientMessage, setting the correct messageID, or sends an ErrorCommand +func DispatchC2SCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) { handler, ok := commandHandlers[msg.Command] if !ok { - handler = HandleRemoteCommand + handler = C2SHandleRemoteCommand } - response, err := CallHandler(handler, conn, client, msg) + response, err := callHandler(handler, conn, client, msg) if err == nil { if response.Command == AsyncResponseCommand { @@ -59,13 +61,29 @@ func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) } else { SendMessage(conn, ClientMessage{ MessageID: msg.MessageID, - Command: "error", + Command: ErrorCommand, Arguments: err.Error(), }) } } -func HandleHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) { + defer func() { + if r := recover(); r != nil { + var ok bool + fmt.Print("[!] Error executing command", cmsg.Command, "--", r) + err, ok = r.(error) + if !ok { + err = fmt.Errorf("command handler: %v", r) + } + } + }() + return handler(conn, client, cmsg) +} + +// C2SHello implements the `hello` C2S Command. +// It calls SubscribeGlobal() and SubscribeDefaults() with the client, and fills out ClientInfo.Version and ClientInfo.ClientID. +func C2SHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { version, clientID, err := msg.ArgumentsAsTwoStrings() if err != nil { return @@ -85,7 +103,7 @@ func HandleHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r }, nil } -func HandleReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2SReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { disconnectAt, err := msg.ArgumentsAsInt() if err != nil { return @@ -115,7 +133,7 @@ func HandleReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r return ClientMessage{Command: AsyncResponseCommand}, nil } -func HandleSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2SSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { username, err := msg.ArgumentsAsString() if err != nil { return @@ -137,7 +155,7 @@ func HandleSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) return ResponseSuccess, nil } -func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2SSubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { channel, err := msg.ArgumentsAsString() if err != nil { @@ -145,18 +163,10 @@ func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rms } client.Mutex.Lock() - AddToSliceS(&client.CurrentChannels, channel) - client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel) - - // if client.MakePendingRequests == nil { - // client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client)) - // } else { - // if !client.MakePendingRequests.Reset(ChannelInfoDelay) { - // client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client)) - // } - // } - + if usePendingSubscrptionsBacklog { + client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel) + } client.Mutex.Unlock() SubscribeChannel(client, channel) @@ -164,7 +174,9 @@ func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rms return ResponseSuccess, nil } -func HandleUnsub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +// C2SUnsubscribe implements the `unsub` C2S Command. +// It removes the channel from ClientInfo.CurrentChannels and calls UnsubscribeSingleChat. +func C2SUnsubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { channel, err := msg.ArgumentsAsString() if err != nil { @@ -180,91 +192,57 @@ func HandleUnsub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r return ResponseSuccess, nil } -func GetSubscriptionBacklogFor(conn *websocket.Conn, client *ClientInfo) func() { - return func() { - GetSubscriptionBacklog(conn, client) - } -} - -// On goroutine -func GetSubscriptionBacklog(conn *websocket.Conn, client *ClientInfo) { - var subs []string - - // Lock, grab the data, and reset it - client.Mutex.Lock() - subs = client.PendingSubscriptionsBacklog - client.PendingSubscriptionsBacklog = nil - client.MakePendingRequests = nil - client.Mutex.Unlock() - - if len(subs) == 0 { - return - } - - if backendURL == "" { - return // for testing runs - } - messages, err := FetchBacklogData(subs) - - if err != nil { - // Oh well. - log.Print("error in GetSubscriptionBacklog:", err) - return - } - - // Deliver to client - client.MsgChannelKeepalive.Add(1) - if client.MessageChannel != nil { - for _, msg := range messages { - client.MessageChannel <- msg - } - } - client.MsgChannelKeepalive.Done() -} - -func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +// C2SSurvey implements the survey C2S Command. +// Surveys are discarded.s +func C2SSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { // Discard return ResponseSuccess, nil } -type FollowEvent struct { +type followEvent struct { User string `json:"u"` Channel string `json:"c"` NowFollowing bool `json:"f"` Timestamp time.Time `json:"t"` } -var FollowEvents []FollowEvent -var FollowEventsLock sync.Mutex +var followEvents []followEvent -func HandleTrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +// followEventsLock is the lock for followEvents. +var followEventsLock sync.Mutex + +// C2STrackFollow implements the `track_follow` C2S Command. +// It adds the record to `followEvents`, which is submitted to the backend on a timer. +func C2STrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { channel, following, err := msg.ArgumentsAsStringAndBool() if err != nil { return } now := time.Now() - FollowEventsLock.Lock() - FollowEvents = append(FollowEvents, FollowEvent{client.TwitchUsername, channel, following, now}) - FollowEventsLock.Unlock() + followEventsLock.Lock() + followEvents = append(followEvents, followEvent{client.TwitchUsername, channel, following, now}) + followEventsLock.Unlock() return ResponseSuccess, nil } // AggregateEmoteUsage is a map from emoteID to a map from chatroom name to usage count. -var AggregateEmoteUsage map[int]map[string]int = make(map[int]map[string]int) +var aggregateEmoteUsage = make(map[int]map[string]int) // AggregateEmoteUsageLock is the lock for AggregateEmoteUsage. -var AggregateEmoteUsageLock sync.Mutex +var aggregateEmoteUsageLock sync.Mutex // ErrNegativeEmoteUsage is emitted when the submitted emote usage is negative. var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative") -func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { - // arguments is [1]map[emoteID]map[ChatroomName]float64 - +// C2SEmoticonUses implements the `emoticon_uses` C2S Command. +// msg.Arguments are in the JSON format of [1]map[emoteID]map[ChatroomName]float64. +func C2SEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { + // if this panics, will be caught by callHandler mapRoot := msg.Arguments.([]interface{})[0].(map[string]interface{}) + // Validate: male suire for strEmote, val1 := range mapRoot { _, err = strconv.Atoi(strEmote) if err != nil { @@ -272,7 +250,7 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess } mapInner := val1.(map[string]interface{}) for _, val2 := range mapInner { - var count int = int(val2.(float64)) + var count = int(val2.(float64)) if count <= 0 { err = ErrNegativeEmoteUsage return @@ -280,8 +258,8 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess } } - AggregateEmoteUsageLock.Lock() - defer AggregateEmoteUsageLock.Unlock() + aggregateEmoteUsageLock.Lock() + defer aggregateEmoteUsageLock.Unlock() for strEmote, val1 := range mapRoot { var emoteID int @@ -290,15 +268,15 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess return } - destMapInner, ok := AggregateEmoteUsage[emoteID] + destMapInner, ok := aggregateEmoteUsage[emoteID] if !ok { destMapInner = make(map[string]int) - AggregateEmoteUsage[emoteID] = destMapInner + aggregateEmoteUsage[emoteID] = destMapInner } mapInner := val1.(map[string]interface{}) for roomName, val2 := range mapInner { - var count int = int(val2.(float64)) + var count = int(val2.(float64)) if count > 200 { count = 200 } @@ -309,22 +287,22 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess return ResponseSuccess, nil } -func sendAggregateData() { +func aggregateDataSender() { for { time.Sleep(5 * time.Minute) - DoSendAggregateData() + doSendAggregateData() } } -func DoSendAggregateData() { - FollowEventsLock.Lock() - follows := FollowEvents - FollowEvents = nil - FollowEventsLock.Unlock() - AggregateEmoteUsageLock.Lock() - emoteUsage := AggregateEmoteUsage - AggregateEmoteUsage = make(map[int]map[string]int) - AggregateEmoteUsageLock.Unlock() +func doSendAggregateData() { + followEventsLock.Lock() + follows := followEvents + followEvents = nil + followEventsLock.Unlock() + aggregateEmoteUsageLock.Lock() + emoteUsage := aggregateEmoteUsage + aggregateEmoteUsage = make(map[int]map[string]int) + aggregateEmoteUsageLock.Unlock() reportForm := url.Values{} @@ -362,27 +340,23 @@ func DoSendAggregateData() { // done } -type BunchedRequest struct { +type bunchedRequest struct { Command Command Param string } -func BunchedRequestFromCM(msg *ClientMessage) BunchedRequest { - return BunchedRequest{Command: msg.Command, Param: msg.origArguments} -} - -type CachedBunchedResponse struct { +type cachedBunchedResponse struct { Response string Timestamp time.Time } -type BunchSubscriber struct { +type bunchSubscriber struct { Client *ClientInfo MessageID int } -type BunchSubscriberList struct { +type bunchSubscriberList struct { sync.Mutex - Members []BunchSubscriber + Members []bunchSubscriber } type CacheStatus byte @@ -393,50 +367,57 @@ const ( CacheStatusExpired ) -var PendingBunchedRequests map[BunchedRequest]*BunchSubscriberList = make(map[BunchedRequest]*BunchSubscriberList) -var PendingBunchLock sync.Mutex -var BunchCache map[BunchedRequest]CachedBunchedResponse = make(map[BunchedRequest]CachedBunchedResponse) -var BunchCacheLock sync.RWMutex -var BunchCacheCleanupSignal *sync.Cond = sync.NewCond(&BunchCacheLock) -var BunchCacheLastCleanup time.Time +var pendingBunchedRequests = make(map[bunchedRequest]*bunchSubscriberList) +var pendingBunchLock sync.Mutex +var bunchCache = make(map[bunchedRequest]cachedBunchedResponse) +var bunchCacheLock sync.RWMutex +var bunchCacheCleanupSignal = sync.NewCond(&bunchCacheLock) +var bunchCacheLastCleanup time.Time + +func bunchedRequestFromCM(msg *ClientMessage) bunchedRequest { + return bunchedRequest{Command: msg.Command, Param: msg.origArguments} +} func bunchCacheJanitor() { go func() { for { time.Sleep(30 * time.Minute) - BunchCacheCleanupSignal.Signal() + bunchCacheCleanupSignal.Signal() } }() - BunchCacheLock.Lock() + bunchCacheLock.Lock() for { // Unlocks CachedBunchLock, waits for signal, re-locks - BunchCacheCleanupSignal.Wait() + bunchCacheCleanupSignal.Wait() - if BunchCacheLastCleanup.After(time.Now().Add(-1 * time.Second)) { + if bunchCacheLastCleanup.After(time.Now().Add(-1 * time.Second)) { // skip if it's been less than 1 second continue } // CachedBunchLock is held here keepIfAfter := time.Now().Add(-5 * time.Minute) - for req, resp := range BunchCache { + for req, resp := range bunchCache { if !resp.Timestamp.After(keepIfAfter) { - delete(BunchCache, req) + delete(bunchCache, req) } } - BunchCacheLastCleanup = time.Now() + bunchCacheLastCleanup = time.Now() // Loop and Wait(), which re-locks } } -func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { - br := BunchedRequestFromCM(&msg) +// C2SHandleBunchedCommand handles C2S Commands such as `get_link`. +// It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one. +// Additionally, results are cached. +func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { + br := bunchedRequestFromCM(&msg) cacheStatus := func() byte { - BunchCacheLock.RLock() - defer BunchCacheLock.RUnlock() - bresp, ok := BunchCache[br] + bunchCacheLock.RLock() + defer bunchCacheLock.RUnlock() + bresp, ok := bunchCache[br] if ok && bresp.Timestamp.After(time.Now().Add(-5*time.Minute)) { client.MsgChannelKeepalive.Add(1) go func() { @@ -459,12 +440,12 @@ func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg Cl return ClientMessage{Command: AsyncResponseCommand}, nil } else if cacheStatus == CacheStatusExpired { // Wake up the lazy janitor - BunchCacheCleanupSignal.Signal() + bunchCacheCleanupSignal.Signal() } - PendingBunchLock.Lock() - defer PendingBunchLock.Unlock() - list, ok := PendingBunchedRequests[br] + pendingBunchLock.Lock() + defer pendingBunchLock.Unlock() + list, ok := pendingBunchedRequests[br] if ok { list.Lock() AddToSliceB(&list.Members, client, msg.MessageID) @@ -473,9 +454,9 @@ func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg Cl return ClientMessage{Command: AsyncResponseCommand}, nil } - PendingBunchedRequests[br] = &BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}} + pendingBunchedRequests[br] = &bunchSubscriberList{Members: []bunchSubscriber{{Client: client, MessageID: msg.MessageID}}} - go func(request BunchedRequest) { + go func(request bunchedRequest) { respStr, err := SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{}) var msg ClientMessage @@ -489,15 +470,15 @@ func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg Cl } if err == nil { - BunchCacheLock.Lock() - BunchCache[request] = CachedBunchedResponse{Response: respStr, Timestamp: time.Now()} - BunchCacheLock.Unlock() + bunchCacheLock.Lock() + bunchCache[request] = cachedBunchedResponse{Response: respStr, Timestamp: time.Now()} + bunchCacheLock.Unlock() } - PendingBunchLock.Lock() - bsl := PendingBunchedRequests[request] - delete(PendingBunchedRequests, request) - PendingBunchLock.Unlock() + pendingBunchLock.Lock() + bsl := pendingBunchedRequests[request] + delete(pendingBunchedRequests, request) + pendingBunchLock.Unlock() bsl.Lock() for _, member := range bsl.Members { @@ -513,7 +494,7 @@ func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg Cl return ClientMessage{Command: AsyncResponseCommand}, nil } -func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2SHandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { client.MsgChannelKeepalive.Add(1) go doRemoteCommand(conn, msg, client) diff --git a/socketserver/internal/server/handlecore.go b/socketserver/internal/server/handlecore.go index e38c8d03..82ac446b 100644 --- a/socketserver/internal/server/handlecore.go +++ b/socketserver/internal/server/handlecore.go @@ -59,8 +59,8 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { } BannerHTML = bannerBytes - serveMux.HandleFunc("/", ServeWebsocketOrCatbag) - serveMux.HandleFunc("/drop_backlog", HBackendDropBacklog) + serveMux.HandleFunc("/", HTTPHandleRootURL) + serveMux.HandleFunc("/drop_backlog", HTTPBackendDropBacklog) serveMux.HandleFunc("/uncached_pub", HBackendPublishRequest) serveMux.HandleFunc("/cached_pub", HBackendUpdateAndPublish) @@ -81,7 +81,7 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { go backlogJanitor() go bunchCacheJanitor() go pubsubJanitor() - go sendAggregateData() + go aggregateDataSender() go ircConnection() } @@ -99,14 +99,16 @@ var SocketUpgrader = websocket.Upgrader{ // Memes go here. var BannerHTML []byte -func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) { +// HTTPHandleRootURL is the http.HandleFunc for requests on `/`. +// It either uses the SocketUpgrader or writes out the BannerHTML. +func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) { if r.Header.Get("Connection") == "Upgrade" { conn, err := SocketUpgrader.Upgrade(w, r, nil) if err != nil { fmt.Fprintf(w, "error: %v", err) return } - HandleSocketConnection(conn) + RunSocketConnection(conn) return } else { @@ -114,26 +116,46 @@ func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) { } } -// Errors that get returned to the client. -var ProtocolError error = errors.New("FFZ Socket protocol error.") -var ProtocolErrorNegativeID error = errors.New("FFZ Socket protocol error: negative or zero message ID.") -var ExpectedSingleString = errors.New("Error: Expected single string as arguments.") -var ExpectedSingleInt = errors.New("Error: Expected single integer as arguments.") -var ExpectedTwoStrings = errors.New("Error: Expected array of string, string as arguments.") -var ExpectedStringAndInt = errors.New("Error: Expected array of string, int as arguments.") -var ExpectedStringAndBool = errors.New("Error: Expected array of string, bool as arguments.") -var ExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.") +// ErrProtocolGeneric is sent in a ErrorCommand Reply. +var ErrProtocolGeneric error = errors.New("FFZ Socket protocol error.") +// ErrProtocolNegativeMsgID is sent in a ErrorCommand Reply when a negative MessageID is received. +var ErrProtocolNegativeMsgID error = errors.New("FFZ Socket protocol error: negative or zero message ID.") +// ErrExpectedSingleString is sent in a ErrorCommand Reply when the Arguments are of the wrong type. +var ErrExpectedSingleString = errors.New("Error: Expected single string as arguments.") +// ErrExpectedSingleInt is sent in a ErrorCommand Reply when the Arguments are of the wrong type. +var ErrExpectedSingleInt = errors.New("Error: Expected single integer as arguments.") +// ErrExpectedTwoStrings is sent in a ErrorCommand Reply when the Arguments are of the wrong type. +var ErrExpectedTwoStrings = errors.New("Error: Expected array of string, string as arguments.") +// ErrExpectedStringAndBool is sent in a ErrorCommand Reply when the Arguments are of the wrong type. +var ErrExpectedStringAndBool = errors.New("Error: Expected array of string, bool as arguments.") +// ErrExpectedStringAndInt is sent in a ErrorCommand Reply when the Arguments are of the wrong type. +var ErrExpectedStringAndInt = errors.New("Error: Expected array of string, int as arguments.") +// ErrExpectedStringAndIntGotFloat is sent in a ErrorCommand Reply when the Arguments are of the wrong type. +var ErrExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.") +// CloseGotBinaryMessage is the termination reason when the client sends a binary websocket frame. var CloseGotBinaryMessage = websocket.CloseError{Code: websocket.CloseUnsupportedData, Text: "got binary packet"} +// CloseTimedOut is the termination reason when the client fails to send or respond to ping frames. var CloseTimedOut = websocket.CloseError{Code: websocket.CloseNoStatusReceived, Text: "no ping replies for 5 minutes"} +// CloseFirstMessageNotHello is the termination reason var CloseFirstMessageNotHello = websocket.CloseError{ Text: "Error - the first message sent must be a 'hello'", Code: websocket.ClosePolicyViolation, } -// Handle a new websocket connection from a FFZ client. -// This runs in a goroutine started by net/http. -func HandleSocketConnection(conn *websocket.Conn) { +// RunSocketConnection contains the main run loop of a websocket connection. + +// First, it sets up the channels, the ClientInfo object, and the pong frame handler. +// It starts the reader goroutine pointing at the newly created channels. +// The function then enters the run loop (a `for{select{}}`). +// The run loop is broken when an object is received on errorChan, or if `hello` is not the first C2S Command. + +// After the run loop stops, the function launches a goroutine to drain +// client.MessageChannel, signals the reader goroutine to stop, unsubscribes +// from all pub/sub channels, waits on MsgChannelKeepalive (remember, the +// messages are being drained), and finally closes client.MessageChannel +// (which ends the drainer goroutine). +func RunSocketConnection(conn *websocket.Conn) { // websocket.Conn is a ReadWriteCloser log.Println("Got socket connection from", conn.RemoteAddr()) @@ -201,7 +223,9 @@ func HandleSocketConnection(conn *websocket.Conn) { }(_errorChan, _clientChan, stoppedChan) conn.SetPongHandler(func(pongBody string) error { + client.Mutex.Lock() client.pingCount = 0 + client.Mutex.Unlock() return nil }) @@ -236,14 +260,17 @@ RunLoop: break RunLoop } - HandleCommand(conn, &client, msg) + DispatchC2SCommand(conn, &client, msg) - case smsg := <-serverMessageChan: - SendMessage(conn, smsg) + case msg := <-serverMessageChan: + SendMessage(conn, msg) case <-time.After(1 * time.Minute): + client.Mutex.Lock() client.pingCount++ - if client.pingCount == 5 { + tooManyPings := client.pingCount == 5 + client.Mutex.Unlock() + if tooManyPings { CloseConnection(conn, &CloseTimedOut) break RunLoop } else { @@ -280,20 +307,6 @@ func getDeadline() time.Time { return time.Now().Add(1 * time.Minute) } -func CallHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) { - defer func() { - if r := recover(); r != nil { - var ok bool - fmt.Print("[!] Error executing command", cmsg.Command, "--", r) - err, ok = r.(error) - if !ok { - err = fmt.Errorf("command handler: %v", r) - } - } - }() - return handler(conn, client, cmsg) -} - func CloseConnection(conn *websocket.Conn, closeMsg *websocket.CloseError) { if closeMsg != &CloseFirstMessageNotHello { log.Println("Terminating connection with", conn.RemoteAddr(), "-", closeMsg.Text) @@ -323,11 +336,11 @@ func UnmarshalClientMessage(data []byte, payloadType int, v interface{}) (err er // Message ID spaceIdx = strings.IndexRune(dataStr, ' ') if spaceIdx == -1 { - return ProtocolError + return ErrProtocolGeneric } messageID, err := strconv.Atoi(dataStr[:spaceIdx]) if messageID < -1 || messageID == 0 { - return ProtocolErrorNegativeID + return ErrProtocolNegativeMsgID } out.MessageID = messageID @@ -397,23 +410,12 @@ func MarshalClientMessage(clientMessage interface{}) (payloadType int, data []by return websocket.TextMessage, []byte(dataStr), nil } -// Command handlers should use this to construct responses. -func SuccessMessageFromString(arguments string) ClientMessage { - cm := ClientMessage{ - MessageID: -1, // filled by the select loop - Command: SuccessCommand, - origArguments: arguments, - } - cm.parseOrigArguments() - return cm -} - // Convenience method: Parse the arguments of the ClientMessage as a single string. func (cm *ClientMessage) ArgumentsAsString() (string1 string, err error) { var ok bool string1, ok = cm.Arguments.(string) if !ok { - err = ExpectedSingleString + err = ErrExpectedSingleString return } else { return string1, nil @@ -426,7 +428,7 @@ func (cm *ClientMessage) ArgumentsAsInt() (int1 int64, err error) { var num float64 num, ok = cm.Arguments.(float64) if !ok { - err = ExpectedSingleInt + err = ErrExpectedSingleInt return } else { int1 = int64(num) @@ -440,16 +442,16 @@ func (cm *ClientMessage) ArgumentsAsTwoStrings() (string1, string2 string, err e var ary []interface{} ary, ok = cm.Arguments.([]interface{}) if !ok { - err = ExpectedTwoStrings + err = ErrExpectedTwoStrings return } else { if len(ary) != 2 { - err = ExpectedTwoStrings + err = ErrExpectedTwoStrings return } string1, ok = ary[0].(string) if !ok { - err = ExpectedTwoStrings + err = ErrExpectedTwoStrings return } // clientID can be null @@ -458,7 +460,7 @@ func (cm *ClientMessage) ArgumentsAsTwoStrings() (string1, string2 string, err e } string2, ok = ary[1].(string) if !ok { - err = ExpectedTwoStrings + err = ErrExpectedTwoStrings return } return string1, string2, nil @@ -471,27 +473,27 @@ func (cm *ClientMessage) ArgumentsAsStringAndInt() (string1 string, int int64, e var ary []interface{} ary, ok = cm.Arguments.([]interface{}) if !ok { - err = ExpectedStringAndInt + err = ErrExpectedStringAndInt return } else { if len(ary) != 2 { - err = ExpectedStringAndInt + err = ErrExpectedStringAndInt return } string1, ok = ary[0].(string) if !ok { - err = ExpectedStringAndInt + err = ErrExpectedStringAndInt return } var num float64 num, ok = ary[1].(float64) if !ok { - err = ExpectedStringAndInt + err = ErrExpectedStringAndInt return } int = int64(num) if float64(int) != num { - err = ExpectedStringAndIntGotFloat + err = ErrExpectedStringAndIntGotFloat return } return string1, int, nil @@ -504,21 +506,21 @@ func (cm *ClientMessage) ArgumentsAsStringAndBool() (str string, flag bool, err var ary []interface{} ary, ok = cm.Arguments.([]interface{}) if !ok { - err = ExpectedStringAndBool + err = ErrExpectedStringAndBool return } else { if len(ary) != 2 { - err = ExpectedStringAndBool + err = ErrExpectedStringAndBool return } str, ok = ary[0].(string) if !ok { - err = ExpectedStringAndBool + err = ErrExpectedStringAndBool return } flag, ok = ary[1].(bool) if !ok { - err = ExpectedStringAndBool + err = ErrExpectedStringAndBool return } return str, flag, nil diff --git a/socketserver/internal/server/publisher.go b/socketserver/internal/server/publisher.go index a90efb1c..d93618cf 100644 --- a/socketserver/internal/server/publisher.go +++ b/socketserver/internal/server/publisher.go @@ -117,7 +117,8 @@ var CachedGlobalMessages []TimestampedGlobalMessage var CachedChannelMessages []TimestampedMultichatMessage var CacheListsLock sync.RWMutex -func DumpCache() { +// DumpBacklogData drops all /cached_pub data. +func DumpBacklogData() { CachedLSMLock.Lock() CachedLastMessages = make(map[Command]map[string]LastSavedMessage) CachedLSMLock.Unlock() @@ -132,6 +133,9 @@ func DumpCache() { CacheListsLock.Unlock() } +// SendBacklogForNewClient sends any backlog data relevant to a new client. +// This should be done when the client sends a `ready` message. +// This will only send data for CacheTypePersistent and CacheTypeLastOnly because those do not involve timestamps. func SendBacklogForNewClient(client *ClientInfo) { client.Mutex.Lock() // reading CurrentChannels PersistentLSMLock.RLock() @@ -170,11 +174,13 @@ func SendBacklogForNewClient(client *ClientInfo) { client.Mutex.Unlock() } +// SendTimedBacklogMessages sends any once-off messages that the client may have missed while it was disconnected. +// Effectively, this can only process CacheTypeTimestamps. func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) { client.Mutex.Lock() // reading CurrentChannels CacheListsLock.RLock() - globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime) + globIdx := findFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime) if globIdx != -1 { for i := globIdx; i < len(CachedGlobalMessages); i++ { @@ -185,7 +191,7 @@ func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) { } } - chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime) + chanIdx := findFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime) if chanIdx != -1 { for i := chanIdx; i < len(CachedChannelMessages); i++ { @@ -217,20 +223,20 @@ func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) { func backlogJanitor() { for { time.Sleep(1 * time.Hour) - CleanupTimedBacklogMessages() + cleanupTimedBacklogMessages() } } -func CleanupTimedBacklogMessages() { +func cleanupTimedBacklogMessages() { CacheListsLock.Lock() oneHourAgo := time.Now().Add(-24 * time.Hour) - globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo) + globIdx := findFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo) if globIdx != -1 { newGlobMsgs := make([]TimestampedGlobalMessage, len(CachedGlobalMessages)-globIdx) copy(newGlobMsgs, CachedGlobalMessages[globIdx:]) CachedGlobalMessages = newGlobMsgs } - chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo) + chanIdx := findFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo) if chanIdx != -1 { newChanMsgs := make([]TimestampedMultichatMessage, len(CachedChannelMessages)-chanIdx) copy(newChanMsgs, CachedChannelMessages[chanIdx:]) @@ -239,7 +245,10 @@ func CleanupTimedBacklogMessages() { CacheListsLock.Unlock() } -func InsertionSort(ary sort.Interface) { +// insertionSort implements insertion sort. +// CacheTypeTimestamps should use insertion sort for O(N) average performance. +// (The average case is the array is still sorted after insertion of the new item.) +func insertionSort(ary sort.Interface) { for i := 1; i < ary.Len(); i++ { for j := i; j > 0 && ary.Less(j, j-1); j-- { ary.Swap(j, j-1) @@ -247,13 +256,12 @@ func InsertionSort(ary sort.Interface) { } } -type TimestampArray interface { +type timestampArray interface { Len() int GetTime(int) time.Time } -func FindFirstNewMessage(ary TimestampArray, disconnectTime time.Time) (idx int) { - // TODO needs tests +func findFirstNewMessage(ary timestampArray, disconnectTime time.Time) (idx int) { len := ary.Len() i := len @@ -304,14 +312,14 @@ func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync. func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) { CacheListsLock.Lock() CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data}) - InsertionSort(tgmarray(CachedGlobalMessages)) + insertionSort(tgmarray(CachedGlobalMessages)) CacheListsLock.Unlock() } func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) { CacheListsLock.Lock() CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data}) - InsertionSort(tmmarray(CachedChannelMessages)) + insertionSort(tmmarray(CachedChannelMessages)) CacheListsLock.Unlock() } @@ -325,7 +333,7 @@ func GetCommandsOfType(match PushCommandCacheInfo) []Command { return ret } -func HBackendDropBacklog(w http.ResponseWriter, r *http.Request) { +func HTTPBackendDropBacklog(w http.ResponseWriter, r *http.Request) { r.ParseForm() formData, err := UnsealRequest(r.Form) if err != nil { @@ -336,7 +344,7 @@ func HBackendDropBacklog(w http.ResponseWriter, r *http.Request) { confirm := formData.Get("confirm") if confirm == "1" { - DumpCache() + DumpBacklogData() } } diff --git a/socketserver/internal/server/publisher_test.go b/socketserver/internal/server/publisher_test.go index 99ce4f5a..e3c0484f 100644 --- a/socketserver/internal/server/publisher_test.go +++ b/socketserver/internal/server/publisher_test.go @@ -11,7 +11,7 @@ func TestCleanupBacklogMessages(t *testing.T) { func TestFindFirstNewMessageEmpty(t *testing.T) { CachedGlobalMessages = []TimestampedGlobalMessage{} - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) if i != -1 { t.Errorf("Expected -1, got %d", i) } @@ -20,7 +20,7 @@ func TestFindFirstNewMessageOneBefore(t *testing.T) { CachedGlobalMessages = []TimestampedGlobalMessage{ {Timestamp: time.Unix(8, 0)}, } - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) if i != -1 { t.Errorf("Expected -1, got %d", i) } @@ -33,7 +33,7 @@ func TestFindFirstNewMessageSeveralBefore(t *testing.T) { {Timestamp: time.Unix(4, 0)}, {Timestamp: time.Unix(5, 0)}, } - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) if i != -1 { t.Errorf("Expected -1, got %d", i) } @@ -51,7 +51,7 @@ func TestFindFirstNewMessageInMiddle(t *testing.T) { {Timestamp: time.Unix(14, 0)}, {Timestamp: time.Unix(15, 0)}, } - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) if i != 5 { t.Errorf("Expected 5, got %d", i) } @@ -60,7 +60,7 @@ func TestFindFirstNewMessageOneAfter(t *testing.T) { CachedGlobalMessages = []TimestampedGlobalMessage{ {Timestamp: time.Unix(15, 0)}, } - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) if i != 0 { t.Errorf("Expected 0, got %d", i) } @@ -73,7 +73,7 @@ func TestFindFirstNewMessageSeveralAfter(t *testing.T) { {Timestamp: time.Unix(14, 0)}, {Timestamp: time.Unix(15, 0)}, } - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) if i != 0 { t.Errorf("Expected 0, got %d", i) } diff --git a/socketserver/internal/server/subscriptions_test.go b/socketserver/internal/server/subscriptions_test.go index 9e87f504..290dcea9 100644 --- a/socketserver/internal/server/subscriptions_test.go +++ b/socketserver/internal/server/subscriptions_test.go @@ -134,7 +134,7 @@ func TGetUrls(testserver *httptest.Server) TURLs { } func TSetup(testserver **httptest.Server, urls *TURLs) { - DumpCache() + DumpBacklogData() conf := &ConfigFile{ ServerID: 20, diff --git a/socketserver/internal/server/types.go b/socketserver/internal/server/types.go index 1088b0fd..7a3b2f99 100644 --- a/socketserver/internal/server/types.go +++ b/socketserver/internal/server/types.go @@ -98,10 +98,13 @@ type ClientInfo struct { // Take out an Add() on this during a command if you need to use the MessageChannel later. MsgChannelKeepalive sync.WaitGroup - // The number of pings sent without a response + // The number of pings sent without a response. + // Protected by Mutex pingCount int } +const usePendingSubscrptionsBacklog = false + type tgmarray []TimestampedGlobalMessage type tmmarray []TimestampedMultichatMessage @@ -178,9 +181,9 @@ func (bct *BacklogCacheType) UnmarshalJSON(data []byte) error { *bct = CacheTypeInvalid return nil } - val := BacklogCacheTypeByName(str) - if val != CacheTypeInvalid { - *bct = val + newBct := BacklogCacheTypeByName(str) + if newBct != CacheTypeInvalid { + *bct = newBct return nil } return ErrorUnrecognizedCacheType @@ -234,9 +237,9 @@ func (mtt *MessageTargetType) UnmarshalJSON(data []byte) error { *mtt = MsgTargetTypeInvalid return nil } - mtt := MessageTargetTypeByName(str) - if mtt != MsgTargetTypeInvalid { - *mtt = mtt + newMtt := MessageTargetTypeByName(str) + if newMtt != MsgTargetTypeInvalid { + *mtt = newMtt return nil } return ErrorUnrecognizedTargetType diff --git a/socketserver/internal/server/utils.go b/socketserver/internal/server/utils.go index 11c9c77b..12c26ed0 100644 --- a/socketserver/internal/server/utils.go +++ b/socketserver/internal/server/utils.go @@ -160,8 +160,8 @@ func RemoveFromSliceC(ary *[]chan<- ClientMessage, val chan<- ClientMessage) boo return true } -func AddToSliceB(ary *[]BunchSubscriber, client *ClientInfo, mid int) bool { - newSub := BunchSubscriber{Client: client, MessageID: mid} +func AddToSliceB(ary *[]bunchSubscriber, client *ClientInfo, mid int) bool { + newSub := bunchSubscriber{Client: client, MessageID: mid} slice := *ary for _, v := range slice { if v == newSub {