Enforce per-query chunks limit earlier when streaming chunks from ingesters to queriers (#5369)

* Add initial attempt at enforcing chunks limit earlier.

* Update unit tests to match new behaviour.

* Use changes from grafana/mimir-prometheus#512

* Add changelog entry.

* Fix linting issue.

* Simplify methods not expected to be used.
charleskorn authored Jul 7, 2023
1 parent 4819a55 commit 188ce1c
Showing 17 changed files with 367 additions and 193 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -42,6 +42,7 @@
* [ENHANCEMENT] Querier: add `cortex_querier_queries_rejected_total` metric that counts the number of queries rejected due to hitting a limit (eg. max series per query or max chunks per query). #5316 #5440
* [ENHANCEMENT] Querier: add experimental `-querier.minimize-ingester-requests-hedging-delay` option to initiate requests to further ingesters when request minimisation is enabled and not all initial requests have completed. #5368
* [ENHANCEMENT] Clarify docs for `-ingester.client.*` flags to make it clear that these are used by both queriers and distributors. #5375
* [ENHANCEMENT] Querier: enforce `max-chunks-per-query` limit earlier in query processing when streaming chunks from ingesters to queriers to avoid unnecessarily consuming resources for queries that will be aborted. #5369
* [ENHANCEMENT] Ingester: added `cortex_ingester_shipper_last_successful_upload_timestamp_seconds` metric tracking the last successful TSDB block uploaded to the bucket (unix timestamp in seconds). #5396
* [BUGFIX] Ingester: Handle when previous ring state is leaving and the number of tokens has changed. #5204
* [BUGFIX] Querier: fix issue where queries that use the `timestamp()` function fail with `execution: attempted to read series at index 0 from stream, but the stream has already been exhausted` if streaming chunks from ingesters to queriers is enabled. #5370
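The #5369 entry above relies on the querier rejecting a query from the chunk counts advertised in the streamed series metadata, before any chunk data is transferred. A minimal sketch of that idea, assuming a hypothetical chunkLimiter type (an illustrative stand-in, not the actual pkg/util/limiter implementation):

package main

import (
	"errors"
	"fmt"
)

// chunkLimiter is a hypothetical stand-in for the per-query limiter: it accumulates
// the chunk counts reported by ingesters and fails as soon as the running total
// exceeds the per-query maximum (a zero maximum means "no limit" in this sketch).
type chunkLimiter struct {
	maxChunks int
	chunks    int
}

func (l *chunkLimiter) AddChunks(count int) error {
	l.chunks += count
	if l.maxChunks > 0 && l.chunks > l.maxChunks {
		return errors.New("the query exceeded the maximum number of chunks")
	}
	return nil
}

func main() {
	l := &chunkLimiter{maxChunks: 5}

	// Chunk counts advertised per series during the label-streaming phase.
	for _, chunkCount := range []int{2, 2, 2} {
		if err := l.AddChunks(chunkCount); err != nil {
			fmt.Println("aborting before fetching any chunk data:", err)
			return
		}
	}
	fmt.Println("within the limit, proceed to fetch chunk data")
}

Because the rejection happens while only series labels and chunk counts have been received, neither the ingesters nor the querier spend further work on a query that is already guaranteed to fail.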
2 changes: 1 addition & 1 deletion go.mod
@@ -248,7 +248,7 @@ require (
)

// Using a fork of Prometheus with Mimir-specific changes.
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20230706135548-245a68172fbe
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20230706234245-f3697f524295

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
4 changes: 2 additions & 2 deletions go.sum
@@ -873,8 +873,8 @@ github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 h1:WB3bGH2f1UN6
github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20230706135548-245a68172fbe h1:8c4p0PhhhD5G4Ueessf+BF2cS5KYXbryPUFvBdKhYm0=
github.com/grafana/mimir-prometheus v0.0.0-20230706135548-245a68172fbe/go.mod h1:fkSyvELGooBfCA+cH2ziQvMpbQFdyopvJLEBEmNg2zk=
github.com/grafana/mimir-prometheus v0.0.0-20230706234245-f3697f524295 h1:cPynDsMdfh+qDDnAkJy27J5mMTBpYpqHVFVi2/d/o2E=
github.com/grafana/mimir-prometheus v0.0.0-20230706234245-f3697f524295/go.mod h1:fkSyvELGooBfCA+cH2ziQvMpbQFdyopvJLEBEmNg2zk=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 h1:A3dhViTeFDSQcGOXuUi6ukCQSMyDtDISBp2z6OOo2YM=
69 changes: 61 additions & 8 deletions pkg/distributor/distributor_test.go
@@ -2912,6 +2912,7 @@ type prepConfig struct {
ingestersSeriesCountTotal uint64
ingesterZones []string
labelNamesStreamZonesResponseDelay map[string]time.Duration
preferStreamingChunks bool

timeOut bool
}
@@ -3022,6 +3023,8 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
distributorCfg.DefaultLimits.MaxInflightPushRequestsBytes = cfg.maxInflightRequestsBytes
distributorCfg.DefaultLimits.MaxIngestionRate = cfg.maxIngestionRate
distributorCfg.ShuffleShardingLookbackPeriod = time.Hour
distributorCfg.PreferStreamingChunks = cfg.preferStreamingChunks
distributorCfg.StreamingChunksPerIngesterSeriesBufferSize = 128

cfg.limits.IngestionTenantShardSize = cfg.shuffleShardSize

@@ -3493,12 +3496,25 @@ func (i *mockIngester) QueryStream(_ context.Context, req *client.QueryRequest,
return nil, err
}

results := []*client.QueryStreamResponse{}
nonStreamingResponses := []*client.QueryStreamResponse{}
streamingLabelResponses := []*client.QueryStreamResponse{}
streamingChunkResponses := []*client.QueryStreamResponse{}

series := make([]*mimirpb.PreallocTimeseries, 0, len(i.timeseries))

for _, ts := range i.timeseries {
if !match(ts.Labels, matchers) {
continue
}

series = append(series, ts)
}

slices.SortFunc(series, func(a, b *mimirpb.PreallocTimeseries) bool {
return labels.Compare(mimirpb.FromLabelAdaptersToLabels(a.Labels), mimirpb.FromLabelAdaptersToLabels(b.Labels)) < 0
})

for seriesIndex, ts := range series {
c, err := chunk.NewForEncoding(chunk.PrometheusXorChunk)
if err != nil {
return nil, err
@@ -3574,15 +3590,48 @@ }
}
}

results = append(results, &client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{
{
Labels: ts.Labels,
Chunks: wireChunks,
if req.StreamingChunksBatchSize > 0 {
streamingLabelResponses = append(streamingLabelResponses, &client.QueryStreamResponse{
StreamingSeries: []client.QueryStreamSeries{
{
Labels: ts.Labels,
ChunkCount: int64(len(wireChunks)),
},
},
},
})
})

streamingChunkResponses = append(streamingChunkResponses, &client.QueryStreamResponse{
StreamingSeriesChunks: []client.QueryStreamSeriesChunks{
{
SeriesIndex: uint64(seriesIndex),
Chunks: wireChunks,
},
},
})
} else {
nonStreamingResponses = append(nonStreamingResponses, &client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{
{
Labels: ts.Labels,
Chunks: wireChunks,
},
},
})
}
}

var results []*client.QueryStreamResponse

if req.StreamingChunksBatchSize > 0 {
endOfLabelsMessage := &client.QueryStreamResponse{
IsEndOfSeriesStream: true,
}
results = append(streamingLabelResponses, endOfLabelsMessage)
results = append(results, streamingChunkResponses...)
} else {
results = nonStreamingResponses
}

return &stream{
results: results,
}, nil
@@ -3874,6 +3923,10 @@ func (s *stream) Recv() (*client.QueryStreamResponse, error) {
return result, nil
}

func (s *stream) Context() context.Context {
return context.Background()
}

func (i *mockIngester) AllUserStats(context.Context, *client.UserStatsRequest, ...grpc.CallOption) (*client.UsersStatsResponse, error) {
return &i.stats, nil
}
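When req.StreamingChunksBatchSize > 0, the mock ingester above sends its responses in three phases: one message per series carrying the labels and the chunk count, a message with IsEndOfSeriesStream set, and then chunk messages that refer back to a series by its index. A rough consumer-side sketch of that ordering, using simplified stand-in types rather than the real client.QueryStreamResponse:

package main

import "fmt"

// message is a simplified stand-in for client.QueryStreamResponse, keeping only the
// fields needed to illustrate the ordering: label messages, an end-of-series marker,
// then chunk messages keyed by series index.
type message struct {
	SeriesLabels        string
	ChunkCount          int64
	IsEndOfSeriesStream bool
	SeriesIndex         int
	ChunkData           string
}

func main() {
	stream := []message{
		{SeriesLabels: `{__name__="up", job="a"}`, ChunkCount: 1},
		{SeriesLabels: `{__name__="up", job="b"}`, ChunkCount: 2},
		{IsEndOfSeriesStream: true},
		{SeriesIndex: 0, ChunkData: "chunks for series 0"},
		{SeriesIndex: 1, ChunkData: "chunks for series 1"},
	}

	// Phase 1: collect series labels (and advertised chunk counts) until the marker.
	var labels []string
	i := 0
	for ; i < len(stream) && !stream[i].IsEndOfSeriesStream; i++ {
		labels = append(labels, stream[i].SeriesLabels)
	}
	i++ // skip the end-of-series marker

	// Phase 2: each chunk message identifies its series by index into phase 1.
	for ; i < len(stream); i++ {
		fmt.Printf("%s -> %s\n", labels[stream[i].SeriesIndex], stream[i].ChunkData)
	}
}

The mock sorts the series up front (with slices.SortFunc) so that both phases use the same deterministic ordering and the series index assigned in phase 1 remains meaningful when the chunk messages arrive.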
5 changes: 5 additions & 0 deletions pkg/distributor/query.go
@@ -283,6 +283,11 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return ingesterQueryResult{}, limitErr
}

// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
if chunkLimitErr := queryLimiter.AddChunks(int(s.ChunkCount)); chunkLimitErr != nil {
return ingesterQueryResult{}, chunkLimitErr
}

labelsBatch = append(labelsBatch, mimirpb.FromLabelAdaptersToLabels(s.Labels))
}

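The comment in the hunk above splits enforcement in two: the chunk count is checked as each streamed series header arrives, while the chunk bytes limit is only applied later, as the chunk data itself streams in. A simplified mirror of that split, in which the limits, streamingSeries, and collectLabels names are illustrative rather than the real QueryLimiter API:

package main

import (
	"errors"
	"fmt"
)

// limits is an illustrative per-query limiter with the two checks described above:
// a chunk-count check used during label streaming and a chunk-bytes check used
// while chunk data is streamed.
type limits struct {
	maxChunks, maxChunkBytes int
	chunks, chunkBytes       int
}

func (l *limits) AddChunks(n int) error {
	l.chunks += n
	if l.maxChunks > 0 && l.chunks > l.maxChunks {
		return errors.New("the query exceeded the maximum number of chunks")
	}
	return nil
}

func (l *limits) AddChunkBytes(n int) error {
	l.chunkBytes += n
	if l.maxChunkBytes > 0 && l.chunkBytes > l.maxChunkBytes {
		return errors.New("the query exceeded the maximum chunk bytes")
	}
	return nil
}

// streamingSeries stands in for a streamed series header: labels plus the number of
// chunks the ingester will send for that series later in the stream.
type streamingSeries struct {
	Labels     string
	ChunkCount int64
}

// collectLabels mirrors the loop in queryIngesterStream: the chunk-count limit is
// enforced while only labels have been received; the bytes check waits for the chunks.
func collectLabels(l *limits, batch []streamingSeries) ([]string, error) {
	out := make([]string, 0, len(batch))
	for _, s := range batch {
		if err := l.AddChunks(int(s.ChunkCount)); err != nil {
			return nil, err
		}
		out = append(out, s.Labels)
	}
	return out, nil
}

func main() {
	l := &limits{maxChunks: 3, maxChunkBytes: 1 << 20}
	_, err := collectLabels(l, []streamingSeries{
		{Labels: `{job="a"}`, ChunkCount: 2},
		{Labels: `{job="b"}`, ChunkCount: 2},
	})
	fmt.Println(err) // fails on the second series, before any chunk bytes arrive
}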
96 changes: 53 additions & 43 deletions pkg/distributor/query_test.go
@@ -34,56 +34,66 @@ import (
func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReached(t *testing.T) {
const maxChunksLimit = 30 // Chunks are duplicated due to replication factor.

ctx := user.InjectOrgID(context.Background(), "user")
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.MaxChunksPerQuery = maxChunksLimit
for _, streamingEnabled := range []bool{true, false} {
t.Run(fmt.Sprintf("streaming enabled: %v", streamingEnabled), func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.MaxChunksPerQuery = maxChunksLimit

// Prepare distributors.
ds, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
limits: limits,
preferStreamingChunks: streamingEnabled,
})

// Push a number of series below the max chunks limit. Each series has 1 sample,
// so expect 1 chunk per series when querying back.
initialSeries := maxChunksLimit / 3
writeReq := makeWriteRequest(0, initialSeries, 0, false, false)
writeRes, err := ds[0].Push(ctx, writeReq)
require.Equal(t, &mimirpb.WriteResponse{}, writeRes)
require.Nil(t, err)

allSeriesMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
}

// Prepare distributors.
ds, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
limits: limits,
})
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit, stats.NewQueryMetrics(prometheus.NewPedanticRegistry())))

ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit, stats.NewQueryMetrics(prometheus.NewPedanticRegistry())))
// Since the number of series (and thus chunks) is equal to the limit (but doesn't
// exceed it), we expect a query running on all series to succeed.
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.NoError(t, err)

// Push a number of series below the max chunks limit. Each series has 1 sample,
// so expect 1 chunk per series when querying back.
initialSeries := maxChunksLimit / 3
writeReq := makeWriteRequest(0, initialSeries, 0, false, false)
writeRes, err := ds[0].Push(ctx, writeReq)
assert.Equal(t, &mimirpb.WriteResponse{}, writeRes)
assert.Nil(t, err)
if streamingEnabled {
require.Len(t, queryRes.StreamingSeries, initialSeries)
} else {
require.Len(t, queryRes.Chunkseries, initialSeries)
}

allSeriesMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
}
// Push more series to exceed the limit once we'll query back all series.
writeReq = &mimirpb.WriteRequest{}
for i := 0; i < maxChunksLimit; i++ {
writeReq.Timeseries = append(writeReq.Timeseries,
makeWriteRequestTimeseries([]mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series_%d", i)}}, 0, 0),
)
}

// Since the number of series (and thus chunks) is equal to the limit (but doesn't
// exceed it), we expect a query running on all series to succeed.
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.NoError(t, err)
assert.Len(t, queryRes.Chunkseries, initialSeries)
writeRes, err = ds[0].Push(ctx, writeReq)
require.Equal(t, &mimirpb.WriteResponse{}, writeRes)
require.Nil(t, err)

// Push more series to exceed the limit once we'll query back all series.
writeReq = &mimirpb.WriteRequest{}
for i := 0; i < maxChunksLimit; i++ {
writeReq.Timeseries = append(writeReq.Timeseries,
makeWriteRequestTimeseries([]mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series_%d", i)}}, 0, 0),
)
// Since the number of series (and thus chunks) exceeds the limit, we expect
// a query running on all series to fail.
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.Error(t, err)
require.ErrorContains(t, err, "the query exceeded the maximum number of chunks")
})
}

writeRes, err = ds[0].Push(ctx, writeReq)
assert.Equal(t, &mimirpb.WriteResponse{}, writeRes)
assert.Nil(t, err)

// Since the number of series (and thus chunks) exceeds the limit, we expect
// a query running on all series to fail.
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.Error(t, err)
assert.ErrorContains(t, err, "the query exceeded the maximum number of chunks")
}

func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReached(t *testing.T) {
(11 more changed files not shown.)
