diff --git a/socketserver/internal/server/backend.go b/socketserver/internal/server/backend.go index 80eb3f64..9f316929 100644 --- a/socketserver/internal/server/backend.go +++ b/socketserver/internal/server/backend.go @@ -169,7 +169,7 @@ func RequestRemoteData(remoteCommand, data string, auth AuthInfo) (responseStr s func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) { formData := url.Values{ - "subs": chatSubs, + "subs": chatSubs, } sealedForm, err := SealRequest(formData) diff --git a/socketserver/internal/server/backlog.go b/socketserver/internal/server/backlog.go index 261bf2a6..2f38f835 100644 --- a/socketserver/internal/server/backlog.go +++ b/socketserver/internal/server/backlog.go @@ -4,6 +4,10 @@ import ( "errors" "fmt" "net/http" + "sort" + "strconv" + "strings" + "sync" "time" ) @@ -13,7 +17,7 @@ type PushCommandCacheInfo struct { } // this value is just docs right now -var ServerInitiatedCommands = map[string]PushCommandCacheInfo{ +var ServerInitiatedCommands = map[Command]PushCommandCacheInfo{ /// Global updates & notices "update_news": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global "message": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global @@ -22,7 +26,7 @@ var ServerInitiatedCommands = map[string]PushCommandCacheInfo{ /// Emote updates "reload_badges": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global "set_badge": {CacheTypeTimestamps, MsgTargetTypeMultichat}, // timecache:multichat - "reload_set": {}, // timecache:multichat + "reload_set": {}, // timecache:multichat "load_set": {}, // TODO what are the semantics of this? /// User auth @@ -31,7 +35,7 @@ var ServerInitiatedCommands = map[string]PushCommandCacheInfo{ /// Channel data // follow_sets: extra emote sets included in the chat // follow_buttons: extra follow buttons below the stream - "follow_sets": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:chat + "follow_sets": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:chat "follow_buttons": {CacheTypePersistent, MsgTargetTypeChat}, // mustcache:watching "srl_race": {CacheTypeLastOnly, MsgTargetTypeChat}, // cachelast:watching @@ -81,34 +85,216 @@ var ErrorUnrecognizedCacheType = errors.New("Invalid value for cachetype") // Returned by MessageTargetType.UnmarshalJSON() var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target") -type PersistentCachedMessage struct { - Timestamp time.Time - Channel string - Watching bool - Data string -} - type TimestampedGlobalMessage struct { Timestamp time.Time - Data string + Command Command + Data string } type TimestampedMultichatMessage struct { Timestamp time.Time - Channels string - Data string + Channels []string + Command Command + Data string } type LastSavedMessage struct { Timestamp time.Time - Data string + Data string } -// map command -> channel -> data -var CachedDataLast map[Command]map[string]string +// map is command -> channel -> data + +// CacheTypeLastOnly. Cleaned up by reaper goroutine every ~hour. +var CachedLastMessages map[Command]map[string]LastSavedMessage +var CachedLSMLock sync.RWMutex + +// CacheTypePersistent. Never cleaned. +var PersistentLastMessages map[Command]map[string]LastSavedMessage +var PersistentLSMLock sync.RWMutex + +var CachedGlobalMessages []TimestampedGlobalMessage +var CachedChannelMessages []TimestampedMultichatMessage +var CacheListsLock sync.RWMutex func DumpCache() { - CachedDataLast = make(map[Command]map[string]string) + CachedLSMLock.Lock() + CachedLastMessages = make(map[Command]map[string]LastSavedMessage) + CachedLSMLock.Unlock() + + PersistentLSMLock.Lock() + PersistentLastMessages = make(map[Command]map[string]LastSavedMessage) + // TODO delete file? + PersistentLSMLock.Unlock() + + CacheListsLock.Lock() + CachedGlobalMessages = make(tgmarray, 0) + CachedChannelMessages = make(tmmarray, 0) + CacheListsLock.Unlock() +} + +func SendBacklogForNewClient(client *ClientInfo) { + client.Mutex.Lock() // reading CurrentChannels + PersistentLSMLock.RLock() + for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypePersistent, MsgTargetTypeChat}) { + chanMap := CachedLastMessages[cmd] + if chanMap == nil { + continue + } + for _, channel := range client.CurrentChannels { + msg, ok := chanMap[channel] + if ok { + msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data} + msg.parseOrigArguments() + client.MessageChannel <- msg + } + } + } + PersistentLSMLock.RUnlock() + + CachedLSMLock.RLock() + for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}) { + chanMap := CachedLastMessages[cmd] + if chanMap == nil { + continue + } + for _, channel := range client.CurrentChannels { + msg, ok := chanMap[channel] + if ok { + msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data} + msg.parseOrigArguments() + client.MessageChannel <- msg + } + } + } + CachedLSMLock.RUnlock() + client.Mutex.Unlock() +} + +func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) { + client.Mutex.Lock() // reading CurrentChannels + CacheListsLock.RLock() + + globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime) + + for i := globIdx; i < len(CachedGlobalMessages); i++ { + item := CachedGlobalMessages[i] + msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data} + msg.parseOrigArguments() + client.MessageChannel <- msg + } + + chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime) + + for i := chanIdx; i < len(CachedChannelMessages); i++ { + item := CachedChannelMessages[i] + var send bool + for _, channel := range item.Channels { + for _, matchChannel := range client.CurrentChannels { + if channel == matchChannel { + send = true + break + } + } + if send { + break + } + } + if send { + msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data} + msg.parseOrigArguments() + client.MessageChannel <- msg + } + } + + CacheListsLock.RUnlock() + client.Mutex.Unlock() +} + +func InsertionSort(ary sort.Interface) { + for i := 1; i < ary.Len(); i++ { + for j := i; j > 0 && ary.Less(j, j-1); j-- { + ary.Swap(j, j-1) + } + } +} + +type TimestampArray interface { + Len() int + GetTime(int) time.Time +} + +func FindFirstNewMessage(ary TimestampArray, disconnectTime time.Time) (idx int) { + // TODO needs tests + len := ary.Len() + i := len + + // Walk backwards until we find GetTime() before disconnectTime + step := 1 + for i > 0 { + i -= step + if i < 0 { + i = 0 + } + if !ary.GetTime(i).After(disconnectTime) { + break + } + step = int(float64(step)*1.5) + 1 + } + + // Walk forwards until we find GetTime() after disconnectTime + for i < len && !ary.GetTime(i).After(disconnectTime) { + i++ + } + + if i == len { + return -1 + } + return i +} + +func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync.Locker, cmd Command, channel string, timestamp time.Time, data string, deleting bool) { + locker.Lock() + defer locker.Unlock() + + chanMap, ok := CachedLastMessages[cmd] + if !ok { + if deleting { + return + } + chanMap = make(map[string]LastSavedMessage) + CachedLastMessages[cmd] = chanMap + } + + if deleting { + delete(chanMap, channel) + } else { + chanMap[channel] = LastSavedMessage{timestamp, data} + } +} + +func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) { + CacheListsLock.Lock() + CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data}) + InsertionSort(tgmarray(CachedGlobalMessages)) + CacheListsLock.Unlock() +} + +func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) { + CacheListsLock.Lock() + CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data}) + InsertionSort(tmmarray(CachedChannelMessages)) + CacheListsLock.Unlock() +} + +func GetCommandsOfType(match PushCommandCacheInfo) []Command { + var ret []Command + for cmd, info := range ServerInitiatedCommands { + if info == match { + ret = append(ret, cmd) + } + } + return ret } func HBackendDumpCache(w http.ResponseWriter, r *http.Request) { @@ -138,9 +324,16 @@ func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) { return } - cmd := formData.Get("cmd") + cmd := Command(formData.Get("cmd")) json := formData.Get("args") channel := formData.Get("channel") + deleteMode := formData.Get("delete") != "" + timeStr := formData.Get("time") + timestamp, err := time.Parse(time.UnixDate, timeStr) + if err != nil { + w.WriteHeader(422) + fmt.Fprintf(w, "error parsing time: %v", err) + } cacheinfo, ok := ServerInitiatedCommands[cmd] if !ok { @@ -149,7 +342,23 @@ func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) { return } - _ = cacheinfo - _ = json - _ = channel + var count int + msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: json} + msg.parseOrigArguments() + + if cacheinfo.Caching == CacheTypeLastOnly && cacheinfo.Target == MsgTargetTypeChat { + SaveLastMessage(CachedLastMessages, &CachedLSMLock, cmd, channel, timestamp, json, deleteMode) + count = PublishToChat(channel, msg) + } else if cacheinfo.Caching == CacheTypePersistent && cacheinfo.Target == MsgTargetTypeChat { + SaveLastMessage(PersistentLastMessages, &PersistentLSMLock, cmd, channel, timestamp, json, deleteMode) + count = PublishToChat(channel, msg) + } else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeMultichat { + SaveMultichanMessage(cmd, channel, timestamp, json) + count = PublishToMultiple(strings.Split(channel, ","), msg) + } else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeGlobal { + SaveGlobalMessage(cmd, timestamp, json) + count = PublishToAll(msg) + } + + w.Write([]byte(strconv.Itoa(count))) } diff --git a/socketserver/internal/server/backlog_test.go b/socketserver/internal/server/backlog_test.go new file mode 100644 index 00000000..68757587 --- /dev/null +++ b/socketserver/internal/server/backlog_test.go @@ -0,0 +1,76 @@ +package server + +import ( + "testing" + "time" +) + +func TestFindFirstNewMessageEmpty(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{} + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != -1 { + t.Errorf("Expected -1, got %d", i) + } +} +func TestFindFirstNewMessageOneBefore(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{ + {Timestamp: time.Unix(8, 0)}, + } + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != -1 { + t.Errorf("Expected -1, got %d", i) + } +} +func TestFindFirstNewMessageSeveralBefore(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{ + {Timestamp: time.Unix(1, 0)}, + {Timestamp: time.Unix(2, 0)}, + {Timestamp: time.Unix(3, 0)}, + {Timestamp: time.Unix(4, 0)}, + {Timestamp: time.Unix(5, 0)}, + } + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != -1 { + t.Errorf("Expected -1, got %d", i) + } +} +func TestFindFirstNewMessageInMiddle(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{ + {Timestamp: time.Unix(1, 0)}, + {Timestamp: time.Unix(2, 0)}, + {Timestamp: time.Unix(3, 0)}, + {Timestamp: time.Unix(4, 0)}, + {Timestamp: time.Unix(5, 0)}, + {Timestamp: time.Unix(11, 0)}, + {Timestamp: time.Unix(12, 0)}, + {Timestamp: time.Unix(13, 0)}, + {Timestamp: time.Unix(14, 0)}, + {Timestamp: time.Unix(15, 0)}, + } + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != 5 { + t.Errorf("Expected 5, got %d", i) + } +} +func TestFindFirstNewMessageOneAfter(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{ + {Timestamp: time.Unix(15, 0)}, + } + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != 0 { + t.Errorf("Expected 0, got %d", i) + } +} +func TestFindFirstNewMessageSeveralAfter(t *testing.T) { + CachedGlobalMessages = []TimestampedGlobalMessage{ + {Timestamp: time.Unix(11, 0)}, + {Timestamp: time.Unix(12, 0)}, + {Timestamp: time.Unix(13, 0)}, + {Timestamp: time.Unix(14, 0)}, + {Timestamp: time.Unix(15, 0)}, + } + i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) + if i != 0 { + t.Errorf("Expected 0, got %d", i) + } +} diff --git a/socketserver/internal/server/commands.go b/socketserver/internal/server/commands.go index 5fa82577..c947bf08 100644 --- a/socketserver/internal/server/commands.go +++ b/socketserver/internal/server/commands.go @@ -1,6 +1,7 @@ package server import ( + "fmt" "github.com/satori/go.uuid" "golang.org/x/net/websocket" "log" @@ -17,22 +18,25 @@ const ChannelInfoDelay = 2 * time.Second func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) { handler, ok := CommandHandlers[msg.Command] if !ok { - log.Print("[!] Unknown command", msg.Command, "- sent by client", client.ClientID, "@", conn.RemoteAddr()) - // uncomment after commands are implemented - // closer() + log.Println("[!] Unknown command", msg.Command, "- sent by client", client.ClientID, "@", conn.RemoteAddr()) + FFZCodec.Send(conn, ClientMessage{ + MessageID: msg.MessageID, + Command: "error", + Arguments: fmt.Sprintf("Unknown command %s", msg.Command), + }) return } - // log.Println(conn.RemoteAddr(), msg.MessageID, msg.Command, msg.Arguments) - response, err := CallHandler(handler, conn, client, msg) if err == nil { - response.MessageID = msg.MessageID - FFZCodec.Send(conn, response) - } else if response.Command == AsyncResponseCommand { - // Don't send anything - // The response will be delivered over client.MessageChannel / serverMessageChan + if response.Command == AsyncResponseCommand { + // Don't send anything + // The response will be delivered over client.MessageChannel / serverMessageChan + } else { + response.MessageID = msg.MessageID + FFZCodec.Send(conn, response) + } } else { FFZCodec.Send(conn, ClientMessage{ MessageID: msg.MessageID, @@ -61,6 +65,42 @@ func HandleHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r }, nil } +func HandleReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { + disconnectAt, err := msg.ArgumentsAsInt() + if err != nil { + return + } + + client.Mutex.Lock() + if client.MakePendingRequests != nil { + if !client.MakePendingRequests.Stop() { + // Timer already fired, GetSubscriptionBacklog() has started + rmsg.Command = SuccessCommand + return + } + } + client.PendingSubscriptionsBacklog = nil + client.MakePendingRequests = nil + client.Mutex.Unlock() + + if disconnectAt == 0 { + // backlog only + go func() { + client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand} + SendBacklogForNewClient(client) + }() + return ClientMessage{Command: AsyncResponseCommand}, nil + } else { + // backlog and timed + go func() { + client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand} + SendBacklogForNewClient(client) + SendTimedBacklogMessages(client, time.Unix(disconnectAt, 0)) + }() + return ClientMessage{Command: AsyncResponseCommand}, nil + } +} + func HandleSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { username, err := msg.ArgumentsAsString() if err != nil { diff --git a/socketserver/internal/server/handlecore.go b/socketserver/internal/server/handlecore.go index 3e1af8b3..83d05ac4 100644 --- a/socketserver/internal/server/handlecore.go +++ b/socketserver/internal/server/handlecore.go @@ -39,9 +39,10 @@ type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMes var CommandHandlers = map[Command]CommandHandler{ HelloCommand: HandleHello, "setuser": HandleSetUser, + "ready": HandleReady, - "sub": HandleSub, - "unsub": HandleUnsub, + "sub": HandleSub, + "unsub": HandleUnsub, "track_follow": HandleTrackFollow, "emoticon_uses": HandleEmoticonUses, @@ -335,7 +336,7 @@ func (cm *ClientMessage) ArgumentsAsString() (string1 string, err error) { } // Convenience method: Parse the arguments of the ClientMessage as a single int. -func (cm *ClientMessage) ArgumentsAsInt() (int1 int, err error) { +func (cm *ClientMessage) ArgumentsAsInt() (int1 int64, err error) { var ok bool var num float64 num, ok = cm.Arguments.(float64) @@ -343,7 +344,7 @@ func (cm *ClientMessage) ArgumentsAsInt() (int1 int, err error) { err = ExpectedSingleInt return } else { - int1 = int(num) + int1 = int64(num) return int1, nil } } diff --git a/socketserver/internal/server/publisher.go b/socketserver/internal/server/publisher.go index afb2d944..d9658ac7 100644 --- a/socketserver/internal/server/publisher.go +++ b/socketserver/internal/server/publisher.go @@ -32,6 +32,31 @@ func PublishToChat(channel string, msg ClientMessage) (count int) { return } +func PublishToMultiple(channels []string, msg ClientMessage) (count int) { + found := make(map[chan<- ClientMessage]struct{}) + + ChatSubscriptionLock.RLock() + + for _, channel := range channels { + list := ChatSubscriptionInfo[channel] + if list != nil { + list.RLock() + for _, msgChan := range list.Members { + found[msgChan] = struct{}{} + } + list.RUnlock() + } + } + + ChatSubscriptionLock.RUnlock() + + for msgChan, _ := range found { + msgChan <- msg + count++ + } + return +} + func PublishToAll(msg ClientMessage) (count int) { GlobalSubscriptionInfo.RLock() for _, msgChan := range GlobalSubscriptionInfo.Members { diff --git a/socketserver/internal/server/publisher_test.go b/socketserver/internal/server/publisher_test.go index ebc42ec6..e878f709 100644 --- a/socketserver/internal/server/publisher_test.go +++ b/socketserver/internal/server/publisher_test.go @@ -10,9 +10,11 @@ import ( "net/http/httptest" "net/url" "os" + "strconv" "sync" "syscall" "testing" + "time" ) func TCountOpenFDs() uint64 { @@ -20,7 +22,8 @@ func TCountOpenFDs() uint64 { return uint64(len(ary)) } -const IgnoreReceivedArguments = 1+2i +const IgnoreReceivedArguments = 1 + 2i + func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) { var msg ClientMessage var fail bool @@ -38,8 +41,15 @@ func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, fail = true } if arguments != IgnoreReceivedArguments { - if msg.Arguments != arguments { - tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg) + if arguments == nil { + if msg.origArguments != "" { + tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg) + } + } else { + argBytes, _ := json.Marshal(arguments) + if msg.origArguments != string(argBytes) { + tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg) + } } } return msg, !fail @@ -53,18 +63,66 @@ func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Co return err == nil } +func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments interface{}, deleteMode bool) (url.Values, error) { + form := url.Values{} + form.Set("cmd", string(cmd)) + argsBytes, err := json.Marshal(arguments) + if err != nil { + tb.Error(err) + return nil, err + } + form.Set("args", string(argsBytes)) + form.Set("channel", channel) + if deleteMode { + form.Set("delete", "1") + } + form.Set("time", time.Now().Format(time.UnixDate)) + + sealed, err := SealRequest(form) + if err != nil { + tb.Error(err) + return nil, err + } + return sealed, nil +} + +func TCheckResponse(tb testing.TB, resp *http.Response, expected string) bool { + var failed bool + respBytes, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + respStr := string(respBytes) + + if err != nil { + tb.Error(err) + failed = true + } + + if resp.StatusCode != 200 { + tb.Error("Publish failed: ", resp.StatusCode, respStr) + failed = true + } + + if respStr != expected { + tb.Errorf("Got wrong response from server. Expected: '%s' Got: '%s'", expected, respStr) + failed = true + } + return !failed +} + type TURLs struct { - Websocket string - Origin string - PubMsg string + Websocket string + Origin string + PubMsg string + SavePubMsg string // update_and_pub } func TGetUrls(testserver *httptest.Server) TURLs { addr := testserver.Listener.Addr().String() return TURLs{ - Websocket: fmt.Sprintf("ws://%s/", addr), - Origin: fmt.Sprintf("http://%s", addr), - PubMsg: fmt.Sprintf("http://%s/pub_msg", addr), + Websocket: fmt.Sprintf("ws://%s/", addr), + Origin: fmt.Sprintf("http://%s", addr), + PubMsg: fmt.Sprintf("http://%s/pub_msg", addr), + SavePubMsg: fmt.Sprintf("http://%s/update_and_pub", addr), } } @@ -98,32 +156,192 @@ func TestSubscriptionAndPublish(t *testing.T) { var doneWg sync.WaitGroup var readyWg sync.WaitGroup - const TestChannelName = "room.testchannel" - const TestCommand = "testdata" - const TestData = "123456789" + const TestChannelName1 = "room.testchannel" + const TestChannelName2 = "room.chan2" + const TestChannelName3 = "room.chan3" + const TestChannelNameUnused = "room.empty" + const TestCommandChan = "testdata_single" + const TestCommandMulti = "testdata_multi" + const TestCommandGlobal = "testdata_global" + const TestData1 = "123456789" + const TestData2 = 42 + const TestData3 = false + var TestData4 = []interface{}{"str1", "str2", "str3"} + + ServerInitiatedCommands[TestCommandChan] = PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat} + ServerInitiatedCommands[TestCommandMulti] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeMultichat} + ServerInitiatedCommands[TestCommandGlobal] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeGlobal} var server *httptest.Server var urls TURLs TSetup(&server, &urls) + defer server.CloseClientConnections() defer unsubscribeAllClients() - conn, err := websocket.Dial(urls.Websocket, "", urls.Origin) + var conn *websocket.Conn + var err error + + // client 1: sub ch1, ch2 + // client 2: sub ch1, ch3 + // client 3: sub none + // client 4: delayed sub ch1 + // msg 1: ch1 + // msg 2: ch2, ch3 + // msg 3: chEmpty + // msg 4: global + + // Client 1 + conn, err = websocket.Dial(urls.Websocket, "", urls.Origin) if err != nil { t.Error(err) return } + doneWg.Add(1) readyWg.Add(1) - go func(conn *websocket.Conn) { TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) - TSendMessage(t, conn, 2, "sub", TestChannelName) + TSendMessage(t, conn, 2, "sub", TestChannelName1) + TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) + TSendMessage(t, conn, 3, "sub", TestChannelName2) // 2 + TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil) + TSendMessage(t, conn, 4, "ready", 0) + TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil) + + readyWg.Done() + + TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1) + TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2) + TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4) + + conn.Close() + doneWg.Done() + }(conn) + + // Client 2 + conn, err = websocket.Dial(urls.Websocket, "", urls.Origin) + if err != nil { + t.Error(err) + return + } + + doneWg.Add(1) + readyWg.Add(1) + go func(conn *websocket.Conn) { + TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) + TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) + TSendMessage(t, conn, 2, "sub", TestChannelName1) + TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) + TSendMessage(t, conn, 3, "sub", TestChannelName3) // 3 + TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil) + TSendMessage(t, conn, 4, "ready", 0) + TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil) + + readyWg.Done() + + TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1) + TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2) + TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4) + + conn.Close() + doneWg.Done() + }(conn) + + // Client 3 + conn, err = websocket.Dial(urls.Websocket, "", urls.Origin) + if err != nil { + t.Error(err) + return + } + + doneWg.Add(1) + readyWg.Add(1) + go func(conn *websocket.Conn) { + TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) + TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) + TSendMessage(t, conn, 2, "ready", 0) TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) readyWg.Done() - TReceiveExpectedMessage(t, conn, -1, TestCommand, TestData) + TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4) + + conn.Close() + doneWg.Done() + }(conn) + + // Wait for clients 1-3 + readyWg.Wait() + + var form url.Values + var resp *http.Response + + // Publish message 1 - should go to clients 1, 2 + + form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelName1, TestData1, false) + if err != nil { + t.FailNow() + } + resp, err = http.PostForm(urls.SavePubMsg, form) + if !TCheckResponse(t, resp, strconv.Itoa(2)) { + t.FailNow() + } + + // Publish message 2 - should go to clients 1, 2 + + form, err = TSealForSavePubMsg(t, TestCommandMulti, TestChannelName2+","+TestChannelName3, TestData2, false) + if err != nil { + t.FailNow() + } + resp, err = http.PostForm(urls.SavePubMsg, form) + if !TCheckResponse(t, resp, strconv.Itoa(2)) { + t.FailNow() + } + + // Publish message 3 - should go to no clients + + form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelNameUnused, TestData3, false) + if err != nil { + t.FailNow() + } + resp, err = http.PostForm(urls.SavePubMsg, form) + if !TCheckResponse(t, resp, strconv.Itoa(0)) { + t.FailNow() + } + + // Publish message 4 - should go to clients 1, 2, 3 + + form, err = TSealForSavePubMsg(t, TestCommandGlobal, "", TestData4, false) + if err != nil { + t.FailNow() + } + resp, err = http.PostForm(urls.SavePubMsg, form) + if !TCheckResponse(t, resp, strconv.Itoa(3)) { + t.FailNow() + } + + // Start client 4 + conn, err = websocket.Dial(urls.Websocket, "", urls.Origin) + if err != nil { + t.Error(err) + return + } + + doneWg.Add(1) + readyWg.Add(1) + go func(conn *websocket.Conn) { + TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) + TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) + TSendMessage(t, conn, 2, "sub", TestChannelName1) + TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) + TSendMessage(t, conn, 3, "ready", 0) + TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil) + + // backlog message + TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1) + + readyWg.Done() conn.Close() doneWg.Done() @@ -131,37 +349,6 @@ func TestSubscriptionAndPublish(t *testing.T) { readyWg.Wait() - form := url.Values{} - form.Set("cmd", TestCommand) - argsBytes, _ := json.Marshal(TestData) - form.Set("args", string(argsBytes)) - form.Set("channel", TestChannelName) - form.Set("scope", MsgTargetTypeChat.Name()) - - sealedForm, err := SealRequest(form) - if err != nil { - t.Error(err) - server.CloseClientConnections() - panic("halting test") - } - - resp, err := http.PostForm(urls.PubMsg, sealedForm) - if err != nil { - t.Error(err) - server.CloseClientConnections() - panic("halting test") - } - - respBytes, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() - respStr := string(respBytes) - - if resp.StatusCode != 200 { - t.Error("Publish failed: ", resp.StatusCode, respStr) - server.CloseClientConnections() - panic("halting test") - } - doneWg.Wait() server.Close() } diff --git a/socketserver/internal/server/types.go b/socketserver/internal/server/types.go index f99e059d..a2bd074d 100644 --- a/socketserver/internal/server/types.go +++ b/socketserver/internal/server/types.go @@ -76,6 +76,34 @@ type ClientInfo struct { MessageChannel chan<- ClientMessage } +type tgmarray []TimestampedGlobalMessage +type tmmarray []TimestampedMultichatMessage + +func (ta tgmarray) Len() int { + return len(ta) +} +func (ta tgmarray) Less(i, j int) bool { + return ta[i].Timestamp.Before(ta[j].Timestamp) +} +func (ta tgmarray) Swap(i, j int) { + ta[i], ta[j] = ta[j], ta[i] +} +func (ta tgmarray) GetTime(i int) time.Time { + return ta[i].Timestamp +} +func (ta tmmarray) Len() int { + return len(ta) +} +func (ta tmmarray) Less(i, j int) bool { + return ta[i].Timestamp.Before(ta[j].Timestamp) +} +func (ta tmmarray) Swap(i, j int) { + ta[i], ta[j] = ta[j], ta[i] +} +func (ta tmmarray) GetTime(i int) time.Time { + return ta[i].Timestamp +} + func (bct BacklogCacheType) Name() string { switch bct { case CacheTypeInvalid: