From edb728f7bb3b3020c3738de49d0867a0015bc58a Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 23 Oct 2017 15:39:00 -0700 Subject: [PATCH 1/7] Optimize MarshalClientMessage to avoid copies --- socketserver/server/handlecore.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 525101aa..fe62c927 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -1,6 +1,7 @@ package server // import "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server" import ( + "bytes" "encoding/json" "errors" "fmt" @@ -619,7 +620,6 @@ func MarshalClientMessage(clientMessage interface{}) (int, []byte, error) { } msg = *pMsg } - var dataStr string if msg.Command == "" && msg.MessageID == 0 { panic("MarshalClientMessage: attempt to send an empty ClientMessage") @@ -632,20 +632,25 @@ func MarshalClientMessage(clientMessage interface{}) (int, []byte, error) { msg.MessageID = -1 } + // optimized from fmt.Sprintf("%d %s %s", msg.MessageID, msg.Command, ...) + var buf bytes.Buffer + fmt.Fprint(&buf, msg.MessageID) + buf.WriteByte(' ') + buf.WriteString(string(msg.Command)) + if msg.origArguments != "" { - dataStr = fmt.Sprintf("%d %s %s", msg.MessageID, msg.Command, msg.origArguments) + buf.WriteByte(' ') + buf.WriteString(msg.origArguments) } else if msg.Arguments != nil { argBytes, err := json.Marshal(msg.Arguments) if err != nil { return 0, nil, err } - - dataStr = fmt.Sprintf("%d %s %s", msg.MessageID, msg.Command, string(argBytes)) - } else { - dataStr = fmt.Sprintf("%d %s", msg.MessageID, msg.Command) + buf.WriteByte(' ') + buf.Write(argBytes) } - return websocket.TextMessage, []byte(dataStr), nil + return websocket.TextMessage, buf.Bytes(), nil } // ArgumentsAsString parses the arguments of the ClientMessage as a single string. From a900d1521e890ba65f2b936fd55115b1ecf5e447 Mon Sep 17 00:00:00 2001 From: Kane York Date: Fri, 27 Oct 2017 13:13:39 -0700 Subject: [PATCH 2/7] Optimize pings to reduce timer percolation --- socketserver/server/handlecore.go | 25 +++++++++++++------------ socketserver/server/types.go | 4 ---- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index fe62c927..bba45d62 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -375,11 +375,7 @@ func RunSocketConnection(conn *websocket.Conn) { // report.RemoteAddr = client.RemoteAddr conn.SetPongHandler(func(pongBody string) error { - client.Mutex.Lock() - if client.HelloOK { // do not accept PONGs until hello sent - client.pingCount = 0 - } - client.Mutex.Unlock() + _clientChan <- ClientMessage{Command: "__ping"} return nil }) @@ -467,6 +463,10 @@ func runSocketReader(conn *websocket.Conn, client *ClientInfo, errorChan chan<- } func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan error, clientChan <-chan ClientMessage, serverMessageChan <-chan ClientMessage) websocket.CloseError { + pingTicker := time.NewTicker(1*time.Minute) + defer pingTicker.Stop() + lastPacket := time.Now() + for { select { case err := <-errorChan: @@ -488,6 +488,10 @@ func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan if !client.HelloOK && msg.Command != HelloCommand { return CloseFirstMessageNotHello } + lastPacket = time.Now() + if msg.Command == "__ping" { + continue // generated by server, not by client + } for _, char := range msg.Command { if char == utf8.RuneError { @@ -506,14 +510,11 @@ func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan } SendMessage(conn, msg) - case <-time.After(1 * time.Minute): - client.Mutex.Lock() - client.pingCount++ - tooManyPings := client.pingCount == 5 - client.Mutex.Unlock() - if tooManyPings { + case <-pingTicker.C: + now := time.Now() + if lastPacket.Add(5*time.Minute).Before(now) { return CloseTimedOut - } else { + } else if lastPacket.Add(1*time.Minute).Before(now) { conn.WriteControl(websocket.PingMessage, []byte(strconv.FormatInt(time.Now().Unix(), 10)), getDeadline()) } diff --git a/socketserver/server/types.go b/socketserver/server/types.go index 3face4c4..e5b464d9 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -141,10 +141,6 @@ type ClientInfo struct { // Take out an Add() on this during a command if you need to use the MessageChannel later. MsgChannelKeepalive sync.WaitGroup - - // The number of pings sent without a response. - // Protected by Mutex - pingCount int } func VersionFromString(v string) ClientVersion { From 85a0fb7b790ca7434cc69bcc799a5382b0612432 Mon Sep 17 00:00:00 2001 From: Kane York Date: Fri, 27 Oct 2017 13:14:29 -0700 Subject: [PATCH 3/7] Make client.messageChannel unexported --- socketserver/server/handlecore.go | 2 +- socketserver/server/subscriptions.go | 4 +++- socketserver/server/types.go | 16 ++++++++-------- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index bba45d62..882d6f76 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -366,7 +366,7 @@ func RunSocketConnection(conn *websocket.Conn) { stoppedChan := make(chan struct{}) var client ClientInfo - client.MessageChannel = _serverMessageChan + client.messageChannel = _serverMessageChan client.RemoteAddr = conn.RemoteAddr() client.MsgChannelIsDone = stoppedChan diff --git a/socketserver/server/subscriptions.go b/socketserver/server/subscriptions.go index 136fa603..7486876b 100644 --- a/socketserver/server/subscriptions.go +++ b/socketserver/server/subscriptions.go @@ -18,9 +18,11 @@ var ChatSubscriptionLock sync.RWMutex var GlobalSubscriptionInfo []*ClientInfo var GlobalSubscriptionLock sync.RWMutex +// Send a message to the client. +// Drops if buffer is full. func (client *ClientInfo) Send(msg ClientMessage) bool { select { - case client.MessageChannel <- msg: + case client.messageChannel <- msg: return true case <-client.MsgChannelIsDone: return false diff --git a/socketserver/server/types.go b/socketserver/server/types.go index e5b464d9..24482261 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -94,12 +94,6 @@ type AuthInfo struct { UsernameValidated bool } -type ClientVersion struct { - Major int - Minor int - Revision int -} - type ClientInfo struct { // The client ID. // This must be written once by the owning goroutine before the struct is passed off to any other goroutines. @@ -134,15 +128,21 @@ type ClientInfo struct { ReadyComplete bool // Server-initiated messages should be sent via the Send() method. - MessageChannel chan<- ClientMessage + messageChannel chan<- ClientMessage // Closed when the client is shutting down. MsgChannelIsDone <-chan struct{} - // Take out an Add() on this during a command if you need to use the MessageChannel later. + // Take out an Add() on this during a command if you need to call Send() later. MsgChannelKeepalive sync.WaitGroup } +type ClientVersion struct { + Major int + Minor int + Revision int +} + func VersionFromString(v string) ClientVersion { var cv ClientVersion fmt.Sscanf(v, "ffz_%d.%d.%d", &cv.Major, &cv.Minor, &cv.Revision) From ea4909cdc6c4256cd52e620d7308686c6994f008 Mon Sep 17 00:00:00 2001 From: Kane York Date: Fri, 27 Oct 2017 13:15:00 -0700 Subject: [PATCH 4/7] Reformat --- socketserver/server/handlecore.go | 6 +++--- socketserver/server/types.go | 6 ++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 882d6f76..6e572819 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -463,7 +463,7 @@ func runSocketReader(conn *websocket.Conn, client *ClientInfo, errorChan chan<- } func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan error, clientChan <-chan ClientMessage, serverMessageChan <-chan ClientMessage) websocket.CloseError { - pingTicker := time.NewTicker(1*time.Minute) + pingTicker := time.NewTicker(1 * time.Minute) defer pingTicker.Stop() lastPacket := time.Now() @@ -512,9 +512,9 @@ func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan case <-pingTicker.C: now := time.Now() - if lastPacket.Add(5*time.Minute).Before(now) { + if lastPacket.Add(5 * time.Minute).Before(now) { return CloseTimedOut - } else if lastPacket.Add(1*time.Minute).Before(now) { + } else if lastPacket.Add(1 * time.Minute).Before(now) { conn.WriteControl(websocket.PingMessage, []byte(strconv.FormatInt(time.Now().Unix(), 10)), getDeadline()) } diff --git a/socketserver/server/types.go b/socketserver/server/types.go index 24482261..b4fae2af 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -47,13 +47,11 @@ type ConfigFile struct { ProxyRoutes []ProxyRoute } - type ProxyRoute struct { - Route string - Server string + Route string + Server string } - type ClientMessage struct { // Message ID. Increments by 1 for each message sent from the client. // When replying to a command, the message ID must be echoed. From 9c9fde01602a3c4d0810e9aba634b0a9c4d2416d Mon Sep 17 00:00:00 2001 From: Kane York Date: Wed, 8 Nov 2017 14:34:56 -0800 Subject: [PATCH 5/7] Use read deadlines instead of ping tickers --- socketserver/server/handlecore.go | 53 +++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 6e572819..197e6715 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -425,7 +425,24 @@ func runSocketReader(conn *websocket.Conn, client *ClientInfo, errorChan chan<- defer close(errorChan) defer close(clientChan) - for ; err == nil; messageType, packet, err = conn.ReadMessage() { + for { + conn.SetReadDeadline(time.Now().Add(1 * time.Minute)) + messageType, packet, err = conn.ReadMessage() + // handle ReadDeadline by sending a ping + // writer loop handles repeated ping timeouts + if tmErr, ok := err.(interface { + Timeout() bool + }); ok && tmErr.Timeout() { + select { + case <-stoppedChan: + return + case clientChan <- ClientMessage{Command: "__readTimeout"}: + } + continue // re-set deadline and wait for pong packet + } + if err != nil { + break + } if messageType == websocket.BinaryMessage { err = &CloseGotBinaryMessage break @@ -448,23 +465,22 @@ func runSocketReader(conn *websocket.Conn, client *ClientInfo, errorChan chan<- } else if msg.MessageID == 0 { continue } + select { - case clientChan <- msg: case <-stoppedChan: return + case clientChan <- msg: } } select { - case errorChan <- err: case <-stoppedChan: + case errorChan <- err: } // exit goroutine } func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan error, clientChan <-chan ClientMessage, serverMessageChan <-chan ClientMessage) websocket.CloseError { - pingTicker := time.NewTicker(1 * time.Minute) - defer pingTicker.Stop() lastPacket := time.Now() for { @@ -485,12 +501,29 @@ func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan } case msg := <-clientChan: + if msg.Command == "__readTimeout" { + // generated on 60 seconds without a message + now := time.Now() + if lastPacket.Add(5 * time.Minute).Before(now) { + return CloseTimedOut + } + conn.WriteControl( + websocket.PingMessage, + []byte(strconv.FormatInt(time.Now().Unix(), 10)), + getDeadline(), + ) + continue + } + if !client.HelloOK && msg.Command != HelloCommand { return CloseFirstMessageNotHello } lastPacket = time.Now() + if msg.Command == "__ping" { - continue // generated by server, not by client + // generated for PONG packets + // want this to run AFTER lastPacket was set + continue } for _, char := range msg.Command { @@ -510,14 +543,6 @@ func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan } SendMessage(conn, msg) - case <-pingTicker.C: - now := time.Now() - if lastPacket.Add(5 * time.Minute).Before(now) { - return CloseTimedOut - } else if lastPacket.Add(1 * time.Minute).Before(now) { - conn.WriteControl(websocket.PingMessage, []byte(strconv.FormatInt(time.Now().Unix(), 10)), getDeadline()) - } - case <-StopAcceptingConnectionsCh: return CloseGoingAway } From 26cddf5271cf618680e04f9cad3ebde1b9c9d4be Mon Sep 17 00:00:00 2001 From: Kane York Date: Wed, 8 Nov 2017 14:40:51 -0800 Subject: [PATCH 6/7] send const ping payload --- socketserver/server/handlecore.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 197e6715..fa3158fe 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -480,6 +480,8 @@ func runSocketReader(conn *websocket.Conn, client *ClientInfo, errorChan chan<- // exit goroutine } +var pingPayload = []byte("PING") + func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan error, clientChan <-chan ClientMessage, serverMessageChan <-chan ClientMessage) websocket.CloseError { lastPacket := time.Now() @@ -509,7 +511,7 @@ func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan } conn.WriteControl( websocket.PingMessage, - []byte(strconv.FormatInt(time.Now().Unix(), 10)), + pingPayload, getDeadline(), ) continue @@ -522,7 +524,7 @@ func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan if msg.Command == "__ping" { // generated for PONG packets - // want this to run AFTER lastPacket was set + // want to branch AFTER lastPacket is set continue } From a0b3e049d03056bd2c8bc8163130f5a9692df5c9 Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 13 Nov 2017 12:48:56 -0800 Subject: [PATCH 7/7] Reject over capacity before finishing TLS handshake --- .../cmd/ffzsocketserver/socketserver.go | 3 +- socketserver/server/handlecore.go | 37 +++++++++++++------ 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/socketserver/cmd/ffzsocketserver/socketserver.go b/socketserver/cmd/ffzsocketserver/socketserver.go index 1f7f2db0..cea1bb07 100644 --- a/socketserver/cmd/ffzsocketserver/socketserver.go +++ b/socketserver/cmd/ffzsocketserver/socketserver.go @@ -81,7 +81,8 @@ func main() { Addr: conf.SSLListenAddr, Handler: http.DefaultServeMux, TLSConfig: &tls.Config{ - GetCertificate: reloader.GetCertificateFunc(), + GetCertificate: reloader.GetCertificateFunc(), + GetConfigForClient: server.TLSEarlyReject, }, } go func() { diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index fa3158fe..a1383a91 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -2,6 +2,7 @@ package server // import "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/serv import ( "bytes" + "crypto/tls" "encoding/json" "errors" "fmt" @@ -239,6 +240,29 @@ var BannerHTML []byte // StopAcceptingConnectionsCh is closed while the server is shutting down. var StopAcceptingConnectionsCh = make(chan struct{}) +func shouldRejectConnection() bool { + memFreeKB := atomic.LoadUint64(&Statistics.SysMemFreeKB) + if memFreeKB > 0 && memFreeKB < Configuration.MinMemoryKBytes { + return true + } + + curClients := atomic.LoadUint64(&Statistics.CurrentClientCount) + if Configuration.MaxClientCount != 0 && curClients >= Configuration.MaxClientCount { + return true + } + + return false +} + +var errEarlyTLSReject = errors.New("over capacity") + +func TLSEarlyReject(*tls.ClientHelloInfo) (*tls.Config, error) { + if shouldRejectConnection() { + return nil, errEarlyTLSReject + } + return nil, nil +} + // HTTPHandleRootURL is the http.HandleFunc for requests on `/`. // It either uses the SocketUpgrader or writes out the BannerHTML. func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) { @@ -251,21 +275,12 @@ func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) { if strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") { updateSysMem() - if Statistics.SysMemFreeKB > 0 && Statistics.SysMemFreeKB < Configuration.MinMemoryKBytes { + if shouldRejectConnection() { w.WriteHeader(503) - fmt.Fprint(w, "error: low memory") + fmt.Fprint(w, "connection rejected: over capacity") return } - if Configuration.MaxClientCount != 0 { - curClients := atomic.LoadUint64(&Statistics.CurrentClientCount) - if curClients >= Configuration.MaxClientCount { - w.WriteHeader(503) - fmt.Fprint(w, "error: client limit reached") - return - } - } - conn, err := SocketUpgrader.Upgrade(w, r, nil) if err != nil { fmt.Fprintf(w, "error: %v", err)