mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-09-16 10:06:54 +00:00
Add test that posts to /pub_msg, test infrastructure
This commit is contained in:
parent
730ce39f72
commit
0be3693c99
6 changed files with 193 additions and 62 deletions
|
@ -64,7 +64,10 @@ func getCacheKey(remoteCommand, data string) string {
|
||||||
return fmt.Sprintf("%s/%s", remoteCommand, data)
|
return fmt.Sprintf("%s/%s", remoteCommand, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Publish a message to clients with no caching.
|
||||||
|
// The scope must be specified because no attempt is made to recognize the command.
|
||||||
func HBackendPublishRequest(w http.ResponseWriter, r *http.Request) {
|
func HBackendPublishRequest(w http.ResponseWriter, r *http.Request) {
|
||||||
|
r.ParseForm()
|
||||||
formData, err := UnsealRequest(r.Form)
|
formData, err := UnsealRequest(r.Form)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(403)
|
w.WriteHeader(403)
|
||||||
|
@ -91,6 +94,7 @@ func HBackendPublishRequest(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cm := ClientMessage{MessageID: -1, Command: Command(cmd), origArguments: json}
|
cm := ClientMessage{MessageID: -1, Command: Command(cmd), origArguments: json}
|
||||||
|
cm.parseOrigArguments()
|
||||||
var count int
|
var count int
|
||||||
|
|
||||||
switch target {
|
switch target {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PushCommandCacheInfo struct {
|
type PushCommandCacheInfo struct {
|
||||||
|
@ -82,7 +83,22 @@ var ErrorUnrecognizedCacheType = errors.New("Invalid value for cachetype")
|
||||||
// Returned by MessageTargetType.UnmarshalJSON()
|
// Returned by MessageTargetType.UnmarshalJSON()
|
||||||
var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target")
|
var ErrorUnrecognizedTargetType = errors.New("Invalid value for message target")
|
||||||
|
|
||||||
func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) {
|
type PersistentCachedData struct {
|
||||||
|
Timestamp time.Time
|
||||||
|
Channel string
|
||||||
|
Watching bool
|
||||||
|
Data string
|
||||||
|
}
|
||||||
|
|
||||||
|
// map command -> channel -> data
|
||||||
|
var CachedDataLast map[Command]map[string]string
|
||||||
|
|
||||||
|
func DumpCache() {
|
||||||
|
CachedDataLast = make(map[Command]map[string]string)
|
||||||
|
}
|
||||||
|
|
||||||
|
func HBackendDumpCache(w http.ResponseWriter, r *http.Request) {
|
||||||
|
r.ParseForm()
|
||||||
formData, err := UnsealRequest(r.Form)
|
formData, err := UnsealRequest(r.Form)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(403)
|
w.WriteHeader(403)
|
||||||
|
@ -90,9 +106,36 @@ func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd := formData.Get("command")
|
confirm := formData.Get("confirm")
|
||||||
|
if confirm == "1" {
|
||||||
|
DumpCache()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish a message to clients, and update the in-server cache for the message.
|
||||||
|
// notes:
|
||||||
|
// `scope` is implicit in the command
|
||||||
|
func HBackendUpdateAndPublish(w http.ResponseWriter, r *http.Request) {
|
||||||
|
r.ParseForm()
|
||||||
|
formData, err := UnsealRequest(r.Form)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(403)
|
||||||
|
fmt.Fprintf(w, "Error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := formData.Get("cmd")
|
||||||
|
json := formData.Get("args")
|
||||||
|
channel := formData.Get("channel")
|
||||||
|
|
||||||
cacheinfo, ok := ServerInitiatedCommands[cmd]
|
cacheinfo, ok := ServerInitiatedCommands[cmd]
|
||||||
if !ok {
|
if !ok {
|
||||||
w.WriteHeader(422)
|
w.WriteHeader(422)
|
||||||
|
fmt.Fprintf(w, "Caching semantics unknown for command '%s'. Post to /addcachedcommand first.")
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ = cacheinfo
|
||||||
|
_ = json
|
||||||
|
_ = channel
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package server // import "bitbucket.org/stendec/frankerfacez/socketserver/server"
|
package server // import "bitbucket.org/stendec/frankerfacez/socketserver/internal/server"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
@ -262,13 +262,21 @@ func UnmarshalClientMessage(data []byte, payloadType byte, v interface{}) (err e
|
||||||
dataStr = dataStr[spaceIdx+1:]
|
dataStr = dataStr[spaceIdx+1:]
|
||||||
argumentsJson := dataStr
|
argumentsJson := dataStr
|
||||||
out.origArguments = argumentsJson
|
out.origArguments = argumentsJson
|
||||||
err = json.Unmarshal([]byte(argumentsJson), &out.Arguments)
|
err = out.parseOrigArguments()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cm *ClientMessage) parseOrigArguments() error {
|
||||||
|
err := json.Unmarshal([]byte(cm.origArguments), &cm.Arguments)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func MarshalClientMessage(clientMessage interface{}) (data []byte, payloadType byte, err error) {
|
func MarshalClientMessage(clientMessage interface{}) (data []byte, payloadType byte, err error) {
|
||||||
var msg ClientMessage
|
var msg ClientMessage
|
||||||
var ok bool
|
var ok bool
|
||||||
|
|
|
@ -4,8 +4,6 @@ package server
|
||||||
// If I screwed up the locking, I won't know until it's too late.
|
// If I screwed up the locking, I won't know until it's too late.
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -58,6 +56,7 @@ func PublishToAll(msg ClientMessage) (count int) {
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
GlobalSubscriptionInfo.RUnlock()
|
GlobalSubscriptionInfo.RUnlock()
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add a channel to the subscriptions while holding a read-lock to the map.
|
// Add a channel to the subscriptions while holding a read-lock to the map.
|
||||||
|
|
|
@ -1,31 +1,152 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/satori/go.uuid"
|
"github.com/satori/go.uuid"
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func CountOpenFDs() uint64 {
|
func TCountOpenFDs() uint64 {
|
||||||
ary, _ := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid()))
|
ary, _ := ioutil.ReadDir(fmt.Sprintf("/proc/%d/fd", os.Getpid()))
|
||||||
return uint64(len(ary))
|
return uint64(len(ary))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const IgnoreReceivedArguments = 1+2i
|
||||||
|
func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) {
|
||||||
|
var msg ClientMessage
|
||||||
|
var fail bool
|
||||||
|
err := FFZCodec.Receive(conn, &msg)
|
||||||
|
if err != nil {
|
||||||
|
tb.Error(err)
|
||||||
|
return msg, false
|
||||||
|
}
|
||||||
|
if msg.MessageID != messageId {
|
||||||
|
tb.Error("Message ID was wrong. Expected", messageId, ", got", msg.MessageID, ":", msg)
|
||||||
|
fail = true
|
||||||
|
}
|
||||||
|
if msg.Command != command {
|
||||||
|
tb.Error("Command was wrong. Expected", command, ", got", msg.Command, ":", msg)
|
||||||
|
fail = true
|
||||||
|
}
|
||||||
|
if arguments != IgnoreReceivedArguments {
|
||||||
|
if msg.Arguments != arguments {
|
||||||
|
tb.Error("Arguments are wrong. Expected", arguments, ", got", msg.Arguments, ":", msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return msg, !fail
|
||||||
|
}
|
||||||
|
|
||||||
|
func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) bool {
|
||||||
|
err := FFZCodec.Send(conn, ClientMessage{MessageID: messageId, Command: command, Arguments: arguments})
|
||||||
|
if err != nil {
|
||||||
|
tb.Error(err)
|
||||||
|
}
|
||||||
|
return err == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscriptionAndPublish(t *testing.T) {
|
||||||
|
var doneWg sync.WaitGroup
|
||||||
|
var readyWg sync.WaitGroup
|
||||||
|
|
||||||
|
const TestChannelName = "testchannel"
|
||||||
|
const TestCommand = "testdata"
|
||||||
|
const TestData = "123456789"
|
||||||
|
|
||||||
|
GenerateKeys("/tmp/test_naclkeys.json", "2", "+ZMqOmxhaVrCV5c0OMZ09QoSGcJHuqQtJrwzRD+JOjE=")
|
||||||
|
DumpCache()
|
||||||
|
conf := &Config{
|
||||||
|
UseSSL: false,
|
||||||
|
NaclKeysFile: "/tmp/test_naclkeys.json",
|
||||||
|
SocketOrigin: "localhost:2002",
|
||||||
|
}
|
||||||
|
serveMux := http.NewServeMux()
|
||||||
|
SetupServerAndHandle(conf, nil, serveMux)
|
||||||
|
|
||||||
|
server := httptest.NewUnstartedServer(serveMux)
|
||||||
|
server.Start()
|
||||||
|
|
||||||
|
wsUrl := fmt.Sprintf("ws://%s/", server.Listener.Addr().String())
|
||||||
|
originUrl := fmt.Sprintf("http://%s", server.Listener.Addr().String())
|
||||||
|
publishUrl := fmt.Sprintf("http://%s/pub_msg", server.Listener.Addr().String())
|
||||||
|
|
||||||
|
conn, err := websocket.Dial(wsUrl, "", originUrl)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
doneWg.Add(1)
|
||||||
|
readyWg.Add(1)
|
||||||
|
|
||||||
|
go func(conn *websocket.Conn) {
|
||||||
|
TSendMessage(t, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
|
||||||
|
TReceiveExpectedMessage(t, conn, 1, SuccessCommand, IgnoreReceivedArguments)
|
||||||
|
TSendMessage(t, conn, 2, "sub", TestChannelName)
|
||||||
|
TReceiveExpectedMessage(t, conn, 2, SuccessCommand, nil)
|
||||||
|
|
||||||
|
readyWg.Done()
|
||||||
|
|
||||||
|
TReceiveExpectedMessage(t, conn, -1, TestCommand, TestData)
|
||||||
|
|
||||||
|
conn.Close()
|
||||||
|
doneWg.Done()
|
||||||
|
}(conn)
|
||||||
|
|
||||||
|
readyWg.Wait()
|
||||||
|
|
||||||
|
form := url.Values{}
|
||||||
|
form.Set("cmd", TestCommand)
|
||||||
|
argsBytes, _ := json.Marshal(TestData)
|
||||||
|
form.Set("args", string(argsBytes))
|
||||||
|
form.Set("channel", TestChannelName)
|
||||||
|
form.Set("scope", MsgTargetTypeChat.Name())
|
||||||
|
|
||||||
|
sealedForm, err := SealRequest(form)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
server.CloseClientConnections()
|
||||||
|
panic("halting test")
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := http.PostForm(publishUrl, sealedForm)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
server.CloseClientConnections()
|
||||||
|
panic("halting test")
|
||||||
|
}
|
||||||
|
|
||||||
|
respBytes, err := ioutil.ReadAll(resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
respStr := string(respBytes)
|
||||||
|
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
t.Error("Publish failed: ", resp.StatusCode, respStr)
|
||||||
|
server.CloseClientConnections()
|
||||||
|
panic("halting test")
|
||||||
|
}
|
||||||
|
|
||||||
|
doneWg.Wait()
|
||||||
|
server.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkThousandUserSubscription(b *testing.B) {
|
func BenchmarkThousandUserSubscription(b *testing.B) {
|
||||||
var doneWg sync.WaitGroup
|
var doneWg sync.WaitGroup
|
||||||
var readyWg sync.WaitGroup
|
var readyWg sync.WaitGroup
|
||||||
|
|
||||||
const TestChannelName = "testchannel"
|
const TestChannelName = "testchannel"
|
||||||
const TestCommand = "testdata"
|
const TestCommand = "testdata"
|
||||||
|
const TestData = "123456789"
|
||||||
|
|
||||||
GenerateKeys("/tmp/test_naclkeys.json", "2", "+ZMqOmxhaVrCV5c0OMZ09QoSGcJHuqQtJrwzRD+JOjE=")
|
GenerateKeys("/tmp/test_naclkeys.json", "2", "+ZMqOmxhaVrCV5c0OMZ09QoSGcJHuqQtJrwzRD+JOjE=")
|
||||||
|
DumpCache()
|
||||||
conf := &Config{
|
conf := &Config{
|
||||||
UseSSL: false,
|
UseSSL: false,
|
||||||
NaclKeysFile: "/tmp/test_naclkeys.json",
|
NaclKeysFile: "/tmp/test_naclkeys.json",
|
||||||
|
@ -40,7 +161,7 @@ func BenchmarkThousandUserSubscription(b *testing.B) {
|
||||||
wsUrl := fmt.Sprintf("ws://%s/", server.Listener.Addr().String())
|
wsUrl := fmt.Sprintf("ws://%s/", server.Listener.Addr().String())
|
||||||
originUrl := fmt.Sprintf("http://%s", server.Listener.Addr().String())
|
originUrl := fmt.Sprintf("http://%s", server.Listener.Addr().String())
|
||||||
|
|
||||||
message := ClientMessage{MessageID: -1, Command: "testdata", Arguments: "123456789"}
|
message := ClientMessage{MessageID: -1, Command: "testdata", Arguments: TestData}
|
||||||
|
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
fmt.Println(b.N)
|
fmt.Println(b.N)
|
||||||
|
@ -48,7 +169,7 @@ func BenchmarkThousandUserSubscription(b *testing.B) {
|
||||||
var limit syscall.Rlimit
|
var limit syscall.Rlimit
|
||||||
syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit)
|
syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit)
|
||||||
|
|
||||||
limit.Cur = CountOpenFDs() + uint64(b.N)*2 + 100
|
limit.Cur = TCountOpenFDs() + uint64(b.N)*2 + 100
|
||||||
|
|
||||||
if limit.Cur > limit.Max {
|
if limit.Cur > limit.Max {
|
||||||
b.Skip("Open file limit too low")
|
b.Skip("Open file limit too low")
|
||||||
|
@ -67,60 +188,17 @@ func BenchmarkThousandUserSubscription(b *testing.B) {
|
||||||
doneWg.Add(1)
|
doneWg.Add(1)
|
||||||
readyWg.Add(1)
|
readyWg.Add(1)
|
||||||
go func(i int, conn *websocket.Conn) {
|
go func(i int, conn *websocket.Conn) {
|
||||||
var err error
|
TSendMessage(b, conn, 1, HelloCommand, []interface{}{"ffz_0.0-test", uuid.NewV4().String()})
|
||||||
var msg ClientMessage
|
TSendMessage(b, conn, 2, "sub", TestChannelName)
|
||||||
err = FFZCodec.Send(conn, ClientMessage{MessageID: 1, Command: HelloCommand, Arguments: []interface{}{"ffz_test", uuid.NewV4().String()}})
|
|
||||||
if err != nil {
|
TReceiveExpectedMessage(b, conn, 1, SuccessCommand, IgnoreReceivedArguments)
|
||||||
b.Error(err)
|
TReceiveExpectedMessage(b, conn, 2, SuccessCommand, nil)
|
||||||
}
|
|
||||||
err = FFZCodec.Send(conn, ClientMessage{MessageID: 2, Command: "sub", Arguments: TestChannelName})
|
|
||||||
if err != nil {
|
|
||||||
b.Error(err)
|
|
||||||
}
|
|
||||||
err = FFZCodec.Receive(conn, &msg)
|
|
||||||
if err != nil {
|
|
||||||
b.Error(err)
|
|
||||||
}
|
|
||||||
if msg.MessageID != 1 {
|
|
||||||
b.Error("Got out-of-order message ID", msg)
|
|
||||||
}
|
|
||||||
if msg.Command != SuccessCommand {
|
|
||||||
b.Error("Command was not a success", msg)
|
|
||||||
}
|
|
||||||
err = FFZCodec.Receive(conn, &msg)
|
|
||||||
if err != nil {
|
|
||||||
b.Error(err)
|
|
||||||
}
|
|
||||||
if msg.MessageID != 2 {
|
|
||||||
b.Error("Got out-of-order message ID", msg)
|
|
||||||
}
|
|
||||||
if msg.Command != SuccessCommand {
|
|
||||||
b.Error("Command was not a success", msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println(i, " ready")
|
fmt.Println(i, " ready")
|
||||||
readyWg.Done()
|
readyWg.Done()
|
||||||
|
|
||||||
err = FFZCodec.Receive(conn, &msg)
|
TReceiveExpectedMessage(b, conn, -1, TestCommand, TestData)
|
||||||
if err != nil {
|
|
||||||
b.Error(err)
|
|
||||||
}
|
|
||||||
if msg.MessageID != -1 {
|
|
||||||
fmt.Println(msg)
|
|
||||||
b.Error("Client did not get expected messageID of -1")
|
|
||||||
}
|
|
||||||
if msg.Command != TestCommand {
|
|
||||||
fmt.Println(msg)
|
|
||||||
b.Error("Client did not get expected command")
|
|
||||||
}
|
|
||||||
str, err := msg.ArgumentsAsString()
|
|
||||||
if err != nil {
|
|
||||||
b.Error(err)
|
|
||||||
}
|
|
||||||
if str != "123456789" {
|
|
||||||
fmt.Println(msg)
|
|
||||||
b.Error("Client did not get expected data")
|
|
||||||
}
|
|
||||||
conn.Close()
|
conn.Close()
|
||||||
doneWg.Done()
|
doneWg.Done()
|
||||||
}(i, conn)
|
}(i, conn)
|
||||||
|
@ -131,7 +209,8 @@ func BenchmarkThousandUserSubscription(b *testing.B) {
|
||||||
fmt.Println("publishing...")
|
fmt.Println("publishing...")
|
||||||
if PublishToChat(TestChannelName, message) != b.N {
|
if PublishToChat(TestChannelName, message) != b.N {
|
||||||
b.Error("not enough sent")
|
b.Error("not enough sent")
|
||||||
b.FailNow()
|
server.CloseClientConnections()
|
||||||
|
panic("halting test instead of waiting")
|
||||||
}
|
}
|
||||||
doneWg.Wait()
|
doneWg.Wait()
|
||||||
|
|
||||||
|
|
|
@ -2,8 +2,6 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"github.com/satori/go.uuid"
|
"github.com/satori/go.uuid"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue