diff --git a/socketserver/cmd/ffzsocketserver/console.go b/socketserver/cmd/ffzsocketserver/console.go index 21da6e4c..b3a77b0c 100644 --- a/socketserver/cmd/ffzsocketserver/console.go +++ b/socketserver/cmd/ffzsocketserver/console.go @@ -6,8 +6,8 @@ import ( "github.com/abiosoft/ishell" "github.com/gorilla/websocket" "runtime" - "strings" "strconv" + "strings" ) func commandLineConsole() { @@ -106,7 +106,7 @@ func commandLineConsole() { server.GlobalSubscriptionLock.RUnlock() } - msg := server.ClientMessage{ Arguments: &server.CloseRebalance } + msg := server.ClientMessage{Arguments: &server.CloseRebalance} server.GlobalSubscriptionLock.RLock() defer server.GlobalSubscriptionLock.RUnlock() diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index c70880de..4d96f704 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -19,6 +19,8 @@ import ( "syscall" "time" "unicode/utf8" + + "./logstash" ) // SuccessCommand is a Reply Command to indicate success in reply to a C2S Command. @@ -251,6 +253,10 @@ func RunSocketConnection(conn *websocket.Conn) { // Close the connection when we're done. defer closer() + var report logstash.ConnectionReport + report.ConnectTime = time.Now() + report.RemoteAddr = conn.RemoteAddr() + _clientChan := make(chan ClientMessage) _serverMessageChan := make(chan ClientMessage, sendMessageBufferLength) _errorChan := make(chan error) @@ -261,44 +267,6 @@ func RunSocketConnection(conn *websocket.Conn) { client.RemoteAddr = conn.RemoteAddr() client.MsgChannelIsDone = stoppedChan - // Launch receiver goroutine - go func(errorChan chan<- error, clientChan chan<- ClientMessage, stoppedChan <-chan struct{}) { - var msg ClientMessage - var messageType int - var packet []byte - var err error - - defer close(errorChan) - defer close(clientChan) - - for ; err == nil; messageType, packet, err = conn.ReadMessage() { - if messageType == websocket.BinaryMessage { - err = &CloseGotBinaryMessage - break - } - if messageType == websocket.CloseMessage { - err = io.EOF - break - } - - UnmarshalClientMessage(packet, messageType, &msg) - if msg.MessageID == 0 { - continue - } - select { - case clientChan <- msg: - case <-stoppedChan: - return - } - } - - select { - case errorChan <- err: - case <-stoppedChan: - } - // exit goroutine - }(_errorChan, _clientChan, stoppedChan) - conn.SetPongHandler(func(pongBody string) error { client.Mutex.Lock() client.pingCount = 0 @@ -307,10 +275,11 @@ func RunSocketConnection(conn *websocket.Conn) { }) // All set up, now enter the work loop - closeReason := runSocketWriter(_errorChan, _clientChan, _serverMessageChan, conn, &client) + go runSocketReader(conn, _errorChan, _clientChan, stoppedChan) + closeReason := runSocketWriter(conn, &client, _errorChan, _clientChan, _serverMessageChan) // Exit - CloseConnection(conn, closeReason) + closeConnection(conn, closeReason, &report) // Launch message draining goroutine - we aren't out of the pub/sub records go func() { @@ -334,12 +303,50 @@ func RunSocketConnection(conn *websocket.Conn) { if !StopAcceptingConnections { // Don't perform high contention operations when server is closing - atomic.AddUint64(&Statistics.ClientDisconnectsTotal, 1) - atomic.AddUint64(&Statistics.CurrentClientCount, ^uint64(0)) + atomic.AddUint64(&Statistics.CurrentClientCount, NegativeOne) } + + logstash.Submit(report) } -func runSocketWriter(errorChan <-chan error, clientChan <-chan ClientMessage, serverMessageChan <-chan ClientMessage, conn *websocket.Conn, client *ClientInfo) websocket.CloseError { +func runSocketReader(conn *websocket.Conn, errorChan chan<- error, clientChan chan<- ClientMessage, stoppedChan <-chan struct{}) { + var msg ClientMessage + var messageType int + var packet []byte + var err error + + defer close(errorChan) + defer close(clientChan) + + for ; err == nil; messageType, packet, err = conn.ReadMessage() { + if messageType == websocket.BinaryMessage { + err = &CloseGotBinaryMessage + break + } + if messageType == websocket.CloseMessage { + err = io.EOF + break + } + + UnmarshalClientMessage(packet, messageType, &msg) + if msg.MessageID == 0 { + continue + } + select { + case clientChan <- msg: + case <-stoppedChan: + return + } + } + + select { + case errorChan <- err: + case <-stoppedChan: + } + // exit goroutine +} + +func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan error, clientChan <-chan ClientMessage, serverMessageChan <-chan ClientMessage) websocket.CloseError { for { select { case err := <-errorChan: @@ -400,7 +407,7 @@ func getDeadline() time.Time { return time.Now().Add(1 * time.Minute) } -func CloseConnection(conn *websocket.Conn, closeMsg websocket.CloseError) { +func closeConnection(conn *websocket.Conn, closeMsg websocket.CloseError, report *esConnectionReport) { closeTxt := closeMsg.Text if strings.Contains(closeTxt, "read: connection reset by peer") { closeTxt = "read: connection reset by peer" @@ -409,9 +416,10 @@ func CloseConnection(conn *websocket.Conn, closeMsg websocket.CloseError) { } else if closeMsg.Code == 1001 { closeTxt = "clean shutdown" } - // todo kibana cannot analyze these - Statistics.DisconnectCodes[strconv.Itoa(closeMsg.Code)]++ - Statistics.DisconnectReasons[closeTxt]++ + + report.DisconnectCode = closeMsg.Code + report.DisconnectReason = closeTxt + report.DisconnectTime = time.Now() conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(closeMsg.Code, closeMsg.Text), getDeadline()) conn.Close() diff --git a/socketserver/server/logstash/elasticsearch.go b/socketserver/server/logstash/elasticsearch.go new file mode 100644 index 00000000..b8c88299 --- /dev/null +++ b/socketserver/server/logstash/elasticsearch.go @@ -0,0 +1,205 @@ +package server + +import ( + "bytes" + "crypto/rand" + "encoding/base64" + "encoding/binary" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "sync" + "time" +) + +// ID is a 128-bit ID for an elasticsearch document. +// Textually, it is base64-encoded. +// The Next() method increments the ID. +type ID struct { + High uint64 + Low uint64 +} + +// Text converts the ID into a base64 string. +func (id ID) String() string { + var buf bytes.Buffer + buf.Grow(21) + enc := base64.NewEncoder(base64.StdEncoding, &buf) + var bytes [16]byte + binary.LittleEndian.PutUint64(bytes[0:8], id.High) + binary.LittleEndian.PutUint64(bytes[8:16], id.Low) + enc.Write(bytes[:]) + enc.Close() + return buf.String() +} + +// Next increments the ID and returns the prior state. +// Overflow is not checked because it's a uint64, do you really expect me to overflow that +func (id *ID) Next() ID { + ret := ID{ + High: id.High, + Low: id.Low, + } + id.Low++ + return ret +} + +var idPool = sync.Pool{New: func() interface{} { + var bytes [16]byte + n, err := rand.Reader.Read(bytes[:]) + if n != 16 || err != nil { + panic(fmt.Errorf("Short read from crypto/rand: %v", err)) + } + + return &ID{ + High: binary.LittleEndian.Uint64(bytes[0:8]), + Low: binary.LittleEndian.Uint64(bytes[8:16]), + } +}} + +func ExampleID_Next() { + id := idPool.Get().(*ID).Next() + fmt.Println(id) + idPool.Put(id) +} + +// Report is the interface presented to the Submit() function. +// FillReport() is satisfied by ReportBasic, but ReportType must always be specified. +type Report interface { + FillReport() error + ReportType() string + + GetID() string + GetTimestamp() time.Time +} + +// ReportBasic is the essential fields of any report. +type ReportBasic struct { + ID string + Timestamp time.Time + Host string +} + +// FillReport sets the Host and Timestamp fields. +func (report *ReportBasic) FillReport() error { + report.Host = hostMarker + report.Timestamp = time.Now() + id := idPool.Get().(*ID).Next() + report.ID = id.String() + idPool.Put(id) + return nil +} + +func (report *ReportBasic) GetID() string { + return report.ID +} + +func (report *ReportBasic) GetTimestamp() time.Time { + return report.Timestamp +} + +type ConnectionReport struct { + ReportBasic + + ConnectTime time.Time + DisconnectTime time.Time + // calculated + ConnectionDuration time.Duration + + DisconnectCode int + DisconnectReason string + + RemoteAddr net.Addr +} + +// FillReport sets all the calculated fields, and calls esReportBasic.FillReport(). +func (report *ConnectionReport) FillReport() error { + report.ReportBasic.FillReport() + report.ConnectionDuration = report.DisconnectTime.Sub(report.ConnectTime) + return nil +} + +func (report *ConnectionReport) ReportType() string { + return "conn" +} + +var serverPresent bool +var esClient http.Client +var submitChan chan Report +var serverBase, indexPrefix, hostMarker string + +func checkServerPresent() { + if serverBase == "" { + serverBase = "http://localhost:9200" + } + if indexPrefix == "" { + indexPrefix = "sockreport" + } + + urlHealth := fmt.Sprintf("%s/_cluster/health", serverBase) + resp, err := esClient.Get(urlHealth) + if err == nil { + resp.Body.Close() + serverPresent = true + submitChan = make(chan Report, 8) + go submissionWorker() + } else { + serverPresent = false + } +} + +// Setup sets up the global variables for the package. +func Setup(ESServer, ESIndexPrefix, ESHostname string) { + serverBase = ESServer + indexPrefix = ESIndexPrefix + hostMarker = ESHostname + checkServerPresent() +} + +// Submit inserts a report into elasticsearch (this is basically a manual logstash). +func Submit(report Report) { + if !serverPresent { + return + } + + report.FillReport() + submitChan <- report +} + +func submissionWorker() { + for report := range submitChan { + time := report.GetTimestamp() + rType := report.ReportType() + + // prefix-type-date + indexName := fmt.Sprintf("%s-%s-%d-%d-%d", indexPrefix, rType, time.Year(), time.Month(), time.Day()) + // base/index/type/id + putUrl, err := url.Parse(fmt.Sprintf("%s/%s/%s/%s", serverBase, indexName, rType, report.GetID())) + if err != nil { + panic(fmt.Errorf("logstash: cannot parse url: %v", err)) + } + body, err := json.Marshal(report) + if err != nil { + panic(fmt.Errorf("logstash: cannot marshal json: %v", err)) + } + + req := &http.Request{ + Method: "PUT", + URL: putUrl, + Body: ioutil.NopCloser(bytes.NewReader(body)), + } + + resp, err := esClient.Do(req) + + if err != nil { + // ignore, the show must go on + } else { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + } + } +} diff --git a/socketserver/server/types.go b/socketserver/server/types.go index 6fddf4d9..fe6193ba 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -11,6 +11,8 @@ import ( const CryptoBoxKeyLength = 32 +const NegativeOne = ^uint64(0) + type ConfigFile struct { // Numeric server id known to the backend ServerID int @@ -121,13 +123,6 @@ type ClientInfo struct { pingCount int } -type esReportBasic struct { - Timestamp time.Time - Host string -} -type esDisconnectReport struct { -} - func VersionFromString(v string) ClientVersion { var cv ClientVersion fmt.Sscanf(v, "ffz_%d.%d.%d", &cv.Major, &cv.Minor, &cv.Revision)