diff --git a/CHANGELOG.md b/CHANGELOG.md index b59f910e9c..5c5e5ebbaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508 * [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508 +* [BUGFIX] Fix issue where sharded queries could return annotations with incorrect or confusing position information. #9536 ### Mixin diff --git a/pkg/frontend/querymiddleware/querysharding.go b/pkg/frontend/querymiddleware/querysharding.go index dd7d7da55e..1fe6a186de 100644 --- a/pkg/frontend/querymiddleware/querysharding.go +++ b/pkg/frontend/querymiddleware/querysharding.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/tenant" + "github.com/grafana/regexp" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -175,9 +176,11 @@ func ExecuteQueryOnQueryable(ctx context.Context, r MetricsQueryRequest, engine if annotationAccumulator != nil { // Add any annotations returned by the sharded queries, and remove any duplicates. + // We remove any position information for the same reason as above: the position information + // relates to the rewritten expression sent to queriers, not the original expression provided by the user. accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll() - warn = append(warn, accumulatedWarnings...) - info = append(info, accumulatedInfos...) + warn = append(warn, removeAllAnnotationPositionInformation(accumulatedWarnings)...) + info = append(info, removeAllAnnotationPositionInformation(accumulatedInfos)...) warn = removeDuplicates(warn) info = removeDuplicates(info) } @@ -574,3 +577,19 @@ func removeDuplicates(s []string) []string { slices.Sort(s) return slices.Compact(s) } + +var annotationPositionPattern = regexp.MustCompile(`\s+\(\d+:\d+\)$`) + +func removeAnnotationPositionInformation(annotation string) string { + return annotationPositionPattern.ReplaceAllLiteralString(annotation, "") +} + +// removeAllAnnotationPositionInformation removes position information from each annotation in annotations, +// modifying annotations in-place and returning it for convenience. +func removeAllAnnotationPositionInformation(annotations []string) []string { + for i, annotation := range annotations { + annotations[i] = removeAnnotationPositionInformation(annotation) + } + + return annotations +} diff --git a/pkg/frontend/querymiddleware/querysharding_test.go b/pkg/frontend/querymiddleware/querysharding_test.go index ef6b7cda60..c2809acc39 100644 --- a/pkg/frontend/querymiddleware/querysharding_test.go +++ b/pkg/frontend/querymiddleware/querysharding_test.go @@ -108,7 +108,7 @@ func approximatelyEqualsSamples(t *testing.T, a, b *PrometheusResponse) { func approximatelyEquals(t *testing.T, a, b *PrometheusResponse) { approximatelyEqualsSamples(t, a, b) require.ElementsMatch(t, a.Infos, b.Infos, "expected same info annotations") - require.ElementsMatch(t, a.Warnings, b.Warnings, "expected same info annotations") + require.ElementsMatch(t, a.Warnings, b.Warnings, "expected same warning annotations") } func compareExpectedAndActual(t *testing.T, expectedTs, actualTs int64, expectedVal, actualVal float64, j int, labels []mimirpb.LabelAdapter, sampleType string, tolerance float64) { @@ -707,8 +707,9 @@ func TestQuerySharding_Correctness(t *testing.T) { t.Run(fmt.Sprintf("%T", req), func(t *testing.T) { engine := newEngine() downstream := &downstreamHandler{ - engine: engine, - queryable: queryable, + engine: engine, + queryable: queryable, + includePositionInformationInAnnotations: true, } // Run the query without sharding. @@ -723,6 +724,12 @@ func TestQuerySharding_Correctness(t *testing.T) { require.NotEmpty(t, expectedPrometheusRes.Data.Result) requireValidSamples(t, expectedPrometheusRes.Data.Result) + if testData.expectedShardedQueries > 0 { + // Remove position information from annotations, to mirror what we expect from the sharded queries below. + removeAllAnnotationPositionInformation(expectedPrometheusRes.Infos) + removeAllAnnotationPositionInformation(expectedPrometheusRes.Warnings) + } + for _, numShards := range []int{2, 4, 8, 16} { t.Run(fmt.Sprintf("shards=%d", numShards), func(t *testing.T) { reg := prometheus.NewPedanticRegistry() @@ -1050,24 +1057,31 @@ func TestQuerySharding_FunctionCorrectness(t *testing.T) { {fn: "histogram_stdvar"}, {fn: "histogram_stddev"}, } - queryableFloats := storageSeriesQueryable([]*promql.StorageSeries{ - newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "barr"), start.Add(-lookbackDelta), end, step, factor(5)), - newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "bazz"), start.Add(-lookbackDelta), end, step, factor(7)), - newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(12)), - newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bozz"), start.Add(-lookbackDelta), end, step, factor(11)), - newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(8)), - newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bazz"), start.Add(-lookbackDelta), end, step, arithmeticSequence(10)), + + t.Run("floats", func(t *testing.T) { + queryableFloats := storageSeriesQueryable([]*promql.StorageSeries{ + newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "barr"), start.Add(-lookbackDelta), end, step, factor(5)), + newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "bazz"), start.Add(-lookbackDelta), end, step, factor(7)), + newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(12)), + newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bozz"), start.Add(-lookbackDelta), end, step, factor(11)), + newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(8)), + newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bazz"), start.Add(-lookbackDelta), end, step, arithmeticSequence(10)), + }) + testQueryShardingFunctionCorrectness(t, queryableFloats, append(testsForBoth, testsForFloatsOnly...), testsForNativeHistogramsOnly) }) - testQueryShardingFunctionCorrectness(t, queryableFloats, append(testsForBoth, testsForFloatsOnly...), testsForNativeHistogramsOnly) - queryableNativeHistograms := storageSeriesQueryable([]*promql.StorageSeries{ - newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "barr"), start.Add(-lookbackDelta), end, step, factor(5)), - newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "bazz"), start.Add(-lookbackDelta), end, step, factor(7)), - newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(12)), - newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bozz"), start.Add(-lookbackDelta), end, step, factor(11)), - newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(8)), - newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bazz"), start.Add(-lookbackDelta), end, step, arithmeticSequence(10)), + + t.Run("native histograms", func(t *testing.T) { + queryableNativeHistograms := storageSeriesQueryable([]*promql.StorageSeries{ + newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "barr"), start.Add(-lookbackDelta), end, step, factor(5)), + newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "bazz"), start.Add(-lookbackDelta), end, step, factor(7)), + newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(12)), + newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bozz"), start.Add(-lookbackDelta), end, step, factor(11)), + newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(8)), + newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bazz"), start.Add(-lookbackDelta), end, step, arithmeticSequence(10)), + }) + + testQueryShardingFunctionCorrectness(t, queryableNativeHistograms, append(testsForBoth, testsForNativeHistogramsOnly...), testsForFloatsOnly) }) - testQueryShardingFunctionCorrectness(t, queryableNativeHistograms, append(testsForBoth, testsForNativeHistogramsOnly...), testsForFloatsOnly) } func testQueryShardingFunctionCorrectness(t *testing.T, queryable storage.Queryable, tests []queryShardingFunctionCorrectnessTest, testsToIgnore []queryShardingFunctionCorrectnessTest) { @@ -1778,16 +1792,41 @@ func TestQuerySharding_Annotations(t *testing.T) { reg, ) downstream := &downstreamHandler{ - engine: engine, - queryable: queryable, + engine: engine, + queryable: queryable, + includePositionInformationInAnnotations: true, } type template struct { query string isWarning bool + isSharded bool } - templates := []template{{"quantile(10, %s)", true}, {"quantile(10, sum(%s))", true}, {"rate(%s[1m])", false}, {"increase(%s[1m])", false}} + templates := []template{ + { + query: "quantile(10, %s)", + isWarning: true, + }, + { + query: "quantile(10, sum(%s))", + isWarning: true, + isSharded: true, + }, + { + query: "rate(%s[1m])", + isWarning: false, + }, + { + query: "increase(%s[1m])", + isWarning: false, + }, + { + query: "sum(rate(%s[1m]))", + isWarning: false, + isSharded: true, + }, + } for _, template := range templates { t.Run(template.query, func(t *testing.T) { query := fmt.Sprintf(template.query, seriesName) @@ -1804,6 +1843,7 @@ func TestQuerySharding_Annotations(t *testing.T) { // Run the query without sharding. expectedRes, err := downstream.Do(injectedContext, req) require.Nil(t, err) + expectedPrometheusRes := expectedRes.(*PrometheusResponse) // Ensure the query produces some results. require.NotEmpty(t, expectedRes.(*PrometheusResponse).Data.Result) @@ -1822,19 +1862,25 @@ func TestQuerySharding_Annotations(t *testing.T) { // Ensure the query produces some results. require.NotEmpty(t, splitRes.(*PrometheusResponse).Data.Result) + expected := expectedPrometheusRes.Infos + actualSharded := shardedRes.(*PrometheusResponse).Infos + actualSplit := splitRes.(*PrometheusResponse).Infos + if template.isWarning { - require.NotEmpty(t, expectedRes.(*PrometheusResponse).Warnings) - require.NotEmpty(t, shardedRes.(*PrometheusResponse).Warnings) - require.NotEmpty(t, splitRes.(*PrometheusResponse).Warnings) - require.Equal(t, expectedRes.(*PrometheusResponse).Warnings, shardedRes.(*PrometheusResponse).Warnings) - require.Equal(t, expectedRes.(*PrometheusResponse).Warnings, splitRes.(*PrometheusResponse).Warnings) - } else { - require.NotEmpty(t, expectedRes.(*PrometheusResponse).Infos) - require.NotEmpty(t, shardedRes.(*PrometheusResponse).Infos) - require.NotEmpty(t, splitRes.(*PrometheusResponse).Infos) - require.Equal(t, expectedRes.(*PrometheusResponse).Infos, shardedRes.(*PrometheusResponse).Infos) - require.Equal(t, expectedRes.(*PrometheusResponse).Infos, splitRes.(*PrometheusResponse).Infos) + expected = expectedPrometheusRes.Warnings + actualSharded = shardedRes.(*PrometheusResponse).Warnings + actualSplit = splitRes.(*PrometheusResponse).Warnings } + + require.NotEmpty(t, expected) + require.Equal(t, expected, actualSplit) + + if template.isSharded { + // Remove position information from annotations generated with the unsharded query, to mirror what we expect from the sharded query. + removeAllAnnotationPositionInformation(expected) + } + + require.Equal(t, expected, actualSharded) }) } } @@ -2159,8 +2205,9 @@ func TestLongestRegexpMatcherBytes(t *testing.T) { } type downstreamHandler struct { - engine *promql.Engine - queryable storage.Queryable + engine *promql.Engine + queryable storage.Queryable + includePositionInformationInAnnotations bool } func (h *downstreamHandler) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) { @@ -2182,7 +2229,14 @@ func (h *downstreamHandler) Do(ctx context.Context, r MetricsQueryRequest) (Resp Result: extracted, }, } - warnings, infos := res.Warnings.AsStrings("", 0, 0) + + qs := "" + + if h.includePositionInformationInAnnotations { + qs = r.GetQuery() + } + + warnings, infos := res.Warnings.AsStrings(qs, 0, 0) if len(warnings) > 0 { resp.Warnings = warnings } @@ -2464,3 +2518,24 @@ func newEngine() *promql.Engine { }, }) } + +func TestRemoveAnnotationPositionInformation(t *testing.T) { + testCases := map[string]string{ + "": "", + "foo": "foo", + "foo (1:1)": "foo", + "foo (123:456)": "foo", + "foo (1:1) (2:2)": "foo (1:1)", + "foo (1:1": "foo (1:1", + "foo (1:": "foo (1:", + "foo (1": "foo (1", + "foo (": "foo (", + "foo(1:1)": "foo(1:1)", + } + + for input, expectedOutput := range testCases { + t.Run(input, func(t *testing.T) { + require.Equal(t, expectedOutput, removeAnnotationPositionInformation(input)) + }) + } +} diff --git a/pkg/frontend/querymiddleware/split_by_instant_interval.go b/pkg/frontend/querymiddleware/split_by_instant_interval.go index 59f930f20b..bf10102ad9 100644 --- a/pkg/frontend/querymiddleware/split_by_instant_interval.go +++ b/pkg/frontend/querymiddleware/split_by_instant_interval.go @@ -202,9 +202,11 @@ func (s *splitInstantQueryByIntervalMiddleware) Do(ctx context.Context, req Metr warn, info := res.Warnings.AsStrings("", 0, 0) // Add any annotations returned by the sharded queries, and remove any duplicates. + // We remove any position information for the same reason as above: the position information + // relates to the rewritten expression sent to queriers, not the original expression provided by the user. accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll() - warn = append(warn, accumulatedWarnings...) - info = append(info, accumulatedInfos...) + warn = append(warn, removeAllAnnotationPositionInformation(accumulatedWarnings)...) + info = append(info, removeAllAnnotationPositionInformation(accumulatedInfos)...) warn = removeDuplicates(warn) info = removeDuplicates(info) diff --git a/pkg/frontend/querymiddleware/split_by_instant_interval_test.go b/pkg/frontend/querymiddleware/split_by_instant_interval_test.go index 6bc5999502..d544b24aff 100644 --- a/pkg/frontend/querymiddleware/split_by_instant_interval_test.go +++ b/pkg/frontend/querymiddleware/split_by_instant_interval_test.go @@ -516,8 +516,9 @@ func TestInstantQuerySplittingCorrectness(t *testing.T) { reg := prometheus.NewPedanticRegistry() engine := newEngine() downstream := &downstreamHandler{ - engine: engine, - queryable: queryable, + engine: engine, + queryable: queryable, + includePositionInformationInAnnotations: true, } // Run the query with the normal engine @@ -531,6 +532,12 @@ func TestInstantQuerySplittingCorrectness(t *testing.T) { require.NotEmpty(t, expectedPrometheusRes.Data.Result) requireValidSamples(t, expectedPrometheusRes.Data.Result) + if testData.expectedSplitQueries > 0 { + // Remove position information from annotations, to mirror what we expect from the split queries below. + removeAllAnnotationPositionInformation(expectedPrometheusRes.Infos) + removeAllAnnotationPositionInformation(expectedPrometheusRes.Warnings) + } + splittingware := newSplitInstantQueryByIntervalMiddleware(mockLimits{splitInstantQueriesInterval: 1 * time.Minute}, log.NewNopLogger(), engine, reg) // Run the query with splitting