diff --git a/socketserver/internal/server/backend.go b/socketserver/internal/server/backend.go index 42adb5ff..4557f534 100644 --- a/socketserver/internal/server/backend.go +++ b/socketserver/internal/server/backend.go @@ -97,7 +97,7 @@ func HBackendPublishRequest(w http.ResponseWriter, r *http.Request) { case MsgTargetTypeSingle: // TODO case MsgTargetTypeChat: - count = PublishToChat(channel, cm) + count = PublishToChannel(channel, cm) case MsgTargetTypeMultichat: count = PublishToMultiple(strings.Split(channel, ","), cm) case MsgTargetTypeGlobal: diff --git a/socketserver/internal/server/backlog.go b/socketserver/internal/server/backlog.go deleted file mode 100644 index 6ced8141..00000000 --- a/socketserver/internal/server/backlog.go +++ /dev/null @@ -1,392 +0,0 @@ -package server - -import ( - "errors" - "fmt" - "net/http" - "sort" - "strconv" - "strings" - "sync" - "time" -) - -type PushCommandCacheInfo struct { - Caching BacklogCacheType - Target MessageTargetType -} - -// this value is just docs right now -var ServerInitiatedCommands = map[Command]PushCommandCacheInfo{ - /// Global updates & notices - "update_news": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global - "message": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global - "reload_ff": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global - - /// Emote updates - "reload_badges": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global - "set_badge": {CacheTypeTimestamps, MsgTargetTypeMultichat}, // timecache:multichat - "reload_set": {}, // timecache:multichat - "load_set": {}, // TODO what are the semantics of this? - - /// User auth - "do_authorize": {CacheTypeNever, MsgTargetTypeSingle}, // nocache:single - - /// Channel data - // follow_sets: extra emote sets included in the chat - // follow_buttons: extra follow buttons below the stream - "follow_sets": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:chat - "follow_buttons": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:watching - "srl_race": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching - - /// Chatter/viewer counts - "chatters": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching - "viewers": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching -} - -type BacklogCacheType int - -const ( - // This is not a cache type. - CacheTypeInvalid BacklogCacheType = iota - // This message cannot be cached. - CacheTypeNever - // Save the last 24 hours of this message. - // If a client indicates that it has reconnected, replay the messages sent after the disconnect. - // Do not replay if the client indicates that this is a firstload. - CacheTypeTimestamps - // Save only the last copy of this message, and always send it when the backlog is requested. - CacheTypeLastOnly - // Save this backlog data to disk with its timestamp. - // Send it when the backlog is requested, or after a reconnect if it was updated. - CacheTypePersistent -) - -type MessageTargetType int - -const ( - // This is not a message target. - MsgTargetTypeInvalid MessageTargetType = iota - // This message is targeted to a single TODO(user or connection) - MsgTargetTypeSingle - // This message is targeted to all users in a chat - MsgTargetTypeChat - // This message is targeted to all users in multiple chats - MsgTargetTypeMultichat - // This message is sent to all FFZ users. - MsgTargetTypeGlobal -) - -// note: see types.go for methods on these - -// Returned by BacklogCacheType.UnmarshalJSON() -var ErrorUnrecognizedCacheType = errors.New("Invalid value for cachetype") - -// Returned by MessageTargetType.UnmarshalJSON() -var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target") - -type TimestampedGlobalMessage struct { - Timestamp time.Time - Command Command - Data string -} - -type TimestampedMultichatMessage struct { - Timestamp time.Time - Channels []string - Command Command - Data string -} - -type LastSavedMessage struct { - Timestamp time.Time - Data string -} - -// map is command -> channel -> data - -// CacheTypeLastOnly. Cleaned up by reaper goroutine every ~hour. -var CachedLastMessages map[Command]map[string]LastSavedMessage -var CachedLSMLock sync.RWMutex - -// CacheTypePersistent. Never cleaned. -var PersistentLastMessages map[Command]map[string]LastSavedMessage -var PersistentLSMLock sync.RWMutex - -var CachedGlobalMessages []TimestampedGlobalMessage -var CachedChannelMessages []TimestampedMultichatMessage -var CacheListsLock sync.RWMutex - -func DumpCache() { - CachedLSMLock.Lock() - CachedLastMessages = make(map[Command]map[string]LastSavedMessage) - CachedLSMLock.Unlock() - - PersistentLSMLock.Lock() - PersistentLastMessages = make(map[Command]map[string]LastSavedMessage) - PersistentLSMLock.Unlock() - - CacheListsLock.Lock() - CachedGlobalMessages = make(tgmarray, 0) - CachedChannelMessages = make(tmmarray, 0) - CacheListsLock.Unlock() -} - -func SendBacklogForNewClient(client *ClientInfo) { - client.Mutex.Lock() // reading CurrentChannels - PersistentLSMLock.RLock() - for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypePersistent, MsgTargetTypeChat}) { - chanMap := CachedLastMessages[cmd] - if chanMap == nil { - continue - } - for _, channel := range client.CurrentChannels { - msg, ok := chanMap[channel] - if ok { - msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data} - msg.parseOrigArguments() - client.MessageChannel <- msg - } - } - } - PersistentLSMLock.RUnlock() - - CachedLSMLock.RLock() - for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}) { - chanMap := CachedLastMessages[cmd] - if chanMap == nil { - continue - } - for _, channel := range client.CurrentChannels { - msg, ok := chanMap[channel] - if ok { - msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data} - msg.parseOrigArguments() - client.MessageChannel <- msg - } - } - } - CachedLSMLock.RUnlock() - client.Mutex.Unlock() -} - -func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) { - client.Mutex.Lock() // reading CurrentChannels - CacheListsLock.RLock() - - globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime) - - if globIdx != -1 { - for i := globIdx; i < len(CachedGlobalMessages); i++ { - item := CachedGlobalMessages[i] - msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data} - msg.parseOrigArguments() - client.MessageChannel <- msg - } - } - - chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime) - - if chanIdx != -1 { - for i := chanIdx; i < len(CachedChannelMessages); i++ { - item := CachedChannelMessages[i] - var send bool - for _, channel := range item.Channels { - for _, matchChannel := range client.CurrentChannels { - if channel == matchChannel { - send = true - break - } - } - if send { - break - } - } - if send { - msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data} - msg.parseOrigArguments() - client.MessageChannel <- msg - } - } - } - - CacheListsLock.RUnlock() - client.Mutex.Unlock() -} - -func backlogJanitor() { - for { - time.Sleep(1 * time.Hour) - CleanupTimedBacklogMessages() - } -} - -func CleanupTimedBacklogMessages() { - CacheListsLock.Lock() - oneHourAgo := time.Now().Add(-24 * time.Hour) - globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo) - if globIdx != -1 { - newGlobMsgs := make([]TimestampedGlobalMessage, len(CachedGlobalMessages)-globIdx) - copy(newGlobMsgs, CachedGlobalMessages[globIdx:]) - CachedGlobalMessages = newGlobMsgs - } - chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo) - if chanIdx != -1 { - newChanMsgs := make([]TimestampedMultichatMessage, len(CachedChannelMessages)-chanIdx) - copy(newChanMsgs, CachedChannelMessages[chanIdx:]) - CachedChannelMessages = newChanMsgs - } - CacheListsLock.Unlock() -} - -func InsertionSort(ary sort.Interface) { - for i := 1; i < ary.Len(); i++ { - for j := i; j > 0 && ary.Less(j, j-1); j-- { - ary.Swap(j, j-1) - } - } -} - -type TimestampArray interface { - Len() int - GetTime(int) time.Time -} - -func FindFirstNewMessage(ary TimestampArray, disconnectTime time.Time) (idx int) { - // TODO needs tests - len := ary.Len() - i := len - - // Walk backwards until we find GetTime() before disconnectTime - step := 1 - for i > 0 { - i -= step - if i < 0 { - i = 0 - } - if !ary.GetTime(i).After(disconnectTime) { - break - } - step = int(float64(step)*1.5) + 1 - } - - // Walk forwards until we find GetTime() after disconnectTime - for i < len && !ary.GetTime(i).After(disconnectTime) { - i++ - } - - if i == len { - return -1 - } - return i -} - -func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync.Locker, cmd Command, channel string, timestamp time.Time, data string, deleting bool) { - locker.Lock() - defer locker.Unlock() - - chanMap, ok := CachedLastMessages[cmd] - if !ok { - if deleting { - return - } - chanMap = make(map[string]LastSavedMessage) - CachedLastMessages[cmd] = chanMap - } - - if deleting { - delete(chanMap, channel) - } else { - chanMap[channel] = LastSavedMessage{timestamp, data} - } -} - -func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) { - CacheListsLock.Lock() - CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data}) - InsertionSort(tgmarray(CachedGlobalMessages)) - CacheListsLock.Unlock() -} - -func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) { - CacheListsLock.Lock() - CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data}) - InsertionSort(tmmarray(CachedChannelMessages)) - CacheListsLock.Unlock() -} - -func GetCommandsOfType(match PushCommandCacheInfo) []Command { - var ret []Command - for cmd, info := range ServerInitiatedCommands { - if info == match { - ret = append(ret, cmd) - } - } - return ret -} - -func HBackendDumpBacklog(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - formData, err := UnsealRequest(r.Form) - if err != nil { - w.WriteHeader(403) - fmt.Fprintf(w, "Error: %v", err) - return - } - - confirm := formData.Get("confirm") - if confirm == "1" { - DumpCache() - } -} - -// Publish a message to clients, and update the in-server cache for the message. -// notes: -// `scope` is implicit in the command -func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - formData, err := UnsealRequest(r.Form) - if err != nil { - w.WriteHeader(403) - fmt.Fprintf(w, "Error: %v", err) - return - } - - cmd := Command(formData.Get("cmd")) - json := formData.Get("args") - channel := formData.Get("channel") - deleteMode := formData.Get("delete") != "" - timeStr := formData.Get("time") - timestamp, err := time.Parse(time.UnixDate, timeStr) - if err != nil { - w.WriteHeader(422) - fmt.Fprintf(w, "error parsing time: %v", err) - } - - cacheinfo, ok := ServerInitiatedCommands[cmd] - if !ok { - w.WriteHeader(422) - fmt.Fprintf(w, "Caching semantics unknown for command '%s'. Post to /addcachedcommand first.", cmd) - return - } - - var count int - msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: json} - msg.parseOrigArguments() - - if cacheinfo.Caching == CacheTypeLastOnly && cacheinfo.Target == MsgTargetTypeChat { - SaveLastMessage(CachedLastMessages, &CachedLSMLock, cmd, channel, timestamp, json, deleteMode) - count = PublishToChat(channel, msg) - } else if cacheinfo.Caching == CacheTypePersistent && cacheinfo.Target == MsgTargetTypeChat { - SaveLastMessage(PersistentLastMessages, &PersistentLSMLock, cmd, channel, timestamp, json, deleteMode) - count = PublishToChat(channel, msg) - } else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeMultichat { - SaveMultichanMessage(cmd, channel, timestamp, json) - count = PublishToMultiple(strings.Split(channel, ","), msg) - } else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeGlobal { - SaveGlobalMessage(cmd, timestamp, json) - count = PublishToAll(msg) - } - - w.Write([]byte(strconv.Itoa(count))) -} diff --git a/socketserver/internal/server/backlog_test.go b/socketserver/internal/server/backlog_test.go deleted file mode 100644 index 99ce4f5a..00000000 --- a/socketserver/internal/server/backlog_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package server - -import ( - "testing" - "time" -) - -func TestCleanupBacklogMessages(t *testing.T) { - -} - -func TestFindFirstNewMessageEmpty(t *testing.T) { - CachedGlobalMessages = []TimestampedGlobalMessage{} - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) - if i != -1 { - t.Errorf("Expected -1, got %d", i) - } -} -func TestFindFirstNewMessageOneBefore(t *testing.T) { - CachedGlobalMessages = []TimestampedGlobalMessage{ - {Timestamp: time.Unix(8, 0)}, - } - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) - if i != -1 { - t.Errorf("Expected -1, got %d", i) - } -} -func TestFindFirstNewMessageSeveralBefore(t *testing.T) { - CachedGlobalMessages = []TimestampedGlobalMessage{ - {Timestamp: time.Unix(1, 0)}, - {Timestamp: time.Unix(2, 0)}, - {Timestamp: time.Unix(3, 0)}, - {Timestamp: time.Unix(4, 0)}, - {Timestamp: time.Unix(5, 0)}, - } - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) - if i != -1 { - t.Errorf("Expected -1, got %d", i) - } -} -func TestFindFirstNewMessageInMiddle(t *testing.T) { - CachedGlobalMessages = []TimestampedGlobalMessage{ - {Timestamp: time.Unix(1, 0)}, - {Timestamp: time.Unix(2, 0)}, - {Timestamp: time.Unix(3, 0)}, - {Timestamp: time.Unix(4, 0)}, - {Timestamp: time.Unix(5, 0)}, - {Timestamp: time.Unix(11, 0)}, - {Timestamp: time.Unix(12, 0)}, - {Timestamp: time.Unix(13, 0)}, - {Timestamp: time.Unix(14, 0)}, - {Timestamp: time.Unix(15, 0)}, - } - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) - if i != 5 { - t.Errorf("Expected 5, got %d", i) - } -} -func TestFindFirstNewMessageOneAfter(t *testing.T) { - CachedGlobalMessages = []TimestampedGlobalMessage{ - {Timestamp: time.Unix(15, 0)}, - } - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) - if i != 0 { - t.Errorf("Expected 0, got %d", i) - } -} -func TestFindFirstNewMessageSeveralAfter(t *testing.T) { - CachedGlobalMessages = []TimestampedGlobalMessage{ - {Timestamp: time.Unix(11, 0)}, - {Timestamp: time.Unix(12, 0)}, - {Timestamp: time.Unix(13, 0)}, - {Timestamp: time.Unix(14, 0)}, - {Timestamp: time.Unix(15, 0)}, - } - i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) - if i != 0 { - t.Errorf("Expected 0, got %d", i) - } -} diff --git a/socketserver/internal/server/commands.go b/socketserver/internal/server/commands.go index 00adc88a..9f8fed86 100644 --- a/socketserver/internal/server/commands.go +++ b/socketserver/internal/server/commands.go @@ -12,8 +12,30 @@ import ( "time" ) -var ResponseSuccess = ClientMessage{Command: SuccessCommand} -var ResponseFailure = ClientMessage{Command: "False"} +// A command is how the client refers to a function on the server. It's just a string. +type Command string + +// A function that is called to respond to a Command. +type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) + +var CommandHandlers = map[Command]CommandHandler{ + HelloCommand: HandleHello, + "setuser": HandleSetUser, + "ready": HandleReady, + + "sub": HandleSub, + "unsub": HandleUnsub, + + "track_follow": HandleTrackFollow, + "emoticon_uses": HandleEmoticonUses, + "survey": HandleSurvey, + + "twitch_emote": HandleRemoteCommand, + "get_link": HandleBunchedRemoteCommand, + "get_display_name": HandleBunchedRemoteCommand, + "update_follow_buttons": HandleRemoteCommand, + "chat_history": HandleRemoteCommand, +} const ChannelInfoDelay = 2 * time.Second @@ -135,7 +157,7 @@ func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rms client.Mutex.Unlock() - SubscribeChat(client, channel) + SubscribeChannel(client, channel) return ResponseSuccess, nil } diff --git a/socketserver/internal/server/handlecore.go b/socketserver/internal/server/handlecore.go index 526111bb..d0d0c694 100644 --- a/socketserver/internal/server/handlecore.go +++ b/socketserver/internal/server/handlecore.go @@ -18,31 +18,6 @@ import ( const MAX_PACKET_SIZE = 1024 -// A command is how the client refers to a function on the server. It's just a string. -type Command string - -// A function that is called to respond to a Command. -type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) - -var CommandHandlers = map[Command]CommandHandler{ - HelloCommand: HandleHello, - "setuser": HandleSetUser, - "ready": HandleReady, - - "sub": HandleSub, - "unsub": HandleUnsub, - - "track_follow": HandleTrackFollow, - "emoticon_uses": HandleEmoticonUses, - "survey": HandleSurvey, - - "twitch_emote": HandleRemoteCommand, - "get_link": HandleBunchedRemoteCommand, - "get_display_name": HandleBunchedRemoteCommand, - "update_follow_buttons": HandleRemoteCommand, - "chat_history": HandleRemoteCommand, -} - // Sent by the server in ClientMessage.Command to indicate success. const SuccessCommand Command = "ok" @@ -58,28 +33,11 @@ const AuthorizeCommand Command = "do_authorize" // It signals that the work has been handed off to a background goroutine. const AsyncResponseCommand Command = "_async" -var SocketUpgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return r.Header.Get("Origin") == "http://www.twitch.tv" - }, -} - -// Errors that get returned to the client. -var ProtocolError error = errors.New("FFZ Socket protocol error.") -var ProtocolErrorNegativeID error = errors.New("FFZ Socket protocol error: negative or zero message ID.") -var ExpectedSingleString = errors.New("Error: Expected single string as arguments.") -var ExpectedSingleInt = errors.New("Error: Expected single integer as arguments.") -var ExpectedTwoStrings = errors.New("Error: Expected array of string, string as arguments.") -var ExpectedStringAndInt = errors.New("Error: Expected array of string, int as arguments.") -var ExpectedStringAndBool = errors.New("Error: Expected array of string, bool as arguments.") -var ExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.") +var ResponseSuccess = ClientMessage{Command: SuccessCommand} +var ResponseFailure = ClientMessage{Command: "False"} var Configuation *ConfigFile -var BannerHTML []byte - // Set up a websocket listener and register it on /. // (Uses http.DefaultServeMux .) func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { @@ -98,9 +56,9 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { BannerHTML = bannerBytes serveMux.HandleFunc("/", ServeWebsocketOrCatbag) - serveMux.HandleFunc("/pub_msg", HBackendPublishRequest) - serveMux.HandleFunc("/dump_backlog", HBackendDumpBacklog) - serveMux.HandleFunc("/update_and_pub", HBackendUpdateAndPublish) + serveMux.HandleFunc("/drop_backlog", HBackendDropBacklog) + serveMux.HandleFunc("/uncached_pub", HBackendPublishRequest) + serveMux.HandleFunc("/cached_pub", HBackendUpdateAndPublish) announceForm, err := SealRequest(url.Values{ "startup": []string{"1"}, @@ -123,6 +81,16 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { go ircConnection() } +var SocketUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return r.Header.Get("Origin") == "http://www.twitch.tv" + }, +} + +var BannerHTML []byte + func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) { if r.Header.Get("Connection") == "Upgrade" { conn, err := SocketUpgrader.Upgrade(w, r, nil) @@ -138,6 +106,16 @@ func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) { } } +// Errors that get returned to the client. +var ProtocolError error = errors.New("FFZ Socket protocol error.") +var ProtocolErrorNegativeID error = errors.New("FFZ Socket protocol error: negative or zero message ID.") +var ExpectedSingleString = errors.New("Error: Expected single string as arguments.") +var ExpectedSingleInt = errors.New("Error: Expected single integer as arguments.") +var ExpectedTwoStrings = errors.New("Error: Expected array of string, string as arguments.") +var ExpectedStringAndInt = errors.New("Error: Expected array of string, int as arguments.") +var ExpectedStringAndBool = errors.New("Error: Expected array of string, bool as arguments.") +var ExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.") + var CloseGotBinaryMessage = websocket.CloseError{Code: websocket.CloseUnsupportedData, Text: "got binary packet"} var CloseGotMessageId0 = websocket.CloseError{Code: websocket.ClosePolicyViolation, Text: "got messageid 0"} var CloseTimedOut = websocket.CloseError{Code: websocket.CloseNoStatusReceived, Text: "no ping replies for 5 minutes"} diff --git a/socketserver/internal/server/publisher.go b/socketserver/internal/server/publisher.go index 92b14f68..a90efb1c 100644 --- a/socketserver/internal/server/publisher.go +++ b/socketserver/internal/server/publisher.go @@ -1,188 +1,392 @@ package server -// This is the scariest code I've written yet for the server. -// If I screwed up the locking, I won't know until it's too late. - import ( - "log" + "errors" + "fmt" + "net/http" + "sort" + "strconv" + "strings" "sync" "time" ) -type SubscriberList struct { - sync.RWMutex - Members []chan<- ClientMessage +type PushCommandCacheInfo struct { + Caching BacklogCacheType + Target MessageTargetType } -var ChatSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList) -var ChatSubscriptionLock sync.RWMutex -var GlobalSubscriptionInfo SubscriberList +// this value is just docs right now +var ServerInitiatedCommands = map[Command]PushCommandCacheInfo{ + /// Global updates & notices + "update_news": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global + "message": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global + "reload_ff": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global -func PublishToChat(channel string, msg ClientMessage) (count int) { - ChatSubscriptionLock.RLock() - list := ChatSubscriptionInfo[channel] - if list != nil { - list.RLock() - for _, msgChan := range list.Members { - msgChan <- msg - count++ + /// Emote updates + "reload_badges": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global + "set_badge": {CacheTypeTimestamps, MsgTargetTypeMultichat}, // timecache:multichat + "reload_set": {}, // timecache:multichat + "load_set": {}, // TODO what are the semantics of this? + + /// User auth + "do_authorize": {CacheTypeNever, MsgTargetTypeSingle}, // nocache:single + + /// Channel data + // follow_sets: extra emote sets included in the chat + // follow_buttons: extra follow buttons below the stream + "follow_sets": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:chat + "follow_buttons": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:watching + "srl_race": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching + + /// Chatter/viewer counts + "chatters": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching + "viewers": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching +} + +type BacklogCacheType int + +const ( + // This is not a cache type. + CacheTypeInvalid BacklogCacheType = iota + // This message cannot be cached. + CacheTypeNever + // Save the last 24 hours of this message. + // If a client indicates that it has reconnected, replay the messages sent after the disconnect. + // Do not replay if the client indicates that this is a firstload. + CacheTypeTimestamps + // Save only the last copy of this message, and always send it when the backlog is requested. + CacheTypeLastOnly + // Save this backlog data to disk with its timestamp. + // Send it when the backlog is requested, or after a reconnect if it was updated. + CacheTypePersistent +) + +type MessageTargetType int + +const ( + // This is not a message target. + MsgTargetTypeInvalid MessageTargetType = iota + // This message is targeted to a single TODO(user or connection) + MsgTargetTypeSingle + // This message is targeted to all users in a chat + MsgTargetTypeChat + // This message is targeted to all users in multiple chats + MsgTargetTypeMultichat + // This message is sent to all FFZ users. + MsgTargetTypeGlobal +) + +// note: see types.go for methods on these + +// Returned by BacklogCacheType.UnmarshalJSON() +var ErrorUnrecognizedCacheType = errors.New("Invalid value for cachetype") + +// Returned by MessageTargetType.UnmarshalJSON() +var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target") + +type TimestampedGlobalMessage struct { + Timestamp time.Time + Command Command + Data string +} + +type TimestampedMultichatMessage struct { + Timestamp time.Time + Channels []string + Command Command + Data string +} + +type LastSavedMessage struct { + Timestamp time.Time + Data string +} + +// map is command -> channel -> data + +// CacheTypeLastOnly. Cleaned up by reaper goroutine every ~hour. +var CachedLastMessages map[Command]map[string]LastSavedMessage +var CachedLSMLock sync.RWMutex + +// CacheTypePersistent. Never cleaned. +var PersistentLastMessages map[Command]map[string]LastSavedMessage +var PersistentLSMLock sync.RWMutex + +var CachedGlobalMessages []TimestampedGlobalMessage +var CachedChannelMessages []TimestampedMultichatMessage +var CacheListsLock sync.RWMutex + +func DumpCache() { + CachedLSMLock.Lock() + CachedLastMessages = make(map[Command]map[string]LastSavedMessage) + CachedLSMLock.Unlock() + + PersistentLSMLock.Lock() + PersistentLastMessages = make(map[Command]map[string]LastSavedMessage) + PersistentLSMLock.Unlock() + + CacheListsLock.Lock() + CachedGlobalMessages = make(tgmarray, 0) + CachedChannelMessages = make(tmmarray, 0) + CacheListsLock.Unlock() +} + +func SendBacklogForNewClient(client *ClientInfo) { + client.Mutex.Lock() // reading CurrentChannels + PersistentLSMLock.RLock() + for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypePersistent, MsgTargetTypeChat}) { + chanMap := CachedLastMessages[cmd] + if chanMap == nil { + continue } - list.RUnlock() - } - ChatSubscriptionLock.RUnlock() - return -} - -func PublishToMultiple(channels []string, msg ClientMessage) (count int) { - found := make(map[chan<- ClientMessage]struct{}) - - ChatSubscriptionLock.RLock() - - for _, channel := range channels { - list := ChatSubscriptionInfo[channel] - if list != nil { - list.RLock() - for _, msgChan := range list.Members { - found[msgChan] = struct{}{} + for _, channel := range client.CurrentChannels { + msg, ok := chanMap[channel] + if ok { + msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data} + msg.parseOrigArguments() + client.MessageChannel <- msg } - list.RUnlock() } } + PersistentLSMLock.RUnlock() - ChatSubscriptionLock.RUnlock() - - for msgChan, _ := range found { - msgChan <- msg - count++ - } - return -} - -func PublishToAll(msg ClientMessage) (count int) { - GlobalSubscriptionInfo.RLock() - for _, msgChan := range GlobalSubscriptionInfo.Members { - msgChan <- msg - count++ - } - GlobalSubscriptionInfo.RUnlock() - return -} - -// Add a channel to the subscriptions while holding a read-lock to the map. -// Locks: -// - ALREADY HOLDING a read-lock to the 'which' top-level map via the rlocker object -// - possible write lock to the 'which' top-level map via the wlocker object -// - write lock to SubscriptionInfo (if not creating new) -func _subscribeWhileRlocked(channelName string, value chan<- ClientMessage) { - list := ChatSubscriptionInfo[channelName] - if list == nil { - // Not found, so create it - ChatSubscriptionLock.RUnlock() - ChatSubscriptionLock.Lock() - list = &SubscriberList{} - list.Members = []chan<- ClientMessage{value} // Create it populated, to avoid reaper - ChatSubscriptionInfo[channelName] = list - ChatSubscriptionLock.Unlock() - - go func(topic string) { - err := SendNewTopicNotice(topic) - if err != nil { - log.Println("error reporting new sub:", err) + CachedLSMLock.RLock() + for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}) { + chanMap := CachedLastMessages[cmd] + if chanMap == nil { + continue + } + for _, channel := range client.CurrentChannels { + msg, ok := chanMap[channel] + if ok { + msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data} + msg.parseOrigArguments() + client.MessageChannel <- msg } - }(channelName) - - ChatSubscriptionLock.RLock() - } else { - list.Lock() - AddToSliceC(&list.Members, value) - list.Unlock() - } -} - -func SubscribeGlobal(client *ClientInfo) { - GlobalSubscriptionInfo.Lock() - AddToSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel) - GlobalSubscriptionInfo.Unlock() -} - -func SubscribeChat(client *ClientInfo, channelName string) { - ChatSubscriptionLock.RLock() - _subscribeWhileRlocked(channelName, client.MessageChannel) - ChatSubscriptionLock.RUnlock() -} - -func unsubscribeAllClients() { - GlobalSubscriptionInfo.Lock() - GlobalSubscriptionInfo.Members = nil - GlobalSubscriptionInfo.Unlock() - ChatSubscriptionLock.Lock() - ChatSubscriptionInfo = make(map[string]*SubscriberList) - ChatSubscriptionLock.Unlock() -} - -// Unsubscribe the client from all channels, AND clear the CurrentChannels / WatchingChannels fields. -// Locks: -// - read lock to top-level maps -// - write lock to SubscriptionInfos -// - write lock to ClientInfo -func UnsubscribeAll(client *ClientInfo) { - client.Mutex.Lock() - client.PendingSubscriptionsBacklog = nil - client.PendingSubscriptionsBacklog = nil - client.Mutex.Unlock() - - GlobalSubscriptionInfo.Lock() - RemoveFromSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel) - GlobalSubscriptionInfo.Unlock() - - ChatSubscriptionLock.RLock() - client.Mutex.Lock() - for _, v := range client.CurrentChannels { - list := ChatSubscriptionInfo[v] - if list != nil { - list.Lock() - RemoveFromSliceC(&list.Members, client.MessageChannel) - list.Unlock() } } - client.CurrentChannels = nil + CachedLSMLock.RUnlock() client.Mutex.Unlock() - ChatSubscriptionLock.RUnlock() } -func UnsubscribeSingleChat(client *ClientInfo, channelName string) { - ChatSubscriptionLock.RLock() - list := ChatSubscriptionInfo[channelName] - if list != nil { - list.Lock() - RemoveFromSliceC(&list.Members, client.MessageChannel) - list.Unlock() +func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) { + client.Mutex.Lock() // reading CurrentChannels + CacheListsLock.RLock() + + globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime) + + if globIdx != -1 { + for i := globIdx; i < len(CachedGlobalMessages); i++ { + item := CachedGlobalMessages[i] + msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data} + msg.parseOrigArguments() + client.MessageChannel <- msg + } } - ChatSubscriptionLock.RUnlock() + + chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime) + + if chanIdx != -1 { + for i := chanIdx; i < len(CachedChannelMessages); i++ { + item := CachedChannelMessages[i] + var send bool + for _, channel := range item.Channels { + for _, matchChannel := range client.CurrentChannels { + if channel == matchChannel { + send = true + break + } + } + if send { + break + } + } + if send { + msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data} + msg.parseOrigArguments() + client.MessageChannel <- msg + } + } + } + + CacheListsLock.RUnlock() + client.Mutex.Unlock() } -const ReapingDelay = 20 * time.Minute - -// Checks ChatSubscriptionInfo for entries with no subscribers every ReapingDelay. -// Started from SetupServer(). -func pubsubJanitor() { +func backlogJanitor() { for { - time.Sleep(ReapingDelay) - var cleanedUp = make([]string, 0, 6) - ChatSubscriptionLock.Lock() - for key, val := range ChatSubscriptionInfo { - if val == nil || len(val.Members) == 0 { - delete(ChatSubscriptionInfo, key) - cleanedUp = append(cleanedUp, key) - } - } - ChatSubscriptionLock.Unlock() + time.Sleep(1 * time.Hour) + CleanupTimedBacklogMessages() + } +} - if len(cleanedUp) != 0 { - err := SendCleanupTopicsNotice(cleanedUp) - if err != nil { - log.Println("error reporting cleaned subs:", err) - } +func CleanupTimedBacklogMessages() { + CacheListsLock.Lock() + oneHourAgo := time.Now().Add(-24 * time.Hour) + globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo) + if globIdx != -1 { + newGlobMsgs := make([]TimestampedGlobalMessage, len(CachedGlobalMessages)-globIdx) + copy(newGlobMsgs, CachedGlobalMessages[globIdx:]) + CachedGlobalMessages = newGlobMsgs + } + chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo) + if chanIdx != -1 { + newChanMsgs := make([]TimestampedMultichatMessage, len(CachedChannelMessages)-chanIdx) + copy(newChanMsgs, CachedChannelMessages[chanIdx:]) + CachedChannelMessages = newChanMsgs + } + CacheListsLock.Unlock() +} + +func InsertionSort(ary sort.Interface) { + for i := 1; i < ary.Len(); i++ { + for j := i; j > 0 && ary.Less(j, j-1); j-- { + ary.Swap(j, j-1) } } } + +type TimestampArray interface { + Len() int + GetTime(int) time.Time +} + +func FindFirstNewMessage(ary TimestampArray, disconnectTime time.Time) (idx int) { + // TODO needs tests + len := ary.Len() + i := len + + // Walk backwards until we find GetTime() before disconnectTime + step := 1 + for i > 0 { + i -= step + if i < 0 { + i = 0 + } + if !ary.GetTime(i).After(disconnectTime) { + break + } + step = int(float64(step)*1.5) + 1 + } + + // Walk forwards until we find GetTime() after disconnectTime + for i < len && !ary.GetTime(i).After(disconnectTime) { + i++ + } + + if i == len { + return -1 + } + return i +} + +func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync.Locker, cmd Command, channel string, timestamp time.Time, data string, deleting bool) { + locker.Lock() + defer locker.Unlock() + + chanMap, ok := CachedLastMessages[cmd] + if !ok { + if deleting { + return + } + chanMap = make(map[string]LastSavedMessage) + CachedLastMessages[cmd] = chanMap + } + + if deleting { + delete(chanMap, channel) + } else { + chanMap[channel] = LastSavedMessage{timestamp, data} + } +} + +func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) { + CacheListsLock.Lock() + CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data}) + InsertionSort(tgmarray(CachedGlobalMessages)) + CacheListsLock.Unlock() +} + +func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) { + CacheListsLock.Lock() + CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data}) + InsertionSort(tmmarray(CachedChannelMessages)) + CacheListsLock.Unlock() +} + +func GetCommandsOfType(match PushCommandCacheInfo) []Command { + var ret []Command + for cmd, info := range ServerInitiatedCommands { + if info == match { + ret = append(ret, cmd) + } + } + return ret +} + +func HBackendDropBacklog(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + formData, err := UnsealRequest(r.Form) + if err != nil { + w.WriteHeader(403) + fmt.Fprintf(w, "Error: %v", err) + return + } + + confirm := formData.Get("confirm") + if confirm == "1" { + DumpCache() + } +} + +// Publish a message to clients, and update the in-server cache for the message. +// notes: +// `scope` is implicit in the command +func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + formData, err := UnsealRequest(r.Form) + if err != nil { + w.WriteHeader(403) + fmt.Fprintf(w, "Error: %v", err) + return + } + + cmd := Command(formData.Get("cmd")) + json := formData.Get("args") + channel := formData.Get("channel") + deleteMode := formData.Get("delete") != "" + timeStr := formData.Get("time") + timestamp, err := time.Parse(time.UnixDate, timeStr) + if err != nil { + w.WriteHeader(422) + fmt.Fprintf(w, "error parsing time: %v", err) + } + + cacheinfo, ok := ServerInitiatedCommands[cmd] + if !ok { + w.WriteHeader(422) + fmt.Fprintf(w, "Caching semantics unknown for command '%s'. Post to /addcachedcommand first.", cmd) + return + } + + var count int + msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: json} + msg.parseOrigArguments() + + if cacheinfo.Caching == CacheTypeLastOnly && cacheinfo.Target == MsgTargetTypeChat { + SaveLastMessage(CachedLastMessages, &CachedLSMLock, cmd, channel, timestamp, json, deleteMode) + count = PublishToChannel(channel, msg) + } else if cacheinfo.Caching == CacheTypePersistent && cacheinfo.Target == MsgTargetTypeChat { + SaveLastMessage(PersistentLastMessages, &PersistentLSMLock, cmd, channel, timestamp, json, deleteMode) + count = PublishToChannel(channel, msg) + } else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeMultichat { + SaveMultichanMessage(cmd, channel, timestamp, json) + count = PublishToMultiple(strings.Split(channel, ","), msg) + } else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeGlobal { + SaveGlobalMessage(cmd, timestamp, json) + count = PublishToAll(msg) + } + + w.Write([]byte(strconv.Itoa(count))) +} diff --git a/socketserver/internal/server/publisher_test.go b/socketserver/internal/server/publisher_test.go index 7c8fbb89..99ce4f5a 100644 --- a/socketserver/internal/server/publisher_test.go +++ b/socketserver/internal/server/publisher_test.go @@ -1,448 +1,80 @@ package server import ( - "encoding/json" - "fmt" - "github.com/gorilla/websocket" - "github.com/satori/go.uuid" - "io/ioutil" - "net/http" - "net/http/httptest" - "net/url" - "os" - "strconv" - "sync" - "syscall" "testing" "time" ) -func TCountOpenFDs() uint64 { - ary, _ := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())) - return uint64(len(ary)) +func TestCleanupBacklogMessages(t *testing.T) { + } -const IgnoreReceivedArguments = 1 + 2i - -func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) { - var msg ClientMessage - var fail bool - messageType, packet, err := conn.ReadMessage() - if err != nil { - tb.Error(err) - return msg, false - } - if messageType != websocket.TextMessage { - tb.Error("got non-text message", packet) - return msg, false - } - - err = UnmarshalClientMessage(packet, messageType, &msg) - if err != nil { - tb.Error(err) - return msg, false - } - if msg.MessageID != messageId { - tb.Error("Message ID was wrong. Expected", messageId, ", got", msg.MessageID, ":", msg) - fail = true - } - if msg.Command != command { - tb.Error("Command was wrong. Expected", command, ", got", msg.Command, ":", msg) - fail = true - } - if arguments != IgnoreReceivedArguments { - if arguments == nil { - if msg.origArguments != "" { - tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg) - } - } else { - argBytes, _ := json.Marshal(arguments) - if msg.origArguments != string(argBytes) { - tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg) - } - } - } - return msg, !fail -} - -func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) bool { - SendMessage(conn, ClientMessage{MessageID: messageId, Command: command, Arguments: arguments}) - return true -} - -func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments interface{}, deleteMode bool) (url.Values, error) { - form := url.Values{} - form.Set("cmd", string(cmd)) - argsBytes, err := json.Marshal(arguments) - if err != nil { - tb.Error(err) - return nil, err - } - form.Set("args", string(argsBytes)) - form.Set("channel", channel) - if deleteMode { - form.Set("delete", "1") - } - form.Set("time", time.Now().Format(time.UnixDate)) - - sealed, err := SealRequest(form) - if err != nil { - tb.Error(err) - return nil, err - } - return sealed, nil -} - -func TCheckResponse(tb testing.TB, resp *http.Response, expected string) bool { - var failed bool - respBytes, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() - respStr := string(respBytes) - - if err != nil { - tb.Error(err) - failed = true - } - - if resp.StatusCode != 200 { - tb.Error("Publish failed: ", resp.StatusCode, respStr) - failed = true - } - - if respStr != expected { - tb.Errorf("Got wrong response from server. Expected: '%s' Got: '%s'", expected, respStr) - failed = true - } - return !failed -} - -type TURLs struct { - Websocket string - Origin string - PubMsg string - SavePubMsg string // update_and_pub -} - -func TGetUrls(testserver *httptest.Server) TURLs { - addr := testserver.Listener.Addr().String() - return TURLs{ - Websocket: fmt.Sprintf("ws://%s/", addr), - Origin: fmt.Sprintf("http://%s", addr), - PubMsg: fmt.Sprintf("http://%s/pub_msg", addr), - SavePubMsg: fmt.Sprintf("http://%s/update_and_pub", addr), +func TestFindFirstNewMessageEmpty(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{} + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != -1 { + t.Errorf("Expected -1, got %d", i) } } - -func TSetup(testserver **httptest.Server, urls *TURLs) { - DumpCache() - - conf := &ConfigFile{ - ServerId: 20, - UseSSL: false, - SocketOrigin: "localhost:2002", - BannerHTML: ` - -CatBag - -
-
-
-
-
-
- A FrankerFaceZ Service - — CatBag by Wolsk -
-
-`, - OurPublicKey: []byte{176, 149, 72, 209, 35, 42, 110, 220, 22, 236, 212, 129, 213, 199, 1, 227, 185, 167, 150, 159, 117, 202, 164, 100, 9, 107, 45, 141, 122, 221, 155, 73}, - OurPrivateKey: []byte{247, 133, 147, 194, 70, 240, 211, 216, 223, 16, 241, 253, 120, 14, 198, 74, 237, 180, 89, 33, 146, 146, 140, 58, 88, 160, 2, 246, 112, 35, 239, 87}, - BackendPublicKey: []byte{19, 163, 37, 157, 50, 139, 193, 85, 229, 47, 166, 21, 153, 231, 31, 133, 41, 158, 8, 53, 73, 0, 113, 91, 13, 181, 131, 248, 176, 18, 1, 107}, +func TestFindFirstNewMessageOneBefore(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{ + {Timestamp: time.Unix(8, 0)}, } - gconfig = conf - SetupBackend(conf) - - if testserver != nil { - serveMux := http.NewServeMux() - SetupServerAndHandle(conf, serveMux) - - tserv := httptest.NewUnstartedServer(serveMux) - *testserver = tserv - tserv.Start() - if urls != nil { - *urls = TGetUrls(tserv) - } + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != -1 { + t.Errorf("Expected -1, got %d", i) } } - -func TestSubscriptionAndPublish(t *testing.T) { - var doneWg sync.WaitGroup - var readyWg sync.WaitGroup - - const TestChannelName1 = "room.testchannel" - const TestChannelName2 = "room.chan2" - const TestChannelName3 = "room.chan3" - const TestChannelNameUnused = "room.empty" - const TestCommandChan = "testdata_single" - const TestCommandMulti = "testdata_multi" - const TestCommandGlobal = "testdata_global" - const TestData1 = "123456789" - const TestData2 = 42 - const TestData3 = false - var TestData4 = []interface{}{"str1", "str2", "str3"} - - ServerInitiatedCommands[TestCommandChan] = PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat} - ServerInitiatedCommands[TestCommandMulti] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeMultichat} - ServerInitiatedCommands[TestCommandGlobal] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeGlobal} - - var server *httptest.Server - var urls TURLs - TSetup(&server, &urls) - defer server.CloseClientConnections() - defer unsubscribeAllClients() - - var conn *websocket.Conn - var resp *http.Response - var err error - - // client 1: sub ch1, ch2 - // client 2: sub ch1, ch3 - // client 3: sub none - // client 4: delayed sub ch1 - // msg 1: ch1 - // msg 2: ch2, ch3 - // msg 3: chEmpty - // msg 4: global - - // Client 1 - conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) - if err != nil { - t.Error(err) - return +func TestFindFirstNewMessageSeveralBefore(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{ + {Timestamp: time.Unix(1, 0)}, + {Timestamp: time.Unix(2, 0)}, + {Timestamp: time.Unix(3, 0)}, + {Timestamp: time.Unix(4, 0)}, + {Timestamp: time.Unix(5, 0)}, } - - doneWg.Add(1) - readyWg.Add(1) - go func(conn *websocket.Conn) { - TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) - TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) - TSendMessage(t, conn, 2, "sub", TestChannelName1) - TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) - TSendMessage(t, conn, 3, "sub", TestChannelName2) // 2 - TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil) - TSendMessage(t, conn, 4, "ready", 0) - TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil) - - readyWg.Done() - - TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1) - TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2) - TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4) - - conn.Close() - doneWg.Done() - }(conn) - - // Client 2 - conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) - if err != nil { - t.Error(err) - return + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != -1 { + t.Errorf("Expected -1, got %d", i) } - - doneWg.Add(1) - readyWg.Add(1) - go func(conn *websocket.Conn) { - TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) - TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) - TSendMessage(t, conn, 2, "sub", TestChannelName1) - TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) - TSendMessage(t, conn, 3, "sub", TestChannelName3) // 3 - TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil) - TSendMessage(t, conn, 4, "ready", 0) - TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil) - - readyWg.Done() - - TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1) - TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2) - TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4) - - conn.Close() - doneWg.Done() - }(conn) - - // Client 3 - conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) - if err != nil { - t.Error(err) - return - } - - doneWg.Add(1) - readyWg.Add(1) - go func(conn *websocket.Conn) { - TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) - TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) - TSendMessage(t, conn, 2, "ready", 0) - TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) - - readyWg.Done() - - TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4) - - conn.Close() - doneWg.Done() - }(conn) - - // Wait for clients 1-3 - readyWg.Wait() - - var form url.Values - - // Publish message 1 - should go to clients 1, 2 - - form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelName1, TestData1, false) - if err != nil { - t.FailNow() - } - resp, err = http.PostForm(urls.SavePubMsg, form) - if !TCheckResponse(t, resp, strconv.Itoa(2)) { - t.FailNow() - } - - // Publish message 2 - should go to clients 1, 2 - - form, err = TSealForSavePubMsg(t, TestCommandMulti, TestChannelName2+","+TestChannelName3, TestData2, false) - if err != nil { - t.FailNow() - } - resp, err = http.PostForm(urls.SavePubMsg, form) - if !TCheckResponse(t, resp, strconv.Itoa(2)) { - t.FailNow() - } - - // Publish message 3 - should go to no clients - - form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelNameUnused, TestData3, false) - if err != nil { - t.FailNow() - } - resp, err = http.PostForm(urls.SavePubMsg, form) - if !TCheckResponse(t, resp, strconv.Itoa(0)) { - t.FailNow() - } - - // Publish message 4 - should go to clients 1, 2, 3 - - form, err = TSealForSavePubMsg(t, TestCommandGlobal, "", TestData4, false) - if err != nil { - t.FailNow() - } - resp, err = http.PostForm(urls.SavePubMsg, form) - if !TCheckResponse(t, resp, strconv.Itoa(3)) { - t.FailNow() - } - - // Start client 4 - conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) - if err != nil { - t.Error(err) - return - } - - doneWg.Add(1) - readyWg.Add(1) - go func(conn *websocket.Conn) { - TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) - TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) - TSendMessage(t, conn, 2, "sub", TestChannelName1) - TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) - TSendMessage(t, conn, 3, "ready", 0) - TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil) - - // backlog message - TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1) - - readyWg.Done() - - conn.Close() - doneWg.Done() - }(conn) - - readyWg.Wait() - - doneWg.Wait() - server.Close() } - -func BenchmarkUserSubscriptionSinglePublish(b *testing.B) { - var doneWg sync.WaitGroup - var readyWg sync.WaitGroup - - const TestChannelName = "room.testchannel" - const TestCommand = "testdata" - const TestData = "123456789" - - message := ClientMessage{MessageID: -1, Command: "testdata", Arguments: TestData} - - fmt.Println() - fmt.Println(b.N) - - var limit syscall.Rlimit - syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit) - - limit.Cur = TCountOpenFDs() + uint64(b.N)*2 + 100 - - if limit.Cur > limit.Max { - b.Skip("Open file limit too low") - return +func TestFindFirstNewMessageInMiddle(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{ + {Timestamp: time.Unix(1, 0)}, + {Timestamp: time.Unix(2, 0)}, + {Timestamp: time.Unix(3, 0)}, + {Timestamp: time.Unix(4, 0)}, + {Timestamp: time.Unix(5, 0)}, + {Timestamp: time.Unix(11, 0)}, + {Timestamp: time.Unix(12, 0)}, + {Timestamp: time.Unix(13, 0)}, + {Timestamp: time.Unix(14, 0)}, + {Timestamp: time.Unix(15, 0)}, } - - syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit) - - var server *httptest.Server - var urls TURLs - TSetup(&server, &urls) - defer unsubscribeAllClients() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - conn, _, err := websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) - if err != nil { - b.Error(err) - break - } - doneWg.Add(1) - readyWg.Add(1) - go func(i int, conn *websocket.Conn) { - TSendMessage(b, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) - TSendMessage(b, conn, 2, "sub", TestChannelName) - - TReceiveExpectedMessage(b, conn, 1, SuccessCommand, IgnoreReceivedArguments) - TReceiveExpectedMessage(b, conn, 2, SuccessCommand, nil) - - readyWg.Done() - - TReceiveExpectedMessage(b, conn, -1, TestCommand, TestData) - - conn.Close() - doneWg.Done() - }(i, conn) + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != 5 { + t.Errorf("Expected 5, got %d", i) + } +} +func TestFindFirstNewMessageOneAfter(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{ + {Timestamp: time.Unix(15, 0)}, + } + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != 0 { + t.Errorf("Expected 0, got %d", i) + } +} +func TestFindFirstNewMessageSeveralAfter(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{ + {Timestamp: time.Unix(11, 0)}, + {Timestamp: time.Unix(12, 0)}, + {Timestamp: time.Unix(13, 0)}, + {Timestamp: time.Unix(14, 0)}, + {Timestamp: time.Unix(15, 0)}, + } + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != 0 { + t.Errorf("Expected 0, got %d", i) } - - readyWg.Wait() - - fmt.Println("publishing...") - if PublishToChat(TestChannelName, message) != b.N { - b.Error("not enough sent") - server.CloseClientConnections() - panic("halting test instead of waiting") - } - doneWg.Wait() - fmt.Println("...done.") - - b.StopTimer() - server.Close() - server.CloseClientConnections() } diff --git a/socketserver/internal/server/subscriptions.go b/socketserver/internal/server/subscriptions.go new file mode 100644 index 00000000..338224b8 --- /dev/null +++ b/socketserver/internal/server/subscriptions.go @@ -0,0 +1,188 @@ +package server + +// This is the scariest code I've written yet for the server. +// If I screwed up the locking, I won't know until it's too late. + +import ( + "log" + "sync" + "time" +) + +type SubscriberList struct { + sync.RWMutex + Members []chan<- ClientMessage +} + +var ChatSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList) +var ChatSubscriptionLock sync.RWMutex +var GlobalSubscriptionInfo SubscriberList + +func SubscribeGlobal(client *ClientInfo) { + GlobalSubscriptionInfo.Lock() + AddToSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel) + GlobalSubscriptionInfo.Unlock() +} + +func SubscribeChannel(client *ClientInfo, channelName string) { + ChatSubscriptionLock.RLock() + _subscribeWhileRlocked(channelName, client.MessageChannel) + ChatSubscriptionLock.RUnlock() +} + +func PublishToChannel(channel string, msg ClientMessage) (count int) { + ChatSubscriptionLock.RLock() + list := ChatSubscriptionInfo[channel] + if list != nil { + list.RLock() + for _, msgChan := range list.Members { + msgChan <- msg + count++ + } + list.RUnlock() + } + ChatSubscriptionLock.RUnlock() + return +} + +func PublishToMultiple(channels []string, msg ClientMessage) (count int) { + found := make(map[chan<- ClientMessage]struct{}) + + ChatSubscriptionLock.RLock() + + for _, channel := range channels { + list := ChatSubscriptionInfo[channel] + if list != nil { + list.RLock() + for _, msgChan := range list.Members { + found[msgChan] = struct{}{} + } + list.RUnlock() + } + } + + ChatSubscriptionLock.RUnlock() + + for msgChan, _ := range found { + msgChan <- msg + count++ + } + return +} + +func PublishToAll(msg ClientMessage) (count int) { + GlobalSubscriptionInfo.RLock() + for _, msgChan := range GlobalSubscriptionInfo.Members { + msgChan <- msg + count++ + } + GlobalSubscriptionInfo.RUnlock() + return +} + +func UnsubscribeSingleChat(client *ClientInfo, channelName string) { + ChatSubscriptionLock.RLock() + list := ChatSubscriptionInfo[channelName] + if list != nil { + list.Lock() + RemoveFromSliceC(&list.Members, client.MessageChannel) + list.Unlock() + } + ChatSubscriptionLock.RUnlock() +} + +// Unsubscribe the client from all channels, AND clear the CurrentChannels / WatchingChannels fields. +// Locks: +// - read lock to top-level maps +// - write lock to SubscriptionInfos +// - write lock to ClientInfo +func UnsubscribeAll(client *ClientInfo) { + client.Mutex.Lock() + client.PendingSubscriptionsBacklog = nil + client.PendingSubscriptionsBacklog = nil + client.Mutex.Unlock() + + GlobalSubscriptionInfo.Lock() + RemoveFromSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel) + GlobalSubscriptionInfo.Unlock() + + ChatSubscriptionLock.RLock() + client.Mutex.Lock() + for _, v := range client.CurrentChannels { + list := ChatSubscriptionInfo[v] + if list != nil { + list.Lock() + RemoveFromSliceC(&list.Members, client.MessageChannel) + list.Unlock() + } + } + client.CurrentChannels = nil + client.Mutex.Unlock() + ChatSubscriptionLock.RUnlock() +} + +func unsubscribeAllClients() { + GlobalSubscriptionInfo.Lock() + GlobalSubscriptionInfo.Members = nil + GlobalSubscriptionInfo.Unlock() + ChatSubscriptionLock.Lock() + ChatSubscriptionInfo = make(map[string]*SubscriberList) + ChatSubscriptionLock.Unlock() +} + +const ReapingDelay = 20 * time.Minute + +// Checks ChatSubscriptionInfo for entries with no subscribers every ReapingDelay. +// Started from SetupServer(). +func pubsubJanitor() { + for { + time.Sleep(ReapingDelay) + var cleanedUp = make([]string, 0, 6) + ChatSubscriptionLock.Lock() + for key, val := range ChatSubscriptionInfo { + if val == nil || len(val.Members) == 0 { + delete(ChatSubscriptionInfo, key) + cleanedUp = append(cleanedUp, key) + } + } + ChatSubscriptionLock.Unlock() + + if len(cleanedUp) != 0 { + err := SendCleanupTopicsNotice(cleanedUp) + if err != nil { + log.Println("error reporting cleaned subs:", err) + } + } + } +} + +// Add a channel to the subscriptions while holding a read-lock to the map. +// Locks: +// - ALREADY HOLDING a read-lock to the 'which' top-level map via the rlocker object +// - possible write lock to the 'which' top-level map via the wlocker object +// - write lock to SubscriptionInfo (if not creating new) +func _subscribeWhileRlocked(channelName string, value chan<- ClientMessage) { + list := ChatSubscriptionInfo[channelName] + if list == nil { + // Not found, so create it + ChatSubscriptionLock.RUnlock() + ChatSubscriptionLock.Lock() + list = &SubscriberList{} + list.Members = []chan<- ClientMessage{value} // Create it populated, to avoid reaper + ChatSubscriptionInfo[channelName] = list + ChatSubscriptionLock.Unlock() + + go func(topic string) { + err := SendNewTopicNotice(topic) + if err != nil { + log.Println("error reporting new sub:", err) + } + }(channelName) + + ChatSubscriptionLock.RLock() + } else { + list.Lock() + AddToSliceC(&list.Members, value) + list.Unlock() + } +} \ No newline at end of file diff --git a/socketserver/internal/server/subscriptions_test.go b/socketserver/internal/server/subscriptions_test.go new file mode 100644 index 00000000..cd0364bc --- /dev/null +++ b/socketserver/internal/server/subscriptions_test.go @@ -0,0 +1,448 @@ +package server + +import ( + "encoding/json" + "fmt" + "github.com/gorilla/websocket" + "github.com/satori/go.uuid" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "os" + "strconv" + "sync" + "syscall" + "testing" + "time" +) + +func TCountOpenFDs() uint64 { + ary, _ := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())) + return uint64(len(ary)) +} + +const IgnoreReceivedArguments = 1 + 2i + +func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) { + var msg ClientMessage + var fail bool + messageType, packet, err := conn.ReadMessage() + if err != nil { + tb.Error(err) + return msg, false + } + if messageType != websocket.TextMessage { + tb.Error("got non-text message", packet) + return msg, false + } + + err = UnmarshalClientMessage(packet, messageType, &msg) + if err != nil { + tb.Error(err) + return msg, false + } + if msg.MessageID != messageId { + tb.Error("Message ID was wrong. Expected", messageId, ", got", msg.MessageID, ":", msg) + fail = true + } + if msg.Command != command { + tb.Error("Command was wrong. Expected", command, ", got", msg.Command, ":", msg) + fail = true + } + if arguments != IgnoreReceivedArguments { + if arguments == nil { + if msg.origArguments != "" { + tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg) + } + } else { + argBytes, _ := json.Marshal(arguments) + if msg.origArguments != string(argBytes) { + tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg) + } + } + } + return msg, !fail +} + +func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) bool { + SendMessage(conn, ClientMessage{MessageID: messageId, Command: command, Arguments: arguments}) + return true +} + +func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments interface{}, deleteMode bool) (url.Values, error) { + form := url.Values{} + form.Set("cmd", string(cmd)) + argsBytes, err := json.Marshal(arguments) + if err != nil { + tb.Error(err) + return nil, err + } + form.Set("args", string(argsBytes)) + form.Set("channel", channel) + if deleteMode { + form.Set("delete", "1") + } + form.Set("time", time.Now().Format(time.UnixDate)) + + sealed, err := SealRequest(form) + if err != nil { + tb.Error(err) + return nil, err + } + return sealed, nil +} + +func TCheckResponse(tb testing.TB, resp *http.Response, expected string) bool { + var failed bool + respBytes, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + respStr := string(respBytes) + + if err != nil { + tb.Error(err) + failed = true + } + + if resp.StatusCode != 200 { + tb.Error("Publish failed: ", resp.StatusCode, respStr) + failed = true + } + + if respStr != expected { + tb.Errorf("Got wrong response from server. Expected: '%s' Got: '%s'", expected, respStr) + failed = true + } + return !failed +} + +type TURLs struct { + Websocket string + Origin string + PubMsg string + SavePubMsg string // update_and_pub +} + +func TGetUrls(testserver *httptest.Server) TURLs { + addr := testserver.Listener.Addr().String() + return TURLs{ + Websocket: fmt.Sprintf("ws://%s/", addr), + Origin: fmt.Sprintf("http://%s", addr), + PubMsg: fmt.Sprintf("http://%s/pub_msg", addr), + SavePubMsg: fmt.Sprintf("http://%s/update_and_pub", addr), + } +} + +func TSetup(testserver **httptest.Server, urls *TURLs) { + DumpCache() + + conf := &ConfigFile{ + ServerId: 20, + UseSSL: false, + SocketOrigin: "localhost:2002", + BannerHTML: ` + +CatBag + +
+
+
+
+
+
+ A FrankerFaceZ Service + — CatBag by Wolsk +
+
+`, + OurPublicKey: []byte{176, 149, 72, 209, 35, 42, 110, 220, 22, 236, 212, 129, 213, 199, 1, 227, 185, 167, 150, 159, 117, 202, 164, 100, 9, 107, 45, 141, 122, 221, 155, 73}, + OurPrivateKey: []byte{247, 133, 147, 194, 70, 240, 211, 216, 223, 16, 241, 253, 120, 14, 198, 74, 237, 180, 89, 33, 146, 146, 140, 58, 88, 160, 2, 246, 112, 35, 239, 87}, + BackendPublicKey: []byte{19, 163, 37, 157, 50, 139, 193, 85, 229, 47, 166, 21, 153, 231, 31, 133, 41, 158, 8, 53, 73, 0, 113, 91, 13, 181, 131, 248, 176, 18, 1, 107}, + } + gconfig = conf + SetupBackend(conf) + + if testserver != nil { + serveMux := http.NewServeMux() + SetupServerAndHandle(conf, serveMux) + + tserv := httptest.NewUnstartedServer(serveMux) + *testserver = tserv + tserv.Start() + if urls != nil { + *urls = TGetUrls(tserv) + } + } +} + +func TestSubscriptionAndPublish(t *testing.T) { + var doneWg sync.WaitGroup + var readyWg sync.WaitGroup + + const TestChannelName1 = "room.testchannel" + const TestChannelName2 = "room.chan2" + const TestChannelName3 = "room.chan3" + const TestChannelNameUnused = "room.empty" + const TestCommandChan = "testdata_single" + const TestCommandMulti = "testdata_multi" + const TestCommandGlobal = "testdata_global" + const TestData1 = "123456789" + const TestData2 = 42 + const TestData3 = false + var TestData4 = []interface{}{"str1", "str2", "str3"} + + ServerInitiatedCommands[TestCommandChan] = PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat} + ServerInitiatedCommands[TestCommandMulti] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeMultichat} + ServerInitiatedCommands[TestCommandGlobal] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeGlobal} + + var server *httptest.Server + var urls TURLs + TSetup(&server, &urls) + defer server.CloseClientConnections() + defer unsubscribeAllClients() + + var conn *websocket.Conn + var resp *http.Response + var err error + + // client 1: sub ch1, ch2 + // client 2: sub ch1, ch3 + // client 3: sub none + // client 4: delayed sub ch1 + // msg 1: ch1 + // msg 2: ch2, ch3 + // msg 3: chEmpty + // msg 4: global + + // Client 1 + conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) + if err != nil { + t.Error(err) + return + } + + doneWg.Add(1) + readyWg.Add(1) + go func(conn *websocket.Conn) { + TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) + TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) + TSendMessage(t, conn, 2, "sub", TestChannelName1) + TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) + TSendMessage(t, conn, 3, "sub", TestChannelName2) // 2 + TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil) + TSendMessage(t, conn, 4, "ready", 0) + TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil) + + readyWg.Done() + + TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1) + TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2) + TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4) + + conn.Close() + doneWg.Done() + }(conn) + + // Client 2 + conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) + if err != nil { + t.Error(err) + return + } + + doneWg.Add(1) + readyWg.Add(1) + go func(conn *websocket.Conn) { + TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) + TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) + TSendMessage(t, conn, 2, "sub", TestChannelName1) + TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) + TSendMessage(t, conn, 3, "sub", TestChannelName3) // 3 + TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil) + TSendMessage(t, conn, 4, "ready", 0) + TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil) + + readyWg.Done() + + TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1) + TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2) + TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4) + + conn.Close() + doneWg.Done() + }(conn) + + // Client 3 + conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) + if err != nil { + t.Error(err) + return + } + + doneWg.Add(1) + readyWg.Add(1) + go func(conn *websocket.Conn) { + TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) + TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) + TSendMessage(t, conn, 2, "ready", 0) + TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) + + readyWg.Done() + + TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4) + + conn.Close() + doneWg.Done() + }(conn) + + // Wait for clients 1-3 + readyWg.Wait() + + var form url.Values + + // Publish message 1 - should go to clients 1, 2 + + form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelName1, TestData1, false) + if err != nil { + t.FailNow() + } + resp, err = http.PostForm(urls.SavePubMsg, form) + if !TCheckResponse(t, resp, strconv.Itoa(2)) { + t.FailNow() + } + + // Publish message 2 - should go to clients 1, 2 + + form, err = TSealForSavePubMsg(t, TestCommandMulti, TestChannelName2+","+TestChannelName3, TestData2, false) + if err != nil { + t.FailNow() + } + resp, err = http.PostForm(urls.SavePubMsg, form) + if !TCheckResponse(t, resp, strconv.Itoa(2)) { + t.FailNow() + } + + // Publish message 3 - should go to no clients + + form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelNameUnused, TestData3, false) + if err != nil { + t.FailNow() + } + resp, err = http.PostForm(urls.SavePubMsg, form) + if !TCheckResponse(t, resp, strconv.Itoa(0)) { + t.FailNow() + } + + // Publish message 4 - should go to clients 1, 2, 3 + + form, err = TSealForSavePubMsg(t, TestCommandGlobal, "", TestData4, false) + if err != nil { + t.FailNow() + } + resp, err = http.PostForm(urls.SavePubMsg, form) + if !TCheckResponse(t, resp, strconv.Itoa(3)) { + t.FailNow() + } + + // Start client 4 + conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) + if err != nil { + t.Error(err) + return + } + + doneWg.Add(1) + readyWg.Add(1) + go func(conn *websocket.Conn) { + TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) + TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) + TSendMessage(t, conn, 2, "sub", TestChannelName1) + TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) + TSendMessage(t, conn, 3, "ready", 0) + TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil) + + // backlog message + TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1) + + readyWg.Done() + + conn.Close() + doneWg.Done() + }(conn) + + readyWg.Wait() + + doneWg.Wait() + server.Close() +} + +func BenchmarkUserSubscriptionSinglePublish(b *testing.B) { + var doneWg sync.WaitGroup + var readyWg sync.WaitGroup + + const TestChannelName = "room.testchannel" + const TestCommand = "testdata" + const TestData = "123456789" + + message := ClientMessage{MessageID: -1, Command: "testdata", Arguments: TestData} + + fmt.Println() + fmt.Println(b.N) + + var limit syscall.Rlimit + syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit) + + limit.Cur = TCountOpenFDs() + uint64(b.N)*2 + 100 + + if limit.Cur > limit.Max { + b.Skip("Open file limit too low") + return + } + + syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit) + + var server *httptest.Server + var urls TURLs + TSetup(&server, &urls) + defer unsubscribeAllClients() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + conn, _, err := websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) + if err != nil { + b.Error(err) + break + } + doneWg.Add(1) + readyWg.Add(1) + go func(i int, conn *websocket.Conn) { + TSendMessage(b, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) + TSendMessage(b, conn, 2, "sub", TestChannelName) + + TReceiveExpectedMessage(b, conn, 1, SuccessCommand, IgnoreReceivedArguments) + TReceiveExpectedMessage(b, conn, 2, SuccessCommand, nil) + + readyWg.Done() + + TReceiveExpectedMessage(b, conn, -1, TestCommand, TestData) + + conn.Close() + doneWg.Done() + }(i, conn) + } + + readyWg.Wait() + + fmt.Println("publishing...") + if PublishToChannel(TestChannelName, message) != b.N { + b.Error("not enough sent") + server.CloseClientConnections() + panic("halting test instead of waiting") + } + doneWg.Wait() + fmt.Println("...done.") + + b.StopTimer() + server.Close() + server.CloseClientConnections() +}