1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-08-03 08:28:31 +00:00

Add submission of connection reports to logstash

This commit is contained in:
Kane York 2015-12-16 11:15:17 -08:00
parent da57357793
commit 03e6e99cb9
4 changed files with 264 additions and 56 deletions

View file

@ -19,6 +19,8 @@ import (
"syscall"
"time"
"unicode/utf8"
"./logstash"
)
// SuccessCommand is a Reply Command to indicate success in reply to a C2S Command.
@ -251,6 +253,10 @@ func RunSocketConnection(conn *websocket.Conn) {
// Close the connection when we're done.
defer closer()
var report logstash.ConnectionReport
report.ConnectTime = time.Now()
report.RemoteAddr = conn.RemoteAddr()
_clientChan := make(chan ClientMessage)
_serverMessageChan := make(chan ClientMessage, sendMessageBufferLength)
_errorChan := make(chan error)
@ -261,44 +267,6 @@ func RunSocketConnection(conn *websocket.Conn) {
client.RemoteAddr = conn.RemoteAddr()
client.MsgChannelIsDone = stoppedChan
// Launch receiver goroutine
go func(errorChan chan<- error, clientChan chan<- ClientMessage, stoppedChan <-chan struct{}) {
var msg ClientMessage
var messageType int
var packet []byte
var err error
defer close(errorChan)
defer close(clientChan)
for ; err == nil; messageType, packet, err = conn.ReadMessage() {
if messageType == websocket.BinaryMessage {
err = &CloseGotBinaryMessage
break
}
if messageType == websocket.CloseMessage {
err = io.EOF
break
}
UnmarshalClientMessage(packet, messageType, &msg)
if msg.MessageID == 0 {
continue
}
select {
case clientChan <- msg:
case <-stoppedChan:
return
}
}
select {
case errorChan <- err:
case <-stoppedChan:
}
// exit goroutine
}(_errorChan, _clientChan, stoppedChan)
conn.SetPongHandler(func(pongBody string) error {
client.Mutex.Lock()
client.pingCount = 0
@ -307,10 +275,11 @@ func RunSocketConnection(conn *websocket.Conn) {
})
// All set up, now enter the work loop
closeReason := runSocketWriter(_errorChan, _clientChan, _serverMessageChan, conn, &client)
go runSocketReader(conn, _errorChan, _clientChan, stoppedChan)
closeReason := runSocketWriter(conn, &client, _errorChan, _clientChan, _serverMessageChan)
// Exit
CloseConnection(conn, closeReason)
closeConnection(conn, closeReason, &report)
// Launch message draining goroutine - we aren't out of the pub/sub records
go func() {
@ -334,12 +303,50 @@ func RunSocketConnection(conn *websocket.Conn) {
if !StopAcceptingConnections {
// Don't perform high contention operations when server is closing
atomic.AddUint64(&Statistics.ClientDisconnectsTotal, 1)
atomic.AddUint64(&Statistics.CurrentClientCount, ^uint64(0))
atomic.AddUint64(&Statistics.CurrentClientCount, NegativeOne)
}
logstash.Submit(report)
}
func runSocketWriter(errorChan <-chan error, clientChan <-chan ClientMessage, serverMessageChan <-chan ClientMessage, conn *websocket.Conn, client *ClientInfo) websocket.CloseError {
func runSocketReader(conn *websocket.Conn, errorChan chan<- error, clientChan chan<- ClientMessage, stoppedChan <-chan struct{}) {
var msg ClientMessage
var messageType int
var packet []byte
var err error
defer close(errorChan)
defer close(clientChan)
for ; err == nil; messageType, packet, err = conn.ReadMessage() {
if messageType == websocket.BinaryMessage {
err = &CloseGotBinaryMessage
break
}
if messageType == websocket.CloseMessage {
err = io.EOF
break
}
UnmarshalClientMessage(packet, messageType, &msg)
if msg.MessageID == 0 {
continue
}
select {
case clientChan <- msg:
case <-stoppedChan:
return
}
}
select {
case errorChan <- err:
case <-stoppedChan:
}
// exit goroutine
}
func runSocketWriter(conn *websocket.Conn, client *ClientInfo, errorChan <-chan error, clientChan <-chan ClientMessage, serverMessageChan <-chan ClientMessage) websocket.CloseError {
for {
select {
case err := <-errorChan:
@ -400,7 +407,7 @@ func getDeadline() time.Time {
return time.Now().Add(1 * time.Minute)
}
func CloseConnection(conn *websocket.Conn, closeMsg websocket.CloseError) {
func closeConnection(conn *websocket.Conn, closeMsg websocket.CloseError, report *esConnectionReport) {
closeTxt := closeMsg.Text
if strings.Contains(closeTxt, "read: connection reset by peer") {
closeTxt = "read: connection reset by peer"
@ -409,9 +416,10 @@ func CloseConnection(conn *websocket.Conn, closeMsg websocket.CloseError) {
} else if closeMsg.Code == 1001 {
closeTxt = "clean shutdown"
}
// todo kibana cannot analyze these
Statistics.DisconnectCodes[strconv.Itoa(closeMsg.Code)]++
Statistics.DisconnectReasons[closeTxt]++
report.DisconnectCode = closeMsg.Code
report.DisconnectReason = closeTxt
report.DisconnectTime = time.Now()
conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(closeMsg.Code, closeMsg.Text), getDeadline())
conn.Close()

View file

@ -0,0 +1,205 @@
package server
import (
"bytes"
"crypto/rand"
"encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"time"
)
// ID is a 128-bit ID for an elasticsearch document.
// Textually, it is base64-encoded.
// The Next() method increments the ID.
type ID struct {
High uint64
Low uint64
}
// Text converts the ID into a base64 string.
func (id ID) String() string {
var buf bytes.Buffer
buf.Grow(21)
enc := base64.NewEncoder(base64.StdEncoding, &buf)
var bytes [16]byte
binary.LittleEndian.PutUint64(bytes[0:8], id.High)
binary.LittleEndian.PutUint64(bytes[8:16], id.Low)
enc.Write(bytes[:])
enc.Close()
return buf.String()
}
// Next increments the ID and returns the prior state.
// Overflow is not checked because it's a uint64, do you really expect me to overflow that
func (id *ID) Next() ID {
ret := ID{
High: id.High,
Low: id.Low,
}
id.Low++
return ret
}
var idPool = sync.Pool{New: func() interface{} {
var bytes [16]byte
n, err := rand.Reader.Read(bytes[:])
if n != 16 || err != nil {
panic(fmt.Errorf("Short read from crypto/rand: %v", err))
}
return &ID{
High: binary.LittleEndian.Uint64(bytes[0:8]),
Low: binary.LittleEndian.Uint64(bytes[8:16]),
}
}}
func ExampleID_Next() {
id := idPool.Get().(*ID).Next()
fmt.Println(id)
idPool.Put(id)
}
// Report is the interface presented to the Submit() function.
// FillReport() is satisfied by ReportBasic, but ReportType must always be specified.
type Report interface {
FillReport() error
ReportType() string
GetID() string
GetTimestamp() time.Time
}
// ReportBasic is the essential fields of any report.
type ReportBasic struct {
ID string
Timestamp time.Time
Host string
}
// FillReport sets the Host and Timestamp fields.
func (report *ReportBasic) FillReport() error {
report.Host = hostMarker
report.Timestamp = time.Now()
id := idPool.Get().(*ID).Next()
report.ID = id.String()
idPool.Put(id)
return nil
}
func (report *ReportBasic) GetID() string {
return report.ID
}
func (report *ReportBasic) GetTimestamp() time.Time {
return report.Timestamp
}
type ConnectionReport struct {
ReportBasic
ConnectTime time.Time
DisconnectTime time.Time
// calculated
ConnectionDuration time.Duration
DisconnectCode int
DisconnectReason string
RemoteAddr net.Addr
}
// FillReport sets all the calculated fields, and calls esReportBasic.FillReport().
func (report *ConnectionReport) FillReport() error {
report.ReportBasic.FillReport()
report.ConnectionDuration = report.DisconnectTime.Sub(report.ConnectTime)
return nil
}
func (report *ConnectionReport) ReportType() string {
return "conn"
}
var serverPresent bool
var esClient http.Client
var submitChan chan Report
var serverBase, indexPrefix, hostMarker string
func checkServerPresent() {
if serverBase == "" {
serverBase = "http://localhost:9200"
}
if indexPrefix == "" {
indexPrefix = "sockreport"
}
urlHealth := fmt.Sprintf("%s/_cluster/health", serverBase)
resp, err := esClient.Get(urlHealth)
if err == nil {
resp.Body.Close()
serverPresent = true
submitChan = make(chan Report, 8)
go submissionWorker()
} else {
serverPresent = false
}
}
// Setup sets up the global variables for the package.
func Setup(ESServer, ESIndexPrefix, ESHostname string) {
serverBase = ESServer
indexPrefix = ESIndexPrefix
hostMarker = ESHostname
checkServerPresent()
}
// Submit inserts a report into elasticsearch (this is basically a manual logstash).
func Submit(report Report) {
if !serverPresent {
return
}
report.FillReport()
submitChan <- report
}
func submissionWorker() {
for report := range submitChan {
time := report.GetTimestamp()
rType := report.ReportType()
// prefix-type-date
indexName := fmt.Sprintf("%s-%s-%d-%d-%d", indexPrefix, rType, time.Year(), time.Month(), time.Day())
// base/index/type/id
putUrl, err := url.Parse(fmt.Sprintf("%s/%s/%s/%s", serverBase, indexName, rType, report.GetID()))
if err != nil {
panic(fmt.Errorf("logstash: cannot parse url: %v", err))
}
body, err := json.Marshal(report)
if err != nil {
panic(fmt.Errorf("logstash: cannot marshal json: %v", err))
}
req := &http.Request{
Method: "PUT",
URL: putUrl,
Body: ioutil.NopCloser(bytes.NewReader(body)),
}
resp, err := esClient.Do(req)
if err != nil {
// ignore, the show must go on
} else {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
}
}

View file

@ -11,6 +11,8 @@ import (
const CryptoBoxKeyLength = 32
const NegativeOne = ^uint64(0)
type ConfigFile struct {
// Numeric server id known to the backend
ServerID int
@ -121,13 +123,6 @@ type ClientInfo struct {
pingCount int
}
type esReportBasic struct {
Timestamp time.Time
Host string
}
type esDisconnectReport struct {
}
func VersionFromString(v string) ClientVersion {
var cv ClientVersion
fmt.Sscanf(v, "ffz_%d.%d.%d", &cv.Major, &cv.Minor, &cv.Revision)