Skip to content

Commit

Permalink
add transport msg received stats
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Oct 19, 2023
1 parent 69dd50d commit d78d9f0
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 45 deletions.
6 changes: 6 additions & 0 deletions _examples/chat_json/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ func main() {
LogLevel: centrifuge.LogLevelInfo,
LogHandler: handleLog,
HistoryMetaTTL: 24 * time.Hour,

GetChannelNamespaceLabel: func(channel string) string {
return channel
},
ChannelNamespaceLabelForMessagesSent: true,
ChannelNamespaceLabelForMessagesReceived: true,
})

node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
Expand Down
30 changes: 23 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func (c *Client) transportEnqueue(data []byte, ch string) error {
item := queue.Item{
Data: data,
}
if c.node.config.GetChannelGroupLabel != nil {
if c.node.config.GetChannelNamespaceLabel != nil {
item.Channel = ch
}
disconnect := c.messageWriter.enqueue(item)
Expand Down Expand Up @@ -1148,6 +1148,15 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec
return &DisconnectBadRequest, false
}

var metricChannel string
defer func() {
channelGroup := "_"
if metricChannel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesReceived {
channelGroup = c.node.config.GetChannelNamespaceLabel(metricChannel)
}
incTransportMessagesReceived(c.transport.Name(), channelGroup, cmdSize)
}()

if isPong(cmd) {
c.mu.Lock()
if c.status == statusClosed {
Expand Down Expand Up @@ -1216,16 +1225,22 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec
} else if cmd.Ping != nil {
handleErr = c.handlePing(cmd, started, nil)
} else if cmd.Subscribe != nil {
metricChannel = cmd.Subscribe.Channel
handleErr = c.handleSubscribe(cmd.Subscribe, cmd, started, nil)
} else if cmd.Unsubscribe != nil {
metricChannel = cmd.Unsubscribe.Channel
handleErr = c.handleUnsubscribe(cmd.Unsubscribe, cmd, started, nil)
} else if cmd.Publish != nil {
metricChannel = cmd.Publish.Channel
handleErr = c.handlePublish(cmd.Publish, cmd, started, nil)
} else if cmd.Presence != nil {
metricChannel = cmd.Presence.Channel
handleErr = c.handlePresence(cmd.Presence, cmd, started, nil)
} else if cmd.PresenceStats != nil {
metricChannel = cmd.PresenceStats.Channel
handleErr = c.handlePresenceStats(cmd.PresenceStats, cmd, started, nil)
} else if cmd.History != nil {
metricChannel = cmd.History.Channel
handleErr = c.handleHistory(cmd.History, cmd, started, nil)
} else if cmd.Rpc != nil {
handleErr = c.handleRPC(cmd.Rpc, cmd, started, nil)
Expand All @@ -1234,6 +1249,7 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec
} else if cmd.Refresh != nil {
handleErr = c.handleRefresh(cmd.Refresh, cmd, started, nil)
} else if cmd.SubRefresh != nil {
metricChannel = cmd.SubRefresh.Channel
handleErr = c.handleSubRefresh(cmd.SubRefresh, cmd, started, nil)
} else {
return &DisconnectBadRequest, false
Expand Down Expand Up @@ -2432,10 +2448,10 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q
MaxQueueSize: c.node.config.ClientQueueMaxSize,
WriteFn: func(item queue.Item) error {
channelGroup := "_"
if item.Channel != "" && c.node.config.GetChannelGroupLabel != nil {
channelGroup = c.node.config.GetChannelGroupLabel(item.Channel)
if item.Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent {
channelGroup = c.node.config.GetChannelNamespaceLabel(item.Channel)
}
incTransportMessages(c.transport.Name(), channelGroup, len(item.Data))
incTransportMessagesSent(c.transport.Name(), channelGroup, len(item.Data))

if c.node.clientEvents.transportWriteHandler != nil {
pass := c.node.clientEvents.transportWriteHandler(c, TransportWriteEvent(item))
Expand Down Expand Up @@ -2469,10 +2485,10 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q
}
messages = append(messages, items[i].Data)
channelGroup := "_"
if items[i].Channel != "" && c.node.config.GetChannelGroupLabel != nil {
channelGroup = c.node.config.GetChannelGroupLabel(items[i].Channel)
if items[i].Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent {
channelGroup = c.node.config.GetChannelNamespaceLabel(items[i].Channel)
}
incTransportMessages(c.transport.Name(), channelGroup, len(items[i].Data))
incTransportMessagesSent(c.transport.Name(), channelGroup, len(items[i].Data))
}
writeMu.Lock()
defer writeMu.Unlock()
Expand Down
10 changes: 7 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,15 @@ type Config struct {
// When zero Centrifuge uses default 30 days which we believe is more than enough
// for most use cases.
HistoryMetaTTL time.Duration
// GetChannelGroupLabel if set will be used by Centrifuge to extract channel_group label
// GetChannelNamespaceLabel if set will be used by Centrifuge to extract channel_namespace label
// for channel related metrics. Make sure to maintain low cardinality of returned
// values to avoid issues with Prometheus performance. This function may introduce
// sufficient overhead since it's called in a hot path right before transport write.
GetChannelGroupLabel func(channel string) string
// significant overhead since it's called in hot paths.
GetChannelNamespaceLabel func(channel string) string
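// ChannelNamespaceLabelForMessagesSent enables using GetChannelNamespaceLabel to set
// the channel_namespace label of the transport messages sent metrics. When false, or
// when GetChannelNamespaceLabel is nil, the label value is always "_".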
ChannelNamespaceLabelForMessagesSent bool
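// ChannelNamespaceLabelForMessagesReceived enables using GetChannelNamespaceLabel to set
// the channel_namespace label of the transport messages received metrics. When false, or
// when GetChannelNamespaceLabel is nil, the label value is always "_". Commands that do
// not carry a channel are always labeled "_".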
ChannelNamespaceLabelForMessagesReceived bool
}

const (
Expand Down
2 changes: 1 addition & 1 deletion hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func TestHubBroadcastPublication(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
n := defaultTestNode()
n.config.GetChannelGroupLabel = func(channel string) string {
n.config.GetChannelNamespaceLabel = func(channel string) string {
return channel
}
defer func() { _ = n.Shutdown(context.Background()) }()
Expand Down
127 changes: 93 additions & 34 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,26 @@ var defaultMetricsNamespace = "centrifuge"
var registryMu sync.RWMutex

var (
messagesSentCount *prometheus.CounterVec
messagesReceivedCount *prometheus.CounterVec
actionCount *prometheus.CounterVec
buildInfoGauge *prometheus.GaugeVec
numClientsGauge prometheus.Gauge
numUsersGauge prometheus.Gauge
numSubsGauge prometheus.Gauge
numChannelsGauge prometheus.Gauge
numNodesGauge prometheus.Gauge
replyErrorCount *prometheus.CounterVec
serverDisconnectCount *prometheus.CounterVec
commandDurationSummary *prometheus.SummaryVec
surveyDurationSummary *prometheus.SummaryVec
recoverCount *prometheus.CounterVec
transportConnectCount *prometheus.CounterVec
transportMessagesSent *prometheus.CounterVec
transportMessagesSentSize *prometheus.CounterVec
messagesBroadcastedCount *prometheus.CounterVec
messagesSentCount *prometheus.CounterVec
messagesReceivedCount *prometheus.CounterVec
actionCount *prometheus.CounterVec
buildInfoGauge *prometheus.GaugeVec
numClientsGauge prometheus.Gauge
numUsersGauge prometheus.Gauge
numSubsGauge prometheus.Gauge
numChannelsGauge prometheus.Gauge
numNodesGauge prometheus.Gauge
replyErrorCount *prometheus.CounterVec
serverDisconnectCount *prometheus.CounterVec
commandDurationSummary *prometheus.SummaryVec
surveyDurationSummary *prometheus.SummaryVec
recoverCount *prometheus.CounterVec
transportConnectCount *prometheus.CounterVec
transportMessagesSent *prometheus.CounterVec
transportMessagesSentSize *prometheus.CounterVec
transportMessagesReceived *prometheus.CounterVec
transportMessagesReceivedSize *prometheus.CounterVec
messagesBroadcastedCount *prometheus.CounterVec

messagesBroadcastedPublication prometheus.Counter
messagesBroadcastedJoin prometheus.Counter
Expand Down Expand Up @@ -235,53 +237,90 @@ func incTransportConnect(transport string) {
}
}

type messagesSentLabels struct {
type transportMessageLabels struct {
Transport string
ChannelGroup string
}

var (
transportMessagesSentCache map[messagesSentLabels]prometheus.Counter
transportMessagesSentSizeCache map[messagesSentLabels]prometheus.Counter
metricCacheMu sync.RWMutex
transportMessagesSentCache map[transportMessageLabels]prometheus.Counter
transportMessagesSentSizeCache map[transportMessageLabels]prometheus.Counter
messagesSentCacheMu sync.RWMutex

transportMessagesReceivedCache map[transportMessageLabels]prometheus.Counter
transportMessagesReceivedSizeCache map[transportMessageLabels]prometheus.Counter
messagesReceivedCacheMu sync.RWMutex
)

func init() {
transportMessagesSentCache = make(map[messagesSentLabels]prometheus.Counter)
transportMessagesSentSizeCache = make(map[messagesSentLabels]prometheus.Counter)
transportMessagesSentCache = make(map[transportMessageLabels]prometheus.Counter)
transportMessagesSentSizeCache = make(map[transportMessageLabels]prometheus.Counter)
transportMessagesReceivedCache = make(map[transportMessageLabels]prometheus.Counter)
transportMessagesReceivedSizeCache = make(map[transportMessageLabels]prometheus.Counter)
}

func incTransportMessages(transport string, channelGroup string, size int) {
func incTransportMessagesSent(transport string, channelGroup string, size int) {
registryMu.RLock()
defer registryMu.RUnlock()

labels := messagesSentLabels{
labels := transportMessageLabels{
Transport: transport,
ChannelGroup: channelGroup,
}

metricCacheMu.RLock()
messagesSentCacheMu.RLock()
counterSent, okSent := transportMessagesSentCache[labels]
counterSentSize, okSentSize := transportMessagesSentSizeCache[labels]
metricCacheMu.RUnlock()
messagesSentCacheMu.RUnlock()

if !okSent {
counterSent = transportMessagesSent.WithLabelValues(transport, channelGroup)
metricCacheMu.Lock()
messagesSentCacheMu.Lock()
transportMessagesSentCache[labels] = counterSent
metricCacheMu.Unlock()
messagesSentCacheMu.Unlock()
}

if !okSentSize {
counterSentSize = transportMessagesSentSize.WithLabelValues(transport, channelGroup)
metricCacheMu.Lock()
messagesSentCacheMu.Lock()
transportMessagesSentSizeCache[labels] = counterSentSize
metricCacheMu.Unlock()
messagesSentCacheMu.Unlock()
}
counterSent.Inc()
counterSentSize.Add(float64(size))
}

// incTransportMessagesReceived records a single message received over the
// given transport: it increments the received-messages counter by one and adds
// size to the received-size counter, both labeled with transport and
// channelGroup.
//
// Counters resolved from the metric vectors are memoized in package-level maps
// keyed by the label pair, so later calls with the same labels look them up
// from the maps instead of calling WithLabelValues again.
func incTransportMessagesReceived(transport string, channelGroup string, size int) {
	registryMu.RLock()
	defer registryMu.RUnlock()

	key := transportMessageLabels{Transport: transport, ChannelGroup: channelGroup}

	// Look up both cached counters under a single read lock.
	messagesReceivedCacheMu.RLock()
	msgCounter, haveMsgCounter := transportMessagesReceivedCache[key]
	sizeCounter, haveSizeCounter := transportMessagesReceivedSizeCache[key]
	messagesReceivedCacheMu.RUnlock()

	// On a miss, resolve the counter from the vector and remember it. The
	// write lock is held only for the map store.
	if !haveMsgCounter {
		msgCounter = transportMessagesReceived.WithLabelValues(transport, channelGroup)
		messagesReceivedCacheMu.Lock()
		transportMessagesReceivedCache[key] = msgCounter
		messagesReceivedCacheMu.Unlock()
	}
	if !haveSizeCounter {
		sizeCounter = transportMessagesReceivedSize.WithLabelValues(transport, channelGroup)
		messagesReceivedCacheMu.Lock()
		transportMessagesReceivedSizeCache[key] = sizeCounter
		messagesReceivedCacheMu.Unlock()
	}

	msgCounter.Inc()
	sizeCounter.Add(float64(size))
}

func incServerDisconnect(code uint32) {
registryMu.RLock()
defer registryMu.RUnlock()
Expand Down Expand Up @@ -513,14 +552,28 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string
Subsystem: "transport",
Name: "messages_sent",
Help: "Number of messages sent over specific transport.",
}, []string{"transport", "channel_group"})
}, []string{"transport", "channel_namespace"})

transportMessagesSentSize = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "messages_sent_size",
Help: "Size in bytes of messages sent over specific transport.",
}, []string{"transport", "channel_group"})
}, []string{"transport", "channel_namespace"})

transportMessagesReceived = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "messages_received",
Help: "Number of messages received over specific transport.",
}, []string{"transport", "channel_namespace"})

transportMessagesReceivedSize = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "messages_received_size",
Help: "Size in bytes of messages received over specific transport.",
}, []string{"transport", "channel_namespace"})

messagesReceivedCountPublication = messagesReceivedCount.WithLabelValues("publication")
messagesReceivedCountJoin = messagesReceivedCount.WithLabelValues("join")
Expand Down Expand Up @@ -625,6 +678,12 @@ func initMetricsRegistry(registry prometheus.Registerer, metricsNamespace string
if err := registry.Register(transportMessagesSentSize); err != nil {
return err
}
if err := registry.Register(transportMessagesReceived); err != nil {
return err
}
if err := registry.Register(transportMessagesReceivedSize); err != nil {
return err
}
if err := registry.Register(buildInfoGauge); err != nil {
return err
}
Expand Down

0 comments on commit d78d9f0

Please sign in to comment.