diff --git a/socketserver/cmd/ffzsocketserver/console.go b/socketserver/cmd/ffzsocketserver/console.go index 793d73d0..21da6e4c 100644 --- a/socketserver/cmd/ffzsocketserver/console.go +++ b/socketserver/cmd/ffzsocketserver/console.go @@ -7,6 +7,7 @@ import ( "github.com/gorilla/websocket" "runtime" "strings" + "strconv" ) func commandLineConsole() { @@ -88,6 +89,41 @@ func commandLineConsole() { return "Usage: authorizeeveryone [ on | off ]", nil }) + shell.Register("kickclients", func(args ...string) (string, error) { + if len(args) == 0 { + return "Please enter either a count or a fraction of clients to kick.", nil + } + input, err := strconv.ParseFloat(args[0], 64) + if err != nil { + return "Argument must be a number", err + } + var count int + if input >= 1 { + count = int(input) + } else { + server.GlobalSubscriptionLock.RLock() + count = int(float64(len(server.GlobalSubscriptionInfo)) * input) + server.GlobalSubscriptionLock.RUnlock() + } + + msg := server.ClientMessage{ Arguments: &server.CloseRebalance } + server.GlobalSubscriptionLock.RLock() + defer server.GlobalSubscriptionLock.RUnlock() + + kickCount := 0 + for i, cl := range server.GlobalSubscriptionInfo { + if i >= count { + break + } + select { + case cl.MessageChannel <- msg: + case <-cl.MsgChannelIsDone: + } + kickCount++ + } + return fmt.Sprintf("Kicked %d clients", kickCount), nil + }) + shell.Register("panic", func(args ...string) (string, error) { go func() { panic("requested panic") diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 4200b758..46c8adfd 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -16,6 +16,9 @@ import ( "sync/atomic" "time" "unicode/utf8" + "os" + "os/signal" + "syscall" ) // SuccessCommand is a Reply Command to indicate success in reply to a C2S Command. @@ -97,6 +100,20 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { go aggregateDataSender() go ircConnection() + go shutdownHandler() +} + +func shutdownHandler() { + ch := make(chan os.Signal) + signal.Notify(ch, syscall.SIGUSR1) + <-ch + log.Println("Shutting down...") + + StopAcceptingConnections = true + close(StopAcceptingConnectionsCh) + + time.Sleep(1*time.Second) + os.Exit(0) } // SocketUpgrader is the websocket.Upgrader currently in use. @@ -112,6 +129,10 @@ var SocketUpgrader = websocket.Upgrader{ // Memes go here. var BannerHTML []byte +// StopAcceptingConnections is closed while the server is shutting down. +var StopAcceptingConnectionsCh = make(chan struct{}) +var StopAcceptingConnections = false + // HTTPHandleRootURL is the http.HandleFunc for requests on `/`. // It either uses the SocketUpgrader or writes out the BannerHTML. func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) { @@ -120,6 +141,14 @@ func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) { fmt.Println(404) return } + + // racy, but should be ok? + if StopAcceptingConnections { + w.WriteHeader(503) + fmt.Fprint(w, "server is shutting down") + return + } + if r.Header.Get("Connection") == "Upgrade" { updateSysMem() @@ -165,6 +194,12 @@ var ErrExpectedStringAndInt = errors.New("Error: Expected array of string, int a // ErrExpectedStringAndIntGotFloat is sent in a ErrorCommand Reply when the Arguments are of the wrong type. var ErrExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.") +// CloseGoingAway is sent when the server is restarting. +var CloseGoingAway = websocket.CloseError{Code: websocket.CloseGoingAway, Text: "server restarting"} + +// CloseRebalance is sent when the server has too many clients and needs to shunt some to another server. +var CloseRebalance = websocket.CloseError{Code: websocket.CloseGoingAway, Text: "kicked for rebalancing, please select a new server"} + // CloseGotBinaryMessage is the termination reason when the client sends a binary websocket frame. var CloseGotBinaryMessage = websocket.CloseError{Code: websocket.CloseUnsupportedData, Text: "got binary packet"} @@ -232,6 +267,10 @@ func RunSocketConnection(conn *websocket.Conn) { var messageType int var packet []byte var err error + + defer close(errorChan) + defer close(clientChan) + for ; err == nil; messageType, packet, err = conn.ReadMessage() { if messageType == websocket.BinaryMessage { err = &CloseGotBinaryMessage @@ -249,8 +288,6 @@ func RunSocketConnection(conn *websocket.Conn) { select { case clientChan <- msg: case <-stoppedChan: - close(errorChan) - close(clientChan) return } } @@ -259,8 +296,6 @@ func RunSocketConnection(conn *websocket.Conn) { case errorChan <- err: case <-stoppedChan: } - close(errorChan) - close(clientChan) // exit goroutine }(_errorChan, _clientChan, stoppedChan) @@ -318,6 +353,10 @@ RunLoop: closeReason = CloseTooManyBufferedMessages break RunLoop } + if cls, ok := msg.Arguments.(*websocket.CloseError); ok { + closeReason = *cls + break RunLoop + } SendMessage(conn, msg) case <-time.After(1 * time.Minute): @@ -331,6 +370,10 @@ RunLoop: } else { conn.WriteControl(websocket.PingMessage, []byte(strconv.FormatInt(time.Now().Unix(), 10)), getDeadline()) } + + case <-StopAcceptingConnectionsCh: + closeReason = CloseGoingAway + break RunLoop } } @@ -343,6 +386,7 @@ RunLoop: } }() + // Closes client.MsgChannelIsDone and also stops the reader thread close(stoppedChan) // Stop getting messages... @@ -356,8 +400,11 @@ RunLoop: // Close the channel so the draining goroutine can finish, too. close(_serverMessageChan) - atomic.AddUint64(&Statistics.ClientDisconnectsTotal, 1) - atomic.AddUint64(&Statistics.CurrentClientCount, ^uint64(0)) + if !StopAcceptingConnections { + // Don't perform high contention operations when server is closing + atomic.AddUint64(&Statistics.ClientDisconnectsTotal, 1) + atomic.AddUint64(&Statistics.CurrentClientCount, ^uint64(0)) + } } func getDeadline() time.Time { diff --git a/socketserver/server/subscriptions.go b/socketserver/server/subscriptions.go index c9e85a1c..3a139165 100644 --- a/socketserver/server/subscriptions.go +++ b/socketserver/server/subscriptions.go @@ -105,6 +105,10 @@ func UnsubscribeSingleChat(client *ClientInfo, channelName string) { // - write lock to SubscriptionInfos // - write lock to ClientInfo func UnsubscribeAll(client *ClientInfo) { + if StopAcceptingConnections { + return // no need to remove from a high-contention list when the server is closing + } + client.Mutex.Lock() client.PendingSubscriptionsBacklog = nil client.PendingSubscriptionsBacklog = nil