Skip to content

Commit

Permalink
blockbuilder: handle non-terminal errors when processing record samp…
Browse files Browse the repository at this point in the history
…les (#9006)

* blockbuilder: handle non-terminal errors when processing record samples

Signed-off-by: Vladimir Varankin <[email protected]>

* extract checkTSDBAppendError out of tsdb builder

* more tests for OOO samples

* fixup! extract checkTSDBAppendError out of tsdb builder

---------

Signed-off-by: Vladimir Varankin <[email protected]>
  • Loading branch information
narqo authored Aug 27, 2024
1 parent 4f99694 commit b3a9412
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 52 deletions.
56 changes: 52 additions & 4 deletions pkg/blockbuilder/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/grafana/mimir/pkg/mimirpb"
mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/util/globalerror"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/validation"
)
Expand Down Expand Up @@ -135,8 +136,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
continue
}
}
// TODO(v): not all errors should terminate; see how it ingester handles them
return false, err

// Only abort the processing on a terminal error.
// TODO(v): add metrics for non-terminal errors
if err := checkTSDBAppendError(err); err != nil {
return false, err
}
}

for _, h := range ts.Histograms {
Expand Down Expand Up @@ -173,8 +178,12 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
continue
}
}
// TODO(v): not all errors should terminate; see how it ingester handles them
return false, err

// Only abort the processing on a terminal error.
// TODO(v): add metrics for non-terminal errors
if err := checkTSDBAppendError(err); err != nil {
return false, err
}
}

// Exemplars and metadata are not persisted in the block. So we skip them.
Expand All @@ -183,6 +192,45 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax
return allSamplesProcessed, app.Commit()
}

// checkTSDBAppendError filters an error returned while appending to the TSDB.
// It returns nil when err is nil or matches one of the known "soft" errors, so
// the caller can carry on with the other series in the batch. Any other error
// is returned unchanged.
func checkTSDBAppendError(err error) error {
	switch {
	case err == nil:
		return nil

	// Sample and series-limit errors we can proceed on without terminating.
	// Same as https://github.com/grafana/mimir/blob/1eb4b8e1e3293df100d7fc4df0c94712c31a0930/pkg/ingester/ingester.go#L1283-L1284
	case errors.Is(err, storage.ErrOutOfBounds),
		errors.Is(err, storage.ErrOutOfOrderSample),
		errors.Is(err, storage.ErrTooOldSample),
		errors.Is(err, globalerror.SampleTooFarInFuture),
		errors.Is(err, storage.ErrDuplicateSampleForTimestamp),
		errors.Is(err, globalerror.MaxSeriesPerUser),
		errors.Is(err, globalerror.MaxSeriesPerMetric):
		return nil

	// TSDB native histogram validation errors are treated as soft errors too.
	case errors.Is(err, histogram.ErrHistogramCountMismatch),
		errors.Is(err, histogram.ErrHistogramCountNotBigEnough),
		errors.Is(err, histogram.ErrHistogramNegativeBucketCount),
		errors.Is(err, histogram.ErrHistogramSpanNegativeOffset),
		errors.Is(err, histogram.ErrHistogramSpansBucketsMismatch):
		return nil

	default:
		// Not a recognised soft error: hand it back to the caller as-is.
		return err
	}
}

func (b *TSDBBuilder) getOrCreateTSDB(tenant tsdbTenant) (*userTSDB, error) {
b.tsdbsMu.RLock()
db := b.tsdbs[tenant]
Expand Down
130 changes: 82 additions & 48 deletions pkg/blockbuilder/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func createWriteRequest(t *testing.T, suffix string, samples []mimirpb.Sample, h
return data
}

func floatSample(ts int64) []mimirpb.Sample {
return []mimirpb.Sample{{TimestampMs: ts, Value: float64(ts)}}
func floatSample(ts int64, val float64) []mimirpb.Sample {
return []mimirpb.Sample{{TimestampMs: ts, Value: val}}
}

func histogramSample(ts int64) []mimirpb.Histogram {
Expand All @@ -66,55 +66,52 @@ func histogramSample(ts int64) []mimirpb.Histogram {
}

func TestTSDBBuilder(t *testing.T) {
limits := defaultLimitsTestConfig()
limits.OutOfOrderTimeWindow = 2 * model.Duration(time.Hour)
limits.NativeHistogramsIngestionEnabled = true
overrides, err := validation.NewOverrides(limits, nil)
userID := strconv.Itoa(rand.Int())

// Set OOO window and other overrides for testing tenant.
limits := map[string]*validation.Limits{
userID: {
OutOfOrderTimeWindow: model.Duration(30 * time.Minute),
NativeHistogramsIngestionEnabled: true,
},
}
overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), validation.NewMockTenantLimits(limits))
require.NoError(t, err)

userID := strconv.Itoa(rand.Int())
// Add a sample for all the cases and check for correctness.
// Hold samples for all cases and check for the correctness.
var expSamples []mimirpb.Sample
var expHistograms []mimirpb.Histogram

// val is int64 to make calling this function concise.
createRequest := func(ts int64, accepted, isHistogram bool) *kgo.Record {
var samples []mimirpb.Sample
var histograms []mimirpb.Histogram
if isHistogram {
histograms = histogramSample(ts)
if accepted {
histograms[0].ResetHint = 0
expHistograms = append(expHistograms, histograms[0])
}
} else {
samples = floatSample(ts)
if accepted {
expSamples = append(expSamples, samples[0])
createRequest := func(userID string, samples []mimirpb.Sample, histograms []mimirpb.Histogram, mustAccept bool) *kgo.Record {
if mustAccept {
expSamples = append(expSamples, samples...)
for i := range histograms {
histograms[i].ResetHint = 0
expHistograms = append(expHistograms, histograms[i])
}
}

var rec kgo.Record
rec.Key = []byte(userID)
rec.Value = createWriteRequest(t, "", samples, histograms)
return &rec
return &kgo.Record{
Key: []byte(userID),
Value: createWriteRequest(t, "", samples, histograms),
}
}
addFloatSample := func(builder *TSDBBuilder, ts int64, lastEnd, currEnd int64, recordProcessedBefore, accepted bool) {
rec := createRequest(ts, accepted, false)
addFloatSample := func(builder *TSDBBuilder, ts int64, val float64, lastEnd, currEnd int64, recordProcessedBefore, wantAccepted bool) {
rec := createRequest(userID, floatSample(ts, val), nil, wantAccepted)
allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, recordProcessedBefore)
require.NoError(t, err)
require.Equal(t, accepted, allProcessed)
require.Equal(t, wantAccepted, allProcessed)
}
addHistogramSample := func(builder *TSDBBuilder, ts int64, lastEnd, currEnd int64, recordProcessedBefore, accepted bool) {
rec := createRequest(ts, accepted, true)
addHistogramSample := func(builder *TSDBBuilder, ts int64, lastEnd, currEnd int64, recordProcessedBefore, wantAccepted bool) {
rec := createRequest(userID, nil, histogramSample(ts), wantAccepted)
allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, recordProcessedBefore)
require.NoError(t, err)
require.Equal(t, accepted, allProcessed)
require.Equal(t, wantAccepted, allProcessed)
}

processingRange := time.Hour.Milliseconds()
blockRange := 2 * time.Hour.Milliseconds()
for _, tc := range []struct {

testCases := []struct {
name string
lastEnd, currEnd int64
verifyBlocksAfterCompaction func(blocks []*tsdb.Block)
Expand All @@ -124,7 +121,7 @@ func TestTSDBBuilder(t *testing.T) {
lastEnd: 2 * processingRange,
currEnd: 3 * processingRange,
verifyBlocksAfterCompaction: func(blocks []*tsdb.Block) {
require.Len(t, blocks, 4)
require.Len(t, blocks, 5) // 4 blocks for main userID, and 1 for ooo-user

lastEnd := 2 * processingRange
// One in-order and one out-of-order block for the previous range.
Expand All @@ -139,13 +136,12 @@ func TestTSDBBuilder(t *testing.T) {
require.Equal(t, lastEnd+blockRange, blocks[2].MaxTime())
},
},

{
name: "current start is at odd hour",
lastEnd: 3 * processingRange,
currEnd: 4 * processingRange,
verifyBlocksAfterCompaction: func(blocks []*tsdb.Block) {
require.Len(t, blocks, 2)
require.Len(t, blocks, 3) // 2 blocks for main userID, and 1 for ooo-user

currEnd := 4 * processingRange
// Both in-order and out-of-order blocks are in the same block range.
Expand All @@ -155,7 +151,9 @@ func TestTSDBBuilder(t *testing.T) {
require.Equal(t, currEnd, blocks[1].MaxTime())
},
},
} {
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
expSamples = expSamples[:0]
expHistograms = expHistograms[:0]
Expand All @@ -170,28 +168,35 @@ func TestTSDBBuilder(t *testing.T) {
// 1. Processing records that were processed before (they come first in real world).
// A. This sample is already processed. So it should be ignored but say all processed
// because it is already in a block.
addFloatSample(builder, lastEnd-10, lastEnd, currEnd, true, true)
addFloatSample(builder, lastEnd-10, 1, lastEnd, currEnd, true, true)
// Since this is already processed, it should not be added to the expected samples.
expSamples = expSamples[:0]
// B. This goes in this block.
addFloatSample(builder, lastEnd+100, lastEnd, currEnd, true, true)
addFloatSample(builder, lastEnd+100, 1, lastEnd, currEnd, true, true)
// C. This sample should be processed in the future.
addFloatSample(builder, currEnd+1, lastEnd, currEnd, true, false)
addFloatSample(builder, currEnd+1, 1, lastEnd, currEnd, true, false)

// 2. Processing records that were not processed before.
// A. Sample that belonged to previous processing period but came in late. Processed in current cycle.
addFloatSample(builder, lastEnd-5, lastEnd, currEnd, false, true)
addFloatSample(builder, lastEnd-5, 1, lastEnd, currEnd, false, true)
// B. Sample that belongs to the current processing period.
addFloatSample(builder, lastEnd+200, lastEnd, currEnd, false, true)
// C. This sample should be processed in the future.
addFloatSample(builder, currEnd+2, lastEnd, currEnd, false, false)
addFloatSample(builder, lastEnd+200, 1, lastEnd, currEnd, false, true)
// C. Sample that belongs to the current processing period but is a duplicate with different value.
addFloatSample(builder, lastEnd+200, 2, lastEnd, currEnd, false, true)
// The request is accepted, but its sample won't end up in the DB due to soft "ErrDuplicateSampleForTimestamp".
expSamples = expSamples[:len(expSamples)-1]
// D. This sample should be processed in the future.
addFloatSample(builder, currEnd+2, 1, lastEnd, currEnd, false, false)
// E. This sample is too old (soft error).
addFloatSample(builder, 0, 1, lastEnd, currEnd, false, true)
expSamples = expSamples[:len(expSamples)-1]

// 3. Out of order sample in a new record.
// A. In the current range but out of order w.r.t. the previous sample.
addFloatSample(builder, lastEnd+20, lastEnd, currEnd, false, true)
addFloatSample(builder, lastEnd+20, 1, lastEnd, currEnd, false, true)
// B. Before current range and out of order w.r.t. the previous sample. Already covered above, but this
// exists to explicitly state the case.
addFloatSample(builder, lastEnd-20, lastEnd, currEnd, false, true)
addFloatSample(builder, lastEnd-20, 1, lastEnd, currEnd, false, true)
}
{ // Add native histogram samples.
// 1.A from above.
Expand All @@ -215,6 +220,35 @@ func TestTSDBBuilder(t *testing.T) {

// 3.A and 3.B not done. TODO: do it when out-of-order histograms are supported.
}
{
// Out of order sample with no OOO window configured for the tenant.
userID := "test-ooo-tenant"

// This one goes into the block.
samples := floatSample(lastEnd+20, 1)
rec := createRequest(userID, samples, nil, false)
allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, false)
require.NoError(t, err)
require.True(t, allProcessed)
expOOOSamples := append([]mimirpb.Sample(nil), samples...)

// This one doesn't go into the block because of "ErrOutOfOrderSample" (soft error)
samples = floatSample(lastEnd-20, 1)
rec = createRequest(userID, samples, nil, false)
allProcessed, err = builder.Process(context.Background(), rec, lastEnd, currEnd, false)
require.NoError(t, err)
require.True(t, allProcessed)

tenant := tsdbTenant{
partitionID: rec.Partition,
tenantID: userID,
}
db, err := builder.getOrCreateTSDB(tenant)
require.NoError(t, err)

// Check expected out of order samples in the DB.
compareQuery(t, db.DB, expOOOSamples, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
}

// Query the TSDB for the expected samples.
tenant := tsdbTenant{
Expand All @@ -236,7 +270,7 @@ func TestTSDBBuilder(t *testing.T) {
newDB, err := tsdb.Open(shipperDir, log.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)

// One for the in-order current range. Two for the out-of-order blocks: ont for current range
// One for the in-order current range. Two for the out-of-order blocks: one for the current range
// and one for the previous range.
blocks := newDB.Blocks()
tc.verifyBlocksAfterCompaction(blocks)
Expand Down

0 comments on commit b3a9412

Please sign in to comment.