1
0
Fork 0
mirror of https://github.com/FrankerFaceZ/FrankerFaceZ.git synced 2025-06-28 15:27:43 +00:00
FrankerFaceZ/socketserver/server/publisher.go

328 lines
8 KiB
Go
Raw Normal View History

2015-10-25 03:21:50 -07:00
package server
import (
"encoding/json"
2015-11-08 22:34:06 -08:00
"fmt"
"net/http"
2018-07-21 16:28:46 -04:00
"net/url"
2015-11-08 22:34:06 -08:00
"strconv"
"strings"
2015-10-25 03:21:50 -07:00
"sync"
"time"
2017-02-02 23:08:21 -08:00
"github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/rate"
2017-02-02 23:20:57 -08:00
"github.com/pkg/errors"
"golang.org/x/sync/singleflight"
2015-10-25 03:21:50 -07:00
)
2017-02-02 22:59:17 -08:00
// LastSavedMessage is a single cached command reply paired with the moment
// after which it should no longer be kept.
type LastSavedMessage struct {
	// Expires is the instant the entry becomes stale. The zero time means
	// the entry never expires.
	Expires time.Time
	// Data is the raw argument string of the reply, stored exactly as it
	// was received and re-parsed whenever the message is replayed.
	Data string
}
2015-10-25 03:21:50 -07:00
2015-11-08 22:34:06 -08:00
// map is command -> channel -> data
2016-04-28 15:33:49 -07:00
// CachedLastMessages is of CacheTypeLastOnly.
// Not actually cleaned up by reaper goroutine every ~hour.
2016-01-03 09:31:22 -08:00
var CachedLastMessages = make(map[Command]map[string]LastSavedMessage)
2015-11-08 22:34:06 -08:00
var CachedLSMLock sync.RWMutex
var singleFlighter singleflight.Group
2016-07-08 13:08:36 -07:00
func cachedMessageJanitor() {
for {
2017-02-02 23:08:21 -08:00
time.Sleep(1 * time.Hour)
2016-07-08 13:08:36 -07:00
cachedMessageJanitor_do()
}
}
// cachedMessageJanitor_do performs one expiry sweep over CachedLastMessages.
// Every entry with a non-zero Expires that lies before the current time is
// removed, and any command whose channel map ends up empty is dropped too.
// The write lock is held for the whole sweep.
func cachedMessageJanitor_do() {
	CachedLSMLock.Lock()
	defer CachedLSMLock.Unlock()

	cutoff := time.Now()
	for command, perChannel := range CachedLastMessages {
		for name, saved := range perChannel {
			// A zero Expires marks an entry that never expires.
			if saved.Expires.IsZero() || !saved.Expires.Before(cutoff) {
				continue
			}
			delete(perChannel, name)
		}
		if len(perChannel) > 0 {
			continue
		}
		delete(CachedLastMessages, command)
	}
}
2015-11-16 12:50:00 -08:00
// DumpBacklogData drops all /cached_pub data.
func DumpBacklogData() {
2015-11-08 22:34:06 -08:00
CachedLSMLock.Lock()
CachedLastMessages = make(map[Command]map[string]LastSavedMessage)
CachedLSMLock.Unlock()
}
2015-11-16 12:50:00 -08:00
// SendBacklogForNewClient sends any backlog data relevant to a new client.
// This should be done when the client sends a `ready` message.
// This will only send data for CacheTypePersistent and CacheTypeLastOnly because those do not involve timestamps.
2015-11-08 22:34:06 -08:00
func SendBacklogForNewClient(client *ClientInfo) {
client.Mutex.Lock() // reading CurrentChannels
2015-11-23 23:34:57 -08:00
curChannels := make([]string, len(client.CurrentChannels))
copy(curChannels, client.CurrentChannels)
client.Mutex.Unlock()
2015-11-08 22:34:06 -08:00
CachedLSMLock.RLock()
2016-07-08 12:46:16 -07:00
for cmd, chanMap := range CachedLastMessages {
2015-11-08 22:34:06 -08:00
if chanMap == nil {
continue
}
2015-11-23 23:34:57 -08:00
for _, channel := range curChannels {
2015-11-08 22:34:06 -08:00
msg, ok := chanMap[channel]
if ok {
msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data}
msg.parseOrigArguments()
2017-02-02 22:59:17 -08:00
client.Send(msg)
2015-11-08 22:34:06 -08:00
}
}
}
CachedLSMLock.RUnlock()
2015-10-25 03:21:50 -07:00
}
2016-04-28 14:39:20 -07:00
func SendBacklogForChannel(client *ClientInfo, channel string) {
CachedLSMLock.RLock()
2016-07-08 12:46:16 -07:00
for cmd, chanMap := range CachedLastMessages {
2016-04-28 14:39:20 -07:00
if chanMap == nil {
continue
}
if msg, ok := chanMap[channel]; ok {
msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: msg.Data}
msg.parseOrigArguments()
2017-02-02 22:59:17 -08:00
client.Send(msg)
2016-04-28 14:39:20 -07:00
}
}
CachedLSMLock.RUnlock()
}
2015-11-16 12:50:00 -08:00
// timestampArray is an indexable collection whose elements each expose a
// timestamp.
type timestampArray interface {
	// Len reports the number of elements.
	Len() int
	// GetTime returns the timestamp of the element at the given index.
	GetTime(int) time.Time
}
2016-07-08 13:08:36 -07:00
// the CachedLSMLock must be held when calling this
func saveLastMessage(cmd Command, channel string, expires time.Time, data string, deleting bool) {
2016-04-28 15:33:49 -07:00
chanMap, ok := CachedLastMessages[cmd]
2015-11-08 22:34:06 -08:00
if !ok {
if deleting {
return
}
chanMap = make(map[string]LastSavedMessage)
2016-04-28 15:33:49 -07:00
CachedLastMessages[cmd] = chanMap
2015-11-08 22:34:06 -08:00
}
if deleting {
delete(chanMap, channel)
} else {
2016-07-08 13:08:36 -07:00
chanMap[channel] = LastSavedMessage{Expires: expires, Data: data}
2015-11-08 22:34:06 -08:00
}
}
2015-11-16 12:50:00 -08:00
func HTTPBackendDropBacklog(w http.ResponseWriter, r *http.Request) {
2015-11-08 22:34:06 -08:00
r.ParseForm()
2017-09-15 16:40:40 -07:00
formData, err := Backend.secureForm.Unseal(r.Form)
2015-11-08 22:34:06 -08:00
if err != nil {
w.WriteHeader(403)
fmt.Fprintf(w, "Error: %v", err)
return
}
confirm := formData.Get("confirm")
if confirm == "1" {
2015-11-16 12:50:00 -08:00
DumpBacklogData()
2015-10-29 10:29:16 -07:00
}
2015-10-25 03:21:50 -07:00
}
2018-07-21 16:28:46 -04:00
func rateLimitFromFormData(formData url.Values) (rate.Limiter, error) {
rateCount := formData.Get("rateCount")
if rateCount != "" {
c, err := strconv.ParseInt(rateCount, 10, 32)
2017-02-02 22:59:17 -08:00
if err != nil {
return nil, errors.Wrap(err, "rateCount")
}
2018-07-21 16:28:46 -04:00
d, err := time.ParseDuration(formData.Get("rateTime"))
2017-02-02 22:59:17 -08:00
if err != nil {
return nil, errors.Wrap(err, "rateTime")
}
return rate.NewRateLimit(int(c), d), nil
2017-02-02 22:59:17 -08:00
}
return rate.Unlimited(), nil
2017-02-02 22:59:17 -08:00
}
2016-01-17 18:01:21 -08:00
// HTTPBackendCachedPublish handles the /cached_pub route.
// It publishes a message to clients, and then updates the in-server cache for the message.
2016-07-08 14:20:35 -07:00
//
// The 'channel' parameter is a comma-separated list of topics to publish the message to.
// The 'args' parameter is the JSON-encoded command data.
// If the 'delete' parameter is present, an entry is removed from the cache instead of publishing a message.
// If the 'expires' parameter is not specified, the message will not expire (though it is only kept in-memory).
2015-11-16 13:25:25 -08:00
func HTTPBackendCachedPublish(w http.ResponseWriter, r *http.Request) {
2015-11-08 22:34:06 -08:00
r.ParseForm()
2017-09-15 16:40:40 -07:00
formData, err := Backend.secureForm.Unseal(r.Form)
2015-11-08 22:34:06 -08:00
if err != nil {
w.WriteHeader(403)
fmt.Fprintf(w, "Error: %v", err)
return
}
cmd := CommandPool.InternCommand(formData.Get("cmd"))
2015-11-08 22:34:06 -08:00
json := formData.Get("args")
channel := formData.Get("channel")
deleteMode := formData.Get("delete") != ""
2016-07-08 13:08:36 -07:00
timeStr := formData.Get("expires")
var expires time.Time
if timeStr != "" {
timeNum, err := strconv.ParseInt(timeStr, 10, 64)
if err != nil {
w.WriteHeader(422)
fmt.Fprintf(w, "error parsing time: %v", err)
return
}
expires = time.Unix(timeNum, 0)
2015-11-08 22:34:06 -08:00
}
rl, err := rateLimitFromFormData(formData)
2017-02-02 22:59:17 -08:00
if err != nil {
w.WriteHeader(422)
fmt.Fprintf(w, "error parsing ratelimit: %v", err)
return
}
2015-10-25 03:21:50 -07:00
2015-11-08 22:34:06 -08:00
var count int
msg := ClientMessage{MessageID: -1, Command: cmd, origArguments: json}
msg.parseOrigArguments()
2016-07-08 12:46:16 -07:00
channels := strings.Split(channel, ",")
CachedLSMLock.Lock()
for _, channel := range channels {
2016-07-08 13:08:36 -07:00
saveLastMessage(cmd, channel, expires, json, deleteMode)
2015-10-25 03:21:50 -07:00
}
2016-07-08 12:46:16 -07:00
CachedLSMLock.Unlock()
2015-11-08 22:34:06 -08:00
2017-02-02 22:59:17 -08:00
var wg sync.WaitGroup
wg.Add(1)
go rl.Run()
go func() {
count = PublishToMultiple(channels, msg, rl)
wg.Done()
rl.Close()
}()
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
select {
case <-time.After(3 * time.Second):
2017-02-02 22:59:17 -08:00
count = -1
case <-ch:
}
2015-11-08 22:34:06 -08:00
w.Write([]byte(strconv.Itoa(count)))
}
2016-07-08 12:47:13 -07:00
// HTTPBackendUncachedPublish handles the /uncached_pub route.
// The backend can POST here to publish a message to clients with no caching.
// The POST arguments are `cmd`, `args`, `channel`, and `scope`.
// If "scope" is "global", then "channel" is not used.
func HTTPBackendUncachedPublish(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
2017-09-15 16:40:40 -07:00
formData, err := Backend.secureForm.Unseal(r.Form)
2016-07-08 12:47:13 -07:00
if err != nil {
w.WriteHeader(403)
fmt.Fprintf(w, "Error: %v", err)
return
}
cmd := formData.Get("cmd")
json := formData.Get("args")
channel := formData.Get("channel")
scope := formData.Get("scope")
if cmd == "" {
w.WriteHeader(422)
2017-02-02 22:59:17 -08:00
fmt.Fprint(w, "Error: cmd cannot be blank")
2016-07-08 12:47:13 -07:00
return
}
if channel == "" && scope != "global" {
w.WriteHeader(422)
2017-02-02 22:59:17 -08:00
fmt.Fprint(w, "Error: channel must be specified")
return
}
2018-07-21 16:28:46 -04:00
rl, err := rateLimitFromFormData(formData)
2017-02-02 22:59:17 -08:00
if err != nil {
w.WriteHeader(422)
fmt.Fprintf(w, "error parsing ratelimit: %v", err)
2016-07-08 12:47:13 -07:00
return
}
cm := ClientMessage{MessageID: -1, Command: CommandPool.InternCommand(cmd), origArguments: json}
cm.parseOrigArguments()
2017-02-02 22:59:17 -08:00
var count int
var wg sync.WaitGroup
wg.Add(1)
go rl.Run()
go func() {
switch scope {
default:
count = PublishToMultiple(strings.Split(channel, ","), cm, rl)
case "global":
count = PublishToAll(cm, rl)
}
wg.Done()
rl.Close()
}()
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
select {
case <-time.After(3 * time.Second):
2017-02-02 22:59:17 -08:00
count = -1
case <-ch:
2016-07-08 12:47:13 -07:00
}
2017-02-02 22:59:17 -08:00
w.Write([]byte(strconv.Itoa(count)))
2016-07-08 12:47:13 -07:00
}
2016-07-08 19:16:10 -07:00
// HTTPGetSubscriberCount handles the /get_sub_count route.
// It replies with the number of clients subscribed to a pub/sub topic.
// A "global" option is not available, use fetch(/stats).CurrentClientCount instead.
func HTTPGetSubscriberCount(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
2017-09-15 16:40:40 -07:00
formData, err := Backend.secureForm.Unseal(r.Form)
2016-07-08 19:16:10 -07:00
if err != nil {
w.WriteHeader(403)
fmt.Fprintf(w, "Error: %v", err)
return
}
channel := formData.Get("channel")
fmt.Fprint(w, CountSubscriptions(strings.Split(channel, ",")))
2017-02-02 23:08:21 -08:00
}
// HTTPListAllTopics is an HTTP handler that replies with the JSON encoding of
// GetAllTopics(). Concurrent requests share a single GetAllTopics call through
// singleFlighter.
//
// Like the other backend handlers in this file, the request form must unseal
// successfully; otherwise the reply is a 403 carrying the error text. The
// rejection had been commented out, which left the seal check without effect.
// NOTE(review): confirm no consumer relied on calling this endpoint unsealed.
func HTTPListAllTopics(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()
	if _, err := Backend.secureForm.Unseal(r.Form); err != nil {
		w.WriteHeader(http.StatusForbidden)
		fmt.Fprintf(w, "Error: %v", err)
		return
	}

	// The callback always returns a nil error, so the error result (and the
	// "shared" flag) carry no information and are deliberately discarded.
	topicList, _, _ := singleFlighter.Do("/all_topics", func() (interface{}, error) {
		return GetAllTopics(), nil
	})

	// Encode before touching the response so an encoding failure can still be
	// reported with a proper status code.
	body, err := json.Marshal(topicList)
	if err != nil {
		http.Error(w, "error encoding topic list", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	// Trailing newline keeps the body identical to json.Encoder output.
	w.Write(append(body, '\n'))
}