1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-06-29 07:45:33 +00:00

Rename & Reorganize pub/sub

This commit is contained in:
Kane York 2015-11-08 22:34:06 -08:00
parent a4ecc790b9
commit 29eb07f58c
9 changed files with 1110 additions and 1110 deletions

View file

@ -97,7 +97,7 @@ func HBackendPublishRequest(w http.ResponseWriter, r *http.Request) {
case MsgTargetTypeSingle: case MsgTargetTypeSingle:
// TODO // TODO
case MsgTargetTypeChat: case MsgTargetTypeChat:
count = PublishToChat(channel, cm) count = PublishToChannel(channel, cm)
case MsgTargetTypeMultichat: case MsgTargetTypeMultichat:
count = PublishToMultiple(strings.Split(channel, ","), cm) count = PublishToMultiple(strings.Split(channel, ","), cm)
case MsgTargetTypeGlobal: case MsgTargetTypeGlobal:

View file

@ -1,392 +0,0 @@
package server
import (
"errors"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
)
type PushCommandCacheInfo struct {
Caching BacklogCacheType
Target MessageTargetType
}
// this value is just docs right now
var ServerInitiatedCommands = map[Command]PushCommandCacheInfo{
/// Global updates & notices
"update_news": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
"message": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
"reload_ff": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
/// Emote updates
"reload_badges": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
"set_badge": {CacheTypeTimestamps, MsgTargetTypeMultichat}, // timecache:multichat
"reload_set": {}, // timecache:multichat
"load_set": {}, // TODO what are the semantics of this?
/// User auth
"do_authorize": {CacheTypeNever, MsgTargetTypeSingle}, // nocache:single
/// Channel data
// follow_sets: extra emote sets included in the chat
// follow_buttons: extra follow buttons below the stream
"follow_sets": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:chat
"follow_buttons": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:watching
"srl_race": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching
/// Chatter/viewer counts
"chatters": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching
"viewers": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching
}
type BacklogCacheType int
const (
// This is not a cache type.
CacheTypeInvalid BacklogCacheType = iota
// This message cannot be cached.
CacheTypeNever
// Save the last 24 hours of this message.
// If a client indicates that it has reconnected, replay the messages sent after the disconnect.
// Do not replay if the client indicates that this is a firstload.
CacheTypeTimestamps
// Save only the last copy of this message, and always send it when the backlog is requested.
CacheTypeLastOnly
// Save this backlog data to disk with its timestamp.
// Send it when the backlog is requested, or after a reconnect if it was updated.
CacheTypePersistent
)
type MessageTargetType int
const (
// This is not a message target.
MsgTargetTypeInvalid MessageTargetType = iota
// This message is targeted to a single TODO(user or connection)
MsgTargetTypeSingle
// This message is targeted to all users in a chat
MsgTargetTypeChat
// This message is targeted to all users in multiple chats
MsgTargetTypeMultichat
// This message is sent to all FFZ users.
MsgTargetTypeGlobal
)
// note: see types.go for methods on these
// Returned by BacklogCacheType.UnmarshalJSON()
var ErrorUnrecognizedCacheType = errors.New("Invalid value for cachetype")
// Returned by MessageTargetType.UnmarshalJSON()
var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target")
type TimestampedGlobalMessage struct {
Timestamp time.Time
Command Command
Data string
}
type TimestampedMultichatMessage struct {
Timestamp time.Time
Channels []string
Command Command
Data string
}
type LastSavedMessage struct {
Timestamp time.Time
Data string
}
// map is command -> channel -> data
// CacheTypeLastOnly. Cleaned up by reaper goroutine every ~hour.
var CachedLastMessages map[Command]map[string]LastSavedMessage
var CachedLSMLock sync.RWMutex
// CacheTypePersistent. Never cleaned.
var PersistentLastMessages map[Command]map[string]LastSavedMessage
var PersistentLSMLock sync.RWMutex
var CachedGlobalMessages []TimestampedGlobalMessage
var CachedChannelMessages []TimestampedMultichatMessage
var CacheListsLock sync.RWMutex
func DumpCache() {
CachedLSMLock.Lock()
CachedLastMessages = make(map[Command]map[string]LastSavedMessage)
CachedLSMLock.Unlock()
PersistentLSMLock.Lock()
PersistentLastMessages = make(map[Command]map[string]LastSavedMessage)
PersistentLSMLock.Unlock()
CacheListsLock.Lock()
CachedGlobalMessages = make(tgmarray, 0)
CachedChannelMessages = make(tmmarray, 0)
CacheListsLock.Unlock()
}
func SendBacklogForNewClient(client *ClientInfo) {
client.Mutex.Lock() // reading CurrentChannels
PersistentLSMLock.RLock()
for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypePersistent, MsgTargetTypeChat}) {
chanMap := CachedLastMessages[cmd]
if chanMap == nil {
continue
}
for _, channel := range client.CurrentChannels {
msg, ok := chanMap[channel]
if ok {
msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
}
}
PersistentLSMLock.RUnlock()
CachedLSMLock.RLock()
for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}) {
chanMap := CachedLastMessages[cmd]
if chanMap == nil {
continue
}
for _, channel := range client.CurrentChannels {
msg, ok := chanMap[channel]
if ok {
msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
}
}
CachedLSMLock.RUnlock()
client.Mutex.Unlock()
}
func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) {
client.Mutex.Lock() // reading CurrentChannels
CacheListsLock.RLock()
globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime)
if globIdx != -1 {
for i := globIdx; i < len(CachedGlobalMessages); i++ {
item := CachedGlobalMessages[i]
msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
}
chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime)
if chanIdx != -1 {
for i := chanIdx; i < len(CachedChannelMessages); i++ {
item := CachedChannelMessages[i]
var send bool
for _, channel := range item.Channels {
for _, matchChannel := range client.CurrentChannels {
if channel == matchChannel {
send = true
break
}
}
if send {
break
}
}
if send {
msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
}
}
CacheListsLock.RUnlock()
client.Mutex.Unlock()
}
func backlogJanitor() {
for {
time.Sleep(1 * time.Hour)
CleanupTimedBacklogMessages()
}
}
func CleanupTimedBacklogMessages() {
CacheListsLock.Lock()
oneHourAgo := time.Now().Add(-24 * time.Hour)
globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo)
if globIdx != -1 {
newGlobMsgs := make([]TimestampedGlobalMessage, len(CachedGlobalMessages)-globIdx)
copy(newGlobMsgs, CachedGlobalMessages[globIdx:])
CachedGlobalMessages = newGlobMsgs
}
chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo)
if chanIdx != -1 {
newChanMsgs := make([]TimestampedMultichatMessage, len(CachedChannelMessages)-chanIdx)
copy(newChanMsgs, CachedChannelMessages[chanIdx:])
CachedChannelMessages = newChanMsgs
}
CacheListsLock.Unlock()
}
func InsertionSort(ary sort.Interface) {
for i := 1; i < ary.Len(); i++ {
for j := i; j > 0 && ary.Less(j, j-1); j-- {
ary.Swap(j, j-1)
}
}
}
type TimestampArray interface {
Len() int
GetTime(int) time.Time
}
func FindFirstNewMessage(ary TimestampArray, disconnectTime time.Time) (idx int) {
// TODO needs tests
len := ary.Len()
i := len
// Walk backwards until we find GetTime() before disconnectTime
step := 1
for i > 0 {
i -= step
if i < 0 {
i = 0
}
if !ary.GetTime(i).After(disconnectTime) {
break
}
step = int(float64(step)*1.5) + 1
}
// Walk forwards until we find GetTime() after disconnectTime
for i < len && !ary.GetTime(i).After(disconnectTime) {
i++
}
if i == len {
return -1
}
return i
}
func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync.Locker, cmd Command, channel string, timestamp time.Time, data string, deleting bool) {
locker.Lock()
defer locker.Unlock()
chanMap, ok := CachedLastMessages[cmd]
if !ok {
if deleting {
return
}
chanMap = make(map[string]LastSavedMessage)
CachedLastMessages[cmd] = chanMap
}
if deleting {
delete(chanMap, channel)
} else {
chanMap[channel] = LastSavedMessage{timestamp, data}
}
}
func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) {
CacheListsLock.Lock()
CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data})
InsertionSort(tgmarray(CachedGlobalMessages))
CacheListsLock.Unlock()
}
func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) {
CacheListsLock.Lock()
CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data})
InsertionSort(tmmarray(CachedChannelMessages))
CacheListsLock.Unlock()
}
func GetCommandsOfType(match PushCommandCacheInfo) []Command {
var ret []Command
for cmd, info := range ServerInitiatedCommands {
if info == match {
ret = append(ret, cmd)
}
}
return ret
}
func HBackendDumpBacklog(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
formData, err := UnsealRequest(r.Form)
if err != nil {
w.WriteHeader(403)
fmt.Fprintf(w, "Error: %v", err)
return
}
confirm := formData.Get("confirm")
if confirm == "1" {
DumpCache()
}
}
// Publish a message to clients, and update the in-server cache for the message.
// notes:
// `scope` is implicit in the command
func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
formData, err := UnsealRequest(r.Form)
if err != nil {
w.WriteHeader(403)
fmt.Fprintf(w, "Error: %v", err)
return
}
cmd := Command(formData.Get("cmd"))
json := formData.Get("args")
channel := formData.Get("channel")
deleteMode := formData.Get("delete") != ""
timeStr := formData.Get("time")
timestamp, err := time.Parse(time.UnixDate, timeStr)
if err != nil {
w.WriteHeader(422)
fmt.Fprintf(w, "error parsing time: %v", err)
}
cacheinfo, ok := ServerInitiatedCommands[cmd]
if !ok {
w.WriteHeader(422)
fmt.Fprintf(w, "Caching semantics unknown for command '%s'. Post to /addcachedcommand first.", cmd)
return
}
var count int
msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: json}
msg.parseOrigArguments()
if cacheinfo.Caching == CacheTypeLastOnly && cacheinfo.Target == MsgTargetTypeChat {
SaveLastMessage(CachedLastMessages, &CachedLSMLock, cmd, channel, timestamp, json, deleteMode)
count = PublishToChat(channel, msg)
} else if cacheinfo.Caching == CacheTypePersistent && cacheinfo.Target == MsgTargetTypeChat {
SaveLastMessage(PersistentLastMessages, &PersistentLSMLock, cmd, channel, timestamp, json, deleteMode)
count = PublishToChat(channel, msg)
} else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeMultichat {
SaveMultichanMessage(cmd, channel, timestamp, json)
count = PublishToMultiple(strings.Split(channel, ","), msg)
} else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeGlobal {
SaveGlobalMessage(cmd, timestamp, json)
count = PublishToAll(msg)
}
w.Write([]byte(strconv.Itoa(count)))
}

View file

@ -1,80 +0,0 @@
package server
import (
"testing"
"time"
)
func TestCleanupBacklogMessages(t *testing.T) {
}
func TestFindFirstNewMessageEmpty(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != -1 {
t.Errorf("Expected -1, got %d", i)
}
}
func TestFindFirstNewMessageOneBefore(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(8, 0)},
}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != -1 {
t.Errorf("Expected -1, got %d", i)
}
}
func TestFindFirstNewMessageSeveralBefore(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(1, 0)},
{Timestamp: time.Unix(2, 0)},
{Timestamp: time.Unix(3, 0)},
{Timestamp: time.Unix(4, 0)},
{Timestamp: time.Unix(5, 0)},
}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != -1 {
t.Errorf("Expected -1, got %d", i)
}
}
func TestFindFirstNewMessageInMiddle(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(1, 0)},
{Timestamp: time.Unix(2, 0)},
{Timestamp: time.Unix(3, 0)},
{Timestamp: time.Unix(4, 0)},
{Timestamp: time.Unix(5, 0)},
{Timestamp: time.Unix(11, 0)},
{Timestamp: time.Unix(12, 0)},
{Timestamp: time.Unix(13, 0)},
{Timestamp: time.Unix(14, 0)},
{Timestamp: time.Unix(15, 0)},
}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != 5 {
t.Errorf("Expected 5, got %d", i)
}
}
func TestFindFirstNewMessageOneAfter(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(15, 0)},
}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != 0 {
t.Errorf("Expected 0, got %d", i)
}
}
func TestFindFirstNewMessageSeveralAfter(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(11, 0)},
{Timestamp: time.Unix(12, 0)},
{Timestamp: time.Unix(13, 0)},
{Timestamp: time.Unix(14, 0)},
{Timestamp: time.Unix(15, 0)},
}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != 0 {
t.Errorf("Expected 0, got %d", i)
}
}

View file

@ -12,8 +12,30 @@ import (
"time" "time"
) )
var ResponseSuccess = ClientMessage{Command: SuccessCommand} // A command is how the client refers to a function on the server. It's just a string.
var ResponseFailure = ClientMessage{Command: "False"} type Command string
// A function that is called to respond to a Command.
type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error)
var CommandHandlers = map[Command]CommandHandler{
HelloCommand: HandleHello,
"setuser": HandleSetUser,
"ready": HandleReady,
"sub": HandleSub,
"unsub": HandleUnsub,
"track_follow": HandleTrackFollow,
"emoticon_uses": HandleEmoticonUses,
"survey": HandleSurvey,
"twitch_emote": HandleRemoteCommand,
"get_link": HandleBunchedRemoteCommand,
"get_display_name": HandleBunchedRemoteCommand,
"update_follow_buttons": HandleRemoteCommand,
"chat_history": HandleRemoteCommand,
}
const ChannelInfoDelay = 2 * time.Second const ChannelInfoDelay = 2 * time.Second
@ -135,7 +157,7 @@ func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rms
client.Mutex.Unlock() client.Mutex.Unlock()
SubscribeChat(client, channel) SubscribeChannel(client, channel)
return ResponseSuccess, nil return ResponseSuccess, nil
} }

View file

@ -18,31 +18,6 @@ import (
const MAX_PACKET_SIZE = 1024 const MAX_PACKET_SIZE = 1024
// A command is how the client refers to a function on the server. It's just a string.
type Command string
// A function that is called to respond to a Command.
type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error)
var CommandHandlers = map[Command]CommandHandler{
HelloCommand: HandleHello,
"setuser": HandleSetUser,
"ready": HandleReady,
"sub": HandleSub,
"unsub": HandleUnsub,
"track_follow": HandleTrackFollow,
"emoticon_uses": HandleEmoticonUses,
"survey": HandleSurvey,
"twitch_emote": HandleRemoteCommand,
"get_link": HandleBunchedRemoteCommand,
"get_display_name": HandleBunchedRemoteCommand,
"update_follow_buttons": HandleRemoteCommand,
"chat_history": HandleRemoteCommand,
}
// Sent by the server in ClientMessage.Command to indicate success. // Sent by the server in ClientMessage.Command to indicate success.
const SuccessCommand Command = "ok" const SuccessCommand Command = "ok"
@ -58,28 +33,11 @@ const AuthorizeCommand Command = "do_authorize"
// It signals that the work has been handed off to a background goroutine. // It signals that the work has been handed off to a background goroutine.
const AsyncResponseCommand Command = "_async" const AsyncResponseCommand Command = "_async"
var SocketUpgrader = websocket.Upgrader{ var ResponseSuccess = ClientMessage{Command: SuccessCommand}
ReadBufferSize: 1024, var ResponseFailure = ClientMessage{Command: "False"}
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return r.Header.Get("Origin") == "http://www.twitch.tv"
},
}
// Errors that get returned to the client.
var ProtocolError error = errors.New("FFZ Socket protocol error.")
var ProtocolErrorNegativeID error = errors.New("FFZ Socket protocol error: negative or zero message ID.")
var ExpectedSingleString = errors.New("Error: Expected single string as arguments.")
var ExpectedSingleInt = errors.New("Error: Expected single integer as arguments.")
var ExpectedTwoStrings = errors.New("Error: Expected array of string, string as arguments.")
var ExpectedStringAndInt = errors.New("Error: Expected array of string, int as arguments.")
var ExpectedStringAndBool = errors.New("Error: Expected array of string, bool as arguments.")
var ExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.")
var Configuation *ConfigFile var Configuation *ConfigFile
var BannerHTML []byte
// Set up a websocket listener and register it on /. // Set up a websocket listener and register it on /.
// (Uses http.DefaultServeMux .) // (Uses http.DefaultServeMux .)
func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) {
@ -98,9 +56,9 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) {
BannerHTML = bannerBytes BannerHTML = bannerBytes
serveMux.HandleFunc("/", ServeWebsocketOrCatbag) serveMux.HandleFunc("/", ServeWebsocketOrCatbag)
serveMux.HandleFunc("/pub_msg", HBackendPublishRequest) serveMux.HandleFunc("/drop_backlog", HBackendDropBacklog)
serveMux.HandleFunc("/dump_backlog", HBackendDumpBacklog) serveMux.HandleFunc("/uncached_pub", HBackendPublishRequest)
serveMux.HandleFunc("/update_and_pub", HBackendUpdateAndPublish) serveMux.HandleFunc("/cached_pub", HBackendUpdateAndPublish)
announceForm, err := SealRequest(url.Values{ announceForm, err := SealRequest(url.Values{
"startup": []string{"1"}, "startup": []string{"1"},
@ -123,6 +81,16 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) {
go ircConnection() go ircConnection()
} }
var SocketUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return r.Header.Get("Origin") == "http://www.twitch.tv"
},
}
var BannerHTML []byte
func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) { func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Connection") == "Upgrade" { if r.Header.Get("Connection") == "Upgrade" {
conn, err := SocketUpgrader.Upgrade(w, r, nil) conn, err := SocketUpgrader.Upgrade(w, r, nil)
@ -138,6 +106,16 @@ func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) {
} }
} }
// Errors that get returned to the client.
var ProtocolError error = errors.New("FFZ Socket protocol error.")
var ProtocolErrorNegativeID error = errors.New("FFZ Socket protocol error: negative or zero message ID.")
var ExpectedSingleString = errors.New("Error: Expected single string as arguments.")
var ExpectedSingleInt = errors.New("Error: Expected single integer as arguments.")
var ExpectedTwoStrings = errors.New("Error: Expected array of string, string as arguments.")
var ExpectedStringAndInt = errors.New("Error: Expected array of string, int as arguments.")
var ExpectedStringAndBool = errors.New("Error: Expected array of string, bool as arguments.")
var ExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.")
var CloseGotBinaryMessage = websocket.CloseError{Code: websocket.CloseUnsupportedData, Text: "got binary packet"} var CloseGotBinaryMessage = websocket.CloseError{Code: websocket.CloseUnsupportedData, Text: "got binary packet"}
var CloseGotMessageId0 = websocket.CloseError{Code: websocket.ClosePolicyViolation, Text: "got messageid 0"} var CloseGotMessageId0 = websocket.CloseError{Code: websocket.ClosePolicyViolation, Text: "got messageid 0"}
var CloseTimedOut = websocket.CloseError{Code: websocket.CloseNoStatusReceived, Text: "no ping replies for 5 minutes"} var CloseTimedOut = websocket.CloseError{Code: websocket.CloseNoStatusReceived, Text: "no ping replies for 5 minutes"}

View file

@ -1,188 +1,392 @@
package server package server
// This is the scariest code I've written yet for the server.
// If I screwed up the locking, I won't know until it's too late.
import ( import (
"log" "errors"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"sync" "sync"
"time" "time"
) )
type SubscriberList struct { type PushCommandCacheInfo struct {
sync.RWMutex Caching BacklogCacheType
Members []chan<- ClientMessage Target MessageTargetType
} }
var ChatSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList) // this value is just docs right now
var ChatSubscriptionLock sync.RWMutex var ServerInitiatedCommands = map[Command]PushCommandCacheInfo{
var GlobalSubscriptionInfo SubscriberList /// Global updates & notices
"update_news": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
"message": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
"reload_ff": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
func PublishToChat(channel string, msg ClientMessage) (count int) { /// Emote updates
ChatSubscriptionLock.RLock() "reload_badges": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
list := ChatSubscriptionInfo[channel] "set_badge": {CacheTypeTimestamps, MsgTargetTypeMultichat}, // timecache:multichat
if list != nil { "reload_set": {}, // timecache:multichat
list.RLock() "load_set": {}, // TODO what are the semantics of this?
for _, msgChan := range list.Members {
msgChan <- msg /// User auth
count++ "do_authorize": {CacheTypeNever, MsgTargetTypeSingle}, // nocache:single
}
list.RUnlock() /// Channel data
} // follow_sets: extra emote sets included in the chat
ChatSubscriptionLock.RUnlock() // follow_buttons: extra follow buttons below the stream
return "follow_sets": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:chat
"follow_buttons": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:watching
"srl_race": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching
/// Chatter/viewer counts
"chatters": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching
"viewers": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching
} }
func PublishToMultiple(channels []string, msg ClientMessage) (count int) { type BacklogCacheType int
found := make(map[chan<- ClientMessage]struct{})
ChatSubscriptionLock.RLock() const (
// This is not a cache type.
CacheTypeInvalid BacklogCacheType = iota
// This message cannot be cached.
CacheTypeNever
// Save the last 24 hours of this message.
// If a client indicates that it has reconnected, replay the messages sent after the disconnect.
// Do not replay if the client indicates that this is a firstload.
CacheTypeTimestamps
// Save only the last copy of this message, and always send it when the backlog is requested.
CacheTypeLastOnly
// Save this backlog data to disk with its timestamp.
// Send it when the backlog is requested, or after a reconnect if it was updated.
CacheTypePersistent
)
for _, channel := range channels { type MessageTargetType int
list := ChatSubscriptionInfo[channel]
if list != nil { const (
list.RLock() // This is not a message target.
for _, msgChan := range list.Members { MsgTargetTypeInvalid MessageTargetType = iota
found[msgChan] = struct{}{} // This message is targeted to a single TODO(user or connection)
} MsgTargetTypeSingle
list.RUnlock() // This message is targeted to all users in a chat
} MsgTargetTypeChat
// This message is targeted to all users in multiple chats
MsgTargetTypeMultichat
// This message is sent to all FFZ users.
MsgTargetTypeGlobal
)
// note: see types.go for methods on these
// Returned by BacklogCacheType.UnmarshalJSON()
var ErrorUnrecognizedCacheType = errors.New("Invalid value for cachetype")
// Returned by MessageTargetType.UnmarshalJSON()
var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target")
type TimestampedGlobalMessage struct {
Timestamp time.Time
Command Command
Data string
} }
ChatSubscriptionLock.RUnlock() type TimestampedMultichatMessage struct {
Timestamp time.Time
for msgChan, _ := range found { Channels []string
msgChan <- msg Command Command
count++ Data string
}
return
} }
func PublishToAll(msg ClientMessage) (count int) { type LastSavedMessage struct {
GlobalSubscriptionInfo.RLock() Timestamp time.Time
for _, msgChan := range GlobalSubscriptionInfo.Members { Data string
msgChan <- msg
count++
}
GlobalSubscriptionInfo.RUnlock()
return
} }
// Add a channel to the subscriptions while holding a read-lock to the map. // map is command -> channel -> data
// Locks:
// - ALREADY HOLDING a read-lock to the 'which' top-level map via the rlocker object
// - possible write lock to the 'which' top-level map via the wlocker object
// - write lock to SubscriptionInfo (if not creating new)
func _subscribeWhileRlocked(channelName string, value chan<- ClientMessage) {
list := ChatSubscriptionInfo[channelName]
if list == nil {
// Not found, so create it
ChatSubscriptionLock.RUnlock()
ChatSubscriptionLock.Lock()
list = &SubscriberList{}
list.Members = []chan<- ClientMessage{value} // Create it populated, to avoid reaper
ChatSubscriptionInfo[channelName] = list
ChatSubscriptionLock.Unlock()
go func(topic string) { // CacheTypeLastOnly. Cleaned up by reaper goroutine every ~hour.
err := SendNewTopicNotice(topic) var CachedLastMessages map[Command]map[string]LastSavedMessage
if err != nil { var CachedLSMLock sync.RWMutex
log.Println("error reporting new sub:", err)
}
}(channelName)
ChatSubscriptionLock.RLock() // CacheTypePersistent. Never cleaned.
} else { var PersistentLastMessages map[Command]map[string]LastSavedMessage
list.Lock() var PersistentLSMLock sync.RWMutex
AddToSliceC(&list.Members, value)
list.Unlock() var CachedGlobalMessages []TimestampedGlobalMessage
} var CachedChannelMessages []TimestampedMultichatMessage
var CacheListsLock sync.RWMutex
func DumpCache() {
CachedLSMLock.Lock()
CachedLastMessages = make(map[Command]map[string]LastSavedMessage)
CachedLSMLock.Unlock()
PersistentLSMLock.Lock()
PersistentLastMessages = make(map[Command]map[string]LastSavedMessage)
PersistentLSMLock.Unlock()
CacheListsLock.Lock()
CachedGlobalMessages = make(tgmarray, 0)
CachedChannelMessages = make(tmmarray, 0)
CacheListsLock.Unlock()
} }
func SubscribeGlobal(client *ClientInfo) { func SendBacklogForNewClient(client *ClientInfo) {
GlobalSubscriptionInfo.Lock() client.Mutex.Lock() // reading CurrentChannels
AddToSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel) PersistentLSMLock.RLock()
GlobalSubscriptionInfo.Unlock() for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypePersistent, MsgTargetTypeChat}) {
chanMap := CachedLastMessages[cmd]
if chanMap == nil {
continue
} }
for _, channel := range client.CurrentChannels {
func SubscribeChat(client *ClientInfo, channelName string) { msg, ok := chanMap[channel]
ChatSubscriptionLock.RLock() if ok {
_subscribeWhileRlocked(channelName, client.MessageChannel) msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data}
ChatSubscriptionLock.RUnlock() msg.parseOrigArguments()
client.MessageChannel <- msg
} }
func unsubscribeAllClients() {
GlobalSubscriptionInfo.Lock()
GlobalSubscriptionInfo.Members = nil
GlobalSubscriptionInfo.Unlock()
ChatSubscriptionLock.Lock()
ChatSubscriptionInfo = make(map[string]*SubscriberList)
ChatSubscriptionLock.Unlock()
} }
}
PersistentLSMLock.RUnlock()
// Unsubscribe the client from all channels, AND clear the CurrentChannels / WatchingChannels fields. CachedLSMLock.RLock()
// Locks: for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}) {
// - read lock to top-level maps chanMap := CachedLastMessages[cmd]
// - write lock to SubscriptionInfos if chanMap == nil {
// - write lock to ClientInfo continue
func UnsubscribeAll(client *ClientInfo) { }
client.Mutex.Lock() for _, channel := range client.CurrentChannels {
client.PendingSubscriptionsBacklog = nil msg, ok := chanMap[channel]
client.PendingSubscriptionsBacklog = nil if ok {
msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
}
}
CachedLSMLock.RUnlock()
client.Mutex.Unlock() client.Mutex.Unlock()
}
GlobalSubscriptionInfo.Lock() func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) {
RemoveFromSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel) client.Mutex.Lock() // reading CurrentChannels
GlobalSubscriptionInfo.Unlock() CacheListsLock.RLock()
ChatSubscriptionLock.RLock() globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime)
client.Mutex.Lock()
for _, v := range client.CurrentChannels { if globIdx != -1 {
list := ChatSubscriptionInfo[v] for i := globIdx; i < len(CachedGlobalMessages); i++ {
if list != nil { item := CachedGlobalMessages[i]
list.Lock() msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data}
RemoveFromSliceC(&list.Members, client.MessageChannel) msg.parseOrigArguments()
list.Unlock() client.MessageChannel <- msg
} }
} }
client.CurrentChannels = nil
chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime)
if chanIdx != -1 {
for i := chanIdx; i < len(CachedChannelMessages); i++ {
item := CachedChannelMessages[i]
var send bool
for _, channel := range item.Channels {
for _, matchChannel := range client.CurrentChannels {
if channel == matchChannel {
send = true
break
}
}
if send {
break
}
}
if send {
msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
}
}
CacheListsLock.RUnlock()
client.Mutex.Unlock() client.Mutex.Unlock()
ChatSubscriptionLock.RUnlock()
} }
func UnsubscribeSingleChat(client *ClientInfo, channelName string) { func backlogJanitor() {
ChatSubscriptionLock.RLock()
list := ChatSubscriptionInfo[channelName]
if list != nil {
list.Lock()
RemoveFromSliceC(&list.Members, client.MessageChannel)
list.Unlock()
}
ChatSubscriptionLock.RUnlock()
}
const ReapingDelay = 20 * time.Minute
// Checks ChatSubscriptionInfo for entries with no subscribers every ReapingDelay.
// Started from SetupServer().
func pubsubJanitor() {
for { for {
time.Sleep(ReapingDelay) time.Sleep(1 * time.Hour)
var cleanedUp = make([]string, 0, 6) CleanupTimedBacklogMessages()
ChatSubscriptionLock.Lock()
for key, val := range ChatSubscriptionInfo {
if val == nil || len(val.Members) == 0 {
delete(ChatSubscriptionInfo, key)
cleanedUp = append(cleanedUp, key)
} }
} }
ChatSubscriptionLock.Unlock()
if len(cleanedUp) != 0 { func CleanupTimedBacklogMessages() {
err := SendCleanupTopicsNotice(cleanedUp) CacheListsLock.Lock()
oneHourAgo := time.Now().Add(-24 * time.Hour)
globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo)
if globIdx != -1 {
newGlobMsgs := make([]TimestampedGlobalMessage, len(CachedGlobalMessages)-globIdx)
copy(newGlobMsgs, CachedGlobalMessages[globIdx:])
CachedGlobalMessages = newGlobMsgs
}
chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo)
if chanIdx != -1 {
newChanMsgs := make([]TimestampedMultichatMessage, len(CachedChannelMessages)-chanIdx)
copy(newChanMsgs, CachedChannelMessages[chanIdx:])
CachedChannelMessages = newChanMsgs
}
CacheListsLock.Unlock()
}
func InsertionSort(ary sort.Interface) {
for i := 1; i < ary.Len(); i++ {
for j := i; j > 0 && ary.Less(j, j-1); j-- {
ary.Swap(j, j-1)
}
}
}
type TimestampArray interface {
Len() int
GetTime(int) time.Time
}
func FindFirstNewMessage(ary TimestampArray, disconnectTime time.Time) (idx int) {
// TODO needs tests
len := ary.Len()
i := len
// Walk backwards until we find GetTime() before disconnectTime
step := 1
for i > 0 {
i -= step
if i < 0 {
i = 0
}
if !ary.GetTime(i).After(disconnectTime) {
break
}
step = int(float64(step)*1.5) + 1
}
// Walk forwards until we find GetTime() after disconnectTime
for i < len && !ary.GetTime(i).After(disconnectTime) {
i++
}
if i == len {
return -1
}
return i
}
func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync.Locker, cmd Command, channel string, timestamp time.Time, data string, deleting bool) {
locker.Lock()
defer locker.Unlock()
chanMap, ok := CachedLastMessages[cmd]
if !ok {
if deleting {
return
}
chanMap = make(map[string]LastSavedMessage)
CachedLastMessages[cmd] = chanMap
}
if deleting {
delete(chanMap, channel)
} else {
chanMap[channel] = LastSavedMessage{timestamp, data}
}
}
func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) {
CacheListsLock.Lock()
CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data})
InsertionSort(tgmarray(CachedGlobalMessages))
CacheListsLock.Unlock()
}
func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) {
CacheListsLock.Lock()
CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data})
InsertionSort(tmmarray(CachedChannelMessages))
CacheListsLock.Unlock()
}
func GetCommandsOfType(match PushCommandCacheInfo) []Command {
var ret []Command
for cmd, info := range ServerInitiatedCommands {
if info == match {
ret = append(ret, cmd)
}
}
return ret
}
func HBackendDropBacklog(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
formData, err := UnsealRequest(r.Form)
if err != nil { if err != nil {
log.Println("error reporting cleaned subs:", err) w.WriteHeader(403)
fmt.Fprintf(w, "Error: %v", err)
return
}
confirm := formData.Get("confirm")
if confirm == "1" {
DumpCache()
} }
} }
// Publish a message to clients, and update the in-server cache for the message.
// notes:
// `scope` is implicit in the command
func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
formData, err := UnsealRequest(r.Form)
if err != nil {
w.WriteHeader(403)
fmt.Fprintf(w, "Error: %v", err)
return
} }
cmd := Command(formData.Get("cmd"))
json := formData.Get("args")
channel := formData.Get("channel")
deleteMode := formData.Get("delete") != ""
timeStr := formData.Get("time")
timestamp, err := time.Parse(time.UnixDate, timeStr)
if err != nil {
w.WriteHeader(422)
fmt.Fprintf(w, "error parsing time: %v", err)
}
cacheinfo, ok := ServerInitiatedCommands[cmd]
if !ok {
w.WriteHeader(422)
fmt.Fprintf(w, "Caching semantics unknown for command '%s'. Post to /addcachedcommand first.", cmd)
return
}
var count int
msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: json}
msg.parseOrigArguments()
if cacheinfo.Caching == CacheTypeLastOnly && cacheinfo.Target == MsgTargetTypeChat {
SaveLastMessage(CachedLastMessages, &CachedLSMLock, cmd, channel, timestamp, json, deleteMode)
count = PublishToChannel(channel, msg)
} else if cacheinfo.Caching == CacheTypePersistent && cacheinfo.Target == MsgTargetTypeChat {
SaveLastMessage(PersistentLastMessages, &PersistentLSMLock, cmd, channel, timestamp, json, deleteMode)
count = PublishToChannel(channel, msg)
} else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeMultichat {
SaveMultichanMessage(cmd, channel, timestamp, json)
count = PublishToMultiple(strings.Split(channel, ","), msg)
} else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeGlobal {
SaveGlobalMessage(cmd, timestamp, json)
count = PublishToAll(msg)
}
w.Write([]byte(strconv.Itoa(count)))
} }

View file

@ -1,448 +1,80 @@
package server package server
import ( import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"github.com/satori/go.uuid"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strconv"
"sync"
"syscall"
"testing" "testing"
"time" "time"
) )
func TCountOpenFDs() uint64 { func TestCleanupBacklogMessages(t *testing.T) {
ary, _ := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid()))
return uint64(len(ary))
} }
const IgnoreReceivedArguments = 1 + 2i func TestFindFirstNewMessageEmpty(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{}
func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) { i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
var msg ClientMessage if i != -1 {
var fail bool t.Errorf("Expected -1, got %d", i)
messageType, packet, err := conn.ReadMessage()
if err != nil {
tb.Error(err)
return msg, false
}
if messageType != websocket.TextMessage {
tb.Error("got non-text message", packet)
return msg, false
}
err = UnmarshalClientMessage(packet, messageType, &msg)
if err != nil {
tb.Error(err)
return msg, false
}
if msg.MessageID != messageId {
tb.Error("Message ID was wrong. Expected", messageId, ", got", msg.MessageID, ":", msg)
fail = true
}
if msg.Command != command {
tb.Error("Command was wrong. Expected", command, ", got", msg.Command, ":", msg)
fail = true
}
if arguments != IgnoreReceivedArguments {
if arguments == nil {
if msg.origArguments != "" {
tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg)
}
} else {
argBytes, _ := json.Marshal(arguments)
if msg.origArguments != string(argBytes) {
tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg)
} }
} }
func TestFindFirstNewMessageOneBefore(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(8, 0)},
} }
return msg, !fail i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
} if i != -1 {
t.Errorf("Expected -1, got %d", i)
func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) bool {
SendMessage(conn, ClientMessage{MessageID: messageId, Command: command, Arguments: arguments})
return true
}
func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments interface{}, deleteMode bool) (url.Values, error) {
form := url.Values{}
form.Set("cmd", string(cmd))
argsBytes, err := json.Marshal(arguments)
if err != nil {
tb.Error(err)
return nil, err
}
form.Set("args", string(argsBytes))
form.Set("channel", channel)
if deleteMode {
form.Set("delete", "1")
}
form.Set("time", time.Now().Format(time.UnixDate))
sealed, err := SealRequest(form)
if err != nil {
tb.Error(err)
return nil, err
}
return sealed, nil
}
func TCheckResponse(tb testing.TB, resp *http.Response, expected string) bool {
var failed bool
respBytes, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
respStr := string(respBytes)
if err != nil {
tb.Error(err)
failed = true
}
if resp.StatusCode != 200 {
tb.Error("Publish failed: ", resp.StatusCode, respStr)
failed = true
}
if respStr != expected {
tb.Errorf("Got wrong response from server. Expected: '%s' Got: '%s'", expected, respStr)
failed = true
}
return !failed
}
type TURLs struct {
Websocket string
Origin string
PubMsg string
SavePubMsg string // update_and_pub
}
func TGetUrls(testserver *httptest.Server) TURLs {
addr := testserver.Listener.Addr().String()
return TURLs{
Websocket: fmt.Sprintf("ws://%s/", addr),
Origin: fmt.Sprintf("http://%s", addr),
PubMsg: fmt.Sprintf("http://%s/pub_msg", addr),
SavePubMsg: fmt.Sprintf("http://%s/update_and_pub", addr),
} }
} }
func TestFindFirstNewMessageSeveralBefore(t *testing.T) {
func TSetup(testserver **httptest.Server, urls *TURLs) { CachedGlobalMessages = []TimestampedGlobalMessage{
DumpCache() {Timestamp: time.Unix(1, 0)},
{Timestamp: time.Unix(2, 0)},
conf := &ConfigFile{ {Timestamp: time.Unix(3, 0)},
ServerId: 20, {Timestamp: time.Unix(4, 0)},
UseSSL: false, {Timestamp: time.Unix(5, 0)},
SocketOrigin: "localhost:2002",
BannerHTML: `
<!DOCTYPE html>
<title>CatBag</title>
<link rel="stylesheet" href="//cdn.frankerfacez.com/script/catbag.css">
<div id="container">
<div id="zf0"></div><div id="zf1"></div><div id="zf2"></div>
<div id="zf3"></div><div id="zf4"></div><div id="zf5"></div>
<div id="zf6"></div><div id="zf7"></div><div id="zf8"></div>
<div id="zf9"></div><div id="catbag"></div>
<div id="bottom">
A <a href="http://www.frankerfacez.com/">FrankerFaceZ</a> Service
&mdash; CatBag by <a href="http://www.twitch.tv/wolsk">Wolsk</a>
</div>
</div>
`,
OurPublicKey: []byte{176, 149, 72, 209, 35, 42, 110, 220, 22, 236, 212, 129, 213, 199, 1, 227, 185, 167, 150, 159, 117, 202, 164, 100, 9, 107, 45, 141, 122, 221, 155, 73},
OurPrivateKey: []byte{247, 133, 147, 194, 70, 240, 211, 216, 223, 16, 241, 253, 120, 14, 198, 74, 237, 180, 89, 33, 146, 146, 140, 58, 88, 160, 2, 246, 112, 35, 239, 87},
BackendPublicKey: []byte{19, 163, 37, 157, 50, 139, 193, 85, 229, 47, 166, 21, 153, 231, 31, 133, 41, 158, 8, 53, 73, 0, 113, 91, 13, 181, 131, 248, 176, 18, 1, 107},
} }
gconfig = conf i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
SetupBackend(conf) if i != -1 {
t.Errorf("Expected -1, got %d", i)
if testserver != nil {
serveMux := http.NewServeMux()
SetupServerAndHandle(conf, serveMux)
tserv := httptest.NewUnstartedServer(serveMux)
*testserver = tserv
tserv.Start()
if urls != nil {
*urls = TGetUrls(tserv)
} }
} }
func TestFindFirstNewMessageInMiddle(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(1, 0)},
{Timestamp: time.Unix(2, 0)},
{Timestamp: time.Unix(3, 0)},
{Timestamp: time.Unix(4, 0)},
{Timestamp: time.Unix(5, 0)},
{Timestamp: time.Unix(11, 0)},
{Timestamp: time.Unix(12, 0)},
{Timestamp: time.Unix(13, 0)},
{Timestamp: time.Unix(14, 0)},
{Timestamp: time.Unix(15, 0)},
} }
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
func TestSubscriptionAndPublish(t *testing.T) { if i != 5 {
var doneWg sync.WaitGroup t.Errorf("Expected 5, got %d", i)
var readyWg sync.WaitGroup
const TestChannelName1 = "room.testchannel"
const TestChannelName2 = "room.chan2"
const TestChannelName3 = "room.chan3"
const TestChannelNameUnused = "room.empty"
const TestCommandChan = "testdata_single"
const TestCommandMulti = "testdata_multi"
const TestCommandGlobal = "testdata_global"
const TestData1 = "123456789"
const TestData2 = 42
const TestData3 = false
var TestData4 = []interface{}{"str1", "str2", "str3"}
ServerInitiatedCommands[TestCommandChan] = PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}
ServerInitiatedCommands[TestCommandMulti] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeMultichat}
ServerInitiatedCommands[TestCommandGlobal] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeGlobal}
var server *httptest.Server
var urls TURLs
TSetup(&server, &urls)
defer server.CloseClientConnections()
defer unsubscribeAllClients()
var conn *websocket.Conn
var resp *http.Response
var err error
// client 1: sub ch1, ch2
// client 2: sub ch1, ch3
// client 3: sub none
// client 4: delayed sub ch1
// msg 1: ch1
// msg 2: ch2, ch3
// msg 3: chEmpty
// msg 4: global
// Client 1
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
if err != nil {
t.Error(err)
return
} }
doneWg.Add(1)
readyWg.Add(1)
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "sub", TestChannelName1)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
TSendMessage(t, conn, 3, "sub", TestChannelName2) // 2
TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil)
TSendMessage(t, conn, 4, "ready", 0)
TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil)
readyWg.Done()
TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1)
TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2)
TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4)
conn.Close()
doneWg.Done()
}(conn)
// Client 2
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
if err != nil {
t.Error(err)
return
} }
func TestFindFirstNewMessageOneAfter(t *testing.T) {
doneWg.Add(1) CachedGlobalMessages = []TimestampedGlobalMessage{
readyWg.Add(1) {Timestamp: time.Unix(15, 0)},
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "sub", TestChannelName1)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
TSendMessage(t, conn, 3, "sub", TestChannelName3) // 3
TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil)
TSendMessage(t, conn, 4, "ready", 0)
TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil)
readyWg.Done()
TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1)
TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2)
TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4)
conn.Close()
doneWg.Done()
}(conn)
// Client 3
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
if err != nil {
t.Error(err)
return
} }
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
doneWg.Add(1) if i != 0 {
readyWg.Add(1) t.Errorf("Expected 0, got %d", i)
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "ready", 0)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
readyWg.Done()
TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4)
conn.Close()
doneWg.Done()
}(conn)
// Wait for clients 1-3
readyWg.Wait()
var form url.Values
// Publish message 1 - should go to clients 1, 2
form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelName1, TestData1, false)
if err != nil {
t.FailNow()
} }
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(2)) {
t.FailNow()
} }
func TestFindFirstNewMessageSeveralAfter(t *testing.T) {
// Publish message 2 - should go to clients 1, 2 CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(11, 0)},
form, err = TSealForSavePubMsg(t, TestCommandMulti, TestChannelName2+","+TestChannelName3, TestData2, false) {Timestamp: time.Unix(12, 0)},
if err != nil { {Timestamp: time.Unix(13, 0)},
t.FailNow() {Timestamp: time.Unix(14, 0)},
{Timestamp: time.Unix(15, 0)},
} }
resp, err = http.PostForm(urls.SavePubMsg, form) i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if !TCheckResponse(t, resp, strconv.Itoa(2)) { if i != 0 {
t.FailNow() t.Errorf("Expected 0, got %d", i)
} }
// Publish message 3 - should go to no clients
form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelNameUnused, TestData3, false)
if err != nil {
t.FailNow()
}
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(0)) {
t.FailNow()
}
// Publish message 4 - should go to clients 1, 2, 3
form, err = TSealForSavePubMsg(t, TestCommandGlobal, "", TestData4, false)
if err != nil {
t.FailNow()
}
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(3)) {
t.FailNow()
}
// Start client 4
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
if err != nil {
t.Error(err)
return
}
doneWg.Add(1)
readyWg.Add(1)
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "sub", TestChannelName1)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
TSendMessage(t, conn, 3, "ready", 0)
TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil)
// backlog message
TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1)
readyWg.Done()
conn.Close()
doneWg.Done()
}(conn)
readyWg.Wait()
doneWg.Wait()
server.Close()
}
func BenchmarkUserSubscriptionSinglePublish(b *testing.B) {
var doneWg sync.WaitGroup
var readyWg sync.WaitGroup
const TestChannelName = "room.testchannel"
const TestCommand = "testdata"
const TestData = "123456789"
message := ClientMessage{MessageID: -1, Command: "testdata", Arguments: TestData}
fmt.Println()
fmt.Println(b.N)
var limit syscall.Rlimit
syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit)
limit.Cur = TCountOpenFDs() + uint64(b.N)*2 + 100
if limit.Cur > limit.Max {
b.Skip("Open file limit too low")
return
}
syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit)
var server *httptest.Server
var urls TURLs
TSetup(&server, &urls)
defer unsubscribeAllClients()
b.ResetTimer()
for i := 0; i < b.N; i++ {
conn, _, err := websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
if err != nil {
b.Error(err)
break
}
doneWg.Add(1)
readyWg.Add(1)
go func(i int, conn *websocket.Conn) {
TSendMessage(b, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TSendMessage(b, conn, 2, "sub", TestChannelName)
TReceiveExpectedMessage(b, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TReceiveExpectedMessage(b, conn, 2, SuccessCommand, nil)
readyWg.Done()
TReceiveExpectedMessage(b, conn, -1, TestCommand, TestData)
conn.Close()
doneWg.Done()
}(i, conn)
}
readyWg.Wait()
fmt.Println("publishing...")
if PublishToChat(TestChannelName, message) != b.N {
b.Error("not enough sent")
server.CloseClientConnections()
panic("halting test instead of waiting")
}
doneWg.Wait()
fmt.Println("...done.")
b.StopTimer()
server.Close()
server.CloseClientConnections()
} }

View file

@ -0,0 +1,188 @@
package server
// This is the scariest code I've written yet for the server.
// If I screwed up the locking, I won't know until it's too late.
import (
"log"
"sync"
"time"
)
type SubscriberList struct {
sync.RWMutex
Members []chan<- ClientMessage
}
var ChatSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList)
var ChatSubscriptionLock sync.RWMutex
var GlobalSubscriptionInfo SubscriberList
func SubscribeGlobal(client *ClientInfo) {
GlobalSubscriptionInfo.Lock()
AddToSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel)
GlobalSubscriptionInfo.Unlock()
}
func SubscribeChannel(client *ClientInfo, channelName string) {
ChatSubscriptionLock.RLock()
_subscribeWhileRlocked(channelName, client.MessageChannel)
ChatSubscriptionLock.RUnlock()
}
func PublishToChannel(channel string, msg ClientMessage) (count int) {
ChatSubscriptionLock.RLock()
list := ChatSubscriptionInfo[channel]
if list != nil {
list.RLock()
for _, msgChan := range list.Members {
msgChan <- msg
count++
}
list.RUnlock()
}
ChatSubscriptionLock.RUnlock()
return
}
func PublishToMultiple(channels []string, msg ClientMessage) (count int) {
found := make(map[chan<- ClientMessage]struct{})
ChatSubscriptionLock.RLock()
for _, channel := range channels {
list := ChatSubscriptionInfo[channel]
if list != nil {
list.RLock()
for _, msgChan := range list.Members {
found[msgChan] = struct{}{}
}
list.RUnlock()
}
}
ChatSubscriptionLock.RUnlock()
for msgChan, _ := range found {
msgChan <- msg
count++
}
return
}
func PublishToAll(msg ClientMessage) (count int) {
GlobalSubscriptionInfo.RLock()
for _, msgChan := range GlobalSubscriptionInfo.Members {
msgChan <- msg
count++
}
GlobalSubscriptionInfo.RUnlock()
return
}
func UnsubscribeSingleChat(client *ClientInfo, channelName string) {
ChatSubscriptionLock.RLock()
list := ChatSubscriptionInfo[channelName]
if list != nil {
list.Lock()
RemoveFromSliceC(&list.Members, client.MessageChannel)
list.Unlock()
}
ChatSubscriptionLock.RUnlock()
}
// Unsubscribe the client from all channels, AND clear the CurrentChannels / WatchingChannels fields.
// Locks:
// - read lock to top-level maps
// - write lock to SubscriptionInfos
// - write lock to ClientInfo
func UnsubscribeAll(client *ClientInfo) {
client.Mutex.Lock()
client.PendingSubscriptionsBacklog = nil
client.PendingSubscriptionsBacklog = nil
client.Mutex.Unlock()
GlobalSubscriptionInfo.Lock()
RemoveFromSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel)
GlobalSubscriptionInfo.Unlock()
ChatSubscriptionLock.RLock()
client.Mutex.Lock()
for _, v := range client.CurrentChannels {
list := ChatSubscriptionInfo[v]
if list != nil {
list.Lock()
RemoveFromSliceC(&list.Members, client.MessageChannel)
list.Unlock()
}
}
client.CurrentChannels = nil
client.Mutex.Unlock()
ChatSubscriptionLock.RUnlock()
}
func unsubscribeAllClients() {
GlobalSubscriptionInfo.Lock()
GlobalSubscriptionInfo.Members = nil
GlobalSubscriptionInfo.Unlock()
ChatSubscriptionLock.Lock()
ChatSubscriptionInfo = make(map[string]*SubscriberList)
ChatSubscriptionLock.Unlock()
}
const ReapingDelay = 20 * time.Minute
// Checks ChatSubscriptionInfo for entries with no subscribers every ReapingDelay.
// Started from SetupServer().
func pubsubJanitor() {
for {
time.Sleep(ReapingDelay)
var cleanedUp = make([]string, 0, 6)
ChatSubscriptionLock.Lock()
for key, val := range ChatSubscriptionInfo {
if val == nil || len(val.Members) == 0 {
delete(ChatSubscriptionInfo, key)
cleanedUp = append(cleanedUp, key)
}
}
ChatSubscriptionLock.Unlock()
if len(cleanedUp) != 0 {
err := SendCleanupTopicsNotice(cleanedUp)
if err != nil {
log.Println("error reporting cleaned subs:", err)
}
}
}
}
// Add a channel to the subscriptions while holding a read-lock to the map.
// Locks:
// - ALREADY HOLDING a read-lock to the 'which' top-level map via the rlocker object
// - possible write lock to the 'which' top-level map via the wlocker object
// - write lock to SubscriptionInfo (if not creating new)
func _subscribeWhileRlocked(channelName string, value chan<- ClientMessage) {
list := ChatSubscriptionInfo[channelName]
if list == nil {
// Not found, so create it
ChatSubscriptionLock.RUnlock()
ChatSubscriptionLock.Lock()
list = &SubscriberList{}
list.Members = []chan<- ClientMessage{value} // Create it populated, to avoid reaper
ChatSubscriptionInfo[channelName] = list
ChatSubscriptionLock.Unlock()
go func(topic string) {
err := SendNewTopicNotice(topic)
if err != nil {
log.Println("error reporting new sub:", err)
}
}(channelName)
ChatSubscriptionLock.RLock()
} else {
list.Lock()
AddToSliceC(&list.Members, value)
list.Unlock()
}
}

View file

@ -0,0 +1,448 @@
package server
import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"github.com/satori/go.uuid"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strconv"
"sync"
"syscall"
"testing"
"time"
)
func TCountOpenFDs() uint64 {
ary, _ := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid()))
return uint64(len(ary))
}
const IgnoreReceivedArguments = 1 + 2i
func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) {
var msg ClientMessage
var fail bool
messageType, packet, err := conn.ReadMessage()
if err != nil {
tb.Error(err)
return msg, false
}
if messageType != websocket.TextMessage {
tb.Error("got non-text message", packet)
return msg, false
}
err = UnmarshalClientMessage(packet, messageType, &msg)
if err != nil {
tb.Error(err)
return msg, false
}
if msg.MessageID != messageId {
tb.Error("Message ID was wrong. Expected", messageId, ", got", msg.MessageID, ":", msg)
fail = true
}
if msg.Command != command {
tb.Error("Command was wrong. Expected", command, ", got", msg.Command, ":", msg)
fail = true
}
if arguments != IgnoreReceivedArguments {
if arguments == nil {
if msg.origArguments != "" {
tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg)
}
} else {
argBytes, _ := json.Marshal(arguments)
if msg.origArguments != string(argBytes) {
tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg)
}
}
}
return msg, !fail
}
func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) bool {
SendMessage(conn, ClientMessage{MessageID: messageId, Command: command, Arguments: arguments})
return true
}
func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments interface{}, deleteMode bool) (url.Values, error) {
form := url.Values{}
form.Set("cmd", string(cmd))
argsBytes, err := json.Marshal(arguments)
if err != nil {
tb.Error(err)
return nil, err
}
form.Set("args", string(argsBytes))
form.Set("channel", channel)
if deleteMode {
form.Set("delete", "1")
}
form.Set("time", time.Now().Format(time.UnixDate))
sealed, err := SealRequest(form)
if err != nil {
tb.Error(err)
return nil, err
}
return sealed, nil
}
func TCheckResponse(tb testing.TB, resp *http.Response, expected string) bool {
var failed bool
respBytes, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
respStr := string(respBytes)
if err != nil {
tb.Error(err)
failed = true
}
if resp.StatusCode != 200 {
tb.Error("Publish failed: ", resp.StatusCode, respStr)
failed = true
}
if respStr != expected {
tb.Errorf("Got wrong response from server. Expected: '%s' Got: '%s'", expected, respStr)
failed = true
}
return !failed
}
type TURLs struct {
Websocket string
Origin string
PubMsg string
SavePubMsg string // update_and_pub
}
func TGetUrls(testserver *httptest.Server) TURLs {
addr := testserver.Listener.Addr().String()
return TURLs{
Websocket: fmt.Sprintf("ws://%s/", addr),
Origin: fmt.Sprintf("http://%s", addr),
PubMsg: fmt.Sprintf("http://%s/pub_msg", addr),
SavePubMsg: fmt.Sprintf("http://%s/update_and_pub", addr),
}
}
func TSetup(testserver **httptest.Server, urls *TURLs) {
DumpCache()
conf := &ConfigFile{
ServerId: 20,
UseSSL: false,
SocketOrigin: "localhost:2002",
BannerHTML: `
<!DOCTYPE html>
<title>CatBag</title>
<link rel="stylesheet" href="//cdn.frankerfacez.com/script/catbag.css">
<div id="container">
<div id="zf0"></div><div id="zf1"></div><div id="zf2"></div>
<div id="zf3"></div><div id="zf4"></div><div id="zf5"></div>
<div id="zf6"></div><div id="zf7"></div><div id="zf8"></div>
<div id="zf9"></div><div id="catbag"></div>
<div id="bottom">
A <a href="http://www.frankerfacez.com/">FrankerFaceZ</a> Service
&mdash; CatBag by <a href="http://www.twitch.tv/wolsk">Wolsk</a>
</div>
</div>
`,
OurPublicKey: []byte{176, 149, 72, 209, 35, 42, 110, 220, 22, 236, 212, 129, 213, 199, 1, 227, 185, 167, 150, 159, 117, 202, 164, 100, 9, 107, 45, 141, 122, 221, 155, 73},
OurPrivateKey: []byte{247, 133, 147, 194, 70, 240, 211, 216, 223, 16, 241, 253, 120, 14, 198, 74, 237, 180, 89, 33, 146, 146, 140, 58, 88, 160, 2, 246, 112, 35, 239, 87},
BackendPublicKey: []byte{19, 163, 37, 157, 50, 139, 193, 85, 229, 47, 166, 21, 153, 231, 31, 133, 41, 158, 8, 53, 73, 0, 113, 91, 13, 181, 131, 248, 176, 18, 1, 107},
}
gconfig = conf
SetupBackend(conf)
if testserver != nil {
serveMux := http.NewServeMux()
SetupServerAndHandle(conf, serveMux)
tserv := httptest.NewUnstartedServer(serveMux)
*testserver = tserv
tserv.Start()
if urls != nil {
*urls = TGetUrls(tserv)
}
}
}
func TestSubscriptionAndPublish(t *testing.T) {
var doneWg sync.WaitGroup
var readyWg sync.WaitGroup
const TestChannelName1 = "room.testchannel"
const TestChannelName2 = "room.chan2"
const TestChannelName3 = "room.chan3"
const TestChannelNameUnused = "room.empty"
const TestCommandChan = "testdata_single"
const TestCommandMulti = "testdata_multi"
const TestCommandGlobal = "testdata_global"
const TestData1 = "123456789"
const TestData2 = 42
const TestData3 = false
var TestData4 = []interface{}{"str1", "str2", "str3"}
ServerInitiatedCommands[TestCommandChan] = PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}
ServerInitiatedCommands[TestCommandMulti] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeMultichat}
ServerInitiatedCommands[TestCommandGlobal] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeGlobal}
var server *httptest.Server
var urls TURLs
TSetup(&server, &urls)
defer server.CloseClientConnections()
defer unsubscribeAllClients()
var conn *websocket.Conn
var resp *http.Response
var err error
// client 1: sub ch1, ch2
// client 2: sub ch1, ch3
// client 3: sub none
// client 4: delayed sub ch1
// msg 1: ch1
// msg 2: ch2, ch3
// msg 3: chEmpty
// msg 4: global
// Client 1
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
if err != nil {
t.Error(err)
return
}
doneWg.Add(1)
readyWg.Add(1)
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "sub", TestChannelName1)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
TSendMessage(t, conn, 3, "sub", TestChannelName2) // 2
TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil)
TSendMessage(t, conn, 4, "ready", 0)
TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil)
readyWg.Done()
TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1)
TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2)
TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4)
conn.Close()
doneWg.Done()
}(conn)
// Client 2
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
if err != nil {
t.Error(err)
return
}
doneWg.Add(1)
readyWg.Add(1)
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "sub", TestChannelName1)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
TSendMessage(t, conn, 3, "sub", TestChannelName3) // 3
TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil)
TSendMessage(t, conn, 4, "ready", 0)
TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil)
readyWg.Done()
TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1)
TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2)
TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4)
conn.Close()
doneWg.Done()
}(conn)
// Client 3
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
if err != nil {
t.Error(err)
return
}
doneWg.Add(1)
readyWg.Add(1)
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "ready", 0)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
readyWg.Done()
TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4)
conn.Close()
doneWg.Done()
}(conn)
// Wait for clients 1-3
readyWg.Wait()
var form url.Values
// Publish message 1 - should go to clients 1, 2
form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelName1, TestData1, false)
if err != nil {
t.FailNow()
}
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(2)) {
t.FailNow()
}
// Publish message 2 - should go to clients 1, 2
form, err = TSealForSavePubMsg(t, TestCommandMulti, TestChannelName2+","+TestChannelName3, TestData2, false)
if err != nil {
t.FailNow()
}
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(2)) {
t.FailNow()
}
// Publish message 3 - should go to no clients
form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelNameUnused, TestData3, false)
if err != nil {
t.FailNow()
}
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(0)) {
t.FailNow()
}
// Publish message 4 - should go to clients 1, 2, 3
form, err = TSealForSavePubMsg(t, TestCommandGlobal, "", TestData4, false)
if err != nil {
t.FailNow()
}
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(3)) {
t.FailNow()
}
// Start client 4
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
if err != nil {
t.Error(err)
return
}
doneWg.Add(1)
readyWg.Add(1)
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "sub", TestChannelName1)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
TSendMessage(t, conn, 3, "ready", 0)
TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil)
// backlog message
TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1)
readyWg.Done()
conn.Close()
doneWg.Done()
}(conn)
readyWg.Wait()
doneWg.Wait()
server.Close()
}
func BenchmarkUserSubscriptionSinglePublish(b *testing.B) {
var doneWg sync.WaitGroup
var readyWg sync.WaitGroup
const TestChannelName = "room.testchannel"
const TestCommand = "testdata"
const TestData = "123456789"
message := ClientMessage{MessageID: -1, Command: "testdata", Arguments: TestData}
fmt.Println()
fmt.Println(b.N)
var limit syscall.Rlimit
syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit)
limit.Cur = TCountOpenFDs() + uint64(b.N)*2 + 100
if limit.Cur > limit.Max {
b.Skip("Open file limit too low")
return
}
syscall.Setrlimit(syscall.RLIMIT_NOFILE, &limit)
var server *httptest.Server
var urls TURLs
TSetup(&server, &urls)
defer unsubscribeAllClients()
b.ResetTimer()
for i := 0; i < b.N; i++ {
conn, _, err := websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
if err != nil {
b.Error(err)
break
}
doneWg.Add(1)
readyWg.Add(1)
go func(i int, conn *websocket.Conn) {
TSendMessage(b, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TSendMessage(b, conn, 2, "sub", TestChannelName)
TReceiveExpectedMessage(b, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TReceiveExpectedMessage(b, conn, 2, SuccessCommand, nil)
readyWg.Done()
TReceiveExpectedMessage(b, conn, -1, TestCommand, TestData)
conn.Close()
doneWg.Done()
}(i, conn)
}
readyWg.Wait()
fmt.Println("publishing...")
if PublishToChannel(TestChannelName, message) != b.N {
b.Error("not enough sent")
server.CloseClientConnections()
panic("halting test instead of waiting")
}
doneWg.Wait()
fmt.Println("...done.")
b.StopTimer()
server.Close()
server.CloseClientConnections()
}