Skip to content

Commit

Permalink
metrics: add consumer read latency distribution metric (#579)
Browse files Browse the repository at this point in the history
Add missing consumer read latency histogram metric via `HookBrokerRead` in `franz-go`.
Additionally fix broken tests for `TestConsumerMetrics` and `TestProducerMetrics`.
  • Loading branch information
1pkg authored Oct 7, 2024
1 parent c78f771 commit 5d223b7
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 81 deletions.
7 changes: 5 additions & 2 deletions kafka/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,10 @@ func TestManagerMetrics(t *testing.T) {
assert.Equal(t, "github.com/elastic/apm-queue/kafka", rm.ScopeMetrics[0].Scope.Name)

metrics := rm.ScopeMetrics[0].Metrics
require.Len(t, metrics, 7)
require.Len(t, metrics, 8)
var lagMetric, assignmentMetric metricdata.Metrics
// these are not stable so we just assert for existence
var connectsMetric, disconnectsMetric, writeBytesMetric, readBytesMetric, writeLatencyMetric bool
var connectsMetric, disconnectsMetric, writeBytesMetric, readBytesMetric, writeLatencyMetric, readLatencyMetric bool
for _, metric := range metrics {
switch metric.Name {
case "consumer_group_lag":
Expand All @@ -443,13 +443,16 @@ func TestManagerMetrics(t *testing.T) {
readBytesMetric = true
case "messaging.kafka.write.latency":
writeLatencyMetric = true
case "messaging.kafka.read.latency":
readLatencyMetric = true
}
}
assert.True(t, writeBytesMetric)
assert.True(t, readBytesMetric)
assert.True(t, connectsMetric)
assert.True(t, disconnectsMetric)
assert.True(t, writeLatencyMetric)
assert.True(t, readLatencyMetric)
metricdatatest.AssertAggregationsEqual(t, metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{{
Attributes: attribute.NewSet(
Expand Down
32 changes: 26 additions & 6 deletions kafka/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
msgConsumedUncompressedBytesKey = "consumer.messages.uncompressed.bytes"
throttlingDurationKey = "messaging.kafka.throttling.duration"
messageWriteLatencyKey = "messaging.kafka.write.latency"
messageReadLatencyKey = "messaging.kafka.read.latency"
errorReasonKey = "error_reason"
)

Expand Down Expand Up @@ -97,6 +98,7 @@ type metricHooks struct {
messageFetched metric.Int64Counter
messageFetchedWireBytes metric.Int64Counter
messageFetchedUncompressedBytes metric.Int64Counter
messageReadLatency metric.Float64Histogram
messageDelay metric.Float64Histogram
throttlingDuration metric.Float64Histogram

Expand Down Expand Up @@ -241,7 +243,7 @@ func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string,
}

messageWriteLatency, err := m.Float64Histogram(messageWriteLatencyKey,
metric.WithDescription("Time took to write including waited before being written"),
metric.WithDescription("Time it took to write a batch including wait time before writing"),
metric.WithUnit("s"),
)
if err != nil {
Expand All @@ -255,6 +257,7 @@ func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string,
if err != nil {
return nil, formatMetricError(msgFetchedKey, err)
}

messageFetchedWireBytes, err := m.Int64Counter(msgConsumedWireBytesKey,
metric.WithDescription("The number of bytes consumed"),
metric.WithUnit(unitBytes),
Expand All @@ -271,6 +274,14 @@ func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string,
return nil, formatMetricError(msgConsumedUncompressedBytesKey, err)
}

messageReadLatency, err := m.Float64Histogram(messageReadLatencyKey,
metric.WithDescription("Time it took to read a batch including wait time before reading"),
metric.WithUnit("s"),
)
if err != nil {
return nil, formatMetricError(messageReadLatencyKey, err)
}

messageDelayHistogram, err := m.Float64Histogram(msgDelayKey,
metric.WithDescription("The delay between producing messages and reading them"),
metric.WithUnit("s"),
Expand Down Expand Up @@ -323,6 +334,7 @@ func newKgoHooks(mp metric.MeterProvider, namespace, topicPrefix string,
messageFetched: messageFetchedCounter,
messageFetchedWireBytes: messageFetchedWireBytes,
messageFetchedUncompressedBytes: messageFetchedUncompressedBytes,
messageReadLatency: messageReadLatency,
messageDelay: messageDelayHistogram,
throttlingDuration: throttlingDurationHistogram,

Expand Down Expand Up @@ -407,23 +419,31 @@ func (h *metricHooks) OnBrokerWrite(meta kgo.BrokerMetadata, key int16, bytesWri
)
}

func (h *metricHooks) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, _, _ time.Duration, err error) {
attrs := make([]attribute.KeyValue, 0, 2)
func (h *metricHooks) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
attrs := make([]attribute.KeyValue, 0, 3)
attrs = append(attrs, semconv.MessagingSystem("kafka"))
if h.namespace != "" {
attrs = append(attrs, attribute.String("namespace", h.namespace))
}
outcome := "success"
if err != nil {
outcome = "failure"
h.readErrs.Add(
context.Background(),
1,
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
return
} else {
h.readBytes.Add(
context.Background(),
int64(bytesRead),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
h.readBytes.Add(
attrs = append(attrs, attribute.String("outcome", outcome))
h.messageReadLatency.Record(
context.Background(),
int64(bytesRead),
readWait.Seconds()+timeToRead.Seconds(),
metric.WithAttributeSet(attribute.NewSet(attrs...)),
)
}
Expand Down
121 changes: 48 additions & 73 deletions kafka/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package kafka
import (
"context"
"fmt"
"slices"
"sync/atomic"
"testing"
"time"
Expand All @@ -46,7 +45,6 @@ func TestProducerMetrics(t *testing.T) {
producer apmqueue.Producer,
rdr sdkmetric.Reader,
want []metricdata.Metrics,
ignore ...string,
) {
topic := apmqueue.Topic("default-topic")
producer.Produce(ctx,
Expand All @@ -65,22 +63,16 @@ func TestProducerMetrics(t *testing.T) {
assert.NoError(t, rdr.Collect(context.Background(), &rm))

metrics := filterMetrics(t, rm.ScopeMetrics)
for i := range metrics {
t.Log(metrics[i].Name)
}
for i := range want {
if slices.Contains(ignore, want[i].Name) {
continue
}
for _, m := range want {
var actual metricdata.Metrics
for _, m := range metrics {
if m.Name == want[i].Name {
actual = m
for _, mi := range metrics {
if m.Name == mi.Name {
actual = mi
break
}
}
assert.NotEmpty(t, actual, fmt.Sprintf("metric %s should exist", want[i].Name))
metricdatatest.AssertEqual(t, want[i], actual,
assert.NotEmpty(t, actual)
metricdatatest.AssertEqual(t, m, actual,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
metricdatatest.IgnoreValue(),
Expand Down Expand Up @@ -116,12 +108,7 @@ func TestProducerMetrics(t *testing.T) {
}
ctx, cancel := context.WithTimeout(context.Background(), 0)
defer cancel()
test(ctx, t, producer, rdr, want,
"messaging.kafka.connects.count",
"messaging.kafka.disconnects.count",
"messaging.kafka.connect_errors.count",
"messaging.kafka.write_errors.count",
)
test(ctx, t, producer, rdr, want)
})
t.Run("ContextCanceled", func(t *testing.T) {
producer, rdr := setupTestProducer(t, nil)
Expand Down Expand Up @@ -151,13 +138,7 @@ func TestProducerMetrics(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
test(ctx, t, producer, rdr, want,
"messaging.kafka.connect_errors.count",
"messaging.kafka.connects.count",
"messaging.kafka.disconnects.count",
"messaging.kafka.write_bytes",
"messaging.kafka.read_bytes.count",
)
test(ctx, t, producer, rdr, want)
})
t.Run("Unknown error reason", func(t *testing.T) {
producer, rdr := setupTestProducer(t, nil)
Expand Down Expand Up @@ -185,10 +166,7 @@ func TestProducerMetrics(t *testing.T) {
},
}
require.NoError(t, producer.Close())
test(context.Background(), t, producer, rdr, []metricdata.Metrics{want},
"messaging.kafka.connect_errors.count",
"messaging.kafka.connects.count",
)
test(context.Background(), t, producer, rdr, []metricdata.Metrics{want})
})
t.Run("Produced", func(t *testing.T) {
producer, rdr := setupTestProducer(t, func(topic string) attribute.KeyValue {
Expand Down Expand Up @@ -269,7 +247,7 @@ func TestProducerMetrics(t *testing.T) {
},
{
Name: "messaging.kafka.write.latency",
Description: "Time took to write including waited before being written",
Description: "Time it took to write a batch including wait time before writing",
Unit: "s",
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
Expand Down Expand Up @@ -303,14 +281,7 @@ func TestProducerMetrics(t *testing.T) {
}},
},
}
test(context.Background(), t, producer, rdr, want,
"messaging.kafka.connects.count",
"messaging.kafka.disconnects.count",
"messaging.kafka.write_bytes",
"messaging.kafka.read_bytes.count",
"messaging.kafka.produce_bytes.count",
"messaging.kafka.produce_records.count",
)
test(context.Background(), t, producer, rdr, want)
})
t.Run("ProducedWithHeaders", func(t *testing.T) {
producer, rdr := setupTestProducer(t, func(topic string) attribute.KeyValue {
Expand Down Expand Up @@ -394,15 +365,7 @@ func TestProducerMetrics(t *testing.T) {
"key": "value",
"some key": "some value",
})
test(ctx, t, producer, rdr, want,
"messaging.kafka.connects.count",
"messaging.kafka.disconnects.count",
"messaging.kafka.write_bytes",
"messaging.kafka.read_bytes.count",
"messaging.kafka.produce_bytes.count",
"messaging.kafka.produce_records.count",
"messaging.kafka.write.latency", // header is not attached to this metric. skip check
)
test(ctx, t, producer, rdr, want)
})
}

Expand Down Expand Up @@ -459,11 +422,13 @@ func TestConsumerMetrics(t *testing.T) {
{
Value: int64(records),
Attributes: attribute.NewSet(
attribute.String("namespace", "name_space"),
semconv.MessagingSystem("kafka"),
semconv.MessagingSourceName(t.Name()),
semconv.MessagingKafkaSourcePartition(0),
attribute.String("compression.codec", "none"),
attribute.String("header", "included"),
semconv.MessagingKafkaSourcePartition(0),
semconv.MessagingSourceName(t.Name()),
semconv.MessagingSystem("kafka"),
attribute.String("namespace", "name_space"),
attribute.String("topic", "name_space-TestConsumerMetrics"),
),
},
},
Expand All @@ -477,40 +442,49 @@ func TestConsumerMetrics(t *testing.T) {
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{{
Attributes: attribute.NewSet(
attribute.String("namespace", "name_space"),
semconv.MessagingSystem("kafka"),
semconv.MessagingSourceName(t.Name()),
semconv.MessagingKafkaSourcePartition(0),
attribute.String("header", "included"),
semconv.MessagingKafkaSourcePartition(0),
semconv.MessagingSourceName(t.Name()),
semconv.MessagingSystem("kafka"),
attribute.String("namespace", "name_space"),
attribute.String("topic", "name_space-TestConsumerMetrics"),
),

Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
Count: uint64(records),
}},
},
},
}
{
Name: "messaging.kafka.read.latency",
Description: "Time it took to read a batch including wait time before reading",
Unit: "s",
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{{
Attributes: attribute.NewSet(
attribute.String("namespace", "name_space"),
semconv.MessagingSystem("kafka"),
attribute.String("outcome", "success"),
),

ignore := []string{
"messaging.kafka.connects.count",
"messaging.kafka.write_bytes",
"messaging.kafka.read_bytes.count",
"messaging.kafka.fetch_bytes.count",
"messaging.kafka.fetch_records.count",
"consumer.messages.wire.bytes",
"consumer.messages.uncompressed.bytes",
"messaging.kafka.write.latency",
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
}}},
},
}

metrics := filterMetrics(t, rm.ScopeMetrics)
assert.Len(t, metrics, len(wantMetrics)+len(ignore))

for k, m := range wantMetrics {
metric := metrics[k]
if slices.Contains(ignore, metric.Name) {
continue
for _, m := range wantMetrics {
var metric metricdata.Metrics
for _, mi := range metrics {
if mi.Name == m.Name {
metric = mi
break
}
}

assert.NotEmpty(t, metric)

// Remove time-specific data for histograms
if dp, ok := metric.Data.(metricdata.Histogram[float64]); ok {
for k := range dp.DataPoints {
Expand All @@ -527,6 +501,7 @@ func TestConsumerMetrics(t *testing.T) {
metric,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
metricdatatest.IgnoreValue(),
)
}
}
Expand Down

0 comments on commit 5d223b7

Please sign in to comment.