mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-06-27 21:05:53 +00:00
209 lines
4.6 KiB
Go
209 lines
4.6 KiB
Go
package logstasher
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/rand"
|
|
"encoding/base64"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// ID is a 128-bit ID for an elasticsearch document.
|
|
// Textually, it is base64-encoded.
|
|
// The Next() method increments the ID.
|
|
type ID struct {
|
|
High uint64
|
|
Low uint64
|
|
}
|
|
|
|
// Text converts the ID into a base64 string.
|
|
func (id ID) String() string {
|
|
var buf bytes.Buffer
|
|
buf.Grow(21)
|
|
enc := base64.NewEncoder(base64.StdEncoding, &buf)
|
|
var bytes [16]byte
|
|
binary.LittleEndian.PutUint64(bytes[0:8], id.High)
|
|
binary.LittleEndian.PutUint64(bytes[8:16], id.Low)
|
|
enc.Write(bytes[:])
|
|
enc.Close()
|
|
return buf.String()
|
|
}
|
|
|
|
// Next increments the ID and returns the prior state.
|
|
// Overflow is not checked because it's a uint64, do you really expect me to overflow that
|
|
func (id *ID) Next() ID {
|
|
ret := ID{
|
|
High: id.High,
|
|
Low: id.Low,
|
|
}
|
|
id.Low++
|
|
return ret
|
|
}
|
|
|
|
var idPool = sync.Pool{New: func() interface{} {
|
|
var bytes [16]byte
|
|
n, err := rand.Reader.Read(bytes[:])
|
|
if n != 16 || err != nil {
|
|
panic(fmt.Errorf("Short read from crypto/rand: %v", err))
|
|
}
|
|
|
|
return &ID{
|
|
High: binary.LittleEndian.Uint64(bytes[0:8]),
|
|
Low: binary.LittleEndian.Uint64(bytes[8:16]),
|
|
}
|
|
}}
|
|
|
|
func ExampleID_Next() {
|
|
id := idPool.Get().(*ID).Next()
|
|
fmt.Println(id)
|
|
idPool.Put(id)
|
|
}
|
|
|
|
// Report is the interface presented to the Submit() function.
|
|
// FillReport() is satisfied by ReportBasic, but ReportType must always be specified.
|
|
type Report interface {
|
|
FillReport() error
|
|
ReportType() string
|
|
|
|
GetID() string
|
|
GetTimestamp() time.Time
|
|
}
|
|
|
|
// ReportBasic is the essential fields of any report.
|
|
type ReportBasic struct {
|
|
ID string
|
|
Timestamp time.Time
|
|
Host string
|
|
}
|
|
|
|
// FillReport sets the Host and Timestamp fields.
|
|
func (report *ReportBasic) FillReport() error {
|
|
report.Host = hostMarker
|
|
report.Timestamp = time.Now()
|
|
id := idPool.Get().(*ID).Next()
|
|
report.ID = id.String()
|
|
idPool.Put(id)
|
|
return nil
|
|
}
|
|
|
|
func (report *ReportBasic) GetID() string {
|
|
return report.ID
|
|
}
|
|
|
|
func (report *ReportBasic) GetTimestamp() time.Time {
|
|
return report.Timestamp
|
|
}
|
|
|
|
type ConnectionReport struct {
|
|
ReportBasic
|
|
|
|
ConnectTime time.Time
|
|
DisconnectTime time.Time
|
|
// calculated
|
|
ConnectionDuration time.Duration
|
|
|
|
DisconnectCode int
|
|
DisconnectReason string
|
|
|
|
UsernameWasValidated bool
|
|
|
|
RemoteAddr net.Addr `json:"-"` // not transmitted until I can figure out data minimization
|
|
TwitchUsername string `json:"-"` // also not transmitted
|
|
}
|
|
|
|
// FillReport sets all the calculated fields, and calls esReportBasic.FillReport().
|
|
func (report *ConnectionReport) FillReport() error {
|
|
report.ReportBasic.FillReport()
|
|
report.ConnectionDuration = report.DisconnectTime.Sub(report.ConnectTime)
|
|
return nil
|
|
}
|
|
|
|
func (report *ConnectionReport) ReportType() string {
|
|
return "conn"
|
|
}
|
|
|
|
var serverPresent bool
|
|
var esClient http.Client
|
|
var submitChan chan Report
|
|
var serverBase, indexPrefix, hostMarker string
|
|
|
|
func checkServerPresent() {
|
|
if serverBase == "" {
|
|
serverBase = "http://localhost:9200"
|
|
}
|
|
if indexPrefix == "" {
|
|
indexPrefix = "sockreport"
|
|
}
|
|
|
|
urlHealth := fmt.Sprintf("%s/_cluster/health", serverBase)
|
|
resp, err := esClient.Get(urlHealth)
|
|
if err == nil {
|
|
resp.Body.Close()
|
|
serverPresent = true
|
|
submitChan = make(chan Report, 8)
|
|
fmt.Println("elasticsearch reports enabled")
|
|
go submissionWorker()
|
|
} else {
|
|
serverPresent = false
|
|
}
|
|
}
|
|
|
|
// Setup sets up the global variables for the package.
|
|
func Setup(ESServer, ESIndexPrefix, ESHostname string) {
|
|
serverBase = ESServer
|
|
indexPrefix = ESIndexPrefix
|
|
hostMarker = ESHostname
|
|
checkServerPresent()
|
|
}
|
|
|
|
// Submit inserts a report into elasticsearch (this is basically a manual logstash).
|
|
func Submit(report Report) {
|
|
if !serverPresent {
|
|
return
|
|
}
|
|
|
|
report.FillReport()
|
|
submitChan <- report
|
|
}
|
|
|
|
func submissionWorker() {
|
|
for report := range submitChan {
|
|
time := report.GetTimestamp()
|
|
rType := report.ReportType()
|
|
|
|
// prefix-type-date
|
|
indexName := fmt.Sprintf("%s-%s-%d-%d-%d", indexPrefix, rType, time.Year(), time.Month(), time.Day())
|
|
// base/index/type/id
|
|
putUrl, err := url.Parse(fmt.Sprintf("%s/%s/%s/%s", serverBase, indexName, rType, report.GetID()))
|
|
if err != nil {
|
|
panic(fmt.Errorf("logstash: cannot parse url: %v", err))
|
|
}
|
|
body, err := json.Marshal(report)
|
|
if err != nil {
|
|
panic(fmt.Errorf("logstash: cannot marshal json: %v", err))
|
|
}
|
|
|
|
req := &http.Request{
|
|
Method: "PUT",
|
|
URL: putUrl,
|
|
Body: ioutil.NopCloser(bytes.NewReader(body)),
|
|
}
|
|
|
|
resp, err := esClient.Do(req)
|
|
|
|
if err != nil {
|
|
// ignore, the show must go on
|
|
} else {
|
|
io.Copy(ioutil.Discard, resp.Body)
|
|
resp.Body.Close()
|
|
}
|
|
}
|
|
}
|