1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-06-29 07:45:33 +00:00

Work on pub/sub and peer cert setup

This commit is contained in:
Kane York 2015-10-25 03:21:50 -07:00
parent d4afc3c4c7
commit 401f66f15b
7 changed files with 478 additions and 48 deletions

View file

@ -9,8 +9,13 @@ import (
var origin *string = flag.String("origin", "localhost:8001", "Client-visible origin of the socket server") var origin *string = flag.String("origin", "localhost:8001", "Client-visible origin of the socket server")
var bindAddress *string = flag.String("listen", "", "Address to bind to, if different from origin") var bindAddress *string = flag.String("listen", "", "Address to bind to, if different from origin")
var certificateFile *string = flag.String("crt", "", "SSL certificate file") var usessl *bool = flag.Bool("ssl", false, "Enable the use of SSL for connecting clients and backend connections")
var privateKeyFile *string = flag.String("key", "", "SSL private key file") var certificateFile *string = flag.String("crt", "ssl.crt", "CA-signed SSL certificate file")
var privateKeyFile *string = flag.String("key", "ssl.key", "SSL private key file")
var backendRootFile *string = flag.String("peerroot", "backend_issuer.pem", "Root certificate that issued client certificates for backend servers")
var backendCertFile *string = flag.String("peercrt", "backend_cert.crt", "Backend-trusted certificate, for use as a client certificate")
var backendKeyFile *string = flag.String("peerkey", "backend_cert.key", "Private key for backend-trusted certificate, for use as a client certificate")
var basicAuthPwd *string = flag.String("password", "", "Password for HTTP Basic Auth") // TODO
func main() { func main() {
flag.Parse() flag.Parse()
@ -29,17 +34,24 @@ func main() {
SSLKeyFile: *privateKeyFile, SSLKeyFile: *privateKeyFile,
SSLCertificateFile: *certificateFile, SSLCertificateFile: *certificateFile,
UseSSL: *certificateFile != "", UseSSL: *certificateFile != "",
BackendRootCertFile: *backendRootFile,
BackendClientCertFile: *backendCertFile,
BackendClientKeyFile: *backendKeyFile,
SocketOrigin: *origin, SocketOrigin: *origin,
} }
server.SetupServerAndHandle(conf) httpServer := &http.Server{
Addr: *bindAddress
}
server.SetupServerAndHandle(conf, httpServer.TLSConfig)
var err error var err error
if conf.UseSSL { if conf.UseSSL {
err = http.ListenAndServeTLS(*bindAddress, *certificateFile, *privateKeyFile, nil) err = httpServer.ListenAndServeTLS(nil, nil)
} else { } else {
err = http.ListenAndServe(*bindAddress, nil) err = httpServer.ListenAndServe()
} }
if err != nil { if err != nil {

View file

@ -8,16 +8,41 @@ import (
"github.com/pmylund/go-cache" "github.com/pmylund/go-cache"
"strconv" "strconv"
"io/ioutil" "io/ioutil"
"encoding/json"
"crypto/tls"
"crypto/x509"
"log"
) )
var httpClient http.Client var backendHttpClient http.Client
var backendUrl string var backendUrl string
var responseCache *cache.Cache var responseCache *cache.Cache
func SetupBackend(url string) { var getBacklogUrl string
httpClient.Timeout = 60 * time.Second
backendUrl = url func SetupBackend(config *Config) {
backendHttpClient.Timeout = 60 * time.Second
backendUrl = config.BackendUrl
if responseCache != nil {
responseCache.Flush()
}
responseCache = cache.New(60 * time.Second, 120 * time.Second) responseCache = cache.New(60 * time.Second, 120 * time.Second)
getBacklogUrl = fmt.Sprintf("%s/backlog", backendUrl)
}
func SetupBackendCertificates(config *Config, certPool x509.CertPool) {
myCert, err := tls.LoadX509KeyPair(config.BackendClientCertFile, config.BackendClientKeyFile)
if err != nil {
log.Fatal(err)
}
tlsConfig := tls.Config{
Certificates: []tls.Certificate{myCert},
RootCAs: certPool,
}
tlsConfig.BuildNameToCertificate()
transport := &http.Transport{TLSClientConfig: tlsConfig}
backendHttpClient.Transport = transport
} }
func getCacheKey(remoteCommand, data string) string { func getCacheKey(remoteCommand, data string) string {
@ -33,7 +58,7 @@ func RequestRemoteDataCached(remoteCommand, data string, auth AuthInfo) (string,
} }
func RequestRemoteData(remoteCommand, data string, auth AuthInfo) (string, error) { func RequestRemoteData(remoteCommand, data string, auth AuthInfo) (string, error) {
destUrl := fmt.Sprintf("%s/%s", backendUrl, remoteCommand) destUrl := fmt.Sprintf("%s/cmd/%s", backendUrl, remoteCommand)
var authKey string var authKey string
if auth.UsernameValidated { if auth.UsernameValidated {
authKey = "usernameClaimed" authKey = "usernameClaimed"
@ -45,8 +70,11 @@ func RequestRemoteData(remoteCommand, data string, auth AuthInfo) (string, error
"clientData": []string{data}, "clientData": []string{data},
authKey: []string{auth.TwitchUsername}, authKey: []string{auth.TwitchUsername},
} }
if gconfig.BasicAuthPassword != "" {
formData["password"] = gconfig.BasicAuthPassword
}
resp, err := httpClient.PostForm(destUrl, formData) resp, err := backendHttpClient.PostForm(destUrl, formData)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -69,4 +97,24 @@ func RequestRemoteData(remoteCommand, data string, auth AuthInfo) (string, error
} }
return responseJson, nil return responseJson, nil
}
func FetchBacklogData(chatSubs, channelSubs []string) ([]ClientMessage, error) {
formData := url.Values{
"chatSubs": chatSubs,
"channelSubs": channelSubs,
}
resp, err := backendHttpClient.PostForm(getBacklogUrl, formData)
if err != nil {
return nil, err
}
dec := json.NewDecoder(resp.Body)
var messages []ClientMessage
err = dec.Decode(messages)
if err != nil {
return nil, err
}
return messages, nil
} }

View file

@ -6,11 +6,14 @@ import (
"log" "log"
"sync" "sync"
"strconv" "strconv"
"time"
) )
var ResponseSuccess = ClientMessage{Command: SuccessCommand} var ResponseSuccess = ClientMessage{Command: SuccessCommand}
var ResponseFailure = ClientMessage{Command: "False"} var ResponseFailure = ClientMessage{Command: "False"}
const ChannelInfoDelay = 2 * time.Second
func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) { func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) {
handler, ok := CommandHandlers[msg.Command] handler, ok := CommandHandlers[msg.Command]
if !ok { if !ok {
@ -22,9 +25,7 @@ func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage)
log.Println(conn.RemoteAddr(), msg.MessageID, msg.Command, msg.Arguments) log.Println(conn.RemoteAddr(), msg.MessageID, msg.Command, msg.Arguments)
client.Mutex.Lock()
response, err := CallHandler(handler, conn, client, msg) response, err := CallHandler(handler, conn, client, msg)
client.Mutex.Unlock()
if err == nil { if err == nil {
response.MessageID = msg.MessageID response.MessageID = msg.MessageID
@ -64,8 +65,10 @@ func HandleSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage)
return return
} }
client.Mutex.Lock()
client.TwitchUsername = username client.TwitchUsername = username
client.UsernameValidated = false client.UsernameValidated = false
client.Mutex.Unlock()
return ResponseSuccess, nil return ResponseSuccess, nil
} }
@ -73,9 +76,22 @@ func HandleSetUser(conn *websocket.Conn, client *ClientInfo, msg ClientMessage)
func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
channel, err := msg.ArgumentsAsString() channel, err := msg.ArgumentsAsString()
AddToSliceS(&client.CurrentChannels, channel) client.Mutex.Lock()
// TODO - get backlog AddToSliceS(&client.CurrentChannels, channel)
client.PendingChatBacklogs = append(client.PendingChatBacklogs, channel)
if client.MakePendingRequests == nil {
client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client))
} else {
if !client.MakePendingRequests.Reset(ChannelInfoDelay) {
client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client))
}
}
client.Mutex.Unlock()
// note - pub/sub updating happens in GetSubscriptionBacklog
return ResponseSuccess, nil return ResponseSuccess, nil
} }
@ -83,7 +99,11 @@ func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rms
func HandleUnsub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleUnsub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
channel, err := msg.ArgumentsAsString() channel, err := msg.ArgumentsAsString()
client.Mutex.Lock()
RemoveFromSliceS(&client.CurrentChannels, channel) RemoveFromSliceS(&client.CurrentChannels, channel)
client.Mutex.Unlock()
UnsubscribeSingleChat(client, channel)
return ResponseSuccess, nil return ResponseSuccess, nil
} }
@ -91,9 +111,22 @@ func HandleUnsub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (r
func HandleSubChannel(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleSubChannel(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
channel, err := msg.ArgumentsAsString() channel, err := msg.ArgumentsAsString()
AddToSliceS(&client.WatchingChannels, channel) client.Mutex.Lock()
// TODO - get backlog AddToSliceS(&client.WatchingChannels, channel)
client.PendingStreamBacklogs = append(client.PendingStreamBacklogs, channel)
if client.MakePendingRequests == nil {
client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client))
} else {
if !client.MakePendingRequests.Reset(ChannelInfoDelay) {
client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client))
}
}
client.Mutex.Unlock()
// note - pub/sub updating happens in GetSubscriptionBacklog
return ResponseSuccess, nil return ResponseSuccess, nil
} }
@ -101,17 +134,88 @@ func HandleSubChannel(conn *websocket.Conn, client *ClientInfo, msg ClientMessag
func HandleUnsubChannel(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleUnsubChannel(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
channel, err := msg.ArgumentsAsString() channel, err := msg.ArgumentsAsString()
client.Mutex.Lock()
RemoveFromSliceS(&client.WatchingChannels, channel) RemoveFromSliceS(&client.WatchingChannels, channel)
client.Mutex.Unlock()
UnsubscribeSingleChannel(client, channel)
return ResponseSuccess, nil return ResponseSuccess, nil
} }
func GetSubscriptionBacklogFor(conn *websocket.Conn, client *ClientInfo) func() {
return func() {
GetSubscriptionBacklog(conn, client)
}
}
// On goroutine
func GetSubscriptionBacklog(conn *websocket.Conn, client *ClientInfo) {
var chatSubs, channelSubs []string
// Lock, grab the data, and reset it
client.Mutex.Lock()
chatSubs = client.PendingChatBacklogs
channelSubs = client.PendingStreamBacklogs
client.PendingChatBacklogs = nil
client.PendingStreamBacklogs = nil
client.MakePendingRequests = nil
client.Mutex.Unlock()
if len(chatSubs) == 0 && len(channelSubs) == 0 {
return
}
SubscribeBatch(client, chatSubs, channelSubs)
messages, err := FetchBacklogData(chatSubs, channelSubs)
if err != nil {
// Oh well.
log.Print("error in GetSubscriptionBacklog:", err)
return
}
// Deliver to client
for _, msg := range messages {
client.MessageChannel <- msg
}
}
type SurveySubmission struct {
User string
Json string
}
var SurveySubmissions []SurveySubmission
var SurveySubmissionLock sync.Mutex
func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
log.Println("Ignoring survey response from", client.ClientID) SurveySubmissionLock.Lock()
SurveySubmissions = append(SurveySubmissions, SurveySubmission{client.TwitchUsername, msg.origArguments})
SurveySubmissionLock.Unlock()
return ResponseSuccess, nil return ResponseSuccess, nil
} }
type FollowEvent struct {
User string
Channel string
NowFollowing bool
Timestamp time.Time
}
var FollowEvents []FollowEvent
var FollowEventsLock sync.Mutex
func HandleTrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) { func HandleTrackFollow(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rmsg ClientMessage, err error) {
channel, following, err := msg.ArgumentsAsStringAndBool()
if err != nil {
return
}
now := time.Now()
FollowEventsLock.Lock()
FollowEvents = append(FollowEvents, FollowEvent{client.TwitchUsername, channel, following, now})
FollowEventsLock.Unlock()
return ResponseSuccess, nil return ResponseSuccess, nil
} }

View file

@ -4,23 +4,34 @@ import (
"net/http" "net/http"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"crypto/tls" "crypto/tls"
"log"
"strings" "strings"
"strconv" "strconv"
"errors" "errors"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync" "sync"
"crypto/x509"
"io/ioutil"
) )
const MAX_PACKET_SIZE = 1024 const MAX_PACKET_SIZE = 1024
type Config struct { type Config struct {
// SSL
SSLCertificateFile string SSLCertificateFile string
SSLKeyFile string SSLKeyFile string
UseSSL bool UseSSL bool
// CA for client validation (pub/sub commands only)
BackendRootCertFile string
BackendClientCertFile string
BackendClientKeyFile string
// Password for client validation (pub/sub commands only)
BasicAuthPassword string
// Hostname of the socket server
SocketOrigin string SocketOrigin string
// URL to the backend server
BackendUrl string BackendUrl string
} }
@ -75,39 +86,58 @@ var ExpectedStringAndInt = errors.New("Error: Expected array of string, int as a
var ExpectedStringAndBool = errors.New("Error: Expected array of string, bool as arguments.") var ExpectedStringAndBool = errors.New("Error: Expected array of string, bool as arguments.")
var ExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.") var ExpectedStringAndIntGotFloat = errors.New("Error: Second argument was a float, expected an integer.")
var gconfig *Config
// Create a websocket.Server with the options from the provided Config. // Create a websocket.Server with the options from the provided Config.
func SetupServer(config *Config) *websocket.Server { func setupServer(config *Config, tlsConfig *tls.Config) *websocket.Server {
gconfig = config
sockConf, err := websocket.NewConfig("/", config.SocketOrigin) sockConf, err := websocket.NewConfig("/", config.SocketOrigin)
if err != nil { if err != nil {
panic(err) panic(err)
} }
SetupBackend(config)
if config.UseSSL { if config.UseSSL {
cert, err := tls.LoadX509KeyPair(config.SSLCertificateFile, config.SSLKeyFile) cert, err := tls.LoadX509KeyPair(config.SSLCertificateFile, config.SSLKeyFile)
if err != nil { if err != nil {
panic(err) panic(err)
} }
tlsConf := &tls.Config{ tlsConfig.Certificates = []tls.Certificate{cert}
Certificates: []tls.Certificate{cert}, tlsConfig.ServerName = config.SocketOrigin
ServerName: config.SocketOrigin, tlsConfig.BuildNameToCertificate()
sockConf.TlsConfig = tlsConfig
certBytes, err := ioutil.ReadFile(config.BackendRootCertFile)
if err != nil {
panic(err)
} }
tlsConf.BuildNameToCertificate() clientCA, err := x509.ParseCertificate(certBytes)
sockConf.TlsConfig = tlsConf if err != nil {
panic(err)
}
certPool := x509.NewCertPool()
certPool.AddCert(clientCA)
tlsConfig.ClientCAs = certPool
SetupBackendCertificates(config, certPool)
} }
sockServer := &websocket.Server{} sockServer := &websocket.Server{}
sockServer.Config = *sockConf sockServer.Config = *sockConf
sockServer.Handler = HandleSocketConnection sockServer.Handler = HandleSocketConnection
SetupBackend(config.BackendUrl) go deadChannelReaper()
return sockServer return sockServer
} }
// Set up a websocket listener and register it on /. // Set up a websocket listener and register it on /.
// (Uses http.DefaultServeMux .) // (Uses http.DefaultServeMux .)
func SetupServerAndHandle(config *Config) { func SetupServerAndHandle(config *Config, tlsConfig *tls.Config) {
sockServer := SetupServer(config) sockServer := setupServer(config, tlsConfig)
http.HandleFunc("/", sockServer.ServeHTTP) http.HandleFunc("/", sockServer.ServeHTTP)
http.HandleFunc("/pub", HandlePublishRequest)
} }
// Handle a new websocket connection from a FFZ client. // Handle a new websocket connection from a FFZ client.
@ -122,17 +152,14 @@ func HandleSocketConnection(conn *websocket.Conn) {
}) })
} }
defer func() { // Close the connection when we're done.
closer() defer closer()
}()
log.Print("! Got a connection from ", conn.RemoteAddr())
_clientChan := make(chan ClientMessage) _clientChan := make(chan ClientMessage)
_serverMessageChan := make(chan ClientMessage) _serverMessageChan := make(chan ClientMessage)
_errorChan := make(chan error) _errorChan := make(chan error)
// Receive goroutine // Launch receiver goroutine
go func(errorChan chan <- error, clientChan chan <- ClientMessage) { go func(errorChan chan <- error, clientChan chan <- ClientMessage) {
var msg ClientMessage var msg ClientMessage
var err error var err error
@ -148,13 +175,15 @@ func HandleSocketConnection(conn *websocket.Conn) {
// exit // exit
}(_errorChan, _clientChan) }(_errorChan, _clientChan)
var client ClientInfo
client.MessageChannel = _serverMessageChan
var errorChan <-chan error = _errorChan var errorChan <-chan error = _errorChan
var clientChan <-chan ClientMessage = _clientChan var clientChan <-chan ClientMessage = _clientChan
var serverMessageChan <-chan ClientMessage = _serverMessageChan var serverMessageChan <-chan ClientMessage = _serverMessageChan
var client ClientInfo
client.MessageChannel = _serverMessageChan
// All set up, now enter the work loop
RunLoop: RunLoop:
for { for {
select { select {
@ -180,7 +209,20 @@ func HandleSocketConnection(conn *websocket.Conn) {
FFZCodec.Send(conn, smsg) FFZCodec.Send(conn, smsg)
} }
} }
// exit
// Exit
// Launch message draining goroutine - we aren't out of the pub/sub records
go func() {
for _ := range _serverMessageChan {}
}()
// Stop getting messages...
UnsubscribeAll(&client)
// And finished.
// Close the channel so the draining goroutine can finish, too.
close(_serverMessageChan)
} }
func CallHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) { func CallHandler(handler CommandHandler, conn *websocket.Conn, client *ClientInfo, cmsg ClientMessage) (rmsg ClientMessage, err error) {

View file

@ -0,0 +1,173 @@
package server
// This is the scariest code I've written yet for the server.
// If I screwed up the locking, I won't know until it's too late.
import (
"sync"
"time"
"net/http"
)
type SubscriberList struct {
sync.RWMutex
Members []chan <- ClientMessage
}
var ChatSubscriptionInfo map[string]*SubscriberList
var ChatSubscriptionLock sync.RWMutex
var WatchingSubscriptionInfo map[string]*SubscriberList
var WatchingSubscriptionLock sync.RWMutex
func PublishToChat(channel string, msg ClientMessage) {
ChatSubscriptionLock.RLock()
list := ChatSubscriptionInfo[channel]
if list != nil {
list.RLock()
for _, ch := range list.Members {
ch <- msg
}
list.RUnlock()
}
ChatSubscriptionLock.RUnlock()
}
func PublishToWatchers(channel string, msg ClientMessage) {
WatchingSubscriptionLock.RLock()
list := WatchingSubscriptionInfo[channel]
if list != nil {
list.RLock()
for _, ch := range list.Members {
ch <- msg
}
list.RUnlock()
}
WatchingSubscriptionLock.RUnlock()
}
func HandlePublishRequest(w http.ResponseWriter, r *http.Request) {
if r.TLS {
PeerCertificates
}
}
// Add a channel to the subscriptions while holding a read-lock to the map.
// Locks:
// - ALREADY HOLDING a read-lock to the 'which' top-level map via the rlocker object
// - possible write lock to the 'which' top-level map via the wlocker object
// - write lock to SubscriptionInfo (if not creating new)
func _subscribeWhileRlocked(which map[string]*SubscriberList, channelName string, value chan <- ClientMessage, rlocker sync.Locker, wlocker sync.Locker) {
list := which[channelName]
if list == nil {
// Not found, so create it
rlocker.Unlock()
wlocker.Lock()
list = &SubscriberList{}
list.Members = &[]chan <- ClientMessage{value} // Create it populated, to avoid reaper
which[channelName] = list
wlocker.Unlock()
rlocker.Lock()
} else {
list.Lock()
AddToSliceC(&list.Members, value)
list.Unlock()
}
}
// Locks:
// - read lock to top-level maps
// - possible write lock to top-level maps
// - write lock to SubscriptionInfos
func SubscribeBatch(client *ClientInfo, chatSubs, channelSubs []string) {
mchan := client.MessageChannel
if len(chatSubs) > 0 {
rlocker := ChatSubscriptionLock.RLocker()
rlocker.Lock()
for _, v := range chatSubs {
_subscribeWhileRlocked(ChatSubscriptionInfo, v, mchan, rlocker, ChatSubscriptionLock)
}
rlocker.Unlock()
}
if len(channelSubs) > 0 {
rlocker := WatchingSubscriptionLock.RLocker()
rlocker.Lock()
for _, v := range channelSubs {
_subscribeWhileRlocked(WatchingSubscriptionInfo, v, mchan, rlocker, WatchingSubscriptionLock)
}
rlocker.Unlock()
}
}
// Unsubscribe the client from all channels, AND clear the CurrentChannels / WatchingChannels fields.
// Locks:
// - read lock to top-level maps
// - write lock to SubscriptionInfos
// - write lock to ClientInfo
func UnsubscribeAll(client *ClientInfo) {
ChatSubscriptionLock.RLock()
client.Mutex.Lock()
for _, v := range client.CurrentChannels {
list := ChatSubscriptionInfo[v]
list.Lock()
RemoveFromSliceC(&list.Members, client.MessageChannel)
list.Unlock()
}
client.CurrentChannels = nil
client.Mutex.Unlock()
ChatSubscriptionLock.RUnlock()
WatchingSubscriptionLock.RLock()
client.Mutex.Lock()
for _, v := range client.WatchingChannels {
list := WatchingSubscriptionInfo[v]
list.Lock()
RemoveFromSliceC(&list.Members, client.MessageChannel)
list.Unlock()
}
client.WatchingChannels = nil
client.Mutex.Unlock()
WatchingSubscriptionLock.RUnlock()
}
func UnsubscribeSingleChat(client *ClientInfo, channelName string) {
ChatSubscriptionLock.RLock()
list := ChatSubscriptionInfo[channelName]
list.Lock()
RemoveFromSliceC(&list.Members, client.MessageChannel)
list.Unlock()
ChatSubscriptionLock.RUnlock()
}
func UnsubscribeSingleChannel(client *ClientInfo, channelName string) {
WatchingSubscriptionLock.RLock()
list := WatchingSubscriptionInfo[channelName]
list.Lock()
RemoveFromSliceC(&list.Members, client.MessageChannel)
list.Unlock()
WatchingSubscriptionLock.RUnlock()
}
const ReapingDelay = 120 * time.Minute
// Checks each of ChatSubscriptionInfo / WatchingSubscriptionInfo
// for entries with no subscribers every ReapingDelay.
// Started from SetupServer().
func deadChannelReaper() {
for {
time.Sleep(ReapingDelay / 2)
ChatSubscriptionLock.Lock()
for key, val := range ChatSubscriptionInfo {
if len(val.Members) == 0 {
ChatSubscriptionInfo[key] = nil
}
}
ChatSubscriptionLock.Unlock()
time.Sleep(ReapingDelay / 2)
WatchingSubscriptionLock.Lock()
for key, val := range WatchingSubscriptionInfo {
if len(val.Members) == 0 {
WatchingSubscriptionInfo[key] = nil
}
}
}
}

View file

@ -3,19 +3,20 @@ package server
import ( import (
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"sync" "sync"
"time"
) )
type ClientMessage struct { type ClientMessage struct {
// Message ID. Increments by 1 for each message sent from the client. // Message ID. Increments by 1 for each message sent from the client.
// When replying to a command, the message ID must be echoed. // When replying to a command, the message ID must be echoed.
// When sending a server-initiated message, this is -1. // When sending a server-initiated message, this is -1.
MessageID int MessageID int `json:_`
// The command that the client wants from the server. // The command that the client wants from the server.
// When sent from the server, the literal string 'True' indicates success. // When sent from the server, the literal string 'True' indicates success.
// Before sending, a blank Command will be converted into SuccessCommand. // Before sending, a blank Command will be converted into SuccessCommand.
Command Command Command Command `json:cmd`
// Result of json.Unmarshal on the third field send from the client // Result of json.Unmarshal on the third field send from the client
Arguments interface{} Arguments interface{} `json:data`
origArguments string origArguments string
} }
@ -41,19 +42,35 @@ type ClientInfo struct {
// If it seems to be a performance problem, we can split this. // If it seems to be a performance problem, we can split this.
Mutex sync.Mutex Mutex sync.Mutex
// TODO(riking) - does this need to be protected cross-thread?
AuthInfo AuthInfo
// Username validation nonce. // Username validation nonce.
ValidationNonce string ValidationNonce string
// The list of chats this client is currently in. // The list of chats this client is currently in.
// Protected by Mutex // Protected by Mutex.
CurrentChannels []string CurrentChannels []string
// This list of channels this client needs UI updates for. // This list of channels this client needs UI updates for.
// Protected by Mutex // Protected by Mutex.
WatchingChannels []string WatchingChannels []string
// List of channels that we have not yet checked current chat-related channel info for.
// This lets us batch the backlog requests.
// Protected by Mutex.
PendingChatBacklogs []string
// List of channels that we have not yet checked current stream-related channel info for.
// This lets us batch the backlog requests.
// Protected by Mutex.
PendingStreamBacklogs []string
// A timer that, when fired, will make the pending backlog requests.
// Usually nil. Protected by Mutex.
MakePendingRequests *time.Timer
// Server-initiated messages should be sent here // Server-initiated messages should be sent here
// Never nil.
MessageChannel chan <- ClientMessage MessageChannel chan <- ClientMessage
} }

View file

@ -3,19 +3,20 @@ package server
import ( import (
) )
func AddToSliceS(ary *[]string, val string) { func AddToSliceS(ary *[]string, val string) bool {
slice := *ary slice := *ary
for _, v := range slice { for _, v := range slice {
if v == val { if v == val {
return return false
} }
} }
slice = append(slice, val) slice = append(slice, val)
*ary = slice *ary = slice
return true
} }
func RemoveFromSliceS(ary *[]string, val string) { func RemoveFromSliceS(ary *[]string, val string) bool {
slice := *ary slice := *ary
var idx int = -1 var idx int = -1
for i, v := range slice { for i, v := range slice {
@ -25,10 +26,43 @@ func RemoveFromSliceS(ary *[]string, val string) {
} }
} }
if idx == -1 { if idx == -1 {
return return false
} }
slice[idx] = slice[len(slice) - 1] slice[idx] = slice[len(slice) - 1]
slice = slice[:len(slice) - 1] slice = slice[:len(slice) - 1]
*ary = slice *ary = slice
return true
} }
func AddToSliceC(ary *[]chan <- ClientMessage, val chan <- ClientMessage) bool {
slice := *ary
for _, v := range slice {
if v == val {
return false
}
}
slice = append(slice, val)
*ary = slice
return true
}
func RemoveFromSliceC(ary *[]chan <- ClientMessage, val chan <- ClientMessage) bool {
slice := *ary
var idx int = -1
for i, v := range slice {
if v == val {
idx = i
break
}
}
if idx == -1 {
return false
}
slice[idx] = slice[len(slice) - 1]
slice = slice[:len(slice) - 1]
*ary = slice
return true
}