diff --git a/pkg/blockbuilder/metrics.go b/pkg/blockbuilder/metrics.go index 59ad431603e..d9db46c6a4f 100644 --- a/pkg/blockbuilder/metrics.go +++ b/pkg/blockbuilder/metrics.go @@ -44,6 +44,7 @@ func newBlockBuilderMetrics(reg prometheus.Registerer) blockBuilderMetrics { } type tsdbBuilderMetrics struct { + processSamplesDiscarded *prometheus.CounterVec compactAndUploadDuration *prometheus.HistogramVec compactAndUploadFailed *prometheus.CounterVec } @@ -51,6 +52,11 @@ type tsdbBuilderMetrics struct { func newTSDBBBuilderMetrics(reg prometheus.Registerer) tsdbBuilderMetrics { var m tsdbBuilderMetrics + m.processSamplesDiscarded = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_blockbuilder_tsdb_process_samples_discarded_total", + Help: "The total number of samples that were discarded while processing records in one partition.", + }, []string{"partition"}) + m.compactAndUploadDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_blockbuilder_tsdb_compact_and_upload_duration_seconds", Help: "Time spent compacting and uploading a tsdb of one partition.", diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/tsdb.go index 97b0fc82d09..d0455d209e3 100644 --- a/pkg/blockbuilder/tsdb.go +++ b/pkg/blockbuilder/tsdb.go @@ -106,9 +106,11 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax }() var ( - labelsBuilder labels.ScratchBuilder - nonCopiedLabels labels.Labels + labelsBuilder labels.ScratchBuilder + nonCopiedLabels labels.Labels + allSamplesProcessed = true + discardedSamples = 0 ) for _, ts := range req.Timeseries { mimirpb.FromLabelAdaptersOverwriteLabels(&labelsBuilder, ts.Labels, &nonCopiedLabels) @@ -141,10 +143,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax } } - // Only abort the processing on a terminal error. - // TODO(v): add metrics for non-terminal errors - if err := checkTSDBAppendError(err); err != nil { - return false, err + if err != nil { + // Only abort the processing on a terminal error. + if err := checkTSDBAppendError(err); err != nil { + return false, err + } + discardedSamples++ } } @@ -183,16 +187,23 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax } } - // Only abort the processing on a terminal error. - // TODO(v): add metrics for non-terminal errors - if err := checkTSDBAppendError(err); err != nil { - return false, err + if err != nil { + // Only abort the processing on a terminal error. + if err := checkTSDBAppendError(err); err != nil { + return false, err + } + discardedSamples++ } } // Exemplars and metadata are not persisted in the block. So we skip them. } + if discardedSamples > 0 { + partitionStr := fmt.Sprintf("%d", tenant.partitionID) + b.metrics.processSamplesDiscarded.WithLabelValues(partitionStr).Add(float64(discardedSamples)) + } + return allSamplesProcessed, app.Commit() }