diff --git a/socketserver/cmd/ffzsocketserver/socketserver.go b/socketserver/cmd/ffzsocketserver/socketserver.go index 59b33e3b..7382b736 100644 --- a/socketserver/cmd/ffzsocketserver/socketserver.go +++ b/socketserver/cmd/ffzsocketserver/socketserver.go @@ -44,7 +44,7 @@ func main() { Addr: conf.ListenAddr, } - server.SetupServerAndHandle(conf, httpServer.TLSConfig, nil) + server.SetupServerAndHandle(conf, nil) go commandLineConsole() diff --git a/socketserver/internal/server/backend.go b/socketserver/internal/server/backend.go index aae7ce79..a7528a39 100644 --- a/socketserver/internal/server/backend.go +++ b/socketserver/internal/server/backend.go @@ -22,6 +22,7 @@ var backendUrl string var responseCache *cache.Cache var getBacklogUrl string +var postStatisticsUrl string var backendSharedKey [32]byte var serverId int @@ -37,6 +38,7 @@ func SetupBackend(config *ConfigFile) { responseCache = cache.New(60*time.Second, 120*time.Second) getBacklogUrl = fmt.Sprintf("%s/backlog", backendUrl) + postStatisticsUrl = fmt.Sprintf("%s/stats", backendUrl) messageBufferPool.New = New4KByteBuffer @@ -155,6 +157,15 @@ func RequestRemoteData(remoteCommand, data string, auth AuthInfo) (responseStr s return } +func SendAggregatedData(sealedForm url.Values) (error) { + resp, err := backendHttpClient.PostForm(postStatisticsUrl, sealedForm) + if err != nil { + return err + } + + return resp.Body.Close() +} + func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) { formData := url.Values{ "subs": chatSubs, diff --git a/socketserver/internal/server/backlog.go b/socketserver/internal/server/backlog.go index 278944a0..9b4f981f 100644 --- a/socketserver/internal/server/backlog.go +++ b/socketserver/internal/server/backlog.go @@ -214,7 +214,7 @@ func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) { client.Mutex.Unlock() } -func TimedBacklogJanitor() { +func backlogJanitor() { for { time.Sleep(1 * time.Hour) CleanupTimedBacklogMessages() diff --git a/socketserver/internal/server/commands.go b/socketserver/internal/server/commands.go index aba9df08..c0d3a9e3 100644 --- a/socketserver/internal/server/commands.go +++ b/socketserver/internal/server/commands.go @@ -1,13 +1,15 @@ package server import ( + "encoding/json" "fmt" + "github.com/gorilla/websocket" "github.com/satori/go.uuid" - "golang.org/x/net/websocket" "log" "strconv" "sync" "time" + "net/url" ) var ResponseSuccess = ClientMessage{Command: SuccessCommand} @@ -19,7 +21,7 @@ func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) handler, ok := CommandHandlers[msg.Command] if !ok { log.Println("[!] Unknown command", msg.Command, "- sent by client", client.ClientID, "@", conn.RemoteAddr()) - FFZCodec.Send(conn, ClientMessage{ + SendMessage(conn, ClientMessage{ MessageID: msg.MessageID, Command: "error", Arguments: fmt.Sprintf("Unknown command %s", msg.Command), @@ -35,10 +37,10 @@ func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) // The response will be delivered over client.MessageChannel / serverMessageChan } else { response.MessageID = msg.MessageID - FFZCodec.Send(conn, response) + SendMessage(conn, response) } } else { - FFZCodec.Send(conn, ClientMessage{ + SendMessage(conn, ClientMessage{ MessageID: msg.MessageID, Command: "error", Arguments: err.Error(), @@ -196,27 +198,16 @@ func GetSubscriptionBacklog(conn *websocket.Conn, client *ClientInfo) { } } -type SurveySubmission struct { - User string - Json string -} - -var SurveySubmissions []SurveySubmission -var SurveySubmissionLock sync.Mutex - func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { - SurveySubmissionLock.Lock() - SurveySubmissions = append(SurveySubmissions, SurveySubmission{client.TwitchUsername, msg.origArguments}) - SurveySubmissionLock.Unlock() - + // Discard return ResponseSuccess, nil } type FollowEvent struct { - User string - Channel string - NowFollowing bool - Timestamp time.Time + User string `json:u` + Channel string `json:c` + NowFollowing bool `json:f` + Timestamp time.Time `json:t` } var FollowEvents []FollowEvent @@ -270,14 +261,62 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess return ResponseSuccess, nil } +func sendAggregateData() { + for { + time.Sleep(15 * time.Minute) + DoSendAggregateData() + } +} + +func DoSendAggregateData() { + FollowEventsLock.Lock() + follows := FollowEvents + FollowEvents = nil + FollowEventsLock.Unlock() + AggregateEmoteUsageLock.Lock() + emoteUsage := AggregateEmoteUsage + AggregateEmoteUsage = make(map[int]map[string]int) + AggregateEmoteUsageLock.Unlock() + + reportForm := url.Values{} + + followJson, err := json.Marshal(follows) + if err != nil { + log.Print(err) + } else { + reportForm.Set("follows", string(followJson)) + } + + emoteJson, err := json.Marshal(emoteUsage) + if err != nil { + log.Print(err) + } else { + reportForm.Set("emotes", string(emoteJson)) + } + + form, err := SealRequest(reportForm) + if err != nil { + log.Print(err) + return + } + + err = SendAggregatedData(form) + if err != nil { + log.Print(err) + return + } + + // done +} + func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { go func(conn *websocket.Conn, msg ClientMessage, authInfo AuthInfo) { resp, err := RequestRemoteDataCached(string(msg.Command), msg.origArguments, authInfo) if err != nil { - FFZCodec.Send(conn, ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()}) + SendMessage(conn, ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()}) } else { - FFZCodec.Send(conn, ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp}) + SendMessage(conn, ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp}) } }(conn, msg, client.AuthInfo) diff --git a/socketserver/internal/server/handlecore.go b/socketserver/internal/server/handlecore.go index b673571e..a285770b 100644 --- a/socketserver/internal/server/handlecore.go +++ b/socketserver/internal/server/handlecore.go @@ -1,17 +1,17 @@ package server // import "bitbucket.org/stendec/frankerfacez/socketserver/internal/server" import ( - "crypto/tls" "encoding/json" "errors" "fmt" - "golang.org/x/net/websocket" + "github.com/gorilla/websocket" "io" "log" "net/http" "strconv" "strings" "sync" + "time" ) const MAX_PACKET_SIZE = 1024 @@ -54,10 +54,12 @@ const HelloCommand Command = "hello" // It signals that the work has been handed off to a background goroutine. const AsyncResponseCommand Command = "_async" -// A websocket.Codec that translates the protocol into ClientMessage objects. -var FFZCodec websocket.Codec = websocket.Codec{ - Marshal: MarshalClientMessage, - Unmarshal: UnmarshalClientMessage, +var SocketUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return r.Header.Get("Origin") == "http://www.twitch.tv" + }, } // Errors that get returned to the client. @@ -72,69 +74,54 @@ var ExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a floa var gconfig *ConfigFile -// Create a websocket.Server with the options from the provided Config. -func setupServer(config *ConfigFile, tlsConfig *tls.Config) *websocket.Server { - gconfig = config - // sockConf, err := websocket.NewConfig("/", config.SocketOrigin) - // if err != nil { - // log.Fatal(err) - // } - - SetupBackend(config) - - // if config.UseSSL { - // cert, err := tls.LoadX509KeyPair(config.SSLCertificateFile, config.SSLKeyFile) - // if err != nil { - // log.Fatal(err) - // } - // tlsConfig.Certificates = []tls.Certificate{cert} - // tlsConfig.ServerName = config.SocketOrigin - // tlsConfig.BuildNameToCertificate() - // sockConf.TlsConfig = tlsConfig - // } - - // sockServer := &websocket.Server{} - // sockServer.Config = *sockConf - // sockServer.Handler = HandleSocketConnection - - go deadChannelReaper() - - return nil -} - // Set up a websocket listener and register it on /. // (Uses http.DefaultServeMux .) -func SetupServerAndHandle(config *ConfigFile, tlsConfig *tls.Config, serveMux *http.ServeMux) { - _ = setupServer(config, tlsConfig) - log.Print("hi") +func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { + gconfig = config + + SetupBackend(config) if serveMux == nil { serveMux = http.DefaultServeMux } - handler := websocket.Handler(HandleSocketConnection) - serveMux.HandleFunc("/", ServeWebsocketOrCatbag(handler.ServeHTTP)) + + serveMux.HandleFunc("/", ServeWebsocketOrCatbag) serveMux.HandleFunc("/pub_msg", HBackendPublishRequest) serveMux.HandleFunc("/dump_backlog", HBackendDumpBacklog) serveMux.HandleFunc("/update_and_pub", HBackendUpdateAndPublish) + + go deadChannelReaper() + go backlogJanitor() + go sendAggregateData() } -func ServeWebsocketOrCatbag(sockfunc func(http.ResponseWriter, *http.Request)) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("Connection") == "Upgrade" { - sockfunc(w, r) +func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) { + fmt.Println("hi") + fmt.Println(r.Header) + if r.Header.Get("Connection") == "Upgrade" { + conn, err := SocketUpgrader.Upgrade(w, r, nil) + if err != nil { + fmt.Fprintf(w, "error: %v", err) return - } else { - w.Write([]byte(gconfig.BannerHTML)) } + fmt.Println("upgraded!") + HandleSocketConnection(conn) + + return + } else { + w.Write([]byte(gconfig.BannerHTML)) } } +var CloseGotBinaryMessage = websocket.CloseError{Code: websocket.CloseUnsupportedData, Text: "got binary packet"} +var CloseGotMessageId0 = websocket.CloseError{Code: websocket.ClosePolicyViolation, Text: "got messageid 0"} + // Handle a new websocket connection from a FFZ client. // This runs in a goroutine started by net/http. func HandleSocketConnection(conn *websocket.Conn) { // websocket.Conn is a ReadWriteCloser - fmt.Println("Got socket connection from", conn.Request().RemoteAddr) + log.Println("Got socket connection from", conn.RemoteAddr()) var _closer sync.Once closer := func() { @@ -150,19 +137,35 @@ func HandleSocketConnection(conn *websocket.Conn) { _serverMessageChan := make(chan ClientMessage) _errorChan := make(chan error) + var client ClientInfo + client.MessageChannel = _serverMessageChan + // Launch receiver goroutine go func(errorChan chan<- error, clientChan chan<- ClientMessage) { var msg ClientMessage + var messageType int + var packet []byte var err error - for ; err == nil; err = FFZCodec.Receive(conn, &msg) { + for ; err == nil; messageType, packet, err = conn.ReadMessage() { + if messageType == websocket.BinaryMessage { + err = &CloseGotBinaryMessage + break + } + if messageType == websocket.CloseMessage { + err = io.EOF + break + } + + UnmarshalClientMessage(packet, messageType, &msg) if msg.MessageID == 0 { continue } clientChan <- msg } - if err != io.EOF { - fmt.Println("Error while reading from client:", err) + _, isClose := err.(*websocket.CloseError) + if err != io.EOF && !isClose { + log.Println("Error while reading from client:", err) } errorChan <- err close(errorChan) @@ -174,39 +177,42 @@ func HandleSocketConnection(conn *websocket.Conn) { var clientChan <-chan ClientMessage = _clientChan var serverMessageChan <-chan ClientMessage = _serverMessageChan - var client ClientInfo - client.MessageChannel = _serverMessageChan - // All set up, now enter the work loop RunLoop: for { select { case err := <-errorChan: - FFZCodec.Send(conn, ClientMessage{ - MessageID: -1, - Command: "error", - Arguments: err.Error(), - }) // note - socket might be closed, but don't care + if err == io.EOF { + conn.Close() // no need to send a close frame :) + break RunLoop + } else if closeMsg, isClose := err.(*websocket.CloseError); isClose { + CloseConnection(conn, closeMsg) + } else { + CloseConnection(conn, &websocket.CloseError{ + Code: websocket.CloseInternalServerErr, + Text: err.Error(), + }) + } + break RunLoop + case msg := <-clientChan: if client.Version == "" && msg.Command != HelloCommand { - FFZCodec.Send(conn, ClientMessage{ - MessageID: msg.MessageID, - Command: "error", - Arguments: "Error - the first message sent must be a 'hello'", + CloseConnection(conn, &websocket.CloseError{ + Text: "Error - the first message sent must be a 'hello'", + Code: websocket.ClosePolicyViolation, }) break RunLoop } HandleCommand(conn, &client, msg) case smsg := <-serverMessageChan: - FFZCodec.Send(conn, smsg) + SendMessage(conn, smsg) } } // Exit - fmt.Println("End socket connection from", conn.Request().RemoteAddr) // Launch message draining goroutine - we aren't out of the pub/sub records go func() { @@ -220,6 +226,8 @@ RunLoop: // And finished. // Close the channel so the draining goroutine can finish, too. close(_serverMessageChan) + + log.Println("End socket connection from", conn.RemoteAddr()) } func CallHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) { @@ -236,8 +244,23 @@ func CallHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInf return handler(conn, client, cmsg) } +func CloseConnection(conn *websocket.Conn, closeMsg *websocket.CloseError) { + fmt.Println("Terminating connection with", conn.RemoteAddr(), "-", closeMsg.Text) + conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(closeMsg.Code, closeMsg.Text), time.Now().Add(2*time.Minute)) + conn.Close() +} + +func SendMessage(conn *websocket.Conn, msg ClientMessage) { + messageType, packet, err := MarshalClientMessage(msg) + if err != nil { + panic(fmt.Sprintf("failed to marshal: %v %v", err, msg)) + } + fmt.Println(string(packet)) + conn.WriteMessage(messageType, packet) +} + // Unpack a message sent from the client into a ClientMessage. -func UnmarshalClientMessage(data []byte, payloadType byte, v interface{}) (err error) { +func UnmarshalClientMessage(data []byte, payloadType int, v interface{}) (err error) { var spaceIdx int out := v.(*ClientMessage) @@ -282,7 +305,7 @@ func (cm *ClientMessage) parseOrigArguments() error { return nil } -func MarshalClientMessage(clientMessage interface{}) (data []byte, payloadType byte, err error) { +func MarshalClientMessage(clientMessage interface{}) (payloadType int, data []byte, err error) { var msg ClientMessage var ok bool msg, ok = clientMessage.(ClientMessage) @@ -309,7 +332,7 @@ func MarshalClientMessage(clientMessage interface{}) (data []byte, payloadType b if msg.Arguments != nil { argBytes, err := json.Marshal(msg.Arguments) if err != nil { - return nil, 0, err + return 0, nil, err } dataStr = fmt.Sprintf("%d %s %s", msg.MessageID, msg.Command, string(argBytes)) @@ -317,7 +340,7 @@ func MarshalClientMessage(clientMessage interface{}) (data []byte, payloadType b dataStr = fmt.Sprintf("%d %s", msg.MessageID, msg.Command) } - return []byte(dataStr), websocket.TextFrame, nil + return websocket.TextMessage, []byte(dataStr), nil } // Command handlers should use this to construct responses. diff --git a/socketserver/internal/server/handlecore_test.go b/socketserver/internal/server/handlecore_test.go index 161b5921..c6444c08 100644 --- a/socketserver/internal/server/handlecore_test.go +++ b/socketserver/internal/server/handlecore_test.go @@ -2,14 +2,14 @@ package server import ( "fmt" - "golang.org/x/net/websocket" + "github.com/gorilla/websocket" "testing" ) func ExampleUnmarshalClientMessage() { sourceData := []byte("100 hello [\"ffz_3.5.30\",\"898b5bfa-b577-47bb-afb4-252c703b67d6\"]") var cm ClientMessage - err := UnmarshalClientMessage(sourceData, websocket.TextFrame, &cm) + err := UnmarshalClientMessage(sourceData, websocket.TextMessage, &cm) fmt.Println(err) fmt.Println(cm.MessageID) fmt.Println(cm.Command) @@ -27,9 +27,9 @@ func ExampleMarshalClientMessage() { Command: "do_authorize", Arguments: "1234567890", } - data, payloadType, err := MarshalClientMessage(&cm) + payloadType, data, err := MarshalClientMessage(&cm) fmt.Println(err) - fmt.Println(payloadType == websocket.TextFrame) + fmt.Println(payloadType == websocket.TextMessage) fmt.Println(string(data)) // Output: // @@ -40,7 +40,7 @@ func ExampleMarshalClientMessage() { func TestArgumentsAsStringAndBool(t *testing.T) { sourceData := []byte("1 foo [\"string\", false]") var cm ClientMessage - err := UnmarshalClientMessage(sourceData, websocket.TextFrame, &cm) + err := UnmarshalClientMessage(sourceData, websocket.TextMessage, &cm) if err != nil { t.Fatal(err) } diff --git a/socketserver/internal/server/publisher_test.go b/socketserver/internal/server/publisher_test.go index 2dc54ed6..7c8fbb89 100644 --- a/socketserver/internal/server/publisher_test.go +++ b/socketserver/internal/server/publisher_test.go @@ -3,8 +3,8 @@ package server import ( "encoding/json" "fmt" + "github.com/gorilla/websocket" "github.com/satori/go.uuid" - "golang.org/x/net/websocket" "io/ioutil" "net/http" "net/http/httptest" @@ -27,7 +27,17 @@ const IgnoreReceivedArguments = 1 + 2i func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) { var msg ClientMessage var fail bool - err := FFZCodec.Receive(conn, &msg) + messageType, packet, err := conn.ReadMessage() + if err != nil { + tb.Error(err) + return msg, false + } + if messageType != websocket.TextMessage { + tb.Error("got non-text message", packet) + return msg, false + } + + err = UnmarshalClientMessage(packet, messageType, &msg) if err != nil { tb.Error(err) return msg, false @@ -56,11 +66,8 @@ func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, } func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) bool { - err := FFZCodec.Send(conn, ClientMessage{MessageID: messageId, Command: command, Arguments: arguments}) - if err != nil { - tb.Error(err) - } - return err == nil + SendMessage(conn, ClientMessage{MessageID: messageId, Command: command, Arguments: arguments}) + return true } func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments interface{}, deleteMode bool) (url.Values, error) { @@ -157,7 +164,7 @@ func TSetup(testserver **httptest.Server, urls *TURLs) { if testserver != nil { serveMux := http.NewServeMux() - SetupServerAndHandle(conf, nil, serveMux) + SetupServerAndHandle(conf, serveMux) tserv := httptest.NewUnstartedServer(serveMux) *testserver = tserv @@ -195,6 +202,7 @@ func TestSubscriptionAndPublish(t *testing.T) { defer unsubscribeAllClients() var conn *websocket.Conn + var resp *http.Response var err error // client 1: sub ch1, ch2 @@ -207,7 +215,7 @@ func TestSubscriptionAndPublish(t *testing.T) { // msg 4: global // Client 1 - conn, err = websocket.Dial(urls.Websocket, "", urls.Origin) + conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) if err != nil { t.Error(err) return @@ -236,7 +244,7 @@ func TestSubscriptionAndPublish(t *testing.T) { }(conn) // Client 2 - conn, err = websocket.Dial(urls.Websocket, "", urls.Origin) + conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) if err != nil { t.Error(err) return @@ -265,7 +273,7 @@ func TestSubscriptionAndPublish(t *testing.T) { }(conn) // Client 3 - conn, err = websocket.Dial(urls.Websocket, "", urls.Origin) + conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) if err != nil { t.Error(err) return @@ -291,7 +299,6 @@ func TestSubscriptionAndPublish(t *testing.T) { readyWg.Wait() var form url.Values - var resp *http.Response // Publish message 1 - should go to clients 1, 2 @@ -338,7 +345,7 @@ func TestSubscriptionAndPublish(t *testing.T) { } // Start client 4 - conn, err = websocket.Dial(urls.Websocket, "", urls.Origin) + conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) if err != nil { t.Error(err) return @@ -401,7 +408,7 @@ func BenchmarkUserSubscriptionSinglePublish(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - conn, err := websocket.Dial(urls.Websocket, "", urls.Origin) + conn, _, err := websocket.DefaultDialer.Dial(urls.Websocket, http.Header{}) if err != nil { b.Error(err) break