Skip to content

Commit

Permalink
blockbuilder: metrics for discarded samples due to non-terminal failures
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Varankin <[email protected]>
  • Loading branch information
narqo committed Oct 16, 2024
1 parent 9d8252f commit f69f4a2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
6 changes: 6 additions & 0 deletions pkg/blockbuilder/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,19 @@ func newBlockBuilderMetrics(reg prometheus.Registerer) blockBuilderMetrics {
}

type tsdbBuilderMetrics struct {
processSamplesDiscarded *prometheus.CounterVec
compactAndUploadDuration *prometheus.HistogramVec
compactAndUploadFailed *prometheus.CounterVec
}

func newTSDBBBuilderMetrics(reg prometheus.Registerer) tsdbBuilderMetrics {
var m tsdbBuilderMetrics

m.processSamplesDiscarded = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_blockbuilder_tsdb_process_samples_discarded_total",
Help: "The total number of samples that were discarded while processing records in one partition.",
}, []string{"partition"})

m.compactAndUploadDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_blockbuilder_tsdb_compact_and_upload_duration_seconds",
Help: "Time spent compacting and uploading a tsdb of one partition.",
Expand Down
31 changes: 21 additions & 10 deletions pkg/blockbuilder/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
}()

var (
labelsBuilder labels.ScratchBuilder
nonCopiedLabels labels.Labels
labelsBuilder labels.ScratchBuilder
nonCopiedLabels labels.Labels

allSamplesProcessed = true
discardedSamples = 0
)
for _, ts := range req.Timeseries {
mimirpb.FromLabelAdaptersOverwriteLabels(&labelsBuilder, ts.Labels, &nonCopiedLabels)
Expand Down Expand Up @@ -141,10 +143,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
}
}

// Only abort the processing on a terminal error.
// TODO(v): add metrics for non-terminal errors
if err := checkTSDBAppendError(err); err != nil {
return false, err
if err != nil {
// Only abort the processing on a terminal error.
if err := checkTSDBAppendError(err); err != nil {
return false, err
}
discardedSamples++
}
}

Expand Down Expand Up @@ -183,16 +187,23 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
}
}

// Only abort the processing on a terminal error.
// TODO(v): add metrics for non-terminal errors
if err := checkTSDBAppendError(err); err != nil {
return false, err
if err != nil {
// Only abort the processing on a terminal error.
if err := checkTSDBAppendError(err); err != nil {
return false, err
}
discardedSamples++
}
}

// Exemplars and metadata are not persisted in the block. So we skip them.
}

if discardedSamples > 0 {
partitionStr := fmt.Sprintf("%d", tenant.partitionID)
b.metrics.processSamplesDiscarded.WithLabelValues(partitionStr).Add(float64(discardedSamples))
}

return allSamplesProcessed, app.Commit()
}

Expand Down

0 comments on commit f69f4a2

Please sign in to comment.