mirror of
https://github.com/FrankerFaceZ/FrankerFaceZ.git
synced 2025-06-27 21:05:53 +00:00
Compile fixes, switch cache implementation
This commit is contained in:
parent
75e1a67e9a
commit
7657357164
5 changed files with 60 additions and 15 deletions
|
@ -16,8 +16,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/FrankerFaceZ/FrankerFaceZ/socketserver/server/naclform"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
"github.com/karlseguin/ccache"
|
||||
"golang.org/x/crypto/nacl/box"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
const bPathAnnounceStartup = "/startup"
|
||||
|
@ -28,7 +29,8 @@ const bPathOtherCommand = "/cmd/"
|
|||
type backendInfo struct {
|
||||
HTTPClient http.Client
|
||||
baseURL string
|
||||
responseCache *cache.Cache
|
||||
responseCache *ccache.Cache
|
||||
reloadGroup singleflight.Group
|
||||
|
||||
postStatisticsURL string
|
||||
addTopicURL string
|
||||
|
@ -49,7 +51,8 @@ func setupBackend(config *ConfigFile) *backendInfo {
|
|||
|
||||
b.HTTPClient.Timeout = 60 * time.Second
|
||||
b.baseURL = config.BackendURL
|
||||
b.responseCache = cache.New(60*time.Second, 120*time.Second)
|
||||
// size in bytes of string payload
|
||||
b.responseCache = ccache.New(ccache.Configure().MaxSize(250 * 1000 * 1024))
|
||||
|
||||
b.announceStartupURL = fmt.Sprintf("%s%s", b.baseURL, bPathAnnounceStartup)
|
||||
b.addTopicURL = fmt.Sprintf("%s%s", b.baseURL, bPathAddTopic)
|
||||
|
@ -77,6 +80,18 @@ func getCacheKey(remoteCommand, data string) string {
|
|||
return fmt.Sprintf("%s/%s", remoteCommand, data)
|
||||
}
|
||||
|
||||
type cachedResponseStr string
|
||||
|
||||
// implements ccache.Sized
|
||||
func (c cachedResponseStr) Size() int64 {
|
||||
return int64(len(string(c)))
|
||||
}
|
||||
|
||||
// implements Stringer
|
||||
func (c cachedResponseStr) String() string {
|
||||
return string(c)
|
||||
}
|
||||
|
||||
// ErrForwardedFromBackend is an error returned by the backend server.
|
||||
type ErrForwardedFromBackend struct {
|
||||
JSONError interface{}
|
||||
|
@ -88,22 +103,47 @@ func (bfe ErrForwardedFromBackend) Error() string {
|
|||
}
|
||||
|
||||
// ErrAuthorizationNeeded is emitted when the backend replies with HTTP 401.
|
||||
//
|
||||
// Indicates that an attempt to validate `ClientInfo.TwitchUsername` should be attempted.
|
||||
var ErrAuthorizationNeeded = errors.New("Must authenticate Twitch username to use this command")
|
||||
|
||||
// SendRemoteCommandCached performs a RPC call on the backend, but caches responses.
|
||||
// SendRemoteCommandCached performs a RPC call on the backend, checking for a
|
||||
// cached response first.
|
||||
//
|
||||
// If a cached, but expired, response is found, the existing value is returned
|
||||
// and the cache is updated in the background.
|
||||
func (backend *backendInfo) SendRemoteCommandCached(remoteCommand, data string, auth AuthInfo) (string, error) {
|
||||
cached, ok := backend.responseCache.Get(getCacheKey(remoteCommand, data))
|
||||
if ok {
|
||||
return cached.(string), nil
|
||||
cacheKey := getCacheKey(remoteCommand, data)
|
||||
item := backend.responseCache.Get(cacheKey)
|
||||
if item != nil {
|
||||
if item.Expired() {
|
||||
// reload in background
|
||||
go backend.reloadGroup.Do(cacheKey, func() (interface{}, error) {
|
||||
backend.SendRemoteCommand(remoteCommand, data, auth)
|
||||
return nil, nil
|
||||
})
|
||||
}
|
||||
return item.Value().(cachedResponseStr).String(), nil
|
||||
}
|
||||
return backend.SendRemoteCommand(remoteCommand, data, auth)
|
||||
}
|
||||
|
||||
// SendRemoteCommand performs a RPC call on the backend by POSTing to `/cmd/$remoteCommand`.
|
||||
//
|
||||
// The form data is as follows: `clientData` is the JSON in the `data` parameter
|
||||
// (should be retrieved from ClientMessage.Arguments), and either `username` or
|
||||
// `usernameClaimed` depending on whether AuthInfo.UsernameValidates is true is AuthInfo.TwitchUsername.
|
||||
// (should be retrieved from ClientMessage.Arguments), `username` is AuthInfo.TwitchUsername,
|
||||
// and `authenticated` is 1 or 0 depending on AuthInfo.UsernameValidated.
|
||||
//
|
||||
// 401 responses return an ErrAuthorizationNeeded.
|
||||
//
|
||||
// Non-2xx responses return the response body as an error to the client (application/json
|
||||
// responses are sent as-is, non-json are sent as a JSON string).
|
||||
//
|
||||
// If a 2xx response has the FFZ-Cache header, its value is used as a minimum number of
|
||||
// seconds to cache the response for. (Responses may be cached for longer, see
|
||||
// SendRemoteCommandCached and the cache implementation.)
|
||||
//
|
||||
// A successful response updates the Statistics.Health.Backend map.
|
||||
func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth AuthInfo) (responseStr string, err error) {
|
||||
destURL := fmt.Sprintf("%s/cmd/%s", backend.baseURL, remoteCommand)
|
||||
healthBucket := fmt.Sprintf("/cmd/%s", remoteCommand)
|
||||
|
@ -166,7 +206,11 @@ func (backend *backendInfo) SendRemoteCommand(remoteCommand, data string, auth A
|
|||
return "", fmt.Errorf("The RPC server returned a non-integer cache duration: %v", err)
|
||||
}
|
||||
duration := time.Duration(durSecs) * time.Second
|
||||
backend.responseCache.Set(getCacheKey(remoteCommand, data), responseStr, duration)
|
||||
backend.responseCache.Set(
|
||||
getCacheKey(remoteCommand, data),
|
||||
cachedResponseStr(responseStr),
|
||||
duration,
|
||||
)
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
|
|
@ -34,7 +34,7 @@ var commandHandlers = map[Command]CommandHandler{
|
|||
"survey": C2SSurvey,
|
||||
}
|
||||
|
||||
var bunchedCommands = []string{
|
||||
var bunchedCommands = []Command{
|
||||
"get_display_name",
|
||||
"get_emote",
|
||||
"get_emote_set",
|
||||
|
|
|
@ -205,7 +205,6 @@ func HTTPHandleRootURL(w http.ResponseWriter, r *http.Request) {
|
|||
updateSysMem()
|
||||
|
||||
if Statistics.SysMemFreeKB > 0 && Statistics.SysMemFreeKB < Configuration.MinMemoryKBytes {
|
||||
atomic.AddUint64(&Statistics.LowMemDroppedConnections, 1)
|
||||
w.WriteHeader(503)
|
||||
fmt.Fprint(w, "error: low memory")
|
||||
return
|
||||
|
|
|
@ -169,7 +169,6 @@ func updatePeriodicStats() {
|
|||
|
||||
{
|
||||
Statistics.Uptime = nowUpdate.Sub(Statistics.StartTime).String()
|
||||
Statistics.ResponseCacheItems = Backend.responseCache.ItemCount()
|
||||
}
|
||||
|
||||
{
|
||||
|
|
|
@ -156,8 +156,11 @@ func UnsubscribeSingleChat(client *ClientInfo, channelName string) {
|
|||
// - write lock to SubscriptionInfos
|
||||
// - write lock to ClientInfo
|
||||
func UnsubscribeAll(client *ClientInfo) {
|
||||
if StopAcceptingConnections {
|
||||
return // no need to remove from a high-contention list when the server is closing
|
||||
select {
|
||||
case <-StopAcceptingConnectionsCh:
|
||||
// Skip high-contention client removal operations while server shutting down
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
GlobalSubscriptionLock.Lock()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue