From 85d261afb3e319af1464d04fb2eb8858fdae07d2 Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 26 Oct 2015 12:13:28 -0700 Subject: [PATCH] client: Move room/channel distinction into name (room.trihex) Also, have the client send a "ready" message when it has sent all its initial requests. --- socketserver/internal/server/backend.go | 9 +-- socketserver/internal/server/backlog.go | 12 ++- socketserver/internal/server/commands.go | 57 ++----------- socketserver/internal/server/handlecore.go | 2 - socketserver/internal/server/publisher.go | 81 +++---------------- .../internal/server/publisher_test.go | 4 +- socketserver/internal/server/types.go | 14 +--- src/ember/channel.js | 8 +- src/ember/room.js | 4 +- src/socket.js | 33 ++++++-- 10 files changed, 62 insertions(+), 162 deletions(-) diff --git a/socketserver/internal/server/backend.go b/socketserver/internal/server/backend.go index c1073370..80eb3f64 100644 --- a/socketserver/internal/server/backend.go +++ b/socketserver/internal/server/backend.go @@ -87,7 +87,7 @@ func HBackendPublishRequest(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Error: cmd cannot be blank") return } - if channel == "" && (target == MsgTargetTypeChat || target == MsgTargetTypeMultichat || target == MsgTargetTypeWatching) { + if channel == "" && (target == MsgTargetTypeChat || target == MsgTargetTypeMultichat) { w.WriteHeader(422) fmt.Fprintf(w, "Error: channel must be specified") return @@ -104,8 +104,6 @@ func HBackendPublishRequest(w http.ResponseWriter, r *http.Request) { count = PublishToChat(channel, cm) case MsgTargetTypeMultichat: // TODO - case MsgTargetTypeWatching: - count = PublishToWatchers(channel, cm) case MsgTargetTypeGlobal: count = PublishToAll(cm) case MsgTargetTypeInvalid: @@ -169,10 +167,9 @@ func RequestRemoteData(remoteCommand, data string, auth AuthInfo) (responseStr s return } -func FetchBacklogData(chatSubs, channelSubs []string) ([]ClientMessage, error) { +func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) { formData := url.Values{ - "chatSubs": chatSubs, - "channelSubs": channelSubs, + "subs": chatSubs, } sealedForm, err := SealRequest(formData) diff --git a/socketserver/internal/server/backlog.go b/socketserver/internal/server/backlog.go index 057ff9b2..261bf2a6 100644 --- a/socketserver/internal/server/backlog.go +++ b/socketserver/internal/server/backlog.go @@ -22,7 +22,7 @@ var ServerInitiatedCommands = map[string]PushCommandCacheInfo{ /// Emote updates "reload_badges": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global "set_badge": {CacheTypeTimestamps, MsgTargetTypeMultichat}, // timecache:multichat - "reload_set": {CacheTypeTimestamps, MsgTargetTypeMultichat}, // timecache:multichat + "reload_set": {}, // timecache:multichat "load_set": {}, // TODO what are the semantics of this? /// User auth @@ -32,12 +32,12 @@ var ServerInitiatedCommands = map[string]PushCommandCacheInfo{ // follow_sets: extra emote sets included in the chat // follow_buttons: extra follow buttons below the stream "follow_sets": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:chat - "follow_buttons": {CacheTypePersistent, MsgTargetTypeWatching}, // mustcache:watching - "srl_race": {CacheTypeLastOnly, MsgTargetTypeWatching}, // cachelast:watching + "follow_buttons": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:watching + "srl_race": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching /// Chatter/viewer counts - "chatters": {CacheTypeLastOnly, MsgTargetTypeWatching}, // cachelast:watching - "viewers": {CacheTypeLastOnly, MsgTargetTypeWatching}, // cachelast:watching + "chatters": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching + "viewers": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching } type BacklogCacheType int @@ -69,8 +69,6 @@ const ( MsgTargetTypeChat // This message is targeted to all users in multiple chats MsgTargetTypeMultichat - // This message is targeted to all users watching a stream - MsgTargetTypeWatching // This message is sent to all FFZ users. MsgTargetTypeGlobal ) diff --git a/socketserver/internal/server/commands.go b/socketserver/internal/server/commands.go index abbf01ff..5fa82577 100644 --- a/socketserver/internal/server/commands.go +++ b/socketserver/internal/server/commands.go @@ -85,7 +85,7 @@ func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rms client.Mutex.Lock() AddToSliceS(&client.CurrentChannels, channel) - client.PendingChatBacklogs = append(client.PendingChatBacklogs, channel) + client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel) if client.MakePendingRequests == nil { client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client)) @@ -118,49 +118,6 @@ func HandleUnsub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r return ResponseSuccess, nil } -func HandleSubChannel(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { - channel, err := msg.ArgumentsAsString() - - if err != nil { - return - } - - client.Mutex.Lock() - - AddToSliceS(&client.WatchingChannels, channel) - client.PendingStreamBacklogs = append(client.PendingStreamBacklogs, channel) - - if client.MakePendingRequests == nil { - client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client)) - } else { - if !client.MakePendingRequests.Reset(ChannelInfoDelay) { - client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client)) - } - } - - client.Mutex.Unlock() - - SubscribeWatching(client, channel) - - return ResponseSuccess, nil -} - -func HandleUnsubChannel(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { - channel, err := msg.ArgumentsAsString() - - if err != nil { - return - } - - client.Mutex.Lock() - RemoveFromSliceS(&client.WatchingChannels, channel) - client.Mutex.Unlock() - - UnsubscribeSingleChannel(client, channel) - - return ResponseSuccess, nil -} - func GetSubscriptionBacklogFor(conn *websocket.Conn, client *ClientInfo) func() { return func() { GetSubscriptionBacklog(conn, client) @@ -169,25 +126,23 @@ func GetSubscriptionBacklogFor(conn *websocket.Conn, client *ClientInfo) func() // On goroutine func GetSubscriptionBacklog(conn *websocket.Conn, client *ClientInfo) { - var chatSubs, channelSubs []string + var subs []string // Lock, grab the data, and reset it client.Mutex.Lock() - chatSubs = client.PendingChatBacklogs - channelSubs = client.PendingStreamBacklogs - client.PendingChatBacklogs = nil - client.PendingStreamBacklogs = nil + subs = client.PendingSubscriptionsBacklog + client.PendingSubscriptionsBacklog = nil client.MakePendingRequests = nil client.Mutex.Unlock() - if len(chatSubs) == 0 && len(channelSubs) == 0 { + if len(subs) == 0 { return } if backendUrl == "" { return // for testing runs } - messages, err := FetchBacklogData(chatSubs, channelSubs) + messages, err := FetchBacklogData(subs) if err != nil { // Oh well. diff --git a/socketserver/internal/server/handlecore.go b/socketserver/internal/server/handlecore.go index 1842a0ee..3e1af8b3 100644 --- a/socketserver/internal/server/handlecore.go +++ b/socketserver/internal/server/handlecore.go @@ -42,8 +42,6 @@ var CommandHandlers = map[Command]CommandHandler{ "sub": HandleSub, "unsub": HandleUnsub, - "sub_channel": HandleSubChannel, - "unsub_channel": HandleUnsubChannel, "track_follow": HandleTrackFollow, "emoticon_uses": HandleEmoticonUses, diff --git a/socketserver/internal/server/publisher.go b/socketserver/internal/server/publisher.go index 65aaf96a..afb2d944 100644 --- a/socketserver/internal/server/publisher.go +++ b/socketserver/internal/server/publisher.go @@ -15,8 +15,6 @@ type SubscriberList struct { var ChatSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList) var ChatSubscriptionLock sync.RWMutex -var WatchingSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList) -var WatchingSubscriptionLock sync.RWMutex var GlobalSubscriptionInfo SubscriberList func PublishToChat(channel string, msg ClientMessage) (count int) { @@ -34,21 +32,6 @@ func PublishToChat(channel string, msg ClientMessage) (count int) { return } -func PublishToWatchers(channel string, msg ClientMessage) (count int) { - WatchingSubscriptionLock.RLock() - list := WatchingSubscriptionInfo[channel] - if list != nil { - list.RLock() - for _, msgChan := range list.Members { - msgChan <- msg - count++ - } - list.RUnlock() - } - WatchingSubscriptionLock.RUnlock() - return -} - func PublishToAll(msg ClientMessage) (count int) { GlobalSubscriptionInfo.RLock() for _, msgChan := range GlobalSubscriptionInfo.Members { @@ -64,17 +47,17 @@ func PublishToAll(msg ClientMessage) (count int) { // - ALREADY HOLDING a read-lock to the 'which' top-level map via the rlocker object // - possible write lock to the 'which' top-level map via the wlocker object // - write lock to SubscriptionInfo (if not creating new) -func _subscribeWhileRlocked(which map[string]*SubscriberList, channelName string, value chan<- ClientMessage, rlocker sync.Locker, wlocker sync.Locker) { - list := which[channelName] +func _subscribeWhileRlocked(channelName string, value chan<- ClientMessage) { + list := ChatSubscriptionInfo[channelName] if list == nil { // Not found, so create it - rlocker.Unlock() - wlocker.Lock() + ChatSubscriptionLock.RUnlock() + ChatSubscriptionLock.Lock() list = &SubscriberList{} list.Members = []chan<- ClientMessage{value} // Create it populated, to avoid reaper - which[channelName] = list - wlocker.Unlock() - rlocker.Lock() + ChatSubscriptionInfo[channelName] = list + ChatSubscriptionLock.Unlock() + ChatSubscriptionLock.RLock() } else { list.Lock() AddToSliceC(&list.Members, value) @@ -90,16 +73,10 @@ func SubscribeGlobal(client *ClientInfo) { func SubscribeChat(client *ClientInfo, channelName string) { ChatSubscriptionLock.RLock() - _subscribeWhileRlocked(ChatSubscriptionInfo, channelName, client.MessageChannel, ChatSubscriptionLock.RLocker(), &ChatSubscriptionLock) + _subscribeWhileRlocked(channelName, client.MessageChannel) ChatSubscriptionLock.RUnlock() } -func SubscribeWatching(client *ClientInfo, channelName string) { - WatchingSubscriptionLock.RLock() - _subscribeWhileRlocked(WatchingSubscriptionInfo, channelName, client.MessageChannel, WatchingSubscriptionLock.RLocker(), &WatchingSubscriptionLock) - WatchingSubscriptionLock.RUnlock() -} - func unsubscribeAllClients() { GlobalSubscriptionInfo.Lock() GlobalSubscriptionInfo.Members = nil @@ -107,9 +84,6 @@ func unsubscribeAllClients() { ChatSubscriptionLock.Lock() ChatSubscriptionInfo = make(map[string]*SubscriberList) ChatSubscriptionLock.Unlock() - WatchingSubscriptionLock.Lock() - WatchingSubscriptionInfo = make(map[string]*SubscriberList) - WatchingSubscriptionLock.Unlock() } // Unsubscribe the client from all channels, AND clear the CurrentChannels / WatchingChannels fields. @@ -119,8 +93,8 @@ func unsubscribeAllClients() { // - write lock to ClientInfo func UnsubscribeAll(client *ClientInfo) { client.Mutex.Lock() - client.PendingChatBacklogs = nil - client.PendingStreamBacklogs = nil + client.PendingSubscriptionsBacklog = nil + client.PendingSubscriptionsBacklog = nil client.Mutex.Unlock() GlobalSubscriptionInfo.Lock() @@ -140,20 +114,6 @@ func UnsubscribeAll(client *ClientInfo) { client.CurrentChannels = nil client.Mutex.Unlock() ChatSubscriptionLock.RUnlock() - - WatchingSubscriptionLock.RLock() - client.Mutex.Lock() - for _, v := range client.WatchingChannels { - list := WatchingSubscriptionInfo[v] - if list != nil { - list.Lock() - RemoveFromSliceC(&list.Members, client.MessageChannel) - list.Unlock() - } - } - client.WatchingChannels = nil - client.Mutex.Unlock() - WatchingSubscriptionLock.RUnlock() } func UnsubscribeSingleChat(client *ClientInfo, channelName string) { @@ -165,23 +125,13 @@ func UnsubscribeSingleChat(client *ClientInfo, channelName string) { ChatSubscriptionLock.RUnlock() } -func UnsubscribeSingleChannel(client *ClientInfo, channelName string) { - WatchingSubscriptionLock.RLock() - list := WatchingSubscriptionInfo[channelName] - list.Lock() - RemoveFromSliceC(&list.Members, client.MessageChannel) - list.Unlock() - WatchingSubscriptionLock.RUnlock() -} - const ReapingDelay = 120 * time.Minute -// Checks each of ChatSubscriptionInfo / WatchingSubscriptionInfo -// for entries with no subscribers every ReapingDelay. +// Checks ChatSubscriptionInfo for entries with no subscribers every ReapingDelay. // Started from SetupServer(). func deadChannelReaper() { for { - time.Sleep(ReapingDelay / 2) + time.Sleep(ReapingDelay) ChatSubscriptionLock.Lock() for key, val := range ChatSubscriptionInfo { if len(val.Members) == 0 { @@ -189,12 +139,5 @@ func deadChannelReaper() { } } ChatSubscriptionLock.Unlock() - time.Sleep(ReapingDelay / 2) - WatchingSubscriptionLock.Lock() - for key, val := range WatchingSubscriptionInfo { - if len(val.Members) == 0 { - WatchingSubscriptionInfo[key] = nil - } - } } } diff --git a/socketserver/internal/server/publisher_test.go b/socketserver/internal/server/publisher_test.go index 2f84f4ac..ebc42ec6 100644 --- a/socketserver/internal/server/publisher_test.go +++ b/socketserver/internal/server/publisher_test.go @@ -98,7 +98,7 @@ func TestSubscriptionAndPublish(t *testing.T) { var doneWg sync.WaitGroup var readyWg sync.WaitGroup - const TestChannelName = "testchannel" + const TestChannelName = "room.testchannel" const TestCommand = "testdata" const TestData = "123456789" @@ -170,7 +170,7 @@ func BenchmarkUserSubscriptionSinglePublish(b *testing.B) { var doneWg sync.WaitGroup var readyWg sync.WaitGroup - const TestChannelName = "testchannel" + const TestChannelName = "room.testchannel" const TestCommand = "testdata" const TestData = "123456789" diff --git a/socketserver/internal/server/types.go b/socketserver/internal/server/types.go index 32cc940d..f99e059d 100644 --- a/socketserver/internal/server/types.go +++ b/socketserver/internal/server/types.go @@ -62,19 +62,10 @@ type ClientInfo struct { // Protected by Mutex. CurrentChannels []string - // This list of channels this client needs UI updates for. - // Protected by Mutex. - WatchingChannels []string - // List of channels that we have not yet checked current chat-related channel info for. // This lets us batch the backlog requests. // Protected by Mutex. - PendingChatBacklogs []string - - // List of channels that we have not yet checked current stream-related channel info for. - // This lets us batch the backlog requests. - // Protected by Mutex. - PendingStreamBacklogs []string + PendingSubscriptionsBacklog []string // A timer that, when fired, will make the pending backlog requests. // Usually nil. Protected by Mutex. @@ -151,8 +142,6 @@ func (mtt MessageTargetType) Name() string { return "chat" case MsgTargetTypeMultichat: return "multichat" - case MsgTargetTypeWatching: - return "channel" case MsgTargetTypeGlobal: return "global" } @@ -163,7 +152,6 @@ var TargetTypesByName = map[string]MessageTargetType{ "single": MsgTargetTypeSingle, "chat": MsgTargetTypeChat, "multichat": MsgTargetTypeMultichat, - "channel": MsgTargetTypeWatching, "global": MsgTargetTypeGlobal, } diff --git a/src/ember/channel.js b/src/ember/channel.js index ea154da4..d10eee99 100644 --- a/src/ember/channel.js +++ b/src/ember/channel.js @@ -153,10 +153,10 @@ FFZ.prototype.setup_channel = function() { if ( id !== f.__old_host_target ) { if ( f.__old_host_target ) - f.ws_send("unsub_channel", f.__old_host_target); + f.ws_send("unsub", "channel." + f.__old_host_target); if ( id ) { - f.ws_send("sub_channel", id); + f.ws_send("sub", "channel." + id); f.__old_host_target = id; } else delete f.__old_host_target; @@ -208,7 +208,7 @@ FFZ.prototype._modify_cindex = function(view) { el = this.get('element'); f._cindex = this; - f.ws_send("sub_channel", id); + f.ws_send("sub", "channel." + id); el.setAttribute('data-channel', id); el.classList.add('ffz-channel'); @@ -621,7 +621,7 @@ FFZ.prototype._modify_cindex = function(view) { ffzTeardown: function() { var id = this.get('controller.id'); if ( id ) - f.ws_send("unsub_channel", id); + f.ws_send("unsub", "channel." + id); this.get('element').setAttribute('data-channel', ''); f._cindex = undefined; diff --git a/src/ember/room.js b/src/ember/room.js index a4b04135..a0ff9e8c 100644 --- a/src/ember/room.js +++ b/src/ember/room.js @@ -591,7 +591,7 @@ FFZ.prototype.add_room = function(id, room) { } // Let the server know where we are. - this.ws_send("sub", id); + this.ws_send("sub", "room." + id); // See if we need history? if ( ! this.has_bttv && this.settings.chat_history && room && (room.get('messages.length') || 0) < 10 ) { @@ -619,7 +619,7 @@ FFZ.prototype.remove_room = function(id) { utils.update_css(this._room_style, id, null); // Let the server know we're gone and delete our data for this room. - this.ws_send("unsub", id); + this.ws_send("unsub", "room." + id); delete this.rooms[id]; // Clean up sets we aren't using any longer. diff --git a/src/socket.js b/src/socket.js index 3e3d2092..08c1f59f 100644 --- a/src/socket.js +++ b/src/socket.js @@ -64,8 +64,8 @@ FFZ.prototype.ws_create = function() { if ( f.is_dashboard ) { var match = location.pathname.match(/\/([^\/]+)/); if ( match ) { - f.ws_send("sub", match[1]); - f.ws_send("sub_channel", match[1]); + f.ws_send("sub", "room." + match[1]); + f.ws_send("sub", "channel." + match[1]); } } @@ -74,7 +74,7 @@ FFZ.prototype.ws_create = function() { if ( ! f.rooms.hasOwnProperty(room_id) || ! f.rooms[room_id] ) continue; - f.ws_send("sub", room_id); + f.ws_send("sub", "room." + room_id); if ( f.rooms[room_id].needs_history ) { f.rooms[room_id].needs_history = false; @@ -89,10 +89,10 @@ FFZ.prototype.ws_create = function() { hosted_id = f._cindex.get('controller.hostModeTarget.id'); if ( channel_id ) - f.ws_send("sub_channel", channel_id); + f.ws_send("sub", "channel." + channel_id); if ( hosted_id ) - f.ws_send("sub_channel", hosted_id); + f.ws_send("sub", "channel." + hosted_id); } // Send any pending commands. @@ -103,11 +103,32 @@ FFZ.prototype.ws_create = function() { var d = pending[i]; f.ws_send(d[0], d[1], d[2]); } + + // If reconnecting, get the backlog that we missed. + if ( f._ws_offline_time ) { + var timestamp = f._ws_offline_time; + delete f._ws_offline_time; + f.ws_send("ready", timestamp); + } else { + f.ws_send("ready", 0); + } + } + + ws.onerror = function() { + if ( ! f._ws_offline_time ) { + f._ws_offline_time = new Date().getTime(); + } + + // Cycle selected server + f._ws_host_idx = (f._ws_host_idx + 1) % constants.WS_SERVERS.length; } ws.onclose = function(e) { f.log("Socket closed. (Code: " + e.code + ", Reason: " + e.reason + ")"); f._ws_open = false; + if ( ! f._ws_offline_time ) { + f._ws_offline_time = new Date().getTime(); + } // When the connection closes, run our callbacks. for (var i=0; i < FFZ.ws_on_close.length; i++) { @@ -118,7 +139,7 @@ FFZ.prototype.ws_create = function() { } } - // Attempt to cycle to backup server + // Cycle selected server f._ws_host_idx = (f._ws_host_idx + 1) % constants.WS_SERVERS.length; if ( f._ws_delay > 10000 ) {