1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-08-09 15:50:53 +00:00

more golint & cleanup

This commit is contained in:
Kane York 2015-11-16 12:50:00 -08:00
parent 66cc124e37
commit cc3a160c29
10 changed files with 251 additions and 245 deletions

View file

@ -1,7 +1,7 @@
package main package main
import ( import (
"../../internal/server" "bitbucket.org/stendec/frankerfacez/socketserver/internal/server"
"fmt" "fmt"
"github.com/abiosoft/ishell" "github.com/abiosoft/ishell"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"

View file

@ -1,7 +1,7 @@
package main package main // import "bitbucket.org/stendec/frankerfacez/socketserver/cmd/ffzsocketserver"
import ( import (
"../../internal/server" "bitbucket.org/stendec/frankerfacez/socketserver/internal/server"
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"

View file

@ -190,6 +190,7 @@ func SendRemoteCommand(remoteCommand, data string, auth AuthInfo) (responseStr s
return return
} }
// SendAggregatedData sends aggregated emote usage and following data to the backend server.
func SendAggregatedData(sealedForm url.Values) error { func SendAggregatedData(sealedForm url.Values) error {
resp, err := backendHTTPClient.PostForm(postStatisticsURL, sealedForm) resp, err := backendHTTPClient.PostForm(postStatisticsURL, sealedForm)
if err != nil { if err != nil {
@ -203,6 +204,8 @@ func SendAggregatedData(sealedForm url.Values) error {
return resp.Body.Close() return resp.Body.Close()
} }
// FetchBacklogData makes a request to the backend for backlog data on a set of pub/sub topics.
// TODO scrap this, replaced by /cached_pub
func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) { func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) {
formData := url.Values{ formData := url.Values{
"subs": chatSubs, "subs": chatSubs,
@ -247,10 +250,18 @@ func (noe ErrBackendNotOK) Error() string {
return fmt.Sprintf("backend returned %d: %s", noe.Code, noe.Response) return fmt.Sprintf("backend returned %d: %s", noe.Code, noe.Response)
} }
// SendNewTopicNotice notifies the backend that a client has performed the first subscription to a pub/sub topic.
// POST data:
// channels=room.trihex
// added=t
func SendNewTopicNotice(topic string) error { func SendNewTopicNotice(topic string) error {
return sendTopicNotice(topic, true) return sendTopicNotice(topic, true)
} }
// SendCleanupTopicsNotice notifies the backend that pub/sub topics have no subscribers anymore.
// POST data:
// channels=room.sirstendec,room.bobross,feature.foo
// added=f
func SendCleanupTopicsNotice(topics []string) error { func SendCleanupTopicsNotice(topics []string) error {
return sendTopicNotice(strings.Join(topics, ","), false) return sendTopicNotice(strings.Join(topics, ","), false)
} }
@ -292,6 +303,7 @@ func httpError(statusCode int) error {
return fmt.Errorf("backend http error: %d", statusCode) return fmt.Errorf("backend http error: %d", statusCode)
} }
// GenerateKeys generates a new NaCl keypair for the server and writes out the default configuration file.
func GenerateKeys(outputFile, serverID, theirPublicStr string) { func GenerateKeys(outputFile, serverID, theirPublicStr string) {
var err error var err error
output := ConfigFile{ output := ConfigFile{

View file

@ -3,6 +3,7 @@ package server
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"log" "log"
@ -20,33 +21,34 @@ type Command string
type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error) type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMessage, error)
var commandHandlers = map[Command]CommandHandler{ var commandHandlers = map[Command]CommandHandler{
HelloCommand: HandleHello, HelloCommand: C2SHello,
"setuser": HandleSetUser, "setuser": C2SSetUser,
"ready": HandleReady, "ready": C2SReady,
"sub": HandleSub, "sub": C2SSubscribe,
"unsub": HandleUnsub, "unsub": C2SUnsubscribe,
"track_follow": HandleTrackFollow, "track_follow": C2STrackFollow,
"emoticon_uses": HandleEmoticonUses, "emoticon_uses": C2SEmoticonUses,
"survey": HandleSurvey, "survey": C2SSurvey,
"twitch_emote": HandleRemoteCommand, "twitch_emote": C2SHandleRemoteCommand,
"get_link": HandleBunchedRemoteCommand, "get_link": C2SHandleBunchedCommand,
"get_display_name": HandleBunchedRemoteCommand, "get_display_name": C2SHandleBunchedCommand,
"update_follow_buttons": HandleRemoteCommand, "update_follow_buttons": C2SHandleRemoteCommand,
"chat_history": HandleRemoteCommand, "chat_history": C2SHandleRemoteCommand,
} }
const ChannelInfoDelay = 2 * time.Second // DispatchC2SCommand handles a C2S Command in the provided ClientMessage.
// It calls the correct CommandHandler function, catching panics.
func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) { // It sends either the returned Reply ClientMessage, setting the correct messageID, or sends an ErrorCommand
func DispatchC2SCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) {
handler, ok := commandHandlers[msg.Command] handler, ok := commandHandlers[msg.Command]
if !ok { if !ok {
handler = HandleRemoteCommand handler = C2SHandleRemoteCommand
} }
response, err := CallHandler(handler, conn, client, msg) response, err := callHandler(handler, conn, client, msg)
if err == nil { if err == nil {
if response.Command == AsyncResponseCommand { if response.Command == AsyncResponseCommand {
@ -59,13 +61,29 @@ func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage)
} else { } else {
SendMessage(conn, ClientMessage{ SendMessage(conn, ClientMessage{
MessageID: msg.MessageID, MessageID: msg.MessageID,
Command: "error", Command: ErrorCommand,
Arguments: err.Error(), Arguments: err.Error(),
}) })
} }
} }
func HandleHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func callHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) {
defer func() {
if r := recover(); r != nil {
var ok bool
fmt.Print("[!] Error executing command", cmsg.Command, "--", r)
err, ok = r.(error)
if !ok {
err = fmt.Errorf("command handler: %v", r)
}
}
}()
return handler(conn, client, cmsg)
}
// C2SHello implements the `hello` C2S Command.
// It calls SubscribeGlobal() and SubscribeDefaults() with the client, and fills out ClientInfo.Version and ClientInfo.ClientID.
func C2SHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
version, clientID, err := msg.ArgumentsAsTwoStrings() version, clientID, err := msg.ArgumentsAsTwoStrings()
if err != nil { if err != nil {
return return
@ -85,7 +103,7 @@ func HandleHello(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r
}, nil }, nil
} }
func HandleReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func C2SReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
disconnectAt, err := msg.ArgumentsAsInt() disconnectAt, err := msg.ArgumentsAsInt()
if err != nil { if err != nil {
return return
@ -115,7 +133,7 @@ func HandleReady(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r
return ClientMessage{Command: AsyncResponseCommand}, nil return ClientMessage{Command: AsyncResponseCommand}, nil
} }
func HandleSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func C2SSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
username, err := msg.ArgumentsAsString() username, err := msg.ArgumentsAsString()
if err != nil { if err != nil {
return return
@ -137,7 +155,7 @@ func HandleSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage)
return ResponseSuccess, nil return ResponseSuccess, nil
} }
func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func C2SSubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
channel, err := msg.ArgumentsAsString() channel, err := msg.ArgumentsAsString()
if err != nil { if err != nil {
@ -145,18 +163,10 @@ func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rms
} }
client.Mutex.Lock() client.Mutex.Lock()
AddToSliceS(&client.CurrentChannels, channel) AddToSliceS(&client.CurrentChannels, channel)
client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel) if usePendingSubscrptionsBacklog {
client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel)
// if client.MakePendingRequests == nil { }
// client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client))
// } else {
// if !client.MakePendingRequests.Reset(ChannelInfoDelay) {
// client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client))
// }
// }
client.Mutex.Unlock() client.Mutex.Unlock()
SubscribeChannel(client, channel) SubscribeChannel(client, channel)
@ -164,7 +174,9 @@ func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rms
return ResponseSuccess, nil return ResponseSuccess, nil
} }
func HandleUnsub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { // C2SUnsubscribe implements the `unsub` C2S Command.
// It removes the channel from ClientInfo.CurrentChannels and calls UnsubscribeSingleChat.
func C2SUnsubscribe(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
channel, err := msg.ArgumentsAsString() channel, err := msg.ArgumentsAsString()
if err != nil { if err != nil {
@ -180,91 +192,57 @@ func HandleUnsub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r
return ResponseSuccess, nil return ResponseSuccess, nil
} }
func GetSubscriptionBacklogFor(conn *websocket.Conn, client *ClientInfo) func() { // C2SSurvey implements the survey C2S Command.
return func() { // Surveys are discarded.s
GetSubscriptionBacklog(conn, client) func C2SSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
}
}
// On goroutine
func GetSubscriptionBacklog(conn *websocket.Conn, client *ClientInfo) {
var subs []string
// Lock, grab the data, and reset it
client.Mutex.Lock()
subs = client.PendingSubscriptionsBacklog
client.PendingSubscriptionsBacklog = nil
client.MakePendingRequests = nil
client.Mutex.Unlock()
if len(subs) == 0 {
return
}
if backendURL == "" {
return // for testing runs
}
messages, err := FetchBacklogData(subs)
if err != nil {
// Oh well.
log.Print("error in GetSubscriptionBacklog:", err)
return
}
// Deliver to client
client.MsgChannelKeepalive.Add(1)
if client.MessageChannel != nil {
for _, msg := range messages {
client.MessageChannel <- msg
}
}
client.MsgChannelKeepalive.Done()
}
func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
// Discard // Discard
return ResponseSuccess, nil return ResponseSuccess, nil
} }
type FollowEvent struct { type followEvent struct {
User string `json:"u"` User string `json:"u"`
Channel string `json:"c"` Channel string `json:"c"`
NowFollowing bool `json:"f"` NowFollowing bool `json:"f"`
Timestamp time.Time `json:"t"` Timestamp time.Time `json:"t"`
} }
var FollowEvents []FollowEvent var followEvents []followEvent
var FollowEventsLock sync.Mutex
func HandleTrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { // followEventsLock is the lock for followEvents.
var followEventsLock sync.Mutex
// C2STrackFollow implements the `track_follow` C2S Command.
// It adds the record to `followEvents`, which is submitted to the backend on a timer.
func C2STrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
channel, following, err := msg.ArgumentsAsStringAndBool() channel, following, err := msg.ArgumentsAsStringAndBool()
if err != nil { if err != nil {
return return
} }
now := time.Now() now := time.Now()
FollowEventsLock.Lock() followEventsLock.Lock()
FollowEvents = append(FollowEvents, FollowEvent{client.TwitchUsername, channel, following, now}) followEvents = append(followEvents, followEvent{client.TwitchUsername, channel, following, now})
FollowEventsLock.Unlock() followEventsLock.Unlock()
return ResponseSuccess, nil return ResponseSuccess, nil
} }
// AggregateEmoteUsage is a map from emoteID to a map from chatroom name to usage count. // AggregateEmoteUsage is a map from emoteID to a map from chatroom name to usage count.
var AggregateEmoteUsage map[int]map[string]int = make(map[int]map[string]int) var aggregateEmoteUsage = make(map[int]map[string]int)
// AggregateEmoteUsageLock is the lock for AggregateEmoteUsage. // AggregateEmoteUsageLock is the lock for AggregateEmoteUsage.
var AggregateEmoteUsageLock sync.Mutex var aggregateEmoteUsageLock sync.Mutex
// ErrNegativeEmoteUsage is emitted when the submitted emote usage is negative. // ErrNegativeEmoteUsage is emitted when the submitted emote usage is negative.
var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative") var ErrNegativeEmoteUsage = errors.New("Emote usage count cannot be negative")
func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { // C2SEmoticonUses implements the `emoticon_uses` C2S Command.
// arguments is [1]map[emoteID]map[ChatroomName]float64 // msg.Arguments are in the JSON format of [1]map[emoteID]map[ChatroomName]float64.
func C2SEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
// if this panics, will be caught by callHandler
mapRoot := msg.Arguments.([]interface{})[0].(map[string]interface{}) mapRoot := msg.Arguments.([]interface{})[0].(map[string]interface{})
// Validate: male suire
for strEmote, val1 := range mapRoot { for strEmote, val1 := range mapRoot {
_, err = strconv.Atoi(strEmote) _, err = strconv.Atoi(strEmote)
if err != nil { if err != nil {
@ -272,7 +250,7 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess
} }
mapInner := val1.(map[string]interface{}) mapInner := val1.(map[string]interface{})
for _, val2 := range mapInner { for _, val2 := range mapInner {
var count int = int(val2.(float64)) var count = int(val2.(float64))
if count <= 0 { if count <= 0 {
err = ErrNegativeEmoteUsage err = ErrNegativeEmoteUsage
return return
@ -280,8 +258,8 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess
} }
} }
AggregateEmoteUsageLock.Lock() aggregateEmoteUsageLock.Lock()
defer AggregateEmoteUsageLock.Unlock() defer aggregateEmoteUsageLock.Unlock()
for strEmote, val1 := range mapRoot { for strEmote, val1 := range mapRoot {
var emoteID int var emoteID int
@ -290,15 +268,15 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess
return return
} }
destMapInner, ok := AggregateEmoteUsage[emoteID] destMapInner, ok := aggregateEmoteUsage[emoteID]
if !ok { if !ok {
destMapInner = make(map[string]int) destMapInner = make(map[string]int)
AggregateEmoteUsage[emoteID] = destMapInner aggregateEmoteUsage[emoteID] = destMapInner
} }
mapInner := val1.(map[string]interface{}) mapInner := val1.(map[string]interface{})
for roomName, val2 := range mapInner { for roomName, val2 := range mapInner {
var count int = int(val2.(float64)) var count = int(val2.(float64))
if count > 200 { if count > 200 {
count = 200 count = 200
} }
@ -309,22 +287,22 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess
return ResponseSuccess, nil return ResponseSuccess, nil
} }
func sendAggregateData() { func aggregateDataSender() {
for { for {
time.Sleep(5 * time.Minute) time.Sleep(5 * time.Minute)
DoSendAggregateData() doSendAggregateData()
} }
} }
func DoSendAggregateData() { func doSendAggregateData() {
FollowEventsLock.Lock() followEventsLock.Lock()
follows := FollowEvents follows := followEvents
FollowEvents = nil followEvents = nil
FollowEventsLock.Unlock() followEventsLock.Unlock()
AggregateEmoteUsageLock.Lock() aggregateEmoteUsageLock.Lock()
emoteUsage := AggregateEmoteUsage emoteUsage := aggregateEmoteUsage
AggregateEmoteUsage = make(map[int]map[string]int) aggregateEmoteUsage = make(map[int]map[string]int)
AggregateEmoteUsageLock.Unlock() aggregateEmoteUsageLock.Unlock()
reportForm := url.Values{} reportForm := url.Values{}
@ -362,27 +340,23 @@ func DoSendAggregateData() {
// done // done
} }
type BunchedRequest struct { type bunchedRequest struct {
Command Command Command Command
Param string Param string
} }
func BunchedRequestFromCM(msg *ClientMessage) BunchedRequest { type cachedBunchedResponse struct {
return BunchedRequest{Command: msg.Command, Param: msg.origArguments}
}
type CachedBunchedResponse struct {
Response string Response string
Timestamp time.Time Timestamp time.Time
} }
type BunchSubscriber struct { type bunchSubscriber struct {
Client *ClientInfo Client *ClientInfo
MessageID int MessageID int
} }
type BunchSubscriberList struct { type bunchSubscriberList struct {
sync.Mutex sync.Mutex
Members []BunchSubscriber Members []bunchSubscriber
} }
type CacheStatus byte type CacheStatus byte
@ -393,50 +367,57 @@ const (
CacheStatusExpired CacheStatusExpired
) )
var PendingBunchedRequests map[BunchedRequest]*BunchSubscriberList = make(map[BunchedRequest]*BunchSubscriberList) var pendingBunchedRequests = make(map[bunchedRequest]*bunchSubscriberList)
var PendingBunchLock sync.Mutex var pendingBunchLock sync.Mutex
var BunchCache map[BunchedRequest]CachedBunchedResponse = make(map[BunchedRequest]CachedBunchedResponse) var bunchCache = make(map[bunchedRequest]cachedBunchedResponse)
var BunchCacheLock sync.RWMutex var bunchCacheLock sync.RWMutex
var BunchCacheCleanupSignal *sync.Cond = sync.NewCond(&BunchCacheLock) var bunchCacheCleanupSignal = sync.NewCond(&bunchCacheLock)
var BunchCacheLastCleanup time.Time var bunchCacheLastCleanup time.Time
func bunchedRequestFromCM(msg *ClientMessage) bunchedRequest {
return bunchedRequest{Command: msg.Command, Param: msg.origArguments}
}
func bunchCacheJanitor() { func bunchCacheJanitor() {
go func() { go func() {
for { for {
time.Sleep(30 * time.Minute) time.Sleep(30 * time.Minute)
BunchCacheCleanupSignal.Signal() bunchCacheCleanupSignal.Signal()
} }
}() }()
BunchCacheLock.Lock() bunchCacheLock.Lock()
for { for {
// Unlocks CachedBunchLock, waits for signal, re-locks // Unlocks CachedBunchLock, waits for signal, re-locks
BunchCacheCleanupSignal.Wait() bunchCacheCleanupSignal.Wait()
if BunchCacheLastCleanup.After(time.Now().Add(-1 * time.Second)) { if bunchCacheLastCleanup.After(time.Now().Add(-1 * time.Second)) {
// skip if it's been less than 1 second // skip if it's been less than 1 second
continue continue
} }
// CachedBunchLock is held here // CachedBunchLock is held here
keepIfAfter := time.Now().Add(-5 * time.Minute) keepIfAfter := time.Now().Add(-5 * time.Minute)
for req, resp := range BunchCache { for req, resp := range bunchCache {
if !resp.Timestamp.After(keepIfAfter) { if !resp.Timestamp.After(keepIfAfter) {
delete(BunchCache, req) delete(bunchCache, req)
} }
} }
BunchCacheLastCleanup = time.Now() bunchCacheLastCleanup = time.Now()
// Loop and Wait(), which re-locks // Loop and Wait(), which re-locks
} }
} }
func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { // C2SHandleBunchedCommand handles C2S Commands such as `get_link`.
br := BunchedRequestFromCM(&msg) // It makes a request to the backend server for the data, but any other requests coming in while the first is pending also get the responses from the first one.
// Additionally, results are cached.
func C2SHandleBunchedCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
br := bunchedRequestFromCM(&msg)
cacheStatus := func() byte { cacheStatus := func() byte {
BunchCacheLock.RLock() bunchCacheLock.RLock()
defer BunchCacheLock.RUnlock() defer bunchCacheLock.RUnlock()
bresp, ok := BunchCache[br] bresp, ok := bunchCache[br]
if ok && bresp.Timestamp.After(time.Now().Add(-5*time.Minute)) { if ok && bresp.Timestamp.After(time.Now().Add(-5*time.Minute)) {
client.MsgChannelKeepalive.Add(1) client.MsgChannelKeepalive.Add(1)
go func() { go func() {
@ -459,12 +440,12 @@ func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg Cl
return ClientMessage{Command: AsyncResponseCommand}, nil return ClientMessage{Command: AsyncResponseCommand}, nil
} else if cacheStatus == CacheStatusExpired { } else if cacheStatus == CacheStatusExpired {
// Wake up the lazy janitor // Wake up the lazy janitor
BunchCacheCleanupSignal.Signal() bunchCacheCleanupSignal.Signal()
} }
PendingBunchLock.Lock() pendingBunchLock.Lock()
defer PendingBunchLock.Unlock() defer pendingBunchLock.Unlock()
list, ok := PendingBunchedRequests[br] list, ok := pendingBunchedRequests[br]
if ok { if ok {
list.Lock() list.Lock()
AddToSliceB(&list.Members, client, msg.MessageID) AddToSliceB(&list.Members, client, msg.MessageID)
@ -473,9 +454,9 @@ func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg Cl
return ClientMessage{Command: AsyncResponseCommand}, nil return ClientMessage{Command: AsyncResponseCommand}, nil
} }
PendingBunchedRequests[br] = &BunchSubscriberList{Members: []BunchSubscriber{{Client: client, MessageID: msg.MessageID}}} pendingBunchedRequests[br] = &bunchSubscriberList{Members: []bunchSubscriber{{Client: client, MessageID: msg.MessageID}}}
go func(request BunchedRequest) { go func(request bunchedRequest) {
respStr, err := SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{}) respStr, err := SendRemoteCommandCached(string(request.Command), request.Param, AuthInfo{})
var msg ClientMessage var msg ClientMessage
@ -489,15 +470,15 @@ func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg Cl
} }
if err == nil { if err == nil {
BunchCacheLock.Lock() bunchCacheLock.Lock()
BunchCache[request] = CachedBunchedResponse{Response: respStr, Timestamp: time.Now()} bunchCache[request] = cachedBunchedResponse{Response: respStr, Timestamp: time.Now()}
BunchCacheLock.Unlock() bunchCacheLock.Unlock()
} }
PendingBunchLock.Lock() pendingBunchLock.Lock()
bsl := PendingBunchedRequests[request] bsl := pendingBunchedRequests[request]
delete(PendingBunchedRequests, request) delete(pendingBunchedRequests, request)
PendingBunchLock.Unlock() pendingBunchLock.Unlock()
bsl.Lock() bsl.Lock()
for _, member := range bsl.Members { for _, member := range bsl.Members {
@ -513,7 +494,7 @@ func HandleBunchedRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg Cl
return ClientMessage{Command: AsyncResponseCommand}, nil return ClientMessage{Command: AsyncResponseCommand}, nil
} }
func HandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func C2SHandleRemoteCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
client.MsgChannelKeepalive.Add(1) client.MsgChannelKeepalive.Add(1)
go doRemoteCommand(conn, msg, client) go doRemoteCommand(conn, msg, client)

View file

@ -59,8 +59,8 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) {
} }
BannerHTML = bannerBytes BannerHTML = bannerBytes
serveMux.HandleFunc("/", ServeWebsocketOrCatbag) serveMux.HandleFunc("/", HTTPHandleRootURL)
serveMux.HandleFunc("/drop_backlog", HBackendDropBacklog) serveMux.HandleFunc("/drop_backlog", HTTPBackendDropBacklog)
serveMux.HandleFunc("/uncached_pub", HBackendPublishRequest) serveMux.HandleFunc("/uncached_pub", HBackendPublishRequest)
serveMux.HandleFunc("/cached_pub", HBackendUpdateAndPublish) serveMux.HandleFunc("/cached_pub", HBackendUpdateAndPublish)
@ -81,7 +81,7 @@ func SetupServerAndHandle(config *ConfigFile, serveMux *http.ServeMux) {
go backlogJanitor() go backlogJanitor()
go bunchCacheJanitor() go bunchCacheJanitor()
go pubsubJanitor() go pubsubJanitor()
go sendAggregateData() go aggregateDataSender()
go ircConnection() go ircConnection()
} }
@ -99,14 +99,16 @@ var SocketUpgrader = websocket.Upgrader{
// Memes go here. // Memes go here.
var BannerHTML []byte var BannerHTML []byte
func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) { // HTTPHandleRootURL is the http.HandleFunc for requests on `/`.
// It either uses the SocketUpgrader or writes out the BannerHTML.
func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Connection") == "Upgrade" { if r.Header.Get("Connection") == "Upgrade" {
conn, err := SocketUpgrader.Upgrade(w, r, nil) conn, err := SocketUpgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
fmt.Fprintf(w, "error: %v", err) fmt.Fprintf(w, "error: %v", err)
return return
} }
HandleSocketConnection(conn) RunSocketConnection(conn)
return return
} else { } else {
@ -114,26 +116,46 @@ func ServeWebsocketOrCatbag(w http.ResponseWriter, r *http.Request) {
} }
} }
// Errors that get returned to the client. // ErrProtocolGeneric is sent in a ErrorCommand Reply.
var ProtocolError error = errors.New("FFZ Socket protocol error.") var ErrProtocolGeneric error = errors.New("FFZ Socket protocol error.")
var ProtocolErrorNegativeID error = errors.New("FFZ Socket protocol error: negative or zero message ID.") // ErrProtocolNegativeMsgID is sent in a ErrorCommand Reply when a negative MessageID is received.
var ExpectedSingleString = errors.New("Error: Expected single string as arguments.") var ErrProtocolNegativeMsgID error = errors.New("FFZ Socket protocol error: negative or zero message ID.")
var ExpectedSingleInt = errors.New("Error: Expected single integer as arguments.") // ErrExpectedSingleString is sent in a ErrorCommand Reply when the Arguments are of the wrong type.
var ExpectedTwoStrings = errors.New("Error: Expected array of string, string as arguments.") var ErrExpectedSingleString = errors.New("Error: Expected single string as arguments.")
var ExpectedStringAndInt = errors.New("Error: Expected array of string, int as arguments.") // ErrExpectedSingleInt is sent in a ErrorCommand Reply when the Arguments are of the wrong type.
var ExpectedStringAndBool = errors.New("Error: Expected array of string, bool as arguments.") var ErrExpectedSingleInt = errors.New("Error: Expected single integer as arguments.")
var ExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.") // ErrExpectedTwoStrings is sent in a ErrorCommand Reply when the Arguments are of the wrong type.
var ErrExpectedTwoStrings = errors.New("Error: Expected array of string, string as arguments.")
// ErrExpectedStringAndBool is sent in a ErrorCommand Reply when the Arguments are of the wrong type.
var ErrExpectedStringAndBool = errors.New("Error: Expected array of string, bool as arguments.")
// ErrExpectedStringAndInt is sent in a ErrorCommand Reply when the Arguments are of the wrong type.
var ErrExpectedStringAndInt = errors.New("Error: Expected array of string, int as arguments.")
// ErrExpectedStringAndIntGotFloat is sent in a ErrorCommand Reply when the Arguments are of the wrong type.
var ErrExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.")
// CloseGotBinaryMessage is the termination reason when the client sends a binary websocket frame.
var CloseGotBinaryMessage = websocket.CloseError{Code: websocket.CloseUnsupportedData, Text: "got binary packet"} var CloseGotBinaryMessage = websocket.CloseError{Code: websocket.CloseUnsupportedData, Text: "got binary packet"}
// CloseTimedOut is the termination reason when the client fails to send or respond to ping frames.
var CloseTimedOut = websocket.CloseError{Code: websocket.CloseNoStatusReceived, Text: "no ping replies for 5 minutes"} var CloseTimedOut = websocket.CloseError{Code: websocket.CloseNoStatusReceived, Text: "no ping replies for 5 minutes"}
// CloseFirstMessageNotHello is the termination reason
var CloseFirstMessageNotHello = websocket.CloseError{ var CloseFirstMessageNotHello = websocket.CloseError{
Text: "Error - the first message sent must be a 'hello'", Text: "Error - the first message sent must be a 'hello'",
Code: websocket.ClosePolicyViolation, Code: websocket.ClosePolicyViolation,
} }
// Handle a new websocket connection from a FFZ client. // RunSocketConnection contains the main run loop of a websocket connection.
// This runs in a goroutine started by net/http.
func HandleSocketConnection(conn *websocket.Conn) { // First, it sets up the channels, the ClientInfo object, and the pong frame handler.
// It starts the reader goroutine pointing at the newly created channels.
// The function then enters the run loop (a `for{select{}}`).
// The run loop is broken when an object is received on errorChan, or if `hello` is not the first C2S Command.
// After the run loop stops, the function launches a goroutine to drain
// client.MessageChannel, signals the reader goroutine to stop, unsubscribes
// from all pub/sub channels, waits on MsgChannelKeepalive (remember, the
// messages are being drained), and finally closes client.MessageChannel
// (which ends the drainer goroutine).
func RunSocketConnection(conn *websocket.Conn) {
// websocket.Conn is a ReadWriteCloser // websocket.Conn is a ReadWriteCloser
log.Println("Got socket connection from", conn.RemoteAddr()) log.Println("Got socket connection from", conn.RemoteAddr())
@ -201,7 +223,9 @@ func HandleSocketConnection(conn *websocket.Conn) {
}(_errorChan, _clientChan, stoppedChan) }(_errorChan, _clientChan, stoppedChan)
conn.SetPongHandler(func(pongBody string) error { conn.SetPongHandler(func(pongBody string) error {
client.Mutex.Lock()
client.pingCount = 0 client.pingCount = 0
client.Mutex.Unlock()
return nil return nil
}) })
@ -236,14 +260,17 @@ RunLoop:
break RunLoop break RunLoop
} }
HandleCommand(conn, &client, msg) DispatchC2SCommand(conn, &client, msg)
case smsg := <-serverMessageChan: case msg := <-serverMessageChan:
SendMessage(conn, smsg) SendMessage(conn, msg)
case <-time.After(1 * time.Minute): case <-time.After(1 * time.Minute):
client.Mutex.Lock()
client.pingCount++ client.pingCount++
if client.pingCount == 5 { tooManyPings := client.pingCount == 5
client.Mutex.Unlock()
if tooManyPings {
CloseConnection(conn, &CloseTimedOut) CloseConnection(conn, &CloseTimedOut)
break RunLoop break RunLoop
} else { } else {
@ -280,20 +307,6 @@ func getDeadline() time.Time {
return time.Now().Add(1 * time.Minute) return time.Now().Add(1 * time.Minute)
} }
func CallHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) {
defer func() {
if r := recover(); r != nil {
var ok bool
fmt.Print("[!] Error executing command", cmsg.Command, "--", r)
err, ok = r.(error)
if !ok {
err = fmt.Errorf("command handler: %v", r)
}
}
}()
return handler(conn, client, cmsg)
}
func CloseConnection(conn *websocket.Conn, closeMsg *websocket.CloseError) { func CloseConnection(conn *websocket.Conn, closeMsg *websocket.CloseError) {
if closeMsg != &CloseFirstMessageNotHello { if closeMsg != &CloseFirstMessageNotHello {
log.Println("Terminating connection with", conn.RemoteAddr(), "-", closeMsg.Text) log.Println("Terminating connection with", conn.RemoteAddr(), "-", closeMsg.Text)
@ -323,11 +336,11 @@ func UnmarshalClientMessage(data []byte, payloadType int, v interface{}) (err er
// Message ID // Message ID
spaceIdx = strings.IndexRune(dataStr, ' ') spaceIdx = strings.IndexRune(dataStr, ' ')
if spaceIdx == -1 { if spaceIdx == -1 {
return ProtocolError return ErrProtocolGeneric
} }
messageID, err := strconv.Atoi(dataStr[:spaceIdx]) messageID, err := strconv.Atoi(dataStr[:spaceIdx])
if messageID < -1 || messageID == 0 { if messageID < -1 || messageID == 0 {
return ProtocolErrorNegativeID return ErrProtocolNegativeMsgID
} }
out.MessageID = messageID out.MessageID = messageID
@ -397,23 +410,12 @@ func MarshalClientMessage(clientMessage interface{}) (payloadType int, data []by
return websocket.TextMessage, []byte(dataStr), nil return websocket.TextMessage, []byte(dataStr), nil
} }
// Command handlers should use this to construct responses.
func SuccessMessageFromString(arguments string) ClientMessage {
cm := ClientMessage{
MessageID: -1, // filled by the select loop
Command: SuccessCommand,
origArguments: arguments,
}
cm.parseOrigArguments()
return cm
}
// Convenience method: Parse the arguments of the ClientMessage as a single string. // Convenience method: Parse the arguments of the ClientMessage as a single string.
func (cm *ClientMessage) ArgumentsAsString() (string1 string, err error) { func (cm *ClientMessage) ArgumentsAsString() (string1 string, err error) {
var ok bool var ok bool
string1, ok = cm.Arguments.(string) string1, ok = cm.Arguments.(string)
if !ok { if !ok {
err = ExpectedSingleString err = ErrExpectedSingleString
return return
} else { } else {
return string1, nil return string1, nil
@ -426,7 +428,7 @@ func (cm *ClientMessage) ArgumentsAsInt() (int1 int64, err error) {
var num float64 var num float64
num, ok = cm.Arguments.(float64) num, ok = cm.Arguments.(float64)
if !ok { if !ok {
err = ExpectedSingleInt err = ErrExpectedSingleInt
return return
} else { } else {
int1 = int64(num) int1 = int64(num)
@ -440,16 +442,16 @@ func (cm *ClientMessage) ArgumentsAsTwoStrings() (string1, string2 string, err e
var ary []interface{} var ary []interface{}
ary, ok = cm.Arguments.([]interface{}) ary, ok = cm.Arguments.([]interface{})
if !ok { if !ok {
err = ExpectedTwoStrings err = ErrExpectedTwoStrings
return return
} else { } else {
if len(ary) != 2 { if len(ary) != 2 {
err = ExpectedTwoStrings err = ErrExpectedTwoStrings
return return
} }
string1, ok = ary[0].(string) string1, ok = ary[0].(string)
if !ok { if !ok {
err = ExpectedTwoStrings err = ErrExpectedTwoStrings
return return
} }
// clientID can be null // clientID can be null
@ -458,7 +460,7 @@ func (cm *ClientMessage) ArgumentsAsTwoStrings() (string1, string2 string, err e
} }
string2, ok = ary[1].(string) string2, ok = ary[1].(string)
if !ok { if !ok {
err = ExpectedTwoStrings err = ErrExpectedTwoStrings
return return
} }
return string1, string2, nil return string1, string2, nil
@ -471,27 +473,27 @@ func (cm *ClientMessage) ArgumentsAsStringAndInt() (string1 string, int int64, e
var ary []interface{} var ary []interface{}
ary, ok = cm.Arguments.([]interface{}) ary, ok = cm.Arguments.([]interface{})
if !ok { if !ok {
err = ExpectedStringAndInt err = ErrExpectedStringAndInt
return return
} else { } else {
if len(ary) != 2 { if len(ary) != 2 {
err = ExpectedStringAndInt err = ErrExpectedStringAndInt
return return
} }
string1, ok = ary[0].(string) string1, ok = ary[0].(string)
if !ok { if !ok {
err = ExpectedStringAndInt err = ErrExpectedStringAndInt
return return
} }
var num float64 var num float64
num, ok = ary[1].(float64) num, ok = ary[1].(float64)
if !ok { if !ok {
err = ExpectedStringAndInt err = ErrExpectedStringAndInt
return return
} }
int = int64(num) int = int64(num)
if float64(int) != num { if float64(int) != num {
err = ExpectedStringAndIntGotFloat err = ErrExpectedStringAndIntGotFloat
return return
} }
return string1, int, nil return string1, int, nil
@ -504,21 +506,21 @@ func (cm *ClientMessage) ArgumentsAsStringAndBool() (str string, flag bool, err
var ary []interface{} var ary []interface{}
ary, ok = cm.Arguments.([]interface{}) ary, ok = cm.Arguments.([]interface{})
if !ok { if !ok {
err = ExpectedStringAndBool err = ErrExpectedStringAndBool
return return
} else { } else {
if len(ary) != 2 { if len(ary) != 2 {
err = ExpectedStringAndBool err = ErrExpectedStringAndBool
return return
} }
str, ok = ary[0].(string) str, ok = ary[0].(string)
if !ok { if !ok {
err = ExpectedStringAndBool err = ErrExpectedStringAndBool
return return
} }
flag, ok = ary[1].(bool) flag, ok = ary[1].(bool)
if !ok { if !ok {
err = ExpectedStringAndBool err = ErrExpectedStringAndBool
return return
} }
return str, flag, nil return str, flag, nil

View file

@ -117,7 +117,8 @@ var CachedGlobalMessages []TimestampedGlobalMessage
var CachedChannelMessages []TimestampedMultichatMessage var CachedChannelMessages []TimestampedMultichatMessage
var CacheListsLock sync.RWMutex var CacheListsLock sync.RWMutex
func DumpCache() { // DumpBacklogData drops all /cached_pub data.
func DumpBacklogData() {
CachedLSMLock.Lock() CachedLSMLock.Lock()
CachedLastMessages = make(map[Command]map[string]LastSavedMessage) CachedLastMessages = make(map[Command]map[string]LastSavedMessage)
CachedLSMLock.Unlock() CachedLSMLock.Unlock()
@ -132,6 +133,9 @@ func DumpCache() {
CacheListsLock.Unlock() CacheListsLock.Unlock()
} }
// SendBacklogForNewClient sends any backlog data relevant to a new client.
// This should be done when the client sends a `ready` message.
// This will only send data for CacheTypePersistent and CacheTypeLastOnly because those do not involve timestamps.
func SendBacklogForNewClient(client *ClientInfo) { func SendBacklogForNewClient(client *ClientInfo) {
client.Mutex.Lock() // reading CurrentChannels client.Mutex.Lock() // reading CurrentChannels
PersistentLSMLock.RLock() PersistentLSMLock.RLock()
@ -170,11 +174,13 @@ func SendBacklogForNewClient(client *ClientInfo) {
client.Mutex.Unlock() client.Mutex.Unlock()
} }
// SendTimedBacklogMessages sends any once-off messages that the client may have missed while it was disconnected.
// Effectively, this can only process CacheTypeTimestamps.
func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) { func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) {
client.Mutex.Lock() // reading CurrentChannels client.Mutex.Lock() // reading CurrentChannels
CacheListsLock.RLock() CacheListsLock.RLock()
globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime) globIdx := findFirstNewMessage(tgmarray(CachedGlobalMessages), disconnectTime)
if globIdx != -1 { if globIdx != -1 {
for i := globIdx; i < len(CachedGlobalMessages); i++ { for i := globIdx; i < len(CachedGlobalMessages); i++ {
@ -185,7 +191,7 @@ func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) {
} }
} }
chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime) chanIdx := findFirstNewMessage(tmmarray(CachedChannelMessages), disconnectTime)
if chanIdx != -1 { if chanIdx != -1 {
for i := chanIdx; i < len(CachedChannelMessages); i++ { for i := chanIdx; i < len(CachedChannelMessages); i++ {
@ -217,20 +223,20 @@ func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) {
func backlogJanitor() { func backlogJanitor() {
for { for {
time.Sleep(1 * time.Hour) time.Sleep(1 * time.Hour)
CleanupTimedBacklogMessages() cleanupTimedBacklogMessages()
} }
} }
func CleanupTimedBacklogMessages() { func cleanupTimedBacklogMessages() {
CacheListsLock.Lock() CacheListsLock.Lock()
oneHourAgo := time.Now().Add(-24 * time.Hour) oneHourAgo := time.Now().Add(-24 * time.Hour)
globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo) globIdx := findFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo)
if globIdx != -1 { if globIdx != -1 {
newGlobMsgs := make([]TimestampedGlobalMessage, len(CachedGlobalMessages)-globIdx) newGlobMsgs := make([]TimestampedGlobalMessage, len(CachedGlobalMessages)-globIdx)
copy(newGlobMsgs, CachedGlobalMessages[globIdx:]) copy(newGlobMsgs, CachedGlobalMessages[globIdx:])
CachedGlobalMessages = newGlobMsgs CachedGlobalMessages = newGlobMsgs
} }
chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo) chanIdx := findFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo)
if chanIdx != -1 { if chanIdx != -1 {
newChanMsgs := make([]TimestampedMultichatMessage, len(CachedChannelMessages)-chanIdx) newChanMsgs := make([]TimestampedMultichatMessage, len(CachedChannelMessages)-chanIdx)
copy(newChanMsgs, CachedChannelMessages[chanIdx:]) copy(newChanMsgs, CachedChannelMessages[chanIdx:])
@ -239,7 +245,10 @@ func CleanupTimedBacklogMessages() {
CacheListsLock.Unlock() CacheListsLock.Unlock()
} }
func InsertionSort(ary sort.Interface) { // insertionSort implements insertion sort.
// CacheTypeTimestamps should use insertion sort for O(N) average performance.
// (The average case is the array is still sorted after insertion of the new item.)
func insertionSort(ary sort.Interface) {
for i := 1; i < ary.Len(); i++ { for i := 1; i < ary.Len(); i++ {
for j := i; j > 0 && ary.Less(j, j-1); j-- { for j := i; j > 0 && ary.Less(j, j-1); j-- {
ary.Swap(j, j-1) ary.Swap(j, j-1)
@ -247,13 +256,12 @@ func InsertionSort(ary sort.Interface) {
} }
} }
type TimestampArray interface { type timestampArray interface {
Len() int Len() int
GetTime(int) time.Time GetTime(int) time.Time
} }
func FindFirstNewMessage(ary TimestampArray, disconnectTime time.Time) (idx int) { func findFirstNewMessage(ary timestampArray, disconnectTime time.Time) (idx int) {
// TODO needs tests
len := ary.Len() len := ary.Len()
i := len i := len
@ -304,14 +312,14 @@ func SaveLastMessage(which map[Command]map[string]LastSavedMessage, locker sync.
func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) { func SaveGlobalMessage(cmd Command, timestamp time.Time, data string) {
CacheListsLock.Lock() CacheListsLock.Lock()
CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data}) CachedGlobalMessages = append(CachedGlobalMessages, TimestampedGlobalMessage{timestamp, cmd, data})
InsertionSort(tgmarray(CachedGlobalMessages)) insertionSort(tgmarray(CachedGlobalMessages))
CacheListsLock.Unlock() CacheListsLock.Unlock()
} }
func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) { func SaveMultichanMessage(cmd Command, channels string, timestamp time.Time, data string) {
CacheListsLock.Lock() CacheListsLock.Lock()
CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data}) CachedChannelMessages = append(CachedChannelMessages, TimestampedMultichatMessage{timestamp, strings.Split(channels, ","), cmd, data})
InsertionSort(tmmarray(CachedChannelMessages)) insertionSort(tmmarray(CachedChannelMessages))
CacheListsLock.Unlock() CacheListsLock.Unlock()
} }
@ -325,7 +333,7 @@ func GetCommandsOfType(match PushCommandCacheInfo) []Command {
return ret return ret
} }
func HBackendDropBacklog(w http.ResponseWriter, r *http.Request) { func HTTPBackendDropBacklog(w http.ResponseWriter, r *http.Request) {
r.ParseForm() r.ParseForm()
formData, err := UnsealRequest(r.Form) formData, err := UnsealRequest(r.Form)
if err != nil { if err != nil {
@ -336,7 +344,7 @@ func HBackendDropBacklog(w http.ResponseWriter, r *http.Request) {
confirm := formData.Get("confirm") confirm := formData.Get("confirm")
if confirm == "1" { if confirm == "1" {
DumpCache() DumpBacklogData()
} }
} }

View file

@ -11,7 +11,7 @@ func TestCleanupBacklogMessages(t *testing.T) {
func TestFindFirstNewMessageEmpty(t *testing.T) { func TestFindFirstNewMessageEmpty(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{} CachedGlobalMessages = []TimestampedGlobalMessage{}
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != -1 { if i != -1 {
t.Errorf("Expected -1, got %d", i) t.Errorf("Expected -1, got %d", i)
} }
@ -20,7 +20,7 @@ func TestFindFirstNewMessageOneBefore(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{ CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(8, 0)}, {Timestamp: time.Unix(8, 0)},
} }
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != -1 { if i != -1 {
t.Errorf("Expected -1, got %d", i) t.Errorf("Expected -1, got %d", i)
} }
@ -33,7 +33,7 @@ func TestFindFirstNewMessageSeveralBefore(t *testing.T) {
{Timestamp: time.Unix(4, 0)}, {Timestamp: time.Unix(4, 0)},
{Timestamp: time.Unix(5, 0)}, {Timestamp: time.Unix(5, 0)},
} }
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != -1 { if i != -1 {
t.Errorf("Expected -1, got %d", i) t.Errorf("Expected -1, got %d", i)
} }
@ -51,7 +51,7 @@ func TestFindFirstNewMessageInMiddle(t *testing.T) {
{Timestamp: time.Unix(14, 0)}, {Timestamp: time.Unix(14, 0)},
{Timestamp: time.Unix(15, 0)}, {Timestamp: time.Unix(15, 0)},
} }
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != 5 { if i != 5 {
t.Errorf("Expected 5, got %d", i) t.Errorf("Expected 5, got %d", i)
} }
@ -60,7 +60,7 @@ func TestFindFirstNewMessageOneAfter(t *testing.T) {
CachedGlobalMessages = []TimestampedGlobalMessage{ CachedGlobalMessages = []TimestampedGlobalMessage{
{Timestamp: time.Unix(15, 0)}, {Timestamp: time.Unix(15, 0)},
} }
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != 0 { if i != 0 {
t.Errorf("Expected 0, got %d", i) t.Errorf("Expected 0, got %d", i)
} }
@ -73,7 +73,7 @@ func TestFindFirstNewMessageSeveralAfter(t *testing.T) {
{Timestamp: time.Unix(14, 0)}, {Timestamp: time.Unix(14, 0)},
{Timestamp: time.Unix(15, 0)}, {Timestamp: time.Unix(15, 0)},
} }
i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) i := findFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0))
if i != 0 { if i != 0 {
t.Errorf("Expected 0, got %d", i) t.Errorf("Expected 0, got %d", i)
} }

View file

@ -134,7 +134,7 @@ func TGetUrls(testserver *httptest.Server) TURLs {
} }
func TSetup(testserver **httptest.Server, urls *TURLs) { func TSetup(testserver **httptest.Server, urls *TURLs) {
DumpCache() DumpBacklogData()
conf := &ConfigFile{ conf := &ConfigFile{
ServerID: 20, ServerID: 20,

View file

@ -98,10 +98,13 @@ type ClientInfo struct {
// Take out an Add() on this during a command if you need to use the MessageChannel later. // Take out an Add() on this during a command if you need to use the MessageChannel later.
MsgChannelKeepalive sync.WaitGroup MsgChannelKeepalive sync.WaitGroup
// The number of pings sent without a response // The number of pings sent without a response.
// Protected by Mutex
pingCount int pingCount int
} }
const usePendingSubscrptionsBacklog = false
type tgmarray []TimestampedGlobalMessage type tgmarray []TimestampedGlobalMessage
type tmmarray []TimestampedMultichatMessage type tmmarray []TimestampedMultichatMessage
@ -178,9 +181,9 @@ func (bct *BacklogCacheType) UnmarshalJSON(data []byte) error {
*bct = CacheTypeInvalid *bct = CacheTypeInvalid
return nil return nil
} }
val := BacklogCacheTypeByName(str) newBct := BacklogCacheTypeByName(str)
if val != CacheTypeInvalid { if newBct != CacheTypeInvalid {
*bct = val *bct = newBct
return nil return nil
} }
return ErrorUnrecognizedCacheType return ErrorUnrecognizedCacheType
@ -234,9 +237,9 @@ func (mtt *MessageTargetType) UnmarshalJSON(data []byte) error {
*mtt = MsgTargetTypeInvalid *mtt = MsgTargetTypeInvalid
return nil return nil
} }
mtt := MessageTargetTypeByName(str) newMtt := MessageTargetTypeByName(str)
if mtt != MsgTargetTypeInvalid { if newMtt != MsgTargetTypeInvalid {
*mtt = mtt *mtt = newMtt
return nil return nil
} }
return ErrorUnrecognizedTargetType return ErrorUnrecognizedTargetType

View file

@ -160,8 +160,8 @@ func RemoveFromSliceC(ary *[]chan<- ClientMessage, val chan<- ClientMessage) boo
return true return true
} }
func AddToSliceB(ary *[]BunchSubscriber, client *ClientInfo, mid int) bool { func AddToSliceB(ary *[]bunchSubscriber, client *ClientInfo, mid int) bool {
newSub := BunchSubscriber{Client: client, MessageID: mid} newSub := bunchSubscriber{Client: client, MessageID: mid}
slice := *ary slice := *ary
for _, v := range slice { for _, v := range slice {
if v == newSub { if v == newSub {