package server import ( "encoding/json" "errors" "fmt" "github.com/gorilla/websocket" "github.com/satori/go.uuid" "log" "net/url" "strconv" "sync" "time" ) // Command is a string indicating which RPC is requested. // The Commands sent from Client -> Server and Server -> Client are disjoint sets. type Command string // CommandHandler is a RPC handler assosciated with a Command. type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) var commandHandlers = map[Command]CommandHandler{ HelloCommand: C2SHello, "ping": C2SPing, "setuser": C2SSetUser, "ready": C2SReady, "sub": C2SSubscribe, "unsub": C2SUnsubscribe, "track_follow": C2STrackFollow, "emoticon_uses": C2SEmoticonUses, "survey": C2SSurvey, "twitch_emote": C2SHandleBunchedCommand, "get_link": C2SHandleBunchedCommand, "get_display_name": C2SHandleBunchedCommand, "update_follow_buttons": C2SHandleRemoteCommand, "chat_history": C2SHandleRemoteCommand, } // DispatchC2SCommand handles a C2S Command in the provided ClientMessage. // It calls the correct CommandHandler function, catching panics. // It sends either the returned Reply ClientMessage, setting the correct messageID, or sends an ErrorCommand func DispatchC2SCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) { handler, ok := commandHandlers[msg.Command] if !ok { handler = C2SHandleRemoteCommand } Statistics.CommandsIssuedTotal++ Statistics.CommandsIssuedMap[msg.Command]++ response, err := callHandler(handler, conn, client, msg) if err == nil { if response.Command == AsyncResponseCommand { // Don't send anything // The response will be delivered over client.MessageChannel / serverMessageChan } else { response.MessageID = msg.MessageID SendMessage(conn, response) } } else { SendMessage(conn, ClientMessage{ MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error(), }) } } func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) { defer func() { if r := recover(); r != nil { var ok bool fmt.Print("[!] Error executing command", cmsg.Command, "--", r) err, ok = r.(error) if !ok { err = fmt.Errorf("command handler: %v", r) } } }() return handler(conn, client, cmsg) } var lastVersionWithoutReplyWithServerTime = VersionFromString("ffz_3.5.78") // C2SHello implements the `hello` C2S Command. // It calls SubscribeGlobal() and SubscribeDefaults() with the client, and fills out ClientInfo.Version and ClientInfo.ClientID. func C2SHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { version, clientID, err := msg.ArgumentsAsTwoStrings() if err != nil { return } client.VersionString = version client.Version = VersionFromString(version) client.ClientID = uuid.FromStringOrNil(clientID) if client.ClientID == uuid.Nil { client.ClientID = uuid.NewV4() } SubscribeGlobal(client) SubscribeDefaults(client) if client.Version.After(&lastVersionWithoutReplyWithServerTime) { jsTime := float64(time.Now().UnixNano()/1000) / 1000 return ClientMessage{ Arguments: []interface{}{ client.ClientID.String(), jsTime, }, }, nil } else { return ClientMessage{ Arguments: client.ClientID.String(), }, nil } } func C2SPing(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { return ClientMessage{ Arguments: float64(time.Now().UnixNano()/1000) / 1000, }, nil } func C2SSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { username, err := msg.ArgumentsAsString() if err != nil { return } client.Mutex.Lock() client.TwitchUsername = username client.UsernameValidated = false client.Mutex.Unlock() if Configuration.SendAuthToNewClients { client.MsgChannelKeepalive.Add(1) go client.StartAuthorization(func(_ *ClientInfo, _ bool) { client.MsgChannelKeepalive.Done() }) } return ResponseSuccess, nil } func C2SReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { // disconnectAt, err := msg.ArgumentsAsInt() // if err != nil { // return // } client.Mutex.Lock() if client.MakePendingRequests != nil { if !client.MakePendingRequests.Stop() { // Timer already fired, GetSubscriptionBacklog() has started rmsg.Command = SuccessCommand return } } client.PendingSubscriptionsBacklog = nil client.MakePendingRequests = nil client.Mutex.Unlock() client.MsgChannelKeepalive.Add(1) go func() { client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand} SendBacklogForNewClient(client) // if disconnectAt != 0 { // SendTimedBacklogMessages(client, time.Unix(disconnectAt, 0)) // } client.MsgChannelKeepalive.Done() }() return ClientMessage{Command: AsyncResponseCommand}, nil } func C2SSubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { channel, err := msg.ArgumentsAsString() if err != nil { return } client.Mutex.Lock() AddToSliceS(&client.CurrentChannels, channel) if usePendingSubscrptionsBacklog { client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel) } client.Mutex.Unlock() SubscribeChannel(client, channel) return ResponseSuccess, nil } // C2SUnsubscribe implements the `unsub` C2S Command. // It removes the channel from ClientInfo.CurrentChannels and calls UnsubscribeSingleChat. func C2SUnsubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { channel, err := msg.ArgumentsAsString() if err != nil { return } client.Mutex.Lock() RemoveFromSliceS(&client.CurrentChannels, channel) client.Mutex.Unlock() UnsubscribeSingleChat(client, channel) return ResponseSuccess, nil } // C2SSurvey implements the survey C2S Command. // Surveys are discarded.s func C2SSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { // Discard return ResponseSuccess, nil } type followEvent struct { User string `json:"u"` Channel string `json:"c"` NowFollowing bool `json:"f"` Timestamp time.Time `json:"t"` } var followEvents []followEvent // followEventsLock is the lock for followEvents. var followEventsLock sync.Mutex // C2STrackFollow implements the `track_follow` C2S Command. // It adds the record to `followEvents`, which is submitted to the backend on a timer. func C2STrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { channel, following, err := msg.ArgumentsAsStringAndBool() if err != nil { return } now := time.Now() followEventsLock.Lock() followEvents = append(followEvents, followEvent{client.TwitchUsername, channel, following, now}) followEventsLock.Unlock() return ResponseSuccess, nil } // AggregateEmoteUsage is a map from emoteID to a map from chatroom name to usage count. var aggregateEmoteUsage = make(map[int]map[string]int) // AggregateEmoteUsageLock is the lock for AggregateEmoteUsage. var aggregateEmoteUsageLock sync.Mutex // ErrNegativeEmoteUsage is emitted when the submitted emote usage is negative. var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative") // C2SEmoticonUses implements the `emoticon_uses` C2S Command. // msg.Arguments are in the JSON format of [1]map[emoteID]map[ChatroomName]float64. func C2SEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { // if this panics, will be caught by callHandler mapRoot := msg.Arguments.([]interface{})[0].(map[string]interface{}) // Validate: male suire for strEmote, val1 := range mapRoot { _, err = strconv.Atoi(strEmote) if err != nil { return } mapInner := val1.(map[string]interface{}) for _, val2 := range mapInner { var count = int(val2.(float64)) if count <= 0 { err = ErrNegativeEmoteUsage return } } } aggregateEmoteUsageLock.Lock() defer aggregateEmoteUsageLock.Unlock() var total int for strEmote, val1 := range mapRoot { var emoteID int emoteID, err = strconv.Atoi(strEmote) if err != nil { return } destMapInner, ok := aggregateEmoteUsage[emoteID] if !ok { destMapInner = make(map[string]int) aggregateEmoteUsage[emoteID] = destMapInner } mapInner := val1.(map[string]interface{}) for roomName, val2 := range mapInner { var count = int(val2.(float64)) if count > 200 { count = 200 } destMapInner[roomName] += count total += count } } Statistics.EmotesReportedTotal += uint64(total) return ResponseSuccess, nil } func aggregateDataSender() { for { time.Sleep(5 * time.Minute) doSendAggregateData() } } func doSendAggregateData() { followEventsLock.Lock() follows := followEvents followEvents = nil followEventsLock.Unlock() aggregateEmoteUsageLock.Lock() emoteUsage := aggregateEmoteUsage aggregateEmoteUsage = make(map[int]map[string]int) aggregateEmoteUsageLock.Unlock() reportForm := url.Values{} followJSON, err := json.Marshal(follows) if err != nil { log.Println("error reporting aggregate data:", err) } else { reportForm.Set("follows", string(followJSON)) } strEmoteUsage := make(map[string]map[string]int) for emoteID, usageByChannel := range emoteUsage { strEmoteID := strconv.Itoa(emoteID) strEmoteUsage[strEmoteID] = usageByChannel } emoteJSON, err := json.Marshal(strEmoteUsage) if err != nil { log.Println("error reporting aggregate data:", err) } else { reportForm.Set("emotes", string(emoteJSON)) } form, err := SealRequest(reportForm) if err != nil { log.Println("error reporting aggregate data:", err) return } err = SendAggregatedData(form) if err != nil { log.Println("error reporting aggregate data:", err) return } // done } type bunchedRequest struct { Command Command Param string } type cachedBunchedResponse struct { Response string Timestamp time.Time } type bunchSubscriber struct { Client *ClientInfo MessageID int } type bunchSubscriberList struct { sync.Mutex Members []bunchSubscriber } type CacheStatus byte const ( CacheStatusNotFound = iota CacheStatusFound CacheStatusExpired ) var pendingBunchedRequests = make(map[bunchedRequest]*bunchSubscriberList) var pendingBunchLock sync.Mutex var bunchCache = make(map[bunchedRequest]cachedBunchedResponse) var bunchCacheLock sync.RWMutex var bunchCacheCleanupSignal = sync.NewCond(&bunchCacheLock) var bunchCacheLastCleanup time.Time func bunchedRequestFromCM(msg *ClientMessage) bunchedRequest { return bunchedRequest{Command: msg.Command, Param: msg.origArguments} } func bunchCacheJanitor() { go func() { for { time.Sleep(30 * time.Minute) bunchCacheCleanupSignal.Signal() } }() bunchCacheLock.Lock() for { // Unlocks CachedBunchLock, waits for signal, re-locks bunchCacheCleanupSignal.Wait() if bunchCacheLastCleanup.After(time.Now().Add(-1 * time.Second)) { // skip if it's been less than 1 second continue } // CachedBunchLock is held here keepIfAfter := time.Now().Add(-5 * time.Minute) for req, resp := range bunchCache { if !resp.Timestamp.After(keepIfAfter) { delete(bunchCache, req) } } bunchCacheLastCleanup = time.Now() // Loop and Wait(), which re-locks } } // C2SHandleBunchedCommand handles C2S Commands such as `get_link`. // It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one. // Additionally, results are cached. func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { br := bunchedRequestFromCM(&msg) cacheStatus := func() byte { bunchCacheLock.RLock() defer bunchCacheLock.RUnlock() bresp, ok := bunchCache[br] if ok && bresp.Timestamp.After(time.Now().Add(-5*time.Minute)) { client.MsgChannelKeepalive.Add(1) go func() { var rmsg ClientMessage rmsg.Command = SuccessCommand rmsg.MessageID = msg.MessageID rmsg.origArguments = bresp.Response rmsg.parseOrigArguments() client.MessageChannel <- rmsg client.MsgChannelKeepalive.Done() }() return CacheStatusFound } else if ok { return CacheStatusExpired } return CacheStatusNotFound }() if cacheStatus == CacheStatusFound { return ClientMessage{Command: AsyncResponseCommand}, nil } else if cacheStatus == CacheStatusExpired { // Wake up the lazy janitor bunchCacheCleanupSignal.Signal() } pendingBunchLock.Lock() defer pendingBunchLock.Unlock() list, ok := pendingBunchedRequests[br] if ok { list.Lock() AddToSliceB(&list.Members, client, msg.MessageID) list.Unlock() return ClientMessage{Command: AsyncResponseCommand}, nil } pendingBunchedRequests[br] = &bunchSubscriberList{Members: []bunchSubscriber{{Client: client, MessageID: msg.MessageID}}} go func(request bunchedRequest) { respStr, err := SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{}) var msg ClientMessage if err == nil { msg.Command = SuccessCommand msg.origArguments = respStr msg.parseOrigArguments() } else { msg.Command = ErrorCommand msg.Arguments = err.Error() } if err == nil { bunchCacheLock.Lock() bunchCache[request] = cachedBunchedResponse{Response: respStr, Timestamp: time.Now()} bunchCacheLock.Unlock() } pendingBunchLock.Lock() bsl := pendingBunchedRequests[request] delete(pendingBunchedRequests, request) pendingBunchLock.Unlock() bsl.Lock() for _, member := range bsl.Members { msg.MessageID = member.MessageID select { case member.Client.MessageChannel <- msg: case <-member.Client.MsgChannelIsDone: } } bsl.Unlock() }(br) return ClientMessage{Command: AsyncResponseCommand}, nil } func C2SHandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { client.MsgChannelKeepalive.Add(1) go doRemoteCommand(conn, msg, client) return ClientMessage{Command: AsyncResponseCommand}, nil } const AuthorizationFailedErrorString = "Failed to verify your Twitch username." func doRemoteCommand(conn *websocket.Conn, msg ClientMessage, client *ClientInfo) { resp, err := SendRemoteCommandCached(string(msg.Command), msg.origArguments, client.AuthInfo) if err == ErrAuthorizationNeeded { client.StartAuthorization(func(_ *ClientInfo, success bool) { if success { doRemoteCommand(conn, msg, client) } else { client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: AuthorizationFailedErrorString} client.MsgChannelKeepalive.Done() } }) return // without keepalive.Done() } else if err != nil { client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()} } else { msg := ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp} msg.parseOrigArguments() client.MessageChannel <- msg } client.MsgChannelKeepalive.Done() }