1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-08-10 16:10:55 +00:00

Implement backlog and test it

This commit is contained in:
Kane York 2015-10-26 14:55:20 -07:00
parent 44bcd7df05
commit 8ba87e1a27
8 changed files with 649 additions and 83 deletions

View file

@ -4,6 +4,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"sort"
"strconv"
"strings"
"sync"
"time" "time"
) )
@ -13,7 +17,7 @@ type PushCommandCacheInfo struct {
} }
// this value is just docs right now // this value is just docs right now
var ServerInitiatedCommands = map[string]PushCommandCacheInfo{ var ServerInitiatedCommands = map[Command]PushCommandCacheInfo{
/// Global updates & notices /// Global updates & notices
"update_news": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global "update_news": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
"message": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global "message": {CacheTypeTimestamps, MsgTargetTypeGlobal}, // timecache:global
@ -81,21 +85,16 @@ var ErrorUnrecognizedCacheType = errors.New("Invalid value for cachetype")
// Returned by MessageTargetType.UnmarshalJSON() // Returned by MessageTargetType.UnmarshalJSON()
var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target") var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target")
type PersistentCachedMessage struct {
Timestamp time.Time
Channel string
Watching bool
Data string
}
type TimestampedGlobalMessage struct { type TimestampedGlobalMessage struct {
Timestamp time.Time Timestamp time.Time
Command Command
Data string Data string
} }
type TimestampedMultichatMessage struct { type TimestampedMultichatMessage struct {
Timestamp time.Time Timestamp time.Time
Channels string Channels []string
Command Command
Data string Data string
} }
@ -104,11 +103,198 @@ type LastSavedMessage struct {
Data string Data string
} }
// map command -> channel -> data // map is command -> channel -> data
var CachedDataLast map[Command]map[string]string
// CacheTypeLastOnly. Cleaned up by reaper goroutine every ~hour.
var CachedLastMessages map[Command]map[string]LastSavedMessage
var CachedLSMLock sync.RWMutex
// CacheTypePersistent. Never cleaned.
var PersistentLastMessages map[Command]map[string]LastSavedMessage
var PersistentLSMLock sync.RWMutex
var CachedGlobalMessages []TimestampedGlobalMessage
var CachedChannelMessages []TimestampedMultichatMessage
var CacheListsLock sync.RWMutex
func DumpCache() { func DumpCache() {
CachedDataLast = make(map[Command]map[string]string) CachedLSMLock.Lock()
CachedLastMessages = make(map[Command]map[string]LastSavedMessage)
CachedLSMLock.Unlock()
PersistentLSMLock.Lock()
PersistentLastMessages = make(map[Command]map[string]LastSavedMessage)
// TODO delete file?
PersistentLSMLock.Unlock()
CacheListsLock.Lock()
CachedGlobalMessages = make(tgmarray, 0)
CachedChannelMessages = make(tmmarray, 0)
CacheListsLock.Unlock()
}
func SendBacklogForNewClient(client *ClientInfo) {
client.Mutex.Lock() // reading CurrentChannels
PersistentLSMLock.RLock()
for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypePersistent, MsgTargetTypeChat}) {
chanMap := CachedLastMessages[cmd]
if chanMap == nil {
continue
}
for _, channel := range client.CurrentChannels {
msg, ok := chanMap[channel]
if ok {
msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
}
}
PersistentLSMLock.RUnlock()
CachedLSMLock.RLock()
for _, cmd := range GetCommandsOfType(PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}) {
chanMap := CachedLastMessages[cmd]
if chanMap == nil {
continue
}
for _, channel := range client.CurrentChannels {
msg, ok := chanMap[channel]
if ok {
msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
}
}
CachedLSMLock.RUnlock()
client.Mutex.Unlock()
}
func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) {
client.Mutex.Lock() // reading CurrentChannels
CacheListsLock.RLock()
globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime)
for i := globIdx; i < len(CachedGlobalMessages); i++ {
item := CachedGlobalMessages[i]
msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime)
for i := chanIdx; i < len(CachedChannelMessages); i++ {
item := CachedChannelMessages[i]
var send bool
for _, channel := range item.Channels {
for _, matchChannel := range client.CurrentChannels {
if channel == matchChannel {
send = true
break
}
}
if send {
break
}
}
if send {
msg := ClientMessage{MessageID: -1, Command: item.Command, origArguments: item.Data}
msg.parseOrigArguments()
client.MessageChannel <- msg
}
}
CacheListsLock.RUnlock()
client.Mutex.Unlock()
}
func InsertionSort(ary sort.Interface) {
for i := 1; i < ary.Len(); i++ {
for j := i; j > 0 && ary.Less(j, j-1); j-- {
ary.Swap(j, j-1)
}
}
}
type TimestampArray interface {
Len() int
GetTime(int) time.Time
}
func FindFirstNewMessage(ary TimestampArray, disconnectTime time.Time) (idx int) {
// TODO needs tests
len := ary.Len()
i := len
// Walk backwards until we find GetTime() before disconnectTime
step := 1
for i > 0 {
i -= step
if i < 0 {
i = 0
}
if !ary.GetTime(i).After(disconnectTime) {
break
}
step = int(float64(step)*1.5) + 1
}
// Walk forwards until we find GetTime() after disconnectTime
for i < len && !ary.GetTime(i).After(disconnectTime) {
i++
}
if i == len {
return -1
}
return i
}
func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync.Locker, cmd Command, channel string, timestamp time.Time, data string, deleting bool) {
locker.Lock()
defer locker.Unlock()
chanMap, ok := CachedLastMessages[cmd]
if !ok {
if deleting {
return
}
chanMap = make(map[string]LastSavedMessage)
CachedLastMessages[cmd] = chanMap
}
if deleting {
delete(chanMap, channel)
} else {
chanMap[channel] = LastSavedMessage{timestamp, data}
}
}
func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) {
CacheListsLock.Lock()
CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data})
InsertionSort(tgmarray(CachedGlobalMessages))
CacheListsLock.Unlock()
}
func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) {
CacheListsLock.Lock()
CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data})
InsertionSort(tmmarray(CachedChannelMessages))
CacheListsLock.Unlock()
}
func GetCommandsOfType(match PushCommandCacheInfo) []Command {
var ret []Command
for cmd, info := range ServerInitiatedCommands {
if info == match {
ret = append(ret, cmd)
}
}
return ret
} }
func HBackendDumpCache(w http.ResponseWriter, r *http.Request) { func HBackendDumpCache(w http.ResponseWriter, r *http.Request) {
@ -138,9 +324,16 @@ func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) {
return return
} }
cmd := formData.Get("cmd") cmd := Command(formData.Get("cmd"))
json := formData.Get("args") json := formData.Get("args")
channel := formData.Get("channel") channel := formData.Get("channel")
deleteMode := formData.Get("delete") != ""
timeStr := formData.Get("time")
timestamp, err := time.Parse(time.UnixDate, timeStr)
if err != nil {
w.WriteHeader(422)
fmt.Fprintf(w, "error parsing time: %v", err)
}
cacheinfo, ok := ServerInitiatedCommands[cmd] cacheinfo, ok := ServerInitiatedCommands[cmd]
if !ok { if !ok {
@ -149,7 +342,23 @@ func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) {
return return
} }
_ = cacheinfo var count int
_ = json msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: json}
_ = channel msg.parseOrigArguments()
if cacheinfo.Caching == CacheTypeLastOnly && cacheinfo.Target == MsgTargetTypeChat {
SaveLastMessage(CachedLastMessages, &CachedLSMLock, cmd, channel, timestamp, json, deleteMode)
count = PublishToChat(channel, msg)
} else if cacheinfo.Caching == CacheTypePersistent && cacheinfo.Target == MsgTargetTypeChat {
SaveLastMessage(PersistentLastMessages, &PersistentLSMLock, cmd, channel, timestamp, json, deleteMode)
count = PublishToChat(channel, msg)
} else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeMultichat {
SaveMultichanMessage(cmd, channel, timestamp, json)
count = PublishToMultiple(strings.Split(channel, ","), msg)
} else if cacheinfo.Caching == CacheTypeTimestamps && cacheinfo.Target == MsgTargetTypeGlobal {
SaveGlobalMessage(cmd, timestamp, json)
count = PublishToAll(msg)
}
w.Write([]byte(strconv.Itoa(count)))
} }

View file

@ -0,0 +1,76 @@
package server
import (
"testing"
"time"
)
func TestFindFirstNewMessageEmpty(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != -1 {
t.Errorf("Expected -1, got %d", i)
}
}
func TestFindFirstNewMessageOneBefore(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(8, 0)},
}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != -1 {
t.Errorf("Expected -1, got %d", i)
}
}
func TestFindFirstNewMessageSeveralBefore(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(1, 0)},
{Timestamp: time.Unix(2, 0)},
{Timestamp: time.Unix(3, 0)},
{Timestamp: time.Unix(4, 0)},
{Timestamp: time.Unix(5, 0)},
}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != -1 {
t.Errorf("Expected -1, got %d", i)
}
}
func TestFindFirstNewMessageInMiddle(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(1, 0)},
{Timestamp: time.Unix(2, 0)},
{Timestamp: time.Unix(3, 0)},
{Timestamp: time.Unix(4, 0)},
{Timestamp: time.Unix(5, 0)},
{Timestamp: time.Unix(11, 0)},
{Timestamp: time.Unix(12, 0)},
{Timestamp: time.Unix(13, 0)},
{Timestamp: time.Unix(14, 0)},
{Timestamp: time.Unix(15, 0)},
}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != 5 {
t.Errorf("Expected 5, got %d", i)
}
}
func TestFindFirstNewMessageOneAfter(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(15, 0)},
}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != 0 {
t.Errorf("Expected 0, got %d", i)
}
}
func TestFindFirstNewMessageSeveralAfter(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(11, 0)},
{Timestamp: time.Unix(12, 0)},
{Timestamp: time.Unix(13, 0)},
{Timestamp: time.Unix(14, 0)},
{Timestamp: time.Unix(15, 0)},
}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != 0 {
t.Errorf("Expected 0, got %d", i)
}
}

View file

@ -1,6 +1,7 @@
package server package server
import ( import (
"fmt"
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"log" "log"
@ -17,22 +18,25 @@ const ChannelInfoDelay = 2 * time.Second
func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) { func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) {
handler, ok := CommandHandlers[msg.Command] handler, ok := CommandHandlers[msg.Command]
if !ok { if !ok {
log.Print("[!] Unknown command", msg.Command, "- sent by client", client.ClientID, "@", conn.RemoteAddr()) log.Println("[!] Unknown command", msg.Command, "- sent by client", client.ClientID, "@", conn.RemoteAddr())
// uncomment after commands are implemented FFZCodec.Send(conn, ClientMessage{
// closer() MessageID: msg.MessageID,
Command: "error",
Arguments: fmt.Sprintf("Unknown command %s", msg.Command),
})
return return
} }
// log.Println(conn.RemoteAddr(), msg.MessageID, msg.Command, msg.Arguments)
response, err := CallHandler(handler, conn, client, msg) response, err := CallHandler(handler, conn, client, msg)
if err == nil { if err == nil {
response.MessageID = msg.MessageID if response.Command == AsyncResponseCommand {
FFZCodec.Send(conn, response)
} else if response.Command == AsyncResponseCommand {
// Don't send anything // Don't send anything
// The response will be delivered over client.MessageChannel / serverMessageChan // The response will be delivered over client.MessageChannel / serverMessageChan
} else {
response.MessageID = msg.MessageID
FFZCodec.Send(conn, response)
}
} else { } else {
FFZCodec.Send(conn, ClientMessage{ FFZCodec.Send(conn, ClientMessage{
MessageID: msg.MessageID, MessageID: msg.MessageID,
@ -61,6 +65,42 @@ func HandleHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r
}, nil }, nil
} }
func HandleReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
disconnectAt, err := msg.ArgumentsAsInt()
if err != nil {
return
}
client.Mutex.Lock()
if client.MakePendingRequests != nil {
if !client.MakePendingRequests.Stop() {
// Timer already fired, GetSubscriptionBacklog() has started
rmsg.Command = SuccessCommand
return
}
}
client.PendingSubscriptionsBacklog = nil
client.MakePendingRequests = nil
client.Mutex.Unlock()
if disconnectAt == 0 {
// backlog only
go func() {
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand}
SendBacklogForNewClient(client)
}()
return ClientMessage{Command: AsyncResponseCommand}, nil
} else {
// backlog and timed
go func() {
client.MessageChannel <- ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand}
SendBacklogForNewClient(client)
SendTimedBacklogMessages(client, time.Unix(disconnectAt, 0))
}()
return ClientMessage{Command: AsyncResponseCommand}, nil
}
}
func HandleSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
username, err := msg.ArgumentsAsString() username, err := msg.ArgumentsAsString()
if err != nil { if err != nil {

View file

@ -39,6 +39,7 @@ type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMes
var CommandHandlers = map[Command]CommandHandler{ var CommandHandlers = map[Command]CommandHandler{
HelloCommand: HandleHello, HelloCommand: HandleHello,
"setuser": HandleSetUser, "setuser": HandleSetUser,
"ready": HandleReady,
"sub": HandleSub, "sub": HandleSub,
"unsub": HandleUnsub, "unsub": HandleUnsub,
@ -335,7 +336,7 @@ func (cm *ClientMessage) ArgumentsAsString() (string1 string, err error) {
} }
// Convenience method: Parse the arguments of the ClientMessage as a single int. // Convenience method: Parse the arguments of the ClientMessage as a single int.
func (cm *ClientMessage) ArgumentsAsInt() (int1 int, err error) { func (cm *ClientMessage) ArgumentsAsInt() (int1 int64, err error) {
var ok bool var ok bool
var num float64 var num float64
num, ok = cm.Arguments.(float64) num, ok = cm.Arguments.(float64)
@ -343,7 +344,7 @@ func (cm *ClientMessage) ArgumentsAsInt() (int1 int, err error) {
err = ExpectedSingleInt err = ExpectedSingleInt
return return
} else { } else {
int1 = int(num) int1 = int64(num)
return int1, nil return int1, nil
} }
} }

View file

@ -32,6 +32,31 @@ func PublishToChat(channel string, msg ClientMessage) (count int) {
return return
} }
func PublishToMultiple(channels []string, msg ClientMessage) (count int) {
found := make(map[chan<- ClientMessage]struct{})
ChatSubscriptionLock.RLock()
for _, channel := range channels {
list := ChatSubscriptionInfo[channel]
if list != nil {
list.RLock()
for _, msgChan := range list.Members {
found[msgChan] = struct{}{}
}
list.RUnlock()
}
}
ChatSubscriptionLock.RUnlock()
for msgChan, _ := range found {
msgChan <- msg
count++
}
return
}
func PublishToAll(msg ClientMessage) (count int) { func PublishToAll(msg ClientMessage) (count int) {
GlobalSubscriptionInfo.RLock() GlobalSubscriptionInfo.RLock()
for _, msgChan := range GlobalSubscriptionInfo.Members { for _, msgChan := range GlobalSubscriptionInfo.Members {

View file

@ -10,9 +10,11 @@ import (
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"os" "os"
"strconv"
"sync" "sync"
"syscall" "syscall"
"testing" "testing"
"time"
) )
func TCountOpenFDs() uint64 { func TCountOpenFDs() uint64 {
@ -21,6 +23,7 @@ func TCountOpenFDs() uint64 {
} }
const IgnoreReceivedArguments = 1 + 2i const IgnoreReceivedArguments = 1 + 2i
func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) { func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) {
var msg ClientMessage var msg ClientMessage
var fail bool var fail bool
@ -38,9 +41,16 @@ func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int,
fail = true fail = true
} }
if arguments != IgnoreReceivedArguments { if arguments != IgnoreReceivedArguments {
if msg.Arguments != arguments { if arguments == nil {
if msg.origArguments != "" {
tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg) tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg)
} }
} else {
argBytes, _ := json.Marshal(arguments)
if msg.origArguments != string(argBytes) {
tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg)
}
}
} }
return msg, !fail return msg, !fail
} }
@ -53,10 +63,57 @@ func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Co
return err == nil return err == nil
} }
func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments interface{}, deleteMode bool) (url.Values, error) {
form := url.Values{}
form.Set("cmd", string(cmd))
argsBytes, err := json.Marshal(arguments)
if err != nil {
tb.Error(err)
return nil, err
}
form.Set("args", string(argsBytes))
form.Set("channel", channel)
if deleteMode {
form.Set("delete", "1")
}
form.Set("time", time.Now().Format(time.UnixDate))
sealed, err := SealRequest(form)
if err != nil {
tb.Error(err)
return nil, err
}
return sealed, nil
}
func TCheckResponse(tb testing.TB, resp *http.Response, expected string) bool {
var failed bool
respBytes, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
respStr := string(respBytes)
if err != nil {
tb.Error(err)
failed = true
}
if resp.StatusCode != 200 {
tb.Error("Publish failed: ", resp.StatusCode, respStr)
failed = true
}
if respStr != expected {
tb.Errorf("Got wrong response from server. Expected: '%s' Got: '%s'", expected, respStr)
failed = true
}
return !failed
}
type TURLs struct { type TURLs struct {
Websocket string Websocket string
Origin string Origin string
PubMsg string PubMsg string
SavePubMsg string // update_and_pub
} }
func TGetUrls(testserver *httptest.Server) TURLs { func TGetUrls(testserver *httptest.Server) TURLs {
@ -65,6 +122,7 @@ func TGetUrls(testserver *httptest.Server) TURLs {
Websocket: fmt.Sprintf("ws://%s/", addr), Websocket: fmt.Sprintf("ws://%s/", addr),
Origin: fmt.Sprintf("http://%s", addr), Origin: fmt.Sprintf("http://%s", addr),
PubMsg: fmt.Sprintf("http://%s/pub_msg", addr), PubMsg: fmt.Sprintf("http://%s/pub_msg", addr),
SavePubMsg: fmt.Sprintf("http://%s/update_and_pub", addr),
} }
} }
@ -98,32 +156,192 @@ func TestSubscriptionAndPublish(t *testing.T) {
var doneWg sync.WaitGroup var doneWg sync.WaitGroup
var readyWg sync.WaitGroup var readyWg sync.WaitGroup
const TestChannelName = "room.testchannel" const TestChannelName1 = "room.testchannel"
const TestCommand = "testdata" const TestChannelName2 = "room.chan2"
const TestData = "123456789" const TestChannelName3 = "room.chan3"
const TestChannelNameUnused = "room.empty"
const TestCommandChan = "testdata_single"
const TestCommandMulti = "testdata_multi"
const TestCommandGlobal = "testdata_global"
const TestData1 = "123456789"
const TestData2 = 42
const TestData3 = false
var TestData4 = []interface{}{"str1", "str2", "str3"}
ServerInitiatedCommands[TestCommandChan] = PushCommandCacheInfo{CacheTypeLastOnly, MsgTargetTypeChat}
ServerInitiatedCommands[TestCommandMulti] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeMultichat}
ServerInitiatedCommands[TestCommandGlobal] = PushCommandCacheInfo{CacheTypeTimestamps, MsgTargetTypeGlobal}
var server *httptest.Server var server *httptest.Server
var urls TURLs var urls TURLs
TSetup(&server, &urls) TSetup(&server, &urls)
defer server.CloseClientConnections()
defer unsubscribeAllClients() defer unsubscribeAllClients()
conn, err := websocket.Dial(urls.Websocket, "", urls.Origin) var conn *websocket.Conn
var err error
// client 1: sub ch1, ch2
// client 2: sub ch1, ch3
// client 3: sub none
// client 4: delayed sub ch1
// msg 1: ch1
// msg 2: ch2, ch3
// msg 3: chEmpty
// msg 4: global
// Client 1
conn, err = websocket.Dial(urls.Websocket, "", urls.Origin)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
} }
doneWg.Add(1) doneWg.Add(1)
readyWg.Add(1) readyWg.Add(1)
go func(conn *websocket.Conn) { go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "sub", TestChannelName) TSendMessage(t, conn, 2, "sub", TestChannelName1)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
TSendMessage(t, conn, 3, "sub", TestChannelName2) // 2
TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil)
TSendMessage(t, conn, 4, "ready", 0)
TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil)
readyWg.Done()
TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1)
TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2)
TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4)
conn.Close()
doneWg.Done()
}(conn)
// Client 2
conn, err = websocket.Dial(urls.Websocket, "", urls.Origin)
if err != nil {
t.Error(err)
return
}
doneWg.Add(1)
readyWg.Add(1)
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "sub", TestChannelName1)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
TSendMessage(t, conn, 3, "sub", TestChannelName3) // 3
TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil)
TSendMessage(t, conn, 4, "ready", 0)
TReceiveExpectedMessage(t, conn, 4, SuccessCommand, nil)
readyWg.Done()
TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1)
TReceiveExpectedMessage(t, conn, -1, TestCommandMulti, TestData2)
TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4)
conn.Close()
doneWg.Done()
}(conn)
// Client 3
conn, err = websocket.Dial(urls.Websocket, "", urls.Origin)
if err != nil {
t.Error(err)
return
}
doneWg.Add(1)
readyWg.Add(1)
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "ready", 0)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
readyWg.Done() readyWg.Done()
TReceiveExpectedMessage(t, conn, -1, TestCommand, TestData) TReceiveExpectedMessage(t, conn, -1, TestCommandGlobal, TestData4)
conn.Close()
doneWg.Done()
}(conn)
// Wait for clients 1-3
readyWg.Wait()
var form url.Values
var resp *http.Response
// Publish message 1 - should go to clients 1, 2
form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelName1, TestData1, false)
if err != nil {
t.FailNow()
}
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(2)) {
t.FailNow()
}
// Publish message 2 - should go to clients 1, 2
form, err = TSealForSavePubMsg(t, TestCommandMulti, TestChannelName2+","+TestChannelName3, TestData2, false)
if err != nil {
t.FailNow()
}
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(2)) {
t.FailNow()
}
// Publish message 3 - should go to no clients
form, err = TSealForSavePubMsg(t, TestCommandChan, TestChannelNameUnused, TestData3, false)
if err != nil {
t.FailNow()
}
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(0)) {
t.FailNow()
}
// Publish message 4 - should go to clients 1, 2, 3
form, err = TSealForSavePubMsg(t, TestCommandGlobal, "", TestData4, false)
if err != nil {
t.FailNow()
}
resp, err = http.PostForm(urls.SavePubMsg, form)
if !TCheckResponse(t, resp, strconv.Itoa(3)) {
t.FailNow()
}
// Start client 4
conn, err = websocket.Dial(urls.Websocket, "", urls.Origin)
if err != nil {
t.Error(err)
return
}
doneWg.Add(1)
readyWg.Add(1)
go func(conn *websocket.Conn) {
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
TSendMessage(t, conn, 2, "sub", TestChannelName1)
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
TSendMessage(t, conn, 3, "ready", 0)
TReceiveExpectedMessage(t, conn, 3, SuccessCommand, nil)
// backlog message
TReceiveExpectedMessage(t, conn, -1, TestCommandChan, TestData1)
readyWg.Done()
conn.Close() conn.Close()
doneWg.Done() doneWg.Done()
@ -131,37 +349,6 @@ func TestSubscriptionAndPublish(t *testing.T) {
readyWg.Wait() readyWg.Wait()
form := url.Values{}
form.Set("cmd", TestCommand)
argsBytes, _ := json.Marshal(TestData)
form.Set("args", string(argsBytes))
form.Set("channel", TestChannelName)
form.Set("scope", MsgTargetTypeChat.Name())
sealedForm, err := SealRequest(form)
if err != nil {
t.Error(err)
server.CloseClientConnections()
panic("halting test")
}
resp, err := http.PostForm(urls.PubMsg, sealedForm)
if err != nil {
t.Error(err)
server.CloseClientConnections()
panic("halting test")
}
respBytes, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
respStr := string(respBytes)
if resp.StatusCode != 200 {
t.Error("Publish failed: ", resp.StatusCode, respStr)
server.CloseClientConnections()
panic("halting test")
}
doneWg.Wait() doneWg.Wait()
server.Close() server.Close()
} }

View file

@ -76,6 +76,34 @@ type ClientInfo struct {
MessageChannel chan<- ClientMessage MessageChannel chan<- ClientMessage
} }
type tgmarray []TimestampedGlobalMessage
type tmmarray []TimestampedMultichatMessage
func (ta tgmarray) Len() int {
return len(ta)
}
func (ta tgmarray) Less(i, j int) bool {
return ta[i].Timestamp.Before(ta[j].Timestamp)
}
func (ta tgmarray) Swap(i, j int) {
ta[i], ta[j] = ta[j], ta[i]
}
func (ta tgmarray) GetTime(i int) time.Time {
return ta[i].Timestamp
}
func (ta tmmarray) Len() int {
return len(ta)
}
func (ta tmmarray) Less(i, j int) bool {
return ta[i].Timestamp.Before(ta[j].Timestamp)
}
func (ta tmmarray) Swap(i, j int) {
ta[i], ta[j] = ta[j], ta[i]
}
func (ta tmmarray) GetTime(i int) time.Time {
return ta[i].Timestamp
}
func (bct BacklogCacheType) Name() string { func (bct BacklogCacheType) Name() string {
switch bct { switch bct {
case CacheTypeInvalid: case CacheTypeInvalid: