mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-08-03 00:18:31 +00:00
Fix the build (move out of internal)
This commit is contained in:
parent
c38b9b0018
commit
58dd0bd9ee
15 changed files with 4 additions and 5 deletions
530
socketserver/server/commands.go
Normal file
530
socketserver/server/commands.go
Normal file
|
@ -0,0 +1,530 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/satori/go.uuid"
|
||||
"log"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Command is a string indicating which RPC is requested.
|
||||
// The Commands sent from Client -> Server and Server -> Client are disjoint sets.
|
||||
type Command string
|
||||
|
||||
// CommandHandler is a RPC handler assosciated with a Command.
|
||||
type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error)
|
||||
|
||||
var commandHandlers = map[Command]CommandHandler{
|
||||
HelloCommand: C2SHello,
|
||||
"setuser": C2SSetUser,
|
||||
"ready": C2SReady,
|
||||
|
||||
"sub": C2SSubscribe,
|
||||
"unsub": C2SUnsubscribe,
|
||||
|
||||
"track_follow": C2STrackFollow,
|
||||
"emoticon_uses": C2SEmoticonUses,
|
||||
"survey": C2SSurvey,
|
||||
|
||||
"twitch_emote": C2SHandleRemoteCommand,
|
||||
"get_link": C2SHandleBunchedCommand,
|
||||
"get_display_name": C2SHandleBunchedCommand,
|
||||
"update_follow_buttons": C2SHandleRemoteCommand,
|
||||
"chat_history": C2SHandleRemoteCommand,
|
||||
}
|
||||
|
||||
// DispatchC2SCommand handles a C2S Command in the provided ClientMessage.
|
||||
// It calls the correct CommandHandler function, catching panics.
|
||||
// It sends either the returned Reply ClientMessage, setting the correct messageID, or sends an ErrorCommand
|
||||
func DispatchC2SCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) {
|
||||
handler, ok := commandHandlers[msg.Command]
|
||||
if !ok {
|
||||
handler = C2SHandleRemoteCommand
|
||||
}
|
||||
|
||||
Statistics.CommandsIssuedTotal++
|
||||
Statistics.CommandsIssuedMap[msg.Command]++
|
||||
|
||||
response, err := callHandler(handler, conn, client, msg)
|
||||
|
||||
if err == nil {
|
||||
if response.Command == AsyncResponseCommand {
|
||||
// Don't send anything
|
||||
// The response will be delivered over client.MessageChannel / serverMessageChan
|
||||
} else {
|
||||
response.MessageID = msg.MessageID
|
||||
SendMessage(conn, response)
|
||||
}
|
||||
} else {
|
||||
SendMessage(conn, ClientMessage{
|
||||
MessageID: msg.MessageID,
|
||||
Command: ErrorCommand,
|
||||
Arguments: err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
var ok bool
|
||||
fmt.Print("[!] Error executing command", cmsg.Command, "--", r)
|
||||
err, ok = r.(error)
|
||||
if !ok {
|
||||
err = fmt.Errorf("command handler: %v", r)
|
||||
}
|
||||
}
|
||||
}()
|
||||
return handler(conn, client, cmsg)
|
||||
}
|
||||
|
||||
// C2SHello implements the `hello` C2S Command.
|
||||
// It calls SubscribeGlobal() and SubscribeDefaults() with the client, and fills out ClientInfo.Version and ClientInfo.ClientID.
|
||||
func C2SHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
version, clientID, err := msg.ArgumentsAsTwoStrings()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
client.Version = version
|
||||
client.ClientID = uuid.FromStringOrNil(clientID)
|
||||
if client.ClientID == uuid.Nil {
|
||||
client.ClientID = uuid.NewV4()
|
||||
}
|
||||
|
||||
SubscribeGlobal(client)
|
||||
SubscribeDefaults(client)
|
||||
|
||||
return ClientMessage{
|
||||
Arguments: client.ClientID.String(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func C2SReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
disconnectAt, err := msg.ArgumentsAsInt()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
client.Mutex.Lock()
|
||||
if client.MakePendingRequests != nil {
|
||||
if !client.MakePendingRequests.Stop() {
|
||||
// Timer already fired, GetSubscriptionBacklog() has started
|
||||
rmsg.Command = SuccessCommand
|
||||
return
|
||||
}
|
||||
}
|
||||
client.PendingSubscriptionsBacklog = nil
|
||||
client.MakePendingRequests = nil
|
||||
client.Mutex.Unlock()
|
||||
|
||||
client.MsgChannelKeepalive.Add(1)
|
||||
go func() {
|
||||
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand}
|
||||
SendBacklogForNewClient(client)
|
||||
if disconnectAt != 0 {
|
||||
SendTimedBacklogMessages(client, time.Unix(disconnectAt, 0))
|
||||
}
|
||||
client.MsgChannelKeepalive.Done()
|
||||
}()
|
||||
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||
}
|
||||
|
||||
func C2SSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
username, err := msg.ArgumentsAsString()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
client.Mutex.Lock()
|
||||
client.TwitchUsername = username
|
||||
client.UsernameValidated = false
|
||||
client.Mutex.Unlock()
|
||||
|
||||
if Configuration.SendAuthToNewClients {
|
||||
client.MsgChannelKeepalive.Add(1)
|
||||
go client.StartAuthorization(func(_ *ClientInfo, _ bool) {
|
||||
client.MsgChannelKeepalive.Done()
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
return ResponseSuccess, nil
|
||||
}
|
||||
|
||||
func C2SSubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
channel, err := msg.ArgumentsAsString()
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
client.Mutex.Lock()
|
||||
AddToSliceS(&client.CurrentChannels, channel)
|
||||
if usePendingSubscrptionsBacklog {
|
||||
client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel)
|
||||
}
|
||||
client.Mutex.Unlock()
|
||||
|
||||
SubscribeChannel(client, channel)
|
||||
|
||||
return ResponseSuccess, nil
|
||||
}
|
||||
|
||||
// C2SUnsubscribe implements the `unsub` C2S Command.
|
||||
// It removes the channel from ClientInfo.CurrentChannels and calls UnsubscribeSingleChat.
|
||||
func C2SUnsubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
channel, err := msg.ArgumentsAsString()
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
client.Mutex.Lock()
|
||||
RemoveFromSliceS(&client.CurrentChannels, channel)
|
||||
client.Mutex.Unlock()
|
||||
|
||||
UnsubscribeSingleChat(client, channel)
|
||||
|
||||
return ResponseSuccess, nil
|
||||
}
|
||||
|
||||
// C2SSurvey implements the survey C2S Command.
|
||||
// Surveys are discarded.s
|
||||
func C2SSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
// Discard
|
||||
return ResponseSuccess, nil
|
||||
}
|
||||
|
||||
type followEvent struct {
|
||||
User string `json:"u"`
|
||||
Channel string `json:"c"`
|
||||
NowFollowing bool `json:"f"`
|
||||
Timestamp time.Time `json:"t"`
|
||||
}
|
||||
|
||||
var followEvents []followEvent
|
||||
|
||||
// followEventsLock is the lock for followEvents.
|
||||
var followEventsLock sync.Mutex
|
||||
|
||||
// C2STrackFollow implements the `track_follow` C2S Command.
|
||||
// It adds the record to `followEvents`, which is submitted to the backend on a timer.
|
||||
func C2STrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
channel, following, err := msg.ArgumentsAsStringAndBool()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
|
||||
followEventsLock.Lock()
|
||||
followEvents = append(followEvents, followEvent{client.TwitchUsername, channel, following, now})
|
||||
followEventsLock.Unlock()
|
||||
|
||||
return ResponseSuccess, nil
|
||||
}
|
||||
|
||||
// AggregateEmoteUsage is a map from emoteID to a map from chatroom name to usage count.
|
||||
var aggregateEmoteUsage = make(map[int]map[string]int)
|
||||
|
||||
// AggregateEmoteUsageLock is the lock for AggregateEmoteUsage.
|
||||
var aggregateEmoteUsageLock sync.Mutex
|
||||
|
||||
// ErrNegativeEmoteUsage is emitted when the submitted emote usage is negative.
|
||||
var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative")
|
||||
|
||||
// C2SEmoticonUses implements the `emoticon_uses` C2S Command.
|
||||
// msg.Arguments are in the JSON format of [1]map[emoteID]map[ChatroomName]float64.
|
||||
func C2SEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
// if this panics, will be caught by callHandler
|
||||
mapRoot := msg.Arguments.([]interface{})[0].(map[string]interface{})
|
||||
|
||||
// Validate: male suire
|
||||
for strEmote, val1 := range mapRoot {
|
||||
_, err = strconv.Atoi(strEmote)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
mapInner := val1.(map[string]interface{})
|
||||
for _, val2 := range mapInner {
|
||||
var count = int(val2.(float64))
|
||||
if count <= 0 {
|
||||
err = ErrNegativeEmoteUsage
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
aggregateEmoteUsageLock.Lock()
|
||||
defer aggregateEmoteUsageLock.Unlock()
|
||||
|
||||
for strEmote, val1 := range mapRoot {
|
||||
var emoteID int
|
||||
emoteID, err = strconv.Atoi(strEmote)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
destMapInner, ok := aggregateEmoteUsage[emoteID]
|
||||
if !ok {
|
||||
destMapInner = make(map[string]int)
|
||||
aggregateEmoteUsage[emoteID] = destMapInner
|
||||
}
|
||||
|
||||
mapInner := val1.(map[string]interface{})
|
||||
for roomName, val2 := range mapInner {
|
||||
var count = int(val2.(float64))
|
||||
if count > 200 {
|
||||
count = 200
|
||||
}
|
||||
destMapInner[roomName] += count
|
||||
}
|
||||
}
|
||||
|
||||
return ResponseSuccess, nil
|
||||
}
|
||||
|
||||
func aggregateDataSender() {
|
||||
for {
|
||||
time.Sleep(5 * time.Minute)
|
||||
doSendAggregateData()
|
||||
}
|
||||
}
|
||||
|
||||
func doSendAggregateData() {
|
||||
followEventsLock.Lock()
|
||||
follows := followEvents
|
||||
followEvents = nil
|
||||
followEventsLock.Unlock()
|
||||
aggregateEmoteUsageLock.Lock()
|
||||
emoteUsage := aggregateEmoteUsage
|
||||
aggregateEmoteUsage = make(map[int]map[string]int)
|
||||
aggregateEmoteUsageLock.Unlock()
|
||||
|
||||
reportForm := url.Values{}
|
||||
|
||||
followJSON, err := json.Marshal(follows)
|
||||
if err != nil {
|
||||
log.Println("error reporting aggregate data:", err)
|
||||
} else {
|
||||
reportForm.Set("follows", string(followJSON))
|
||||
}
|
||||
|
||||
strEmoteUsage := make(map[string]map[string]int)
|
||||
for emoteID, usageByChannel := range emoteUsage {
|
||||
strEmoteID := strconv.Itoa(emoteID)
|
||||
strEmoteUsage[strEmoteID] = usageByChannel
|
||||
}
|
||||
emoteJSON, err := json.Marshal(strEmoteUsage)
|
||||
if err != nil {
|
||||
log.Println("error reporting aggregate data:", err)
|
||||
} else {
|
||||
reportForm.Set("emotes", string(emoteJSON))
|
||||
}
|
||||
|
||||
form, err := SealRequest(reportForm)
|
||||
if err != nil {
|
||||
log.Println("error reporting aggregate data:", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = SendAggregatedData(form)
|
||||
if err != nil {
|
||||
log.Println("error reporting aggregate data:", err)
|
||||
return
|
||||
}
|
||||
|
||||
// done
|
||||
}
|
||||
|
||||
type bunchedRequest struct {
|
||||
Command Command
|
||||
Param string
|
||||
}
|
||||
|
||||
type cachedBunchedResponse struct {
|
||||
Response string
|
||||
Timestamp time.Time
|
||||
}
|
||||
type bunchSubscriber struct {
|
||||
Client *ClientInfo
|
||||
MessageID int
|
||||
}
|
||||
|
||||
type bunchSubscriberList struct {
|
||||
sync.Mutex
|
||||
Members []bunchSubscriber
|
||||
}
|
||||
|
||||
type CacheStatus byte
|
||||
|
||||
const (
|
||||
CacheStatusNotFound = iota
|
||||
CacheStatusFound
|
||||
CacheStatusExpired
|
||||
)
|
||||
|
||||
var pendingBunchedRequests = make(map[bunchedRequest]*bunchSubscriberList)
|
||||
var pendingBunchLock sync.Mutex
|
||||
var bunchCache = make(map[bunchedRequest]cachedBunchedResponse)
|
||||
var bunchCacheLock sync.RWMutex
|
||||
var bunchCacheCleanupSignal = sync.NewCond(&bunchCacheLock)
|
||||
var bunchCacheLastCleanup time.Time
|
||||
|
||||
func bunchedRequestFromCM(msg *ClientMessage) bunchedRequest {
|
||||
return bunchedRequest{Command: msg.Command, Param: msg.origArguments}
|
||||
}
|
||||
|
||||
func bunchCacheJanitor() {
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(30 * time.Minute)
|
||||
bunchCacheCleanupSignal.Signal()
|
||||
}
|
||||
}()
|
||||
|
||||
bunchCacheLock.Lock()
|
||||
for {
|
||||
// Unlocks CachedBunchLock, waits for signal, re-locks
|
||||
bunchCacheCleanupSignal.Wait()
|
||||
|
||||
if bunchCacheLastCleanup.After(time.Now().Add(-1 * time.Second)) {
|
||||
// skip if it's been less than 1 second
|
||||
continue
|
||||
}
|
||||
|
||||
// CachedBunchLock is held here
|
||||
keepIfAfter := time.Now().Add(-5 * time.Minute)
|
||||
for req, resp := range bunchCache {
|
||||
if !resp.Timestamp.After(keepIfAfter) {
|
||||
delete(bunchCache, req)
|
||||
}
|
||||
}
|
||||
bunchCacheLastCleanup = time.Now()
|
||||
// Loop and Wait(), which re-locks
|
||||
}
|
||||
}
|
||||
|
||||
// C2SHandleBunchedCommand handles C2S Commands such as `get_link`.
|
||||
// It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one.
|
||||
// Additionally, results are cached.
|
||||
func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
br := bunchedRequestFromCM(&msg)
|
||||
|
||||
cacheStatus := func() byte {
|
||||
bunchCacheLock.RLock()
|
||||
defer bunchCacheLock.RUnlock()
|
||||
bresp, ok := bunchCache[br]
|
||||
if ok && bresp.Timestamp.After(time.Now().Add(-5*time.Minute)) {
|
||||
client.MsgChannelKeepalive.Add(1)
|
||||
go func() {
|
||||
var rmsg ClientMessage
|
||||
rmsg.Command = SuccessCommand
|
||||
rmsg.MessageID = msg.MessageID
|
||||
rmsg.origArguments = bresp.Response
|
||||
rmsg.parseOrigArguments()
|
||||
client.MessageChannel <- rmsg
|
||||
client.MsgChannelKeepalive.Done()
|
||||
}()
|
||||
return CacheStatusFound
|
||||
} else if ok {
|
||||
return CacheStatusExpired
|
||||
}
|
||||
return CacheStatusNotFound
|
||||
}()
|
||||
|
||||
if cacheStatus == CacheStatusFound {
|
||||
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||
} else if cacheStatus == CacheStatusExpired {
|
||||
// Wake up the lazy janitor
|
||||
bunchCacheCleanupSignal.Signal()
|
||||
}
|
||||
|
||||
pendingBunchLock.Lock()
|
||||
defer pendingBunchLock.Unlock()
|
||||
list, ok := pendingBunchedRequests[br]
|
||||
if ok {
|
||||
list.Lock()
|
||||
AddToSliceB(&list.Members, client, msg.MessageID)
|
||||
list.Unlock()
|
||||
|
||||
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||
}
|
||||
|
||||
pendingBunchedRequests[br] = &bunchSubscriberList{Members: []bunchSubscriber{{Client: client, MessageID: msg.MessageID}}}
|
||||
|
||||
go func(request bunchedRequest) {
|
||||
respStr, err := SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{})
|
||||
|
||||
var msg ClientMessage
|
||||
if err == nil {
|
||||
msg.Command = SuccessCommand
|
||||
msg.origArguments = respStr
|
||||
msg.parseOrigArguments()
|
||||
} else {
|
||||
msg.Command = ErrorCommand
|
||||
msg.Arguments = err.Error()
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
bunchCacheLock.Lock()
|
||||
bunchCache[request] = cachedBunchedResponse{Response: respStr, Timestamp: time.Now()}
|
||||
bunchCacheLock.Unlock()
|
||||
}
|
||||
|
||||
pendingBunchLock.Lock()
|
||||
bsl := pendingBunchedRequests[request]
|
||||
delete(pendingBunchedRequests, request)
|
||||
pendingBunchLock.Unlock()
|
||||
|
||||
bsl.Lock()
|
||||
for _, member := range bsl.Members {
|
||||
msg.MessageID = member.MessageID
|
||||
select {
|
||||
case member.Client.MessageChannel <- msg:
|
||||
case <-member.Client.MsgChannelIsDone:
|
||||
}
|
||||
}
|
||||
bsl.Unlock()
|
||||
}(br)
|
||||
|
||||
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||
}
|
||||
|
||||
func C2SHandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
client.MsgChannelKeepalive.Add(1)
|
||||
go doRemoteCommand(conn, msg, client)
|
||||
|
||||
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||
}
|
||||
|
||||
const AuthorizationFailedErrorString = "Failed to verify your Twitch username."
|
||||
|
||||
func doRemoteCommand(conn *websocket.Conn, msg ClientMessage, client *ClientInfo) {
|
||||
resp, err := SendRemoteCommandCached(string(msg.Command), msg.origArguments, client.AuthInfo)
|
||||
|
||||
if err == ErrAuthorizationNeeded {
|
||||
client.StartAuthorization(func(_ *ClientInfo, success bool) {
|
||||
if success {
|
||||
doRemoteCommand(conn, msg, client)
|
||||
} else {
|
||||
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: AuthorizationFailedErrorString}
|
||||
client.MsgChannelKeepalive.Done()
|
||||
}
|
||||
})
|
||||
return // without keepalive.Done()
|
||||
} else if err != nil {
|
||||
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()}
|
||||
} else {
|
||||
msg := ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp}
|
||||
msg.parseOrigArguments()
|
||||
client.MessageChannel <- msg
|
||||
}
|
||||
client.MsgChannelKeepalive.Done()
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue