diff --git a/socketserver/server/commands.go b/socketserver/server/commands.go index ffe41293..a112e6dc 100644 --- a/socketserver/server/commands.go +++ b/socketserver/server/commands.go @@ -174,15 +174,7 @@ func C2SReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg // } client.Mutex.Lock() - if client.MakePendingRequests != nil { - if !client.MakePendingRequests.Stop() { - // Timer already fired, GetSubscriptionBacklog() has started - rmsg.Command = SuccessCommand - return - } - } - client.PendingSubscriptionsBacklog = nil - client.MakePendingRequests = nil + client.ReadyComplete = true client.Mutex.Unlock() client.MsgChannelKeepalive.Add(1) @@ -204,13 +196,18 @@ func C2SSubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) ( client.Mutex.Lock() AddToSliceS(&client.CurrentChannels, channel) - if usePendingSubscrptionsBacklog { - client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel) - } client.Mutex.Unlock() SubscribeChannel(client, channel) + if client.ReadyComplete { + client.MsgChannelKeepalive.Add(1) + go func() { + SendBacklogForChannel(client, channel) + client.MsgChannelKeepalive.Done() + }() + } + return ResponseSuccess, nil } diff --git a/socketserver/server/irc.go b/socketserver/server/irc.go index 6ec1f7f9..c0af288c 100644 --- a/socketserver/server/irc.go +++ b/socketserver/server/irc.go @@ -56,7 +56,7 @@ func authorizationJanitor_do() { if !cullTime.After(v.EnteredAt) { newPendingAuths = append(newPendingAuths, v) } else { - v.Callback(v.Client, false) + go v.Callback(v.Client, false) } } @@ -64,12 +64,13 @@ func authorizationJanitor_do() { } func (client *ClientInfo) StartAuthorization(callback AuthCallback) { + if callback == nil { + return // callback must not be nil + } var nonce [32]byte _, err := rand.Read(nonce[:]) if err != nil { - go func(client *ClientInfo, callback AuthCallback) { - callback(client, false) - }(client, callback) + go callback(client, false) return } buf := bytes.NewBuffer(nil) @@ -153,11 +154,9 @@ func submitAuth(user, challenge string) { } auth.Client.Mutex.Unlock() - if auth.Callback != nil { - if !usernameChanged { - auth.Callback(auth.Client, true) - } else { - auth.Callback(auth.Client, false) - } + if !usernameChanged { + auth.Callback(auth.Client, true) + } else { + auth.Callback(auth.Client, false) } } diff --git a/socketserver/server/publisher.go b/socketserver/server/publisher.go index 45bfd9f3..f0531329 100644 --- a/socketserver/server/publisher.go +++ b/socketserver/server/publisher.go @@ -29,6 +29,9 @@ var S2CCommandsCacheInfo = map[Command]PushCommandCacheInfo{ "viewers": {CacheTypeLastOnly, MsgTargetTypeChat}, } +var PersistentCachingCommands = []Command{"follow_sets", "follow_buttons"} +var HourlyCachingCommands = []Command{"srl_race", "chatters", "viewers"} + type BacklogCacheType int const ( @@ -101,8 +104,8 @@ func SendBacklogForNewClient(client *ClientInfo) { client.Mutex.Unlock() PersistentLSMLock.RLock() - for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypePersistent, MsgTargetTypeChat}) { - chanMap := CachedLastMessages[cmd] + for _, cmd := range GetCommandsOfType(CacheTypePersistent) { + chanMap := PersistentLastMessages[cmd] if chanMap == nil { continue } @@ -118,7 +121,7 @@ func SendBacklogForNewClient(client *ClientInfo) { PersistentLSMLock.RUnlock() CachedLSMLock.RLock() - for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}) { + for _, cmd := range GetCommandsOfType(CacheTypeLastOnly) { chanMap := CachedLastMessages[cmd] if chanMap == nil { continue @@ -135,6 +138,36 @@ func SendBacklogForNewClient(client *ClientInfo) { CachedLSMLock.RUnlock() } +func SendBacklogForChannel(client *ClientInfo, channel string) { + PersistentLSMLock.RLock() + for _, cmd := range GetCommandsOfType(CacheTypePersistent) { + chanMap := PersistentLastMessages[cmd] + if chanMap == nil { + continue + } + if msg, ok := chanMap[channel]; ok { + msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data} + msg.parseOrigArguments() + client.MessageChannel <- msg + } + } + PersistentLSMLock.RUnlock() + + CachedLSMLock.RLock() + for _, cmd := range GetCommandsOfType(CacheTypeLastOnly) { + chanMap := CachedLastMessages[cmd] + if chanMap == nil { + continue + } + if msg, ok := chanMap[channel]; ok { + msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data} + msg.parseOrigArguments() + client.MessageChannel <- msg + } + } + CachedLSMLock.RUnlock() +} + type timestampArray interface { Len() int GetTime(int) time.Time @@ -144,13 +177,13 @@ func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync. locker.Lock() defer locker.Unlock() - chanMap, ok := CachedLastMessages[cmd] + chanMap, ok := which[cmd] if !ok { if deleting { return } chanMap = make(map[string]LastSavedMessage) - CachedLastMessages[cmd] = chanMap + which[cmd] = chanMap } if deleting { @@ -160,14 +193,14 @@ func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync. } } -func GetCommandsOfType(match PushCommandCacheInfo) []Command { - var ret []Command - for cmd, info := range S2CCommandsCacheInfo { - if info == match { - ret = append(ret, cmd) - } +func GetCommandsOfType(match BacklogCacheType) []Command { + if match == CacheTypePersistent { + return PersistentCachingCommands + } else if match == CacheTypeLastOnly { + return HourlyCachingCommands + } else { + panic("unknown caching type") } - return ret } func HTTPBackendDropBacklog(w http.ResponseWriter, r *http.Request) { diff --git a/socketserver/server/subscriptions.go b/socketserver/server/subscriptions.go index 890e2e19..0ea46b50 100644 --- a/socketserver/server/subscriptions.go +++ b/socketserver/server/subscriptions.go @@ -111,11 +111,6 @@ func UnsubscribeAll(client *ClientInfo) { return // no need to remove from a high-contention list when the server is closing } - client.Mutex.Lock() - client.PendingSubscriptionsBacklog = nil - client.PendingSubscriptionsBacklog = nil - client.Mutex.Unlock() - GlobalSubscriptionLock.Lock() RemoveFromSliceCl(&GlobalSubscriptionInfo, client) GlobalSubscriptionLock.Unlock() diff --git a/socketserver/server/types.go b/socketserver/server/types.go index 44afc158..3d360e26 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -104,14 +104,8 @@ type ClientInfo struct { // Protected by Mutex. CurrentChannels []string - // List of channels that we have not yet checked current chat-related channel info for. - // This lets us batch the backlog requests. - // Protected by Mutex. - PendingSubscriptionsBacklog []string - - // A timer that, when fired, will make the pending backlog requests. - // Usually nil. Protected by Mutex. - MakePendingRequests *time.Timer + // True if the client has already sent the 'ready' command + ReadyComplete bool // Server-initiated messages should be sent here // This field will be nil before it is closed. @@ -157,8 +151,6 @@ func (cv *ClientVersion) Equal(cv2 *ClientVersion) bool { return cv.Major == cv2.Major && cv.Minor == cv2.Minor && cv.Revision == cv2.Revision } -const usePendingSubscrptionsBacklog = false - func (bct BacklogCacheType) Name() string { switch bct { case CacheTypeInvalid: