mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-08-03 00:18:31 +00:00
commit
6f7d4e478d
11 changed files with 206 additions and 255 deletions
|
@ -15,6 +15,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/naclform"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
"golang.org/x/crypto/nacl/box"
|
||||
)
|
||||
|
@ -33,8 +34,7 @@ type backendInfo struct {
|
|||
addTopicURL string
|
||||
announceStartupURL string
|
||||
|
||||
sharedKey [32]byte
|
||||
serverID int
|
||||
secureForm naclform.ServerInfo
|
||||
|
||||
lastSuccess map[string]time.Time
|
||||
lastSuccessLock sync.Mutex
|
||||
|
@ -45,7 +45,7 @@ var Backend *backendInfo
|
|||
func setupBackend(config *ConfigFile) *backendInfo {
|
||||
b := new(backendInfo)
|
||||
Backend = b
|
||||
b.serverID = config.ServerID
|
||||
b.secureForm.ServerID = config.ServerID
|
||||
|
||||
b.HTTPClient.Timeout = 60 * time.Second
|
||||
b.baseURL = config.BackendURL
|
||||
|
@ -68,7 +68,7 @@ func setupBackend(config *ConfigFile) *backendInfo {
|
|||
copy(theirPublic[:], config.BackendPublicKey)
|
||||
copy(ourPrivate[:], config.OurPrivateKey)
|
||||
|
||||
box.Precompute(&b.sharedKey, &theirPublic, &ourPrivate)
|
||||
box.Precompute(&b.secureForm.SharedKey, &theirPublic, &ourPrivate)
|
||||
|
||||
return b
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A
|
|||
formData.Set("authenticated", "0")
|
||||
}
|
||||
|
||||
sealedForm, err := backend.SealRequest(formData)
|
||||
sealedForm, err := backend.secureForm.Seal(formData)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -178,7 +178,7 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A
|
|||
|
||||
// SendAggregatedData sends aggregated emote usage and following data to the backend server.
|
||||
func (backend *backendInfo) SendAggregatedData(form url.Values) error {
|
||||
sealedForm, err := backend.SealRequest(form)
|
||||
sealedForm, err := backend.secureForm.Seal(form)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -235,7 +235,7 @@ func (backend *backendInfo) sendTopicNotice(topic string, added bool) error {
|
|||
formData.Set("added", "f")
|
||||
}
|
||||
|
||||
sealedForm, err := backend.SealRequest(formData)
|
||||
sealedForm, err := backend.secureForm.Seal(formData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -18,14 +18,14 @@ func TestSealRequest(t *testing.T) {
|
|||
"QuickBrownFox": []string{"LazyDog"},
|
||||
}
|
||||
|
||||
sealedValues, err := b.SealRequest(values)
|
||||
sealedValues, err := b.secureForm.Seal(values)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// sealedValues.Encode()
|
||||
// id=0&msg=KKtbng49dOLLyjeuX5AnXiEe6P0uZwgeP_7mMB5vhP-wMAAPZw%3D%3D&nonce=-wRbUnifscisWUvhm3gBEXHN5QzrfzgV
|
||||
|
||||
unsealedValues, err := b.UnsealRequest(sealedValues)
|
||||
unsealedValues, err := b.secureForm.Unseal(sealedValues)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -2,11 +2,9 @@ package server
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -96,7 +94,7 @@ func DispatchC2SCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMess
|
|||
}
|
||||
}
|
||||
|
||||
func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (_ ClientMessage, err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
var ok bool
|
||||
|
@ -112,7 +110,7 @@ func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInf
|
|||
|
||||
// C2SHello implements the `hello` C2S Command.
|
||||
// It calls SubscribeGlobal() and SubscribeDefaults() with the client, and fills out ClientInfo.Version and ClientInfo.ClientID.
|
||||
func C2SHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
func C2SHello(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (_ ClientMessage, err error) {
|
||||
ary, ok := msg.Arguments.([]interface{})
|
||||
if !ok {
|
||||
err = ErrExpectedTwoStrings
|
||||
|
@ -163,16 +161,16 @@ func C2SHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg
|
|||
}, nil
|
||||
}
|
||||
|
||||
func C2SPing(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
func C2SPing(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) {
|
||||
return ClientMessage{
|
||||
Arguments: float64(time.Now().UnixNano()/1000) / 1000,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func C2SSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
func C2SSetUser(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
|
||||
username, err := msg.ArgumentsAsString()
|
||||
if err != nil {
|
||||
return
|
||||
return ClientMessage{}, err
|
||||
}
|
||||
|
||||
username = copyString(username)
|
||||
|
@ -192,29 +190,24 @@ func C2SSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rm
|
|||
return ResponseSuccess, nil
|
||||
}
|
||||
|
||||
func C2SReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
// disconnectAt, err := msg.ArgumentsAsInt()
|
||||
// if err != nil {
|
||||
// return
|
||||
// }
|
||||
|
||||
func C2SReady(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
|
||||
client.Mutex.Lock()
|
||||
client.ReadyComplete = true
|
||||
client.Mutex.Unlock()
|
||||
|
||||
client.MsgChannelKeepalive.Add(1)
|
||||
go func() {
|
||||
client.Send(ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand})
|
||||
client.Send(msg.Reply(SuccessCommand, nil))
|
||||
SendBacklogForNewClient(client)
|
||||
client.MsgChannelKeepalive.Done()
|
||||
}()
|
||||
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||
}
|
||||
|
||||
func C2SSubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
func C2SSubscribe(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
|
||||
channel, err := msg.ArgumentsAsString()
|
||||
if err != nil {
|
||||
return
|
||||
return ClientMessage{}, err
|
||||
}
|
||||
|
||||
channel = PubSubChannelPool.Intern(channel)
|
||||
|
@ -238,10 +231,10 @@ func C2SSubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (
|
|||
|
||||
// C2SUnsubscribe implements the `unsub` C2S Command.
|
||||
// It removes the channel from ClientInfo.CurrentChannels and calls UnsubscribeSingleChat.
|
||||
func C2SUnsubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
func C2SUnsubscribe(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
|
||||
channel, err := msg.ArgumentsAsString()
|
||||
if err != nil {
|
||||
return
|
||||
return ClientMessage{}, err
|
||||
}
|
||||
|
||||
channel = PubSubChannelPool.Intern(channel)
|
||||
|
@ -256,9 +249,8 @@ func C2SUnsubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage)
|
|||
}
|
||||
|
||||
// C2SSurvey implements the survey C2S Command.
|
||||
// Surveys are discarded.s
|
||||
func C2SSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
// Discard
|
||||
func C2SSurvey(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) {
|
||||
// Surveys are not collected.
|
||||
return ResponseSuccess, nil
|
||||
}
|
||||
|
||||
|
@ -276,7 +268,7 @@ var followEventsLock sync.Mutex
|
|||
|
||||
// C2STrackFollow implements the `track_follow` C2S Command.
|
||||
// It adds the record to `followEvents`, which is submitted to the backend on a timer.
|
||||
func C2STrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
func C2STrackFollow(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (_ ClientMessage, err error) {
|
||||
channel, following, err := msg.ArgumentsAsStringAndBool()
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -293,68 +285,18 @@ func C2STrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage)
|
|||
}
|
||||
|
||||
// AggregateEmoteUsage is a map from emoteID to a map from chatroom name to usage count.
|
||||
var aggregateEmoteUsage = make(map[int]map[string]int)
|
||||
//var aggregateEmoteUsage = make(map[int]map[string]int)
|
||||
|
||||
// AggregateEmoteUsageLock is the lock for AggregateEmoteUsage.
|
||||
var aggregateEmoteUsageLock sync.Mutex
|
||||
//var aggregateEmoteUsageLock sync.Mutex
|
||||
|
||||
// ErrNegativeEmoteUsage is emitted when the submitted emote usage is negative.
|
||||
var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative")
|
||||
//var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative")
|
||||
|
||||
// C2SEmoticonUses implements the `emoticon_uses` C2S Command.
|
||||
// msg.Arguments are in the JSON format of [1]map[emoteID]map[ChatroomName]float64.
|
||||
func C2SEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
// if this panics, will be caught by callHandler
|
||||
mapRoot := msg.Arguments.([]interface{})[0].(map[string]interface{})
|
||||
|
||||
// Validate: male suire
|
||||
for strEmote, val1 := range mapRoot {
|
||||
_, err = strconv.Atoi(strEmote)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
mapInner := val1.(map[string]interface{})
|
||||
for _, val2 := range mapInner {
|
||||
var count = int(val2.(float64))
|
||||
if count <= 0 {
|
||||
err = ErrNegativeEmoteUsage
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
aggregateEmoteUsageLock.Lock()
|
||||
defer aggregateEmoteUsageLock.Unlock()
|
||||
|
||||
var total int
|
||||
|
||||
for strEmote, val1 := range mapRoot {
|
||||
var emoteID int
|
||||
emoteID, err = strconv.Atoi(strEmote)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
destMapInner, ok := aggregateEmoteUsage[emoteID]
|
||||
if !ok {
|
||||
destMapInner = make(map[string]int)
|
||||
aggregateEmoteUsage[emoteID] = destMapInner
|
||||
}
|
||||
|
||||
mapInner := val1.(map[string]interface{})
|
||||
for roomName, val2 := range mapInner {
|
||||
var count = int(val2.(float64))
|
||||
if count > 200 {
|
||||
count = 200
|
||||
}
|
||||
roomName = TwitchChannelPool.Intern(roomName)
|
||||
destMapInner[roomName] += count
|
||||
total += count
|
||||
}
|
||||
}
|
||||
|
||||
Statistics.EmotesReportedTotal += uint64(total)
|
||||
|
||||
func C2SEmoticonUses(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) {
|
||||
// We do not collect emote usage data
|
||||
return ResponseSuccess, nil
|
||||
}
|
||||
|
||||
|
@ -371,10 +313,10 @@ func aggregateDataSender_do() {
|
|||
follows := followEvents
|
||||
followEvents = nil
|
||||
followEventsLock.Unlock()
|
||||
aggregateEmoteUsageLock.Lock()
|
||||
emoteUsage := aggregateEmoteUsage
|
||||
aggregateEmoteUsage = make(map[int]map[string]int)
|
||||
aggregateEmoteUsageLock.Unlock()
|
||||
//aggregateEmoteUsageLock.Lock()
|
||||
//emoteUsage := aggregateEmoteUsage
|
||||
//aggregateEmoteUsage = make(map[int]map[string]int)
|
||||
//aggregateEmoteUsageLock.Unlock()
|
||||
|
||||
reportForm := url.Values{}
|
||||
|
||||
|
@ -386,10 +328,10 @@ func aggregateDataSender_do() {
|
|||
}
|
||||
|
||||
strEmoteUsage := make(map[string]map[string]int)
|
||||
for emoteID, usageByChannel := range emoteUsage {
|
||||
strEmoteID := strconv.Itoa(emoteID)
|
||||
strEmoteUsage[strEmoteID] = usageByChannel
|
||||
}
|
||||
//for emoteID, usageByChannel := range emoteUsage {
|
||||
// strEmoteID := strconv.Itoa(emoteID)
|
||||
// strEmoteUsage[strEmoteID] = usageByChannel
|
||||
//}
|
||||
emoteJSON, err := json.Marshal(strEmoteUsage)
|
||||
if err != nil {
|
||||
log.Println("error reporting aggregate data:", err)
|
||||
|
@ -429,7 +371,7 @@ var bunchGroup singleflight.Group
|
|||
|
||||
// C2SHandleBunchedCommand handles C2S Commands such as `get_link`.
|
||||
// It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one.
|
||||
func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
|
||||
func C2SHandleBunchedCommand(_ *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
|
||||
key := fmt.Sprintf("%s:%s", msg.Command, msg.origArguments)
|
||||
|
||||
resultCh := bunchGroup.DoChan(key, func() (interface{}, error) {
|
||||
|
@ -439,29 +381,21 @@ func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg Clien
|
|||
client.MsgChannelKeepalive.Add(1)
|
||||
go func() {
|
||||
result := <-resultCh
|
||||
var reply ClientMessage
|
||||
reply.MessageID = msg.MessageID
|
||||
if result.Err != nil {
|
||||
reply.Command = ErrorCommand
|
||||
if efb, ok := result.Err.(ErrForwardedFromBackend); ok {
|
||||
reply.Arguments = efb.JSONError
|
||||
} else {
|
||||
reply.Arguments = result.Err.Error()
|
||||
}
|
||||
if efb, ok := result.Err.(ErrForwardedFromBackend); ok {
|
||||
client.Send(msg.Reply(ErrorCommand, efb.JSONError))
|
||||
} else if result.Err != nil {
|
||||
client.Send(msg.Reply(ErrorCommand, result.Err.Error()))
|
||||
} else {
|
||||
reply.Command = SuccessCommand
|
||||
reply.origArguments = result.Val.(string)
|
||||
reply.parseOrigArguments()
|
||||
client.Send(msg.ReplyJSON(SuccessCommand, result.Val.(string)))
|
||||
}
|
||||
|
||||
client.Send(reply)
|
||||
client.MsgChannelKeepalive.Done()
|
||||
}()
|
||||
|
||||
return ClientMessage{Command: AsyncResponseCommand}, nil
|
||||
}
|
||||
|
||||
func C2SHandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
|
||||
func C2SHandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (ClientMessage, error) {
|
||||
client.MsgChannelKeepalive.Add(1)
|
||||
go doRemoteCommand(conn, msg, client)
|
||||
|
||||
|
@ -477,7 +411,7 @@ func doRemoteCommand(conn *websocket.Conn, msg ClientMessage, client *ClientInfo
|
|||
if err == ErrAuthorizationNeeded {
|
||||
if client.TwitchUsername == "" {
|
||||
// Not logged in
|
||||
client.Send(ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: AuthorizationNeededError})
|
||||
client.Send(msg.Reply(ErrorCommand, AuthorizationNeededError))
|
||||
client.MsgChannelKeepalive.Done()
|
||||
return
|
||||
}
|
||||
|
@ -485,19 +419,17 @@ func doRemoteCommand(conn *websocket.Conn, msg ClientMessage, client *ClientInfo
|
|||
if success {
|
||||
doRemoteCommand(conn, msg, client)
|
||||
} else {
|
||||
client.Send(ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: AuthorizationFailedErrorString})
|
||||
client.Send(msg.Reply(ErrorCommand, AuthorizationFailedErrorString))
|
||||
client.MsgChannelKeepalive.Done()
|
||||
}
|
||||
})
|
||||
return // without keepalive.Done()
|
||||
} else if bfe, ok := err.(ErrForwardedFromBackend); ok {
|
||||
client.Send(ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: bfe.JSONError})
|
||||
client.Send(msg.Reply(ErrorCommand, bfe.JSONError))
|
||||
} else if err != nil {
|
||||
client.Send(ClientMessage{MessageID: msg.MessageID, Command: ErrorCommand, Arguments: err.Error()})
|
||||
client.Send(msg.Reply(ErrorCommand, err.Error()))
|
||||
} else {
|
||||
msg := ClientMessage{MessageID: msg.MessageID, Command: SuccessCommand, origArguments: resp}
|
||||
msg.parseOrigArguments()
|
||||
client.Send(msg)
|
||||
client.Send(msg.ReplyJSON(SuccessCommand, resp))
|
||||
}
|
||||
client.MsgChannelKeepalive.Done()
|
||||
}
|
||||
|
|
|
@ -104,7 +104,7 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) {
|
|||
serveMux.HandleFunc("/cached_pub", HTTPBackendCachedPublish)
|
||||
serveMux.HandleFunc("/get_sub_count", HTTPGetSubscriberCount)
|
||||
|
||||
announceForm, err := Backend.SealRequest(url.Values{
|
||||
announceForm, err := Backend.secureForm.Seal(url.Values{
|
||||
"startup": []string{"1"},
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -249,11 +249,21 @@ func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
type fatalDecodeError string
|
||||
|
||||
func (e fatalDecodeError) Error() string {
|
||||
return string(e)
|
||||
}
|
||||
|
||||
func (e fatalDecodeError) IsFatal() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// ErrProtocolGeneric is sent in a ErrorCommand Reply.
|
||||
var ErrProtocolGeneric error = errors.New("FFZ Socket protocol error.")
|
||||
var ErrProtocolGeneric error = fatalDecodeError("FFZ Socket protocol error.")
|
||||
|
||||
// ErrProtocolNegativeMsgID is sent in a ErrorCommand Reply when a negative MessageID is received.
|
||||
var ErrProtocolNegativeMsgID error = errors.New("FFZ Socket protocol error: negative or zero message ID.")
|
||||
var ErrProtocolNegativeMsgID error = fatalDecodeError("FFZ Socket protocol error: negative or zero message ID.")
|
||||
|
||||
// ErrExpectedSingleString is sent in a ErrorCommand Reply when the Arguments are of the wrong type.
|
||||
var ErrExpectedSingleString = errors.New("Error: Expected single string as arguments.")
|
||||
|
@ -344,7 +354,7 @@ func RunSocketConnection(conn *websocket.Conn) {
|
|||
})
|
||||
|
||||
// All set up, now enter the work loop
|
||||
go runSocketReader(conn, _errorChan, _clientChan, stoppedChan)
|
||||
go runSocketReader(conn, &client, _errorChan, _clientChan)
|
||||
closeReason := runSocketWriter(conn, &client, _errorChan, _clientChan, _serverMessageChan)
|
||||
|
||||
// Exit
|
||||
|
@ -376,12 +386,14 @@ func RunSocketConnection(conn *websocket.Conn) {
|
|||
}
|
||||
}
|
||||
|
||||
func runSocketReader(conn *websocket.Conn, errorChan chan<- error, clientChan chan<- ClientMessage, stoppedChan <-chan struct{}) {
|
||||
func runSocketReader(conn *websocket.Conn, client *ClientInfo, errorChan chan<- error, clientChan chan<- ClientMessage) {
|
||||
var msg ClientMessage
|
||||
var messageType int
|
||||
var packet []byte
|
||||
var err error
|
||||
|
||||
stoppedChan := client.MsgChannelIsDone
|
||||
|
||||
defer close(errorChan)
|
||||
defer close(clientChan)
|
||||
|
||||
|
@ -395,8 +407,15 @@ func runSocketReader(conn *websocket.Conn, errorChan chan<- error, clientChan ch
|
|||
break
|
||||
}
|
||||
|
||||
UnmarshalClientMessage(packet, messageType, &msg)
|
||||
if msg.MessageID == 0 {
|
||||
msg = ClientMessage{}
|
||||
msgErr := UnmarshalClientMessage(packet, messageType, &msg)
|
||||
if _, ok := msgErr.(interface{IsFatal() bool}); ok {
|
||||
errorChan <- msgErr
|
||||
continue
|
||||
} else if msgErr != nil {
|
||||
client.Send(msg.Reply(ErrorCommand, msgErr.Error()))
|
||||
continue
|
||||
} else if msg.MessageID == 0 {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
|
@ -505,7 +524,7 @@ func SendMessage(conn *websocket.Conn, msg ClientMessage) {
|
|||
}
|
||||
|
||||
// UnmarshalClientMessage unpacks websocket TextMessage into a ClientMessage provided in the `v` parameter.
|
||||
func UnmarshalClientMessage(data []byte, payloadType int, v interface{}) (err error) {
|
||||
func UnmarshalClientMessage(data []byte, _ int, v interface{}) (err error) {
|
||||
var spaceIdx int
|
||||
|
||||
out := v.(*ClientMessage)
|
||||
|
@ -514,11 +533,11 @@ func UnmarshalClientMessage(data []byte, payloadType int, v interface{}) (err er
|
|||
// Message ID
|
||||
spaceIdx = strings.IndexRune(dataStr, ' ')
|
||||
if spaceIdx == -1 {
|
||||
return ErrProtocolGeneric
|
||||
return ErrProtocolGeneric // fatal error
|
||||
}
|
||||
messageID, err := strconv.Atoi(dataStr[:spaceIdx])
|
||||
if messageID < -1 || messageID == 0 {
|
||||
return ErrProtocolNegativeMsgID
|
||||
return ErrProtocolNegativeMsgID // fatal error
|
||||
}
|
||||
|
||||
out.MessageID = messageID
|
||||
|
@ -551,7 +570,8 @@ func (cm *ClientMessage) parseOrigArguments() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func MarshalClientMessage(clientMessage interface{}) (payloadType int, data []byte, err error) {
|
||||
// returns payloadType, data, err
|
||||
func MarshalClientMessage(clientMessage interface{}) (int, []byte, error) {
|
||||
var msg ClientMessage
|
||||
var ok bool
|
||||
msg, ok = clientMessage.(ClientMessage)
|
||||
|
|
96
socketserver/server/naclform/seal.go
Normal file
96
socketserver/server/naclform/seal.go
Normal file
|
@ -0,0 +1,96 @@
|
|||
package naclform
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/crypto/nacl/box"
|
||||
)
|
||||
|
||||
var ErrorShortNonce = errors.New("Nonce too short.")
|
||||
var ErrorInvalidSignature = errors.New("Invalid signature or contents")
|
||||
|
||||
type ServerInfo struct {
|
||||
SharedKey [32]byte
|
||||
ServerID int
|
||||
}
|
||||
|
||||
func fillCryptoRandom(buf []byte) error {
|
||||
remaining := len(buf)
|
||||
for remaining > 0 {
|
||||
count, err := rand.Read(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
remaining -= count
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *ServerInfo) Seal(form url.Values) (url.Values, error) {
|
||||
var nonce [24]byte
|
||||
var err error
|
||||
|
||||
err = fillCryptoRandom(nonce[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cipherMsg := box.SealAfterPrecomputation(nil, []byte(form.Encode()), &nonce, &i.SharedKey)
|
||||
|
||||
bufMessage := new(bytes.Buffer)
|
||||
enc := base64.NewEncoder(base64.URLEncoding, bufMessage)
|
||||
enc.Write(cipherMsg)
|
||||
enc.Close()
|
||||
cipherString := bufMessage.String()
|
||||
|
||||
bufNonce := new(bytes.Buffer)
|
||||
enc = base64.NewEncoder(base64.URLEncoding, bufNonce)
|
||||
enc.Write(nonce[:])
|
||||
enc.Close()
|
||||
nonceString := bufNonce.String()
|
||||
|
||||
retval := url.Values{
|
||||
"nonce": []string{nonceString},
|
||||
"msg": []string{cipherString},
|
||||
"id": []string{strconv.Itoa(i.ServerID)},
|
||||
}
|
||||
|
||||
return retval, nil
|
||||
}
|
||||
|
||||
func (i *ServerInfo) Unseal(form url.Values) (url.Values, error) {
|
||||
var nonce [24]byte
|
||||
|
||||
nonceString := form.Get("nonce")
|
||||
dec := base64.NewDecoder(base64.URLEncoding, strings.NewReader(nonceString))
|
||||
count, err := dec.Read(nonce[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if count != 24 {
|
||||
return nil, ErrorShortNonce
|
||||
}
|
||||
|
||||
cipherString := form.Get("msg")
|
||||
dec = base64.NewDecoder(base64.URLEncoding, strings.NewReader(cipherString))
|
||||
cipherBuffer := new(bytes.Buffer)
|
||||
cipherBuffer.ReadFrom(dec)
|
||||
|
||||
message, ok := box.OpenAfterPrecomputation(nil, cipherBuffer.Bytes(), &nonce, &i.SharedKey)
|
||||
if !ok {
|
||||
return nil, ErrorInvalidSignature
|
||||
}
|
||||
|
||||
retValues, err := url.ParseQuery(string(message))
|
||||
if err != nil {
|
||||
return nil, ErrorInvalidSignature
|
||||
}
|
||||
|
||||
return retValues, nil
|
||||
}
|
|
@ -123,7 +123,7 @@ func saveLastMessage(cmd Command, channel string, expires time.Time, data string
|
|||
|
||||
func HTTPBackendDropBacklog(w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
formData, err := Backend.UnsealRequest(r.Form)
|
||||
formData, err := Backend.secureForm.Unseal(r.Form)
|
||||
if err != nil {
|
||||
w.WriteHeader(403)
|
||||
fmt.Fprintf(w, "Error: %v", err)
|
||||
|
@ -160,7 +160,7 @@ func rateLimitFromRequest(r *http.Request) (rate.Limiter, error) {
|
|||
// If the 'expires' parameter is not specified, the message will not expire (though it is only kept in-memory).
|
||||
func HTTPBackendCachedPublish(w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
formData, err := Backend.UnsealRequest(r.Form)
|
||||
formData, err := Backend.secureForm.Unseal(r.Form)
|
||||
if err != nil {
|
||||
w.WriteHeader(403)
|
||||
fmt.Fprintf(w, "Error: %v", err)
|
||||
|
@ -227,7 +227,7 @@ func HTTPBackendCachedPublish(w http.ResponseWriter, r *http.Request) {
|
|||
// If "scope" is "global", then "channel" is not used.
|
||||
func HTTPBackendUncachedPublish(w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
formData, err := Backend.UnsealRequest(r.Form)
|
||||
formData, err := Backend.secureForm.Unseal(r.Form)
|
||||
if err != nil {
|
||||
w.WriteHeader(403)
|
||||
fmt.Fprintf(w, "Error: %v", err)
|
||||
|
@ -292,7 +292,7 @@ func HTTPBackendUncachedPublish(w http.ResponseWriter, r *http.Request) {
|
|||
// A "global" option is not available, use fetch(/stats).CurrentClientCount instead.
|
||||
func HTTPGetSubscriberCount(w http.ResponseWriter, r *http.Request) {
|
||||
r.ParseForm()
|
||||
formData, err := Backend.UnsealRequest(r.Form)
|
||||
formData, err := Backend.secureForm.Unseal(r.Form)
|
||||
if err != nil {
|
||||
w.WriteHeader(403)
|
||||
fmt.Fprintf(w, "Error: %v", err)
|
||||
|
|
|
@ -209,7 +209,7 @@ func updateSysMem() {
|
|||
}
|
||||
|
||||
// HTTPShowStatistics handles the /stats endpoint. It writes out the Statistics object as indented JSON.
|
||||
func HTTPShowStatistics(w http.ResponseWriter, r *http.Request) {
|
||||
func HTTPShowStatistics(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
updateStatsIfNeeded()
|
||||
|
|
|
@ -97,7 +97,7 @@ func (er *TExpectedBackendRequest) String() string {
|
|||
if MethodIsPost == "" {
|
||||
return er.Path
|
||||
}
|
||||
return fmt.Sprint("%s %s: %s", MethodIsPost, er.Path, er.PostForm.Encode())
|
||||
return fmt.Sprintf("%s %s: %s", MethodIsPost, er.Path, er.PostForm.Encode())
|
||||
}
|
||||
|
||||
type TBackendRequestChecker struct {
|
||||
|
@ -123,7 +123,7 @@ func (backend *TBackendRequestChecker) ServeHTTP(w http.ResponseWriter, r *http.
|
|||
|
||||
r.ParseForm()
|
||||
|
||||
unsealedForm, err := Backend.UnsealRequest(r.PostForm)
|
||||
unsealedForm, err := Backend.secureForm.Unseal(r.PostForm)
|
||||
if err != nil {
|
||||
backend.tb.Errorf("Failed to unseal backend request: %v", err)
|
||||
}
|
||||
|
@ -276,7 +276,7 @@ func TSealForSavePubMsg(tb testing.TB, cmd Command, channel string, arguments in
|
|||
}
|
||||
form.Set("time", strconv.FormatInt(time.Now().Unix(), 10))
|
||||
|
||||
sealed, err := Backend.SealRequest(form)
|
||||
sealed, err := Backend.secureForm.Seal(form)
|
||||
if err != nil {
|
||||
tb.Error(err)
|
||||
return nil, err
|
||||
|
@ -300,7 +300,7 @@ func TSealForUncachedPubMsg(tb testing.TB, cmd Command, channel string, argument
|
|||
form.Set("time", time.Now().Format(time.UnixDate))
|
||||
form.Set("scope", scope)
|
||||
|
||||
sealed, err := Backend.SealRequest(form)
|
||||
sealed, err := Backend.secureForm.Seal(form)
|
||||
if err != nil {
|
||||
tb.Error(err)
|
||||
return nil, err
|
||||
|
|
|
@ -64,6 +64,24 @@ type ClientMessage struct {
|
|||
origArguments string
|
||||
}
|
||||
|
||||
func (cm ClientMessage) Reply(cmd Command, args interface{}) ClientMessage {
|
||||
return ClientMessage{
|
||||
MessageID: cm.MessageID,
|
||||
Command: cmd,
|
||||
Arguments: args,
|
||||
}
|
||||
}
|
||||
|
||||
func (cm ClientMessage) ReplyJSON(cmd Command, argsJSON string) ClientMessage {
|
||||
n := ClientMessage{
|
||||
MessageID: cm.MessageID,
|
||||
Command: cmd,
|
||||
origArguments: argsJSON,
|
||||
}
|
||||
n.parseOrigArguments()
|
||||
return n
|
||||
}
|
||||
|
||||
type AuthInfo struct {
|
||||
// The client's claimed username on Twitch.
|
||||
TwitchUsername string
|
||||
|
|
|
@ -46,11 +46,6 @@ var uniqueCtrWritingToken chan usageToken
|
|||
|
||||
var CounterLocation *time.Location = time.FixedZone("UTC-5", int((time.Hour*-5)/time.Second))
|
||||
|
||||
func TruncateToMidnight(at time.Time) time.Time {
|
||||
year, month, day := at.Date()
|
||||
return time.Date(year, month, day, 0, 0, 0, 0, CounterLocation)
|
||||
}
|
||||
|
||||
// GetCounterPeriod calculates the start and end timestamps for the HLL measurement period that includes the 'at' timestamp.
|
||||
func GetCounterPeriod(at time.Time) (start time.Time, end time.Time) {
|
||||
year, month, day := at.Date()
|
||||
|
@ -132,7 +127,7 @@ func HTTPShowHLL(w http.ResponseWriter, r *http.Request) {
|
|||
hllFileServer.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func HTTPWriteHLL(w http.ResponseWriter, r *http.Request) {
|
||||
func HTTPWriteHLL(w http.ResponseWriter, _ *http.Request) {
|
||||
writeHLL()
|
||||
w.WriteHeader(200)
|
||||
w.Write([]byte("ok"))
|
||||
|
@ -150,9 +145,7 @@ func loadUniqueUsers() {
|
|||
now := time.Now().In(CounterLocation)
|
||||
uniqueCounter.Start, uniqueCounter.End = GetCounterPeriod(now)
|
||||
err = loadHLL(now, &uniqueCounter)
|
||||
isIgnorableError := err != nil && (false ||
|
||||
(os.IsNotExist(err)) ||
|
||||
(err == io.EOF))
|
||||
isIgnorableError := err != nil && (os.IsNotExist(err) || err == io.EOF)
|
||||
|
||||
if isIgnorableError {
|
||||
// file didn't finish writing
|
||||
|
@ -227,10 +220,10 @@ func rolloverCounters_do() {
|
|||
|
||||
// Attempt to rescue the data into the log
|
||||
var buf bytes.Buffer
|
||||
bytes, err := uniqueCounter.Counter.GobEncode()
|
||||
by, err := uniqueCounter.Counter.GobEncode()
|
||||
if err == nil {
|
||||
enc := base64.NewEncoder(base64.StdEncoding, &buf)
|
||||
enc.Write(bytes)
|
||||
enc.Write(by)
|
||||
enc.Close()
|
||||
log.Print("data for ", GetHLLFilename(uniqueCounter.Start), ":", buf.String())
|
||||
}
|
||||
|
|
|
@ -1,103 +1,9 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/crypto/nacl/box"
|
||||
)
|
||||
|
||||
func FillCryptoRandom(buf []byte) error {
|
||||
remaining := len(buf)
|
||||
for remaining > 0 {
|
||||
count, err := rand.Read(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
remaining -= count
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func copyString(s string) string {
|
||||
return string([]byte(s))
|
||||
}
|
||||
|
||||
func (backend *backendInfo) SealRequest(form url.Values) (url.Values, error) {
|
||||
var nonce [24]byte
|
||||
var err error
|
||||
|
||||
err = FillCryptoRandom(nonce[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cipherMsg := box.SealAfterPrecomputation(nil, []byte(form.Encode()), &nonce, &backend.sharedKey)
|
||||
|
||||
bufMessage := new(bytes.Buffer)
|
||||
enc := base64.NewEncoder(base64.URLEncoding, bufMessage)
|
||||
enc.Write(cipherMsg)
|
||||
enc.Close()
|
||||
cipherString := bufMessage.String()
|
||||
|
||||
bufNonce := new(bytes.Buffer)
|
||||
enc = base64.NewEncoder(base64.URLEncoding, bufNonce)
|
||||
enc.Write(nonce[:])
|
||||
enc.Close()
|
||||
nonceString := bufNonce.String()
|
||||
|
||||
retval := url.Values{
|
||||
"nonce": []string{nonceString},
|
||||
"msg": []string{cipherString},
|
||||
"id": []string{strconv.Itoa(Backend.serverID)},
|
||||
}
|
||||
|
||||
return retval, nil
|
||||
}
|
||||
|
||||
var ErrorShortNonce = errors.New("Nonce too short.")
|
||||
var ErrorInvalidSignature = errors.New("Invalid signature or contents")
|
||||
|
||||
func (backend *backendInfo) UnsealRequest(form url.Values) (url.Values, error) {
|
||||
var nonce [24]byte
|
||||
|
||||
nonceString := form.Get("nonce")
|
||||
dec := base64.NewDecoder(base64.URLEncoding, strings.NewReader(nonceString))
|
||||
count, err := dec.Read(nonce[:])
|
||||
if err != nil {
|
||||
Statistics.BackendVerifyFails++
|
||||
return nil, err
|
||||
}
|
||||
if count != 24 {
|
||||
Statistics.BackendVerifyFails++
|
||||
return nil, ErrorShortNonce
|
||||
}
|
||||
|
||||
cipherString := form.Get("msg")
|
||||
dec = base64.NewDecoder(base64.URLEncoding, strings.NewReader(cipherString))
|
||||
cipherBuffer := new(bytes.Buffer)
|
||||
cipherBuffer.ReadFrom(dec)
|
||||
|
||||
message, ok := box.OpenAfterPrecomputation(nil, cipherBuffer.Bytes(), &nonce, &backend.sharedKey)
|
||||
if !ok {
|
||||
Statistics.BackendVerifyFails++
|
||||
return nil, ErrorInvalidSignature
|
||||
}
|
||||
|
||||
retValues, err := url.ParseQuery(string(message))
|
||||
if err != nil {
|
||||
Statistics.BackendVerifyFails++
|
||||
return nil, ErrorInvalidSignature
|
||||
}
|
||||
|
||||
return retValues, nil
|
||||
}
|
||||
|
||||
func AddToSliceS(ary *[]string, val string) bool {
|
||||
slice := *ary
|
||||
for _, v := range slice {
|
||||
|
@ -161,17 +67,3 @@ func RemoveFromSliceCl(ary *[]*ClientInfo, val *ClientInfo) bool {
|
|||
*ary = slice
|
||||
return true
|
||||
}
|
||||
|
||||
func AddToSliceB(ary *[]bunchSubscriber, client *ClientInfo, mid int) bool {
|
||||
newSub := bunchSubscriber{Client: client, MessageID: mid}
|
||||
slice := *ary
|
||||
for _, v := range slice {
|
||||
if v == newSub {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
slice = append(slice, newSub)
|
||||
*ary = slice
|
||||
return true
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue