From 8626b476db7e201bbac5a0bc200d1351ea00aea4 Mon Sep 17 00:00:00 2001 From: Kane York Date: Fri, 15 Sep 2017 16:25:52 -0700 Subject: [PATCH 01/15] msg.Reply(), remove unused parameter names, remove emote_usage --- socketserver/server/commands.go | 116 +++++++------------------------- socketserver/server/types.go | 18 +++++ 2 files changed, 43 insertions(+), 91 deletions(-) diff --git a/socketserver/server/commands.go b/socketserver/server/commands.go index dc400717..8ba96fb9 100644 --- a/socketserver/server/commands.go +++ b/socketserver/server/commands.go @@ -96,7 +96,7 @@ func DispatchC2SCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMess } } -func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) { +func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (_ ClientMessage, err error) { defer func() { if r := recover(); r != nil { var ok bool @@ -114,7 +114,7 @@ var DebugHello = "" // C2SHello implements the `hello` C2S Command. // It calls SubscribeGlobal() and SubscribeDefaults() with the client, and fills out ClientInfo.Version and ClientInfo.ClientID. -func C2SHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2SHello(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (_ ClientMessage, err error) { ary, ok := msg.Arguments.([]interface{}) if !ok { if DebugHello != "" { @@ -177,13 +177,13 @@ func C2SHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg }, nil } -func C2SPing(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2SPing(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) { return ClientMessage{ Arguments: float64(time.Now().UnixNano()/1000) / 1000, }, nil } -func C2SSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2SSetUser(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) { username, err := msg.ArgumentsAsString() if err != nil { return @@ -206,26 +206,21 @@ func C2SSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rm return ResponseSuccess, nil } -func C2SReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { - // disconnectAt, err := msg.ArgumentsAsInt() - // if err != nil { - // return - // } - +func C2SReady(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) { client.Mutex.Lock() client.ReadyComplete = true client.Mutex.Unlock() client.MsgChannelKeepalive.Add(1) go func() { - client.Send(ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand}) + client.Send(msg.Reply(SuccessCommand, nil)) SendBacklogForNewClient(client) client.MsgChannelKeepalive.Done() }() return ClientMessage{Command: AsyncResponseCommand}, nil } -func C2SSubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2SSubscribe(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) { channel, err := msg.ArgumentsAsString() if err != nil { return @@ -252,7 +247,7 @@ func C2SSubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) ( // C2SUnsubscribe implements the `unsub` C2S Command. // It removes the channel from ClientInfo.CurrentChannels and calls UnsubscribeSingleChat. -func C2SUnsubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2SUnsubscribe(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) { channel, err := msg.ArgumentsAsString() if err != nil { return @@ -270,9 +265,8 @@ func C2SUnsubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) } // C2SSurvey implements the survey C2S Command. -// Surveys are discarded.s -func C2SSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { - // Discard +func C2SSurvey(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) { + // Surveys are not collected. return ResponseSuccess, nil } @@ -290,7 +284,7 @@ var followEventsLock sync.Mutex // C2STrackFollow implements the `track_follow` C2S Command. // It adds the record to `followEvents`, which is submitted to the backend on a timer. -func C2STrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2STrackFollow(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (_ ClientMessage, err error) { channel, following, err := msg.ArgumentsAsStringAndBool() if err != nil { return @@ -317,58 +311,8 @@ var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative") // C2SEmoticonUses implements the `emoticon_uses` C2S Command. // msg.Arguments are in the JSON format of [1]map[emoteID]map[ChatroomName]float64. -func C2SEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { - // if this panics, will be caught by callHandler - mapRoot := msg.Arguments.([]interface{})[0].(map[string]interface{}) - - // Validate: male suire - for strEmote, val1 := range mapRoot { - _, err = strconv.Atoi(strEmote) - if err != nil { - return - } - mapInner := val1.(map[string]interface{}) - for _, val2 := range mapInner { - var count = int(val2.(float64)) - if count <= 0 { - err = ErrNegativeEmoteUsage - return - } - } - } - - aggregateEmoteUsageLock.Lock() - defer aggregateEmoteUsageLock.Unlock() - - var total int - - for strEmote, val1 := range mapRoot { - var emoteID int - emoteID, err = strconv.Atoi(strEmote) - if err != nil { - return - } - - destMapInner, ok := aggregateEmoteUsage[emoteID] - if !ok { - destMapInner = make(map[string]int) - aggregateEmoteUsage[emoteID] = destMapInner - } - - mapInner := val1.(map[string]interface{}) - for roomName, val2 := range mapInner { - var count = int(val2.(float64)) - if count > 200 { - count = 200 - } - roomName = TwitchChannelPool.Intern(roomName) - destMapInner[roomName] += count - total += count - } - } - - Statistics.EmotesReportedTotal += uint64(total) - +func C2SEmoticonUses(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) { + // We do not collect emote usage data return ResponseSuccess, nil } @@ -443,7 +387,7 @@ var bunchGroup singleflight.Group // C2SHandleBunchedCommand handles C2S Commands such as `get_link`. // It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one. -func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) { +func C2SHandleBunchedCommand(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) { key := fmt.Sprintf("%s:%s", msg.Command, msg.origArguments) resultCh := bunchGroup.DoChan(key, func() (interface{}, error) { @@ -453,29 +397,21 @@ func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg Clien client.MsgChannelKeepalive.Add(1) go func() { result := <-resultCh - var reply ClientMessage - reply.MessageID = msg.MessageID - if result.Err != nil { - reply.Command = ErrorCommand - if efb, ok := result.Err.(ErrForwardedFromBackend); ok { - reply.Arguments = efb.JSONError - } else { - reply.Arguments = result.Err.Error() - } + if efb, ok := result.Err.(ErrForwardedFromBackend); ok { + client.Send(msg.Reply(ErrorCommand, efb.JSONError)) + } else if result.Err != nil { + client.Send(msg.Reply(ErrorCommand, result.Err.Error())) } else { - reply.Command = SuccessCommand - reply.origArguments = result.Val.(string) - reply.parseOrigArguments() + client.Send(msg.ReplyJSON(SuccessCommand, result.Val.(string))) } - client.Send(reply) client.MsgChannelKeepalive.Done() }() return ClientMessage{Command: AsyncResponseCommand}, nil } -func C2SHandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { +func C2SHandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) { client.MsgChannelKeepalive.Add(1) go doRemoteCommand(conn, msg, client) @@ -491,7 +427,7 @@ func doRemoteCommand(conn *websocket.Conn, msg ClientMessage, client *ClientInfo if err == ErrAuthorizationNeeded { if client.TwitchUsername == "" { // Not logged in - client.Send(ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: AuthorizationNeededError}) + client.Send(msg.Reply(ErrorCommand, AuthorizationNeededError)) client.MsgChannelKeepalive.Done() return } @@ -499,19 +435,17 @@ func doRemoteCommand(conn *websocket.Conn, msg ClientMessage, client *ClientInfo if success { doRemoteCommand(conn, msg, client) } else { - client.Send(ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: AuthorizationFailedErrorString}) + client.Send(msg.Reply(ErrorCommand, AuthorizationFailedErrorString)) client.MsgChannelKeepalive.Done() } }) return // without keepalive.Done() } else if bfe, ok := err.(ErrForwardedFromBackend); ok { - client.Send(ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: bfe.JSONError}) + client.Send(msg.Reply(ErrorCommand, bfe.JSONError)) } else if err != nil { - client.Send(ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()}) + client.Send(msg.Reply(ErrorCommand, err.Error())) } else { - msg := ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp} - msg.parseOrigArguments() - client.Send(msg) + client.Send(msg.ReplyJSON(SuccessCommand, resp)) } client.MsgChannelKeepalive.Done() } diff --git a/socketserver/server/types.go b/socketserver/server/types.go index 1048434c..52197790 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -64,6 +64,24 @@ type ClientMessage struct { origArguments string } +func (cm ClientMessage) Reply(cmd string, args interface{}) ClientMessage { + return ClientMessage{ + MessageID: cm.MessageID, + Command: cmd, + Arguments: args, + } +} + +func (cm ClientMessage) ReplyJSON(cmd string, argsJSON string) ClientMessage { + n := ClientMessage{ + MessageID: cm.MessageID, + Command: cmd, + origArguments: argsJSON, + } + n.parseOrigArguments() + return n +} + type AuthInfo struct { // The client's claimed username on Twitch. TwitchUsername string From 1aa10460737d5d3f7e330a29c59fb16911c7eafe Mon Sep 17 00:00:00 2001 From: Kane York Date: Fri, 15 Sep 2017 16:26:44 -0700 Subject: [PATCH 02/15] Finish removing emote usage data reporting --- socketserver/server/commands.go | 24 +++++++++++------------- socketserver/server/types.go | 6 +++--- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/socketserver/server/commands.go b/socketserver/server/commands.go index 8ba96fb9..4d61a43b 100644 --- a/socketserver/server/commands.go +++ b/socketserver/server/commands.go @@ -2,11 +2,9 @@ package server import ( "encoding/json" - "errors" "fmt" "log" "net/url" - "strconv" "sync" "time" @@ -301,13 +299,13 @@ func C2STrackFollow(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (_ } // AggregateEmoteUsage is a map from emoteID to a map from chatroom name to usage count. -var aggregateEmoteUsage = make(map[int]map[string]int) +//var aggregateEmoteUsage = make(map[int]map[string]int) // AggregateEmoteUsageLock is the lock for AggregateEmoteUsage. -var aggregateEmoteUsageLock sync.Mutex +//var aggregateEmoteUsageLock sync.Mutex // ErrNegativeEmoteUsage is emitted when the submitted emote usage is negative. -var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative") +//var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative") // C2SEmoticonUses implements the `emoticon_uses` C2S Command. // msg.Arguments are in the JSON format of [1]map[emoteID]map[ChatroomName]float64. @@ -329,10 +327,10 @@ func aggregateDataSender_do() { follows := followEvents followEvents = nil followEventsLock.Unlock() - aggregateEmoteUsageLock.Lock() - emoteUsage := aggregateEmoteUsage - aggregateEmoteUsage = make(map[int]map[string]int) - aggregateEmoteUsageLock.Unlock() + //aggregateEmoteUsageLock.Lock() + //emoteUsage := aggregateEmoteUsage + //aggregateEmoteUsage = make(map[int]map[string]int) + //aggregateEmoteUsageLock.Unlock() reportForm := url.Values{} @@ -344,10 +342,10 @@ func aggregateDataSender_do() { } strEmoteUsage := make(map[string]map[string]int) - for emoteID, usageByChannel := range emoteUsage { - strEmoteID := strconv.Itoa(emoteID) - strEmoteUsage[strEmoteID] = usageByChannel - } + //for emoteID, usageByChannel := range emoteUsage { + // strEmoteID := strconv.Itoa(emoteID) + // strEmoteUsage[strEmoteID] = usageByChannel + //} emoteJSON, err := json.Marshal(strEmoteUsage) if err != nil { log.Println("error reporting aggregate data:", err) diff --git a/socketserver/server/types.go b/socketserver/server/types.go index 52197790..3812b045 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -67,15 +67,15 @@ type ClientMessage struct { func (cm ClientMessage) Reply(cmd string, args interface{}) ClientMessage { return ClientMessage{ MessageID: cm.MessageID, - Command: cmd, + Command: cmd, Arguments: args, } } func (cm ClientMessage) ReplyJSON(cmd string, argsJSON string) ClientMessage { n := ClientMessage{ - MessageID: cm.MessageID, - Command: cmd, + MessageID: cm.MessageID, + Command: cmd, origArguments: argsJSON, } n.parseOrigArguments() From ced892fd1ad10708a68942628ee809db4f0ddbb1 Mon Sep 17 00:00:00 2001 From: Kane York Date: Fri, 15 Sep 2017 16:30:11 -0700 Subject: [PATCH 03/15] Fix more warnings --- socketserver/server/handlecore.go | 5 +++-- socketserver/server/stats.go | 2 +- socketserver/server/usercount.go | 15 ++++----------- socketserver/server/utils.go | 14 -------------- 4 files changed, 8 insertions(+), 28 deletions(-) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 905dde04..d23afb58 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -505,7 +505,7 @@ func SendMessage(conn *websocket.Conn, msg ClientMessage) { } // UnmarshalClientMessage unpacks websocket TextMessage into a ClientMessage provided in the `v` parameter. -func UnmarshalClientMessage(data []byte, payloadType int, v interface{}) (err error) { +func UnmarshalClientMessage(data []byte, _ int, v interface{}) (err error) { var spaceIdx int out := v.(*ClientMessage) @@ -550,7 +550,8 @@ func (cm *ClientMessage) parseOrigArguments() error { return nil } -func MarshalClientMessage(clientMessage interface{}) (payloadType int, data []byte, err error) { +// returns payloadType, data, err +func MarshalClientMessage(clientMessage interface{}) (int, []byte, error) { var msg ClientMessage var ok bool msg, ok = clientMessage.(ClientMessage) diff --git a/socketserver/server/stats.go b/socketserver/server/stats.go index 8455fa8f..6eec5e93 100644 --- a/socketserver/server/stats.go +++ b/socketserver/server/stats.go @@ -209,7 +209,7 @@ func updateSysMem() { } // HTTPShowStatistics handles the /stats endpoint. It writes out the Statistics object as indented JSON. -func HTTPShowStatistics(w http.ResponseWriter, r *http.Request) { +func HTTPShowStatistics(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") updateStatsIfNeeded() diff --git a/socketserver/server/usercount.go b/socketserver/server/usercount.go index 1b9cbcbb..25fae97a 100644 --- a/socketserver/server/usercount.go +++ b/socketserver/server/usercount.go @@ -46,11 +46,6 @@ var uniqueCtrWritingToken chan usageToken var CounterLocation *time.Location = time.FixedZone("UTC-5", int((time.Hour*-5)/time.Second)) -func TruncateToMidnight(at time.Time) time.Time { - year, month, day := at.Date() - return time.Date(year, month, day, 0, 0, 0, 0, CounterLocation) -} - // GetCounterPeriod calculates the start and end timestamps for the HLL measurement period that includes the 'at' timestamp. func GetCounterPeriod(at time.Time) (start time.Time, end time.Time) { year, month, day := at.Date() @@ -132,7 +127,7 @@ func HTTPShowHLL(w http.ResponseWriter, r *http.Request) { hllFileServer.ServeHTTP(w, r) } -func HTTPWriteHLL(w http.ResponseWriter, r *http.Request) { +func HTTPWriteHLL(w http.ResponseWriter, _ *http.Request) { writeHLL() w.WriteHeader(200) w.Write([]byte("ok")) @@ -150,9 +145,7 @@ func loadUniqueUsers() { now := time.Now().In(CounterLocation) uniqueCounter.Start, uniqueCounter.End = GetCounterPeriod(now) err = loadHLL(now, &uniqueCounter) - isIgnorableError := err != nil && (false || - (os.IsNotExist(err)) || - (err == io.EOF)) + isIgnorableError := err != nil && (os.IsNotExist(err) || err == io.EOF) if isIgnorableError { // file didn't finish writing @@ -227,10 +220,10 @@ func rolloverCounters_do() { // Attempt to rescue the data into the log var buf bytes.Buffer - bytes, err := uniqueCounter.Counter.GobEncode() + by, err := uniqueCounter.Counter.GobEncode() if err == nil { enc := base64.NewEncoder(base64.StdEncoding, &buf) - enc.Write(bytes) + enc.Write(by) enc.Close() log.Print("data for ", GetHLLFilename(uniqueCounter.Start), ":", buf.String()) } diff --git a/socketserver/server/utils.go b/socketserver/server/utils.go index 6657dd02..a91feda4 100644 --- a/socketserver/server/utils.go +++ b/socketserver/server/utils.go @@ -161,17 +161,3 @@ func RemoveFromSliceCl(ary *[]*ClientInfo, val *ClientInfo) bool { *ary = slice return true } - -func AddToSliceB(ary *[]bunchSubscriber, client *ClientInfo, mid int) bool { - newSub := bunchSubscriber{Client: client, MessageID: mid} - slice := *ary - for _, v := range slice { - if v == newSub { - return false - } - } - - slice = append(slice, newSub) - *ary = slice - return true -} From 1c55e8fca7c792f6e737399469ac056da73d592a Mon Sep 17 00:00:00 2001 From: Kane York Date: Fri, 15 Sep 2017 16:40:40 -0700 Subject: [PATCH 04/15] Extract form sealing to a package --- socketserver/server/backend.go | 14 ++-- socketserver/server/backend_test.go | 4 +- socketserver/server/commands.go | 6 +- socketserver/server/handlecore.go | 2 +- socketserver/server/naclform/seal.go | 96 +++++++++++++++++++++++++++ socketserver/server/publisher.go | 8 +-- socketserver/server/testinfra_test.go | 8 +-- socketserver/server/types.go | 4 +- socketserver/server/utils.go | 94 -------------------------- 9 files changed, 119 insertions(+), 117 deletions(-) create mode 100644 socketserver/server/naclform/seal.go diff --git a/socketserver/server/backend.go b/socketserver/server/backend.go index 5bae575d..73b002c1 100644 --- a/socketserver/server/backend.go +++ b/socketserver/server/backend.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/naclform" cache "github.com/patrickmn/go-cache" "golang.org/x/crypto/nacl/box" ) @@ -33,8 +34,7 @@ type backendInfo struct { addTopicURL string announceStartupURL string - sharedKey [32]byte - serverID int + secureForm naclform.ServerInfo lastSuccess map[string]time.Time lastSuccessLock sync.Mutex @@ -45,7 +45,7 @@ var Backend *backendInfo func setupBackend(config *ConfigFile) *backendInfo { b := new(backendInfo) Backend = b - b.serverID = config.ServerID + b.secureForm.ServerID = config.ServerID b.HTTPClient.Timeout = 60 * time.Second b.baseURL = config.BackendURL @@ -68,7 +68,7 @@ func setupBackend(config *ConfigFile) *backendInfo { copy(theirPublic[:], config.BackendPublicKey) copy(ourPrivate[:], config.OurPrivateKey) - box.Precompute(&b.sharedKey, &theirPublic, &ourPrivate) + box.Precompute(&b.secureForm.SharedKey, &theirPublic, &ourPrivate) return b } @@ -119,7 +119,7 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A formData.Set("authenticated", "0") } - sealedForm, err := backend.SealRequest(formData) + sealedForm, err := backend.secureForm.Seal(formData) if err != nil { return "", err } @@ -171,7 +171,7 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A // SendAggregatedData sends aggregated emote usage and following data to the backend server. func (backend *backendInfo) SendAggregatedData(form url.Values) error { - sealedForm, err := backend.SealRequest(form) + sealedForm, err := backend.secureForm.Seal(form) if err != nil { return err } @@ -228,7 +228,7 @@ func (backend *backendInfo) sendTopicNotice(topic string, added bool) error { formData.Set("added", "f") } - sealedForm, err := backend.SealRequest(formData) + sealedForm, err := backend.secureForm.Seal(formData) if err != nil { return err } diff --git a/socketserver/server/backend_test.go b/socketserver/server/backend_test.go index d1c85adb..0e9d2d6f 100644 --- a/socketserver/server/backend_test.go +++ b/socketserver/server/backend_test.go @@ -18,14 +18,14 @@ func TestSealRequest(t *testing.T) { "QuickBrownFox": []string{"LazyDog"}, } - sealedValues, err := b.SealRequest(values) + sealedValues, err := b.secureForm.Seal(values) if err != nil { t.Fatal(err) } // sealedValues.Encode() // id=0&msg=KKtbng49dOLLyjeuX5AnXiEe6P0uZwgeP_7mMB5vhP-wMAAPZw%3D%3D&nonce=-wRbUnifscisWUvhm3gBEXHN5QzrfzgV - unsealedValues, err := b.UnsealRequest(sealedValues) + unsealedValues, err := b.secureForm.Unseal(sealedValues) if err != nil { t.Fatal(err) } diff --git a/socketserver/server/commands.go b/socketserver/server/commands.go index 4d61a43b..64b1ddd7 100644 --- a/socketserver/server/commands.go +++ b/socketserver/server/commands.go @@ -184,7 +184,7 @@ func C2SPing(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) func C2SSetUser(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) { username, err := msg.ArgumentsAsString() if err != nil { - return + return ClientMessage{}, err } username = copyString(username) @@ -221,7 +221,7 @@ func C2SReady(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientM func C2SSubscribe(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) { channel, err := msg.ArgumentsAsString() if err != nil { - return + return ClientMessage{}, err } channel = PubSubChannelPool.Intern(channel) @@ -248,7 +248,7 @@ func C2SSubscribe(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (Cli func C2SUnsubscribe(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) { channel, err := msg.ArgumentsAsString() if err != nil { - return + return ClientMessage{}, err } channel = PubSubChannelPool.Intern(channel) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index d23afb58..5dd515da 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -104,7 +104,7 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { serveMux.HandleFunc("/cached_pub", HTTPBackendCachedPublish) serveMux.HandleFunc("/get_sub_count", HTTPGetSubscriberCount) - announceForm, err := Backend.SealRequest(url.Values{ + announceForm, err := Backend.secureForm.Seal(url.Values{ "startup": []string{"1"}, }) if err != nil { diff --git a/socketserver/server/naclform/seal.go b/socketserver/server/naclform/seal.go new file mode 100644 index 00000000..ef21ae35 --- /dev/null +++ b/socketserver/server/naclform/seal.go @@ -0,0 +1,96 @@ +package naclform + +import ( + "bytes" + "crypto/rand" + "encoding/base64" + "errors" + "net/url" + "strconv" + "strings" + + "golang.org/x/crypto/nacl/box" +) + +var ErrorShortNonce = errors.New("Nonce too short.") +var ErrorInvalidSignature = errors.New("Invalid signature or contents") + +type ServerInfo struct { + SharedKey [32]byte + ServerID int +} + +func fillCryptoRandom(buf []byte) error { + remaining := len(buf) + for remaining > 0 { + count, err := rand.Read(buf) + if err != nil { + return err + } + remaining -= count + } + return nil +} + +func (i *ServerInfo) Seal(form url.Values) (url.Values, error) { + var nonce [24]byte + var err error + + err = fillCryptoRandom(nonce[:]) + if err != nil { + return nil, err + } + + cipherMsg := box.SealAfterPrecomputation(nil, []byte(form.Encode()), &nonce, &i.SharedKey) + + bufMessage := new(bytes.Buffer) + enc := base64.NewEncoder(base64.URLEncoding, bufMessage) + enc.Write(cipherMsg) + enc.Close() + cipherString := bufMessage.String() + + bufNonce := new(bytes.Buffer) + enc = base64.NewEncoder(base64.URLEncoding, bufNonce) + enc.Write(nonce[:]) + enc.Close() + nonceString := bufNonce.String() + + retval := url.Values{ + "nonce": []string{nonceString}, + "msg": []string{cipherString}, + "id": []string{strconv.Itoa(i.ServerID)}, + } + + return retval, nil +} + +func (i *ServerInfo) Unseal(form url.Values) (url.Values, error) { + var nonce [24]byte + + nonceString := form.Get("nonce") + dec := base64.NewDecoder(base64.URLEncoding, strings.NewReader(nonceString)) + count, err := dec.Read(nonce[:]) + if err != nil { + return nil, err + } + if count != 24 { + return nil, ErrorShortNonce + } + + cipherString := form.Get("msg") + dec = base64.NewDecoder(base64.URLEncoding, strings.NewReader(cipherString)) + cipherBuffer := new(bytes.Buffer) + cipherBuffer.ReadFrom(dec) + + message, ok := box.OpenAfterPrecomputation(nil, cipherBuffer.Bytes(), &nonce, &i.SharedKey) + if !ok { + return nil, ErrorInvalidSignature + } + + retValues, err := url.ParseQuery(string(message)) + if err != nil { + return nil, ErrorInvalidSignature + } + + return retValues, nil +} diff --git a/socketserver/server/publisher.go b/socketserver/server/publisher.go index a1815a63..669aa727 100644 --- a/socketserver/server/publisher.go +++ b/socketserver/server/publisher.go @@ -123,7 +123,7 @@ func saveLastMessage(cmd Command, channel string, expires time.Time, data string func HTTPBackendDropBacklog(w http.ResponseWriter, r *http.Request) { r.ParseForm() - formData, err := Backend.UnsealRequest(r.Form) + formData, err := Backend.secureForm.Unseal(r.Form) if err != nil { w.WriteHeader(403) fmt.Fprintf(w, "Error: %v", err) @@ -160,7 +160,7 @@ func rateLimitFromRequest(r *http.Request) (rate.Limiter, error) { // If the 'expires' parameter is not specified, the message will not expire (though it is only kept in-memory). func HTTPBackendCachedPublish(w http.ResponseWriter, r *http.Request) { r.ParseForm() - formData, err := Backend.UnsealRequest(r.Form) + formData, err := Backend.secureForm.Unseal(r.Form) if err != nil { w.WriteHeader(403) fmt.Fprintf(w, "Error: %v", err) @@ -227,7 +227,7 @@ func HTTPBackendCachedPublish(w http.ResponseWriter, r *http.Request) { // If "scope" is "global", then "channel" is not used. func HTTPBackendUncachedPublish(w http.ResponseWriter, r *http.Request) { r.ParseForm() - formData, err := Backend.UnsealRequest(r.Form) + formData, err := Backend.secureForm.Unseal(r.Form) if err != nil { w.WriteHeader(403) fmt.Fprintf(w, "Error: %v", err) @@ -292,7 +292,7 @@ func HTTPBackendUncachedPublish(w http.ResponseWriter, r *http.Request) { // A "global" option is not available, use fetch(/stats).CurrentClientCount instead. func HTTPGetSubscriberCount(w http.ResponseWriter, r *http.Request) { r.ParseForm() - formData, err := Backend.UnsealRequest(r.Form) + formData, err := Backend.secureForm.Unseal(r.Form) if err != nil { w.WriteHeader(403) fmt.Fprintf(w, "Error: %v", err) diff --git a/socketserver/server/testinfra_test.go b/socketserver/server/testinfra_test.go index 5d8a4570..529214d9 100644 --- a/socketserver/server/testinfra_test.go +++ b/socketserver/server/testinfra_test.go @@ -97,7 +97,7 @@ func (er *TExpectedBackendRequest) String() string { if MethodIsPost == "" { return er.Path } - return fmt.Sprint("%s %s: %s", MethodIsPost, er.Path, er.PostForm.Encode()) + return fmt.Sprintf("%s %s: %s", MethodIsPost, er.Path, er.PostForm.Encode()) } type TBackendRequestChecker struct { @@ -123,7 +123,7 @@ func (backend *TBackendRequestChecker) ServeHTTP(w http.ResponseWriter, r *http. r.ParseForm() - unsealedForm, err := Backend.UnsealRequest(r.PostForm) + unsealedForm, err := Backend.secureForm.Unseal(r.PostForm) if err != nil { backend.tb.Errorf("Failed to unseal backend request: %v", err) } @@ -276,7 +276,7 @@ func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments in } form.Set("time", strconv.FormatInt(time.Now().Unix(), 10)) - sealed, err := Backend.SealRequest(form) + sealed, err := Backend.secureForm.Seal(form) if err != nil { tb.Error(err) return nil, err @@ -300,7 +300,7 @@ func TSealForUncachedPubMsg(tb testing.TB, cmd Command, channel string, argument form.Set("time", time.Now().Format(time.UnixDate)) form.Set("scope", scope) - sealed, err := Backend.SealRequest(form) + sealed, err := Backend.secureForm.Seal(form) if err != nil { tb.Error(err) return nil, err diff --git a/socketserver/server/types.go b/socketserver/server/types.go index 3812b045..3de13127 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -64,7 +64,7 @@ type ClientMessage struct { origArguments string } -func (cm ClientMessage) Reply(cmd string, args interface{}) ClientMessage { +func (cm ClientMessage) Reply(cmd Command, args interface{}) ClientMessage { return ClientMessage{ MessageID: cm.MessageID, Command: cmd, @@ -72,7 +72,7 @@ func (cm ClientMessage) Reply(cmd string, args interface{}) ClientMessage { } } -func (cm ClientMessage) ReplyJSON(cmd string, argsJSON string) ClientMessage { +func (cm ClientMessage) ReplyJSON(cmd Command, argsJSON string) ClientMessage { n := ClientMessage{ MessageID: cm.MessageID, Command: cmd, diff --git a/socketserver/server/utils.go b/socketserver/server/utils.go index a91feda4..cbbd760f 100644 --- a/socketserver/server/utils.go +++ b/socketserver/server/utils.go @@ -1,103 +1,9 @@ package server -import ( - "bytes" - "crypto/rand" - "encoding/base64" - "errors" - "net/url" - "strconv" - "strings" - - "golang.org/x/crypto/nacl/box" -) - -func FillCryptoRandom(buf []byte) error { - remaining := len(buf) - for remaining > 0 { - count, err := rand.Read(buf) - if err != nil { - return err - } - remaining -= count - } - return nil -} - func copyString(s string) string { return string([]byte(s)) } -func (backend *backendInfo) SealRequest(form url.Values) (url.Values, error) { - var nonce [24]byte - var err error - - err = FillCryptoRandom(nonce[:]) - if err != nil { - return nil, err - } - - cipherMsg := box.SealAfterPrecomputation(nil, []byte(form.Encode()), &nonce, &backend.sharedKey) - - bufMessage := new(bytes.Buffer) - enc := base64.NewEncoder(base64.URLEncoding, bufMessage) - enc.Write(cipherMsg) - enc.Close() - cipherString := bufMessage.String() - - bufNonce := new(bytes.Buffer) - enc = base64.NewEncoder(base64.URLEncoding, bufNonce) - enc.Write(nonce[:]) - enc.Close() - nonceString := bufNonce.String() - - retval := url.Values{ - "nonce": []string{nonceString}, - "msg": []string{cipherString}, - "id": []string{strconv.Itoa(Backend.serverID)}, - } - - return retval, nil -} - -var ErrorShortNonce = errors.New("Nonce too short.") -var ErrorInvalidSignature = errors.New("Invalid signature or contents") - -func (backend *backendInfo) UnsealRequest(form url.Values) (url.Values, error) { - var nonce [24]byte - - nonceString := form.Get("nonce") - dec := base64.NewDecoder(base64.URLEncoding, strings.NewReader(nonceString)) - count, err := dec.Read(nonce[:]) - if err != nil { - Statistics.BackendVerifyFails++ - return nil, err - } - if count != 24 { - Statistics.BackendVerifyFails++ - return nil, ErrorShortNonce - } - - cipherString := form.Get("msg") - dec = base64.NewDecoder(base64.URLEncoding, strings.NewReader(cipherString)) - cipherBuffer := new(bytes.Buffer) - cipherBuffer.ReadFrom(dec) - - message, ok := box.OpenAfterPrecomputation(nil, cipherBuffer.Bytes(), &nonce, &backend.sharedKey) - if !ok { - Statistics.BackendVerifyFails++ - return nil, ErrorInvalidSignature - } - - retValues, err := url.ParseQuery(string(message)) - if err != nil { - Statistics.BackendVerifyFails++ - return nil, ErrorInvalidSignature - } - - return retValues, nil -} - func AddToSliceS(ary *[]string, val string) bool { slice := *ary for _, v := range slice { From 0744019555acd46783b910991816262277957acc Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 18 Sep 2017 14:54:09 -0700 Subject: [PATCH 05/15] Handle errors in UnmarshalClientMessage --- socketserver/server/handlecore.go | 35 ++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 1328a523..ea4847ef 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -249,11 +249,21 @@ func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) { } } +type fatalDecodeError string + +func (e fatalDecodeError) Error() string { + return string(e) +} + +func (e fatalDecodeError) IsFatal() bool { + return true +} + // ErrProtocolGeneric is sent in a ErrorCommand Reply. -var ErrProtocolGeneric error = errors.New("FFZ Socket protocol error.") +var ErrProtocolGeneric error = fatalDecodeError("FFZ Socket protocol error.") // ErrProtocolNegativeMsgID is sent in a ErrorCommand Reply when a negative MessageID is received. -var ErrProtocolNegativeMsgID error = errors.New("FFZ Socket protocol error: negative or zero message ID.") +var ErrProtocolNegativeMsgID error = fatalDecodeError("FFZ Socket protocol error: negative or zero message ID.") // ErrExpectedSingleString is sent in a ErrorCommand Reply when the Arguments are of the wrong type. var ErrExpectedSingleString = errors.New("Error: Expected single string as arguments.") @@ -344,7 +354,7 @@ func RunSocketConnection(conn *websocket.Conn) { }) // All set up, now enter the work loop - go runSocketReader(conn, _errorChan, _clientChan, stoppedChan) + go runSocketReader(conn, &client, _errorChan, _clientChan) closeReason := runSocketWriter(conn, &client, _errorChan, _clientChan, _serverMessageChan) // Exit @@ -376,12 +386,14 @@ func RunSocketConnection(conn *websocket.Conn) { } } -func runSocketReader(conn *websocket.Conn, errorChan chan<- error, clientChan chan<- ClientMessage, stoppedChan <-chan struct{}) { +func runSocketReader(conn *websocket.Conn, client *ClientInfo, errorChan chan<- error, clientChan chan<- ClientMessage) { var msg ClientMessage var messageType int var packet []byte var err error + stoppedChan := client.MsgChannelIsDone + defer close(errorChan) defer close(clientChan) @@ -395,8 +407,15 @@ func runSocketReader(conn *websocket.Conn, errorChan chan<- error, clientChan ch break } - UnmarshalClientMessage(packet, messageType, &msg) - if msg.MessageID == 0 { + msg = ClientMessage{} + msgErr := UnmarshalClientMessage(packet, messageType, &msg) + if _, ok := msgErr.(interface{IsFatal() bool}); ok { + errorChan <- msgErr + continue + } else if msgErr != nil { + client.Send(msg.Reply(ErrorCommand, msgErr.Error())) + continue + } else if msg.MessageID == 0 { continue } select { @@ -514,11 +533,11 @@ func UnmarshalClientMessage(data []byte, _ int, v interface{}) (err error) { // Message ID spaceIdx = strings.IndexRune(dataStr, ' ') if spaceIdx == -1 { - return ErrProtocolGeneric + return ErrProtocolGeneric // fatal error } messageID, err := strconv.Atoi(dataStr[:spaceIdx]) if messageID < -1 || messageID == 0 { - return ErrProtocolNegativeMsgID + return ErrProtocolNegativeMsgID // fatal error } out.MessageID = messageID From 230f4e9ccdb02e0aecc6a26c5d44dab3339498e7 Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 18 Sep 2017 15:11:13 -0700 Subject: [PATCH 06/15] test: Ignore empty frames in Unmarshal() --- socketserver/server/handlecore.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index ea4847ef..43c50d0b 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -530,6 +530,10 @@ func UnmarshalClientMessage(data []byte, _ int, v interface{}) (err error) { out := v.(*ClientMessage) dataStr := string(data) + if len(dataStr) == 0 { + out.MessageID = 0 + return nil // test: ignore empty frames + } // Message ID spaceIdx = strings.IndexRune(dataStr, ' ') if spaceIdx == -1 { From b108177942018f60e78709af4f3076334af71296 Mon Sep 17 00:00:00 2001 From: Kane York Date: Thu, 21 Sep 2017 10:33:06 -0700 Subject: [PATCH 07/15] Add /all_topics endpoint Temporarily not protected with sealing, we'll see how the performance is first. --- socketserver/server/handlecore.go | 1 + socketserver/server/publisher.go | 20 ++++++++++++++++++++ socketserver/server/subscriptions.go | 14 ++++++++++++++ 3 files changed, 35 insertions(+) diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 43c50d0b..b78f76a6 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -103,6 +103,7 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) { serveMux.HandleFunc("/uncached_pub", HTTPBackendUncachedPublish) serveMux.HandleFunc("/cached_pub", HTTPBackendCachedPublish) serveMux.HandleFunc("/get_sub_count", HTTPGetSubscriberCount) + serveMux.HandleFunc("/all_topics", HTTPListAllTopics) announceForm, err := Backend.secureForm.Seal(url.Values{ "startup": []string{"1"}, diff --git a/socketserver/server/publisher.go b/socketserver/server/publisher.go index 669aa727..ffde2727 100644 --- a/socketserver/server/publisher.go +++ b/socketserver/server/publisher.go @@ -1,6 +1,7 @@ package server import ( + "encoding/json" "fmt" "net/http" "strconv" @@ -10,6 +11,7 @@ import ( "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/rate" "github.com/pkg/errors" + "golang.org/x/sync/singleflight" ) // LastSavedMessage contains a reply to a command along with an expiration time. @@ -25,6 +27,8 @@ type LastSavedMessage struct { var CachedLastMessages = make(map[Command]map[string]LastSavedMessage) var CachedLSMLock sync.RWMutex +var singleFlighter singleflight.Group + func cachedMessageJanitor() { for { time.Sleep(1 * time.Hour) @@ -303,3 +307,19 @@ func HTTPGetSubscriberCount(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, CountSubscriptions(strings.Split(channel, ","))) } + +func HTTPListAllTopics(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + _, err := Backend.secureForm.Unseal(r.Form) + if err != nil { + //w.WriteHeader(403) + //fmt.Fprintf(w, "Error: %v", err) + //return + } + + topicList, _, _ := singleFlighter.Do("/all_topics", func() (interface{}, error) { + return GetAllTopics(), nil + }) + w.WriteHeader(200) + json.NewEncoder(w).Encode(topicList) +} diff --git a/socketserver/server/subscriptions.go b/socketserver/server/subscriptions.go index 392e08aa..007aea03 100644 --- a/socketserver/server/subscriptions.go +++ b/socketserver/server/subscriptions.go @@ -47,6 +47,20 @@ func CountSubscriptions(channels []string) int { return count } +func GetAllTopics() []string { + ChatSubscriptionLock.RLock() + defer ChatSubscriptionLock.RUnlock() + + count := len(ChatSubscriptionInfo) + list := make([]string, count) + i := 0 + for topicName := range ChatSubscriptionInfo { + list[i] = topicName + i++ + } + return list +} + func SubscribeChannel(client *ClientInfo, channelName string) { ChatSubscriptionLock.RLock() _subscribeWhileRlocked(channelName, client) From 44249a3721875860ed7015e9f6869fd5504b91b8 Mon Sep 17 00:00:00 2001 From: Kane York Date: Thu, 21 Sep 2017 14:33:06 -0700 Subject: [PATCH 08/15] Better shutdown code, release listeners --- .../cmd/ffzsocketserver/socketserver.go | 59 +++++++++++++++++-- socketserver/server/backend.go | 4 +- socketserver/server/handlecore.go | 43 +++++--------- 3 files changed, 73 insertions(+), 33 deletions(-) diff --git a/socketserver/cmd/ffzsocketserver/socketserver.go b/socketserver/cmd/ffzsocketserver/socketserver.go index 312d38c5..cc28b327 100644 --- a/socketserver/cmd/ffzsocketserver/socketserver.go +++ b/socketserver/cmd/ffzsocketserver/socketserver.go @@ -1,6 +1,7 @@ package main // import "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/cmd/ffzsocketserver" import ( + "context" "encoding/json" "flag" "fmt" @@ -8,6 +9,10 @@ import ( "log" "net/http" "os" + "os/signal" + "sync" + "syscall" + "time" "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server" ) @@ -56,17 +61,63 @@ func main() { go commandLineConsole() + var server1, server2 *http.Server + + stopSig := make(chan os.Signal, 3) + signal.Notify(stopSig, os.Interrupt) + signal.Notify(stopSig, syscall.SIGUSR1) + signal.Notify(stopSig, syscall.SIGTERM) + if conf.UseSSL { + server1 = &http.Server{ + Addr: conf.SSLListenAddr, + Handler: http.DefaultServeMux, + } go func() { - if err := http.ListenAndServeTLS(conf.SSLListenAddr, conf.SSLCertificateFile, conf.SSLKeyFile, http.DefaultServeMux); err != nil { - log.Fatal("ListenAndServeTLS: ", err) + if err := server1.ListenAndServeTLS(conf.SSLCertificateFile, conf.SSLKeyFile); err != nil { + log.Println("ListenAndServeTLS:", err) + stopSig <- os.Interrupt } }() } - if err = http.ListenAndServe(conf.ListenAddr, http.DefaultServeMux); err != nil { - log.Fatal("ListenAndServe: ", err) + if true { + server2 = &http.Server{ + Addr: conf.ListenAddr, + Handler: http.DefaultServeMux, + } + go func() { + if err := server2.ListenAndServe(); err != nil { + log.Println("ListenAndServe: ", err) + stopSig <- os.Interrupt + } + }() + } + + <-stopSig + log.Println("Shutting down...") + + var wg sync.WaitGroup + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + wg.Add(1) + go func() { + defer wg.Done() + if conf.UseSSL { + server1.Shutdown(ctx) + } + }() + wg.Add(1) + go func() { + defer wg.Done() + server2.Shutdown(ctx) + }() + server.Shutdown(&wg) + + time.Sleep(1 * time.Second) + wg.Wait() } func generateKeys(outputFile string) { diff --git a/socketserver/server/backend.go b/socketserver/server/backend.go index a3826284..6b4643e8 100644 --- a/socketserver/server/backend.go +++ b/socketserver/server/backend.go @@ -141,9 +141,11 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A return "", ErrAuthorizationNeeded } else if resp.StatusCode < 200 || resp.StatusCode > 299 { // any non-2xx // If the Content-Type header includes a charset, ignore it. + // typeStr, _, _ = mime.ParseMediaType(resp.Header.Get("Content-Type")) + // inline the part of the function we care about typeStr := resp.Header.Get("Content-Type") splitIdx := strings.IndexRune(typeStr, ';') - if ( splitIdx != -1 ) { + if splitIdx != -1 { typeStr = strings.TrimSpace(typeStr[0:splitIdx]) } diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index b78f76a6..15179638 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -139,30 +139,21 @@ func startJanitors() { go pubsubJanitor() go ircConnection() - go shutdownHandler() } -// is_init_func -func shutdownHandler() { - ch := make(chan os.Signal) - signal.Notify(ch, syscall.SIGUSR1) - signal.Notify(ch, syscall.SIGTERM) - <-ch - log.Println("Shutting down...") - - var wg sync.WaitGroup +// Shutdown disconnects all clients. +func Shutdown(wg *sync.WaitGroup) { wg.Add(1) go func() { + defer wg.Done() writeHLL() - wg.Done() }() - - StopAcceptingConnections = true - close(StopAcceptingConnectionsCh) - - time.Sleep(1 * time.Second) - wg.Wait() - os.Exit(0) + wg.Add(1) + go func() { + defer wg.Done() + close(StopAcceptingConnectionsCh) + time.Sleep(2 * time.Second) + }() } // is_init_func +test @@ -200,7 +191,6 @@ var BannerHTML []byte // StopAcceptingConnectionsCh is closed while the server is shutting down. var StopAcceptingConnectionsCh = make(chan struct{}) -var StopAcceptingConnections = false // HTTPHandleRootURL is the http.HandleFunc for requests on `/`. // It either uses the SocketUpgrader or writes out the BannerHTML. @@ -211,13 +201,6 @@ func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) { return } - // racy, but should be ok? - if StopAcceptingConnections { - w.WriteHeader(503) - fmt.Fprint(w, "server is shutting down") - return - } - if strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") { updateSysMem() @@ -376,8 +359,10 @@ func RunSocketConnection(conn *websocket.Conn) { // And done. - if !StopAcceptingConnections { + select { + case <-StopAcceptingConnectionsCh: // Don't perform high contention operations when server is closing + default: atomic.AddUint64(&Statistics.CurrentClientCount, NegativeOne) atomic.AddUint64(&Statistics.ClientDisconnectsTotal, 1) @@ -410,7 +395,9 @@ func runSocketReader(conn *websocket.Conn, client *ClientInfo, errorChan chan<- msg = ClientMessage{} msgErr := UnmarshalClientMessage(packet, messageType, &msg) - if _, ok := msgErr.(interface{IsFatal() bool}); ok { + if _, ok := msgErr.(interface { + IsFatal() bool + }); ok { errorChan <- msgErr continue } else if msgErr != nil { From bdd5b5416da541b142e85f2dd57c614e4d6b3b79 Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 25 Sep 2017 14:28:21 -0700 Subject: [PATCH 09/15] add visibility for how many items in response cache --- socketserver/server/stats.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/socketserver/server/stats.go b/socketserver/server/stats.go index 6eec5e93..3b413f0d 100644 --- a/socketserver/server/stats.go +++ b/socketserver/server/stats.go @@ -38,9 +38,8 @@ type StatsData struct { MemoryInUseKB uint64 MemoryRSSKB uint64 - LowMemDroppedConnections uint64 - - MemPerClientBytes uint64 + ResponseCacheItems uint64 + MemPerClientBytes uint64 CpuUsagePct float64 @@ -84,7 +83,7 @@ func commandCounter() { } // StatsDataVersion is the version of the StatsData struct. -const StatsDataVersion = 7 +const StatsDataVersion = 8 const pageSize = 4096 var cpuUsage struct { @@ -170,6 +169,7 @@ func updatePeriodicStats() { { Statistics.Uptime = nowUpdate.Sub(Statistics.StartTime).String() + Statistics.ResponseCacheItems = Backend.responseCache.ItemCount() } { From 75e1a67e9a71442d070ca370bc9566b8adb94bb4 Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 25 Sep 2017 14:37:48 -0700 Subject: [PATCH 10/15] Split bunched command list out to own array --- socketserver/server/commands.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/socketserver/server/commands.go b/socketserver/server/commands.go index 01d621da..3108f061 100644 --- a/socketserver/server/commands.go +++ b/socketserver/server/commands.go @@ -32,14 +32,17 @@ var commandHandlers = map[Command]CommandHandler{ "track_follow": C2STrackFollow, "emoticon_uses": C2SEmoticonUses, "survey": C2SSurvey, +} - "twitch_emote": C2SHandleBunchedCommand, - "get_link": C2SHandleBunchedCommand, - "get_display_name": C2SHandleBunchedCommand, - "get_emote": C2SHandleBunchedCommand, - "get_emote_set": C2SHandleBunchedCommand, - "has_logs": C2SHandleBunchedCommand, - "update_follow_buttons": C2SHandleRemoteCommand, +var bunchedCommands = []string{ + "get_display_name", + "get_emote", + "get_emote_set", + "get_link", + "get_itad_plain", + "get_itad_prices", + "get_name_history", + "has_logs", } func setupInterning() { @@ -71,6 +74,12 @@ func DispatchC2SCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMess handler, ok := commandHandlers[msg.Command] if !ok { handler = C2SHandleRemoteCommand + + for _, v := range bunchedCommands { + if msg.Command == v { + handler = C2SHandleBunchedCommand + } + } } CommandCounter <- msg.Command From 7657357164d995ac5f2c7e3747b4c55ad8cdcdda Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 25 Sep 2017 15:08:22 -0700 Subject: [PATCH 11/15] Compile fixes, switch cache implementation --- socketserver/server/backend.go | 64 +++++++++++++++++++++++----- socketserver/server/commands.go | 2 +- socketserver/server/handlecore.go | 1 - socketserver/server/stats.go | 1 - socketserver/server/subscriptions.go | 7 ++- 5 files changed, 60 insertions(+), 15 deletions(-) diff --git a/socketserver/server/backend.go b/socketserver/server/backend.go index 6b4643e8..7f15b310 100644 --- a/socketserver/server/backend.go +++ b/socketserver/server/backend.go @@ -16,8 +16,9 @@ import ( "time" "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/naclform" - cache "github.com/patrickmn/go-cache" + "github.com/karlseguin/ccache" "golang.org/x/crypto/nacl/box" + "golang.org/x/sync/singleflight" ) const bPathAnnounceStartup = "/startup" @@ -28,7 +29,8 @@ const bPathOtherCommand = "/cmd/" type backendInfo struct { HTTPClient http.Client baseURL string - responseCache *cache.Cache + responseCache *ccache.Cache + reloadGroup singleflight.Group postStatisticsURL string addTopicURL string @@ -49,7 +51,8 @@ func setupBackend(config *ConfigFile) *backendInfo { b.HTTPClient.Timeout = 60 * time.Second b.baseURL = config.BackendURL - b.responseCache = cache.New(60*time.Second, 120*time.Second) + // size in bytes of string payload + b.responseCache = ccache.New(ccache.Configure().MaxSize(250 * 1000 * 1024)) b.announceStartupURL = fmt.Sprintf("%s%s", b.baseURL, bPathAnnounceStartup) b.addTopicURL = fmt.Sprintf("%s%s", b.baseURL, bPathAddTopic) @@ -77,6 +80,18 @@ func getCacheKey(remoteCommand, data string) string { return fmt.Sprintf("%s/%s", remoteCommand, data) } +type cachedResponseStr string + +// implements ccache.Sized +func (c cachedResponseStr) Size() int64 { + return int64(len(string(c))) +} + +// implements Stringer +func (c cachedResponseStr) String() string { + return string(c) +} + // ErrForwardedFromBackend is an error returned by the backend server. type ErrForwardedFromBackend struct { JSONError interface{} @@ -88,22 +103,47 @@ func (bfe ErrForwardedFromBackend) Error() string { } // ErrAuthorizationNeeded is emitted when the backend replies with HTTP 401. +// // Indicates that an attempt to validate `ClientInfo.TwitchUsername` should be attempted. var ErrAuthorizationNeeded = errors.New("Must authenticate Twitch username to use this command") -// SendRemoteCommandCached performs a RPC call on the backend, but caches responses. +// SendRemoteCommandCached performs a RPC call on the backend, checking for a +// cached response first. +// +// If a cached, but expired, response is found, the existing value is returned +// and the cache is updated in the background. func (backend *backendInfo) SendRemoteCommandCached(remoteCommand, data string, auth AuthInfo) (string, error) { - cached, ok := backend.responseCache.Get(getCacheKey(remoteCommand, data)) - if ok { - return cached.(string), nil + cacheKey := getCacheKey(remoteCommand, data) + item := backend.responseCache.Get(cacheKey) + if item != nil { + if item.Expired() { + // reload in background + go backend.reloadGroup.Do(cacheKey, func() (interface{}, error) { + backend.SendRemoteCommand(remoteCommand, data, auth) + return nil, nil + }) + } + return item.Value().(cachedResponseStr).String(), nil } return backend.SendRemoteCommand(remoteCommand, data, auth) } // SendRemoteCommand performs a RPC call on the backend by POSTing to `/cmd/$remoteCommand`. +// // The form data is as follows: `clientData` is the JSON in the `data` parameter -// (should be retrieved from ClientMessage.Arguments), and either `username` or -// `usernameClaimed` depending on whether AuthInfo.UsernameValidates is true is AuthInfo.TwitchUsername. +// (should be retrieved from ClientMessage.Arguments), `username` is AuthInfo.TwitchUsername, +// and `authenticated` is 1 or 0 depending on AuthInfo.UsernameValidated. +// +// 401 responses return an ErrAuthorizationNeeded. +// +// Non-2xx responses return the response body as an error to the client (application/json +// responses are sent as-is, non-json are sent as a JSON string). +// +// If a 2xx response has the FFZ-Cache header, its value is used as a minimum number of +// seconds to cache the response for. (Responses may be cached for longer, see +// SendRemoteCommandCached and the cache implementation.) +// +// A successful response updates the Statistics.Health.Backend map. func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth AuthInfo) (responseStr string, err error) { destURL := fmt.Sprintf("%s/cmd/%s", backend.baseURL, remoteCommand) healthBucket := fmt.Sprintf("/cmd/%s", remoteCommand) @@ -166,7 +206,11 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A return "", fmt.Errorf("The RPC server returned a non-integer cache duration: %v", err) } duration := time.Duration(durSecs) * time.Second - backend.responseCache.Set(getCacheKey(remoteCommand, data), responseStr, duration) + backend.responseCache.Set( + getCacheKey(remoteCommand, data), + cachedResponseStr(responseStr), + duration, + ) } now := time.Now().UTC() diff --git a/socketserver/server/commands.go b/socketserver/server/commands.go index 3108f061..8a8108be 100644 --- a/socketserver/server/commands.go +++ b/socketserver/server/commands.go @@ -34,7 +34,7 @@ var commandHandlers = map[Command]CommandHandler{ "survey": C2SSurvey, } -var bunchedCommands = []string{ +var bunchedCommands = []Command{ "get_display_name", "get_emote", "get_emote_set", diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 15179638..07d095db 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -205,7 +205,6 @@ func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) { updateSysMem() if Statistics.SysMemFreeKB > 0 && Statistics.SysMemFreeKB < Configuration.MinMemoryKBytes { - atomic.AddUint64(&Statistics.LowMemDroppedConnections, 1) w.WriteHeader(503) fmt.Fprint(w, "error: low memory") return diff --git a/socketserver/server/stats.go b/socketserver/server/stats.go index 3b413f0d..10dad36b 100644 --- a/socketserver/server/stats.go +++ b/socketserver/server/stats.go @@ -169,7 +169,6 @@ func updatePeriodicStats() { { Statistics.Uptime = nowUpdate.Sub(Statistics.StartTime).String() - Statistics.ResponseCacheItems = Backend.responseCache.ItemCount() } { diff --git a/socketserver/server/subscriptions.go b/socketserver/server/subscriptions.go index 007aea03..136fa603 100644 --- a/socketserver/server/subscriptions.go +++ b/socketserver/server/subscriptions.go @@ -156,8 +156,11 @@ func UnsubscribeSingleChat(client *ClientInfo, channelName string) { // - write lock to SubscriptionInfos // - write lock to ClientInfo func UnsubscribeAll(client *ClientInfo) { - if StopAcceptingConnections { - return // no need to remove from a high-contention list when the server is closing + select { + case <-StopAcceptingConnectionsCh: + // Skip high-contention client removal operations while server shutting down + return + default: } GlobalSubscriptionLock.Lock() From 87672061058aaef71a5c874f74b6e5c113e44045 Mon Sep 17 00:00:00 2001 From: Kane York Date: Mon, 25 Sep 2017 15:24:58 -0700 Subject: [PATCH 12/15] Revert switching cache implementation --- socketserver/server/backend.go | 33 +++++++-------------------------- socketserver/server/stats.go | 3 ++- 2 files changed, 9 insertions(+), 27 deletions(-) diff --git a/socketserver/server/backend.go b/socketserver/server/backend.go index 7f15b310..b3761795 100644 --- a/socketserver/server/backend.go +++ b/socketserver/server/backend.go @@ -16,7 +16,7 @@ import ( "time" "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/naclform" - "github.com/karlseguin/ccache" + cache "github.com/patrickmn/go-cache" "golang.org/x/crypto/nacl/box" "golang.org/x/sync/singleflight" ) @@ -29,7 +29,7 @@ const bPathOtherCommand = "/cmd/" type backendInfo struct { HTTPClient http.Client baseURL string - responseCache *ccache.Cache + responseCache *cache.Cache reloadGroup singleflight.Group postStatisticsURL string @@ -52,7 +52,7 @@ func setupBackend(config *ConfigFile) *backendInfo { b.HTTPClient.Timeout = 60 * time.Second b.baseURL = config.BackendURL // size in bytes of string payload - b.responseCache = ccache.New(ccache.Configure().MaxSize(250 * 1000 * 1024)) + b.responseCache = cache.New(60*time.Second, 10*time.Minute) b.announceStartupURL = fmt.Sprintf("%s%s", b.baseURL, bPathAnnounceStartup) b.addTopicURL = fmt.Sprintf("%s%s", b.baseURL, bPathAddTopic) @@ -80,18 +80,6 @@ func getCacheKey(remoteCommand, data string) string { return fmt.Sprintf("%s/%s", remoteCommand, data) } -type cachedResponseStr string - -// implements ccache.Sized -func (c cachedResponseStr) Size() int64 { - return int64(len(string(c))) -} - -// implements Stringer -func (c cachedResponseStr) String() string { - return string(c) -} - // ErrForwardedFromBackend is an error returned by the backend server. type ErrForwardedFromBackend struct { JSONError interface{} @@ -114,16 +102,9 @@ var ErrAuthorizationNeeded = errors.New("Must authenticate Twitch username to us // and the cache is updated in the background. func (backend *backendInfo) SendRemoteCommandCached(remoteCommand, data string, auth AuthInfo) (string, error) { cacheKey := getCacheKey(remoteCommand, data) - item := backend.responseCache.Get(cacheKey) - if item != nil { - if item.Expired() { - // reload in background - go backend.reloadGroup.Do(cacheKey, func() (interface{}, error) { - backend.SendRemoteCommand(remoteCommand, data, auth) - return nil, nil - }) - } - return item.Value().(cachedResponseStr).String(), nil + cached, ok := backend.responseCache.Get(cacheKey) + if ok { + return cached.(string), nil } return backend.SendRemoteCommand(remoteCommand, data, auth) } @@ -208,7 +189,7 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A duration := time.Duration(durSecs) * time.Second backend.responseCache.Set( getCacheKey(remoteCommand, data), - cachedResponseStr(responseStr), + responseStr, duration, ) } diff --git a/socketserver/server/stats.go b/socketserver/server/stats.go index 10dad36b..da5debb1 100644 --- a/socketserver/server/stats.go +++ b/socketserver/server/stats.go @@ -38,7 +38,7 @@ type StatsData struct { MemoryInUseKB uint64 MemoryRSSKB uint64 - ResponseCacheItems uint64 + ResponseCacheItems int MemPerClientBytes uint64 CpuUsagePct float64 @@ -169,6 +169,7 @@ func updatePeriodicStats() { { Statistics.Uptime = nowUpdate.Sub(Statistics.StartTime).String() + Statistics.ResponseCacheItems = Backend.responseCache.ItemCount() } { From 7b0cdc4baafd3b960583225baa7c9528ba7bd079 Mon Sep 17 00:00:00 2001 From: Kane York Date: Tue, 26 Sep 2017 13:04:39 -0700 Subject: [PATCH 13/15] Add certificate reloader on SIGHUP --- socketserver/certreloader/reloader.go | 69 +++++++++++++++++++ .../cmd/ffzsocketserver/socketserver.go | 17 ++++- socketserver/server/types.go | 5 -- 3 files changed, 83 insertions(+), 8 deletions(-) create mode 100644 socketserver/certreloader/reloader.go diff --git a/socketserver/certreloader/reloader.go b/socketserver/certreloader/reloader.go new file mode 100644 index 00000000..456929f6 --- /dev/null +++ b/socketserver/certreloader/reloader.go @@ -0,0 +1,69 @@ +package certreloader + +import ( + "crypto/tls" + "log" + "os" + "os/signal" + "sync" +) + +type CertSource struct { + certMu sync.RWMutex + cert *tls.Certificate + certPath string + keyPath string +} + +// Create a CertSource +func New(certPath, keyPath string) (*CertSource, error) { + result := &CertSource{ + certPath: certPath, + keyPath: keyPath, + } + cert, err := tls.LoadX509KeyPair(certPath, keyPath) + if err != nil { + return nil, err + } + result.cert = &cert + return result, nil +} + +// Automatically reload certificate on the provided signal +func (kpr *CertSource) AutoCheck(sig os.Signal) { + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, sig) + for range c { + log.Printf("Received %v, reloading TLS certificate and key from %q and %q", sig, kpr.certPath, kpr.keyPath) + if err := kpr.maybeReload(); err != nil { + log.Printf("Keeping old TLS certificate because the new one could not be loaded: %v", err) + } + } + }() +} + +// Check() can be called manually to reload the certificate +func (kpr *CertSource) Check() error { + return kpr.maybeReload() +} + +func (kpr *CertSource) maybeReload() error { + newCert, err := tls.LoadX509KeyPair(kpr.certPath, kpr.keyPath) + if err != nil { + return err + } + kpr.certMu.Lock() + defer kpr.certMu.Unlock() + kpr.cert = &newCert + return nil +} + +// Returns a tls.Config.GetCertificate function. +func (kpr *CertSource) GetCertificateFunc() func(*tls.ClientHelloInfo) (*tls.Certificate, error) { + return func(clientHello *tls.ClientHelloInfo) (*tls.Certificate, error) { + kpr.certMu.RLock() + defer kpr.certMu.RUnlock() + return kpr.cert, nil + } +} diff --git a/socketserver/cmd/ffzsocketserver/socketserver.go b/socketserver/cmd/ffzsocketserver/socketserver.go index cc28b327..1f7f2db0 100644 --- a/socketserver/cmd/ffzsocketserver/socketserver.go +++ b/socketserver/cmd/ffzsocketserver/socketserver.go @@ -1,7 +1,10 @@ package main // import "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/cmd/ffzsocketserver" +import _ "net/http/pprof" + import ( "context" + "crypto/tls" "encoding/json" "flag" "fmt" @@ -14,11 +17,10 @@ import ( "syscall" "time" + "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/certreloader" "github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server" ) -import _ "net/http/pprof" - var configFilename = flag.String("config", "config.json", "Configuration file, including the keypairs for the NaCl crypto library, for communicating with the backend.") var flagGenerateKeys = flag.Bool("genkeys", false, "Generate NaCl keys instead of serving requests.\nArguments: [int serverId] [base64 backendPublic]\nThe backend public key can either be specified in base64 on the command line, or put in the json file later.") @@ -69,12 +71,21 @@ func main() { signal.Notify(stopSig, syscall.SIGTERM) if conf.UseSSL { + reloader, err := certreloader.New(conf.SSLCertificateFile, conf.SSLKeyFile) + if err != nil { + log.Fatalln("Could not load TLS certificate:", err) + } + reloader.AutoCheck(syscall.SIGHUP) + server1 = &http.Server{ Addr: conf.SSLListenAddr, Handler: http.DefaultServeMux, + TLSConfig: &tls.Config{ + GetCertificate: reloader.GetCertificateFunc(), + }, } go func() { - if err := server1.ListenAndServeTLS(conf.SSLCertificateFile, conf.SSLKeyFile); err != nil { + if err := server1.ListenAndServeTLS("", ""); err != nil { log.Println("ListenAndServeTLS:", err) stopSig <- os.Interrupt } diff --git a/socketserver/server/types.go b/socketserver/server/types.go index 3de13127..1097a159 100644 --- a/socketserver/server/types.go +++ b/socketserver/server/types.go @@ -35,11 +35,6 @@ type ConfigFile struct { // Path to key file. SSLKeyFile string - UseESLogStashing bool - ESServer string - ESIndexPrefix string - ESHostName string - // Nacl keys OurPrivateKey []byte OurPublicKey []byte From 206e36a5216ae5bb33385d9a24884dde825b4eb8 Mon Sep 17 00:00:00 2001 From: Kane York Date: Tue, 26 Sep 2017 13:04:54 -0700 Subject: [PATCH 14/15] Fix compile --- socketserver/server/subscriptions.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/socketserver/server/subscriptions.go b/socketserver/server/subscriptions.go index 007aea03..136fa603 100644 --- a/socketserver/server/subscriptions.go +++ b/socketserver/server/subscriptions.go @@ -156,8 +156,11 @@ func UnsubscribeSingleChat(client *ClientInfo, channelName string) { // - write lock to SubscriptionInfos // - write lock to ClientInfo func UnsubscribeAll(client *ClientInfo) { - if StopAcceptingConnections { - return // no need to remove from a high-contention list when the server is closing + select { + case <-StopAcceptingConnectionsCh: + // Skip high-contention client removal operations while server shutting down + return + default: } GlobalSubscriptionLock.Lock() From d135d0cde947645f58c4575efacec494f9329eac Mon Sep 17 00:00:00 2001 From: Kane York Date: Tue, 26 Sep 2017 13:06:20 -0700 Subject: [PATCH 15/15] add copyright notice --- socketserver/certreloader/reloader.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/socketserver/certreloader/reloader.go b/socketserver/certreloader/reloader.go index 456929f6..4847ee57 100644 --- a/socketserver/certreloader/reloader.go +++ b/socketserver/certreloader/reloader.go @@ -1,3 +1,6 @@ +// Copyright 2016 Michael Stapelberg, BSD-3 +// +// https://stackoverflow.com/a/40883377/1210278 package certreloader import (