diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index c0527d14..31bbe9d3 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -14,6 +14,7 @@ import ( "strings" "sync" "time" + "sync/atomic" ) // SuccessCommand is a Reply Command to indicate success in reply to a C2S Command. @@ -197,8 +198,8 @@ const sendMessageAbortLength = 50 func RunSocketConnection(conn *websocket.Conn) { // websocket.Conn is a ReadWriteCloser - Statistics.ClientConnectsTotal++ - Statistics.CurrentClientCount++ + atomic.AddUint64(&Statistics.ClientConnectsTotal, 1) + atomic.AddUint64(&Statistics.CurrentClientCount, 1) var _closer sync.Once closer := func() { @@ -271,27 +272,30 @@ func RunSocketConnection(conn *websocket.Conn) { // All set up, now enter the work loop + var closeReason websocket.CloseError + RunLoop: for { select { case err := <-errorChan: if err == io.EOF { - conn.Close() // no need to send a close frame :) - break RunLoop + closeReason = websocket.CloseError{ + Code: websocket.CloseGoingAway, + Text: err.Error(), + } } else if closeMsg, isClose := err.(*websocket.CloseError); isClose { - CloseConnection(conn, closeMsg) + closeReason = *closeMsg } else { - CloseConnection(conn, &websocket.CloseError{ + closeReason = websocket.CloseError{ Code: websocket.CloseInternalServerErr, Text: err.Error(), - }) + } } - break RunLoop case msg := <-clientChan: if client.VersionString == "" && msg.Command != HelloCommand { - CloseConnection(conn, &CloseFirstMessageNotHello) + closeReason = CloseFirstMessageNotHello break RunLoop } @@ -299,7 +303,8 @@ RunLoop: case msg := <-serverMessageChan: if len(serverMessageChan) > sendMessageAbortLength { - CloseConnection(conn, &CloseTooManyBufferedMessages) + closeReason = CloseTooManyBufferedMessages + break RunLoop } SendMessage(conn, msg) @@ -309,7 +314,7 @@ RunLoop: tooManyPings := client.pingCount == 5 client.Mutex.Unlock() if tooManyPings { - CloseConnection(conn, &CloseTimedOut) + closeReason = CloseTimedOut break RunLoop } else { conn.WriteControl(websocket.PingMessage, []byte(strconv.FormatInt(time.Now().Unix(), 10)), getDeadline()) @@ -318,6 +323,7 @@ RunLoop: } // Exit + CloseConnection(conn, closeReason) // Launch message draining goroutine - we aren't out of the pub/sub records go func() { @@ -338,15 +344,15 @@ RunLoop: // Close the channel so the draining goroutine can finish, too. close(_serverMessageChan) - Statistics.ClientDisconnectsTotal++ - Statistics.CurrentClientCount-- + atomic.AddUint64(&Statistics.ClientDisconnectsTotal, 1) + atomic.AddUint64(&Statistics.CurrentClientCount, ^uint64(0)) } func getDeadline() time.Time { return time.Now().Add(1 * time.Minute) } -func CloseConnection(conn *websocket.Conn, closeMsg *websocket.CloseError) { +func CloseConnection(conn *websocket.Conn, closeMsg websocket.CloseError) { Statistics.DisconnectCodes[strconv.Itoa(closeMsg.Code)]++ closeTxt := closeMsg.Text if strings.Contains(closeTxt, "read: connection reset by peer") { diff --git a/socketserver/server/types.go b/socketserver/server/types.go index a3358028..1910c889 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -28,6 +28,11 @@ type ConfigFile struct { SSLCertificateFile string SSLKeyFile string + UseElasticSearch bool + ESServer string + ESIndexPrefix string + ESHostName string + // Nacl keys OurPrivateKey []byte OurPublicKey []byte @@ -115,6 +120,13 @@ type ClientInfo struct { pingCount int } +type esReportBasic struct { + Timestamp time.Time + Host string +} +type esDisconnectReport struct { +} + func VersionFromString(v string) ClientVersion { var cv ClientVersion fmt.Sscanf(v, "ffz_%d.%d.%d", &cv.Major, &cv.Minor, &cv.Revision)