diff --git a/_examples/chat_json/main.go b/_examples/chat_json/main.go index e3347d5b..bd3960ac 100644 --- a/_examples/chat_json/main.go +++ b/_examples/chat_json/main.go @@ -72,6 +72,12 @@ func main() { LogLevel: centrifuge.LogLevelInfo, LogHandler: handleLog, HistoryMetaTTL: 24 * time.Hour, + + GetChannelNamespaceLabel: func(channel string) string { + return channel + }, + ChannelNamespaceLabelForMessagesSent: true, + ChannelNamespaceLabelForMessagesReceived: true, }) node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) { diff --git a/client.go b/client.go index 30e28441..e2de7c69 100644 --- a/client.go +++ b/client.go @@ -563,7 +563,7 @@ func (c *Client) transportEnqueue(data []byte, ch string) error { item := queue.Item{ Data: data, } - if c.node.config.GetChannelGroupLabel != nil { + if c.node.config.GetChannelNamespaceLabel != nil { item.Channel = ch } disconnect := c.messageWriter.enqueue(item) @@ -1148,6 +1148,15 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec return &DisconnectBadRequest, false } + var metricChannel string + defer func() { + channelGroup := "_" + if metricChannel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesReceived { + channelGroup = c.node.config.GetChannelNamespaceLabel(metricChannel) + } + incTransportMessagesReceived(c.transport.Name(), channelGroup, cmdSize) + }() + if isPong(cmd) { c.mu.Lock() if c.status == statusClosed { @@ -1216,16 +1225,22 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec } else if cmd.Ping != nil { handleErr = c.handlePing(cmd, started, nil) } else if cmd.Subscribe != nil { + metricChannel = cmd.Subscribe.Channel handleErr = c.handleSubscribe(cmd.Subscribe, cmd, started, nil) } else if cmd.Unsubscribe != nil { + metricChannel = cmd.Unsubscribe.Channel handleErr = c.handleUnsubscribe(cmd.Unsubscribe, cmd, started, nil) } else if cmd.Publish != nil { + metricChannel = cmd.Publish.Channel handleErr = c.handlePublish(cmd.Publish, cmd, started, nil) } else if cmd.Presence != nil { + metricChannel = cmd.Presence.Channel handleErr = c.handlePresence(cmd.Presence, cmd, started, nil) } else if cmd.PresenceStats != nil { + metricChannel = cmd.PresenceStats.Channel handleErr = c.handlePresenceStats(cmd.PresenceStats, cmd, started, nil) } else if cmd.History != nil { + metricChannel = cmd.History.Channel handleErr = c.handleHistory(cmd.History, cmd, started, nil) } else if cmd.Rpc != nil { handleErr = c.handleRPC(cmd.Rpc, cmd, started, nil) @@ -1234,6 +1249,7 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec } else if cmd.Refresh != nil { handleErr = c.handleRefresh(cmd.Refresh, cmd, started, nil) } else if cmd.SubRefresh != nil { + metricChannel = cmd.SubRefresh.Channel handleErr = c.handleSubRefresh(cmd.SubRefresh, cmd, started, nil) } else { return &DisconnectBadRequest, false @@ -2432,10 +2448,10 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q MaxQueueSize: c.node.config.ClientQueueMaxSize, WriteFn: func(item queue.Item) error { channelGroup := "_" - if item.Channel != "" && c.node.config.GetChannelGroupLabel != nil { - channelGroup = c.node.config.GetChannelGroupLabel(item.Channel) + if item.Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent { + channelGroup = c.node.config.GetChannelNamespaceLabel(item.Channel) } - incTransportMessages(c.transport.Name(), channelGroup, len(item.Data)) + incTransportMessagesSent(c.transport.Name(), channelGroup, len(item.Data)) if c.node.clientEvents.transportWriteHandler != nil { pass := c.node.clientEvents.transportWriteHandler(c, TransportWriteEvent(item)) @@ -2469,10 +2485,10 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q } messages = append(messages, items[i].Data) channelGroup := "_" - if items[i].Channel != "" && c.node.config.GetChannelGroupLabel != nil { - channelGroup = c.node.config.GetChannelGroupLabel(items[i].Channel) + if items[i].Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent { + channelGroup = c.node.config.GetChannelNamespaceLabel(items[i].Channel) } - incTransportMessages(c.transport.Name(), channelGroup, len(items[i].Data)) + incTransportMessagesSent(c.transport.Name(), channelGroup, len(items[i].Data)) } writeMu.Lock() defer writeMu.Unlock() diff --git a/config.go b/config.go index 739b4ef3..e0f19969 100644 --- a/config.go +++ b/config.go @@ -92,11 +92,15 @@ type Config struct { // When zero Centrifuge uses default 30 days which we believe is more than enough // for most use cases. HistoryMetaTTL time.Duration - // GetChannelGroupLabel if set will be used by Centrifuge to extract channel_group label + // GetChannelNamespaceLabel if set will be used by Centrifuge to extract channel_group label // for channel related metrics. Make sure to maintain low cardinality of returned // values to avoid issues with Prometheus performance. This function may introduce - // sufficient overhead since it's called in a hot path right before transport write. - GetChannelGroupLabel func(channel string) string + // sufficient overhead since it's called in hot paths. + GetChannelNamespaceLabel func(channel string) string + // ChannelNamespaceLabelForMessagesSent ... + ChannelNamespaceLabelForMessagesSent bool + // ChannelNamespaceLabelForMessagesReceived ... + ChannelNamespaceLabelForMessagesReceived bool } const ( diff --git a/hub_test.go b/hub_test.go index 08aacfb6..56d45c94 100644 --- a/hub_test.go +++ b/hub_test.go @@ -453,7 +453,7 @@ func TestHubBroadcastPublication(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { n := defaultTestNode() - n.config.GetChannelGroupLabel = func(channel string) string { + n.config.GetChannelNamespaceLabel = func(channel string) string { return channel } defer func() { _ = n.Shutdown(context.Background()) }() diff --git a/metrics.go b/metrics.go index befed2a3..46eb3882 100644 --- a/metrics.go +++ b/metrics.go @@ -15,24 +15,26 @@ var defaultMetricsNamespace = "centrifuge" var registryMu sync.RWMutex var ( - messagesSentCount *prometheus.CounterVec - messagesReceivedCount *prometheus.CounterVec - actionCount *prometheus.CounterVec - buildInfoGauge *prometheus.GaugeVec - numClientsGauge prometheus.Gauge - numUsersGauge prometheus.Gauge - numSubsGauge prometheus.Gauge - numChannelsGauge prometheus.Gauge - numNodesGauge prometheus.Gauge - replyErrorCount *prometheus.CounterVec - serverDisconnectCount *prometheus.CounterVec - commandDurationSummary *prometheus.SummaryVec - surveyDurationSummary *prometheus.SummaryVec - recoverCount *prometheus.CounterVec - transportConnectCount *prometheus.CounterVec - transportMessagesSent *prometheus.CounterVec - transportMessagesSentSize *prometheus.CounterVec - messagesBroadcastedCount *prometheus.CounterVec + messagesSentCount *prometheus.CounterVec + messagesReceivedCount *prometheus.CounterVec + actionCount *prometheus.CounterVec + buildInfoGauge *prometheus.GaugeVec + numClientsGauge prometheus.Gauge + numUsersGauge prometheus.Gauge + numSubsGauge prometheus.Gauge + numChannelsGauge prometheus.Gauge + numNodesGauge prometheus.Gauge + replyErrorCount *prometheus.CounterVec + serverDisconnectCount *prometheus.CounterVec + commandDurationSummary *prometheus.SummaryVec + surveyDurationSummary *prometheus.SummaryVec + recoverCount *prometheus.CounterVec + transportConnectCount *prometheus.CounterVec + transportMessagesSent *prometheus.CounterVec + transportMessagesSentSize *prometheus.CounterVec + transportMessagesReceived *prometheus.CounterVec + transportMessagesReceivedSize *prometheus.CounterVec + messagesBroadcastedCount *prometheus.CounterVec messagesBroadcastedPublication prometheus.Counter messagesBroadcastedJoin prometheus.Counter @@ -235,53 +237,90 @@ func incTransportConnect(transport string) { } } -type messagesSentLabels struct { +type transportMessageLabels struct { Transport string ChannelGroup string } var ( - transportMessagesSentCache map[messagesSentLabels]prometheus.Counter - transportMessagesSentSizeCache map[messagesSentLabels]prometheus.Counter - metricCacheMu sync.RWMutex + transportMessagesSentCache map[transportMessageLabels]prometheus.Counter + transportMessagesSentSizeCache map[transportMessageLabels]prometheus.Counter + messagesSentCacheMu sync.RWMutex + + transportMessagesReceivedCache map[transportMessageLabels]prometheus.Counter + transportMessagesReceivedSizeCache map[transportMessageLabels]prometheus.Counter + messagesReceivedCacheMu sync.RWMutex ) func init() { - transportMessagesSentCache = make(map[messagesSentLabels]prometheus.Counter) - transportMessagesSentSizeCache = make(map[messagesSentLabels]prometheus.Counter) + transportMessagesSentCache = make(map[transportMessageLabels]prometheus.Counter) + transportMessagesSentSizeCache = make(map[transportMessageLabels]prometheus.Counter) + transportMessagesReceivedCache = make(map[transportMessageLabels]prometheus.Counter) + transportMessagesReceivedSizeCache = make(map[transportMessageLabels]prometheus.Counter) } -func incTransportMessages(transport string, channelGroup string, size int) { +func incTransportMessagesSent(transport string, channelGroup string, size int) { registryMu.RLock() defer registryMu.RUnlock() - labels := messagesSentLabels{ + labels := transportMessageLabels{ Transport: transport, ChannelGroup: channelGroup, } - metricCacheMu.RLock() + messagesSentCacheMu.RLock() counterSent, okSent := transportMessagesSentCache[labels] counterSentSize, okSentSize := transportMessagesSentSizeCache[labels] - metricCacheMu.RUnlock() + messagesSentCacheMu.RUnlock() if !okSent { counterSent = transportMessagesSent.WithLabelValues(transport, channelGroup) - metricCacheMu.Lock() + messagesSentCacheMu.Lock() transportMessagesSentCache[labels] = counterSent - metricCacheMu.Unlock() + messagesSentCacheMu.Unlock() } if !okSentSize { counterSentSize = transportMessagesSentSize.WithLabelValues(transport, channelGroup) - metricCacheMu.Lock() + messagesSentCacheMu.Lock() transportMessagesSentSizeCache[labels] = counterSentSize - metricCacheMu.Unlock() + messagesSentCacheMu.Unlock() } counterSent.Inc() counterSentSize.Add(float64(size)) } +func incTransportMessagesReceived(transport string, channelGroup string, size int) { + registryMu.RLock() + defer registryMu.RUnlock() + + labels := transportMessageLabels{ + Transport: transport, + ChannelGroup: channelGroup, + } + + messagesReceivedCacheMu.RLock() + counterReceived, okReceived := transportMessagesReceivedCache[labels] + counterReceivedSize, okReceivedSize := transportMessagesReceivedSizeCache[labels] + messagesReceivedCacheMu.RUnlock() + + if !okReceived { + counterReceived = transportMessagesReceived.WithLabelValues(transport, channelGroup) + messagesReceivedCacheMu.Lock() + transportMessagesReceivedCache[labels] = counterReceived + messagesReceivedCacheMu.Unlock() + } + + if !okReceivedSize { + counterReceivedSize = transportMessagesReceivedSize.WithLabelValues(transport, channelGroup) + messagesReceivedCacheMu.Lock() + transportMessagesReceivedSizeCache[labels] = counterReceivedSize + messagesReceivedCacheMu.Unlock() + } + counterReceived.Inc() + counterReceivedSize.Add(float64(size)) +} + func incServerDisconnect(code uint32) { registryMu.RLock() defer registryMu.RUnlock() @@ -513,14 +552,28 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string Subsystem: "transport", Name: "messages_sent", Help: "Number of messages sent over specific transport.", - }, []string{"transport", "channel_group"}) + }, []string{"transport", "channel_namespace"}) transportMessagesSentSize = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "transport", Name: "messages_sent_size", Help: "Size in bytes of messages sent over specific transport.", - }, []string{"transport", "channel_group"}) + }, []string{"transport", "channel_namespace"}) + + transportMessagesReceived = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "transport", + Name: "messages_received", + Help: "Number of messages received over specific transport.", + }, []string{"transport", "channel_namespace"}) + + transportMessagesReceivedSize = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "transport", + Name: "messages_received_size", + Help: "Size in bytes of messages received over specific transport.", + }, []string{"transport", "channel_namespace"}) messagesReceivedCountPublication = messagesReceivedCount.WithLabelValues("publication") messagesReceivedCountJoin = messagesReceivedCount.WithLabelValues("join") @@ -625,6 +678,12 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string if err := registry.Register(transportMessagesSentSize); err != nil { return err } + if err := registry.Register(transportMessagesReceived); err != nil { + return err + } + if err := registry.Register(transportMessagesReceivedSize); err != nil { + return err + } if err := registry.Register(buildInfoGauge); err != nil { return err }