mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-06-27 21:05:53 +00:00
255 lines
5.8 KiB
Go
255 lines
5.8 KiB
Go
package server
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/rate"
|
|
)
|
|
|
|
type SubscriberList struct {
|
|
sync.RWMutex
|
|
Members []*ClientInfo
|
|
}
|
|
|
|
var ChatSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList)
|
|
var ChatSubscriptionLock sync.RWMutex
|
|
var GlobalSubscriptionInfo []*ClientInfo
|
|
var GlobalSubscriptionLock sync.RWMutex
|
|
|
|
// Send a message to the client.
|
|
// Drops if buffer is full.
|
|
func (client *ClientInfo) Send(msg ClientMessage) bool {
|
|
select {
|
|
case client.messageChannel <- msg:
|
|
return true
|
|
case <-client.MsgChannelIsDone:
|
|
return false
|
|
default:
|
|
// if we can't immediately send, ignore it
|
|
return false
|
|
}
|
|
}
|
|
|
|
func CountSubscriptions(channels []string) int {
|
|
ChatSubscriptionLock.RLock()
|
|
defer ChatSubscriptionLock.RUnlock()
|
|
|
|
count := 0
|
|
for _, channelName := range channels {
|
|
list := ChatSubscriptionInfo[channelName]
|
|
if list != nil {
|
|
list.RLock()
|
|
count += len(list.Members)
|
|
list.RUnlock()
|
|
}
|
|
}
|
|
|
|
return count
|
|
}
|
|
|
|
func GetAllTopics() []string {
|
|
ChatSubscriptionLock.RLock()
|
|
defer ChatSubscriptionLock.RUnlock()
|
|
|
|
count := len(ChatSubscriptionInfo)
|
|
list := make([]string, count)
|
|
i := 0
|
|
for topicName := range ChatSubscriptionInfo {
|
|
list[i] = topicName
|
|
i++
|
|
}
|
|
return list
|
|
}
|
|
|
|
func SubscribeChannel(client *ClientInfo, channelName string) {
|
|
ChatSubscriptionLock.RLock()
|
|
_subscribeWhileRlocked(channelName, client)
|
|
ChatSubscriptionLock.RUnlock()
|
|
}
|
|
|
|
func SubscribeGlobal(client *ClientInfo) {
|
|
GlobalSubscriptionLock.Lock()
|
|
AddToSliceCl(&GlobalSubscriptionInfo, client)
|
|
GlobalSubscriptionLock.Unlock()
|
|
}
|
|
|
|
func PublishToChannel(channel string, msg ClientMessage, rl rate.Limiter) (count int) {
|
|
var found []*ClientInfo
|
|
|
|
ChatSubscriptionLock.RLock()
|
|
list := ChatSubscriptionInfo[channel]
|
|
if list != nil {
|
|
list.RLock()
|
|
found = make([]*ClientInfo, len(list.Members))
|
|
copy(found, list.Members)
|
|
list.RUnlock()
|
|
}
|
|
ChatSubscriptionLock.RUnlock()
|
|
|
|
for _, cl := range found {
|
|
rl.Performed()
|
|
if cl.Send(msg) {
|
|
count++
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func PublishToMultiple(channels []string, msg ClientMessage, rl rate.Limiter) (count int) {
|
|
var found []*ClientInfo
|
|
|
|
ChatSubscriptionLock.RLock()
|
|
for _, channel := range channels {
|
|
list := ChatSubscriptionInfo[channel]
|
|
if list != nil {
|
|
list.RLock()
|
|
for _, cl := range list.Members {
|
|
found = append(found, cl)
|
|
}
|
|
list.RUnlock()
|
|
}
|
|
}
|
|
ChatSubscriptionLock.RUnlock()
|
|
|
|
for _, cl := range found {
|
|
rl.Performed()
|
|
if cl.Send(msg) {
|
|
count++
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func PublishToAll(msg ClientMessage, rl rate.Limiter) (count int) {
|
|
var found []*ClientInfo
|
|
|
|
GlobalSubscriptionLock.RLock()
|
|
found = make([]*ClientInfo, len(GlobalSubscriptionInfo))
|
|
copy(found, GlobalSubscriptionInfo)
|
|
GlobalSubscriptionLock.RUnlock()
|
|
|
|
for _, cl := range found {
|
|
rl.Performed()
|
|
if cl.Send(msg) {
|
|
count++
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func UnsubscribeSingleChat(client *ClientInfo, channelName string) {
|
|
ChatSubscriptionLock.RLock()
|
|
list := ChatSubscriptionInfo[channelName]
|
|
if list != nil {
|
|
list.Lock()
|
|
RemoveFromSliceCl(&list.Members, client)
|
|
list.Unlock()
|
|
}
|
|
ChatSubscriptionLock.RUnlock()
|
|
}
|
|
|
|
// UnsubscribeAll will unsubscribe the client from all channels,
|
|
// AND clear the CurrentChannels / WatchingChannels fields.
|
|
//
|
|
// Locks:
|
|
// - read lock to top-level maps
|
|
// - write lock to SubscriptionInfos
|
|
// - write lock to ClientInfo
|
|
func UnsubscribeAll(client *ClientInfo) {
|
|
select {
|
|
case <-StopAcceptingConnectionsCh:
|
|
// Skip high-contention client removal operations while server shutting down
|
|
return
|
|
default:
|
|
}
|
|
|
|
GlobalSubscriptionLock.Lock()
|
|
RemoveFromSliceCl(&GlobalSubscriptionInfo, client)
|
|
GlobalSubscriptionLock.Unlock()
|
|
|
|
ChatSubscriptionLock.RLock()
|
|
client.Mutex.Lock()
|
|
for _, v := range client.CurrentChannels {
|
|
list := ChatSubscriptionInfo[v]
|
|
if list != nil {
|
|
list.Lock()
|
|
RemoveFromSliceCl(&list.Members, client)
|
|
list.Unlock()
|
|
}
|
|
}
|
|
client.CurrentChannels = nil
|
|
client.Mutex.Unlock()
|
|
ChatSubscriptionLock.RUnlock()
|
|
}
|
|
|
|
func unsubscribeAllClients() {
|
|
GlobalSubscriptionLock.Lock()
|
|
GlobalSubscriptionInfo = nil
|
|
GlobalSubscriptionLock.Unlock()
|
|
ChatSubscriptionLock.Lock()
|
|
ChatSubscriptionInfo = make(map[string]*SubscriberList)
|
|
ChatSubscriptionLock.Unlock()
|
|
}
|
|
|
|
const ReapingDelay = 1 * time.Minute
|
|
|
|
// Checks ChatSubscriptionInfo for entries with no subscribers every ReapingDelay.
|
|
// is_init_func
|
|
func pubsubJanitor() {
|
|
for {
|
|
time.Sleep(ReapingDelay)
|
|
pubsubJanitor_do()
|
|
}
|
|
}
|
|
|
|
func pubsubJanitor_do() {
|
|
var cleanedUp = make([]string, 0, 6)
|
|
ChatSubscriptionLock.Lock()
|
|
for key, val := range ChatSubscriptionInfo {
|
|
if val == nil || len(val.Members) == 0 {
|
|
delete(ChatSubscriptionInfo, key)
|
|
cleanedUp = append(cleanedUp, key)
|
|
}
|
|
}
|
|
ChatSubscriptionLock.Unlock()
|
|
|
|
if len(cleanedUp) != 0 {
|
|
err := Backend.SendCleanupTopicsNotice(cleanedUp)
|
|
if err != nil {
|
|
log.Println("error reporting cleaned subs:", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Add a channel to the subscriptions while holding a read-lock to the map.
|
|
// Locks:
|
|
// - ALREADY HOLDING a read-lock to the 'which' top-level map via the rlocker object
|
|
// - possible write lock to the 'which' top-level map via the wlocker object
|
|
// - write lock to SubscriptionInfo (if not creating new)
|
|
func _subscribeWhileRlocked(channelName string, value *ClientInfo) {
|
|
list := ChatSubscriptionInfo[channelName]
|
|
if list == nil {
|
|
// Not found, so create it
|
|
ChatSubscriptionLock.RUnlock()
|
|
ChatSubscriptionLock.Lock()
|
|
list = &SubscriberList{}
|
|
list.Members = []*ClientInfo{value} // Create it populated, to avoid reaper
|
|
ChatSubscriptionInfo[channelName] = list
|
|
ChatSubscriptionLock.Unlock()
|
|
|
|
go func(topic string) {
|
|
err := Backend.SendNewTopicNotice(topic)
|
|
if err != nil {
|
|
log.Println("error reporting new sub:", err)
|
|
}
|
|
}(channelName)
|
|
|
|
ChatSubscriptionLock.RLock()
|
|
} else {
|
|
list.Lock()
|
|
AddToSliceCl(&list.Members, value)
|
|
list.Unlock()
|
|
}
|
|
}
|