1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-06-28 05:15:54 +00:00

Close server on SIGUSR1, 'kickclients' for rebalancing

This commit is contained in:
Kane York 2015-11-19 17:49:48 -08:00
parent 62c9659430
commit 38972364fb
3 changed files with 93 additions and 6 deletions

View file

@ -7,6 +7,7 @@ import (
"github.com/gorilla/websocket"
"runtime"
"strings"
"strconv"
)
func commandLineConsole() {
@ -88,6 +89,41 @@ func commandLineConsole() {
return "Usage: authorizeeveryone [ on | off ]", nil
})
shell.Register("kickclients", func(args ...string) (string, error) {
if len(args) == 0 {
return "Please enter either a count or a fraction of clients to kick.", nil
}
input, err := strconv.ParseFloat(args[0], 64)
if err != nil {
return "Argument must be a number", err
}
var count int
if input >= 1 {
count = int(input)
} else {
server.GlobalSubscriptionLock.RLock()
count = int(float64(len(server.GlobalSubscriptionInfo)) * input)
server.GlobalSubscriptionLock.RUnlock()
}
msg := server.ClientMessage{ Arguments: &server.CloseRebalance }
server.GlobalSubscriptionLock.RLock()
defer server.GlobalSubscriptionLock.RUnlock()
kickCount := 0
for i, cl := range server.GlobalSubscriptionInfo {
if i >= count {
break
}
select {
case cl.MessageChannel <- msg:
case <-cl.MsgChannelIsDone:
}
kickCount++
}
return fmt.Sprintf("Kicked %d clients", kickCount), nil
})
shell.Register("panic", func(args ...string) (string, error) {
go func() {
panic("requested panic")

View file

@ -16,6 +16,9 @@ import (
"sync/atomic"
"time"
"unicode/utf8"
"os"
"os/signal"
"syscall"
)
// SuccessCommand is a Reply Command to indicate success in reply to a C2S Command.
@ -97,6 +100,20 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) {
go aggregateDataSender()
go ircConnection()
go shutdownHandler()
}
func shutdownHandler() {
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGUSR1)
<-ch
log.Println("Shutting down...")
StopAcceptingConnections = true
close(StopAcceptingConnectionsCh)
time.Sleep(1*time.Second)
os.Exit(0)
}
// SocketUpgrader is the websocket.Upgrader currently in use.
@ -112,6 +129,10 @@ var SocketUpgrader = websocket.Upgrader{
// Memes go here.
var BannerHTML []byte
// StopAcceptingConnections is closed while the server is shutting down.
var StopAcceptingConnectionsCh = make(chan struct{})
var StopAcceptingConnections = false
// HTTPHandleRootURL is the http.HandleFunc for requests on `/`.
// It either uses the SocketUpgrader or writes out the BannerHTML.
func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) {
@ -120,6 +141,14 @@ func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) {
fmt.Println(404)
return
}
// racy, but should be ok?
if StopAcceptingConnections {
w.WriteHeader(503)
fmt.Fprint(w, "server is shutting down")
return
}
if r.Header.Get("Connection") == "Upgrade" {
updateSysMem()
@ -165,6 +194,12 @@ var ErrExpectedStringAndInt = errors.New("Error: Expected array of string, int a
// ErrExpectedStringAndIntGotFloat is sent in a ErrorCommand Reply when the Arguments are of the wrong type.
var ErrExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.")
// CloseGoingAway is sent when the server is restarting.
var CloseGoingAway = websocket.CloseError{Code: websocket.CloseGoingAway, Text: "server restarting"}
// CloseRebalance is sent when the server has too many clients and needs to shunt some to another server.
var CloseRebalance = websocket.CloseError{Code: websocket.CloseGoingAway, Text: "kicked for rebalancing, please select a new server"}
// CloseGotBinaryMessage is the termination reason when the client sends a binary websocket frame.
var CloseGotBinaryMessage = websocket.CloseError{Code: websocket.CloseUnsupportedData, Text: "got binary packet"}
@ -232,6 +267,10 @@ func RunSocketConnection(conn *websocket.Conn) {
var messageType int
var packet []byte
var err error
defer close(errorChan)
defer close(clientChan)
for ; err == nil; messageType, packet, err = conn.ReadMessage() {
if messageType == websocket.BinaryMessage {
err = &CloseGotBinaryMessage
@ -249,8 +288,6 @@ func RunSocketConnection(conn *websocket.Conn) {
select {
case clientChan <- msg:
case <-stoppedChan:
close(errorChan)
close(clientChan)
return
}
}
@ -259,8 +296,6 @@ func RunSocketConnection(conn *websocket.Conn) {
case errorChan <- err:
case <-stoppedChan:
}
close(errorChan)
close(clientChan)
// exit goroutine
}(_errorChan, _clientChan, stoppedChan)
@ -318,6 +353,10 @@ RunLoop:
closeReason = CloseTooManyBufferedMessages
break RunLoop
}
if cls, ok := msg.Arguments.(*websocket.CloseError); ok {
closeReason = *cls
break RunLoop
}
SendMessage(conn, msg)
case <-time.After(1 * time.Minute):
@ -331,6 +370,10 @@ RunLoop:
} else {
conn.WriteControl(websocket.PingMessage, []byte(strconv.FormatInt(time.Now().Unix(), 10)), getDeadline())
}
case <-StopAcceptingConnectionsCh:
closeReason = CloseGoingAway
break RunLoop
}
}
@ -343,6 +386,7 @@ RunLoop:
}
}()
// Closes client.MsgChannelIsDone and also stops the reader thread
close(stoppedChan)
// Stop getting messages...
@ -356,8 +400,11 @@ RunLoop:
// Close the channel so the draining goroutine can finish, too.
close(_serverMessageChan)
atomic.AddUint64(&Statistics.ClientDisconnectsTotal, 1)
atomic.AddUint64(&Statistics.CurrentClientCount, ^uint64(0))
if !StopAcceptingConnections {
// Don't perform high contention operations when server is closing
atomic.AddUint64(&Statistics.ClientDisconnectsTotal, 1)
atomic.AddUint64(&Statistics.CurrentClientCount, ^uint64(0))
}
}
func getDeadline() time.Time {

View file

@ -105,6 +105,10 @@ func UnsubscribeSingleChat(client *ClientInfo, channelName string) {
// - write lock to SubscriptionInfos
// - write lock to ClientInfo
func UnsubscribeAll(client *ClientInfo) {
if StopAcceptingConnections {
return // no need to remove from a high-contention list when the server is closing
}
client.Mutex.Lock()
client.PendingSubscriptionsBacklog = nil
client.PendingSubscriptionsBacklog = nil