diff --git a/socketserver/internal/server/backend.go b/socketserver/internal/server/backend.go index 00a55293..c1073370 100644 --- a/socketserver/internal/server/backend.go +++ b/socketserver/internal/server/backend.go @@ -64,7 +64,10 @@ func getCacheKey(remoteCommand, data string) string { return fmt.Sprintf("%s/%s", remoteCommand, data) } +// Publish a message to clients with no caching. +// The scope must be specified because no attempt is made to recognize the command. func HBackendPublishRequest(w http.ResponseWriter, r *http.Request) { + r.ParseForm() formData, err := UnsealRequest(r.Form) if err != nil { w.WriteHeader(403) @@ -91,6 +94,7 @@ func HBackendPublishRequest(w http.ResponseWriter, r *http.Request) { } cm := ClientMessage{MessageID: -1, Command: Command(cmd), origArguments: json} + cm.parseOrigArguments() var count int switch target { diff --git a/socketserver/internal/server/backlog.go b/socketserver/internal/server/backlog.go index 24e0b6be..d16b3cf5 100644 --- a/socketserver/internal/server/backlog.go +++ b/socketserver/internal/server/backlog.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net/http" + "time" ) type PushCommandCacheInfo struct { @@ -82,7 +83,22 @@ var ErrorUnrecognizedCacheType = errors.New("Invalid value for cachetype") // Returned by MessageTargetType.UnmarshalJSON() var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target") -func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) { +type PersistentCachedData struct { + Timestamp time.Time + Channel string + Watching bool + Data string +} + +// map command -> channel -> data +var CachedDataLast map[Command]map[string]string + +func DumpCache() { + CachedDataLast = make(map[Command]map[string]string) +} + +func HBackendDumpCache(w http.ResponseWriter, r *http.Request) { + r.ParseForm() formData, err := UnsealRequest(r.Form) if err != nil { w.WriteHeader(403) @@ -90,9 +106,36 @@ func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) { return } - cmd := formData.Get("command") + confirm := formData.Get("confirm") + if confirm == "1" { + DumpCache() + } +} + +// Publish a message to clients, and update the in-server cache for the message. +// notes: +// `scope` is implicit in the command +func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + formData, err := UnsealRequest(r.Form) + if err != nil { + w.WriteHeader(403) + fmt.Fprintf(w, "Error: %v", err) + return + } + + cmd := formData.Get("cmd") + json := formData.Get("args") + channel := formData.Get("channel") + cacheinfo, ok := ServerInitiatedCommands[cmd] if !ok { w.WriteHeader(422) + fmt.Fprintf(w, "Caching semantics unknown for command '%s'. Post to /addcachedcommand first.") + return } + + _ = cacheinfo + _ = json + _ = channel } diff --git a/socketserver/internal/server/handlecore.go b/socketserver/internal/server/handlecore.go index dd4c3f04..1842a0ee 100644 --- a/socketserver/internal/server/handlecore.go +++ b/socketserver/internal/server/handlecore.go @@ -1,4 +1,4 @@ -package server // import "bitbucket.org/stendec/frankerfacez/socketserver/server" +package server // import "bitbucket.org/stendec/frankerfacez/socketserver/internal/server" import ( "crypto/tls" @@ -262,13 +262,21 @@ func UnmarshalClientMessage(data []byte, payloadType byte, v interface{}) (err e dataStr = dataStr[spaceIdx+1:] argumentsJson := dataStr out.origArguments = argumentsJson - err = json.Unmarshal([]byte(argumentsJson), &out.Arguments) + err = out.parseOrigArguments() if err != nil { return } return nil } +func (cm *ClientMessage) parseOrigArguments() error { + err := json.Unmarshal([]byte(cm.origArguments), &cm.Arguments) + if err != nil { + return err + } + return nil +} + func MarshalClientMessage(clientMessage interface{}) (data []byte, payloadType byte, err error) { var msg ClientMessage var ok bool diff --git a/socketserver/internal/server/publisher.go b/socketserver/internal/server/publisher.go index 659a3038..65aaf96a 100644 --- a/socketserver/internal/server/publisher.go +++ b/socketserver/internal/server/publisher.go @@ -4,8 +4,6 @@ package server // If I screwed up the locking, I won't know until it's too late. import ( - "fmt" - "net/http" "sync" "time" ) @@ -58,6 +56,7 @@ func PublishToAll(msg ClientMessage) (count int) { count++ } GlobalSubscriptionInfo.RUnlock() + return } // Add a channel to the subscriptions while holding a read-lock to the map. diff --git a/socketserver/internal/server/publisher_test.go b/socketserver/internal/server/publisher_test.go index f25235a1..4d416ce4 100644 --- a/socketserver/internal/server/publisher_test.go +++ b/socketserver/internal/server/publisher_test.go @@ -1,31 +1,152 @@ package server import ( + "encoding/json" "fmt" "github.com/satori/go.uuid" "golang.org/x/net/websocket" "io/ioutil" "net/http" "net/http/httptest" + "net/url" "os" "sync" "syscall" "testing" ) -func CountOpenFDs() uint64 { +func TCountOpenFDs() uint64 { ary, _ := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid())) return uint64(len(ary)) } +const IgnoreReceivedArguments = 1+2i +func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) { + var msg ClientMessage + var fail bool + err := FFZCodec.Receive(conn, &msg) + if err != nil { + tb.Error(err) + return msg, false + } + if msg.MessageID != messageId { + tb.Error("Message ID was wrong. Expected", messageId, ", got", msg.MessageID, ":", msg) + fail = true + } + if msg.Command != command { + tb.Error("Command was wrong. Expected", command, ", got", msg.Command, ":", msg) + fail = true + } + if arguments != IgnoreReceivedArguments { + if msg.Arguments != arguments { + tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg) + } + } + return msg, !fail +} + +func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) bool { + err := FFZCodec.Send(conn, ClientMessage{MessageID: messageId, Command: command, Arguments: arguments}) + if err != nil { + tb.Error(err) + } + return err == nil +} + +func TestSubscriptionAndPublish(t *testing.T) { + var doneWg sync.WaitGroup + var readyWg sync.WaitGroup + + const TestChannelName = "testchannel" + const TestCommand = "testdata" + const TestData = "123456789" + + GenerateKeys("/tmp/test_naclkeys.json", "2", "+ZMqOmxhaVrCV5c0OMZ09QoSGcJHuqQtJrwzRD+JOjE=") + DumpCache() + conf := &Config{ + UseSSL: false, + NaclKeysFile: "/tmp/test_naclkeys.json", + SocketOrigin: "localhost:2002", + } + serveMux := http.NewServeMux() + SetupServerAndHandle(conf, nil, serveMux) + + server := httptest.NewUnstartedServer(serveMux) + server.Start() + + wsUrl := fmt.Sprintf("ws://%s/", server.Listener.Addr().String()) + originUrl := fmt.Sprintf("http://%s", server.Listener.Addr().String()) + publishUrl := fmt.Sprintf("http://%s/pub_msg", server.Listener.Addr().String()) + + conn, err := websocket.Dial(wsUrl, "", originUrl) + if err != nil { + t.Error(err) + return + } + doneWg.Add(1) + readyWg.Add(1) + + go func(conn *websocket.Conn) { + TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) + TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments) + TSendMessage(t, conn, 2, "sub", TestChannelName) + TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil) + + readyWg.Done() + + TReceiveExpectedMessage(t, conn, -1, TestCommand, TestData) + + conn.Close() + doneWg.Done() + }(conn) + + readyWg.Wait() + + form := url.Values{} + form.Set("cmd", TestCommand) + argsBytes, _ := json.Marshal(TestData) + form.Set("args", string(argsBytes)) + form.Set("channel", TestChannelName) + form.Set("scope", MsgTargetTypeChat.Name()) + + sealedForm, err := SealRequest(form) + if err != nil { + t.Error(err) + server.CloseClientConnections() + panic("halting test") + } + + resp, err := http.PostForm(publishUrl, sealedForm) + if err != nil { + t.Error(err) + server.CloseClientConnections() + panic("halting test") + } + + respBytes, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + respStr := string(respBytes) + + if resp.StatusCode != 200 { + t.Error("Publish failed: ", resp.StatusCode, respStr) + server.CloseClientConnections() + panic("halting test") + } + + doneWg.Wait() + server.Close() +} + func BenchmarkThousandUserSubscription(b *testing.B) { var doneWg sync.WaitGroup var readyWg sync.WaitGroup const TestChannelName = "testchannel" const TestCommand = "testdata" + const TestData = "123456789" GenerateKeys("/tmp/test_naclkeys.json", "2", "+ZMqOmxhaVrCV5c0OMZ09QoSGcJHuqQtJrwzRD+JOjE=") + DumpCache() conf := &Config{ UseSSL: false, NaclKeysFile: "/tmp/test_naclkeys.json", @@ -40,7 +161,7 @@ func BenchmarkThousandUserSubscription(b *testing.B) { wsUrl := fmt.Sprintf("ws://%s/", server.Listener.Addr().String()) originUrl := fmt.Sprintf("http://%s", server.Listener.Addr().String()) - message := ClientMessage{MessageID: -1, Command: "testdata", Arguments: "123456789"} + message := ClientMessage{MessageID: -1, Command: "testdata", Arguments: TestData} fmt.Println() fmt.Println(b.N) @@ -48,7 +169,7 @@ func BenchmarkThousandUserSubscription(b *testing.B) { var limit syscall.Rlimit syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit) - limit.Cur = CountOpenFDs() + uint64(b.N)*2 + 100 + limit.Cur = TCountOpenFDs() + uint64(b.N)*2 + 100 if limit.Cur > limit.Max { b.Skip("Open file limit too low") @@ -67,60 +188,17 @@ func BenchmarkThousandUserSubscription(b *testing.B) { doneWg.Add(1) readyWg.Add(1) go func(i int, conn *websocket.Conn) { - var err error - var msg ClientMessage - err = FFZCodec.Send(conn, ClientMessage{MessageID: 1, Command: HelloCommand, Arguments: []interface{}{"ffz_test", uuid.NewV4().String()}}) - if err != nil { - b.Error(err) - } - err = FFZCodec.Send(conn, ClientMessage{MessageID: 2, Command: "sub", Arguments: TestChannelName}) - if err != nil { - b.Error(err) - } - err = FFZCodec.Receive(conn, &msg) - if err != nil { - b.Error(err) - } - if msg.MessageID != 1 { - b.Error("Got out-of-order message ID", msg) - } - if msg.Command != SuccessCommand { - b.Error("Command was not a success", msg) - } - err = FFZCodec.Receive(conn, &msg) - if err != nil { - b.Error(err) - } - if msg.MessageID != 2 { - b.Error("Got out-of-order message ID", msg) - } - if msg.Command != SuccessCommand { - b.Error("Command was not a success", msg) - } + TSendMessage(b, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()}) + TSendMessage(b, conn, 2, "sub", TestChannelName) + + TReceiveExpectedMessage(b, conn, 1, SuccessCommand, IgnoreReceivedArguments) + TReceiveExpectedMessage(b, conn, 2, SuccessCommand, nil) fmt.Println(i, " ready") readyWg.Done() - err = FFZCodec.Receive(conn, &msg) - if err != nil { - b.Error(err) - } - if msg.MessageID != -1 { - fmt.Println(msg) - b.Error("Client did not get expected messageID of -1") - } - if msg.Command != TestCommand { - fmt.Println(msg) - b.Error("Client did not get expected command") - } - str, err := msg.ArgumentsAsString() - if err != nil { - b.Error(err) - } - if str != "123456789" { - fmt.Println(msg) - b.Error("Client did not get expected data") - } + TReceiveExpectedMessage(b, conn, -1, TestCommand, TestData) + conn.Close() doneWg.Done() }(i, conn) @@ -131,7 +209,8 @@ func BenchmarkThousandUserSubscription(b *testing.B) { fmt.Println("publishing...") if PublishToChat(TestChannelName, message) != b.N { b.Error("not enough sent") - b.FailNow() + server.CloseClientConnections() + panic("halting test instead of waiting") } doneWg.Wait() diff --git a/socketserver/internal/server/types.go b/socketserver/internal/server/types.go index 5ce92c81..32cc940d 100644 --- a/socketserver/internal/server/types.go +++ b/socketserver/internal/server/types.go @@ -2,8 +2,6 @@ package server import ( "encoding/json" - "errors" - "fmt" "github.com/satori/go.uuid" "sync" "time"