mirror of
				https://github.com/FrankerFaceZ/FrankerFaceZ.git
				synced 2025-10-22 10:52:02 +00:00 
			
		
		
		
	Reformat and add global message subscriptions
This commit is contained in:
		
							parent
							
								
									df7d607556
								
							
						
					
					
						commit
						8918b9ac3a
					
				
					 11 changed files with 182 additions and 158 deletions
				
			
		|  | @ -1,21 +1,21 @@ | |||
| package server | ||||
| 
 | ||||
| import ( | ||||
| 	"golang.org/x/crypto/nacl/box" | ||||
| 	"net/http" | ||||
| 	"time" | ||||
| 	"fmt" | ||||
| 	"net/url" | ||||
| 	"github.com/pmylund/go-cache" | ||||
| 	"strconv" | ||||
| 	"io/ioutil" | ||||
| 	"encoding/json" | ||||
| 	"sync" | ||||
| 	"log" | ||||
| 	"os" | ||||
| 	"crypto/rand" | ||||
| 	"encoding/base64" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"github.com/pmylund/go-cache" | ||||
| 	"golang.org/x/crypto/nacl/box" | ||||
| 	"io/ioutil" | ||||
| 	"log" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"os" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| var backendHttpClient http.Client | ||||
|  | @ -35,7 +35,7 @@ func SetupBackend(config *Config) { | |||
| 	if responseCache != nil { | ||||
| 		responseCache.Flush() | ||||
| 	} | ||||
| 	responseCache = cache.New(60 * time.Second, 120 * time.Second) | ||||
| 	responseCache = cache.New(60*time.Second, 120*time.Second) | ||||
| 
 | ||||
| 	getBacklogUrl = fmt.Sprintf("%s/backlog", backendUrl) | ||||
| 
 | ||||
|  | @ -64,7 +64,7 @@ func getCacheKey(remoteCommand, data string) string { | |||
| 	return fmt.Sprintf("%s/%s", remoteCommand, data) | ||||
| } | ||||
| 
 | ||||
| func HandlePublishRequest(w http.ResponseWriter, r *http.Request) { | ||||
| func HBackendPublishRequest(w http.ResponseWriter, r *http.Request) { | ||||
| 	formData, err := UnsealRequest(r.Form) | ||||
| 	if err != nil { | ||||
| 		w.WriteHeader(403) | ||||
|  | @ -109,7 +109,7 @@ func RequestRemoteData(remoteCommand, data string, auth AuthInfo) (responseStr s | |||
| 
 | ||||
| 	formData := url.Values{ | ||||
| 		"clientData": []string{data}, | ||||
| 		authKey: []string{auth.TwitchUsername}, | ||||
| 		authKey:      []string{auth.TwitchUsername}, | ||||
| 	} | ||||
| 
 | ||||
| 	sealedForm, err := SealRequest(formData) | ||||
|  | @ -144,7 +144,7 @@ func RequestRemoteData(remoteCommand, data string, auth AuthInfo) (responseStr s | |||
| 
 | ||||
| func FetchBacklogData(chatSubs, channelSubs []string) ([]ClientMessage, error) { | ||||
| 	formData := url.Values{ | ||||
| 		"chatSubs": chatSubs, | ||||
| 		"chatSubs":    chatSubs, | ||||
| 		"channelSubs": channelSubs, | ||||
| 	} | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,9 +1,10 @@ | |||
| package server | ||||
| 
 | ||||
| import ( | ||||
| 	"testing" | ||||
| 	"net/url" | ||||
| 	"golang.org/x/crypto/nacl/box" | ||||
| 	"crypto/rand" | ||||
| 	"golang.org/x/crypto/nacl/box" | ||||
| 	"net/url" | ||||
| 	"testing" | ||||
| ) | ||||
| 
 | ||||
| func SetupRandomKeys(t testing.TB) { | ||||
|  |  | |||
							
								
								
									
										1
									
								
								socketserver/internal/server/backlog.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								socketserver/internal/server/backlog.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1 @@ | |||
| package server | ||||
|  | @ -1,11 +1,11 @@ | |||
| package server | ||||
| 
 | ||||
| import ( | ||||
| 	"golang.org/x/net/websocket" | ||||
| 	"github.com/satori/go.uuid" | ||||
| 	"golang.org/x/net/websocket" | ||||
| 	"log" | ||||
| 	"sync" | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
|  | @ -23,7 +23,7 @@ func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) | |||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| //	log.Println(conn.RemoteAddr(), msg.MessageID, msg.Command, msg.Arguments) | ||||
| 	//	log.Println(conn.RemoteAddr(), msg.MessageID, msg.Command, msg.Arguments) | ||||
| 
 | ||||
| 	response, err := CallHandler(handler, conn, client, msg) | ||||
| 
 | ||||
|  | @ -36,7 +36,7 @@ func HandleCommand(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) | |||
| 	} else { | ||||
| 		FFZCodec.Send(conn, ClientMessage{ | ||||
| 			MessageID: msg.MessageID, | ||||
| 			Command: "error", | ||||
| 			Command:   "error", | ||||
| 			Arguments: err.Error(), | ||||
| 		}) | ||||
| 	} | ||||
|  | @ -203,6 +203,7 @@ type SurveySubmission struct { | |||
| 	User string | ||||
| 	Json string | ||||
| } | ||||
| 
 | ||||
| var SurveySubmissions []SurveySubmission | ||||
| var SurveySubmissionLock sync.Mutex | ||||
| 
 | ||||
|  | @ -215,11 +216,12 @@ func HandleSurvey(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) ( | |||
| } | ||||
| 
 | ||||
| type FollowEvent struct { | ||||
| 	User string | ||||
| 	Channel string | ||||
| 	User         string | ||||
| 	Channel      string | ||||
| 	NowFollowing bool | ||||
| 	Timestamp time.Time | ||||
| 	Timestamp    time.Time | ||||
| } | ||||
| 
 | ||||
| var FollowEvents []FollowEvent | ||||
| var FollowEventsLock sync.Mutex | ||||
| 
 | ||||
|  | @ -268,7 +270,6 @@ func HandleEmoticonUses(conn *websocket.Conn, client *ClientInfo, msg ClientMess | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 
 | ||||
| 	return ResponseSuccess, nil | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,16 +1,16 @@ | |||
| package server // import "bitbucket.org/stendec/frankerfacez/socketserver/server" | ||||
| 
 | ||||
| import ( | ||||
| 	"net/http" | ||||
| 	"golang.org/x/net/websocket" | ||||
| 	"crypto/tls" | ||||
| 	"strings" | ||||
| 	"strconv" | ||||
| 	"errors" | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"sync" | ||||
| 	"golang.org/x/net/websocket" | ||||
| 	"log" | ||||
| 	"net/http" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| ) | ||||
| 
 | ||||
| const MAX_PACKET_SIZE = 1024 | ||||
|  | @ -22,12 +22,12 @@ type Config struct { | |||
| 	UseSSL             bool | ||||
| 
 | ||||
| 	// NaCl keys for backend messages | ||||
| 	NaclKeysFile       string | ||||
| 	NaclKeysFile string | ||||
| 
 | ||||
| 	// Hostname of the socket server | ||||
| 	SocketOrigin       string | ||||
| 	SocketOrigin string | ||||
| 	// URL to the backend server | ||||
| 	BackendUrl         string | ||||
| 	BackendUrl string | ||||
| } | ||||
| 
 | ||||
| // A command is how the client refers to a function on the server. It's just a string. | ||||
|  | @ -38,37 +38,40 @@ type CommandHandler func(*websocket.Conn, *ClientInfo, ClientMessage) (ClientMes | |||
| 
 | ||||
| var CommandHandlers = map[Command]CommandHandler{ | ||||
| 	HelloCommand: HandleHello, | ||||
| 	"setuser": HandleSetUser, | ||||
| 	"setuser":    HandleSetUser, | ||||
| 
 | ||||
| 	"sub": HandleSub, | ||||
| 	"unsub": HandleUnsub, | ||||
| 	"sub_channel": HandleSubChannel, | ||||
| 	"sub":           HandleSub, | ||||
| 	"unsub":         HandleUnsub, | ||||
| 	"sub_channel":   HandleSubChannel, | ||||
| 	"unsub_channel": HandleUnsubChannel, | ||||
| 
 | ||||
| 	"track_follow": HandleTrackFollow, | ||||
| 	"track_follow":  HandleTrackFollow, | ||||
| 	"emoticon_uses": HandleEmoticonUses, | ||||
| 	"survey": HandleSurvey, | ||||
| 	"survey":        HandleSurvey, | ||||
| 
 | ||||
| 	"twitch_emote": HandleRemoteCommand, | ||||
| 	"get_link": HandleRemoteCommand, | ||||
| 	"get_display_name": HandleRemoteCommand, | ||||
| 	"twitch_emote":          HandleRemoteCommand, | ||||
| 	"get_link":              HandleRemoteCommand, | ||||
| 	"get_display_name":      HandleRemoteCommand, | ||||
| 	"update_follow_buttons": HandleRemoteCommand, | ||||
| 	"chat_history": HandleRemoteCommand, | ||||
| 	"chat_history":          HandleRemoteCommand, | ||||
| } | ||||
| 
 | ||||
| // Sent by the server in ClientMessage.Command to indicate success. | ||||
| const SuccessCommand Command = "True" | ||||
| 
 | ||||
| // Sent by the server in ClientMessage.Command to indicate failure. | ||||
| const ErrorCommand Command = "error" | ||||
| 
 | ||||
| // This must be the first command sent by the client once the connection is established. | ||||
| const HelloCommand Command = "hello" | ||||
| 
 | ||||
| // A handler returning a ClientMessage with this Command will prevent replying to the client. | ||||
| // It signals that the work has been handed off to a background goroutine. | ||||
| const AsyncResponseCommand Command = "_async" | ||||
| 
 | ||||
| // A websocket.Codec that translates the protocol into ClientMessage objects. | ||||
| var FFZCodec websocket.Codec = websocket.Codec{ | ||||
| 	Marshal: MarshalClientMessage, | ||||
| 	Marshal:   MarshalClientMessage, | ||||
| 	Unmarshal: UnmarshalClientMessage, | ||||
| } | ||||
| 
 | ||||
|  | @ -124,7 +127,7 @@ func SetupServerAndHandle(config *Config, tlsConfig *tls.Config, serveMux *http. | |||
| 		serveMux = http.DefaultServeMux | ||||
| 	} | ||||
| 	serveMux.HandleFunc("/", sockServer.ServeHTTP) | ||||
| 	serveMux.HandleFunc("/pub", HandlePublishRequest) | ||||
| 	serveMux.HandleFunc("/pub", HBackendPublishRequest) | ||||
| } | ||||
| 
 | ||||
| // Handle a new websocket connection from a FFZ client. | ||||
|  | @ -147,7 +150,7 @@ func HandleSocketConnection(conn *websocket.Conn) { | |||
| 	_errorChan := make(chan error) | ||||
| 
 | ||||
| 	// Launch receiver goroutine | ||||
| 	go func(errorChan chan <- error, clientChan chan <- ClientMessage) { | ||||
| 	go func(errorChan chan<- error, clientChan chan<- ClientMessage) { | ||||
| 		var msg ClientMessage | ||||
| 		var err error | ||||
| 		for ; err == nil; err = FFZCodec.Receive(conn, &msg) { | ||||
|  | @ -171,13 +174,13 @@ func HandleSocketConnection(conn *websocket.Conn) { | |||
| 
 | ||||
| 	// All set up, now enter the work loop | ||||
| 
 | ||||
| 	RunLoop: | ||||
| RunLoop: | ||||
| 	for { | ||||
| 		select { | ||||
| 		case err := <-errorChan: | ||||
| 			FFZCodec.Send(conn, ClientMessage{ | ||||
| 				MessageID: -1, | ||||
| 				Command: "error", | ||||
| 				Command:   "error", | ||||
| 				Arguments: err.Error(), | ||||
| 			}) // note - socket might be closed, but don't care | ||||
| 			break RunLoop | ||||
|  | @ -185,7 +188,7 @@ func HandleSocketConnection(conn *websocket.Conn) { | |||
| 			if client.Version == "" && msg.Command != HelloCommand { | ||||
| 				FFZCodec.Send(conn, ClientMessage{ | ||||
| 					MessageID: msg.MessageID, | ||||
| 					Command: "error", | ||||
| 					Command:   "error", | ||||
| 					Arguments: "Error - the first message sent must be a 'hello'", | ||||
| 				}) | ||||
| 				break RunLoop | ||||
|  | @ -201,7 +204,8 @@ func HandleSocketConnection(conn *websocket.Conn) { | |||
| 
 | ||||
| 	// Launch message draining goroutine - we aren't out of the pub/sub records | ||||
| 	go func() { | ||||
| 		for _ = range _serverMessageChan {} | ||||
| 		for _ = range _serverMessageChan { | ||||
| 		} | ||||
| 	}() | ||||
| 
 | ||||
| 	// Stop getting messages... | ||||
|  | @ -244,7 +248,7 @@ func UnmarshalClientMessage(data []byte, payloadType byte, v interface{}) (err e | |||
| 	} | ||||
| 
 | ||||
| 	out.MessageID = messageId | ||||
| 	dataStr = dataStr[spaceIdx + 1:] | ||||
| 	dataStr = dataStr[spaceIdx+1:] | ||||
| 
 | ||||
| 	spaceIdx = strings.IndexRune(dataStr, ' ') | ||||
| 	if spaceIdx == -1 { | ||||
|  | @ -254,7 +258,7 @@ func UnmarshalClientMessage(data []byte, payloadType byte, v interface{}) (err e | |||
| 	} else { | ||||
| 		out.Command = Command(dataStr[:spaceIdx]) | ||||
| 	} | ||||
| 	dataStr = dataStr[spaceIdx + 1:] | ||||
| 	dataStr = dataStr[spaceIdx+1:] | ||||
| 	argumentsJson := dataStr | ||||
| 	out.origArguments = argumentsJson | ||||
| 	err = json.Unmarshal([]byte(argumentsJson), &out.Arguments) | ||||
|  | @ -306,7 +310,7 @@ func MarshalClientMessage(clientMessage interface{}) (data []byte, payloadType b | |||
| func NewClientMessage(arguments interface{}) ClientMessage { | ||||
| 	return ClientMessage{ | ||||
| 		MessageID: 0, // filled by the select loop | ||||
| 		Command: SuccessCommand, | ||||
| 		Command:   SuccessCommand, | ||||
| 		Arguments: arguments, | ||||
| 	} | ||||
| } | ||||
|  | @ -316,7 +320,8 @@ func (cm *ClientMessage) ArgumentsAsString() (string1 string, err error) { | |||
| 	var ok bool | ||||
| 	string1, ok = cm.Arguments.(string) | ||||
| 	if !ok { | ||||
| 		err = ExpectedSingleString; return | ||||
| 		err = ExpectedSingleString | ||||
| 		return | ||||
| 	} else { | ||||
| 		return string1, nil | ||||
| 	} | ||||
|  | @ -328,7 +333,8 @@ func (cm *ClientMessage) ArgumentsAsInt() (int1 int, err error) { | |||
| 	var num float64 | ||||
| 	num, ok = cm.Arguments.(float64) | ||||
| 	if !ok { | ||||
| 		err = ExpectedSingleInt; return | ||||
| 		err = ExpectedSingleInt | ||||
| 		return | ||||
| 	} else { | ||||
| 		int1 = int(num) | ||||
| 		return int1, nil | ||||
|  | @ -341,18 +347,22 @@ func (cm *ClientMessage) ArgumentsAsTwoStrings() (string1, string2 string, err e | |||
| 	var ary []interface{} | ||||
| 	ary, ok = cm.Arguments.([]interface{}) | ||||
| 	if !ok { | ||||
| 		err = ExpectedTwoStrings; return | ||||
| 		err = ExpectedTwoStrings | ||||
| 		return | ||||
| 	} else { | ||||
| 		if len(ary) != 2 { | ||||
| 			err = ExpectedTwoStrings; return | ||||
| 			err = ExpectedTwoStrings | ||||
| 			return | ||||
| 		} | ||||
| 		string1, ok = ary[0].(string) | ||||
| 		if !ok { | ||||
| 			err = ExpectedTwoStrings; return | ||||
| 			err = ExpectedTwoStrings | ||||
| 			return | ||||
| 		} | ||||
| 		string2, ok = ary[1].(string) | ||||
| 		if !ok { | ||||
| 			err = ExpectedTwoStrings; return | ||||
| 			err = ExpectedTwoStrings | ||||
| 			return | ||||
| 		} | ||||
| 		return string1, string2, nil | ||||
| 	} | ||||
|  | @ -364,23 +374,28 @@ func (cm *ClientMessage) ArgumentsAsStringAndInt() (string1 string, int int64, e | |||
| 	var ary []interface{} | ||||
| 	ary, ok = cm.Arguments.([]interface{}) | ||||
| 	if !ok { | ||||
| 		err = ExpectedStringAndInt; return | ||||
| 		err = ExpectedStringAndInt | ||||
| 		return | ||||
| 	} else { | ||||
| 		if len(ary) != 2 { | ||||
| 			err = ExpectedStringAndInt; return | ||||
| 			err = ExpectedStringAndInt | ||||
| 			return | ||||
| 		} | ||||
| 		string1, ok = ary[0].(string) | ||||
| 		if !ok { | ||||
| 			err = ExpectedStringAndInt; return | ||||
| 			err = ExpectedStringAndInt | ||||
| 			return | ||||
| 		} | ||||
| 		var num float64 | ||||
| 		num, ok = ary[1].(float64) | ||||
| 		if !ok { | ||||
| 			err = ExpectedStringAndInt; return | ||||
| 			err = ExpectedStringAndInt | ||||
| 			return | ||||
| 		} | ||||
| 		int = int64(num) | ||||
| 		if float64(int) != num { | ||||
| 			err = ExpectedStringAndIntGotFloat; return | ||||
| 			err = ExpectedStringAndIntGotFloat | ||||
| 			return | ||||
| 		} | ||||
| 		return string1, int, nil | ||||
| 	} | ||||
|  | @ -392,18 +407,22 @@ func (cm *ClientMessage) ArgumentsAsStringAndBool() (str string, flag bool, err | |||
| 	var ary []interface{} | ||||
| 	ary, ok = cm.Arguments.([]interface{}) | ||||
| 	if !ok { | ||||
| 		err = ExpectedStringAndBool; return | ||||
| 		err = ExpectedStringAndBool | ||||
| 		return | ||||
| 	} else { | ||||
| 		if len(ary) != 2 { | ||||
| 			err = ExpectedStringAndBool; return | ||||
| 			err = ExpectedStringAndBool | ||||
| 			return | ||||
| 		} | ||||
| 		str, ok = ary[0].(string) | ||||
| 		if !ok { | ||||
| 			err = ExpectedStringAndBool; return | ||||
| 			err = ExpectedStringAndBool | ||||
| 			return | ||||
| 		} | ||||
| 		flag, ok = ary[1].(bool) | ||||
| 		if !ok { | ||||
| 			err = ExpectedStringAndBool; return | ||||
| 			err = ExpectedStringAndBool | ||||
| 			return | ||||
| 		} | ||||
| 		return str, flag, nil | ||||
| 	} | ||||
|  |  | |||
|  | @ -1,8 +1,8 @@ | |||
| package server | ||||
| 
 | ||||
| import ( | ||||
| 	"golang.org/x/net/websocket" | ||||
| 	"fmt" | ||||
| 	"golang.org/x/net/websocket" | ||||
| 	"testing" | ||||
| ) | ||||
| 
 | ||||
|  | @ -24,7 +24,7 @@ func ExampleUnmarshalClientMessage() { | |||
| func ExampleMarshalClientMessage() { | ||||
| 	var cm ClientMessage = ClientMessage{ | ||||
| 		MessageID: -1, | ||||
| 		Command: "do_authorize", | ||||
| 		Command:   "do_authorize", | ||||
| 		Arguments: "1234567890", | ||||
| 	} | ||||
| 	data, payloadType, err := MarshalClientMessage(&cm) | ||||
|  | @ -54,4 +54,4 @@ func TestArgumentsAsStringAndBool(t *testing.T) { | |||
| 	if boolean != false { | ||||
| 		t.Error("Expected second array item to be false, got", boolean) | ||||
| 	} | ||||
| } | ||||
| } | ||||
|  |  | |||
|  | @ -4,29 +4,30 @@ package server | |||
| // If I screwed up the locking, I won't know until it's too late. | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 	"net/http" | ||||
| 	"fmt" | ||||
| ) | ||||
| 
 | ||||
| type SubscriberList struct { | ||||
| 	sync.RWMutex | ||||
| 	Members []chan <- ClientMessage | ||||
| 	Members []chan<- ClientMessage | ||||
| } | ||||
| 
 | ||||
| var ChatSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList) | ||||
| var ChatSubscriptionLock sync.RWMutex | ||||
| var WatchingSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList) | ||||
| var WatchingSubscriptionLock sync.RWMutex | ||||
| var GlobalSubscriptionInfo SubscriberList | ||||
| 
 | ||||
| func PublishToChat(channel string, msg ClientMessage) (count int) { | ||||
| 	ChatSubscriptionLock.RLock() | ||||
| 	list := ChatSubscriptionInfo[channel] | ||||
| 	if list != nil { | ||||
| 		list.RLock() | ||||
| 		for _, ch := range list.Members { | ||||
| 			ch <- msg | ||||
| 		for _, msgChan := range list.Members { | ||||
| 			msgChan <- msg | ||||
| 			count++ | ||||
| 		} | ||||
| 		list.RUnlock() | ||||
|  | @ -40,8 +41,8 @@ func PublishToWatchers(channel string, msg ClientMessage) (count int) { | |||
| 	list := WatchingSubscriptionInfo[channel] | ||||
| 	if list != nil { | ||||
| 		list.RLock() | ||||
| 		for _, ch := range list.Members { | ||||
| 			ch <- msg | ||||
| 		for _, msgChan := range list.Members { | ||||
| 			msgChan <- msg | ||||
| 			count++ | ||||
| 		} | ||||
| 		list.RUnlock() | ||||
|  | @ -50,19 +51,28 @@ func PublishToWatchers(channel string, msg ClientMessage) (count int) { | |||
| 	return | ||||
| } | ||||
| 
 | ||||
| func PublishToAll(msg ClientMessage) (count int) { | ||||
| 	GlobalSubscriptionInfo.RLock() | ||||
| 	for _, msgChan := range GlobalSubscriptionInfo.Members { | ||||
| 		msgChan <- msg | ||||
| 		count++ | ||||
| 	} | ||||
| 	GlobalSubscriptionInfo.RUnlock() | ||||
| } | ||||
| 
 | ||||
| // Add a channel to the subscriptions while holding a read-lock to the map. | ||||
| // Locks: | ||||
| //   - ALREADY HOLDING a read-lock to the 'which' top-level map via the rlocker object | ||||
| //   - possible write lock to the 'which' top-level map via the wlocker object | ||||
| //   - write lock to SubscriptionInfo (if not creating new) | ||||
| func _subscribeWhileRlocked(which map[string]*SubscriberList, channelName string, value chan <- ClientMessage, rlocker sync.Locker, wlocker sync.Locker) { | ||||
| func _subscribeWhileRlocked(which map[string]*SubscriberList, channelName string, value chan<- ClientMessage, rlocker sync.Locker, wlocker sync.Locker) { | ||||
| 	list := which[channelName] | ||||
| 	if list == nil { | ||||
| 		// Not found, so create it | ||||
| 		rlocker.Unlock() | ||||
| 		wlocker.Lock() | ||||
| 		list = &SubscriberList{} | ||||
| 		list.Members = []chan <- ClientMessage{value} // Create it populated, to avoid reaper | ||||
| 		list.Members = []chan<- ClientMessage{value} // Create it populated, to avoid reaper | ||||
| 		which[channelName] = list | ||||
| 		wlocker.Unlock() | ||||
| 		rlocker.Lock() | ||||
|  | @ -73,6 +83,12 @@ func _subscribeWhileRlocked(which map[string]*SubscriberList, channelName string | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| func SubscribeGlobal(client *ClientInfo) { | ||||
| 	GlobalSubscriptionInfo.Lock() | ||||
| 	AddToSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel) | ||||
| 	GlobalSubscriptionInfo.Unlock() | ||||
| } | ||||
| 
 | ||||
| func SubscribeChat(client *ClientInfo, channelName string) { | ||||
| 	ChatSubscriptionLock.RLock() | ||||
| 	_subscribeWhileRlocked(ChatSubscriptionInfo, channelName, client.MessageChannel, ChatSubscriptionLock.RLocker(), &ChatSubscriptionLock) | ||||
|  | @ -85,28 +101,16 @@ func SubscribeWatching(client *ClientInfo, channelName string) { | |||
| 	WatchingSubscriptionLock.RUnlock() | ||||
| } | ||||
| 
 | ||||
| // Locks: | ||||
| //   - read lock to top-level maps | ||||
| //   - possible write lock to top-level maps | ||||
| //   - write lock to SubscriptionInfos | ||||
| func SubscribeBatch(client *ClientInfo, chatSubs, channelSubs []string) { | ||||
| 	mchan := client.MessageChannel | ||||
| 	if len(chatSubs) > 0 { | ||||
| 		rlocker := ChatSubscriptionLock.RLocker() | ||||
| 		rlocker.Lock() | ||||
| 		for _, v := range chatSubs { | ||||
| 			_subscribeWhileRlocked(ChatSubscriptionInfo, v, mchan, rlocker, &ChatSubscriptionLock) | ||||
| 		} | ||||
| 		rlocker.Unlock() | ||||
| 	} | ||||
| 	if len(channelSubs) > 0 { | ||||
| 		rlocker := WatchingSubscriptionLock.RLocker() | ||||
| 		rlocker.Lock() | ||||
| 		for _, v := range channelSubs { | ||||
| 			_subscribeWhileRlocked(WatchingSubscriptionInfo, v, mchan, rlocker, &WatchingSubscriptionLock) | ||||
| 		} | ||||
| 		rlocker.Unlock() | ||||
| 	} | ||||
| func unsubscribeAllClients() { | ||||
| 	GlobalSubscriptionInfo.Lock() | ||||
| 	GlobalSubscriptionInfo.Members = nil | ||||
| 	GlobalSubscriptionInfo.Unlock() | ||||
| 	ChatSubscriptionLock.Lock() | ||||
| 	ChatSubscriptionInfo = make(map[string]*SubscriberList) | ||||
| 	ChatSubscriptionLock.Unlock() | ||||
| 	WatchingSubscriptionLock.Lock() | ||||
| 	WatchingSubscriptionInfo = make(map[string]*SubscriberList) | ||||
| 	WatchingSubscriptionLock.Unlock() | ||||
| } | ||||
| 
 | ||||
| // Unsubscribe the client from all channels, AND clear the CurrentChannels / WatchingChannels fields. | ||||
|  | @ -120,6 +124,10 @@ func UnsubscribeAll(client *ClientInfo) { | |||
| 	client.PendingStreamBacklogs = nil | ||||
| 	client.Mutex.Unlock() | ||||
| 
 | ||||
| 	GlobalSubscriptionInfo.Lock() | ||||
| 	RemoveFromSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel) | ||||
| 	GlobalSubscriptionInfo.Unlock() | ||||
| 
 | ||||
| 	ChatSubscriptionLock.RLock() | ||||
| 	client.Mutex.Lock() | ||||
| 	for _, v := range client.CurrentChannels { | ||||
|  | @ -149,15 +157,6 @@ func UnsubscribeAll(client *ClientInfo) { | |||
| 	WatchingSubscriptionLock.RUnlock() | ||||
| } | ||||
| 
 | ||||
| func unsubscribeAllClients() { | ||||
| 	ChatSubscriptionLock.Lock() | ||||
| 	ChatSubscriptionInfo = make(map[string]*SubscriberList) | ||||
| 	ChatSubscriptionLock.Unlock() | ||||
| 	WatchingSubscriptionLock.Lock() | ||||
| 	WatchingSubscriptionInfo = make(map[string]*SubscriberList) | ||||
| 	WatchingSubscriptionLock.Unlock() | ||||
| } | ||||
| 
 | ||||
| func UnsubscribeSingleChat(client *ClientInfo, channelName string) { | ||||
| 	ChatSubscriptionLock.RLock() | ||||
| 	list := ChatSubscriptionInfo[channelName] | ||||
|  | @ -199,4 +198,4 @@ func deadChannelReaper() { | |||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| } | ||||
|  |  | |||
|  | @ -1,15 +1,16 @@ | |||
| package server | ||||
| 
 | ||||
| import ( | ||||
| 	"testing" | ||||
| 	"net/http/httptest" | ||||
| 	"net/http" | ||||
| 	"sync" | ||||
| 	"golang.org/x/net/websocket" | ||||
| 	"github.com/satori/go.uuid" | ||||
| 	"fmt" | ||||
| 	"syscall" | ||||
| 	"os" | ||||
| 	"github.com/satori/go.uuid" | ||||
| 	"golang.org/x/net/websocket" | ||||
| 	"io/ioutil" | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
| 	"os" | ||||
| 	"sync" | ||||
| 	"syscall" | ||||
| 	"testing" | ||||
| ) | ||||
| 
 | ||||
| func CountOpenFDs() uint64 { | ||||
|  | @ -26,7 +27,7 @@ func BenchmarkThousandUserSubscription(b *testing.B) { | |||
| 
 | ||||
| 	GenerateKeys("/tmp/test_naclkeys.json", "2", "+ZMqOmxhaVrCV5c0OMZ09QoSGcJHuqQtJrwzRD+JOjE=") | ||||
| 	conf := &Config{ | ||||
| 		UseSSL: false, | ||||
| 		UseSSL:       false, | ||||
| 		NaclKeysFile: "/tmp/test_naclkeys.json", | ||||
| 		SocketOrigin: "localhost:2002", | ||||
| 	} | ||||
|  | @ -47,7 +48,7 @@ func BenchmarkThousandUserSubscription(b *testing.B) { | |||
| 	var limit syscall.Rlimit | ||||
| 	syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limit) | ||||
| 
 | ||||
| 	limit.Cur = CountOpenFDs() + uint64(b.N) * 2 + 100 | ||||
| 	limit.Cur = CountOpenFDs() + uint64(b.N)*2 + 100 | ||||
| 
 | ||||
| 	if limit.Cur > limit.Max { | ||||
| 		b.Skip("Open file limit too low") | ||||
|  |  | |||
|  | @ -1,6 +1,9 @@ | |||
| package server | ||||
| 
 | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"github.com/satori/go.uuid" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | @ -9,30 +12,30 @@ import ( | |||
| const CryptoBoxKeyLength = 32 | ||||
| 
 | ||||
| type CryptoKeysBuf struct { | ||||
| 	OurPrivateKey []byte | ||||
| 	OurPublicKey []byte | ||||
| 	OurPrivateKey  []byte | ||||
| 	OurPublicKey   []byte | ||||
| 	TheirPublicKey []byte | ||||
| 	ServerId int | ||||
| 	ServerId       int | ||||
| } | ||||
| 
 | ||||
| type ClientMessage struct { | ||||
| 	// Message ID. Increments by 1 for each message sent from the client. | ||||
| 	// When replying to a command, the message ID must be echoed. | ||||
| 	// When sending a server-initiated message, this is -1. | ||||
| 	MessageID     int `json:_` | ||||
| 	MessageID int `json:_` | ||||
| 	// The command that the client wants from the server. | ||||
| 	// When sent from the server, the literal string 'True' indicates success. | ||||
| 	// Before sending, a blank Command will be converted into SuccessCommand. | ||||
| 	Command       Command `json:cmd` | ||||
| 	Command Command `json:cmd` | ||||
| 	// Result of json.Unmarshal on the third field send from the client | ||||
| 	Arguments     interface{} `json:data` | ||||
| 	Arguments interface{} `json:data` | ||||
| 
 | ||||
| 	origArguments string | ||||
| } | ||||
| 
 | ||||
| type AuthInfo struct { | ||||
| 	// The client's claimed username on Twitch. | ||||
| 	TwitchUsername    string | ||||
| 	TwitchUsername string | ||||
| 
 | ||||
| 	// Whether or not the server has validated the client's claimed username. | ||||
| 	UsernameValidated bool | ||||
|  | @ -41,25 +44,25 @@ type AuthInfo struct { | |||
| type ClientInfo struct { | ||||
| 	// The client ID. | ||||
| 	// This must be written once by the owning goroutine before the struct is passed off to any other goroutines. | ||||
| 	ClientID         uuid.UUID | ||||
| 	ClientID uuid.UUID | ||||
| 
 | ||||
| 	// The client's version. | ||||
| 	// This must be written once by the owning goroutine before the struct is passed off to any other goroutines. | ||||
| 	Version          string | ||||
| 	Version string | ||||
| 
 | ||||
| 	// This mutex protects writable data in this struct. | ||||
| 	// If it seems to be a performance problem, we can split this. | ||||
| 	Mutex            sync.Mutex | ||||
| 	Mutex sync.Mutex | ||||
| 
 | ||||
| 	// TODO(riking) - does this need to be protected cross-thread? | ||||
| 	AuthInfo | ||||
| 
 | ||||
| 	// Username validation nonce. | ||||
| 	ValidationNonce  string | ||||
| 	ValidationNonce string | ||||
| 
 | ||||
| 	// The list of chats this client is currently in. | ||||
| 	// Protected by Mutex. | ||||
| 	CurrentChannels  []string | ||||
| 	CurrentChannels []string | ||||
| 
 | ||||
| 	// This list of channels this client needs UI updates for. | ||||
| 	// Protected by Mutex. | ||||
|  |  | |||
|  | @ -1,15 +1,15 @@ | |||
| package server | ||||
| 
 | ||||
| import ( | ||||
| 	"crypto/rand" | ||||
| 	"net/url" | ||||
| 	"golang.org/x/crypto/nacl/box" | ||||
| 	"bytes" | ||||
| 	"crypto/rand" | ||||
| 	"encoding/base64" | ||||
| 	"errors" | ||||
| 	"golang.org/x/crypto/nacl/box" | ||||
| 	"log" | ||||
| 	"net/url" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"errors" | ||||
| 	"log" | ||||
| ) | ||||
| 
 | ||||
| func FillCryptoRandom(buf []byte) error { | ||||
|  | @ -53,8 +53,8 @@ func SealRequest(form url.Values) (url.Values, error) { | |||
| 
 | ||||
| 	retval := url.Values{ | ||||
| 		"nonce": []string{nonceString}, | ||||
| 		"msg": []string{cipherString}, | ||||
| 		"id": []string{strconv.Itoa(serverId)}, | ||||
| 		"msg":   []string{cipherString}, | ||||
| 		"id":    []string{strconv.Itoa(serverId)}, | ||||
| 	} | ||||
| 
 | ||||
| 	return retval, nil | ||||
|  | @ -122,13 +122,13 @@ func RemoveFromSliceS(ary *[]string, val string) bool { | |||
| 		return false | ||||
| 	} | ||||
| 
 | ||||
| 	slice[idx] = slice[len(slice) - 1] | ||||
| 	slice = slice[:len(slice) - 1] | ||||
| 	slice[idx] = slice[len(slice)-1] | ||||
| 	slice = slice[:len(slice)-1] | ||||
| 	*ary = slice | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| func AddToSliceC(ary *[]chan <- ClientMessage, val chan <- ClientMessage) bool { | ||||
| func AddToSliceC(ary *[]chan<- ClientMessage, val chan<- ClientMessage) bool { | ||||
| 	slice := *ary | ||||
| 	for _, v := range slice { | ||||
| 		if v == val { | ||||
|  | @ -141,7 +141,7 @@ func AddToSliceC(ary *[]chan <- ClientMessage, val chan <- ClientMessage) bool { | |||
| 	return true | ||||
| } | ||||
| 
 | ||||
| func RemoveFromSliceC(ary *[]chan <- ClientMessage, val chan <- ClientMessage) bool { | ||||
| func RemoveFromSliceC(ary *[]chan<- ClientMessage, val chan<- ClientMessage) bool { | ||||
| 	slice := *ary | ||||
| 	var idx int = -1 | ||||
| 	for i, v := range slice { | ||||
|  | @ -154,8 +154,8 @@ func RemoveFromSliceC(ary *[]chan <- ClientMessage, val chan <- ClientMessage) b | |||
| 		return false | ||||
| 	} | ||||
| 
 | ||||
| 	slice[idx] = slice[len(slice) - 1] | ||||
| 	slice = slice[:len(slice) - 1] | ||||
| 	slice[idx] = slice[len(slice)-1] | ||||
| 	slice = slice[:len(slice)-1] | ||||
| 	*ary = slice | ||||
| 	return true | ||||
| } | ||||
| } | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue