mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-09-16 18:06:55 +00:00
Code cleanup: Remove single-target, timestamp-cache
This commit is contained in:
parent
00175cad39
commit
3805fa1b66
6 changed files with 16 additions and 228 deletions
|
@ -6,7 +6,6 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/pmylund/go-cache"
|
||||
"golang.org/x/crypto/nacl/box"
|
||||
"io/ioutil"
|
||||
|
@ -23,7 +22,6 @@ var backendHTTPClient http.Client
|
|||
var backendURL string
|
||||
var responseCache *cache.Cache
|
||||
|
||||
var getBacklogURL string
|
||||
var postStatisticsURL string
|
||||
var addTopicURL string
|
||||
var announceStartupURL string
|
||||
|
@ -41,7 +39,6 @@ func setupBackend(config *ConfigFile) {
|
|||
}
|
||||
responseCache = cache.New(60*time.Second, 120*time.Second)
|
||||
|
||||
getBacklogURL = fmt.Sprintf("%s/backlog", backendURL)
|
||||
postStatisticsURL = fmt.Sprintf("%s/stats", backendURL)
|
||||
addTopicURL = fmt.Sprintf("%s/topics", backendURL)
|
||||
announceStartupURL = fmt.Sprintf("%s/startup", backendURL)
|
||||
|
@ -96,18 +93,16 @@ func HTTPBackendUncachedPublish(w http.ResponseWriter, r *http.Request) {
|
|||
var count int
|
||||
|
||||
switch target {
|
||||
case MsgTargetTypeSingle:
|
||||
// TODO
|
||||
case MsgTargetTypeChat:
|
||||
count = PublishToChannel(channel, cm)
|
||||
case MsgTargetTypeMultichat:
|
||||
count = PublishToMultiple(strings.Split(channel, ","), cm)
|
||||
case MsgTargetTypeGlobal:
|
||||
count = PublishToAll(cm)
|
||||
case MsgTargetTypeInvalid:
|
||||
case MsgTargetTypeInvalid: fallthrough
|
||||
default:
|
||||
w.WriteHeader(422)
|
||||
fmt.Fprint(w, "Invalid 'scope'. must be single, chat, multichat, channel, or global")
|
||||
fmt.Fprint(w, "Invalid 'scope'. must be chat, multichat, channel, or global")
|
||||
return
|
||||
}
|
||||
fmt.Fprint(w, count)
|
||||
|
@ -204,41 +199,6 @@ func SendAggregatedData(sealedForm url.Values) error {
|
|||
return resp.Body.Close()
|
||||
}
|
||||
|
||||
// FetchBacklogData makes a request to the backend for backlog data on a set of pub/sub topics.
|
||||
// TODO scrap this, replaced by /cached_pub
|
||||
func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) {
|
||||
formData := url.Values{
|
||||
"subs": chatSubs,
|
||||
}
|
||||
|
||||
sealedForm, err := SealRequest(formData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := backendHTTPClient.PostForm(getBacklogURL, sealedForm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
return nil, httpError(resp.StatusCode)
|
||||
}
|
||||
dec := json.NewDecoder(resp.Body)
|
||||
var messageStrings []string
|
||||
err = dec.Decode(messageStrings)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var messages = make([]ClientMessage, len(messageStrings))
|
||||
for i, str := range messageStrings {
|
||||
UnmarshalClientMessage([]byte(str), websocket.TextMessage, &messages[i])
|
||||
}
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
// ErrBackendNotOK indicates that the backend replied with something other than the string "ok".
|
||||
type ErrBackendNotOK struct {
|
||||
Response string
|
||||
|
|
|
@ -172,9 +172,6 @@ func C2SReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg
|
|||
go func() {
|
||||
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand}
|
||||
SendBacklogForNewClient(client)
|
||||
// if disconnectAt != 0 {
|
||||
// SendTimedBacklogMessages(client, time.Unix(disconnectAt, 0))
|
||||
// }
|
||||
client.MsgChannelKeepalive.Done()
|
||||
}()
|
||||
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||
|
|
|
@ -13,8 +13,8 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SuccessCommand is a Reply Command to indicate success in reply to a C2S Command.
|
||||
|
@ -91,7 +91,6 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) {
|
|||
}
|
||||
|
||||
go authorizationJanitor()
|
||||
go backlogJanitor()
|
||||
go bunchCacheJanitor()
|
||||
go pubsubJanitor()
|
||||
go aggregateDataSender()
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
@ -16,32 +15,18 @@ type PushCommandCacheInfo struct {
|
|||
Target MessageTargetType
|
||||
}
|
||||
|
||||
// this value is just docs right now
|
||||
var ServerInitiatedCommands = map[Command]PushCommandCacheInfo{
|
||||
/// Global updates & notices
|
||||
"update_news": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
|
||||
"message": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
|
||||
"reload_ff": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
|
||||
|
||||
/// Emote updates
|
||||
"reload_badges": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
|
||||
"set_badge": {CacheTypeTimestamps, MsgTargetTypeMultichat}, // timecache:multichat
|
||||
"reload_set": {}, // timecache:multichat
|
||||
"load_set": {}, // TODO what are the semantics of this?
|
||||
|
||||
/// User auth
|
||||
"do_authorize": {CacheTypeNever, MsgTargetTypeSingle}, // nocache:single
|
||||
|
||||
// S2CCommandsCacheInfo details what the behavior is of each command that can be sent to /cached_pub.
|
||||
var S2CCommandsCacheInfo = map[Command]PushCommandCacheInfo{
|
||||
/// Channel data
|
||||
// follow_sets: extra emote sets included in the chat
|
||||
// follow_buttons: extra follow buttons below the stream
|
||||
"follow_sets": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:chat
|
||||
"follow_buttons": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:watching
|
||||
"srl_race": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching
|
||||
"follow_sets": {CacheTypePersistent, MsgTargetTypeChat},
|
||||
"follow_buttons": {CacheTypePersistent, MsgTargetTypeChat},
|
||||
"srl_race": {CacheTypeLastOnly, MsgTargetTypeChat},
|
||||
|
||||
/// Chatter/viewer counts
|
||||
"chatters": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching
|
||||
"viewers": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching
|
||||
"chatters": {CacheTypeLastOnly, MsgTargetTypeChat},
|
||||
"viewers": {CacheTypeLastOnly, MsgTargetTypeChat},
|
||||
}
|
||||
|
||||
type BacklogCacheType int
|
||||
|
@ -51,10 +36,6 @@ const (
|
|||
CacheTypeInvalid BacklogCacheType = iota
|
||||
// This message cannot be cached.
|
||||
CacheTypeNever
|
||||
// Save the last 24 hours of this message.
|
||||
// If a client indicates that it has reconnected, replay the messages sent after the disconnect.
|
||||
// Do not replay if the client indicates that this is a firstload.
|
||||
CacheTypeTimestamps
|
||||
// Save only the last copy of this message, and always send it when the backlog is requested.
|
||||
CacheTypeLastOnly
|
||||
// Save this backlog data to disk with its timestamp.
|
||||
|
@ -67,8 +48,6 @@ type MessageTargetType int
|
|||
const (
|
||||
// This is not a message target.
|
||||
MsgTargetTypeInvalid MessageTargetType = iota
|
||||
// This message is targeted to a single TODO(user or connection)
|
||||
MsgTargetTypeSingle
|
||||
// This message is targeted to all users in a chat
|
||||
MsgTargetTypeChat
|
||||
// This message is targeted to all users in multiple chats
|
||||
|
@ -85,19 +64,6 @@ var ErrorUnrecognizedCacheType = errors.New("Invalid value for cachetype")
|
|||
// Returned by MessageTargetType.UnmarshalJSON()
|
||||
var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target")
|
||||
|
||||
type TimestampedGlobalMessage struct {
|
||||
Timestamp time.Time
|
||||
Command Command
|
||||
Data string
|
||||
}
|
||||
|
||||
type TimestampedMultichatMessage struct {
|
||||
Timestamp time.Time
|
||||
Channels []string
|
||||
Command Command
|
||||
Data string
|
||||
}
|
||||
|
||||
type LastSavedMessage struct {
|
||||
Timestamp time.Time
|
||||
Data string
|
||||
|
@ -113,10 +79,6 @@ var CachedLSMLock sync.RWMutex
|
|||
var PersistentLastMessages map[Command]map[string]LastSavedMessage
|
||||
var PersistentLSMLock sync.RWMutex
|
||||
|
||||
var CachedGlobalMessages []TimestampedGlobalMessage
|
||||
var CachedChannelMessages []TimestampedMultichatMessage
|
||||
var CacheListsLock sync.RWMutex
|
||||
|
||||
// DumpBacklogData drops all /cached_pub data.
|
||||
func DumpBacklogData() {
|
||||
CachedLSMLock.Lock()
|
||||
|
@ -126,11 +88,6 @@ func DumpBacklogData() {
|
|||
PersistentLSMLock.Lock()
|
||||
PersistentLastMessages = make(map[Command]map[string]LastSavedMessage)
|
||||
PersistentLSMLock.Unlock()
|
||||
|
||||
CacheListsLock.Lock()
|
||||
CachedGlobalMessages = make(tgmarray, 0)
|
||||
CachedChannelMessages = make(tmmarray, 0)
|
||||
CacheListsLock.Unlock()
|
||||
}
|
||||
|
||||
// SendBacklogForNewClient sends any backlog data relevant to a new client.
|
||||
|
@ -174,77 +131,6 @@ func SendBacklogForNewClient(client *ClientInfo) {
|
|||
client.Mutex.Unlock()
|
||||
}
|
||||
|
||||
// SendTimedBacklogMessages sends any once-off messages that the client may have missed while it was disconnected.
|
||||
// Effectively, this can only process CacheTypeTimestamps.
|
||||
func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) {
|
||||
client.Mutex.Lock() // reading CurrentChannels
|
||||
CacheListsLock.RLock()
|
||||
|
||||
globIdx := findFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime)
|
||||
|
||||
if globIdx != -1 {
|
||||
for i := globIdx; i < len(CachedGlobalMessages); i++ {
|
||||
item := CachedGlobalMessages[i]
|
||||
msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data}
|
||||
msg.parseOrigArguments()
|
||||
client.MessageChannel <- msg
|
||||
}
|
||||
}
|
||||
|
||||
chanIdx := findFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime)
|
||||
|
||||
if chanIdx != -1 {
|
||||
for i := chanIdx; i < len(CachedChannelMessages); i++ {
|
||||
item := CachedChannelMessages[i]
|
||||
var send bool
|
||||
for _, channel := range item.Channels {
|
||||
for _, matchChannel := range client.CurrentChannels {
|
||||
if channel == matchChannel {
|
||||
send = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if send {
|
||||
break
|
||||
}
|
||||
}
|
||||
if send {
|
||||
msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data}
|
||||
msg.parseOrigArguments()
|
||||
client.MessageChannel <- msg
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CacheListsLock.RUnlock()
|
||||
client.Mutex.Unlock()
|
||||
}
|
||||
|
||||
func backlogJanitor() {
|
||||
for {
|
||||
time.Sleep(1 * time.Hour)
|
||||
cleanupTimedBacklogMessages()
|
||||
}
|
||||
}
|
||||
|
||||
func cleanupTimedBacklogMessages() {
|
||||
CacheListsLock.Lock()
|
||||
oneHourAgo := time.Now().Add(-24 * time.Hour)
|
||||
globIdx := findFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo)
|
||||
if globIdx != -1 {
|
||||
newGlobMsgs := make([]TimestampedGlobalMessage, len(CachedGlobalMessages)-globIdx)
|
||||
copy(newGlobMsgs, CachedGlobalMessages[globIdx:])
|
||||
CachedGlobalMessages = newGlobMsgs
|
||||
}
|
||||
chanIdx := findFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo)
|
||||
if chanIdx != -1 {
|
||||
newChanMsgs := make([]TimestampedMultichatMessage, len(CachedChannelMessages)-chanIdx)
|
||||
copy(newChanMsgs, CachedChannelMessages[chanIdx:])
|
||||
CachedChannelMessages = newChanMsgs
|
||||
}
|
||||
CacheListsLock.Unlock()
|
||||
}
|
||||
|
||||
// insertionSort implements insertion sort.
|
||||
// CacheTypeTimestamps should use insertion sort for O(N) average performance.
|
||||
// (The average case is the array is still sorted after insertion of the new item.)
|
||||
|
@ -309,23 +195,9 @@ func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync.
|
|||
}
|
||||
}
|
||||
|
||||
func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) {
|
||||
CacheListsLock.Lock()
|
||||
CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data})
|
||||
insertionSort(tgmarray(CachedGlobalMessages))
|
||||
CacheListsLock.Unlock()
|
||||
}
|
||||
|
||||
func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) {
|
||||
CacheListsLock.Lock()
|
||||
CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data})
|
||||
insertionSort(tmmarray(CachedChannelMessages))
|
||||
CacheListsLock.Unlock()
|
||||
}
|
||||
|
||||
func GetCommandsOfType(match PushCommandCacheInfo) []Command {
|
||||
var ret []Command
|
||||
for cmd, info := range ServerInitiatedCommands {
|
||||
for cmd, info := range S2CCommandsCacheInfo {
|
||||
if info == match {
|
||||
ret = append(ret, cmd)
|
||||
}
|
||||
|
@ -371,7 +243,7 @@ func HTTPBackendCachedPublish(w http.ResponseWriter, r *http.Request) {
|
|||
fmt.Fprintf(w, "error parsing time: %v", err)
|
||||
}
|
||||
|
||||
cacheinfo, ok := ServerInitiatedCommands[cmd]
|
||||
cacheinfo, ok := S2CCommandsCacheInfo[cmd]
|
||||
if !ok {
|
||||
w.WriteHeader(422)
|
||||
fmt.Fprintf(w, "Caching semantics unknown for command '%s'. Post to /addcachedcommand first.", cmd)
|
||||
|
@ -388,12 +260,6 @@ func HTTPBackendCachedPublish(w http.ResponseWriter, r *http.Request) {
|
|||
} else if cacheinfo.Caching == CacheTypePersistent && cacheinfo.Target == MsgTargetTypeChat {
|
||||
SaveLastMessage(PersistentLastMessages, &PersistentLSMLock, cmd, channel, timestamp, json, deleteMode)
|
||||
count = PublishToChannel(channel, msg)
|
||||
} else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeMultichat {
|
||||
SaveMultichanMessage(cmd, channel, timestamp, json)
|
||||
count = PublishToMultiple(strings.Split(channel, ","), msg)
|
||||
} else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeGlobal {
|
||||
SaveGlobalMessage(cmd, timestamp, json)
|
||||
count = PublishToAll(msg)
|
||||
}
|
||||
|
||||
w.Write([]byte(strconv.Itoa(count)))
|
||||
|
|
|
@ -190,9 +190,9 @@ func TestSubscriptionAndPublish(t *testing.T) {
|
|||
const TestData3 = false
|
||||
var TestData4 = []interface{}{"str1", "str2", "str3"}
|
||||
|
||||
ServerInitiatedCommands[TestCommandChan] = PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}
|
||||
ServerInitiatedCommands[TestCommandMulti] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeMultichat}
|
||||
ServerInitiatedCommands[TestCommandGlobal] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeGlobal}
|
||||
S2CCommandsCacheInfo[TestCommandChan] = PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}
|
||||
S2CCommandsCacheInfo[TestCommandMulti] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeMultichat}
|
||||
S2CCommandsCacheInfo[TestCommandGlobal] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeGlobal}
|
||||
|
||||
var server *httptest.Server
|
||||
var urls TURLs
|
||||
|
|
|
@ -159,42 +159,12 @@ func (cv *ClientVersion) Equal(cv2 *ClientVersion) bool {
|
|||
|
||||
const usePendingSubscrptionsBacklog = false
|
||||
|
||||
type tgmarray []TimestampedGlobalMessage
|
||||
type tmmarray []TimestampedMultichatMessage
|
||||
|
||||
func (ta tgmarray) Len() int {
|
||||
return len(ta)
|
||||
}
|
||||
func (ta tgmarray) Less(i, j int) bool {
|
||||
return ta[i].Timestamp.Before(ta[j].Timestamp)
|
||||
}
|
||||
func (ta tgmarray) Swap(i, j int) {
|
||||
ta[i], ta[j] = ta[j], ta[i]
|
||||
}
|
||||
func (ta tgmarray) GetTime(i int) time.Time {
|
||||
return ta[i].Timestamp
|
||||
}
|
||||
func (ta tmmarray) Len() int {
|
||||
return len(ta)
|
||||
}
|
||||
func (ta tmmarray) Less(i, j int) bool {
|
||||
return ta[i].Timestamp.Before(ta[j].Timestamp)
|
||||
}
|
||||
func (ta tmmarray) Swap(i, j int) {
|
||||
ta[i], ta[j] = ta[j], ta[i]
|
||||
}
|
||||
func (ta tmmarray) GetTime(i int) time.Time {
|
||||
return ta[i].Timestamp
|
||||
}
|
||||
|
||||
func (bct BacklogCacheType) Name() string {
|
||||
switch bct {
|
||||
case CacheTypeInvalid:
|
||||
return ""
|
||||
case CacheTypeNever:
|
||||
return "never"
|
||||
case CacheTypeTimestamps:
|
||||
return "timed"
|
||||
case CacheTypeLastOnly:
|
||||
return "last"
|
||||
case CacheTypePersistent:
|
||||
|
@ -205,7 +175,6 @@ func (bct BacklogCacheType) Name() string {
|
|||
|
||||
var CacheTypesByName = map[string]BacklogCacheType{
|
||||
"never": CacheTypeNever,
|
||||
"timed": CacheTypeTimestamps,
|
||||
"last": CacheTypeLastOnly,
|
||||
"persist": CacheTypePersistent,
|
||||
}
|
||||
|
@ -247,8 +216,6 @@ func (mtt MessageTargetType) Name() string {
|
|||
switch mtt {
|
||||
case MsgTargetTypeInvalid:
|
||||
return ""
|
||||
case MsgTargetTypeSingle:
|
||||
return "single"
|
||||
case MsgTargetTypeChat:
|
||||
return "chat"
|
||||
case MsgTargetTypeMultichat:
|
||||
|
@ -260,7 +227,6 @@ func (mtt MessageTargetType) Name() string {
|
|||
}
|
||||
|
||||
var TargetTypesByName = map[string]MessageTargetType{
|
||||
"single": MsgTargetTypeSingle,
|
||||
"chat": MsgTargetTypeChat,
|
||||
"multichat": MsgTargetTypeMultichat,
|
||||
"global": MsgTargetTypeGlobal,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue