mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-07-28 05:28:30 +00:00
switch to gorilla/websocket, post aggregate data
This commit is contained in:
parent
9c4891db9f
commit
9f1c369fdb
7 changed files with 192 additions and 112 deletions
|
@ -44,7 +44,7 @@ func main() {
|
||||||
Addr: conf.ListenAddr,
|
Addr: conf.ListenAddr,
|
||||||
}
|
}
|
||||||
|
|
||||||
server.SetupServerAndHandle(conf, httpServer.TLSConfig, nil)
|
server.SetupServerAndHandle(conf, nil)
|
||||||
|
|
||||||
go commandLineConsole()
|
go commandLineConsole()
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ var backendUrl string
|
||||||
var responseCache *cache.Cache
|
var responseCache *cache.Cache
|
||||||
|
|
||||||
var getBacklogUrl string
|
var getBacklogUrl string
|
||||||
|
var postStatisticsUrl string
|
||||||
|
|
||||||
var backendSharedKey [32]byte
|
var backendSharedKey [32]byte
|
||||||
var serverId int
|
var serverId int
|
||||||
|
@ -37,6 +38,7 @@ func SetupBackend(config *ConfigFile) {
|
||||||
responseCache = cache.New(60*time.Second, 120*time.Second)
|
responseCache = cache.New(60*time.Second, 120*time.Second)
|
||||||
|
|
||||||
getBacklogUrl = fmt.Sprintf("%s/backlog", backendUrl)
|
getBacklogUrl = fmt.Sprintf("%s/backlog", backendUrl)
|
||||||
|
postStatisticsUrl = fmt.Sprintf("%s/stats", backendUrl)
|
||||||
|
|
||||||
messageBufferPool.New = New4KByteBuffer
|
messageBufferPool.New = New4KByteBuffer
|
||||||
|
|
||||||
|
@ -155,6 +157,15 @@ func RequestRemoteData(remoteCommand, data string, auth AuthInfo) (responseStr s
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SendAggregatedData(sealedForm url.Values) (error) {
|
||||||
|
resp, err := backendHttpClient.PostForm(postStatisticsUrl, sealedForm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.Body.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) {
|
func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) {
|
||||||
formData := url.Values{
|
formData := url.Values{
|
||||||
"subs": chatSubs,
|
"subs": chatSubs,
|
||||||
|
|
|
@ -214,7 +214,7 @@ func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) {
|
||||||
client.Mutex.Unlock()
|
client.Mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TimedBacklogJanitor() {
|
func backlogJanitor() {
|
||||||
for {
|
for {
|
||||||
time.Sleep(1 * time.Hour)
|
time.Sleep(1 * time.Hour)
|
||||||
CleanupTimedBacklogMessages()
|
CleanupTimedBacklogMessages()
|
||||||
|
|
|
@ -1,13 +1,15 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
"github.com/satori/go.uuid"
|
"github.com/satori/go.uuid"
|
||||||
"golang.org/x/net/websocket"
|
|
||||||
"log"
|
"log"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
"net/url"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ResponseSuccess = ClientMessage{Command: SuccessCommand}
|
var ResponseSuccess = ClientMessage{Command: SuccessCommand}
|
||||||
|
@ -19,7 +21,7 @@ func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage)
|
||||||
handler, ok := CommandHandlers[msg.Command]
|
handler, ok := CommandHandlers[msg.Command]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Println("[!] Unknown command", msg.Command, "- sent by client", client.ClientID, "@", conn.RemoteAddr())
|
log.Println("[!] Unknown command", msg.Command, "- sent by client", client.ClientID, "@", conn.RemoteAddr())
|
||||||
FFZCodec.Send(conn, ClientMessage{
|
SendMessage(conn, ClientMessage{
|
||||||
MessageID: msg.MessageID,
|
MessageID: msg.MessageID,
|
||||||
Command: "error",
|
Command: "error",
|
||||||
Arguments: fmt.Sprintf("Unknown command %s", msg.Command),
|
Arguments: fmt.Sprintf("Unknown command %s", msg.Command),
|
||||||
|
@ -35,10 +37,10 @@ func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage)
|
||||||
// The response will be delivered over client.MessageChannel / serverMessageChan
|
// The response will be delivered over client.MessageChannel / serverMessageChan
|
||||||
} else {
|
} else {
|
||||||
response.MessageID = msg.MessageID
|
response.MessageID = msg.MessageID
|
||||||
FFZCodec.Send(conn, response)
|
SendMessage(conn, response)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
FFZCodec.Send(conn, ClientMessage{
|
SendMessage(conn, ClientMessage{
|
||||||
MessageID: msg.MessageID,
|
MessageID: msg.MessageID,
|
||||||
Command: "error",
|
Command: "error",
|
||||||
Arguments: err.Error(),
|
Arguments: err.Error(),
|
||||||
|
@ -196,27 +198,16 @@ func GetSubscriptionBacklog(conn *websocket.Conn, client *ClientInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type SurveySubmission struct {
|
|
||||||
User string
|
|
||||||
Json string
|
|
||||||
}
|
|
||||||
|
|
||||||
var SurveySubmissions []SurveySubmission
|
|
||||||
var SurveySubmissionLock sync.Mutex
|
|
||||||
|
|
||||||
func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||||
SurveySubmissionLock.Lock()
|
// Discard
|
||||||
SurveySubmissions = append(SurveySubmissions, SurveySubmission{client.TwitchUsername, msg.origArguments})
|
|
||||||
SurveySubmissionLock.Unlock()
|
|
||||||
|
|
||||||
return ResponseSuccess, nil
|
return ResponseSuccess, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type FollowEvent struct {
|
type FollowEvent struct {
|
||||||
User string
|
User string `json:u`
|
||||||
Channel string
|
Channel string `json:c`
|
||||||
NowFollowing bool
|
NowFollowing bool `json:f`
|
||||||
Timestamp time.Time
|
Timestamp time.Time `json:t`
|
||||||
}
|
}
|
||||||
|
|
||||||
var FollowEvents []FollowEvent
|
var FollowEvents []FollowEvent
|
||||||
|
@ -270,14 +261,62 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess
|
||||||
return ResponseSuccess, nil
|
return ResponseSuccess, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func sendAggregateData() {
|
||||||
|
for {
|
||||||
|
time.Sleep(15 * time.Minute)
|
||||||
|
DoSendAggregateData()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func DoSendAggregateData() {
|
||||||
|
FollowEventsLock.Lock()
|
||||||
|
follows := FollowEvents
|
||||||
|
FollowEvents = nil
|
||||||
|
FollowEventsLock.Unlock()
|
||||||
|
AggregateEmoteUsageLock.Lock()
|
||||||
|
emoteUsage := AggregateEmoteUsage
|
||||||
|
AggregateEmoteUsage = make(map[int]map[string]int)
|
||||||
|
AggregateEmoteUsageLock.Unlock()
|
||||||
|
|
||||||
|
reportForm := url.Values{}
|
||||||
|
|
||||||
|
followJson, err := json.Marshal(follows)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
} else {
|
||||||
|
reportForm.Set("follows", string(followJson))
|
||||||
|
}
|
||||||
|
|
||||||
|
emoteJson, err := json.Marshal(emoteUsage)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
} else {
|
||||||
|
reportForm.Set("emotes", string(emoteJson))
|
||||||
|
}
|
||||||
|
|
||||||
|
form, err := SealRequest(reportForm)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = SendAggregatedData(form)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// done
|
||||||
|
}
|
||||||
|
|
||||||
func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||||
go func(conn *websocket.Conn, msg ClientMessage, authInfo AuthInfo) {
|
go func(conn *websocket.Conn, msg ClientMessage, authInfo AuthInfo) {
|
||||||
resp, err := RequestRemoteDataCached(string(msg.Command), msg.origArguments, authInfo)
|
resp, err := RequestRemoteDataCached(string(msg.Command), msg.origArguments, authInfo)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
FFZCodec.Send(conn, ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()})
|
SendMessage(conn, ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()})
|
||||||
} else {
|
} else {
|
||||||
FFZCodec.Send(conn, ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp})
|
SendMessage(conn, ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp})
|
||||||
}
|
}
|
||||||
}(conn, msg, client.AuthInfo)
|
}(conn, msg, client.AuthInfo)
|
||||||
|
|
||||||
|
|
|
@ -1,17 +1,17 @@
|
||||||
package server // import "bitbucket.org/stendec/frankerfacez/socketserver/internal/server"
|
package server // import "bitbucket.org/stendec/frankerfacez/socketserver/internal/server"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"golang.org/x/net/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const MAX_PACKET_SIZE = 1024
|
const MAX_PACKET_SIZE = 1024
|
||||||
|
@ -54,10 +54,12 @@ const HelloCommand Command = "hello"
|
||||||
// It signals that the work has been handed off to a background goroutine.
|
// It signals that the work has been handed off to a background goroutine.
|
||||||
const AsyncResponseCommand Command = "_async"
|
const AsyncResponseCommand Command = "_async"
|
||||||
|
|
||||||
// A websocket.Codec that translates the protocol into ClientMessage objects.
|
var SocketUpgrader = websocket.Upgrader{
|
||||||
var FFZCodec websocket.Codec = websocket.Codec{
|
ReadBufferSize: 1024,
|
||||||
Marshal: MarshalClientMessage,
|
WriteBufferSize: 1024,
|
||||||
Unmarshal: UnmarshalClientMessage,
|
CheckOrigin: func(r *http.Request) bool {
|
||||||
|
return r.Header.Get("Origin") == "http://www.twitch.tv"
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Errors that get returned to the client.
|
// Errors that get returned to the client.
|
||||||
|
@ -72,69 +74,54 @@ var ExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a floa
|
||||||
|
|
||||||
var gconfig *ConfigFile
|
var gconfig *ConfigFile
|
||||||
|
|
||||||
// Create a websocket.Server with the options from the provided Config.
|
|
||||||
func setupServer(config *ConfigFile, tlsConfig *tls.Config) *websocket.Server {
|
|
||||||
gconfig = config
|
|
||||||
// sockConf, err := websocket.NewConfig("/", config.SocketOrigin)
|
|
||||||
// if err != nil {
|
|
||||||
// log.Fatal(err)
|
|
||||||
// }
|
|
||||||
|
|
||||||
SetupBackend(config)
|
|
||||||
|
|
||||||
// if config.UseSSL {
|
|
||||||
// cert, err := tls.LoadX509KeyPair(config.SSLCertificateFile, config.SSLKeyFile)
|
|
||||||
// if err != nil {
|
|
||||||
// log.Fatal(err)
|
|
||||||
// }
|
|
||||||
// tlsConfig.Certificates = []tls.Certificate{cert}
|
|
||||||
// tlsConfig.ServerName = config.SocketOrigin
|
|
||||||
// tlsConfig.BuildNameToCertificate()
|
|
||||||
// sockConf.TlsConfig = tlsConfig
|
|
||||||
// }
|
|
||||||
|
|
||||||
// sockServer := &websocket.Server{}
|
|
||||||
// sockServer.Config = *sockConf
|
|
||||||
// sockServer.Handler = HandleSocketConnection
|
|
||||||
|
|
||||||
go deadChannelReaper()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set up a websocket listener and register it on /.
|
// Set up a websocket listener and register it on /.
|
||||||
// (Uses http.DefaultServeMux .)
|
// (Uses http.DefaultServeMux .)
|
||||||
func SetupServerAndHandle(config *ConfigFile, tlsConfig *tls.Config, serveMux *http.ServeMux) {
|
func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) {
|
||||||
_ = setupServer(config, tlsConfig)
|
gconfig = config
|
||||||
log.Print("hi")
|
|
||||||
|
SetupBackend(config)
|
||||||
|
|
||||||
if serveMux == nil {
|
if serveMux == nil {
|
||||||
serveMux = http.DefaultServeMux
|
serveMux = http.DefaultServeMux
|
||||||
}
|
}
|
||||||
handler := websocket.Handler(HandleSocketConnection)
|
|
||||||
serveMux.HandleFunc("/", ServeWebsocketOrCatbag(handler.ServeHTTP))
|
serveMux.HandleFunc("/", ServeWebsocketOrCatbag)
|
||||||
serveMux.HandleFunc("/pub_msg", HBackendPublishRequest)
|
serveMux.HandleFunc("/pub_msg", HBackendPublishRequest)
|
||||||
serveMux.HandleFunc("/dump_backlog", HBackendDumpBacklog)
|
serveMux.HandleFunc("/dump_backlog", HBackendDumpBacklog)
|
||||||
serveMux.HandleFunc("/update_and_pub", HBackendUpdateAndPublish)
|
serveMux.HandleFunc("/update_and_pub", HBackendUpdateAndPublish)
|
||||||
|
|
||||||
|
go deadChannelReaper()
|
||||||
|
go backlogJanitor()
|
||||||
|
go sendAggregateData()
|
||||||
}
|
}
|
||||||
|
|
||||||
func ServeWebsocketOrCatbag(sockfunc func(http.ResponseWriter, *http.Request)) http.HandlerFunc {
|
func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
fmt.Println("hi")
|
||||||
|
fmt.Println(r.Header)
|
||||||
if r.Header.Get("Connection") == "Upgrade" {
|
if r.Header.Get("Connection") == "Upgrade" {
|
||||||
sockfunc(w, r)
|
conn, err := SocketUpgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(w, "error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Println("upgraded!")
|
||||||
|
HandleSocketConnection(conn)
|
||||||
|
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
w.Write([]byte(gconfig.BannerHTML))
|
w.Write([]byte(gconfig.BannerHTML))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
var CloseGotBinaryMessage = websocket.CloseError{Code: websocket.CloseUnsupportedData, Text: "got binary packet"}
|
||||||
|
var CloseGotMessageId0 = websocket.CloseError{Code: websocket.ClosePolicyViolation, Text: "got messageid 0"}
|
||||||
|
|
||||||
// Handle a new websocket connection from a FFZ client.
|
// Handle a new websocket connection from a FFZ client.
|
||||||
// This runs in a goroutine started by net/http.
|
// This runs in a goroutine started by net/http.
|
||||||
func HandleSocketConnection(conn *websocket.Conn) {
|
func HandleSocketConnection(conn *websocket.Conn) {
|
||||||
// websocket.Conn is a ReadWriteCloser
|
// websocket.Conn is a ReadWriteCloser
|
||||||
|
|
||||||
fmt.Println("Got socket connection from", conn.Request().RemoteAddr)
|
log.Println("Got socket connection from", conn.RemoteAddr())
|
||||||
|
|
||||||
var _closer sync.Once
|
var _closer sync.Once
|
||||||
closer := func() {
|
closer := func() {
|
||||||
|
@ -150,19 +137,35 @@ func HandleSocketConnection(conn *websocket.Conn) {
|
||||||
_serverMessageChan := make(chan ClientMessage)
|
_serverMessageChan := make(chan ClientMessage)
|
||||||
_errorChan := make(chan error)
|
_errorChan := make(chan error)
|
||||||
|
|
||||||
|
var client ClientInfo
|
||||||
|
client.MessageChannel = _serverMessageChan
|
||||||
|
|
||||||
// Launch receiver goroutine
|
// Launch receiver goroutine
|
||||||
go func(errorChan chan<- error, clientChan chan<- ClientMessage) {
|
go func(errorChan chan<- error, clientChan chan<- ClientMessage) {
|
||||||
var msg ClientMessage
|
var msg ClientMessage
|
||||||
|
var messageType int
|
||||||
|
var packet []byte
|
||||||
var err error
|
var err error
|
||||||
for ; err == nil; err = FFZCodec.Receive(conn, &msg) {
|
for ; err == nil; messageType, packet, err = conn.ReadMessage() {
|
||||||
|
if messageType == websocket.BinaryMessage {
|
||||||
|
err = &CloseGotBinaryMessage
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if messageType == websocket.CloseMessage {
|
||||||
|
err = io.EOF
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
UnmarshalClientMessage(packet, messageType, &msg)
|
||||||
if msg.MessageID == 0 {
|
if msg.MessageID == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
clientChan <- msg
|
clientChan <- msg
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != io.EOF {
|
_, isClose := err.(*websocket.CloseError)
|
||||||
fmt.Println("Error while reading from client:", err)
|
if err != io.EOF && !isClose {
|
||||||
|
log.Println("Error while reading from client:", err)
|
||||||
}
|
}
|
||||||
errorChan <- err
|
errorChan <- err
|
||||||
close(errorChan)
|
close(errorChan)
|
||||||
|
@ -174,39 +177,42 @@ func HandleSocketConnection(conn *websocket.Conn) {
|
||||||
var clientChan <-chan ClientMessage = _clientChan
|
var clientChan <-chan ClientMessage = _clientChan
|
||||||
var serverMessageChan <-chan ClientMessage = _serverMessageChan
|
var serverMessageChan <-chan ClientMessage = _serverMessageChan
|
||||||
|
|
||||||
var client ClientInfo
|
|
||||||
client.MessageChannel = _serverMessageChan
|
|
||||||
|
|
||||||
// All set up, now enter the work loop
|
// All set up, now enter the work loop
|
||||||
|
|
||||||
RunLoop:
|
RunLoop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case err := <-errorChan:
|
case err := <-errorChan:
|
||||||
FFZCodec.Send(conn, ClientMessage{
|
if err == io.EOF {
|
||||||
MessageID: -1,
|
conn.Close() // no need to send a close frame :)
|
||||||
Command: "error",
|
|
||||||
Arguments: err.Error(),
|
|
||||||
}) // note - socket might be closed, but don't care
|
|
||||||
break RunLoop
|
break RunLoop
|
||||||
|
} else if closeMsg, isClose := err.(*websocket.CloseError); isClose {
|
||||||
|
CloseConnection(conn, closeMsg)
|
||||||
|
} else {
|
||||||
|
CloseConnection(conn, &websocket.CloseError{
|
||||||
|
Code: websocket.CloseInternalServerErr,
|
||||||
|
Text: err.Error(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
break RunLoop
|
||||||
|
|
||||||
case msg := <-clientChan:
|
case msg := <-clientChan:
|
||||||
if client.Version == "" && msg.Command != HelloCommand {
|
if client.Version == "" && msg.Command != HelloCommand {
|
||||||
FFZCodec.Send(conn, ClientMessage{
|
CloseConnection(conn, &websocket.CloseError{
|
||||||
MessageID: msg.MessageID,
|
Text: "Error - the first message sent must be a 'hello'",
|
||||||
Command: "error",
|
Code: websocket.ClosePolicyViolation,
|
||||||
Arguments: "Error - the first message sent must be a 'hello'",
|
|
||||||
})
|
})
|
||||||
break RunLoop
|
break RunLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
HandleCommand(conn, &client, msg)
|
HandleCommand(conn, &client, msg)
|
||||||
case smsg := <-serverMessageChan:
|
case smsg := <-serverMessageChan:
|
||||||
FFZCodec.Send(conn, smsg)
|
SendMessage(conn, smsg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exit
|
// Exit
|
||||||
fmt.Println("End socket connection from", conn.Request().RemoteAddr)
|
|
||||||
|
|
||||||
// Launch message draining goroutine - we aren't out of the pub/sub records
|
// Launch message draining goroutine - we aren't out of the pub/sub records
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -220,6 +226,8 @@ RunLoop:
|
||||||
// And finished.
|
// And finished.
|
||||||
// Close the channel so the draining goroutine can finish, too.
|
// Close the channel so the draining goroutine can finish, too.
|
||||||
close(_serverMessageChan)
|
close(_serverMessageChan)
|
||||||
|
|
||||||
|
log.Println("End socket connection from", conn.RemoteAddr())
|
||||||
}
|
}
|
||||||
|
|
||||||
func CallHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) {
|
func CallHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) {
|
||||||
|
@ -236,8 +244,23 @@ func CallHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInf
|
||||||
return handler(conn, client, cmsg)
|
return handler(conn, client, cmsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CloseConnection(conn *websocket.Conn, closeMsg *websocket.CloseError) {
|
||||||
|
fmt.Println("Terminating connection with", conn.RemoteAddr(), "-", closeMsg.Text)
|
||||||
|
conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(closeMsg.Code, closeMsg.Text), time.Now().Add(2*time.Minute))
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func SendMessage(conn *websocket.Conn, msg ClientMessage) {
|
||||||
|
messageType, packet, err := MarshalClientMessage(msg)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("failed to marshal: %v %v", err, msg))
|
||||||
|
}
|
||||||
|
fmt.Println(string(packet))
|
||||||
|
conn.WriteMessage(messageType, packet)
|
||||||
|
}
|
||||||
|
|
||||||
// Unpack a message sent from the client into a ClientMessage.
|
// Unpack a message sent from the client into a ClientMessage.
|
||||||
func UnmarshalClientMessage(data []byte, payloadType byte, v interface{}) (err error) {
|
func UnmarshalClientMessage(data []byte, payloadType int, v interface{}) (err error) {
|
||||||
var spaceIdx int
|
var spaceIdx int
|
||||||
|
|
||||||
out := v.(*ClientMessage)
|
out := v.(*ClientMessage)
|
||||||
|
@ -282,7 +305,7 @@ func (cm *ClientMessage) parseOrigArguments() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func MarshalClientMessage(clientMessage interface{}) (data []byte, payloadType byte, err error) {
|
func MarshalClientMessage(clientMessage interface{}) (payloadType int, data []byte, err error) {
|
||||||
var msg ClientMessage
|
var msg ClientMessage
|
||||||
var ok bool
|
var ok bool
|
||||||
msg, ok = clientMessage.(ClientMessage)
|
msg, ok = clientMessage.(ClientMessage)
|
||||||
|
@ -309,7 +332,7 @@ func MarshalClientMessage(clientMessage interface{}) (data []byte, payloadType b
|
||||||
if msg.Arguments != nil {
|
if msg.Arguments != nil {
|
||||||
argBytes, err := json.Marshal(msg.Arguments)
|
argBytes, err := json.Marshal(msg.Arguments)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
dataStr = fmt.Sprintf("%d %s %s", msg.MessageID, msg.Command, string(argBytes))
|
dataStr = fmt.Sprintf("%d %s %s", msg.MessageID, msg.Command, string(argBytes))
|
||||||
|
@ -317,7 +340,7 @@ func MarshalClientMessage(clientMessage interface{}) (data []byte, payloadType b
|
||||||
dataStr = fmt.Sprintf("%d %s", msg.MessageID, msg.Command)
|
dataStr = fmt.Sprintf("%d %s", msg.MessageID, msg.Command)
|
||||||
}
|
}
|
||||||
|
|
||||||
return []byte(dataStr), websocket.TextFrame, nil
|
return websocket.TextMessage, []byte(dataStr), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Command handlers should use this to construct responses.
|
// Command handlers should use this to construct responses.
|
||||||
|
|
|
@ -2,14 +2,14 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"golang.org/x/net/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ExampleUnmarshalClientMessage() {
|
func ExampleUnmarshalClientMessage() {
|
||||||
sourceData := []byte("100 hello [\"ffz_3.5.30\",\"898b5bfa-b577-47bb-afb4-252c703b67d6\"]")
|
sourceData := []byte("100 hello [\"ffz_3.5.30\",\"898b5bfa-b577-47bb-afb4-252c703b67d6\"]")
|
||||||
var cm ClientMessage
|
var cm ClientMessage
|
||||||
err := UnmarshalClientMessage(sourceData, websocket.TextFrame, &cm)
|
err := UnmarshalClientMessage(sourceData, websocket.TextMessage, &cm)
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
fmt.Println(cm.MessageID)
|
fmt.Println(cm.MessageID)
|
||||||
fmt.Println(cm.Command)
|
fmt.Println(cm.Command)
|
||||||
|
@ -27,9 +27,9 @@ func ExampleMarshalClientMessage() {
|
||||||
Command: "do_authorize",
|
Command: "do_authorize",
|
||||||
Arguments: "1234567890",
|
Arguments: "1234567890",
|
||||||
}
|
}
|
||||||
data, payloadType, err := MarshalClientMessage(&cm)
|
payloadType, data, err := MarshalClientMessage(&cm)
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
fmt.Println(payloadType == websocket.TextFrame)
|
fmt.Println(payloadType == websocket.TextMessage)
|
||||||
fmt.Println(string(data))
|
fmt.Println(string(data))
|
||||||
// Output:
|
// Output:
|
||||||
// <nil>
|
// <nil>
|
||||||
|
@ -40,7 +40,7 @@ func ExampleMarshalClientMessage() {
|
||||||
func TestArgumentsAsStringAndBool(t *testing.T) {
|
func TestArgumentsAsStringAndBool(t *testing.T) {
|
||||||
sourceData := []byte("1 foo [\"string\", false]")
|
sourceData := []byte("1 foo [\"string\", false]")
|
||||||
var cm ClientMessage
|
var cm ClientMessage
|
||||||
err := UnmarshalClientMessage(sourceData, websocket.TextFrame, &cm)
|
err := UnmarshalClientMessage(sourceData, websocket.TextMessage, &cm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,8 @@ package server
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
"github.com/satori/go.uuid"
|
"github.com/satori/go.uuid"
|
||||||
"golang.org/x/net/websocket"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
@ -27,7 +27,17 @@ const IgnoreReceivedArguments = 1 + 2i
|
||||||
func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) {
|
func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) (ClientMessage, bool) {
|
||||||
var msg ClientMessage
|
var msg ClientMessage
|
||||||
var fail bool
|
var fail bool
|
||||||
err := FFZCodec.Receive(conn, &msg)
|
messageType, packet, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
tb.Error(err)
|
||||||
|
return msg, false
|
||||||
|
}
|
||||||
|
if messageType != websocket.TextMessage {
|
||||||
|
tb.Error("got non-text message", packet)
|
||||||
|
return msg, false
|
||||||
|
}
|
||||||
|
|
||||||
|
err = UnmarshalClientMessage(packet, messageType, &msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tb.Error(err)
|
tb.Error(err)
|
||||||
return msg, false
|
return msg, false
|
||||||
|
@ -56,11 +66,8 @@ func TReceiveExpectedMessage(tb testing.TB, conn *websocket.Conn, messageId int,
|
||||||
}
|
}
|
||||||
|
|
||||||
func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) bool {
|
func TSendMessage(tb testing.TB, conn *websocket.Conn, messageId int, command Command, arguments interface{}) bool {
|
||||||
err := FFZCodec.Send(conn, ClientMessage{MessageID: messageId, Command: command, Arguments: arguments})
|
SendMessage(conn, ClientMessage{MessageID: messageId, Command: command, Arguments: arguments})
|
||||||
if err != nil {
|
return true
|
||||||
tb.Error(err)
|
|
||||||
}
|
|
||||||
return err == nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments interface{}, deleteMode bool) (url.Values, error) {
|
func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments interface{}, deleteMode bool) (url.Values, error) {
|
||||||
|
@ -157,7 +164,7 @@ func TSetup(testserver **httptest.Server, urls *TURLs) {
|
||||||
|
|
||||||
if testserver != nil {
|
if testserver != nil {
|
||||||
serveMux := http.NewServeMux()
|
serveMux := http.NewServeMux()
|
||||||
SetupServerAndHandle(conf, nil, serveMux)
|
SetupServerAndHandle(conf, serveMux)
|
||||||
|
|
||||||
tserv := httptest.NewUnstartedServer(serveMux)
|
tserv := httptest.NewUnstartedServer(serveMux)
|
||||||
*testserver = tserv
|
*testserver = tserv
|
||||||
|
@ -195,6 +202,7 @@ func TestSubscriptionAndPublish(t *testing.T) {
|
||||||
defer unsubscribeAllClients()
|
defer unsubscribeAllClients()
|
||||||
|
|
||||||
var conn *websocket.Conn
|
var conn *websocket.Conn
|
||||||
|
var resp *http.Response
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
// client 1: sub ch1, ch2
|
// client 1: sub ch1, ch2
|
||||||
|
@ -207,7 +215,7 @@ func TestSubscriptionAndPublish(t *testing.T) {
|
||||||
// msg 4: global
|
// msg 4: global
|
||||||
|
|
||||||
// Client 1
|
// Client 1
|
||||||
conn, err = websocket.Dial(urls.Websocket, "", urls.Origin)
|
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -236,7 +244,7 @@ func TestSubscriptionAndPublish(t *testing.T) {
|
||||||
}(conn)
|
}(conn)
|
||||||
|
|
||||||
// Client 2
|
// Client 2
|
||||||
conn, err = websocket.Dial(urls.Websocket, "", urls.Origin)
|
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -265,7 +273,7 @@ func TestSubscriptionAndPublish(t *testing.T) {
|
||||||
}(conn)
|
}(conn)
|
||||||
|
|
||||||
// Client 3
|
// Client 3
|
||||||
conn, err = websocket.Dial(urls.Websocket, "", urls.Origin)
|
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -291,7 +299,6 @@ func TestSubscriptionAndPublish(t *testing.T) {
|
||||||
readyWg.Wait()
|
readyWg.Wait()
|
||||||
|
|
||||||
var form url.Values
|
var form url.Values
|
||||||
var resp *http.Response
|
|
||||||
|
|
||||||
// Publish message 1 - should go to clients 1, 2
|
// Publish message 1 - should go to clients 1, 2
|
||||||
|
|
||||||
|
@ -338,7 +345,7 @@ func TestSubscriptionAndPublish(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start client 4
|
// Start client 4
|
||||||
conn, err = websocket.Dial(urls.Websocket, "", urls.Origin)
|
conn, resp, err = websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -401,7 +408,7 @@ func BenchmarkUserSubscriptionSinglePublish(b *testing.B) {
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
conn, err := websocket.Dial(urls.Websocket, "", urls.Origin)
|
conn, _, err := websocket.DefaultDialer.Dial(urls.Websocket, http.Header{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Error(err)
|
b.Error(err)
|
||||||
break
|
break
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue