diff --git a/socketserver/server/handlecore.go b/socketserver/server/handlecore.go index 31bbe9d3..a8272b71 100644 --- a/socketserver/server/handlecore.go +++ b/socketserver/server/handlecore.go @@ -353,13 +353,16 @@ func getDeadline() time.Time { } func CloseConnection(conn *websocket.Conn, closeMsg websocket.CloseError) { - Statistics.DisconnectCodes[strconv.Itoa(closeMsg.Code)]++ closeTxt := closeMsg.Text if strings.Contains(closeTxt, "read: connection reset by peer") { closeTxt = "read: connection reset by peer" + } else if strings.Contains(closeTxt, "use of closed network connection") { + closeTxt = "read: use of closed network connection" } else if closeMsg.Code == 1001 { closeTxt = "clean shutdown" } + // todo kibana cannot analyze these + Statistics.DisconnectCodes[strconv.Itoa(closeMsg.Code)]++ Statistics.DisconnectReasons[closeTxt]++ conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(closeMsg.Code, closeMsg.Text), getDeadline()) diff --git a/socketserver/server/stats.go b/socketserver/server/stats.go index 0b9a8a30..34d0db25 100644 --- a/socketserver/server/stats.go +++ b/socketserver/server/stats.go @@ -119,9 +119,9 @@ func updatePeriodicStats() { Statistics.PubSubChannelCount = len(ChatSubscriptionInfo) ChatSubscriptionLock.RUnlock() - GlobalSubscriptionInfo.RLock() - Statistics.CurrentClientCount = uint64(len(GlobalSubscriptionInfo.Members)) - GlobalSubscriptionInfo.RUnlock() + GlobalSubscriptionLock.RLock() + Statistics.CurrentClientCount = uint64(len(GlobalSubscriptionInfo)) + GlobalSubscriptionLock.RUnlock() } { diff --git a/socketserver/server/subscriptions.go b/socketserver/server/subscriptions.go index 35b6020b..c9e85a1c 100644 --- a/socketserver/server/subscriptions.go +++ b/socketserver/server/subscriptions.go @@ -16,7 +16,8 @@ type SubscriberList struct { var ChatSubscriptionInfo map[string]*SubscriberList = make(map[string]*SubscriberList) var ChatSubscriptionLock sync.RWMutex -var GlobalSubscriptionInfo SubscriberList +var GlobalSubscriptionInfo []*ClientInfo +var GlobalSubscriptionLock sync.RWMutex func SubscribeChannel(client *ClientInfo, channelName string) { ChatSubscriptionLock.RLock() @@ -29,9 +30,9 @@ func SubscribeDefaults(client *ClientInfo) { } func SubscribeGlobal(client *ClientInfo) { - GlobalSubscriptionInfo.Lock() - AddToSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel) - GlobalSubscriptionInfo.Unlock() + GlobalSubscriptionLock.Lock() + AddToSliceCl(&GlobalSubscriptionInfo, client) + GlobalSubscriptionLock.Unlock() } func PublishToChannel(channel string, msg ClientMessage) (count int) { @@ -75,12 +76,15 @@ func PublishToMultiple(channels []string, msg ClientMessage) (count int) { } func PublishToAll(msg ClientMessage) (count int) { - GlobalSubscriptionInfo.RLock() - for _, msgChan := range GlobalSubscriptionInfo.Members { - msgChan <- msg + GlobalSubscriptionLock.RLock() + for _, client := range GlobalSubscriptionInfo { + select { + case client.MessageChannel <- msg: + case <-client.MsgChannelIsDone: + } count++ } - GlobalSubscriptionInfo.RUnlock() + GlobalSubscriptionLock.RUnlock() return } @@ -106,9 +110,9 @@ func UnsubscribeAll(client *ClientInfo) { client.PendingSubscriptionsBacklog = nil client.Mutex.Unlock() - GlobalSubscriptionInfo.Lock() - RemoveFromSliceC(&GlobalSubscriptionInfo.Members, client.MessageChannel) - GlobalSubscriptionInfo.Unlock() + GlobalSubscriptionLock.Lock() + RemoveFromSliceCl(&GlobalSubscriptionInfo, client) + GlobalSubscriptionLock.Unlock() ChatSubscriptionLock.RLock() client.Mutex.Lock() @@ -126,9 +130,9 @@ func UnsubscribeAll(client *ClientInfo) { } func unsubscribeAllClients() { - GlobalSubscriptionInfo.Lock() - GlobalSubscriptionInfo.Members = nil - GlobalSubscriptionInfo.Unlock() + GlobalSubscriptionLock.Lock() + GlobalSubscriptionInfo = nil + GlobalSubscriptionLock.Unlock() ChatSubscriptionLock.Lock() ChatSubscriptionInfo = make(map[string]*SubscriberList) ChatSubscriptionLock.Unlock() diff --git a/socketserver/server/utils.go b/socketserver/server/utils.go index 07866e4b..258acda1 100644 --- a/socketserver/server/utils.go +++ b/socketserver/server/utils.go @@ -160,6 +160,38 @@ func RemoveFromSliceC(ary *[]chan<- ClientMessage, val chan<- ClientMessage) boo return true } +func AddToSliceCl(ary *[]*ClientInfo, val *ClientInfo) bool { + slice := *ary + for _, v := range slice { + if v == val { + return false + } + } + + slice = append(slice, val) + *ary = slice + return true +} + +func RemoveFromSliceCl(ary *[]*ClientInfo, val *ClientInfo) bool { + slice := *ary + var idx int = -1 + for i, v := range slice { + if v == val { + idx = i + break + } + } + if idx == -1 { + return false + } + + slice[idx] = slice[len(slice)-1] + slice = slice[:len(slice)-1] + *ary = slice + return true +} + func AddToSliceB(ary *[]bunchSubscriber, client *ClientInfo, mid int) bool { newSub := bunchSubscriber{Client: client, MessageID: mid} slice := *ary