diff --git a/socketserver/internal/server/backend.go b/socketserver/internal/server/backend.go index 68be3b7e..3e98e742 100644 --- a/socketserver/internal/server/backend.go +++ b/socketserver/internal/server/backend.go @@ -15,6 +15,7 @@ import ( "strings" "sync" "time" + "github.com/gorilla/websocket" ) var backendHttpClient http.Client @@ -23,6 +24,7 @@ var responseCache *cache.Cache var getBacklogUrl string var postStatisticsUrl string +var addTopicUrl string var backendSharedKey [32]byte var serverId int @@ -35,10 +37,11 @@ func SetupBackend(config *ConfigFile) { if responseCache != nil { responseCache.Flush() } - responseCache = cache.New(60*time.Second, 120*time.Second) + responseCache = cache.New(60 * time.Second, 120 * time.Second) getBacklogUrl = fmt.Sprintf("%s/backlog", backendUrl) postStatisticsUrl = fmt.Sprintf("%s/stats", backendUrl) + addTopicUrl = fmt.Sprintf("%s/topics", backendUrl) messageBufferPool.New = New4KByteBuffer @@ -203,15 +206,69 @@ func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) { return nil, httpError(resp.StatusCode) } dec := json.NewDecoder(resp.Body) - var messages []ClientMessage - err = dec.Decode(messages) + var messageStrings []string + err = dec.Decode(messageStrings) if err != nil { return nil, err } + var messages = make([]ClientMessage, len(messageStrings)) + for i, str := range messageStrings { + UnmarshalClientMessage([]byte(str), websocket.TextMessage, &messages[i]) + } + return messages, nil } +type NotOkError struct { + Response string + Code int +} +func (noe NotOkError) Error() string { + return fmt.Sprintf("backend returned %d: %s", noe.Code, noe.Response) +} + +func SendNewTopicNotice(topic string) error { + return sendTopicNotice(topic, true) +} + +func SendCleanupTopicsNotice(topics []string) error { + return sendTopicNotice(strings.Join(topics, ","), false) +} + +func sendTopicNotice(topic string, added bool) error { + formData := url.Values{} + formData.Set("channels", topic) + if added { + formData.Set("added", "t") + } else { + formData.Set("added", "f") + } + + sealedForm, err := SealRequest(formData) + if err != nil { + return err + } + + resp, err := backendHttpClient.PostForm(addTopicUrl, sealedForm) + if err != nil { + return err + } + defer resp.Body.Close() + + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + respStr := string(respBytes) + if respStr != "ok" { + return NotOkError{Code: resp.StatusCode, Response: respStr} + } + + return nil +} + func httpError(statusCode int) error { return fmt.Errorf("backend http error: %d", statusCode) } diff --git a/socketserver/internal/server/publisher.go b/socketserver/internal/server/publisher.go index db1f74ac..54a5b3d1 100644 --- a/socketserver/internal/server/publisher.go +++ b/socketserver/internal/server/publisher.go @@ -6,6 +6,7 @@ package server import ( "sync" "time" + "log" ) type SubscriberList struct { @@ -82,6 +83,14 @@ func _subscribeWhileRlocked(channelName string, value chan<- ClientMessage) { list.Members = []chan<- ClientMessage{value} // Create it populated, to avoid reaper ChatSubscriptionInfo[channelName] = list ChatSubscriptionLock.Unlock() + + go func(topic string) { + err := SendNewTopicNotice(topic) + if err != nil { + log.Println("error reporting new sub:", err) + } + }(channelName) + ChatSubscriptionLock.RLock() } else { list.Lock() @@ -152,21 +161,28 @@ func UnsubscribeSingleChat(client *ClientInfo, channelName string) { ChatSubscriptionLock.RUnlock() } -const ReapingDelay = 120 * time.Minute +const ReapingDelay = 20 * time.Minute // Checks ChatSubscriptionInfo for entries with no subscribers every ReapingDelay. // Started from SetupServer(). func deadChannelReaper() { for { time.Sleep(ReapingDelay) + var cleanedUp = make([]string, 0, 6) ChatSubscriptionLock.Lock() for key, val := range ChatSubscriptionInfo { - if val != nil { - if len(val.Members) == 0 { - delete(ChatSubscriptionInfo, key) - } + if val == nil || len(val.Members) == 0 { + delete(ChatSubscriptionInfo, key) + cleanedUp = append(cleanedUp, key) } } ChatSubscriptionLock.Unlock() + + if len(cleanedUp) != 0 { + err := SendCleanupTopicsNotice(cleanedUp) + if err != nil { + log.Println("error reporting cleaned subs:", err) + } + } } } diff --git a/socketserver/internal/server/types.go b/socketserver/internal/server/types.go index 2f6013b3..48d3534d 100644 --- a/socketserver/internal/server/types.go +++ b/socketserver/internal/server/types.go @@ -34,13 +34,13 @@ type ClientMessage struct { // Message ID. Increments by 1 for each message sent from the client. // When replying to a command, the message ID must be echoed. // When sending a server-initiated message, this is -1. - MessageID int + MessageID int `json:m` // The command that the client wants from the server. // When sent from the server, the literal string 'True' indicates success. // Before sending, a blank Command will be converted into SuccessCommand. - Command Command + Command Command `json:c` // Result of json.Unmarshal on the third field send from the client - Arguments interface{} + Arguments interface{} `json:a` origArguments string }