1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-07-26 04:28:31 +00:00

Merge pull request #265 from riking/go-optimize

Small CPU optimizations for socket server
This commit is contained in:
Mike 2017-11-13 15:53:39 -05:00 committed by GitHub
commit 1f3ad54d83
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 100 additions and 55 deletions

View file

@ -81,7 +81,8 @@ func main() {
Addr: conf.SSLListenAddr, Addr: conf.SSLListenAddr,
Handler: http.DefaultServeMux, Handler: http.DefaultServeMux,
TLSConfig: &tls.Config{ TLSConfig: &tls.Config{
GetCertificate: reloader.GetCertificateFunc(), GetCertificate: reloader.GetCertificateFunc(),
GetConfigForClient: server.TLSEarlyReject,
}, },
} }
go func() { go func() {

View file

@ -1,6 +1,8 @@
package server // import "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server" package server // import "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server"
import ( import (
"bytes"
"crypto/tls"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -238,6 +240,29 @@ var BannerHTML []byte
// StopAcceptingConnectionsCh is closed while the server is shutting down. // StopAcceptingConnectionsCh is closed while the server is shutting down.
var StopAcceptingConnectionsCh = make(chan struct{}) var StopAcceptingConnectionsCh = make(chan struct{})
func shouldRejectConnection() bool {
memFreeKB := atomic.LoadUint64(&Statistics.SysMemFreeKB)
if memFreeKB > 0 && memFreeKB < Configuration.MinMemoryKBytes {
return true
}
curClients := atomic.LoadUint64(&Statistics.CurrentClientCount)
if Configuration.MaxClientCount != 0 && curClients >= Configuration.MaxClientCount {
return true
}
return false
}
var errEarlyTLSReject = errors.New("over capacity")
func TLSEarlyReject(*tls.ClientHelloInfo) (*tls.Config, error) {
if shouldRejectConnection() {
return nil, errEarlyTLSReject
}
return nil, nil
}
// HTTPHandleRootURL is the http.HandleFunc for requests on `/`. // HTTPHandleRootURL is the http.HandleFunc for requests on `/`.
// It either uses the SocketUpgrader or writes out the BannerHTML. // It either uses the SocketUpgrader or writes out the BannerHTML.
func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) { func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) {
@ -250,21 +275,12 @@ func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) {
if strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") { if strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") {
updateSysMem() updateSysMem()
if Statistics.SysMemFreeKB > 0 && Statistics.SysMemFreeKB < Configuration.MinMemoryKBytes { if shouldRejectConnection() {
w.WriteHeader(503) w.WriteHeader(503)
fmt.Fprint(w, "error: low memory") fmt.Fprint(w, "connection rejected: over capacity")
return return
} }
if Configuration.MaxClientCount != 0 {
curClients := atomic.LoadUint64(&Statistics.CurrentClientCount)
if curClients >= Configuration.MaxClientCount {
w.WriteHeader(503)
fmt.Fprint(w, "error: client limit reached")
return
}
}
conn, err := SocketUpgrader.Upgrade(w, r, nil) conn, err := SocketUpgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
fmt.Fprintf(w, "error: %v", err) fmt.Fprintf(w, "error: %v", err)
@ -365,7 +381,7 @@ func RunSocketConnection(conn *websocket.Conn) {
stoppedChan := make(chan struct{}) stoppedChan := make(chan struct{})
var client ClientInfo var client ClientInfo
client.MessageChannel = _serverMessageChan client.messageChannel = _serverMessageChan
client.RemoteAddr = conn.RemoteAddr() client.RemoteAddr = conn.RemoteAddr()
client.MsgChannelIsDone = stoppedChan client.MsgChannelIsDone = stoppedChan
@ -374,11 +390,7 @@ func RunSocketConnection(conn *websocket.Conn) {
// report.RemoteAddr = client.RemoteAddr // report.RemoteAddr = client.RemoteAddr
conn.SetPongHandler(func(pongBody string) error { conn.SetPongHandler(func(pongBody string) error {
client.Mutex.Lock() _clientChan <- ClientMessage{Command: "__ping"}
if client.HelloOK { // do not accept PONGs until hello sent
client.pingCount = 0
}
client.Mutex.Unlock()
return nil return nil
}) })
@ -428,7 +440,24 @@ func runSocketReader(conn *websocket.Conn, client *ClientInfo, errorChan chan<-
defer close(errorChan) defer close(errorChan)
defer close(clientChan) defer close(clientChan)
for ; err == nil; messageType, packet, err = conn.ReadMessage() { for {
conn.SetReadDeadline(time.Now().Add(1 * time.Minute))
messageType, packet, err = conn.ReadMessage()
// handle ReadDeadline by sending a ping
// writer loop handles repeated ping timeouts
if tmErr, ok := err.(interface {
Timeout() bool
}); ok && tmErr.Timeout() {
select {
case <-stoppedChan:
return
case clientChan <- ClientMessage{Command: "__readTimeout"}:
}
continue // re-set deadline and wait for pong packet
}
if err != nil {
break
}
if messageType == websocket.BinaryMessage { if messageType == websocket.BinaryMessage {
err = &CloseGotBinaryMessage err = &CloseGotBinaryMessage
break break
@ -451,21 +480,26 @@ func runSocketReader(conn *websocket.Conn, client *ClientInfo, errorChan chan<-
} else if msg.MessageID == 0 { } else if msg.MessageID == 0 {
continue continue
} }
select { select {
case clientChan <- msg:
case <-stoppedChan: case <-stoppedChan:
return return
case clientChan <- msg:
} }
} }
select { select {
case errorChan <- err:
case <-stoppedChan: case <-stoppedChan:
case errorChan <- err:
} }
// exit goroutine // exit goroutine
} }
var pingPayload = []byte("PING")
func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan error, clientChan <-chan ClientMessage, serverMessageChan <-chan ClientMessage) websocket.CloseError { func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan error, clientChan <-chan ClientMessage, serverMessageChan <-chan ClientMessage) websocket.CloseError {
lastPacket := time.Now()
for { for {
select { select {
case err := <-errorChan: case err := <-errorChan:
@ -484,9 +518,30 @@ func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan
} }
case msg := <-clientChan: case msg := <-clientChan:
if msg.Command == "__readTimeout" {
// generated on 60 seconds without a message
now := time.Now()
if lastPacket.Add(5 * time.Minute).Before(now) {
return CloseTimedOut
}
conn.WriteControl(
websocket.PingMessage,
pingPayload,
getDeadline(),
)
continue
}
if !client.HelloOK && msg.Command != HelloCommand { if !client.HelloOK && msg.Command != HelloCommand {
return CloseFirstMessageNotHello return CloseFirstMessageNotHello
} }
lastPacket = time.Now()
if msg.Command == "__ping" {
// generated for PONG packets
// want to branch AFTER lastPacket is set
continue
}
for _, char := range msg.Command { for _, char := range msg.Command {
if char == utf8.RuneError { if char == utf8.RuneError {
@ -505,17 +560,6 @@ func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan
} }
SendMessage(conn, msg) SendMessage(conn, msg)
case <-time.After(1 * time.Minute):
client.Mutex.Lock()
client.pingCount++
tooManyPings := client.pingCount == 5
client.Mutex.Unlock()
if tooManyPings {
return CloseTimedOut
} else {
conn.WriteControl(websocket.PingMessage, []byte(strconv.FormatInt(time.Now().Unix(), 10)), getDeadline())
}
case <-StopAcceptingConnectionsCh: case <-StopAcceptingConnectionsCh:
return CloseGoingAway return CloseGoingAway
} }
@ -619,7 +663,6 @@ func MarshalClientMessage(clientMessage interface{}) (int, []byte, error) {
} }
msg = *pMsg msg = *pMsg
} }
var dataStr string
if msg.Command == "" && msg.MessageID == 0 { if msg.Command == "" && msg.MessageID == 0 {
panic("MarshalClientMessage: attempt to send an empty ClientMessage") panic("MarshalClientMessage: attempt to send an empty ClientMessage")
@ -632,20 +675,25 @@ func MarshalClientMessage(clientMessage interface{}) (int, []byte, error) {
msg.MessageID = -1 msg.MessageID = -1
} }
// optimized from fmt.Sprintf("%d %s %s", msg.MessageID, msg.Command, ...)
var buf bytes.Buffer
fmt.Fprint(&buf, msg.MessageID)
buf.WriteByte(' ')
buf.WriteString(string(msg.Command))
if msg.origArguments != "" { if msg.origArguments != "" {
dataStr = fmt.Sprintf("%d %s %s", msg.MessageID, msg.Command, msg.origArguments) buf.WriteByte(' ')
buf.WriteString(msg.origArguments)
} else if msg.Arguments != nil { } else if msg.Arguments != nil {
argBytes, err := json.Marshal(msg.Arguments) argBytes, err := json.Marshal(msg.Arguments)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
buf.WriteByte(' ')
dataStr = fmt.Sprintf("%d %s %s", msg.MessageID, msg.Command, string(argBytes)) buf.Write(argBytes)
} else {
dataStr = fmt.Sprintf("%d %s", msg.MessageID, msg.Command)
} }
return websocket.TextMessage, []byte(dataStr), nil return websocket.TextMessage, buf.Bytes(), nil
} }
// ArgumentsAsString parses the arguments of the ClientMessage as a single string. // ArgumentsAsString parses the arguments of the ClientMessage as a single string.

View file

@ -18,9 +18,11 @@ var ChatSubscriptionLock sync.RWMutex
var GlobalSubscriptionInfo []*ClientInfo var GlobalSubscriptionInfo []*ClientInfo
var GlobalSubscriptionLock sync.RWMutex var GlobalSubscriptionLock sync.RWMutex
// Send a message to the client.
// Drops if buffer is full.
func (client *ClientInfo) Send(msg ClientMessage) bool { func (client *ClientInfo) Send(msg ClientMessage) bool {
select { select {
case client.MessageChannel <- msg: case client.messageChannel <- msg:
return true return true
case <-client.MsgChannelIsDone: case <-client.MsgChannelIsDone:
return false return false

View file

@ -47,13 +47,11 @@ type ConfigFile struct {
ProxyRoutes []ProxyRoute ProxyRoutes []ProxyRoute
} }
type ProxyRoute struct { type ProxyRoute struct {
Route string Route string
Server string Server string
} }
type ClientMessage struct { type ClientMessage struct {
// Message ID. Increments by 1 for each message sent from the client. // Message ID. Increments by 1 for each message sent from the client.
// When replying to a command, the message ID must be echoed. // When replying to a command, the message ID must be echoed.
@ -94,12 +92,6 @@ type AuthInfo struct {
UsernameValidated bool UsernameValidated bool
} }
type ClientVersion struct {
Major int
Minor int
Revision int
}
type ClientInfo struct { type ClientInfo struct {
// The client ID. // The client ID.
// This must be written once by the owning goroutine before the struct is passed off to any other goroutines. // This must be written once by the owning goroutine before the struct is passed off to any other goroutines.
@ -134,17 +126,19 @@ type ClientInfo struct {
ReadyComplete bool ReadyComplete bool
// Server-initiated messages should be sent via the Send() method. // Server-initiated messages should be sent via the Send() method.
MessageChannel chan<- ClientMessage messageChannel chan<- ClientMessage
// Closed when the client is shutting down. // Closed when the client is shutting down.
MsgChannelIsDone <-chan struct{} MsgChannelIsDone <-chan struct{}
// Take out an Add() on this during a command if you need to use the MessageChannel later. // Take out an Add() on this during a command if you need to call Send() later.
MsgChannelKeepalive sync.WaitGroup MsgChannelKeepalive sync.WaitGroup
}
// The number of pings sent without a response. type ClientVersion struct {
// Protected by Mutex Major int
pingCount int Minor int
Revision int
} }
func VersionFromString(v string) ClientVersion { func VersionFromString(v string) ClientVersion {