1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-06-27 21:05:53 +00:00
FrankerFaceZ/socketserver/server/commands.go

445 lines
12 KiB
Go
Raw Normal View History

package server
import (
"encoding/json"
2015-11-16 12:50:00 -08:00
"fmt"
"log"
2015-10-28 18:12:20 -07:00
"net/url"
"sync"
2015-10-25 03:21:50 -07:00
"time"
2016-04-28 14:36:59 -07:00
"github.com/gorilla/websocket"
"github.com/satori/go.uuid"
"golang.org/x/sync/singleflight"
)
2015-11-15 18:43:34 -08:00
// Command is a string indicating which RPC is requested.
// The Commands sent from Client -> Server and Server -> Client are disjoint sets.
type Command string
2016-01-17 18:01:21 -08:00
// CommandHandler is a RPC handler associated with a Command.
2015-11-08 22:34:06 -08:00
type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error)
2015-11-15 18:43:34 -08:00
var commandHandlers = map[Command]CommandHandler{
HelloCommand: C2SHello,
"ping": C2SPing,
SetUserCommand: C2SSetUser,
ReadyCommand: C2SReady,
2015-11-16 12:50:00 -08:00
"sub": C2SSubscribe,
"unsub": C2SUnsubscribe,
"track_follow": C2STrackFollow,
"emoticon_uses": C2SEmoticonUses,
"survey": C2SSurvey,
}
2015-11-16 12:50:00 -08:00
var bunchedCommands = []Command{
"get_display_name",
"get_emote",
"get_emote_set",
"get_link",
"get_itad_plain",
"get_itad_prices",
"get_name_history",
"has_logs",
2015-11-08 22:34:06 -08:00
}
func setupInterning() {
PubSubChannelPool = NewStringPool()
TwitchChannelPool = NewStringPool()
2016-01-17 17:45:37 -08:00
CommandPool = NewStringPool()
2016-01-17 18:01:21 -08:00
CommandPool._Intern_Setup(string(HelloCommand))
2016-01-17 17:45:37 -08:00
CommandPool._Intern_Setup("ping")
2016-01-17 18:01:21 -08:00
CommandPool._Intern_Setup(string(SetUserCommand))
CommandPool._Intern_Setup(string(ReadyCommand))
2016-01-17 17:45:37 -08:00
CommandPool._Intern_Setup("sub")
CommandPool._Intern_Setup("unsub")
CommandPool._Intern_Setup("track_follow")
CommandPool._Intern_Setup("emoticon_uses")
CommandPool._Intern_Setup("twitch_emote")
CommandPool._Intern_Setup("get_emote")
CommandPool._Intern_Setup("get_emote_set")
CommandPool._Intern_Setup("has_logs")
2016-01-17 17:45:37 -08:00
CommandPool._Intern_Setup("get_link")
CommandPool._Intern_Setup("get_display_name")
CommandPool._Intern_Setup("update_follow_buttons")
}
2015-11-16 12:50:00 -08:00
// DispatchC2SCommand handles a C2S Command in the provided ClientMessage.
// It calls the correct CommandHandler function, catching panics.
// It sends either the returned Reply ClientMessage, setting the correct messageID, or sends an ErrorCommand
func DispatchC2SCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) {
2015-11-15 18:43:34 -08:00
handler, ok := commandHandlers[msg.Command]
if !ok {
2015-11-16 12:50:00 -08:00
handler = C2SHandleRemoteCommand
for _, v := range bunchedCommands {
if msg.Command == v {
handler = C2SHandleBunchedCommand
}
}
}
2015-12-16 11:58:48 -08:00
CommandCounter <- msg.Command
2015-11-16 13:07:02 -08:00
2015-11-16 12:50:00 -08:00
response, err := callHandler(handler, conn, client, msg)
if err == nil {
2015-10-26 14:55:20 -07:00
if response.Command == AsyncResponseCommand {
// Don't send anything
// The response will be delivered over client.MessageChannel / serverMessageChan
} else {
response.MessageID = msg.MessageID
SendMessage(conn, response)
2015-10-26 14:55:20 -07:00
}
} else {
SendMessage(conn, ClientMessage{
MessageID: msg.MessageID,
2015-11-16 12:50:00 -08:00
Command: ErrorCommand,
Arguments: err.Error(),
})
}
}
func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (_ ClientMessage, err error) {
2015-11-16 12:50:00 -08:00
defer func() {
if r := recover(); r != nil {
var ok bool
fmt.Print("[!] Error executing command", cmsg.Command, "--", r)
err, ok = r.(error)
if !ok {
err = fmt.Errorf("command handler: %v", r)
}
}
}()
return handler(conn, client, cmsg)
}
// C2SHello implements the `hello` C2S Command.
// It calls SubscribeGlobal() and SubscribeDefaults() with the client, and fills out ClientInfo.Version and ClientInfo.ClientID.
func C2SHello(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (_ ClientMessage, err error) {
2016-05-21 11:35:32 -07:00
ary, ok := msg.Arguments.([]interface{})
if !ok {
err = ErrExpectedTwoStrings
return
}
if len(ary) != 2 {
err = ErrExpectedTwoStrings
return
}
version, ok := ary[0].(string)
if !ok {
err = ErrExpectedTwoStrings
return
}
var clientID uuid.UUID
2016-05-21 11:35:32 -07:00
if clientIDStr, ok := ary[1].(string); ok {
clientID = uuid.FromStringOrNil(clientIDStr)
if clientID == uuid.Nil {
clientID = uuid.NewV4()
2016-05-21 11:35:32 -07:00
}
2017-09-15 12:58:14 -07:00
} else if _, ok := ary[1].(bool); ok {
2016-05-21 11:35:32 -07:00
// opt out
clientID = AnonymousClientID
2017-09-15 12:58:14 -07:00
} else if ary[1] == nil {
clientID = uuid.NewV4()
2016-05-21 11:38:48 -07:00
} else {
err = ErrExpectedTwoStrings
return
}
client.Mutex.Lock()
client.ClientID = clientID
2017-09-15 13:11:19 -07:00
client.VersionString = copyString(version)
client.Version = VersionFromString(version)
client.HelloOK = true
client.Mutex.Unlock()
2017-09-15 13:20:28 -07:00
uniqueUserChannel <- client.ClientID
SubscribeGlobal(client)
jsTime := float64(time.Now().UnixNano()/1000) / 1000
return ClientMessage{
Arguments: []interface{}{
client.ClientID.String(),
jsTime,
},
}, nil
}
func C2SPing(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) {
2015-11-16 15:23:27 -08:00
return ClientMessage{
2015-11-16 20:35:03 -08:00
Arguments: float64(time.Now().UnixNano()/1000) / 1000,
2015-11-16 16:56:27 -08:00
}, nil
2015-11-16 15:23:27 -08:00
}
func C2SSetUser(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
2015-11-16 15:23:27 -08:00
username, err := msg.ArgumentsAsString()
if err != nil {
2017-09-15 16:40:40 -07:00
return ClientMessage{}, err
2015-11-16 15:23:27 -08:00
}
username = copyString(username)
2015-11-16 15:23:27 -08:00
client.Mutex.Lock()
client.UsernameValidated = false
2016-01-17 18:01:21 -08:00
client.TwitchUsername = username
2015-11-16 15:23:27 -08:00
client.Mutex.Unlock()
if Configuration.SendAuthToNewClients {
client.MsgChannelKeepalive.Add(1)
go client.StartAuthorization(func(_ *ClientInfo, _ bool) {
client.MsgChannelKeepalive.Done()
})
}
return ResponseSuccess, nil
}
func C2SReady(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
2015-10-26 14:55:20 -07:00
client.Mutex.Lock()
2016-04-28 14:39:20 -07:00
client.ReadyComplete = true
2015-10-26 14:55:20 -07:00
client.Mutex.Unlock()
client.MsgChannelKeepalive.Add(1)
go func() {
client.Send(msg.Reply(SuccessCommand, nil))
SendBacklogForNewClient(client)
client.MsgChannelKeepalive.Done()
}()
return ClientMessage{Command: AsyncResponseCommand}, nil
2015-10-26 14:55:20 -07:00
}
func C2SSubscribe(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
channel, err := msg.ArgumentsAsString()
if err != nil {
2017-09-15 16:40:40 -07:00
return ClientMessage{}, err
}
channel = PubSubChannelPool.Intern(channel)
2015-10-25 03:21:50 -07:00
client.Mutex.Lock()
AddToSliceS(&client.CurrentChannels, channel)
2015-10-25 03:21:50 -07:00
client.Mutex.Unlock()
2015-11-08 22:34:06 -08:00
SubscribeChannel(client, channel)
2016-04-28 14:39:20 -07:00
if client.ReadyComplete {
client.MsgChannelKeepalive.Add(1)
go func() {
SendBacklogForChannel(client, channel)
client.MsgChannelKeepalive.Done()
}()
}
return ResponseSuccess, nil
}
2015-11-16 12:50:00 -08:00
// C2SUnsubscribe implements the `unsub` C2S Command.
// It removes the channel from ClientInfo.CurrentChannels and calls UnsubscribeSingleChat.
func C2SUnsubscribe(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
channel, err := msg.ArgumentsAsString()
if err != nil {
2017-09-15 16:40:40 -07:00
return ClientMessage{}, err
}
channel = PubSubChannelPool.Intern(channel)
2015-10-25 03:21:50 -07:00
client.Mutex.Lock()
RemoveFromSliceS(&client.CurrentChannels, channel)
2015-10-25 03:21:50 -07:00
client.Mutex.Unlock()
UnsubscribeSingleChat(client, channel)
return ResponseSuccess, nil
}
2015-11-16 12:50:00 -08:00
// C2SSurvey implements the `survey` C2S Command.
// Surveys are not collected, so the arguments are ignored and the reply is
// always ResponseSuccess.
func C2SSurvey(_ *websocket.Conn, _ *ClientInfo, _ ClientMessage) (ClientMessage, error) {
	return ResponseSuccess, nil
}
2015-11-16 12:50:00 -08:00
// followEvent is one follow/unfollow record collected by C2STrackFollow and
// serialized to JSON by aggregateDataSender_do. Field order determines the
// order of keys in the encoded object.
type followEvent struct {
	// User is the Twitch username of the reporting client.
	User string `json:"u"`
	// Channel is the channel that was followed or unfollowed.
	Channel string `json:"c"`
	// NowFollowing is the boolean argument supplied with the command.
	NowFollowing bool `json:"f"`
	// Timestamp is the server time at which the event was recorded.
	Timestamp time.Time `json:"t"`
}
2015-11-16 12:50:00 -08:00
var followEvents []followEvent
2015-10-25 03:21:50 -07:00
2015-11-16 12:50:00 -08:00
// followEventsLock is the lock for followEvents.
var followEventsLock sync.Mutex
// C2STrackFollow implements the `track_follow` C2S Command.
// It adds the record to `followEvents`, which is submitted to the backend on a timer.
func C2STrackFollow(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (_ ClientMessage, err error) {
2015-10-25 03:21:50 -07:00
channel, following, err := msg.ArgumentsAsStringAndBool()
if err != nil {
return
}
now := time.Now()
channel = TwitchChannelPool.Intern(channel)
2015-11-16 12:50:00 -08:00
followEventsLock.Lock()
2016-01-17 18:01:21 -08:00
followEvents = append(followEvents, followEvent{User: client.TwitchUsername, Channel: channel, NowFollowing: following, Timestamp: now})
2015-11-16 12:50:00 -08:00
followEventsLock.Unlock()
return ResponseSuccess, nil
}
2015-11-15 18:43:34 -08:00
// AggregateEmoteUsage is a map from emoteID to a map from chatroom name to usage count.
//var aggregateEmoteUsage = make(map[int]map[string]int)
2015-11-15 18:43:34 -08:00
// AggregateEmoteUsageLock is the lock for AggregateEmoteUsage.
//var aggregateEmoteUsageLock sync.Mutex
2015-11-15 18:43:34 -08:00
// ErrNegativeEmoteUsage is emitted when the submitted emote usage is negative.
//var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative")
2015-10-25 00:58:05 -07:00
2015-11-16 12:50:00 -08:00
// C2SEmoticonUses implements the `emoticon_uses` C2S Command.
// Emote usage data is not collected: the arguments are ignored and the reply
// is always ResponseSuccess.
func C2SEmoticonUses(_ *websocket.Conn, _ *ClientInfo, _ ClientMessage) (ClientMessage, error) {
	return ResponseSuccess, nil
}
// is_init_func
2015-11-16 12:50:00 -08:00
func aggregateDataSender() {
for {
2015-10-28 23:27:04 -07:00
time.Sleep(5 * time.Minute)
aggregateDataSender_do()
}
}
func aggregateDataSender_do() {
2015-11-16 12:50:00 -08:00
followEventsLock.Lock()
follows := followEvents
followEvents = nil
followEventsLock.Unlock()
//aggregateEmoteUsageLock.Lock()
//emoteUsage := aggregateEmoteUsage
//aggregateEmoteUsage = make(map[int]map[string]int)
//aggregateEmoteUsageLock.Unlock()
reportForm := url.Values{}
2015-11-15 18:43:34 -08:00
followJSON, err := json.Marshal(follows)
if err != nil {
2015-11-16 13:07:02 -08:00
log.Println("error reporting aggregate data:", err)
} else {
2015-11-15 18:43:34 -08:00
reportForm.Set("follows", string(followJSON))
}
strEmoteUsage := make(map[string]map[string]int)
//for emoteID, usageByChannel := range emoteUsage {
// strEmoteID := strconv.Itoa(emoteID)
// strEmoteUsage[strEmoteID] = usageByChannel
//}
2015-11-15 18:43:34 -08:00
emoteJSON, err := json.Marshal(strEmoteUsage)
if err != nil {
2015-11-16 13:07:02 -08:00
log.Println("error reporting aggregate data:", err)
} else {
2015-11-15 18:43:34 -08:00
reportForm.Set("emotes", string(emoteJSON))
}
2016-06-02 08:47:07 -07:00
err = Backend.SendAggregatedData(reportForm)
if err != nil {
2015-11-16 13:07:02 -08:00
log.Println("error reporting aggregate data:", err)
return
}
// done
}
2015-11-16 12:50:00 -08:00
type bunchedRequest struct {
Command Command
2015-11-01 13:17:35 -08:00
Param string
}
2015-11-01 13:17:35 -08:00
2015-11-16 12:50:00 -08:00
type cachedBunchedResponse struct {
2015-11-01 13:17:35 -08:00
Response string
Timestamp time.Time
}
2015-11-16 12:50:00 -08:00
type bunchSubscriber struct {
2015-11-01 13:17:35 -08:00
Client *ClientInfo
MessageID int
}
2015-11-16 12:50:00 -08:00
type bunchSubscriberList struct {
sync.Mutex
2015-11-16 12:50:00 -08:00
Members []bunchSubscriber
}
// bunchGroup is the singleflight group used by C2SHandleBunchedCommand to
// share one backend request among callers that ask for the same key.
var bunchGroup singleflight.Group
2015-12-02 19:09:50 -08:00
2015-11-16 12:50:00 -08:00
// C2SHandleBunchedCommand handles C2S Commands such as `get_link`.
// It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one.
func C2SHandleBunchedCommand(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
key := fmt.Sprintf("%s:%s", msg.Command, msg.origArguments)
2015-12-02 19:08:19 -08:00
resultCh := bunchGroup.DoChan(key, func() (interface{}, error) {
2017-09-15 12:28:01 -07:00
return Backend.SendRemoteCommandCached(string(msg.Command), msg.origArguments, AuthInfo{})
})
client.MsgChannelKeepalive.Add(1)
go func() {
result := <-resultCh
if efb, ok := result.Err.(ErrForwardedFromBackend); ok {
client.Send(msg.Reply(ErrorCommand, efb.JSONError))
} else if result.Err != nil {
client.Send(msg.Reply(ErrorCommand, result.Err.Error()))
} else {
client.Send(msg.ReplyJSON(SuccessCommand, result.Val.(string)))
2015-11-02 22:59:38 -08:00
}
client.MsgChannelKeepalive.Done()
}()
2015-11-02 22:59:38 -08:00
return ClientMessage{Command: AsyncResponseCommand}, nil
}
func C2SHandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
client.MsgChannelKeepalive.Add(1)
2015-11-08 16:44:16 -08:00
go doRemoteCommand(conn, msg, client)
return ClientMessage{Command: AsyncResponseCommand}, nil
}
2015-11-08 16:44:16 -08:00
// Error texts that doRemoteCommand sends to the client.
const (
	// AuthorizationFailedErrorString is sent when the authorization callback
	// reports failure.
	AuthorizationFailedErrorString = "Failed to verify your Twitch username."

	// AuthorizationNeededError is sent when a command needs authorization but
	// the client has no Twitch username set.
	AuthorizationNeededError = "You must be signed in to use that command."
)
2015-11-08 16:44:16 -08:00
func doRemoteCommand(conn *websocket.Conn, msg ClientMessage, client *ClientInfo) {
2016-06-02 08:36:02 -07:00
resp, err := Backend.SendRemoteCommandCached(string(msg.Command), copyString(msg.origArguments), client.AuthInfo)
2015-11-08 16:44:16 -08:00
2015-11-15 18:43:34 -08:00
if err == ErrAuthorizationNeeded {
if client.TwitchUsername == "" {
// Not logged in
client.Send(msg.Reply(ErrorCommand, AuthorizationNeededError))
client.MsgChannelKeepalive.Done()
return
}
2015-11-08 16:44:16 -08:00
client.StartAuthorization(func(_ *ClientInfo, success bool) {
if success {
doRemoteCommand(conn, msg, client)
} else {
client.Send(msg.Reply(ErrorCommand, AuthorizationFailedErrorString))
2015-11-08 16:44:16 -08:00
client.MsgChannelKeepalive.Done()
}
})
return // without keepalive.Done()
} else if bfe, ok := err.(ErrForwardedFromBackend); ok {
client.Send(msg.Reply(ErrorCommand, bfe.JSONError))
2015-11-08 16:44:16 -08:00
} else if err != nil {
client.Send(msg.Reply(ErrorCommand, err.Error()))
2015-11-08 16:44:16 -08:00
} else {
client.Send(msg.ReplyJSON(SuccessCommand, resp))
2015-11-08 16:44:16 -08:00
}
client.MsgChannelKeepalive.Done()
}