2015-10-25 00:44:25 -07:00
package server
import (
2015-10-28 15:19:22 -07:00
"encoding/json"
2015-11-01 18:23:22 -08:00
"errors"
2015-11-16 12:50:00 -08:00
"fmt"
2015-10-28 15:19:22 -07:00
"github.com/gorilla/websocket"
2015-10-25 00:44:25 -07:00
"github.com/satori/go.uuid"
"log"
2015-10-28 18:12:20 -07:00
"net/url"
2015-10-25 00:58:05 -07:00
"strconv"
2015-10-26 10:06:45 -07:00
"sync"
2015-10-25 03:21:50 -07:00
"time"
2015-10-25 00:44:25 -07:00
)
2015-11-15 18:43:34 -08:00
// Command is a string indicating which RPC is requested.
// The Commands sent from Client -> Server and Server -> Client are disjoint sets.
2015-11-08 22:34:06 -08:00
type Command string
2015-11-15 18:43:34 -08:00
// CommandHandler is a RPC handler assosciated with a Command.
2015-11-08 22:34:06 -08:00
type CommandHandler func ( * websocket . Conn , * ClientInfo , ClientMessage ) ( ClientMessage , error )
2015-11-15 18:43:34 -08:00
var commandHandlers = map [ Command ] CommandHandler {
2015-12-16 13:59:51 -08:00
HelloCommand : C2SHello ,
"ping" : C2SPing ,
SetUserCommand : C2SSetUser ,
ReadyCommand : C2SReady ,
2015-11-16 12:50:00 -08:00
"sub" : C2SSubscribe ,
"unsub" : C2SUnsubscribe ,
"track_follow" : C2STrackFollow ,
"emoticon_uses" : C2SEmoticonUses ,
"survey" : C2SSurvey ,
2015-11-16 22:07:43 -08:00
"twitch_emote" : C2SHandleBunchedCommand ,
2015-11-16 12:50:00 -08:00
"get_link" : C2SHandleBunchedCommand ,
"get_display_name" : C2SHandleBunchedCommand ,
"update_follow_buttons" : C2SHandleRemoteCommand ,
"chat_history" : C2SHandleRemoteCommand ,
2015-11-16 22:10:55 -08:00
"user_history" : C2SHandleRemoteCommand ,
2015-11-08 22:34:06 -08:00
}
2015-10-25 00:44:25 -07:00
2015-11-16 12:50:00 -08:00
// DispatchC2SCommand handles a C2S Command in the provided ClientMessage.
// It calls the correct CommandHandler function, catching panics.
// It sends either the returned Reply ClientMessage, setting the correct messageID, or sends an ErrorCommand
func DispatchC2SCommand ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) {
2015-11-15 18:43:34 -08:00
handler , ok := commandHandlers [ msg . Command ]
2015-10-25 00:44:25 -07:00
if ! ok {
2015-11-16 12:50:00 -08:00
handler = C2SHandleRemoteCommand
2015-10-25 00:44:25 -07:00
}
2015-12-16 11:58:48 -08:00
CommandCounter <- msg . Command
2015-11-16 13:07:02 -08:00
2015-11-16 12:50:00 -08:00
response , err := callHandler ( handler , conn , client , msg )
2015-10-25 00:44:25 -07:00
if err == nil {
2015-10-26 14:55:20 -07:00
if response . Command == AsyncResponseCommand {
// Don't send anything
// The response will be delivered over client.MessageChannel / serverMessageChan
} else {
response . MessageID = msg . MessageID
2015-10-28 15:19:22 -07:00
SendMessage ( conn , response )
2015-10-26 14:55:20 -07:00
}
2015-10-25 00:44:25 -07:00
} else {
2015-10-28 15:19:22 -07:00
SendMessage ( conn , ClientMessage {
2015-10-25 00:44:25 -07:00
MessageID : msg . MessageID ,
2015-11-16 12:50:00 -08:00
Command : ErrorCommand ,
2015-10-25 00:44:25 -07:00
Arguments : err . Error ( ) ,
} )
}
}
2015-11-16 12:50:00 -08:00
func callHandler ( handler CommandHandler , conn * websocket . Conn , client * ClientInfo , cmsg ClientMessage ) ( rmsg ClientMessage , err error ) {
defer func ( ) {
if r := recover ( ) ; r != nil {
var ok bool
fmt . Print ( "[!] Error executing command" , cmsg . Command , "--" , r )
err , ok = r . ( error )
if ! ok {
err = fmt . Errorf ( "command handler: %v" , r )
}
}
} ( )
return handler ( conn , client , cmsg )
}
2015-11-16 14:30:09 -08:00
var lastVersionWithoutReplyWithServerTime = VersionFromString ( "ffz_3.5.78" )
2015-11-16 12:50:00 -08:00
// C2SHello implements the `hello` C2S Command.
// It calls SubscribeGlobal() and SubscribeDefaults() with the client, and fills out ClientInfo.Version and ClientInfo.ClientID.
func C2SHello ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
2015-11-15 18:43:34 -08:00
version , clientID , err := msg . ArgumentsAsTwoStrings ( )
2015-10-25 00:44:25 -07:00
if err != nil {
return
}
2015-11-16 14:30:09 -08:00
client . VersionString = version
client . Version = VersionFromString ( version )
2015-11-15 18:43:34 -08:00
client . ClientID = uuid . FromStringOrNil ( clientID )
2015-10-25 00:44:25 -07:00
if client . ClientID == uuid . Nil {
client . ClientID = uuid . NewV4 ( )
}
2015-12-23 21:57:33 -08:00
uniqueUserChannel <- client . ClientID
2015-10-26 10:07:15 -07:00
SubscribeGlobal ( client )
2015-11-09 14:44:33 -08:00
SubscribeDefaults ( client )
2015-10-26 10:07:15 -07:00
2015-11-16 14:48:24 -08:00
if client . Version . After ( & lastVersionWithoutReplyWithServerTime ) {
2015-11-16 20:35:03 -08:00
jsTime := float64 ( time . Now ( ) . UnixNano ( ) / 1000 ) / 1000
2015-11-16 14:30:09 -08:00
return ClientMessage {
Arguments : [ ] interface { } {
client . ClientID . String ( ) ,
2015-11-16 14:48:24 -08:00
jsTime ,
2015-11-16 14:30:09 -08:00
} ,
2015-11-16 14:48:24 -08:00
} , nil
2015-11-16 14:30:09 -08:00
} else {
return ClientMessage {
Arguments : client . ClientID . String ( ) ,
} , nil
}
2015-10-25 00:44:25 -07:00
}
2015-11-16 15:23:27 -08:00
func C2SPing ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
return ClientMessage {
2015-11-16 20:35:03 -08:00
Arguments : float64 ( time . Now ( ) . UnixNano ( ) / 1000 ) / 1000 ,
2015-11-16 16:56:27 -08:00
} , nil
2015-11-16 15:23:27 -08:00
}
func C2SSetUser ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
username , err := msg . ArgumentsAsString ( )
if err != nil {
return
}
client . Mutex . Lock ( )
client . TwitchUsername = username
client . UsernameValidated = false
client . Mutex . Unlock ( )
if Configuration . SendAuthToNewClients {
client . MsgChannelKeepalive . Add ( 1 )
go client . StartAuthorization ( func ( _ * ClientInfo , _ bool ) {
client . MsgChannelKeepalive . Done ( )
} )
}
return ResponseSuccess , nil
}
2015-11-16 12:50:00 -08:00
func C2SReady ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
2015-11-16 20:35:03 -08:00
// disconnectAt, err := msg.ArgumentsAsInt()
// if err != nil {
// return
// }
2015-10-26 14:55:20 -07:00
client . Mutex . Lock ( )
if client . MakePendingRequests != nil {
if ! client . MakePendingRequests . Stop ( ) {
// Timer already fired, GetSubscriptionBacklog() has started
rmsg . Command = SuccessCommand
return
}
}
client . PendingSubscriptionsBacklog = nil
client . MakePendingRequests = nil
client . Mutex . Unlock ( )
2015-11-02 22:54:53 -08:00
client . MsgChannelKeepalive . Add ( 1 )
2015-10-29 01:23:58 -07:00
go func ( ) {
client . MessageChannel <- ClientMessage { MessageID : msg . MessageID , Command : SuccessCommand }
SendBacklogForNewClient ( client )
2015-11-02 22:54:53 -08:00
client . MsgChannelKeepalive . Done ( )
2015-10-29 01:23:58 -07:00
} ( )
return ClientMessage { Command : AsyncResponseCommand } , nil
2015-10-26 14:55:20 -07:00
}
2015-11-16 12:50:00 -08:00
func C2SSubscribe ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
2015-10-25 00:44:25 -07:00
channel , err := msg . ArgumentsAsString ( )
2015-10-25 20:17:17 -07:00
if err != nil {
return
}
2015-10-25 03:21:50 -07:00
client . Mutex . Lock ( )
2015-10-25 00:44:25 -07:00
AddToSliceS ( & client . CurrentChannels , channel )
2015-11-16 12:50:00 -08:00
if usePendingSubscrptionsBacklog {
client . PendingSubscriptionsBacklog = append ( client . PendingSubscriptionsBacklog , channel )
}
2015-10-25 03:21:50 -07:00
client . Mutex . Unlock ( )
2015-11-08 22:34:06 -08:00
SubscribeChannel ( client , channel )
2015-10-25 00:44:25 -07:00
return ResponseSuccess , nil
}
2015-11-16 12:50:00 -08:00
// C2SUnsubscribe implements the `unsub` C2S Command.
// It removes the channel from ClientInfo.CurrentChannels and calls UnsubscribeSingleChat.
func C2SUnsubscribe ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
2015-10-25 00:44:25 -07:00
channel , err := msg . ArgumentsAsString ( )
2015-10-25 20:17:17 -07:00
if err != nil {
return
}
2015-10-25 03:21:50 -07:00
client . Mutex . Lock ( )
2015-10-25 00:44:25 -07:00
RemoveFromSliceS ( & client . CurrentChannels , channel )
2015-10-25 03:21:50 -07:00
client . Mutex . Unlock ( )
UnsubscribeSingleChat ( client , channel )
2015-10-25 00:44:25 -07:00
return ResponseSuccess , nil
}
2015-11-16 12:50:00 -08:00
// C2SSurvey implements the survey C2S Command.
// Surveys are discarded.s
func C2SSurvey ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
2015-10-28 15:19:22 -07:00
// Discard
2015-10-25 00:44:25 -07:00
return ResponseSuccess , nil
}
2015-11-16 12:50:00 -08:00
type followEvent struct {
2015-11-05 23:24:35 -08:00
User string ` json:"u" `
Channel string ` json:"c" `
NowFollowing bool ` json:"f" `
Timestamp time . Time ` json:"t" `
2015-10-25 03:21:50 -07:00
}
2015-10-26 10:06:45 -07:00
2015-11-16 12:50:00 -08:00
var followEvents [ ] followEvent
2015-10-25 03:21:50 -07:00
2015-11-16 12:50:00 -08:00
// followEventsLock is the lock for followEvents.
var followEventsLock sync . Mutex
// C2STrackFollow implements the `track_follow` C2S Command.
// It adds the record to `followEvents`, which is submitted to the backend on a timer.
func C2STrackFollow ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
2015-10-25 03:21:50 -07:00
channel , following , err := msg . ArgumentsAsStringAndBool ( )
if err != nil {
return
}
now := time . Now ( )
2015-11-16 12:50:00 -08:00
followEventsLock . Lock ( )
followEvents = append ( followEvents , followEvent { client . TwitchUsername , channel , following , now } )
followEventsLock . Unlock ( )
2015-10-25 00:44:25 -07:00
return ResponseSuccess , nil
}
2015-11-15 18:43:34 -08:00
// AggregateEmoteUsage is a map from emoteID to a map from chatroom name to usage count.
2015-11-16 12:50:00 -08:00
var aggregateEmoteUsage = make ( map [ int ] map [ string ] int )
2015-11-15 18:43:34 -08:00
// AggregateEmoteUsageLock is the lock for AggregateEmoteUsage.
2015-11-16 12:50:00 -08:00
var aggregateEmoteUsageLock sync . Mutex
2015-11-15 18:43:34 -08:00
// ErrNegativeEmoteUsage is emitted when the submitted emote usage is negative.
var ErrNegativeEmoteUsage = errors . New ( "Emote usage count cannot be negative" )
2015-10-25 00:58:05 -07:00
2015-11-16 12:50:00 -08:00
// C2SEmoticonUses implements the `emoticon_uses` C2S Command.
// msg.Arguments are in the JSON format of [1]map[emoteID]map[ChatroomName]float64.
func C2SEmoticonUses ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
// if this panics, will be caught by callHandler
2015-10-25 00:58:05 -07:00
mapRoot := msg . Arguments . ( [ ] interface { } ) [ 0 ] . ( map [ string ] interface { } )
2015-11-16 12:50:00 -08:00
// Validate: male suire
2015-11-01 18:23:22 -08:00
for strEmote , val1 := range mapRoot {
_ , err = strconv . Atoi ( strEmote )
if err != nil {
return
}
mapInner := val1 . ( map [ string ] interface { } )
for _ , val2 := range mapInner {
2015-11-16 12:50:00 -08:00
var count = int ( val2 . ( float64 ) )
2015-11-01 18:23:22 -08:00
if count <= 0 {
2015-11-15 18:43:34 -08:00
err = ErrNegativeEmoteUsage
2015-11-01 18:23:22 -08:00
return
}
}
}
2015-11-16 12:50:00 -08:00
aggregateEmoteUsageLock . Lock ( )
defer aggregateEmoteUsageLock . Unlock ( )
2015-10-25 00:58:05 -07:00
2015-11-16 21:57:18 -08:00
var total int
2015-10-25 00:58:05 -07:00
for strEmote , val1 := range mapRoot {
2015-11-15 18:43:34 -08:00
var emoteID int
emoteID , err = strconv . Atoi ( strEmote )
2015-10-25 00:58:05 -07:00
if err != nil {
return
}
2015-11-16 12:50:00 -08:00
destMapInner , ok := aggregateEmoteUsage [ emoteID ]
2015-10-25 00:58:05 -07:00
if ! ok {
destMapInner = make ( map [ string ] int )
2015-11-16 12:50:00 -08:00
aggregateEmoteUsage [ emoteID ] = destMapInner
2015-10-25 00:58:05 -07:00
}
mapInner := val1 . ( map [ string ] interface { } )
for roomName , val2 := range mapInner {
2015-11-16 12:50:00 -08:00
var count = int ( val2 . ( float64 ) )
2015-11-01 18:23:22 -08:00
if count > 200 {
count = 200
}
2015-10-25 00:58:05 -07:00
destMapInner [ roomName ] += count
2015-11-16 21:57:18 -08:00
total += count
2015-10-25 00:58:05 -07:00
}
}
2015-11-16 21:57:18 -08:00
Statistics . EmotesReportedTotal += uint64 ( total )
2015-10-25 00:44:25 -07:00
return ResponseSuccess , nil
}
2015-12-23 21:56:56 -08:00
// is_init_func
2015-11-16 12:50:00 -08:00
func aggregateDataSender ( ) {
2015-10-28 15:19:22 -07:00
for {
2015-10-28 23:27:04 -07:00
time . Sleep ( 5 * time . Minute )
2015-12-16 13:59:51 -08:00
aggregateDataSender_do ( )
2015-10-28 15:19:22 -07:00
}
}
2015-12-16 13:59:51 -08:00
func aggregateDataSender_do ( ) {
2015-11-16 12:50:00 -08:00
followEventsLock . Lock ( )
follows := followEvents
followEvents = nil
followEventsLock . Unlock ( )
aggregateEmoteUsageLock . Lock ( )
emoteUsage := aggregateEmoteUsage
aggregateEmoteUsage = make ( map [ int ] map [ string ] int )
aggregateEmoteUsageLock . Unlock ( )
2015-10-28 15:19:22 -07:00
reportForm := url . Values { }
2015-11-15 18:43:34 -08:00
followJSON , err := json . Marshal ( follows )
2015-10-28 15:19:22 -07:00
if err != nil {
2015-11-16 13:07:02 -08:00
log . Println ( "error reporting aggregate data:" , err )
2015-10-28 15:19:22 -07:00
} else {
2015-11-15 18:43:34 -08:00
reportForm . Set ( "follows" , string ( followJSON ) )
2015-10-28 15:19:22 -07:00
}
2015-10-28 15:49:53 -07:00
strEmoteUsage := make ( map [ string ] map [ string ] int )
2015-11-15 18:43:34 -08:00
for emoteID , usageByChannel := range emoteUsage {
strEmoteID := strconv . Itoa ( emoteID )
strEmoteUsage [ strEmoteID ] = usageByChannel
2015-10-28 15:49:53 -07:00
}
2015-11-15 18:43:34 -08:00
emoteJSON , err := json . Marshal ( strEmoteUsage )
2015-10-28 15:19:22 -07:00
if err != nil {
2015-11-16 13:07:02 -08:00
log . Println ( "error reporting aggregate data:" , err )
2015-10-28 15:19:22 -07:00
} else {
2015-11-15 18:43:34 -08:00
reportForm . Set ( "emotes" , string ( emoteJSON ) )
2015-10-28 15:19:22 -07:00
}
form , err := SealRequest ( reportForm )
if err != nil {
2015-11-16 13:07:02 -08:00
log . Println ( "error reporting aggregate data:" , err )
2015-10-28 15:19:22 -07:00
return
}
err = SendAggregatedData ( form )
if err != nil {
2015-11-16 13:07:02 -08:00
log . Println ( "error reporting aggregate data:" , err )
2015-10-28 15:19:22 -07:00
return
}
// done
}
2015-11-16 12:50:00 -08:00
type bunchedRequest struct {
2015-11-01 00:26:46 -07:00
Command Command
2015-11-01 13:17:35 -08:00
Param string
2015-11-01 00:26:46 -07:00
}
2015-11-01 13:17:35 -08:00
2015-11-16 12:50:00 -08:00
type cachedBunchedResponse struct {
2015-11-01 13:17:35 -08:00
Response string
2015-11-01 00:26:46 -07:00
Timestamp time . Time
}
2015-11-16 12:50:00 -08:00
type bunchSubscriber struct {
2015-11-01 13:17:35 -08:00
Client * ClientInfo
2015-11-01 00:26:46 -07:00
MessageID int
}
2015-11-16 12:50:00 -08:00
type bunchSubscriberList struct {
2015-11-01 00:26:46 -07:00
sync . Mutex
2015-11-16 12:50:00 -08:00
Members [ ] bunchSubscriber
2015-11-01 00:26:46 -07:00
}
2015-12-02 19:08:19 -08:00
type cacheStatus byte
2015-11-15 18:43:34 -08:00
2015-11-15 15:52:37 -08:00
const (
CacheStatusNotFound = iota
CacheStatusFound
CacheStatusExpired
)
2015-11-16 12:50:00 -08:00
var pendingBunchedRequests = make ( map [ bunchedRequest ] * bunchSubscriberList )
var pendingBunchLock sync . Mutex
var bunchCache = make ( map [ bunchedRequest ] cachedBunchedResponse )
var bunchCacheLock sync . RWMutex
var bunchCacheCleanupSignal = sync . NewCond ( & bunchCacheLock )
var bunchCacheLastCleanup time . Time
func bunchedRequestFromCM ( msg * ClientMessage ) bunchedRequest {
return bunchedRequest { Command : msg . Command , Param : msg . origArguments }
}
2015-11-15 15:52:37 -08:00
2015-12-23 21:56:56 -08:00
// is_init_func
2015-11-15 15:52:37 -08:00
func bunchCacheJanitor ( ) {
go func ( ) {
for {
2015-11-15 18:43:34 -08:00
time . Sleep ( 30 * time . Minute )
2015-11-16 12:50:00 -08:00
bunchCacheCleanupSignal . Signal ( )
2015-11-15 15:52:37 -08:00
}
} ( )
2015-11-16 12:50:00 -08:00
bunchCacheLock . Lock ( )
2015-11-15 15:52:37 -08:00
for {
// Unlocks CachedBunchLock, waits for signal, re-locks
2015-11-16 12:50:00 -08:00
bunchCacheCleanupSignal . Wait ( )
2015-11-15 15:52:37 -08:00
2015-11-16 12:50:00 -08:00
if bunchCacheLastCleanup . After ( time . Now ( ) . Add ( - 1 * time . Second ) ) {
2015-11-15 15:52:37 -08:00
// skip if it's been less than 1 second
continue
}
// CachedBunchLock is held here
2015-11-15 18:43:34 -08:00
keepIfAfter := time . Now ( ) . Add ( - 5 * time . Minute )
2015-11-16 12:50:00 -08:00
for req , resp := range bunchCache {
2015-11-15 15:52:37 -08:00
if ! resp . Timestamp . After ( keepIfAfter ) {
2015-11-16 12:50:00 -08:00
delete ( bunchCache , req )
2015-11-15 15:52:37 -08:00
}
}
2015-11-16 12:50:00 -08:00
bunchCacheLastCleanup = time . Now ( )
2015-11-15 15:52:37 -08:00
// Loop and Wait(), which re-locks
}
}
2015-11-01 13:17:35 -08:00
2015-12-02 19:08:19 -08:00
var emptyCachedBunchedResponse cachedBunchedResponse
func bunchGetCacheStatus ( br bunchedRequest , client * ClientInfo ) ( cacheStatus , cachedBunchedResponse ) {
bunchCacheLock . RLock ( )
defer bunchCacheLock . RUnlock ( )
cachedResponse , ok := bunchCache [ br ]
if ok && cachedResponse . Timestamp . After ( time . Now ( ) . Add ( - 5 * time . Minute ) ) {
return CacheStatusFound , cachedResponse
} else if ok {
return CacheStatusExpired , emptyCachedBunchedResponse
}
return CacheStatusNotFound , emptyCachedBunchedResponse
}
2015-12-02 19:09:50 -08:00
func normalizeBunchedRequest ( br bunchedRequest ) bunchedRequest {
if br . Command == "get_link" {
// TODO
}
return br
}
2015-11-16 12:50:00 -08:00
// C2SHandleBunchedCommand handles C2S Commands such as `get_link`.
// It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one.
// Additionally, results are cached.
func C2SHandleBunchedCommand ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
2015-12-02 19:08:19 -08:00
// FIXME(riking): Function is too complex
2015-11-16 12:50:00 -08:00
br := bunchedRequestFromCM ( & msg )
2015-12-02 19:09:50 -08:00
br = normalizeBunchedRequest ( br )
2015-11-01 00:26:46 -07:00
2015-12-02 19:08:19 -08:00
cacheStatus , cachedResponse := bunchGetCacheStatus ( br , client )
2015-11-15 15:52:37 -08:00
if cacheStatus == CacheStatusFound {
2015-12-02 19:08:19 -08:00
var response ClientMessage
response . Command = SuccessCommand
response . MessageID = msg . MessageID
response . origArguments = cachedResponse . Response
response . parseOrigArguments ( )
return response , nil
2015-11-15 15:52:37 -08:00
} else if cacheStatus == CacheStatusExpired {
// Wake up the lazy janitor
2015-11-16 12:50:00 -08:00
bunchCacheCleanupSignal . Signal ( )
2015-11-15 15:52:37 -08:00
}
2015-11-16 12:50:00 -08:00
pendingBunchLock . Lock ( )
defer pendingBunchLock . Unlock ( )
list , ok := pendingBunchedRequests [ br ]
2015-11-01 00:26:46 -07:00
if ok {
list . Lock ( )
AddToSliceB ( & list . Members , client , msg . MessageID )
list . Unlock ( )
return ClientMessage { Command : AsyncResponseCommand } , nil
}
2015-11-16 12:50:00 -08:00
pendingBunchedRequests [ br ] = & bunchSubscriberList { Members : [ ] bunchSubscriber { { Client : client , MessageID : msg . MessageID } } }
2015-11-01 00:26:46 -07:00
2015-11-16 12:50:00 -08:00
go func ( request bunchedRequest ) {
2015-11-15 15:52:37 -08:00
respStr , err := SendRemoteCommandCached ( string ( request . Command ) , request . Param , AuthInfo { } )
2015-11-01 00:26:46 -07:00
2015-11-02 22:59:38 -08:00
var msg ClientMessage
if err == nil {
2015-11-04 12:09:24 -08:00
msg . Command = SuccessCommand
2015-11-15 15:52:37 -08:00
msg . origArguments = respStr
2015-11-04 12:09:24 -08:00
msg . parseOrigArguments ( )
2015-11-02 22:59:38 -08:00
} else {
msg . Command = ErrorCommand
msg . Arguments = err . Error ( )
}
2015-11-15 15:52:37 -08:00
if err == nil {
2015-11-16 12:50:00 -08:00
bunchCacheLock . Lock ( )
bunchCache [ request ] = cachedBunchedResponse { Response : respStr , Timestamp : time . Now ( ) }
bunchCacheLock . Unlock ( )
2015-11-15 15:52:37 -08:00
}
2015-11-16 12:50:00 -08:00
pendingBunchLock . Lock ( )
bsl := pendingBunchedRequests [ request ]
delete ( pendingBunchedRequests , request )
pendingBunchLock . Unlock ( )
2015-11-04 12:09:24 -08:00
2015-11-02 22:59:38 -08:00
bsl . Lock ( )
for _ , member := range bsl . Members {
msg . MessageID = member . MessageID
2015-11-03 16:44:42 -08:00
select {
case member . Client . MessageChannel <- msg :
case <- member . Client . MsgChannelIsDone :
}
2015-11-02 22:59:38 -08:00
}
bsl . Unlock ( )
} ( br )
return ClientMessage { Command : AsyncResponseCommand } , nil
2015-11-01 00:26:46 -07:00
}
2015-11-16 12:50:00 -08:00
func C2SHandleRemoteCommand ( conn * websocket . Conn , client * ClientInfo , msg ClientMessage ) ( rmsg ClientMessage , err error ) {
2015-11-02 22:54:53 -08:00
client . MsgChannelKeepalive . Add ( 1 )
2015-11-08 16:44:16 -08:00
go doRemoteCommand ( conn , msg , client )
2015-10-25 00:44:25 -07:00
return ClientMessage { Command : AsyncResponseCommand } , nil
}
2015-11-08 16:44:16 -08:00
const AuthorizationFailedErrorString = "Failed to verify your Twitch username."
2015-12-16 13:59:51 -08:00
const AuthorizationNeededError = "You must be signed in to use that command."
2015-11-08 16:44:16 -08:00
func doRemoteCommand ( conn * websocket . Conn , msg ClientMessage , client * ClientInfo ) {
2015-11-08 22:01:32 -08:00
resp , err := SendRemoteCommandCached ( string ( msg . Command ) , msg . origArguments , client . AuthInfo )
2015-11-08 16:44:16 -08:00
2015-11-15 18:43:34 -08:00
if err == ErrAuthorizationNeeded {
2015-12-16 13:59:51 -08:00
if client . TwitchUsername == "" {
// Not logged in
client . MessageChannel <- ClientMessage { MessageID : msg . MessageID , Command : ErrorCommand , Arguments : AuthorizationNeededError }
client . MsgChannelKeepalive . Done ( )
return
}
2015-11-08 16:44:16 -08:00
client . StartAuthorization ( func ( _ * ClientInfo , success bool ) {
if success {
doRemoteCommand ( conn , msg , client )
} else {
client . MessageChannel <- ClientMessage { MessageID : msg . MessageID , Command : ErrorCommand , Arguments : AuthorizationFailedErrorString }
client . MsgChannelKeepalive . Done ( )
}
} )
return // without keepalive.Done()
2016-01-03 13:54:26 -08:00
} else if bfe , ok := err . ( ErrForwardedFromBackend ) ; ok {
client . MessageChannel <- ClientMessage { MessageID : msg . MessageID , Command : ErrorCommand , Arguments : bfe . JSONError }
2015-11-08 16:44:16 -08:00
} else if err != nil {
client . MessageChannel <- ClientMessage { MessageID : msg . MessageID , Command : ErrorCommand , Arguments : err . Error ( ) }
} else {
msg := ClientMessage { MessageID : msg . MessageID , Command : SuccessCommand , origArguments : resp }
msg . parseOrigArguments ( )
client . MessageChannel <- msg
}
client . MsgChannelKeepalive . Done ( )
}