1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-06-28 05:15:54 +00:00
FrankerFaceZ/socketserver/server/commands.go

565 lines
15 KiB
Go
Raw Normal View History

package server
import (
"encoding/json"
"errors"
2015-11-16 12:50:00 -08:00
"fmt"
"github.com/gorilla/websocket"
"github.com/satori/go.uuid"
"log"
2015-10-28 18:12:20 -07:00
"net/url"
2015-10-25 00:58:05 -07:00
"strconv"
"sync"
2015-10-25 03:21:50 -07:00
"time"
)
2015-11-15 18:43:34 -08:00
// Command is a string indicating which RPC is requested.
// The Commands sent from Client -> Server and Server -> Client are disjoint sets.
2015-11-08 22:34:06 -08:00
type Command string
2015-11-15 18:43:34 -08:00
// CommandHandler is a RPC handler assosciated with a Command.
2015-11-08 22:34:06 -08:00
type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error)
2015-11-15 18:43:34 -08:00
var commandHandlers = map[Command]CommandHandler{
2015-11-16 12:50:00 -08:00
HelloCommand: C2SHello,
2015-11-16 15:23:27 -08:00
"ping": C2SPing,
2015-11-16 12:50:00 -08:00
"setuser": C2SSetUser,
"ready": C2SReady,
"sub": C2SSubscribe,
"unsub": C2SUnsubscribe,
"track_follow": C2STrackFollow,
"emoticon_uses": C2SEmoticonUses,
"survey": C2SSurvey,
2015-11-16 22:07:43 -08:00
"twitch_emote": C2SHandleBunchedCommand,
2015-11-16 12:50:00 -08:00
"get_link": C2SHandleBunchedCommand,
"get_display_name": C2SHandleBunchedCommand,
"update_follow_buttons": C2SHandleRemoteCommand,
"chat_history": C2SHandleRemoteCommand,
2015-11-16 22:10:55 -08:00
"user_history": C2SHandleRemoteCommand,
2015-11-08 22:34:06 -08:00
}
2015-11-16 12:50:00 -08:00
// DispatchC2SCommand handles a C2S Command in the provided ClientMessage.
// It calls the correct CommandHandler function, catching panics.
// It sends either the returned Reply ClientMessage, setting the correct messageID, or sends an ErrorCommand
func DispatchC2SCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) {
2015-11-15 18:43:34 -08:00
handler, ok := commandHandlers[msg.Command]
if !ok {
2015-11-16 12:50:00 -08:00
handler = C2SHandleRemoteCommand
}
2015-11-16 13:07:02 -08:00
Statistics.CommandsIssuedTotal++
Statistics.CommandsIssuedMap[msg.Command]++
2015-11-16 12:50:00 -08:00
response, err := callHandler(handler, conn, client, msg)
if err == nil {
2015-10-26 14:55:20 -07:00
if response.Command == AsyncResponseCommand {
// Don't send anything
// The response will be delivered over client.MessageChannel / serverMessageChan
} else {
response.MessageID = msg.MessageID
SendMessage(conn, response)
2015-10-26 14:55:20 -07:00
}
} else {
SendMessage(conn, ClientMessage{
MessageID: msg.MessageID,
2015-11-16 12:50:00 -08:00
Command: ErrorCommand,
Arguments: err.Error(),
})
}
}
2015-11-16 12:50:00 -08:00
// callHandler invokes handler with the given arguments and converts a panic
// raised inside it into a returned error.
//
// If the recovered value is itself an error it is returned unchanged;
// any other value is wrapped as "command handler: <value>". When no panic
// occurs the handler's own results are returned as-is.
func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		// Println separates the operands with spaces and terminates the line.
		// fmt.Print would glue the string operands together and leave the
		// line unterminated, making the log entry unreadable.
		log.Println("[!] Error executing command", cmsg.Command, "--", r)
		if panicErr, isErr := r.(error); isErr {
			err = panicErr
		} else {
			err = fmt.Errorf("command handler: %v", r)
		}
	}()
	return handler(conn, client, cmsg)
}
2015-11-16 14:30:09 -08:00
var lastVersionWithoutReplyWithServerTime = VersionFromString("ffz_3.5.78")
2015-11-16 12:50:00 -08:00
// C2SHello implements the `hello` C2S Command.
// It calls SubscribeGlobal() and SubscribeDefaults() with the client, and fills out ClientInfo.Version and ClientInfo.ClientID.
func C2SHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
2015-11-15 18:43:34 -08:00
version, clientID, err := msg.ArgumentsAsTwoStrings()
if err != nil {
return
}
2015-11-16 14:30:09 -08:00
client.VersionString = version
client.Version = VersionFromString(version)
2015-11-15 18:43:34 -08:00
client.ClientID = uuid.FromStringOrNil(clientID)
if client.ClientID == uuid.Nil {
client.ClientID = uuid.NewV4()
}
SubscribeGlobal(client)
2015-11-09 14:44:33 -08:00
SubscribeDefaults(client)
2015-11-16 14:48:24 -08:00
if client.Version.After(&lastVersionWithoutReplyWithServerTime) {
2015-11-16 20:35:03 -08:00
jsTime := float64(time.Now().UnixNano()/1000) / 1000
2015-11-16 14:30:09 -08:00
return ClientMessage{
Arguments: []interface{}{
client.ClientID.String(),
2015-11-16 14:48:24 -08:00
jsTime,
2015-11-16 14:30:09 -08:00
},
2015-11-16 14:48:24 -08:00
}, nil
2015-11-16 14:30:09 -08:00
} else {
return ClientMessage{
Arguments: client.ClientID.String(),
}, nil
}
}
2015-11-16 15:23:27 -08:00
func C2SPing(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
return ClientMessage{
2015-11-16 20:35:03 -08:00
Arguments: float64(time.Now().UnixNano()/1000) / 1000,
2015-11-16 16:56:27 -08:00
}, nil
2015-11-16 15:23:27 -08:00
}
// C2SSetUser implements the `setuser` C2S Command.
//
// It stores the supplied name in ClientInfo.TwitchUsername and clears
// ClientInfo.UsernameValidated, both under client.Mutex. When
// Configuration.SendAuthToNewClients is set it also starts
// client.StartAuthorization in a new goroutine.
func C2SSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
	var username string
	if username, err = msg.ArgumentsAsString(); err != nil {
		return rmsg, err
	}

	client.Mutex.Lock()
	client.TwitchUsername = username
	client.UsernameValidated = false
	client.Mutex.Unlock()

	if !Configuration.SendAuthToNewClients {
		return ResponseSuccess, nil
	}

	// The keepalive count taken here is released by the authorization callback.
	client.MsgChannelKeepalive.Add(1)
	go client.StartAuthorization(func(*ClientInfo, bool) {
		client.MsgChannelKeepalive.Done()
	})
	return ResponseSuccess, nil
}
2015-11-16 12:50:00 -08:00
func C2SReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
2015-11-16 20:35:03 -08:00
// disconnectAt, err := msg.ArgumentsAsInt()
// if err != nil {
// return
// }
2015-10-26 14:55:20 -07:00
client.Mutex.Lock()
if client.MakePendingRequests != nil {
if !client.MakePendingRequests.Stop() {
// Timer already fired, GetSubscriptionBacklog() has started
rmsg.Command = SuccessCommand
return
}
}
client.PendingSubscriptionsBacklog = nil
client.MakePendingRequests = nil
client.Mutex.Unlock()
client.MsgChannelKeepalive.Add(1)
go func() {
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand}
SendBacklogForNewClient(client)
client.MsgChannelKeepalive.Done()
}()
return ClientMessage{Command: AsyncResponseCommand}, nil
2015-10-26 14:55:20 -07:00
}
2015-11-16 12:50:00 -08:00
func C2SSubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
channel, err := msg.ArgumentsAsString()
if err != nil {
return
}
2015-10-25 03:21:50 -07:00
client.Mutex.Lock()
AddToSliceS(&client.CurrentChannels, channel)
2015-11-16 12:50:00 -08:00
if usePendingSubscrptionsBacklog {
client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel)
}
2015-10-25 03:21:50 -07:00
client.Mutex.Unlock()
2015-11-08 22:34:06 -08:00
SubscribeChannel(client, channel)
return ResponseSuccess, nil
}
2015-11-16 12:50:00 -08:00
// C2SUnsubscribe implements the `unsub` C2S Command.
// It removes the channel from ClientInfo.CurrentChannels and calls UnsubscribeSingleChat.
func C2SUnsubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
channel, err := msg.ArgumentsAsString()
if err != nil {
return
}
2015-10-25 03:21:50 -07:00
client.Mutex.Lock()
RemoveFromSliceS(&client.CurrentChannels, channel)
2015-10-25 03:21:50 -07:00
client.Mutex.Unlock()
UnsubscribeSingleChat(client, channel)
return ResponseSuccess, nil
}
2015-11-16 12:50:00 -08:00
// C2SSurvey implements the `survey` C2S Command.
// Surveys are acknowledged with ResponseSuccess and otherwise discarded.
func C2SSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
	// The submitted payload is deliberately ignored.
	rmsg = ResponseSuccess
	return rmsg, nil
}
2015-11-16 12:50:00 -08:00
// followEvent is one follow / unfollow record collected by C2STrackFollow.
// The short JSON keys are the names used when the records are marshalled for
// the aggregate report.
type followEvent struct {
	// User is the TwitchUsername of the client that reported the event.
	User string `json:"u"`
	// Channel is the channel the event refers to.
	Channel string `json:"c"`
	// NowFollowing is the boolean argument reported with the event.
	NowFollowing bool `json:"f"`
	// Timestamp is when the server received the event.
	Timestamp time.Time `json:"t"`
}

var (
	// followEvents accumulates reported events until doSendAggregateData
	// takes them.
	followEvents []followEvent

	// followEventsLock is the lock for followEvents.
	followEventsLock sync.Mutex
)
// C2STrackFollow implements the `track_follow` C2S Command.
// It adds the record to `followEvents`, which is submitted to the backend on a timer.
func C2STrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
2015-10-25 03:21:50 -07:00
channel, following, err := msg.ArgumentsAsStringAndBool()
if err != nil {
return
}
now := time.Now()
2015-11-16 12:50:00 -08:00
followEventsLock.Lock()
followEvents = append(followEvents, followEvent{client.TwitchUsername, channel, following, now})
followEventsLock.Unlock()
return ResponseSuccess, nil
}
2015-11-15 18:43:34 -08:00
var (
	// aggregateEmoteUsage is a map from emoteID to a map from chatroom name
	// to usage count.
	aggregateEmoteUsage = make(map[int]map[string]int)

	// aggregateEmoteUsageLock is the lock for aggregateEmoteUsage.
	aggregateEmoteUsageLock sync.Mutex

	// ErrNegativeEmoteUsage is returned by C2SEmoticonUses when a submitted
	// usage count is zero or negative.
	ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative")
)
2015-10-25 00:58:05 -07:00
2015-11-16 12:50:00 -08:00
// C2SEmoticonUses implements the `emoticon_uses` C2S Command.
// msg.Arguments are in the JSON format of [1]map[emoteID]map[ChatroomName]float64.
func C2SEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
// if this panics, will be caught by callHandler
2015-10-25 00:58:05 -07:00
mapRoot := msg.Arguments.([]interface{})[0].(map[string]interface{})
2015-11-16 12:50:00 -08:00
// Validate: male suire
for strEmote, val1 := range mapRoot {
_, err = strconv.Atoi(strEmote)
if err != nil {
return
}
mapInner := val1.(map[string]interface{})
for _, val2 := range mapInner {
2015-11-16 12:50:00 -08:00
var count = int(val2.(float64))
if count <= 0 {
2015-11-15 18:43:34 -08:00
err = ErrNegativeEmoteUsage
return
}
}
}
2015-11-16 12:50:00 -08:00
aggregateEmoteUsageLock.Lock()
defer aggregateEmoteUsageLock.Unlock()
2015-10-25 00:58:05 -07:00
2015-11-16 21:57:18 -08:00
var total int
2015-10-25 00:58:05 -07:00
for strEmote, val1 := range mapRoot {
2015-11-15 18:43:34 -08:00
var emoteID int
emoteID, err = strconv.Atoi(strEmote)
2015-10-25 00:58:05 -07:00
if err != nil {
return
}
2015-11-16 12:50:00 -08:00
destMapInner, ok := aggregateEmoteUsage[emoteID]
2015-10-25 00:58:05 -07:00
if !ok {
destMapInner = make(map[string]int)
2015-11-16 12:50:00 -08:00
aggregateEmoteUsage[emoteID] = destMapInner
2015-10-25 00:58:05 -07:00
}
mapInner := val1.(map[string]interface{})
for roomName, val2 := range mapInner {
2015-11-16 12:50:00 -08:00
var count = int(val2.(float64))
if count > 200 {
count = 200
}
2015-10-25 00:58:05 -07:00
destMapInner[roomName] += count
2015-11-16 21:57:18 -08:00
total += count
2015-10-25 00:58:05 -07:00
}
}
2015-11-16 21:57:18 -08:00
Statistics.EmotesReportedTotal += uint64(total)
return ResponseSuccess, nil
}
2015-11-16 12:50:00 -08:00
func aggregateDataSender() {
for {
2015-10-28 23:27:04 -07:00
time.Sleep(5 * time.Minute)
2015-11-16 12:50:00 -08:00
doSendAggregateData()
}
}
2015-11-16 12:50:00 -08:00
func doSendAggregateData() {
followEventsLock.Lock()
follows := followEvents
followEvents = nil
followEventsLock.Unlock()
aggregateEmoteUsageLock.Lock()
emoteUsage := aggregateEmoteUsage
aggregateEmoteUsage = make(map[int]map[string]int)
aggregateEmoteUsageLock.Unlock()
reportForm := url.Values{}
2015-11-15 18:43:34 -08:00
followJSON, err := json.Marshal(follows)
if err != nil {
2015-11-16 13:07:02 -08:00
log.Println("error reporting aggregate data:", err)
} else {
2015-11-15 18:43:34 -08:00
reportForm.Set("follows", string(followJSON))
}
strEmoteUsage := make(map[string]map[string]int)
2015-11-15 18:43:34 -08:00
for emoteID, usageByChannel := range emoteUsage {
strEmoteID := strconv.Itoa(emoteID)
strEmoteUsage[strEmoteID] = usageByChannel
}
2015-11-15 18:43:34 -08:00
emoteJSON, err := json.Marshal(strEmoteUsage)
if err != nil {
2015-11-16 13:07:02 -08:00
log.Println("error reporting aggregate data:", err)
} else {
2015-11-15 18:43:34 -08:00
reportForm.Set("emotes", string(emoteJSON))
}
form, err := SealRequest(reportForm)
if err != nil {
2015-11-16 13:07:02 -08:00
log.Println("error reporting aggregate data:", err)
return
}
err = SendAggregatedData(form)
if err != nil {
2015-11-16 13:07:02 -08:00
log.Println("error reporting aggregate data:", err)
return
}
// done
}
2015-11-16 12:50:00 -08:00
type bunchedRequest struct {
Command Command
2015-11-01 13:17:35 -08:00
Param string
}
2015-11-01 13:17:35 -08:00
2015-11-16 12:50:00 -08:00
type cachedBunchedResponse struct {
2015-11-01 13:17:35 -08:00
Response string
Timestamp time.Time
}
2015-11-16 12:50:00 -08:00
type bunchSubscriber struct {
2015-11-01 13:17:35 -08:00
Client *ClientInfo
MessageID int
}
2015-11-16 12:50:00 -08:00
type bunchSubscriberList struct {
sync.Mutex
2015-11-16 12:50:00 -08:00
Members []bunchSubscriber
}
2015-12-02 19:08:19 -08:00
// cacheStatus is the outcome of looking up a bunchedRequest in bunchCache.
type cacheStatus byte

// The constants are declared with the cacheStatus type so that they cannot be
// mixed up with unrelated integers; their numeric values are unchanged.
const (
	// CacheStatusNotFound means the cache holds no entry for the request.
	CacheStatusNotFound cacheStatus = iota
	// CacheStatusFound means the cache holds an entry that is still fresh.
	CacheStatusFound
	// CacheStatusExpired means the cache holds an entry that is too old to use.
	CacheStatusExpired
)
2015-11-16 12:50:00 -08:00
var (
	// pendingBunchedRequests maps each request that is currently in flight
	// to the list of subscribers waiting for its response.
	pendingBunchedRequests = make(map[bunchedRequest]*bunchSubscriberList)
	// pendingBunchLock is the lock for pendingBunchedRequests.
	pendingBunchLock sync.Mutex

	// bunchCache holds the most recent successful response per request.
	bunchCache = make(map[bunchedRequest]cachedBunchedResponse)
	// bunchCacheLock is the lock for bunchCache.
	bunchCacheLock sync.RWMutex
	// bunchCacheCleanupSignal wakes bunchCacheJanitor; it is built on
	// bunchCacheLock.
	bunchCacheCleanupSignal = sync.NewCond(&bunchCacheLock)
	// bunchCacheLastCleanup is when bunchCacheJanitor last finished a sweep.
	bunchCacheLastCleanup time.Time
)
// bunchedRequestFromCM builds the bunchedRequest for msg from its Command and
// its origArguments.
func bunchedRequestFromCM(msg *ClientMessage) bunchedRequest {
	var br bunchedRequest
	br.Command = msg.Command
	br.Param = msg.origArguments
	return br
}
func bunchCacheJanitor() {
go func() {
for {
2015-11-15 18:43:34 -08:00
time.Sleep(30 * time.Minute)
2015-11-16 12:50:00 -08:00
bunchCacheCleanupSignal.Signal()
}
}()
2015-11-16 12:50:00 -08:00
bunchCacheLock.Lock()
for {
// Unlocks CachedBunchLock, waits for signal, re-locks
2015-11-16 12:50:00 -08:00
bunchCacheCleanupSignal.Wait()
2015-11-16 12:50:00 -08:00
if bunchCacheLastCleanup.After(time.Now().Add(-1 * time.Second)) {
// skip if it's been less than 1 second
continue
}
// CachedBunchLock is held here
2015-11-15 18:43:34 -08:00
keepIfAfter := time.Now().Add(-5 * time.Minute)
2015-11-16 12:50:00 -08:00
for req, resp := range bunchCache {
if !resp.Timestamp.After(keepIfAfter) {
2015-11-16 12:50:00 -08:00
delete(bunchCache, req)
}
}
2015-11-16 12:50:00 -08:00
bunchCacheLastCleanup = time.Now()
// Loop and Wait(), which re-locks
}
}
2015-11-01 13:17:35 -08:00
2015-12-02 19:08:19 -08:00
// emptyCachedBunchedResponse is the zero value returned alongside any status
// other than CacheStatusFound.
var emptyCachedBunchedResponse cachedBunchedResponse

// bunchGetCacheStatus looks br up in bunchCache under a read lock.
// It returns CacheStatusFound with the entry when the entry is less than five
// minutes old, CacheStatusExpired when an older entry exists, and
// CacheStatusNotFound when there is none. The client argument is not used.
func bunchGetCacheStatus(br bunchedRequest, client *ClientInfo) (cacheStatus, cachedBunchedResponse) {
	bunchCacheLock.RLock()
	defer bunchCacheLock.RUnlock()

	entry, present := bunchCache[br]
	switch {
	case !present:
		return CacheStatusNotFound, emptyCachedBunchedResponse
	case entry.Timestamp.After(time.Now().Add(-5 * time.Minute)):
		return CacheStatusFound, entry
	default:
		return CacheStatusExpired, emptyCachedBunchedResponse
	}
}
2015-12-02 19:09:50 -08:00
// normalizeBunchedRequest is the hook for bringing a bunchedRequest into a
// canonical form before it is used as a key. It currently returns br
// unchanged; special handling for `get_link` is still to be written.
func normalizeBunchedRequest(br bunchedRequest) bunchedRequest {
	switch br.Command {
	case "get_link":
		// TODO
	}
	return br
}
2015-11-16 12:50:00 -08:00
// C2SHandleBunchedCommand handles C2S Commands such as `get_link`.
// It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one.
// Additionally, results are cached.
func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
2015-12-02 19:08:19 -08:00
// FIXME(riking): Function is too complex
2015-11-16 12:50:00 -08:00
br := bunchedRequestFromCM(&msg)
2015-12-02 19:09:50 -08:00
br = normalizeBunchedRequest(br)
2015-12-02 19:08:19 -08:00
cacheStatus, cachedResponse := bunchGetCacheStatus(br, client)
if cacheStatus == CacheStatusFound {
2015-12-02 19:08:19 -08:00
var response ClientMessage
response.Command = SuccessCommand
response.MessageID = msg.MessageID
response.origArguments = cachedResponse.Response
response.parseOrigArguments()
return response, nil
} else if cacheStatus == CacheStatusExpired {
// Wake up the lazy janitor
2015-11-16 12:50:00 -08:00
bunchCacheCleanupSignal.Signal()
}
2015-11-16 12:50:00 -08:00
pendingBunchLock.Lock()
defer pendingBunchLock.Unlock()
list, ok := pendingBunchedRequests[br]
if ok {
list.Lock()
AddToSliceB(&list.Members, client, msg.MessageID)
list.Unlock()
return ClientMessage{Command: AsyncResponseCommand}, nil
}
2015-11-16 12:50:00 -08:00
pendingBunchedRequests[br] = &bunchSubscriberList{Members: []bunchSubscriber{{Client: client, MessageID: msg.MessageID}}}
2015-11-16 12:50:00 -08:00
go func(request bunchedRequest) {
respStr, err := SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{})
2015-11-02 22:59:38 -08:00
var msg ClientMessage
if err == nil {
msg.Command = SuccessCommand
msg.origArguments = respStr
msg.parseOrigArguments()
2015-11-02 22:59:38 -08:00
} else {
msg.Command = ErrorCommand
msg.Arguments = err.Error()
}
if err == nil {
2015-11-16 12:50:00 -08:00
bunchCacheLock.Lock()
bunchCache[request] = cachedBunchedResponse{Response: respStr, Timestamp: time.Now()}
bunchCacheLock.Unlock()
}
2015-11-16 12:50:00 -08:00
pendingBunchLock.Lock()
bsl := pendingBunchedRequests[request]
delete(pendingBunchedRequests, request)
pendingBunchLock.Unlock()
2015-11-02 22:59:38 -08:00
bsl.Lock()
for _, member := range bsl.Members {
msg.MessageID = member.MessageID
2015-11-03 16:44:42 -08:00
select {
case member.Client.MessageChannel <- msg:
case <-member.Client.MsgChannelIsDone:
}
2015-11-02 22:59:38 -08:00
}
bsl.Unlock()
}(br)
return ClientMessage{Command: AsyncResponseCommand}, nil
}
2015-11-16 12:50:00 -08:00
func C2SHandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
client.MsgChannelKeepalive.Add(1)
2015-11-08 16:44:16 -08:00
go doRemoteCommand(conn, msg, client)
return ClientMessage{Command: AsyncResponseCommand}, nil
}
2015-11-08 16:44:16 -08:00
const AuthorizationFailedErrorString = "Failed to verify your Twitch username."
func doRemoteCommand(conn *websocket.Conn, msg ClientMessage, client *ClientInfo) {
2015-11-08 22:01:32 -08:00
resp, err := SendRemoteCommandCached(string(msg.Command), msg.origArguments, client.AuthInfo)
2015-11-08 16:44:16 -08:00
2015-11-15 18:43:34 -08:00
if err == ErrAuthorizationNeeded {
2015-11-08 16:44:16 -08:00
client.StartAuthorization(func(_ *ClientInfo, success bool) {
if success {
doRemoteCommand(conn, msg, client)
} else {
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: AuthorizationFailedErrorString}
client.MsgChannelKeepalive.Done()
}
})
return // without keepalive.Done()
} else if err != nil {
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()}
} else {
msg := ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
client.MsgChannelKeepalive.Done()
}