Skip to content

Commit

Permalink
kafka replay speed: rename existing cortex_ingest_storage_reader_reco…
Browse files Browse the repository at this point in the history
…rds_* metrics (#9654)

* kafka replay speed: rename existing cortex_ingest_storage_reader_records_* metrics

Because of the batching behaviour of the sharding pushers, there is no longer a 1:1 matching between the Kafka records and the errors we get from the TSDB. Because of this, the existing metrics `cortex_ingest_storage_reader_records_failed_total` and `cortex_ingest_storage_reader_records_total` no longer make sense.

This PR renames them to `cortex_ingest_storage_reader_requests_total` and `cortex_ingest_storage_reader_requests_failed_total` while keeping the alerts and dashboards working. It also fixes a bug in how `requests_total` is tracked - it used to be only tracked when there is an error.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix jsonnet templating

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Include comments

Co-authored-by: Marco Pracucci <[email protected]>

* Fix metric help

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Regenerate mixins

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix jsonnet

Signed-off-by: Dimitar Dimitrov <[email protected]>

* update mixin

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: Marco Pracucci <[email protected]>
  • Loading branch information
dimitarvdimitrov and pracucci authored Oct 17, 2024
1 parent 16f7f5f commit 9de001e
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,12 @@ spec:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to consume write requests read from Kafka due to internal errors.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterfailstoprocessrecordsfromkafka
expr: |
sum by (cluster, namespace, pod) (rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])) > 0
sum by (cluster, namespace, pod) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])
or
rate(cortex_ingest_storage_reader_requests_failed_total{cause="server"}[1m])
) > 0
for: 5m
labels:
severity: critical
Expand All @@ -1117,7 +1122,12 @@ spec:
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterstuckprocessingrecordsfromkafka
expr: |
      # Alert if the reader is not processing any records, but there are buffered records to process in the Kafka client.
(sum by (cluster, namespace, pod) (rate(cortex_ingest_storage_reader_records_total[5m])) == 0)
(sum by (cluster, namespace, pod) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_total[5m])
or
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
Expand Down
14 changes: 12 additions & 2 deletions operations/mimir-mixin-compiled-baremetal/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,12 @@ groups:
message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to consume write requests read from Kafka due to internal errors.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterfailstoprocessrecordsfromkafka
expr: |
sum by (cluster, namespace, instance) (rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])) > 0
sum by (cluster, namespace, instance) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])
or
rate(cortex_ingest_storage_reader_requests_failed_total{cause="server"}[1m])
) > 0
for: 5m
labels:
severity: critical
Expand All @@ -1091,7 +1096,12 @@ groups:
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterstuckprocessingrecordsfromkafka
expr: |
      # Alert if the reader is not processing any records, but there are buffered records to process in the Kafka client.
(sum by (cluster, namespace, instance) (rate(cortex_ingest_storage_reader_records_total[5m])) == 0)
(sum by (cluster, namespace, instance) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_total[5m])
or
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
Expand Down
14 changes: 12 additions & 2 deletions operations/mimir-mixin-compiled/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,12 @@ groups:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to consume write requests read from Kafka due to internal errors.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterfailstoprocessrecordsfromkafka
expr: |
sum by (cluster, namespace, pod) (rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])) > 0
sum by (cluster, namespace, pod) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])
or
rate(cortex_ingest_storage_reader_requests_failed_total{cause="server"}[1m])
) > 0
for: 5m
labels:
severity: critical
Expand All @@ -1105,7 +1110,12 @@ groups:
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterstuckprocessingrecordsfromkafka
expr: |
      # Alert if the reader is not processing any records, but there are buffered records to process in the Kafka client.
(sum by (cluster, namespace, pod) (rate(cortex_ingest_storage_reader_records_total[5m])) == 0)
(sum by (cluster, namespace, pod) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_total[5m])
or
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
Expand Down
14 changes: 12 additions & 2 deletions operations/mimir-mixin/alerts/ingest-storage.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@
alert: $.alertName('IngesterFailsToProcessRecordsFromKafka'),
'for': '5m',
expr: |||
sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])) > 0
sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])
or
rate(cortex_ingest_storage_reader_requests_failed_total{cause="server"}[1m])
) > 0
||| % $._config,
labels: {
severity: 'critical',
Expand All @@ -139,7 +144,12 @@
'for': '5m',
expr: |||
        # Alert if the reader is not processing any records, but there are buffered records to process in the Kafka client.
(sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (rate(cortex_ingest_storage_reader_records_total[5m])) == 0)
(sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_total[5m])
or
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
Expand Down
30 changes: 25 additions & 5 deletions operations/mimir-mixin/dashboards/writes.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,32 @@ local filename = 'mimir-writes.json';
$.queryPanel(
[
|||
sum(rate(cortex_ingest_storage_reader_records_total{%s}[$__rate_interval]))
sum(
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_total{%s}[$__rate_interval])
or
rate(cortex_ingest_storage_reader_requests_total{%s}[$__rate_interval])
)
-
sum(rate(cortex_ingest_storage_reader_records_failed_total{%s}[$__rate_interval]))
||| % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
'sum (rate (cortex_ingest_storage_reader_records_failed_total{%s, cause="client"}[$__rate_interval]))' % [$.jobMatcher($._config.job_names.ingester)],
'sum (rate (cortex_ingest_storage_reader_records_failed_total{%s, cause="server"}[$__rate_interval]))' % [$.jobMatcher($._config.job_names.ingester)],
sum(
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{%s}[$__rate_interval])
or
rate(cortex_ingest_storage_reader_requests_failed_total{%s}[$__rate_interval])
)
||| % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
'sum (
# This is the old metric name. We\'re keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{%s, cause="client"}[$__rate_interval])
or
rate(cortex_ingest_storage_reader_requests_failed_total{%s, cause="client"}[$__rate_interval])
)' % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
'sum (
# This is the old metric name. We\'re keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{%s, cause="server"}[$__rate_interval])
or
rate(cortex_ingest_storage_reader_requests_failed_total{%s, cause="server"}[$__rate_interval])
)' % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
],
[
'successful',
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/ingest/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (p *parallelStorageShards) run(queue *batchingQueue) {
// The error handler needs to determine if this is a server error or not.
			// If it is, we need to stop processing as the batch will be retried. When it is not (a client error), it'll log it, and we can continue processing.
p.metrics.processingTime.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds())
if err != nil && p.errorHandler.IsServerError(wr.Context, err) {
if p.errorHandler.IsServerError(wr.Context, err) {
queue.ErrorChannel() <- err
}
}
Expand Down Expand Up @@ -433,10 +433,10 @@ func (p *pushErrorHandler) IsServerError(ctx context.Context, err error) bool {
// For the sake of simplicity, let's increment the total requests counter here.
p.metrics.totalRequests.Inc()

spanLog := spanlogger.FromContext(ctx, p.fallbackLogger)
if err == nil {
return false
}
spanLog := spanlogger.FromContext(ctx, p.fallbackLogger)

// Only return non-client errors; these will stop the processing of the current Kafka fetches and retry (possibly).
if !mimirpb.IsClientError(err) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/ingest/pusher_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type storagePusherMetrics struct {
// newStoragePusherMetrics creates a new storagePusherMetrics instance.
func newStoragePusherMetrics(reg prometheus.Registerer) *storagePusherMetrics {
errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_records_failed_total",
Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.",
Name: "cortex_ingest_storage_reader_requests_failed_total",
Help: "Number of write requests which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.",
}, []string{"cause"})

return &storagePusherMetrics{
Expand All @@ -70,8 +70,8 @@ func newStoragePusherMetrics(reg prometheus.Registerer) *storagePusherMetrics {
clientErrRequests: errRequestsCounter.WithLabelValues("client"),
serverErrRequests: errRequestsCounter.WithLabelValues("server"),
totalRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_records_total",
Help: "Number of attempted records (write requests).",
Name: "cortex_ingest_storage_reader_requests_total",
Help: "Number of attempted write requests after batching records from Kafka.",
}),
}
}
Expand Down
54 changes: 37 additions & 17 deletions pkg/storage/ingest/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,11 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin

assert.Contains(t, logs.String(), pusherErr.Error())
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_records_failed_total Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_records_failed_total counter
cortex_ingest_storage_reader_records_failed_total{cause="client"} 1
cortex_ingest_storage_reader_records_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_records_failed_total"))
# HELP cortex_ingest_storage_reader_requests_failed_total Number of write requests which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_requests_failed_total counter
cortex_ingest_storage_reader_requests_failed_total{cause="client"} 1
cortex_ingest_storage_reader_requests_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_requests_failed_total"))
})

t.Run("should log a client error if does implement optional logging interface and ShouldLog() returns true", func(t *testing.T) {
Expand All @@ -344,11 +344,11 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin

assert.Contains(t, logs.String(), fmt.Sprintf("%s (sampled 1/100)", pusherErr.Error()))
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_records_failed_total Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_records_failed_total counter
cortex_ingest_storage_reader_records_failed_total{cause="client"} 1
cortex_ingest_storage_reader_records_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_records_failed_total"))
# HELP cortex_ingest_storage_reader_requests_failed_total Number of write requests which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_requests_failed_total counter
cortex_ingest_storage_reader_requests_failed_total{cause="client"} 1
cortex_ingest_storage_reader_requests_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_requests_failed_total"))
})

t.Run("should not log a client error if does implement optional logging interface and ShouldLog() returns false", func(t *testing.T) {
Expand All @@ -365,11 +365,11 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin

assert.Empty(t, logs.String())
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_records_failed_total Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_records_failed_total counter
cortex_ingest_storage_reader_records_failed_total{cause="client"} 1
cortex_ingest_storage_reader_records_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_records_failed_total"))
# HELP cortex_ingest_storage_reader_requests_failed_total Number of write requests which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_requests_failed_total counter
cortex_ingest_storage_reader_requests_failed_total{cause="client"} 1
cortex_ingest_storage_reader_requests_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_requests_failed_total"))
})

}
Expand Down Expand Up @@ -666,12 +666,18 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
pusher := &mockPusher{}
// run with a buffer of one, so some of the tests can fill the buffer and test the error handling
const buffer = 1
metrics := newStoragePusherMetrics(prometheus.NewPedanticRegistry())
reg := prometheus.NewPedanticRegistry()
metrics := newStoragePusherMetrics(reg)
errorHandler := newPushErrorHandler(metrics, nil, log.NewNopLogger())
shardingP := newParallelStorageShards(metrics, errorHandler, tc.shardCount, tc.batchSize, buffer, pusher, labels.StableHash)

upstreamPushErrsCount := 0
for i, req := range tc.expectedUpstreamPushes {
pusher.On("PushToStorage", mock.Anything, req).Return(tc.upstreamPushErrs[i])
err := tc.upstreamPushErrs[i]
pusher.On("PushToStorage", mock.Anything, req).Return(err)
if err != nil {
upstreamPushErrsCount++
}
}
var actualPushErrs []error
for _, req := range tc.requests {
Expand All @@ -695,6 +701,20 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
require.ErrorIs(t, closeErr, tc.expectedCloseErr)
pusher.AssertNumberOfCalls(t, "PushToStorage", len(tc.expectedUpstreamPushes))
pusher.AssertExpectations(t)

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_ingest_storage_reader_requests_total Number of attempted write requests after batching records from Kafka.
# TYPE cortex_ingest_storage_reader_requests_total counter
cortex_ingest_storage_reader_requests_total %d
# HELP cortex_ingest_storage_reader_requests_failed_total Number of write requests which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_requests_failed_total counter
cortex_ingest_storage_reader_requests_failed_total{cause="server"} %d
cortex_ingest_storage_reader_requests_failed_total{cause="client"} 0
`, len(tc.expectedUpstreamPushes), upstreamPushErrsCount)),
"cortex_ingest_storage_reader_requests_total",
"cortex_ingest_storage_reader_requests_failed_total",
),
)
})
}
}
Expand Down

0 comments on commit 9de001e

Please sign in to comment.