2015-10-25 00:44:25 -07:00
|
|
|
package server
|
|
|
|
|
|
|
|
import (
|
2015-10-28 15:19:22 -07:00
|
|
|
"encoding/json"
|
2015-11-01 18:23:22 -08:00
|
|
|
"errors"
|
2015-10-28 15:19:22 -07:00
|
|
|
"github.com/gorilla/websocket"
|
2015-10-25 00:44:25 -07:00
|
|
|
"github.com/satori/go.uuid"
|
|
|
|
"log"
|
2015-10-28 18:12:20 -07:00
|
|
|
"net/url"
|
2015-10-25 00:58:05 -07:00
|
|
|
"strconv"
|
2015-10-26 10:06:45 -07:00
|
|
|
"sync"
|
2015-10-25 03:21:50 -07:00
|
|
|
"time"
|
2015-10-25 00:44:25 -07:00
|
|
|
)
|
|
|
|
|
2015-11-08 22:34:06 -08:00
|
|
|
// A command is how the client refers to a function on the server. It's just a string.
|
|
|
|
type Command string
|
|
|
|
|
|
|
|
// A function that is called to respond to a Command.
|
|
|
|
type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error)
|
|
|
|
|
|
|
|
var CommandHandlers = map[Command]CommandHandler{
|
|
|
|
HelloCommand: HandleHello,
|
|
|
|
"setuser": HandleSetUser,
|
|
|
|
"ready": HandleReady,
|
|
|
|
|
|
|
|
"sub": HandleSub,
|
|
|
|
"unsub": HandleUnsub,
|
|
|
|
|
|
|
|
"track_follow": HandleTrackFollow,
|
|
|
|
"emoticon_uses": HandleEmoticonUses,
|
|
|
|
"survey": HandleSurvey,
|
|
|
|
|
|
|
|
"twitch_emote": HandleRemoteCommand,
|
|
|
|
"get_link": HandleBunchedRemoteCommand,
|
|
|
|
"get_display_name": HandleBunchedRemoteCommand,
|
|
|
|
"update_follow_buttons": HandleRemoteCommand,
|
|
|
|
"chat_history": HandleRemoteCommand,
|
|
|
|
}
|
2015-10-25 00:44:25 -07:00
|
|
|
|
2015-10-25 03:21:50 -07:00
|
|
|
const ChannelInfoDelay = 2 * time.Second
|
|
|
|
|
2015-10-25 00:44:25 -07:00
|
|
|
func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) {
|
|
|
|
handler, ok := CommandHandlers[msg.Command]
|
|
|
|
if !ok {
|
2015-11-08 21:20:32 -08:00
|
|
|
handler = HandleRemoteCommand
|
2015-10-25 00:44:25 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
response, err := CallHandler(handler, conn, client, msg)
|
|
|
|
|
|
|
|
if err == nil {
|
2015-10-26 14:55:20 -07:00
|
|
|
if response.Command == AsyncResponseCommand {
|
|
|
|
// Don't send anything
|
|
|
|
// The response will be delivered over client.MessageChannel / serverMessageChan
|
|
|
|
} else {
|
|
|
|
response.MessageID = msg.MessageID
|
2015-10-28 15:19:22 -07:00
|
|
|
SendMessage(conn, response)
|
2015-10-26 14:55:20 -07:00
|
|
|
}
|
2015-10-25 00:44:25 -07:00
|
|
|
} else {
|
2015-10-28 15:19:22 -07:00
|
|
|
SendMessage(conn, ClientMessage{
|
2015-10-25 00:44:25 -07:00
|
|
|
MessageID: msg.MessageID,
|
2015-10-26 10:06:45 -07:00
|
|
|
Command: "error",
|
2015-10-25 00:44:25 -07:00
|
|
|
Arguments: err.Error(),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func HandleHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
|
|
|
version, clientId, err := msg.ArgumentsAsTwoStrings()
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
client.Version = version
|
|
|
|
client.ClientID = uuid.FromStringOrNil(clientId)
|
|
|
|
if client.ClientID == uuid.Nil {
|
|
|
|
client.ClientID = uuid.NewV4()
|
|
|
|
}
|
|
|
|
|
2015-10-26 10:07:15 -07:00
|
|
|
SubscribeGlobal(client)
|
2015-11-09 14:44:33 -08:00
|
|
|
SubscribeDefaults(client)
|
2015-10-26 10:07:15 -07:00
|
|
|
|
2015-10-25 00:44:25 -07:00
|
|
|
return ClientMessage{
|
|
|
|
Arguments: client.ClientID.String(),
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2015-10-26 14:55:20 -07:00
|
|
|
func HandleReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
|
|
|
disconnectAt, err := msg.ArgumentsAsInt()
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
client.Mutex.Lock()
|
|
|
|
if client.MakePendingRequests != nil {
|
|
|
|
if !client.MakePendingRequests.Stop() {
|
|
|
|
// Timer already fired, GetSubscriptionBacklog() has started
|
|
|
|
rmsg.Command = SuccessCommand
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
client.PendingSubscriptionsBacklog = nil
|
|
|
|
client.MakePendingRequests = nil
|
|
|
|
client.Mutex.Unlock()
|
|
|
|
|
2015-11-02 22:54:53 -08:00
|
|
|
client.MsgChannelKeepalive.Add(1)
|
2015-10-29 01:23:58 -07:00
|
|
|
go func() {
|
|
|
|
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand}
|
|
|
|
SendBacklogForNewClient(client)
|
|
|
|
if disconnectAt != 0 {
|
2015-10-26 14:55:20 -07:00
|
|
|
SendTimedBacklogMessages(client, time.Unix(disconnectAt, 0))
|
2015-10-29 01:23:58 -07:00
|
|
|
}
|
2015-11-02 22:54:53 -08:00
|
|
|
client.MsgChannelKeepalive.Done()
|
2015-10-29 01:23:58 -07:00
|
|
|
}()
|
|
|
|
return ClientMessage{Command: AsyncResponseCommand}, nil
|
2015-10-26 14:55:20 -07:00
|
|
|
}
|
|
|
|
|
2015-10-25 00:44:25 -07:00
|
|
|
func HandleSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
|
|
|
username, err := msg.ArgumentsAsString()
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-10-25 03:21:50 -07:00
|
|
|
client.Mutex.Lock()
|
2015-10-25 00:44:25 -07:00
|
|
|
client.TwitchUsername = username
|
|
|
|
client.UsernameValidated = false
|
2015-10-25 03:21:50 -07:00
|
|
|
client.Mutex.Unlock()
|
2015-10-25 00:44:25 -07:00
|
|
|
|
2015-11-08 16:44:16 -08:00
|
|
|
if Configuation.SendAuthToNewClients {
|
|
|
|
client.MsgChannelKeepalive.Add(1)
|
|
|
|
go client.StartAuthorization(func(_ *ClientInfo, _ bool) {
|
|
|
|
client.MsgChannelKeepalive.Done()
|
|
|
|
})
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2015-10-25 00:44:25 -07:00
|
|
|
return ResponseSuccess, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
|
|
|
channel, err := msg.ArgumentsAsString()
|
|
|
|
|
2015-10-25 20:17:17 -07:00
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-10-25 03:21:50 -07:00
|
|
|
client.Mutex.Lock()
|
|
|
|
|
2015-10-25 00:44:25 -07:00
|
|
|
AddToSliceS(&client.CurrentChannels, channel)
|
2015-10-26 12:13:28 -07:00
|
|
|
client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel)
|
2015-10-25 00:44:25 -07:00
|
|
|
|
2015-10-27 21:21:06 -07:00
|
|
|
// if client.MakePendingRequests == nil {
|
|
|
|
// client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client))
|
|
|
|
// } else {
|
|
|
|
// if !client.MakePendingRequests.Reset(ChannelInfoDelay) {
|
|
|
|
// client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client))
|
|
|
|
// }
|
|
|
|
// }
|
2015-10-25 03:21:50 -07:00
|
|
|
|
|
|
|
client.Mutex.Unlock()
|
|
|
|
|
2015-11-08 22:34:06 -08:00
|
|
|
SubscribeChannel(client, channel)
|
2015-10-25 00:44:25 -07:00
|
|
|
|
|
|
|
return ResponseSuccess, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func HandleUnsub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
|
|
|
channel, err := msg.ArgumentsAsString()
|
|
|
|
|
2015-10-25 20:17:17 -07:00
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-10-25 03:21:50 -07:00
|
|
|
client.Mutex.Lock()
|
2015-10-25 00:44:25 -07:00
|
|
|
RemoveFromSliceS(&client.CurrentChannels, channel)
|
2015-10-25 03:21:50 -07:00
|
|
|
client.Mutex.Unlock()
|
|
|
|
|
|
|
|
UnsubscribeSingleChat(client, channel)
|
2015-10-25 00:44:25 -07:00
|
|
|
|
|
|
|
return ResponseSuccess, nil
|
|
|
|
}
|
|
|
|
|
2015-10-25 03:21:50 -07:00
|
|
|
func GetSubscriptionBacklogFor(conn *websocket.Conn, client *ClientInfo) func() {
|
|
|
|
return func() {
|
|
|
|
GetSubscriptionBacklog(conn, client)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// On goroutine
|
|
|
|
func GetSubscriptionBacklog(conn *websocket.Conn, client *ClientInfo) {
|
2015-10-26 12:13:28 -07:00
|
|
|
var subs []string
|
2015-10-25 03:21:50 -07:00
|
|
|
|
|
|
|
// Lock, grab the data, and reset it
|
|
|
|
client.Mutex.Lock()
|
2015-10-26 12:13:28 -07:00
|
|
|
subs = client.PendingSubscriptionsBacklog
|
|
|
|
client.PendingSubscriptionsBacklog = nil
|
2015-10-25 03:21:50 -07:00
|
|
|
client.MakePendingRequests = nil
|
|
|
|
client.Mutex.Unlock()
|
|
|
|
|
2015-10-26 12:13:28 -07:00
|
|
|
if len(subs) == 0 {
|
2015-10-25 03:21:50 -07:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-10-25 20:17:17 -07:00
|
|
|
if backendUrl == "" {
|
|
|
|
return // for testing runs
|
|
|
|
}
|
2015-10-26 12:13:28 -07:00
|
|
|
messages, err := FetchBacklogData(subs)
|
2015-10-25 03:21:50 -07:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
// Oh well.
|
|
|
|
log.Print("error in GetSubscriptionBacklog:", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Deliver to client
|
2015-11-02 22:54:53 -08:00
|
|
|
client.MsgChannelKeepalive.Add(1)
|
2015-10-29 01:23:58 -07:00
|
|
|
if client.MessageChannel != nil {
|
|
|
|
for _, msg := range messages {
|
|
|
|
client.MessageChannel <- msg
|
|
|
|
}
|
2015-10-25 03:21:50 -07:00
|
|
|
}
|
2015-11-02 22:54:53 -08:00
|
|
|
client.MsgChannelKeepalive.Done()
|
2015-10-25 03:21:50 -07:00
|
|
|
}
|
|
|
|
|
2015-10-25 00:44:25 -07:00
|
|
|
func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
2015-10-28 15:19:22 -07:00
|
|
|
// Discard
|
2015-10-25 00:44:25 -07:00
|
|
|
return ResponseSuccess, nil
|
|
|
|
}
|
|
|
|
|
2015-10-25 03:21:50 -07:00
|
|
|
type FollowEvent struct {
|
2015-11-05 23:24:35 -08:00
|
|
|
User string `json:"u"`
|
|
|
|
Channel string `json:"c"`
|
|
|
|
NowFollowing bool `json:"f"`
|
|
|
|
Timestamp time.Time `json:"t"`
|
2015-10-25 03:21:50 -07:00
|
|
|
}
|
2015-10-26 10:06:45 -07:00
|
|
|
|
2015-10-25 03:21:50 -07:00
|
|
|
var FollowEvents []FollowEvent
|
|
|
|
var FollowEventsLock sync.Mutex
|
|
|
|
|
2015-10-25 00:44:25 -07:00
|
|
|
func HandleTrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
2015-10-25 03:21:50 -07:00
|
|
|
channel, following, err := msg.ArgumentsAsStringAndBool()
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
now := time.Now()
|
|
|
|
|
|
|
|
FollowEventsLock.Lock()
|
|
|
|
FollowEvents = append(FollowEvents, FollowEvent{client.TwitchUsername, channel, following, now})
|
|
|
|
FollowEventsLock.Unlock()
|
2015-10-25 00:44:25 -07:00
|
|
|
|
|
|
|
return ResponseSuccess, nil
|
|
|
|
}
|
|
|
|
|
2015-10-25 00:58:05 -07:00
|
|
|
var AggregateEmoteUsage map[int]map[string]int = make(map[int]map[string]int)
|
|
|
|
var AggregateEmoteUsageLock sync.Mutex
|
2015-11-01 18:23:22 -08:00
|
|
|
var ErrorNegativeEmoteUsage = errors.New("Emote usage count cannot be negative")
|
2015-10-25 00:58:05 -07:00
|
|
|
|
2015-10-25 00:44:25 -07:00
|
|
|
func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
2015-10-25 00:58:05 -07:00
|
|
|
// arguments is [1]map[EmoteId]map[RoomName]float64
|
|
|
|
|
|
|
|
mapRoot := msg.Arguments.([]interface{})[0].(map[string]interface{})
|
|
|
|
|
2015-11-01 18:23:22 -08:00
|
|
|
for strEmote, val1 := range mapRoot {
|
|
|
|
_, err = strconv.Atoi(strEmote)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
mapInner := val1.(map[string]interface{})
|
|
|
|
for _, val2 := range mapInner {
|
|
|
|
var count int = int(val2.(float64))
|
|
|
|
if count <= 0 {
|
|
|
|
err = ErrorNegativeEmoteUsage
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-10-25 00:58:05 -07:00
|
|
|
AggregateEmoteUsageLock.Lock()
|
|
|
|
defer AggregateEmoteUsageLock.Unlock()
|
|
|
|
|
|
|
|
for strEmote, val1 := range mapRoot {
|
|
|
|
var emoteId int
|
|
|
|
emoteId, err = strconv.Atoi(strEmote)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
destMapInner, ok := AggregateEmoteUsage[emoteId]
|
|
|
|
if !ok {
|
|
|
|
destMapInner = make(map[string]int)
|
|
|
|
AggregateEmoteUsage[emoteId] = destMapInner
|
|
|
|
}
|
|
|
|
|
|
|
|
mapInner := val1.(map[string]interface{})
|
|
|
|
for roomName, val2 := range mapInner {
|
|
|
|
var count int = int(val2.(float64))
|
2015-11-01 18:23:22 -08:00
|
|
|
if count > 200 {
|
|
|
|
count = 200
|
|
|
|
}
|
2015-10-25 00:58:05 -07:00
|
|
|
destMapInner[roomName] += count
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-10-25 00:44:25 -07:00
|
|
|
return ResponseSuccess, nil
|
|
|
|
}
|
|
|
|
|
2015-10-28 15:19:22 -07:00
|
|
|
func sendAggregateData() {
|
|
|
|
for {
|
2015-10-28 23:27:04 -07:00
|
|
|
time.Sleep(5 * time.Minute)
|
2015-10-28 15:19:22 -07:00
|
|
|
DoSendAggregateData()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func DoSendAggregateData() {
|
|
|
|
FollowEventsLock.Lock()
|
|
|
|
follows := FollowEvents
|
|
|
|
FollowEvents = nil
|
|
|
|
FollowEventsLock.Unlock()
|
|
|
|
AggregateEmoteUsageLock.Lock()
|
|
|
|
emoteUsage := AggregateEmoteUsage
|
|
|
|
AggregateEmoteUsage = make(map[int]map[string]int)
|
|
|
|
AggregateEmoteUsageLock.Unlock()
|
|
|
|
|
|
|
|
reportForm := url.Values{}
|
|
|
|
|
|
|
|
followJson, err := json.Marshal(follows)
|
|
|
|
if err != nil {
|
|
|
|
log.Print(err)
|
|
|
|
} else {
|
|
|
|
reportForm.Set("follows", string(followJson))
|
|
|
|
}
|
|
|
|
|
2015-10-28 15:49:53 -07:00
|
|
|
strEmoteUsage := make(map[string]map[string]int)
|
|
|
|
for emoteId, usageByChannel := range emoteUsage {
|
|
|
|
strEmoteId := strconv.Itoa(emoteId)
|
|
|
|
strEmoteUsage[strEmoteId] = usageByChannel
|
|
|
|
}
|
|
|
|
emoteJson, err := json.Marshal(strEmoteUsage)
|
2015-10-28 15:19:22 -07:00
|
|
|
if err != nil {
|
|
|
|
log.Print(err)
|
|
|
|
} else {
|
|
|
|
reportForm.Set("emotes", string(emoteJson))
|
|
|
|
}
|
|
|
|
|
|
|
|
form, err := SealRequest(reportForm)
|
|
|
|
if err != nil {
|
|
|
|
log.Print(err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
err = SendAggregatedData(form)
|
|
|
|
if err != nil {
|
|
|
|
log.Print(err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// done
|
|
|
|
}
|
|
|
|
|
2015-11-01 00:26:46 -07:00
|
|
|
type BunchedRequest struct {
|
|
|
|
Command Command
|
2015-11-01 13:17:35 -08:00
|
|
|
Param string
|
2015-11-01 00:26:46 -07:00
|
|
|
}
|
2015-11-01 13:17:35 -08:00
|
|
|
|
2015-11-01 00:26:46 -07:00
|
|
|
func BunchedRequestFromCM(msg *ClientMessage) BunchedRequest {
|
|
|
|
return BunchedRequest{Command: msg.Command, Param: msg.origArguments}
|
|
|
|
}
|
2015-11-01 13:17:35 -08:00
|
|
|
|
2015-11-01 00:26:46 -07:00
|
|
|
type BunchedResponse struct {
|
2015-11-01 13:17:35 -08:00
|
|
|
Response string
|
2015-11-01 00:26:46 -07:00
|
|
|
Timestamp time.Time
|
|
|
|
}
|
|
|
|
type BunchSubscriber struct {
|
2015-11-01 13:17:35 -08:00
|
|
|
Client *ClientInfo
|
2015-11-01 00:26:46 -07:00
|
|
|
MessageID int
|
|
|
|
}
|
|
|
|
|
|
|
|
type BunchSubscriberList struct {
|
|
|
|
sync.Mutex
|
|
|
|
Members []BunchSubscriber
|
|
|
|
}
|
|
|
|
|
2015-11-15 15:52:37 -08:00
|
|
|
type CacheStatus byte
|
|
|
|
const (
|
|
|
|
CacheStatusNotFound = iota
|
|
|
|
CacheStatusFound
|
|
|
|
CacheStatusExpired
|
|
|
|
)
|
|
|
|
|
2015-11-01 13:17:35 -08:00
|
|
|
var PendingBunchedRequests map[BunchedRequest]*BunchSubscriberList = make(map[BunchedRequest]*BunchSubscriberList)
|
2015-11-02 22:59:38 -08:00
|
|
|
var PendingBunchLock sync.Mutex
|
2015-11-15 15:52:37 -08:00
|
|
|
var CachedBunchedRequests map[BunchedRequest]BunchedResponse = make(map[BunchedRequest]BunchedResponse)
|
|
|
|
var CachedBunchLock sync.RWMutex
|
|
|
|
var BunchCacheCleanupSignal *sync.Cond = sync.NewCond(&CachedBunchLock)
|
|
|
|
var BunchCacheLastCleanup time.Time
|
|
|
|
|
|
|
|
func bunchCacheJanitor() {
|
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
time.Sleep(30*time.Minute)
|
|
|
|
BunchCacheCleanupSignal.Signal()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
CachedBunchLock.Lock()
|
|
|
|
for {
|
|
|
|
// Unlocks CachedBunchLock, waits for signal, re-locks
|
|
|
|
BunchCacheCleanupSignal.Wait()
|
|
|
|
|
|
|
|
if BunchCacheLastCleanup.After(time.Now().Add(-1*time.Second)) {
|
|
|
|
// skip if it's been less than 1 second
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// CachedBunchLock is held here
|
|
|
|
keepIfAfter := time.Now().Add(-5*time.Minute)
|
|
|
|
for req, resp := range CachedBunchedRequests {
|
|
|
|
if !resp.Timestamp.After(keepIfAfter) {
|
|
|
|
delete(CachedBunchedRequests, req)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
BunchCacheLastCleanup = time.Now()
|
|
|
|
// Loop and Wait(), which re-locks
|
|
|
|
}
|
|
|
|
}
|
2015-11-01 13:17:35 -08:00
|
|
|
|
2015-11-02 22:59:38 -08:00
|
|
|
func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
2015-11-01 00:26:46 -07:00
|
|
|
br := BunchedRequestFromCM(&msg)
|
|
|
|
|
2015-11-15 15:52:37 -08:00
|
|
|
cacheStatus := func() byte {
|
|
|
|
CachedBunchLock.RLock()
|
|
|
|
defer CachedBunchLock.RUnlock()
|
|
|
|
bresp, ok := CachedBunchedRequests[br]
|
|
|
|
if ok && bresp.Timestamp.After(time.Now().Add(-5*time.Minute)) {
|
|
|
|
client.MsgChannelKeepalive.Add(1)
|
|
|
|
go func() {
|
|
|
|
var rmsg ClientMessage
|
|
|
|
rmsg.Command = SuccessCommand
|
|
|
|
rmsg.MessageID = msg.MessageID
|
|
|
|
rmsg.origArguments = bresp.Response
|
|
|
|
rmsg.parseOrigArguments()
|
|
|
|
client.MessageChannel <- rmsg
|
|
|
|
client.MsgChannelKeepalive.Done()
|
|
|
|
}()
|
|
|
|
return CacheStatusFound
|
|
|
|
} else if ok {
|
|
|
|
return CacheStatusExpired
|
|
|
|
}
|
|
|
|
return CacheStatusNotFound
|
|
|
|
}()
|
|
|
|
|
|
|
|
if cacheStatus == CacheStatusFound {
|
|
|
|
return ClientMessage{Command: AsyncResponseCommand}, nil
|
|
|
|
} else if cacheStatus == CacheStatusExpired {
|
|
|
|
// Wake up the lazy janitor
|
|
|
|
BunchCacheCleanupSignal.Signal()
|
|
|
|
}
|
|
|
|
|
2015-11-02 22:59:38 -08:00
|
|
|
PendingBunchLock.Lock()
|
2015-11-04 12:09:24 -08:00
|
|
|
defer PendingBunchLock.Unlock()
|
2015-11-01 00:26:46 -07:00
|
|
|
list, ok := PendingBunchedRequests[br]
|
|
|
|
if ok {
|
|
|
|
list.Lock()
|
|
|
|
AddToSliceB(&list.Members, client, msg.MessageID)
|
|
|
|
list.Unlock()
|
|
|
|
|
|
|
|
return ClientMessage{Command: AsyncResponseCommand}, nil
|
|
|
|
}
|
|
|
|
|
2015-11-02 22:59:38 -08:00
|
|
|
PendingBunchedRequests[br] = &BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}}
|
2015-11-01 00:26:46 -07:00
|
|
|
|
2015-11-02 22:59:38 -08:00
|
|
|
go func(request BunchedRequest) {
|
2015-11-15 15:52:37 -08:00
|
|
|
respStr, err := SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{})
|
2015-11-01 00:26:46 -07:00
|
|
|
|
2015-11-02 22:59:38 -08:00
|
|
|
var msg ClientMessage
|
|
|
|
if err == nil {
|
2015-11-04 12:09:24 -08:00
|
|
|
msg.Command = SuccessCommand
|
2015-11-15 15:52:37 -08:00
|
|
|
msg.origArguments = respStr
|
2015-11-04 12:09:24 -08:00
|
|
|
msg.parseOrigArguments()
|
2015-11-02 22:59:38 -08:00
|
|
|
} else {
|
|
|
|
msg.Command = ErrorCommand
|
|
|
|
msg.Arguments = err.Error()
|
|
|
|
}
|
|
|
|
|
2015-11-15 15:52:37 -08:00
|
|
|
if err == nil {
|
|
|
|
CachedBunchLock.Lock()
|
|
|
|
CachedBunchedRequests[request] = BunchedResponse{Response: respStr, Timestamp: time.Now()}
|
|
|
|
CachedBunchLock.Unlock()
|
|
|
|
}
|
|
|
|
|
2015-11-04 12:09:24 -08:00
|
|
|
PendingBunchLock.Lock()
|
2015-11-02 22:59:38 -08:00
|
|
|
bsl := PendingBunchedRequests[request]
|
2015-11-04 12:09:24 -08:00
|
|
|
delete(PendingBunchedRequests, request)
|
|
|
|
PendingBunchLock.Unlock()
|
|
|
|
|
2015-11-02 22:59:38 -08:00
|
|
|
bsl.Lock()
|
|
|
|
for _, member := range bsl.Members {
|
|
|
|
msg.MessageID = member.MessageID
|
2015-11-03 16:44:42 -08:00
|
|
|
select {
|
|
|
|
case member.Client.MessageChannel <- msg:
|
|
|
|
case <-member.Client.MsgChannelIsDone:
|
|
|
|
}
|
2015-11-02 22:59:38 -08:00
|
|
|
}
|
|
|
|
bsl.Unlock()
|
|
|
|
}(br)
|
|
|
|
|
|
|
|
return ClientMessage{Command: AsyncResponseCommand}, nil
|
2015-11-01 00:26:46 -07:00
|
|
|
}
|
|
|
|
|
2015-10-25 00:44:25 -07:00
|
|
|
func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
2015-11-02 22:54:53 -08:00
|
|
|
client.MsgChannelKeepalive.Add(1)
|
2015-11-08 16:44:16 -08:00
|
|
|
go doRemoteCommand(conn, msg, client)
|
2015-10-25 00:44:25 -07:00
|
|
|
|
|
|
|
return ClientMessage{Command: AsyncResponseCommand}, nil
|
|
|
|
}
|
2015-11-08 16:44:16 -08:00
|
|
|
|
|
|
|
const AuthorizationFailedErrorString = "Failed to verify your Twitch username."
|
|
|
|
|
|
|
|
func doRemoteCommand(conn *websocket.Conn, msg ClientMessage, client *ClientInfo) {
|
2015-11-08 22:01:32 -08:00
|
|
|
resp, err := SendRemoteCommandCached(string(msg.Command), msg.origArguments, client.AuthInfo)
|
2015-11-08 16:44:16 -08:00
|
|
|
|
|
|
|
if err == AuthorizationNeededError {
|
|
|
|
client.StartAuthorization(func(_ *ClientInfo, success bool) {
|
|
|
|
if success {
|
|
|
|
doRemoteCommand(conn, msg, client)
|
|
|
|
} else {
|
|
|
|
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: AuthorizationFailedErrorString}
|
|
|
|
client.MsgChannelKeepalive.Done()
|
|
|
|
}
|
|
|
|
})
|
|
|
|
return // without keepalive.Done()
|
|
|
|
} else if err != nil {
|
|
|
|
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()}
|
|
|
|
} else {
|
|
|
|
msg := ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp}
|
|
|
|
msg.parseOrigArguments()
|
|
|
|
client.MessageChannel <- msg
|
|
|
|
}
|
|
|
|
client.MsgChannelKeepalive.Done()
|
|
|
|
}
|