1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-08-07 23:00:54 +00:00

Add reporting of added/removed subcriptions

This commit is contained in:
Kane York 2015-11-04 15:11:49 -08:00
parent 3ad095acf4
commit 525b19eccb
3 changed files with 84 additions and 11 deletions

View file

@ -15,6 +15,7 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/gorilla/websocket"
) )
var backendHttpClient http.Client var backendHttpClient http.Client
@ -23,6 +24,7 @@ var responseCache *cache.Cache
var getBacklogUrl string var getBacklogUrl string
var postStatisticsUrl string var postStatisticsUrl string
var addTopicUrl string
var backendSharedKey [32]byte var backendSharedKey [32]byte
var serverId int var serverId int
@ -39,6 +41,7 @@ func SetupBackend(config *ConfigFile) {
getBacklogUrl = fmt.Sprintf("%s/backlog", backendUrl) getBacklogUrl = fmt.Sprintf("%s/backlog", backendUrl)
postStatisticsUrl = fmt.Sprintf("%s/stats", backendUrl) postStatisticsUrl = fmt.Sprintf("%s/stats", backendUrl)
addTopicUrl = fmt.Sprintf("%s/topics", backendUrl)
messageBufferPool.New = New4KByteBuffer messageBufferPool.New = New4KByteBuffer
@ -203,15 +206,69 @@ func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) {
return nil, httpError(resp.StatusCode) return nil, httpError(resp.StatusCode)
} }
dec := json.NewDecoder(resp.Body) dec := json.NewDecoder(resp.Body)
var messages []ClientMessage var messageStrings []string
err = dec.Decode(messages) err = dec.Decode(messageStrings)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var messages = make([]ClientMessage, len(messageStrings))
for i, str := range messageStrings {
UnmarshalClientMessage([]byte(str), websocket.TextMessage, &messages[i])
}
return messages, nil return messages, nil
} }
type NotOkError struct {
Response string
Code int
}
func (noe NotOkError) Error() string {
return fmt.Sprintf("backend returned %d: %s", noe.Code, noe.Response)
}
func SendNewTopicNotice(topic string) error {
return sendTopicNotice(topic, true)
}
func SendCleanupTopicsNotice(topics []string) error {
return sendTopicNotice(strings.Join(topics, ","), false)
}
func sendTopicNotice(topic string, added bool) error {
formData := url.Values{}
formData.Set("channels", topic)
if added {
formData.Set("added", "t")
} else {
formData.Set("added", "f")
}
sealedForm, err := SealRequest(formData)
if err != nil {
return err
}
resp, err := backendHttpClient.PostForm(addTopicUrl, sealedForm)
if err != nil {
return err
}
defer resp.Body.Close()
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
respStr := string(respBytes)
if respStr != "ok" {
return NotOkError{Code: resp.StatusCode, Response: respStr}
}
return nil
}
func httpError(statusCode int) error { func httpError(statusCode int) error {
return fmt.Errorf("backend http error: %d", statusCode) return fmt.Errorf("backend http error: %d", statusCode)
} }

View file

@ -6,6 +6,7 @@ package server
import ( import (
"sync" "sync"
"time" "time"
"log"
) )
type SubscriberList struct { type SubscriberList struct {
@ -82,6 +83,14 @@ func _subscribeWhileRlocked(channelName string, value chan<- ClientMessage) {
list.Members = []chan<- ClientMessage{value} // Create it populated, to avoid reaper list.Members = []chan<- ClientMessage{value} // Create it populated, to avoid reaper
ChatSubscriptionInfo[channelName] = list ChatSubscriptionInfo[channelName] = list
ChatSubscriptionLock.Unlock() ChatSubscriptionLock.Unlock()
go func(topic string) {
err := SendNewTopicNotice(topic)
if err != nil {
log.Println("error reporting new sub:", err)
}
}(channelName)
ChatSubscriptionLock.RLock() ChatSubscriptionLock.RLock()
} else { } else {
list.Lock() list.Lock()
@ -152,21 +161,28 @@ func UnsubscribeSingleChat(client *ClientInfo, channelName string) {
ChatSubscriptionLock.RUnlock() ChatSubscriptionLock.RUnlock()
} }
const ReapingDelay = 120 * time.Minute const ReapingDelay = 20 * time.Minute
// Checks ChatSubscriptionInfo for entries with no subscribers every ReapingDelay. // Checks ChatSubscriptionInfo for entries with no subscribers every ReapingDelay.
// Started from SetupServer(). // Started from SetupServer().
func deadChannelReaper() { func deadChannelReaper() {
for { for {
time.Sleep(ReapingDelay) time.Sleep(ReapingDelay)
var cleanedUp = make([]string, 0, 6)
ChatSubscriptionLock.Lock() ChatSubscriptionLock.Lock()
for key, val := range ChatSubscriptionInfo { for key, val := range ChatSubscriptionInfo {
if val != nil { if val == nil || len(val.Members) == 0 {
if len(val.Members) == 0 {
delete(ChatSubscriptionInfo, key) delete(ChatSubscriptionInfo, key)
} cleanedUp = append(cleanedUp, key)
} }
} }
ChatSubscriptionLock.Unlock() ChatSubscriptionLock.Unlock()
if len(cleanedUp) != 0 {
err := SendCleanupTopicsNotice(cleanedUp)
if err != nil {
log.Println("error reporting cleaned subs:", err)
}
}
} }
} }

View file

@ -34,13 +34,13 @@ type ClientMessage struct {
// Message ID. Increments by 1 for each message sent from the client. // Message ID. Increments by 1 for each message sent from the client.
// When replying to a command, the message ID must be echoed. // When replying to a command, the message ID must be echoed.
// When sending a server-initiated message, this is -1. // When sending a server-initiated message, this is -1.
MessageID int MessageID int `json:m`
// The command that the client wants from the server. // The command that the client wants from the server.
// When sent from the server, the literal string 'True' indicates success. // When sent from the server, the literal string 'True' indicates success.
// Before sending, a blank Command will be converted into SuccessCommand. // Before sending, a blank Command will be converted into SuccessCommand.
Command Command Command Command `json:c`
// Result of json.Unmarshal on the third field send from the client // Result of json.Unmarshal on the third field send from the client
Arguments interface{} Arguments interface{} `json:a`
origArguments string origArguments string
} }