diff --git a/client.go b/client.go index e2de7c69..3f75e370 100644 --- a/client.go +++ b/client.go @@ -995,7 +995,7 @@ func (c *Client) close(disconnect Disconnect) error { c.node.logger.log(newLogEntry(LogLevelDebug, "closing client connection", map[string]any{"client": c.uid, "user": c.user, "reason": disconnect.Reason})) } if disconnect.Code != DisconnectConnectionClosed.Code { - incServerDisconnect(disconnect.Code) + c.node.metrics.incServerDisconnect(disconnect.Code) } if c.eventHub.disconnectHandler != nil && prevStatus == statusConnected { c.eventHub.disconnectHandler(DisconnectEvent{ @@ -1095,7 +1095,7 @@ func isPong(cmd *protocol.Command) bool { func (c *Client) handleCommandFinished(cmd *protocol.Command, method commandMethodType, disconnect *Disconnect, reply *protocol.Reply, started time.Time) { defer func() { - observeCommandDuration(method, time.Since(started)) + c.node.metrics.observeCommandDuration(method, time.Since(started)) }() if c.node.clientEvents.commandProcessedHandler != nil { event := newCommandProcessedEvent(cmd, disconnect, reply, started) @@ -1105,7 +1105,7 @@ func (c *Client) handleCommandFinished(cmd *protocol.Command, method commandMeth func (c *Client) handleCommandDispatchError(cmd *protocol.Command, method commandMethodType, err error, started time.Time) (*Disconnect, bool) { defer func() { - observeCommandDuration(method, time.Since(started)) + c.node.metrics.observeCommandDuration(method, time.Since(started)) }() switch t := err.(type) { case *Disconnect: @@ -1154,7 +1154,7 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec if metricChannel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesReceived { channelGroup = c.node.config.GetChannelNamespaceLabel(metricChannel) } - incTransportMessagesReceived(c.transport.Name(), channelGroup, cmdSize) + c.node.metrics.incTransportMessagesReceived(c.transport.Name(), channelGroup, cmdSize) }() if isPong(cmd) { @@ -1284,7 +1284,7 @@ func (c *Client) writeEncodedCommandReply(method commandMethodType, cmd *protoco if c.node.LogEnabled(LogLevelInfo) { c.node.logger.log(newLogEntry(LogLevelInfo, "client command error", map[string]any{"reply": fmt.Sprintf("%v", rep), "command": fmt.Sprintf("%v", cmd), "client": c.ID(), "user": c.UserID(), "error": rep.Error.Message, "code": rep.Error.Code})) } - incReplyError(method, rep.Error.Code) + c.node.metrics.incReplyError(method, rep.Error.Code) } protoType := c.transport.Protocol().toProto() @@ -2051,7 +2051,7 @@ func (c *Client) writeError(method commandMethodType, cmd *protocol.Command, err func (c *Client) writeDisconnectOrErrorFlush(method commandMethodType, cmd *protocol.Command, replyError error, started time.Time, rw *replyWriter) { defer func() { - observeCommandDuration(method, time.Since(started)) + c.node.metrics.observeCommandDuration(method, time.Since(started)) }() switch t := replyError.(type) { case *Disconnect: @@ -2125,7 +2125,7 @@ func (c *Client) getRPCCommandReply(res *protocol.RPCResult) (*protocol.Reply, e func (c *Client) handleSend(req *protocol.SendRequest, cmd *protocol.Command, started time.Time) error { // Send handler is a bit special since it's a one way command: client does not expect any reply. if c.eventHub.messageHandler == nil { - observeCommandDuration(commandSend, time.Since(started)) + c.node.metrics.observeCommandDuration(commandSend, time.Since(started)) // Return DisconnectNotAvailable here since otherwise client won't even know // server does not have asynchronous message handler set. return DisconnectNotAvailable @@ -2451,7 +2451,7 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q if item.Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent { channelGroup = c.node.config.GetChannelNamespaceLabel(item.Channel) } - incTransportMessagesSent(c.transport.Name(), channelGroup, len(item.Data)) + c.node.metrics.incTransportMessagesSent(c.transport.Name(), channelGroup, len(item.Data)) if c.node.clientEvents.transportWriteHandler != nil { pass := c.node.clientEvents.transportWriteHandler(c, TransportWriteEvent(item)) @@ -2488,7 +2488,7 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q if items[i].Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent { channelGroup = c.node.config.GetChannelNamespaceLabel(items[i].Channel) } - incTransportMessagesSent(c.transport.Name(), channelGroup, len(items[i].Data)) + c.node.metrics.incTransportMessagesSent(c.transport.Name(), channelGroup, len(items[i].Data)) } writeMu.Lock() defer writeMu.Unlock() @@ -2757,7 +2757,7 @@ func (c *Client) subscribeCmd(req *protocol.SubscribeRequest, reply SubscribeRep latestOffset = historyResult.Offset latestEpoch = historyResult.Epoch res.Recovered = false - incRecover(res.Recovered) + c.node.metrics.incRecover(res.Recovered) } else { c.node.logger.log(newLogEntry(LogLevelError, "error on recover", map[string]any{"channel": channel, "user": c.user, "client": c.uid, "error": err.Error()})) c.pubSubSync.StopBuffering(channel) @@ -2773,7 +2773,7 @@ func (c *Client) subscribeCmd(req *protocol.SubscribeRequest, reply SubscribeRep var recovered bool recoveredPubs, recovered = isRecovered(historyResult, cmdOffset, cmdEpoch) res.Recovered = recovered - incRecover(res.Recovered) + c.node.metrics.incRecover(res.Recovered) } } else { streamTop, err := c.node.streamTop(channel, reply.Options.HistoryMetaTTL) @@ -3102,7 +3102,7 @@ func (c *Client) logDisconnectBadRequest(message string) error { func (c *Client) logWriteInternalErrorFlush(method commandMethodType, cmd *protocol.Command, err error, message string, started time.Time, rw *replyWriter) { defer func() { - observeCommandDuration(method, time.Since(started)) + c.node.metrics.observeCommandDuration(method, time.Since(started)) }() if clientErr, ok := err.(*Error); ok { errorReply := &protocol.Reply{Error: clientErr.toProto()} diff --git a/handler_http_stream.go b/handler_http_stream.go index ffb7bb53..78170342 100644 --- a/handler_http_stream.go +++ b/handler_http_stream.go @@ -39,7 +39,7 @@ const defaultMaxHTTPStreamingBodySize = 64 * 1024 const streamingResponseWriteTimeout = time.Second func (h *HTTPStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - incTransportConnect(transportHTTPStream) + h.node.metrics.incTransportConnect(transportHTTPStream) if r.Method == http.MethodOptions { w.WriteHeader(http.StatusOK) diff --git a/handler_sockjs.go b/handler_sockjs.go index eeb8f330..634f0fdc 100644 --- a/handler_sockjs.go +++ b/handler_sockjs.go @@ -122,7 +122,7 @@ func (s *SockjsHandler) sockJSHandler(sess sockjs.Session) { // sockJSHandler called when new client connection comes to SockJS endpoint. func (s *SockjsHandler) handleSession(sess sockjs.Session) { - incTransportConnect(transportSockJS) + s.node.metrics.incTransportConnect(transportSockJS) // Separate goroutine for better GC of caller's data. go func() { diff --git a/handler_sse.go b/handler_sse.go index 506ad9a3..d3e3efa5 100644 --- a/handler_sse.go +++ b/handler_sse.go @@ -40,7 +40,7 @@ const connectUrlParam = "cf_connect" const defaultMaxSSEBodySize = 64 * 1024 func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - incTransportConnect(transportSSE) + h.node.metrics.incTransportConnect(transportSSE) var requestData []byte if r.Method == http.MethodGet { diff --git a/handler_websocket.go b/handler_websocket.go index 79b24a0a..b94d6f37 100644 --- a/handler_websocket.go +++ b/handler_websocket.go @@ -99,7 +99,7 @@ func NewWebsocketHandler(node *Node, config WebsocketConfig) *WebsocketHandler { } func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { - incTransportConnect(transportWebsocket) + s.node.metrics.incTransportConnect(transportWebsocket) var protoType = ProtocolTypeJSON var useFramePingPong bool diff --git a/metrics.go b/metrics.go index 46eb3882..37bc0a30 100644 --- a/metrics.go +++ b/metrics.go @@ -14,7 +14,7 @@ var defaultMetricsNamespace = "centrifuge" var registryMu sync.RWMutex -var ( +type metrics struct { messagesSentCount *prometheus.CounterVec messagesReceivedCount *prometheus.CounterVec actionCount *prometheus.CounterVec @@ -86,7 +86,7 @@ var ( commandDurationRefresh prometheus.Observer commandDurationSubRefresh prometheus.Observer commandDurationUnknown prometheus.Observer -) +} type commandMethodType int32 @@ -122,7 +122,7 @@ var ( } ) -func observeCommandDuration(method commandMethodType, d time.Duration) { +func (m *metrics) observeCommandDuration(method commandMethodType, d time.Duration) { registryMu.RLock() defer registryMu.RUnlock() @@ -130,110 +130,83 @@ func observeCommandDuration(method commandMethodType, d time.Duration) { switch method { case commandConnect: - observer = commandDurationConnect + observer = m.commandDurationConnect case commandSubscribe: - observer = commandDurationSubscribe + observer = m.commandDurationSubscribe case commandUnsubscribe: - observer = commandDurationUnsubscribe + observer = m.commandDurationUnsubscribe case commandPublish: - observer = commandDurationPublish + observer = m.commandDurationPublish case commandPresence: - observer = commandDurationPresence + observer = m.commandDurationPresence case commandPresenceStats: - observer = commandDurationPresenceStats + observer = m.commandDurationPresenceStats case commandHistory: - observer = commandDurationHistory + observer = m.commandDurationHistory case commandPing: - observer = commandDurationPing + observer = m.commandDurationPing case commandSend: - observer = commandDurationSend + observer = m.commandDurationSend case commandRpc: - observer = commandDurationRPC + observer = m.commandDurationRPC case commandRefresh: - observer = commandDurationRefresh + observer = m.commandDurationRefresh case commandSubRefresh: - observer = commandDurationSubRefresh + observer = m.commandDurationSubRefresh default: - observer = commandDurationUnknown + observer = m.commandDurationUnknown } observer.Observe(d.Seconds()) } -func setBuildInfo(version string) { - registryMu.RLock() - defer registryMu.RUnlock() - - buildInfoGauge.WithLabelValues(version).Set(1) +func (m *metrics) setBuildInfo(version string) { + m.buildInfoGauge.WithLabelValues(version).Set(1) } -func setNumClients(n float64) { - registryMu.RLock() - defer registryMu.RUnlock() - - numClientsGauge.Set(n) +func (m *metrics) setNumClients(n float64) { + m.numClientsGauge.Set(n) } -func setNumUsers(n float64) { - registryMu.RLock() - defer registryMu.RUnlock() - - numUsersGauge.Set(n) +func (m *metrics) setNumUsers(n float64) { + m.numUsersGauge.Set(n) } -func setNumSubscriptions(n float64) { - registryMu.RLock() - defer registryMu.RUnlock() - - numSubsGauge.Set(n) +func (m *metrics) setNumSubscriptions(n float64) { + m.numSubsGauge.Set(n) } -func setNumChannels(n float64) { - registryMu.RLock() - defer registryMu.RUnlock() - - numChannelsGauge.Set(n) +func (m *metrics) setNumChannels(n float64) { + m.numChannelsGauge.Set(n) } -func setNumNodes(n float64) { - registryMu.RLock() - defer registryMu.RUnlock() - - numNodesGauge.Set(n) +func (m *metrics) setNumNodes(n float64) { + m.numNodesGauge.Set(n) } -func incReplyError(method commandMethodType, code uint32) { - registryMu.RLock() - defer registryMu.RUnlock() - - replyErrorCount.WithLabelValues(commandMethodTypeName[int32(method)], strconv.FormatUint(uint64(code), 10)).Inc() +func (m *metrics) incReplyError(method commandMethodType, code uint32) { + m.replyErrorCount.WithLabelValues(commandMethodTypeName[int32(method)], strconv.FormatUint(uint64(code), 10)).Inc() } -func incRecover(success bool) { - registryMu.RLock() - defer registryMu.RUnlock() - +func (m *metrics) incRecover(success bool) { if success { - recoverCountYes.Inc() + m.recoverCountYes.Inc() } else { - recoverCountNo.Inc() + m.recoverCountNo.Inc() } } -func incTransportConnect(transport string) { - registryMu.RLock() - defer registryMu.RUnlock() - +func (m *metrics) incTransportConnect(transport string) { switch transport { case transportWebsocket: - transportConnectCountWebsocket.Inc() + m.transportConnectCountWebsocket.Inc() case transportSockJS: - transportConnectCountSockJS.Inc() + m.transportConnectCountSockJS.Inc() case transportSSE: - transportConnectCountSSE.Inc() + m.transportConnectCountSSE.Inc() case transportHTTPStream: - transportConnectCountHTTPStream.Inc() + m.transportConnectCountHTTPStream.Inc() default: - transportConnectCount.WithLabelValues(transport).Inc() + m.transportConnectCount.WithLabelValues(transport).Inc() } } @@ -259,10 +232,7 @@ func init() { transportMessagesReceivedSizeCache = make(map[transportMessageLabels]prometheus.Counter) } -func incTransportMessagesSent(transport string, channelGroup string, size int) { - registryMu.RLock() - defer registryMu.RUnlock() - +func (m *metrics) incTransportMessagesSent(transport string, channelGroup string, size int) { labels := transportMessageLabels{ Transport: transport, ChannelGroup: channelGroup, @@ -274,14 +244,14 @@ func incTransportMessagesSent(transport string, channelGroup string, size int) { messagesSentCacheMu.RUnlock() if !okSent { - counterSent = transportMessagesSent.WithLabelValues(transport, channelGroup) + counterSent = m.transportMessagesSent.WithLabelValues(transport, channelGroup) messagesSentCacheMu.Lock() transportMessagesSentCache[labels] = counterSent messagesSentCacheMu.Unlock() } if !okSentSize { - counterSentSize = transportMessagesSentSize.WithLabelValues(transport, channelGroup) + counterSentSize = m.transportMessagesSentSize.WithLabelValues(transport, channelGroup) messagesSentCacheMu.Lock() transportMessagesSentSizeCache[labels] = counterSentSize messagesSentCacheMu.Unlock() @@ -290,7 +260,7 @@ func incTransportMessagesSent(transport string, channelGroup string, size int) { counterSentSize.Add(float64(size)) } -func incTransportMessagesReceived(transport string, channelGroup string, size int) { +func (m *metrics) incTransportMessagesReceived(transport string, channelGroup string, size int) { registryMu.RLock() defer registryMu.RUnlock() @@ -305,14 +275,14 @@ func incTransportMessagesReceived(transport string, channelGroup string, size in messagesReceivedCacheMu.RUnlock() if !okReceived { - counterReceived = transportMessagesReceived.WithLabelValues(transport, channelGroup) + counterReceived = m.transportMessagesReceived.WithLabelValues(transport, channelGroup) messagesReceivedCacheMu.Lock() transportMessagesReceivedCache[labels] = counterReceived messagesReceivedCacheMu.Unlock() } if !okReceivedSize { - counterReceivedSize = transportMessagesReceivedSize.WithLabelValues(transport, channelGroup) + counterReceivedSize = m.transportMessagesReceivedSize.WithLabelValues(transport, channelGroup) messagesReceivedCacheMu.Lock() transportMessagesReceivedSizeCache[labels] = counterReceivedSize messagesReceivedCacheMu.Unlock() @@ -321,108 +291,91 @@ func incTransportMessagesReceived(transport string, channelGroup string, size in counterReceivedSize.Add(float64(size)) } -func incServerDisconnect(code uint32) { - registryMu.RLock() - defer registryMu.RUnlock() - - serverDisconnectCount.WithLabelValues(strconv.FormatUint(uint64(code), 10)).Inc() +func (m *metrics) incServerDisconnect(code uint32) { + m.serverDisconnectCount.WithLabelValues(strconv.FormatUint(uint64(code), 10)).Inc() } -func incMessagesSent(msgType string) { - registryMu.RLock() - defer registryMu.RUnlock() - +func (m *metrics) incMessagesSent(msgType string) { switch msgType { case "publication": - messagesSentCountPublication.Inc() + m.messagesSentCountPublication.Inc() case "join": - messagesSentCountJoin.Inc() + m.messagesSentCountJoin.Inc() case "leave": - messagesSentCountLeave.Inc() + m.messagesSentCountLeave.Inc() case "control": - messagesSentCountControl.Inc() + m.messagesSentCountControl.Inc() default: - messagesSentCount.WithLabelValues(msgType).Inc() + m.messagesSentCount.WithLabelValues(msgType).Inc() } } -func incMessagesReceived(msgType string) { - registryMu.RLock() - defer registryMu.RUnlock() - +func (m *metrics) incMessagesReceived(msgType string) { switch msgType { case "publication": - messagesReceivedCountPublication.Inc() + m.messagesReceivedCountPublication.Inc() case "join": - messagesReceivedCountJoin.Inc() + m.messagesReceivedCountJoin.Inc() case "leave": - messagesReceivedCountLeave.Inc() + m.messagesReceivedCountLeave.Inc() case "control": - messagesReceivedCountControl.Inc() + m.messagesReceivedCountControl.Inc() default: - messagesReceivedCount.WithLabelValues(msgType).Inc() + m.messagesReceivedCount.WithLabelValues(msgType).Inc() } } -func incMessagesBroadcasted(msgType string, numSubscribers int) { - registryMu.RLock() - defer registryMu.RUnlock() - +func (m *metrics) incMessagesBroadcasted(msgType string, numSubscribers int) { switch msgType { case "publication": - messagesBroadcastedPublication.Add(float64(numSubscribers)) + m.messagesBroadcastedPublication.Add(float64(numSubscribers)) case "join": - messagesBroadcastedJoin.Add(float64(numSubscribers)) + m.messagesBroadcastedJoin.Add(float64(numSubscribers)) case "leave": - messagesBroadcastedLeave.Add(float64(numSubscribers)) + m.messagesBroadcastedLeave.Add(float64(numSubscribers)) default: - messagesBroadcastedCount.WithLabelValues(msgType).Add(float64(numSubscribers)) + m.messagesBroadcastedCount.WithLabelValues(msgType).Add(float64(numSubscribers)) } } -func incActionCount(action string) { - registryMu.RLock() - defer registryMu.RUnlock() - +func (m *metrics) incActionCount(action string) { switch action { case "add_client": - actionCountAddClient.Inc() + m.actionCountAddClient.Inc() case "remove_client": - actionCountRemoveClient.Inc() + m.actionCountRemoveClient.Inc() case "add_subscription": - actionCountAddSub.Inc() + m.actionCountAddSub.Inc() case "remove_subscription": - actionCountRemoveSub.Inc() + m.actionCountRemoveSub.Inc() case "add_presence": - actionCountAddPresence.Inc() + m.actionCountAddPresence.Inc() case "remove_presence": - actionCountRemovePresence.Inc() + m.actionCountRemovePresence.Inc() case "presence": - actionCountPresence.Inc() + m.actionCountPresence.Inc() case "presence_stats": - actionCountPresenceStats.Inc() + m.actionCountPresenceStats.Inc() case "history": - actionCountHistory.Inc() + m.actionCountHistory.Inc() case "history_recover": - actionCountHistoryRecover.Inc() + m.actionCountHistoryRecover.Inc() case "history_stream_top": - actionCountHistoryStreamTop.Inc() + m.actionCountHistoryStreamTop.Inc() case "history_remove": - actionCountHistoryRemove.Inc() + m.actionCountHistoryRemove.Inc() case "survey": - actionCountSurvey.Inc() + m.actionCountSurvey.Inc() case "notify": - actionCountNotify.Inc() + m.actionCountNotify.Inc() } } -func observeSurveyDuration(op string, d time.Duration) { - registryMu.RLock() - surveyDurationSummary.WithLabelValues(op).Observe(d.Seconds()) - registryMu.RUnlock() +func (m *metrics) observeSurveyDuration(op string, d time.Duration) { + m.surveyDurationSummary.WithLabelValues(op).Observe(d.Seconds()) } -func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string) error { +func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string) (*metrics, error) { registryMu.Lock() defer registryMu.Unlock() @@ -433,77 +386,79 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string registry = prometheus.DefaultRegisterer } - messagesSentCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + m := &metrics{} + + m.messagesSentCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "messages_sent_count", Help: "Number of messages sent.", }, []string{"type"}) - messagesReceivedCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.messagesReceivedCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "messages_received_count", Help: "Number of messages received from engine.", }, []string{"type"}) - messagesBroadcastedCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.messagesBroadcastedCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "messages_broadcasted_count", Help: "Number of messages broadcasted to subscribers.", }, []string{"type"}) - actionCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.actionCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "action_count", Help: "Number of node actions called.", }, []string{"action"}) - numClientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + m.numClientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "num_clients", Help: "Number of clients connected.", }) - numUsersGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + m.numUsersGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "num_users", Help: "Number of unique users connected.", }) - numSubsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + m.numSubsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "num_subscriptions", Help: "Number of subscriptions.", }) - numNodesGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + m.numNodesGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "num_nodes", Help: "Number of nodes in cluster.", }) - buildInfoGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + m.buildInfoGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "build", Help: "Node build info.", }, []string{"version"}) - numChannelsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + m.numChannelsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "num_channels", Help: "Number of channels with one or more subscribers.", }) - surveyDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + m.surveyDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: metricsNamespace, Subsystem: "node", Name: "survey_duration_seconds", @@ -511,21 +466,21 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string Help: "Survey duration summary.", }, []string{"op"}) - replyErrorCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.replyErrorCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "client", Name: "num_reply_errors", Help: "Number of errors in replies sent to clients.", }, []string{"method", "code"}) - serverDisconnectCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.serverDisconnectCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "client", Name: "num_server_disconnects", Help: "Number of server initiated disconnects.", }, []string{"code"}) - commandDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + m.commandDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: metricsNamespace, Subsystem: "client", Name: "command_duration_seconds", @@ -533,162 +488,162 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string Help: "Client command duration summary.", }, []string{"method"}) - recoverCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.recoverCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "client", Name: "recover", Help: "Count of recover operations.", }, []string{"recovered"}) - transportConnectCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.transportConnectCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "transport", Name: "connect_count", Help: "Number of connections to specific transport.", }, []string{"transport"}) - transportMessagesSent = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.transportMessagesSent = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "transport", Name: "messages_sent", Help: "Number of messages sent over specific transport.", }, []string{"transport", "channel_namespace"}) - transportMessagesSentSize = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.transportMessagesSentSize = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "transport", Name: "messages_sent_size", Help: "Size in bytes of messages sent over specific transport.", }, []string{"transport", "channel_namespace"}) - transportMessagesReceived = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.transportMessagesReceived = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "transport", Name: "messages_received", Help: "Number of messages received over specific transport.", }, []string{"transport", "channel_namespace"}) - transportMessagesReceivedSize = prometheus.NewCounterVec(prometheus.CounterOpts{ + m.transportMessagesReceivedSize = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "transport", Name: "messages_received_size", Help: "Size in bytes of messages received over specific transport.", }, []string{"transport", "channel_namespace"}) - messagesReceivedCountPublication = messagesReceivedCount.WithLabelValues("publication") - messagesReceivedCountJoin = messagesReceivedCount.WithLabelValues("join") - messagesReceivedCountLeave = messagesReceivedCount.WithLabelValues("leave") - messagesReceivedCountControl = messagesReceivedCount.WithLabelValues("control") - - messagesSentCountPublication = messagesSentCount.WithLabelValues("publication") - messagesSentCountJoin = messagesSentCount.WithLabelValues("join") - messagesSentCountLeave = messagesSentCount.WithLabelValues("leave") - messagesSentCountControl = messagesSentCount.WithLabelValues("control") - - messagesBroadcastedPublication = messagesBroadcastedCount.WithLabelValues("publication") - messagesBroadcastedJoin = messagesBroadcastedCount.WithLabelValues("join") - messagesBroadcastedLeave = messagesBroadcastedCount.WithLabelValues("leave") - - actionCountAddClient = actionCount.WithLabelValues("add_client") - actionCountRemoveClient = actionCount.WithLabelValues("remove_client") - actionCountAddSub = actionCount.WithLabelValues("add_subscription") - actionCountRemoveSub = actionCount.WithLabelValues("remove_subscription") - actionCountAddPresence = actionCount.WithLabelValues("add_presence") - actionCountRemovePresence = actionCount.WithLabelValues("remove_presence") - actionCountPresence = actionCount.WithLabelValues("presence") - actionCountPresenceStats = actionCount.WithLabelValues("presence_stats") - actionCountHistory = actionCount.WithLabelValues("history") - actionCountHistoryRecover = actionCount.WithLabelValues("history_recover") - actionCountHistoryStreamTop = actionCount.WithLabelValues("history_stream_top") - actionCountHistoryRemove = actionCount.WithLabelValues("history_remove") - actionCountSurvey = actionCount.WithLabelValues("survey") - actionCountNotify = actionCount.WithLabelValues("notify") - - recoverCountYes = recoverCount.WithLabelValues("yes") - recoverCountNo = recoverCount.WithLabelValues("no") - - transportConnectCountWebsocket = transportConnectCount.WithLabelValues(transportWebsocket) - transportConnectCountSockJS = transportConnectCount.WithLabelValues(transportSockJS) - transportConnectCountHTTPStream = transportConnectCount.WithLabelValues(transportHTTPStream) - transportConnectCountSSE = transportConnectCount.WithLabelValues(transportSSE) + m.messagesReceivedCountPublication = m.messagesReceivedCount.WithLabelValues("publication") + m.messagesReceivedCountJoin = m.messagesReceivedCount.WithLabelValues("join") + m.messagesReceivedCountLeave = m.messagesReceivedCount.WithLabelValues("leave") + m.messagesReceivedCountControl = m.messagesReceivedCount.WithLabelValues("control") + + m.messagesSentCountPublication = m.messagesSentCount.WithLabelValues("publication") + m.messagesSentCountJoin = m.messagesSentCount.WithLabelValues("join") + m.messagesSentCountLeave = m.messagesSentCount.WithLabelValues("leave") + m.messagesSentCountControl = m.messagesSentCount.WithLabelValues("control") + + m.messagesBroadcastedPublication = m.messagesBroadcastedCount.WithLabelValues("publication") + m.messagesBroadcastedJoin = m.messagesBroadcastedCount.WithLabelValues("join") + m.messagesBroadcastedLeave = m.messagesBroadcastedCount.WithLabelValues("leave") + + m.actionCountAddClient = m.actionCount.WithLabelValues("add_client") + m.actionCountRemoveClient = m.actionCount.WithLabelValues("remove_client") + m.actionCountAddSub = m.actionCount.WithLabelValues("add_subscription") + m.actionCountRemoveSub = m.actionCount.WithLabelValues("remove_subscription") + m.actionCountAddPresence = m.actionCount.WithLabelValues("add_presence") + m.actionCountRemovePresence = m.actionCount.WithLabelValues("remove_presence") + m.actionCountPresence = m.actionCount.WithLabelValues("presence") + m.actionCountPresenceStats = m.actionCount.WithLabelValues("presence_stats") + m.actionCountHistory = m.actionCount.WithLabelValues("history") + m.actionCountHistoryRecover = m.actionCount.WithLabelValues("history_recover") + m.actionCountHistoryStreamTop = m.actionCount.WithLabelValues("history_stream_top") + m.actionCountHistoryRemove = m.actionCount.WithLabelValues("history_remove") + m.actionCountSurvey = m.actionCount.WithLabelValues("survey") + m.actionCountNotify = m.actionCount.WithLabelValues("notify") + + m.recoverCountYes = m.recoverCount.WithLabelValues("yes") + m.recoverCountNo = m.recoverCount.WithLabelValues("no") + + m.transportConnectCountWebsocket = m.transportConnectCount.WithLabelValues(transportWebsocket) + m.transportConnectCountSockJS = m.transportConnectCount.WithLabelValues(transportSockJS) + m.transportConnectCountHTTPStream = m.transportConnectCount.WithLabelValues(transportHTTPStream) + m.transportConnectCountSSE = m.transportConnectCount.WithLabelValues(transportSSE) labelForMethod := func(methodType commandMethodType) string { return strings.ToLower(commandMethodTypeName[int32(methodType)]) } - commandDurationConnect = commandDurationSummary.WithLabelValues(labelForMethod(commandConnect)) - commandDurationSubscribe = commandDurationSummary.WithLabelValues(labelForMethod(commandSubscribe)) - commandDurationUnsubscribe = commandDurationSummary.WithLabelValues(labelForMethod(commandUnsubscribe)) - commandDurationPublish = commandDurationSummary.WithLabelValues(labelForMethod(commandPublish)) - commandDurationPresence = commandDurationSummary.WithLabelValues(labelForMethod(commandPresence)) - commandDurationPresenceStats = commandDurationSummary.WithLabelValues(labelForMethod(commandPresenceStats)) - commandDurationHistory = commandDurationSummary.WithLabelValues(labelForMethod(commandHistory)) - commandDurationPing = commandDurationSummary.WithLabelValues(labelForMethod(commandPing)) - commandDurationSend = commandDurationSummary.WithLabelValues(labelForMethod(commandSend)) - commandDurationRPC = commandDurationSummary.WithLabelValues(labelForMethod(commandRpc)) - commandDurationRefresh = commandDurationSummary.WithLabelValues(labelForMethod(commandRefresh)) - commandDurationSubRefresh = commandDurationSummary.WithLabelValues(labelForMethod(commandSubRefresh)) - commandDurationUnknown = commandDurationSummary.WithLabelValues("unknown") + m.commandDurationConnect = m.commandDurationSummary.WithLabelValues(labelForMethod(commandConnect)) + m.commandDurationSubscribe = m.commandDurationSummary.WithLabelValues(labelForMethod(commandSubscribe)) + m.commandDurationUnsubscribe = m.commandDurationSummary.WithLabelValues(labelForMethod(commandUnsubscribe)) + m.commandDurationPublish = m.commandDurationSummary.WithLabelValues(labelForMethod(commandPublish)) + m.commandDurationPresence = m.commandDurationSummary.WithLabelValues(labelForMethod(commandPresence)) + m.commandDurationPresenceStats = m.commandDurationSummary.WithLabelValues(labelForMethod(commandPresenceStats)) + m.commandDurationHistory = m.commandDurationSummary.WithLabelValues(labelForMethod(commandHistory)) + m.commandDurationPing = m.commandDurationSummary.WithLabelValues(labelForMethod(commandPing)) + m.commandDurationSend = m.commandDurationSummary.WithLabelValues(labelForMethod(commandSend)) + m.commandDurationRPC = m.commandDurationSummary.WithLabelValues(labelForMethod(commandRpc)) + m.commandDurationRefresh = m.commandDurationSummary.WithLabelValues(labelForMethod(commandRefresh)) + m.commandDurationSubRefresh = m.commandDurationSummary.WithLabelValues(labelForMethod(commandSubRefresh)) + m.commandDurationUnknown = m.commandDurationSummary.WithLabelValues("unknown") - if err := registry.Register(messagesBroadcastedCount); err != nil { - return err + if err := registry.Register(m.messagesBroadcastedCount); err != nil { + return nil, err } - if err := registry.Register(messagesSentCount); err != nil { - return err + if err := registry.Register(m.messagesSentCount); err != nil { + return nil, err } - if err := registry.Register(messagesReceivedCount); err != nil { - return err + if err := registry.Register(m.messagesReceivedCount); err != nil { + return nil, err } - if err := registry.Register(actionCount); err != nil { - return err + if err := registry.Register(m.actionCount); err != nil { + return nil, err } - if err := registry.Register(numClientsGauge); err != nil { - return err + if err := registry.Register(m.numClientsGauge); err != nil { + return nil, err } - if err := registry.Register(numUsersGauge); err != nil { - return err + if err := registry.Register(m.numUsersGauge); err != nil { + return nil, err } - if err := registry.Register(numSubsGauge); err != nil { - return err + if err := registry.Register(m.numSubsGauge); err != nil { + return nil, err } - if err := registry.Register(numChannelsGauge); err != nil { - return err + if err := registry.Register(m.numChannelsGauge); err != nil { + return nil, err } - if err := registry.Register(numNodesGauge); err != nil { - return err + if err := registry.Register(m.numNodesGauge); err != nil { + return nil, err } - if err := registry.Register(commandDurationSummary); err != nil { - return err + if err := registry.Register(m.commandDurationSummary); err != nil { + return nil, err } - if err := registry.Register(replyErrorCount); err != nil { - return err + if err := registry.Register(m.replyErrorCount); err != nil { + return nil, err } - if err := registry.Register(serverDisconnectCount); err != nil { - return err + if err := registry.Register(m.serverDisconnectCount); err != nil { + return nil, err } - if err := registry.Register(recoverCount); err != nil { - return err + if err := registry.Register(m.recoverCount); err != nil { + return nil, err } - if err := registry.Register(transportConnectCount); err != nil { - return err + if err := registry.Register(m.transportConnectCount); err != nil { + return nil, err } - if err := registry.Register(transportMessagesSent); err != nil { - return err + if err := registry.Register(m.transportMessagesSent); err != nil { + return nil, err } - if err := registry.Register(transportMessagesSentSize); err != nil { - return err + if err := registry.Register(m.transportMessagesSentSize); err != nil { + return nil, err } - if err := registry.Register(transportMessagesReceived); err != nil { - return err + if err := registry.Register(m.transportMessagesReceived); err != nil { + return nil, err } - if err := registry.Register(transportMessagesReceivedSize); err != nil { - return err + if err := registry.Register(m.transportMessagesReceivedSize); err != nil { + return nil, err } - if err := registry.Register(buildInfoGauge); err != nil { - return err + if err := registry.Register(m.buildInfoGauge); err != nil { + return nil, err } - if err := registry.Register(surveyDurationSummary); err != nil { - return err + if err := registry.Register(m.surveyDurationSummary); err != nil { + return nil, err } - return nil + return m, nil } diff --git a/node.go b/node.go index 7f5b987e..c6e7f539 100644 --- a/node.go +++ b/node.go @@ -46,6 +46,8 @@ type Node struct { presenceManager PresenceManager // nodes contains registry of known nodes. nodes *nodeRegistry + // metrics registry. + metrics *metrics // shutdown is a flag which is only true when node is going to shut down. shutdown bool // shutdownCh is a channel which is closed when node shutdown initiated. @@ -163,6 +165,18 @@ func New(c Config) (*Node, error) { } n.emulationSurveyHandler = newEmulationSurveyHandler(n) + if m, err := initMetricsRegistry(prometheus.DefaultRegisterer, c.MetricsNamespace); err != nil { + switch err.(type) { + case prometheus.AlreadyRegisteredError: + // Can happen when node initialized several times since we use DefaultRegisterer, + // skip for now. + default: + return nil, err + } + } else { + n.metrics = m + } + b, err := NewMemoryBroker(n, MemoryBrokerConfig{}) if err != nil { return nil, err @@ -175,16 +189,6 @@ func New(c Config) (*Node, error) { } n.SetPresenceManager(m) - if err := initMetricsRegistry(prometheus.DefaultRegisterer, c.MetricsNamespace); err != nil { - switch err.(type) { - case prometheus.AlreadyRegisteredError: - // Can happen when node initialized several times since we use DefaultRegisterer, - // skip for now. - default: - return nil, err - } - } - return n, nil } @@ -303,16 +307,16 @@ func (n *Node) NotifyShutdown() chan struct{} { } func (n *Node) updateGauges() { - setNumClients(float64(n.hub.NumClients())) - setNumUsers(float64(n.hub.NumUsers())) - setNumSubscriptions(float64(n.hub.NumSubscriptions())) - setNumChannels(float64(n.hub.NumChannels())) - setNumNodes(float64(n.nodes.size())) + n.metrics.setNumClients(float64(n.hub.NumClients())) + n.metrics.setNumUsers(float64(n.hub.NumUsers())) + n.metrics.setNumSubscriptions(float64(n.hub.NumSubscriptions())) + n.metrics.setNumChannels(float64(n.hub.NumChannels())) + n.metrics.setNumNodes(float64(n.nodes.size())) version := n.config.Version if version == "" { version = "_" } - setBuildInfo(version) + n.metrics.setBuildInfo(version) } func (n *Node) updateMetrics() { @@ -477,10 +481,10 @@ func (n *Node) Survey(ctx context.Context, op string, data []byte, toNodeID stri return nil, errSurveyHandlerNotRegistered } - incActionCount("survey") + n.metrics.incActionCount("survey") started := time.Now() defer func() { - observeSurveyDuration(op, time.Since(started)) + n.metrics.observeSurveyDuration(op, time.Since(started)) }() if _, ok := ctx.Deadline(); !ok { @@ -632,7 +636,7 @@ func (n *Node) Info() (Info, error) { // handleControl handles messages from control channel - control messages used for internal // communication between nodes to share state or proto. func (n *Node) handleControl(data []byte) error { - incMessagesReceived("control") + n.metrics.incMessagesReceived("control") cmd, err := n.controlDecoder.DecodeCommand(data) if err != nil { @@ -686,39 +690,39 @@ func (n *Node) handleControl(data []byte) error { // coming from Broker. The goal of method is to deliver this message // to all clients on this node currently subscribed to channel. func (n *Node) handlePublication(ch string, pub *Publication, sp StreamPosition) error { - incMessagesReceived("publication") + n.metrics.incMessagesReceived("publication") numSubscribers := n.hub.NumSubscribers(ch) hasCurrentSubscribers := numSubscribers > 0 if !hasCurrentSubscribers { return nil } - incMessagesBroadcasted("publication", numSubscribers) + n.metrics.incMessagesBroadcasted("publication", numSubscribers) return n.hub.BroadcastPublication(ch, pub, sp) } // handleJoin handles join messages - i.e. broadcasts it to // interested local clients subscribed to channel. func (n *Node) handleJoin(ch string, info *ClientInfo) error { - incMessagesReceived("join") + n.metrics.incMessagesReceived("join") numSubscribers := n.hub.NumSubscribers(ch) hasCurrentSubscribers := numSubscribers > 0 if !hasCurrentSubscribers { return nil } - incMessagesBroadcasted("join", numSubscribers) + n.metrics.incMessagesBroadcasted("join", numSubscribers) return n.hub.broadcastJoin(ch, info) } // handleLeave handles leave messages - i.e. broadcasts it to // interested local clients subscribed to channel. func (n *Node) handleLeave(ch string, info *ClientInfo) error { - incMessagesReceived("leave") + n.metrics.incMessagesReceived("leave") numSubscribers := n.hub.NumSubscribers(ch) hasCurrentSubscribers := numSubscribers > 0 if !hasCurrentSubscribers { return nil } - incMessagesBroadcasted("leave", numSubscribers) + n.metrics.incMessagesBroadcasted("leave", numSubscribers) return n.hub.broadcastLeave(ch, info) } @@ -727,7 +731,7 @@ func (n *Node) publish(ch string, data []byte, opts ...PublishOption) (PublishRe for _, opt := range opts { opt(pubOpts) } - incMessagesSent("publication") + n.metrics.incMessagesSent("publication") streamPos, err := n.broker.Publish(ch, data, *pubOpts) if err != nil { return PublishResult{}, err @@ -766,14 +770,14 @@ func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) (Publ // publishJoin allows publishing join message into channel when someone subscribes on it // or leave message when someone unsubscribes from channel. func (n *Node) publishJoin(ch string, info *ClientInfo) error { - incMessagesSent("join") + n.metrics.incMessagesSent("join") return n.broker.PublishJoin(ch, info) } // publishLeave allows publishing join message into channel when someone subscribes on it // or leave message when someone unsubscribes from channel. func (n *Node) publishLeave(ch string, info *ClientInfo) error { - incMessagesSent("leave") + n.metrics.incMessagesSent("leave") return n.broker.PublishLeave(ch, info) } @@ -790,7 +794,7 @@ func (n *Node) Notify(op string, data []byte, toNodeID string) error { return errNotificationHandlerNotRegistered } - incActionCount("notify") + n.metrics.incActionCount("notify") if toNodeID == "" || n.ID() == toNodeID { // Invoke handler on this node since control message handler @@ -820,7 +824,7 @@ func (n *Node) Notify(op string, data []byte, toNodeID string) error { // publishControl publishes message into control channel so all running // nodes will receive and handle it. func (n *Node) publishControl(cmd *controlpb.Command, nodeID string) error { - incMessagesSent("control") + n.metrics.incMessagesSent("control") data, err := n.controlEncoder.EncodeCommand(cmd) if err != nil { return err @@ -963,20 +967,20 @@ func (n *Node) pubDisconnect(user string, disconnect Disconnect, clientID string // addClient registers authenticated connection in clientConnectionHub // this allows to make operations with user connection on demand. func (n *Node) addClient(c *Client) error { - incActionCount("add_client") + n.metrics.incActionCount("add_client") return n.hub.add(c) } // removeClient removes client connection from connection registry. func (n *Node) removeClient(c *Client) error { - incActionCount("remove_client") + n.metrics.incActionCount("remove_client") return n.hub.remove(c) } // addSubscription registers subscription of connection on channel in both // Hub and Broker. func (n *Node) addSubscription(ch string, c *Client) error { - incActionCount("add_subscription") + n.metrics.incActionCount("add_subscription") mu := n.subLock(ch) mu.Lock() defer mu.Unlock() @@ -997,7 +1001,7 @@ func (n *Node) addSubscription(ch string, c *Client) error { // removeSubscription removes subscription of connection on channel // from Hub and Broker. func (n *Node) removeSubscription(ch string, c *Client) error { - incActionCount("remove_subscription") + n.metrics.incActionCount("remove_subscription") mu := n.subLock(ch) mu.Lock() defer mu.Unlock() @@ -1128,7 +1132,7 @@ func (n *Node) addPresence(ch string, uid string, info *ClientInfo) error { if n.presenceManager == nil { return nil } - incActionCount("add_presence") + n.metrics.incActionCount("add_presence") return n.presenceManager.AddPresence(ch, uid, info) } @@ -1137,7 +1141,7 @@ func (n *Node) removePresence(ch string, uid string) error { if n.presenceManager == nil { return nil } - incActionCount("remove_presence") + n.metrics.incActionCount("remove_presence") return n.presenceManager.RemovePresence(ch, uid) } @@ -1165,7 +1169,7 @@ func (n *Node) Presence(ch string) (PresenceResult, error) { if n.presenceManager == nil { return PresenceResult{}, ErrorNotAvailable } - incActionCount("presence") + n.metrics.incActionCount("presence") if n.config.UseSingleFlight { result, err, _ := presenceGroup.Do(ch, func() (any, error) { return n.presence(ch) @@ -1251,7 +1255,7 @@ func (n *Node) PresenceStats(ch string) (PresenceStatsResult, error) { if n.presenceManager == nil { return PresenceStatsResult{}, ErrorNotAvailable } - incActionCount("presence_stats") + n.metrics.incActionCount("presence_stats") if n.config.UseSingleFlight { result, err, _ := presenceStatsGroup.Do(ch, func() (any, error) { return n.presenceStats(ch) @@ -1296,7 +1300,7 @@ func (n *Node) history(ch string, opts *HistoryOptions) (HistoryResult, error) { // History allows extracting Publications in channel. // The channel must belong to namespace where history is on. func (n *Node) History(ch string, opts ...HistoryOption) (HistoryResult, error) { - incActionCount("history") + n.metrics.incActionCount("history") historyOpts := &HistoryOptions{} for _, opt := range opts { opt(historyOpts) @@ -1329,7 +1333,7 @@ func (n *Node) History(ch string, opts ...HistoryOption) (HistoryResult, error) // recoverHistory recovers publications since StreamPosition last seen by client. func (n *Node) recoverHistory(ch string, since StreamPosition, historyMetaTTL time.Duration) (HistoryResult, error) { - incActionCount("history_recover") + n.metrics.incActionCount("history_recover") limit := NoLimit maxPublicationLimit := n.config.RecoveryMaxPublicationLimit if maxPublicationLimit > 0 { @@ -1343,7 +1347,7 @@ func (n *Node) recoverHistory(ch string, since StreamPosition, historyMetaTTL ti // streamTop returns current stream top StreamPosition for a channel. func (n *Node) streamTop(ch string, historyMetaTTL time.Duration) (StreamPosition, error) { - incActionCount("history_stream_top") + n.metrics.incActionCount("history_stream_top") historyResult, err := n.History(ch, WithHistoryMetaTTL(historyMetaTTL)) if err != nil { return StreamPosition{}, err @@ -1353,7 +1357,7 @@ func (n *Node) streamTop(ch string, historyMetaTTL time.Duration) (StreamPositio // RemoveHistory removes channel history. func (n *Node) RemoveHistory(ch string) error { - incActionCount("history_remove") + n.metrics.incActionCount("history_remove") return n.broker.RemoveHistory(ch) }