diff --git a/socketserver/server/backend.go b/socketserver/server/backend.go index 8c96fd51..f5b67c37 100644 --- a/socketserver/server/backend.go +++ b/socketserver/server/backend.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/gorilla/websocket" "github.com/pmylund/go-cache" "golang.org/x/crypto/nacl/box" "io/ioutil" @@ -23,7 +22,6 @@ var backendHTTPClient http.Client var backendURL string var responseCache *cache.Cache -var getBacklogURL string var postStatisticsURL string var addTopicURL string var announceStartupURL string @@ -41,7 +39,6 @@ func setupBackend(config *ConfigFile) { } responseCache = cache.New(60*time.Second, 120*time.Second) - getBacklogURL = fmt.Sprintf("%s/backlog", backendURL) postStatisticsURL = fmt.Sprintf("%s/stats", backendURL) addTopicURL = fmt.Sprintf("%s/topics", backendURL) announceStartupURL = fmt.Sprintf("%s/startup", backendURL) @@ -96,18 +93,16 @@ func HTTPBackendUncachedPublish(w http.ResponseWriter, r *http.Request) { var count int switch target { - case MsgTargetTypeSingle: - // TODO case MsgTargetTypeChat: count = PublishToChannel(channel, cm) case MsgTargetTypeMultichat: count = PublishToMultiple(strings.Split(channel, ","), cm) case MsgTargetTypeGlobal: count = PublishToAll(cm) - case MsgTargetTypeInvalid: + case MsgTargetTypeInvalid: fallthrough default: w.WriteHeader(422) - fmt.Fprint(w, "Invalid 'scope'. must be single, chat, multichat, channel, or global") + fmt.Fprint(w, "Invalid 'scope'. must be chat, multichat, channel, or global") return } fmt.Fprint(w, count) @@ -204,41 +199,6 @@ func SendAggregatedData(sealedForm url.Values) error { return resp.Body.Close() } -// FetchBacklogData makes a request to the backend for backlog data on a set of pub/sub topics. -// TODO scrap this, replaced by /cached_pub -func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) { - formData := url.Values{ - "subs": chatSubs, - } - - sealedForm, err := SealRequest(formData) - if err != nil { - return nil, err - } - - resp, err := backendHTTPClient.PostForm(getBacklogURL, sealedForm) - if err != nil { - return nil, err - } - defer resp.Body.Close() - if resp.StatusCode != 200 { - return nil, httpError(resp.StatusCode) - } - dec := json.NewDecoder(resp.Body) - var messageStrings []string - err = dec.Decode(messageStrings) - if err != nil { - return nil, err - } - - var messages = make([]ClientMessage, len(messageStrings)) - for i, str := range messageStrings { - UnmarshalClientMessage([]byte(str), websocket.TextMessage, &messages[i]) - } - - return messages, nil -} - // ErrBackendNotOK indicates that the backend replied with something other than the string "ok". type ErrBackendNotOK struct { Response string diff --git a/socketserver/server/commands.go b/socketserver/server/commands.go index 57461278..b68d4ebf 100644 --- a/socketserver/server/commands.go +++ b/socketserver/server/commands.go @@ -172,9 +172,6 @@ func C2SReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg go func() { client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand} SendBacklogForNewClient(client) - // if disconnectAt != 0 { - // SendTimedBacklogMessages(client, time.Unix(disconnectAt, 0)) - // } client.MsgChannelKeepalive.Done() }() return ClientMessage{Command: AsyncResponseCommand}, nil diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index a8272b71..7ab17b1e 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -13,8 +13,8 @@ import ( "strconv" "strings" "sync" - "time" "sync/atomic" + "time" ) // SuccessCommand is a Reply Command to indicate success in reply to a C2S Command. @@ -91,7 +91,6 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { } go authorizationJanitor() - go backlogJanitor() go bunchCacheJanitor() go pubsubJanitor() go aggregateDataSender() diff --git a/socketserver/server/publisher.go b/socketserver/server/publisher.go index 9f9be8ff..b6e9a79a 100644 --- a/socketserver/server/publisher.go +++ b/socketserver/server/publisher.go @@ -6,7 +6,6 @@ import ( "net/http" "sort" "strconv" - "strings" "sync" "time" ) @@ -16,32 +15,18 @@ type PushCommandCacheInfo struct { Target MessageTargetType } -// this value is just docs right now -var ServerInitiatedCommands = map[Command]PushCommandCacheInfo{ - /// Global updates & notices - "update_news": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global - "message": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global - "reload_ff": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global - - /// Emote updates - "reload_badges": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global - "set_badge": {CacheTypeTimestamps, MsgTargetTypeMultichat}, // timecache:multichat - "reload_set": {}, // timecache:multichat - "load_set": {}, // TODO what are the semantics of this? - - /// User auth - "do_authorize": {CacheTypeNever, MsgTargetTypeSingle}, // nocache:single - +// S2CCommandsCacheInfo details what the behavior is of each command that can be sent to /cached_pub. +var S2CCommandsCacheInfo = map[Command]PushCommandCacheInfo{ /// Channel data // follow_sets: extra emote sets included in the chat // follow_buttons: extra follow buttons below the stream - "follow_sets": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:chat - "follow_buttons": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:watching - "srl_race": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching + "follow_sets": {CacheTypePersistent, MsgTargetTypeChat}, + "follow_buttons": {CacheTypePersistent, MsgTargetTypeChat}, + "srl_race": {CacheTypeLastOnly, MsgTargetTypeChat}, /// Chatter/viewer counts - "chatters": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching - "viewers": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching + "chatters": {CacheTypeLastOnly, MsgTargetTypeChat}, + "viewers": {CacheTypeLastOnly, MsgTargetTypeChat}, } type BacklogCacheType int @@ -51,10 +36,6 @@ const ( CacheTypeInvalid BacklogCacheType = iota // This message cannot be cached. CacheTypeNever - // Save the last 24 hours of this message. - // If a client indicates that it has reconnected, replay the messages sent after the disconnect. - // Do not replay if the client indicates that this is a firstload. - CacheTypeTimestamps // Save only the last copy of this message, and always send it when the backlog is requested. CacheTypeLastOnly // Save this backlog data to disk with its timestamp. @@ -67,8 +48,6 @@ type MessageTargetType int const ( // This is not a message target. MsgTargetTypeInvalid MessageTargetType = iota - // This message is targeted to a single TODO(user or connection) - MsgTargetTypeSingle // This message is targeted to all users in a chat MsgTargetTypeChat // This message is targeted to all users in multiple chats @@ -85,19 +64,6 @@ var ErrorUnrecognizedCacheType = errors.New("Invalid value for cachetype") // Returned by MessageTargetType.UnmarshalJSON() var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target") -type TimestampedGlobalMessage struct { - Timestamp time.Time - Command Command - Data string -} - -type TimestampedMultichatMessage struct { - Timestamp time.Time - Channels []string - Command Command - Data string -} - type LastSavedMessage struct { Timestamp time.Time Data string @@ -113,10 +79,6 @@ var CachedLSMLock sync.RWMutex var PersistentLastMessages map[Command]map[string]LastSavedMessage var PersistentLSMLock sync.RWMutex -var CachedGlobalMessages []TimestampedGlobalMessage -var CachedChannelMessages []TimestampedMultichatMessage -var CacheListsLock sync.RWMutex - // DumpBacklogData drops all /cached_pub data. func DumpBacklogData() { CachedLSMLock.Lock() @@ -126,11 +88,6 @@ func DumpBacklogData() { PersistentLSMLock.Lock() PersistentLastMessages = make(map[Command]map[string]LastSavedMessage) PersistentLSMLock.Unlock() - - CacheListsLock.Lock() - CachedGlobalMessages = make(tgmarray, 0) - CachedChannelMessages = make(tmmarray, 0) - CacheListsLock.Unlock() } // SendBacklogForNewClient sends any backlog data relevant to a new client. @@ -174,77 +131,6 @@ func SendBacklogForNewClient(client *ClientInfo) { client.Mutex.Unlock() } -// SendTimedBacklogMessages sends any once-off messages that the client may have missed while it was disconnected. -// Effectively, this can only process CacheTypeTimestamps. -func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) { - client.Mutex.Lock() // reading CurrentChannels - CacheListsLock.RLock() - - globIdx := findFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime) - - if globIdx != -1 { - for i := globIdx; i < len(CachedGlobalMessages); i++ { - item := CachedGlobalMessages[i] - msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data} - msg.parseOrigArguments() - client.MessageChannel <- msg - } - } - - chanIdx := findFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime) - - if chanIdx != -1 { - for i := chanIdx; i < len(CachedChannelMessages); i++ { - item := CachedChannelMessages[i] - var send bool - for _, channel := range item.Channels { - for _, matchChannel := range client.CurrentChannels { - if channel == matchChannel { - send = true - break - } - } - if send { - break - } - } - if send { - msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data} - msg.parseOrigArguments() - client.MessageChannel <- msg - } - } - } - - CacheListsLock.RUnlock() - client.Mutex.Unlock() -} - -func backlogJanitor() { - for { - time.Sleep(1 * time.Hour) - cleanupTimedBacklogMessages() - } -} - -func cleanupTimedBacklogMessages() { - CacheListsLock.Lock() - oneHourAgo := time.Now().Add(-24 * time.Hour) - globIdx := findFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo) - if globIdx != -1 { - newGlobMsgs := make([]TimestampedGlobalMessage, len(CachedGlobalMessages)-globIdx) - copy(newGlobMsgs, CachedGlobalMessages[globIdx:]) - CachedGlobalMessages = newGlobMsgs - } - chanIdx := findFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo) - if chanIdx != -1 { - newChanMsgs := make([]TimestampedMultichatMessage, len(CachedChannelMessages)-chanIdx) - copy(newChanMsgs, CachedChannelMessages[chanIdx:]) - CachedChannelMessages = newChanMsgs - } - CacheListsLock.Unlock() -} - // insertionSort implements insertion sort. // CacheTypeTimestamps should use insertion sort for O(N) average performance. // (The average case is the array is still sorted after insertion of the new item.) @@ -309,23 +195,9 @@ func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync. } } -func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) { - CacheListsLock.Lock() - CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data}) - insertionSort(tgmarray(CachedGlobalMessages)) - CacheListsLock.Unlock() -} - -func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) { - CacheListsLock.Lock() - CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data}) - insertionSort(tmmarray(CachedChannelMessages)) - CacheListsLock.Unlock() -} - func GetCommandsOfType(match PushCommandCacheInfo) []Command { var ret []Command - for cmd, info := range ServerInitiatedCommands { + for cmd, info := range S2CCommandsCacheInfo { if info == match { ret = append(ret, cmd) } @@ -371,7 +243,7 @@ func HTTPBackendCachedPublish(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "error parsing time: %v", err) } - cacheinfo, ok := ServerInitiatedCommands[cmd] + cacheinfo, ok := S2CCommandsCacheInfo[cmd] if !ok { w.WriteHeader(422) fmt.Fprintf(w, "Caching semantics unknown for command '%s'. Post to /addcachedcommand first.", cmd) @@ -388,12 +260,6 @@ func HTTPBackendCachedPublish(w http.ResponseWriter, r *http.Request) { } else if cacheinfo.Caching == CacheTypePersistent && cacheinfo.Target == MsgTargetTypeChat { SaveLastMessage(PersistentLastMessages, &PersistentLSMLock, cmd, channel, timestamp, json, deleteMode) count = PublishToChannel(channel, msg) - } else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeMultichat { - SaveMultichanMessage(cmd, channel, timestamp, json) - count = PublishToMultiple(strings.Split(channel, ","), msg) - } else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeGlobal { - SaveGlobalMessage(cmd, timestamp, json) - count = PublishToAll(msg) } w.Write([]byte(strconv.Itoa(count))) diff --git a/socketserver/server/subscriptions_test.go b/socketserver/server/subscriptions_test.go index 5d7adba3..c798ca2d 100644 --- a/socketserver/server/subscriptions_test.go +++ b/socketserver/server/subscriptions_test.go @@ -190,9 +190,9 @@ func TestSubscriptionAndPublish(t *testing.T) { const TestData3 = false var TestData4 = []interface{}{"str1", "str2", "str3"} - ServerInitiatedCommands[TestCommandChan] = PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat} - ServerInitiatedCommands[TestCommandMulti] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeMultichat} - ServerInitiatedCommands[TestCommandGlobal] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeGlobal} + S2CCommandsCacheInfo[TestCommandChan] = PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat} + S2CCommandsCacheInfo[TestCommandMulti] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeMultichat} + S2CCommandsCacheInfo[TestCommandGlobal] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeGlobal} var server *httptest.Server var urls TURLs diff --git a/socketserver/server/types.go b/socketserver/server/types.go index 1910c889..bc660aed 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -122,7 +122,7 @@ type ClientInfo struct { type esReportBasic struct { Timestamp time.Time - Host string + Host string } type esDisconnectReport struct { } @@ -159,42 +159,12 @@ func (cv *ClientVersion) Equal(cv2 *ClientVersion) bool { const usePendingSubscrptionsBacklog = false -type tgmarray []TimestampedGlobalMessage -type tmmarray []TimestampedMultichatMessage - -func (ta tgmarray) Len() int { - return len(ta) -} -func (ta tgmarray) Less(i, j int) bool { - return ta[i].Timestamp.Before(ta[j].Timestamp) -} -func (ta tgmarray) Swap(i, j int) { - ta[i], ta[j] = ta[j], ta[i] -} -func (ta tgmarray) GetTime(i int) time.Time { - return ta[i].Timestamp -} -func (ta tmmarray) Len() int { - return len(ta) -} -func (ta tmmarray) Less(i, j int) bool { - return ta[i].Timestamp.Before(ta[j].Timestamp) -} -func (ta tmmarray) Swap(i, j int) { - ta[i], ta[j] = ta[j], ta[i] -} -func (ta tmmarray) GetTime(i int) time.Time { - return ta[i].Timestamp -} - func (bct BacklogCacheType) Name() string { switch bct { case CacheTypeInvalid: return "" case CacheTypeNever: return "never" - case CacheTypeTimestamps: - return "timed" case CacheTypeLastOnly: return "last" case CacheTypePersistent: @@ -205,7 +175,6 @@ func (bct BacklogCacheType) Name() string { var CacheTypesByName = map[string]BacklogCacheType{ "never": CacheTypeNever, - "timed": CacheTypeTimestamps, "last": CacheTypeLastOnly, "persist": CacheTypePersistent, } @@ -247,8 +216,6 @@ func (mtt MessageTargetType) Name() string { switch mtt { case MsgTargetTypeInvalid: return "" - case MsgTargetTypeSingle: - return "single" case MsgTargetTypeChat: return "chat" case MsgTargetTypeMultichat: @@ -260,7 +227,6 @@ func (mtt MessageTargetType) Name() string { } var TargetTypesByName = map[string]MessageTargetType{ - "single": MsgTargetTypeSingle, "chat": MsgTargetTypeChat, "multichat": MsgTargetTypeMultichat, "global": MsgTargetTypeGlobal,