From 74bc15e0e90a67742f6a6deb4d8b338ba03d41f6 Mon Sep 17 00:00:00 2001 From: Kane York Date: Tue, 27 Oct 2015 21:21:06 -0700 Subject: [PATCH] lol commit messages --- socketserver/cmd/ffzsocketserver/console.go | 47 +++++++++++++++++ .../cmd/ffzsocketserver/socketserver.go | 2 + socketserver/internal/server/backend.go | 4 +- socketserver/internal/server/backlog.go | 25 +++++++++ socketserver/internal/server/backlog_test.go | 4 ++ socketserver/internal/server/commands.go | 14 ++--- socketserver/internal/server/handlecore.go | 51 +++++++++++-------- src/socket.js | 1 - 8 files changed, 117 insertions(+), 31 deletions(-) create mode 100644 socketserver/cmd/ffzsocketserver/console.go diff --git a/socketserver/cmd/ffzsocketserver/console.go b/socketserver/cmd/ffzsocketserver/console.go new file mode 100644 index 00000000..584b68bd --- /dev/null +++ b/socketserver/cmd/ffzsocketserver/console.go @@ -0,0 +1,47 @@ +package main + +import ( + "../../internal/server" + "fmt" + "github.com/abiosoft/ishell" + "runtime" +) + +func commandLineConsole() { + + shell := ishell.NewShell() + + shell.Register("clientcount", func(args ...string) (string, error) { + server.GlobalSubscriptionInfo.RLock() + count := len(server.GlobalSubscriptionInfo.Members) + server.GlobalSubscriptionInfo.RUnlock() + return fmt.Sprintln(count, "clients connected"), nil + }) + + shell.Register("globalnotice", func(args ...string) (string, error) { + msg := server.ClientMessage{ + MessageID: -1, + Command: "message", + Arguments: args[0], + } + server.PublishToAll(msg) + return "Message sent.", nil + }) + + shell.Register("memstatsbysize", func(args ...string) (string, error) { + runtime.GC() + + m := runtime.MemStats{} + runtime.ReadMemStats(&m) + for _, val := range m.BySize { + if val.Mallocs == 0 { + continue + } + shell.Println(val.Size, "bytes:", val.Mallocs, "allocs", val.Frees, "frees") + } + shell.Println(m.NumGC, "collections occurred") + return "", nil + }) + + shell.Start() +} diff --git a/socketserver/cmd/ffzsocketserver/socketserver.go b/socketserver/cmd/ffzsocketserver/socketserver.go index 5de7a059..d0f7a6f6 100644 --- a/socketserver/cmd/ffzsocketserver/socketserver.go +++ b/socketserver/cmd/ffzsocketserver/socketserver.go @@ -46,6 +46,8 @@ func main() { server.SetupServerAndHandle(conf, httpServer.TLSConfig, nil) + go commandLineConsole() + if conf.UseSSL { err = httpServer.ListenAndServeTLS(conf.SSLCertificateFile, conf.SSLKeyFile) } else { diff --git a/socketserver/internal/server/backend.go b/socketserver/internal/server/backend.go index 72d74b22..aae7ce79 100644 --- a/socketserver/internal/server/backend.go +++ b/socketserver/internal/server/backend.go @@ -182,9 +182,9 @@ func FetchBacklogData(chatSubs []string) ([]ClientMessage, error) { func GenerateKeys(outputFile, serverId, theirPublicStr string) { var err error output := ConfigFile{ - ListenAddr: "0.0.0.0:8001", + ListenAddr: "0.0.0.0:8001", SocketOrigin: "localhost:8001", - BackendUrl: "http://localhost:8002/ffz", + BackendUrl: "http://localhost:8002/ffz", BannerHTML: ` CatBag diff --git a/socketserver/internal/server/backlog.go b/socketserver/internal/server/backlog.go index c08565fb..278944a0 100644 --- a/socketserver/internal/server/backlog.go +++ b/socketserver/internal/server/backlog.go @@ -214,6 +214,31 @@ func SendTimedBacklogMessages(client *ClientInfo, disconnectTime time.Time) { client.Mutex.Unlock() } +func TimedBacklogJanitor() { + for { + time.Sleep(1 * time.Hour) + CleanupTimedBacklogMessages() + } +} + +func CleanupTimedBacklogMessages() { + CacheListsLock.Lock() + oneHourAgo := time.Now().Add(-24 * time.Hour) + globIdx := FindFirstNewMessage(tgmarray(CachedGlobalMessages), oneHourAgo) + if globIdx != -1 { + newGlobMsgs := make([]TimestampedGlobalMessage, len(CachedGlobalMessages)-globIdx) + copy(newGlobMsgs, CachedGlobalMessages[globIdx:]) + CachedGlobalMessages = newGlobMsgs + } + chanIdx := FindFirstNewMessage(tmmarray(CachedChannelMessages), oneHourAgo) + if chanIdx != -1 { + newChanMsgs := make([]TimestampedMultichatMessage, len(CachedChannelMessages)-chanIdx) + copy(newChanMsgs, CachedChannelMessages[chanIdx:]) + CachedChannelMessages = newChanMsgs + } + CacheListsLock.Unlock() +} + func InsertionSort(ary sort.Interface) { for i := 1; i < ary.Len(); i++ { for j := i; j > 0 && ary.Less(j, j-1); j-- { diff --git a/socketserver/internal/server/backlog_test.go b/socketserver/internal/server/backlog_test.go index 68757587..99ce4f5a 100644 --- a/socketserver/internal/server/backlog_test.go +++ b/socketserver/internal/server/backlog_test.go @@ -5,6 +5,10 @@ import ( "time" ) +func TestCleanupBacklogMessages(t *testing.T) { + +} + func TestFindFirstNewMessageEmpty(t *testing.T) { CachedGlobalMessages = []TimestampedGlobalMessage{} i := FindFirstNewMessage(tgmarray(CachedGlobalMessages), time.Unix(10, 0)) diff --git a/socketserver/internal/server/commands.go b/socketserver/internal/server/commands.go index c947bf08..aba9df08 100644 --- a/socketserver/internal/server/commands.go +++ b/socketserver/internal/server/commands.go @@ -127,13 +127,13 @@ func HandleSub(conn *websocket.Conn, client *ClientInfo, msg ClientMessage) (rms AddToSliceS(&client.CurrentChannels, channel) client.PendingSubscriptionsBacklog = append(client.PendingSubscriptionsBacklog, channel) - if client.MakePendingRequests == nil { - client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client)) - } else { - if !client.MakePendingRequests.Reset(ChannelInfoDelay) { - client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client)) - } - } + // if client.MakePendingRequests == nil { + // client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client)) + // } else { + // if !client.MakePendingRequests.Reset(ChannelInfoDelay) { + // client.MakePendingRequests = time.AfterFunc(ChannelInfoDelay, GetSubscriptionBacklogFor(conn, client)) + // } + // } client.Mutex.Unlock() diff --git a/socketserver/internal/server/handlecore.go b/socketserver/internal/server/handlecore.go index f227db8b..b673571e 100644 --- a/socketserver/internal/server/handlecore.go +++ b/socketserver/internal/server/handlecore.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "golang.org/x/net/websocket" + "io" "log" "net/http" "strconv" @@ -74,43 +75,44 @@ var gconfig *ConfigFile // Create a websocket.Server with the options from the provided Config. func setupServer(config *ConfigFile, tlsConfig *tls.Config) *websocket.Server { gconfig = config - sockConf, err := websocket.NewConfig("/", config.SocketOrigin) - if err != nil { - log.Fatal(err) - } + // sockConf, err := websocket.NewConfig("/", config.SocketOrigin) + // if err != nil { + // log.Fatal(err) + // } SetupBackend(config) - if config.UseSSL { - cert, err := tls.LoadX509KeyPair(config.SSLCertificateFile, config.SSLKeyFile) - if err != nil { - log.Fatal(err) - } - tlsConfig.Certificates = []tls.Certificate{cert} - tlsConfig.ServerName = config.SocketOrigin - tlsConfig.BuildNameToCertificate() - sockConf.TlsConfig = tlsConfig + // if config.UseSSL { + // cert, err := tls.LoadX509KeyPair(config.SSLCertificateFile, config.SSLKeyFile) + // if err != nil { + // log.Fatal(err) + // } + // tlsConfig.Certificates = []tls.Certificate{cert} + // tlsConfig.ServerName = config.SocketOrigin + // tlsConfig.BuildNameToCertificate() + // sockConf.TlsConfig = tlsConfig + // } - } - - sockServer := &websocket.Server{} - sockServer.Config = *sockConf - sockServer.Handler = HandleSocketConnection + // sockServer := &websocket.Server{} + // sockServer.Config = *sockConf + // sockServer.Handler = HandleSocketConnection go deadChannelReaper() - return sockServer + return nil } // Set up a websocket listener and register it on /. // (Uses http.DefaultServeMux .) func SetupServerAndHandle(config *ConfigFile, tlsConfig *tls.Config, serveMux *http.ServeMux) { - sockServer := setupServer(config, tlsConfig) + _ = setupServer(config, tlsConfig) + log.Print("hi") if serveMux == nil { serveMux = http.DefaultServeMux } - serveMux.HandleFunc("/", ServeWebsocketOrCatbag(sockServer.ServeHTTP)) + handler := websocket.Handler(HandleSocketConnection) + serveMux.HandleFunc("/", ServeWebsocketOrCatbag(handler.ServeHTTP)) serveMux.HandleFunc("/pub_msg", HBackendPublishRequest) serveMux.HandleFunc("/dump_backlog", HBackendDumpBacklog) serveMux.HandleFunc("/update_and_pub", HBackendUpdateAndPublish) @@ -132,6 +134,8 @@ func ServeWebsocketOrCatbag(sockfunc func(http.ResponseWriter, *http.Request)) h func HandleSocketConnection(conn *websocket.Conn) { // websocket.Conn is a ReadWriteCloser + fmt.Println("Got socket connection from", conn.Request().RemoteAddr) + var _closer sync.Once closer := func() { _closer.Do(func() { @@ -156,6 +160,10 @@ func HandleSocketConnection(conn *websocket.Conn) { } clientChan <- msg } + + if err != io.EOF { + fmt.Println("Error while reading from client:", err) + } errorChan <- err close(errorChan) close(clientChan) @@ -198,6 +206,7 @@ RunLoop: } // Exit + fmt.Println("End socket connection from", conn.Request().RemoteAddr) // Launch message draining goroutine - we aren't out of the pub/sub records go func() { diff --git a/src/socket.js b/src/socket.js index 99847bfc..b9b9c210 100644 --- a/src/socket.js +++ b/src/socket.js @@ -37,7 +37,6 @@ FFZ.prototype.ws_iframe = function() { FFZ.prototype.ws_create = function() { // Disable sockets for now. - return; var f = this, ws;