1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-06-27 21:05:53 +00:00
FrankerFaceZ/socketserver/server/subscriptions.go
2017-10-27 13:14:29 -07:00

255 lines
5.8 KiB
Go

package server
import (
"log"
"sync"
"time"
"github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/rate"
)
type SubscriberList struct {
sync.RWMutex
Members []*ClientInfo
}
var ChatSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList)
var ChatSubscriptionLock sync.RWMutex
var GlobalSubscriptionInfo []*ClientInfo
var GlobalSubscriptionLock sync.RWMutex
// Send a message to the client.
// Drops if buffer is full.
func (client *ClientInfo) Send(msg ClientMessage) bool {
select {
case client.messageChannel <- msg:
return true
case <-client.MsgChannelIsDone:
return false
default:
// if we can't immediately send, ignore it
return false
}
}
func CountSubscriptions(channels []string) int {
ChatSubscriptionLock.RLock()
defer ChatSubscriptionLock.RUnlock()
count := 0
for _, channelName := range channels {
list := ChatSubscriptionInfo[channelName]
if list != nil {
list.RLock()
count += len(list.Members)
list.RUnlock()
}
}
return count
}
func GetAllTopics() []string {
ChatSubscriptionLock.RLock()
defer ChatSubscriptionLock.RUnlock()
count := len(ChatSubscriptionInfo)
list := make([]string, count)
i := 0
for topicName := range ChatSubscriptionInfo {
list[i] = topicName
i++
}
return list
}
func SubscribeChannel(client *ClientInfo, channelName string) {
ChatSubscriptionLock.RLock()
_subscribeWhileRlocked(channelName, client)
ChatSubscriptionLock.RUnlock()
}
func SubscribeGlobal(client *ClientInfo) {
GlobalSubscriptionLock.Lock()
AddToSliceCl(&GlobalSubscriptionInfo, client)
GlobalSubscriptionLock.Unlock()
}
func PublishToChannel(channel string, msg ClientMessage, rl rate.Limiter) (count int) {
var found []*ClientInfo
ChatSubscriptionLock.RLock()
list := ChatSubscriptionInfo[channel]
if list != nil {
list.RLock()
found = make([]*ClientInfo, len(list.Members))
copy(found, list.Members)
list.RUnlock()
}
ChatSubscriptionLock.RUnlock()
for _, cl := range found {
rl.Performed()
if cl.Send(msg) {
count++
}
}
return
}
func PublishToMultiple(channels []string, msg ClientMessage, rl rate.Limiter) (count int) {
var found []*ClientInfo
ChatSubscriptionLock.RLock()
for _, channel := range channels {
list := ChatSubscriptionInfo[channel]
if list != nil {
list.RLock()
for _, cl := range list.Members {
found = append(found, cl)
}
list.RUnlock()
}
}
ChatSubscriptionLock.RUnlock()
for _, cl := range found {
rl.Performed()
if cl.Send(msg) {
count++
}
}
return
}
func PublishToAll(msg ClientMessage, rl rate.Limiter) (count int) {
var found []*ClientInfo
GlobalSubscriptionLock.RLock()
found = make([]*ClientInfo, len(GlobalSubscriptionInfo))
copy(found, GlobalSubscriptionInfo)
GlobalSubscriptionLock.RUnlock()
for _, cl := range found {
rl.Performed()
if cl.Send(msg) {
count++
}
}
return
}
func UnsubscribeSingleChat(client *ClientInfo, channelName string) {
ChatSubscriptionLock.RLock()
list := ChatSubscriptionInfo[channelName]
if list != nil {
list.Lock()
RemoveFromSliceCl(&list.Members, client)
list.Unlock()
}
ChatSubscriptionLock.RUnlock()
}
// UnsubscribeAll will unsubscribe the client from all channels,
// AND clear the CurrentChannels / WatchingChannels fields.
//
// Locks:
// - read lock to top-level maps
// - write lock to SubscriptionInfos
// - write lock to ClientInfo
func UnsubscribeAll(client *ClientInfo) {
select {
case <-StopAcceptingConnectionsCh:
// Skip high-contention client removal operations while server shutting down
return
default:
}
GlobalSubscriptionLock.Lock()
RemoveFromSliceCl(&GlobalSubscriptionInfo, client)
GlobalSubscriptionLock.Unlock()
ChatSubscriptionLock.RLock()
client.Mutex.Lock()
for _, v := range client.CurrentChannels {
list := ChatSubscriptionInfo[v]
if list != nil {
list.Lock()
RemoveFromSliceCl(&list.Members, client)
list.Unlock()
}
}
client.CurrentChannels = nil
client.Mutex.Unlock()
ChatSubscriptionLock.RUnlock()
}
func unsubscribeAllClients() {
GlobalSubscriptionLock.Lock()
GlobalSubscriptionInfo = nil
GlobalSubscriptionLock.Unlock()
ChatSubscriptionLock.Lock()
ChatSubscriptionInfo = make(map[string]*SubscriberList)
ChatSubscriptionLock.Unlock()
}
const ReapingDelay = 1 * time.Minute
// Checks ChatSubscriptionInfo for entries with no subscribers every ReapingDelay.
// is_init_func
func pubsubJanitor() {
for {
time.Sleep(ReapingDelay)
pubsubJanitor_do()
}
}
func pubsubJanitor_do() {
var cleanedUp = make([]string, 0, 6)
ChatSubscriptionLock.Lock()
for key, val := range ChatSubscriptionInfo {
if val == nil || len(val.Members) == 0 {
delete(ChatSubscriptionInfo, key)
cleanedUp = append(cleanedUp, key)
}
}
ChatSubscriptionLock.Unlock()
if len(cleanedUp) != 0 {
err := Backend.SendCleanupTopicsNotice(cleanedUp)
if err != nil {
log.Println("error reporting cleaned subs:", err)
}
}
}
// Add a channel to the subscriptions while holding a read-lock to the map.
// Locks:
// - ALREADY HOLDING a read-lock to the 'which' top-level map via the rlocker object
// - possible write lock to the 'which' top-level map via the wlocker object
// - write lock to SubscriptionInfo (if not creating new)
func _subscribeWhileRlocked(channelName string, value *ClientInfo) {
list := ChatSubscriptionInfo[channelName]
if list == nil {
// Not found, so create it
ChatSubscriptionLock.RUnlock()
ChatSubscriptionLock.Lock()
list = &SubscriberList{}
list.Members = []*ClientInfo{value} // Create it populated, to avoid reaper
ChatSubscriptionInfo[channelName] = list
ChatSubscriptionLock.Unlock()
go func(topic string) {
err := Backend.SendNewTopicNotice(topic)
if err != nil {
log.Println("error reporting new sub:", err)
}
}(channelName)
ChatSubscriptionLock.RLock()
} else {
list.Lock()
AddToSliceCl(&list.Members, value)
list.Unlock()
}
}