Skip to content

Commit

Permalink
Don't return incorrect position information from annotations generate…
Browse files Browse the repository at this point in the history
…d by sharded query legs (#9536)

* Remove position information from annotations from sharded query legs

* Add changelog entry
  • Loading branch information
charleskorn authored Oct 8, 2024
1 parent a16f763 commit cabc3ee
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
* [BUGFIX] Fix issue where sharded queries could return annotations with incorrect or confusing position information. #9536

### Mixin

Expand Down
23 changes: 21 additions & 2 deletions pkg/frontend/querymiddleware/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/grafana/regexp"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -175,9 +176,11 @@ func ExecuteQueryOnQueryable(ctx context.Context, r MetricsQueryRequest, engine

if annotationAccumulator != nil {
// Add any annotations returned by the sharded queries, and remove any duplicates.
// We remove any position information for the same reason as above: the position information
// relates to the rewritten expression sent to queriers, not the original expression provided by the user.
accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll()
warn = append(warn, accumulatedWarnings...)
info = append(info, accumulatedInfos...)
warn = append(warn, removeAllAnnotationPositionInformation(accumulatedWarnings)...)
info = append(info, removeAllAnnotationPositionInformation(accumulatedInfos)...)
warn = removeDuplicates(warn)
info = removeDuplicates(info)
}
Expand Down Expand Up @@ -574,3 +577,19 @@ func removeDuplicates(s []string) []string {
slices.Sort(s)
return slices.Compact(s)
}

var annotationPositionPattern = regexp.MustCompile(`\s+\(\d+:\d+\)$`)

func removeAnnotationPositionInformation(annotation string) string {
return annotationPositionPattern.ReplaceAllLiteralString(annotation, "")
}

// removeAllAnnotationPositionInformation removes position information from each annotation in annotations,
// modifying annotations in-place and returning it for convenience.
func removeAllAnnotationPositionInformation(annotations []string) []string {
for i, annotation := range annotations {
annotations[i] = removeAnnotationPositionInformation(annotation)
}

return annotations
}
147 changes: 111 additions & 36 deletions pkg/frontend/querymiddleware/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func approximatelyEqualsSamples(t *testing.T, a, b *PrometheusResponse) {
func approximatelyEquals(t *testing.T, a, b *PrometheusResponse) {
approximatelyEqualsSamples(t, a, b)
require.ElementsMatch(t, a.Infos, b.Infos, "expected same info annotations")
require.ElementsMatch(t, a.Warnings, b.Warnings, "expected same info annotations")
require.ElementsMatch(t, a.Warnings, b.Warnings, "expected same warning annotations")
}

func compareExpectedAndActual(t *testing.T, expectedTs, actualTs int64, expectedVal, actualVal float64, j int, labels []mimirpb.LabelAdapter, sampleType string, tolerance float64) {
Expand Down Expand Up @@ -707,8 +707,9 @@ func TestQuerySharding_Correctness(t *testing.T) {
t.Run(fmt.Sprintf("%T", req), func(t *testing.T) {
engine := newEngine()
downstream := &downstreamHandler{
engine: engine,
queryable: queryable,
engine: engine,
queryable: queryable,
includePositionInformationInAnnotations: true,
}

// Run the query without sharding.
Expand All @@ -723,6 +724,12 @@ func TestQuerySharding_Correctness(t *testing.T) {
require.NotEmpty(t, expectedPrometheusRes.Data.Result)
requireValidSamples(t, expectedPrometheusRes.Data.Result)

if testData.expectedShardedQueries > 0 {
// Remove position information from annotations, to mirror what we expect from the sharded queries below.
removeAllAnnotationPositionInformation(expectedPrometheusRes.Infos)
removeAllAnnotationPositionInformation(expectedPrometheusRes.Warnings)
}

for _, numShards := range []int{2, 4, 8, 16} {
t.Run(fmt.Sprintf("shards=%d", numShards), func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
Expand Down Expand Up @@ -1050,24 +1057,31 @@ func TestQuerySharding_FunctionCorrectness(t *testing.T) {
{fn: "histogram_stdvar"},
{fn: "histogram_stddev"},
}
queryableFloats := storageSeriesQueryable([]*promql.StorageSeries{
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "barr"), start.Add(-lookbackDelta), end, step, factor(5)),
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "bazz"), start.Add(-lookbackDelta), end, step, factor(7)),
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(12)),
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bozz"), start.Add(-lookbackDelta), end, step, factor(11)),
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(8)),
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bazz"), start.Add(-lookbackDelta), end, step, arithmeticSequence(10)),

t.Run("floats", func(t *testing.T) {
queryableFloats := storageSeriesQueryable([]*promql.StorageSeries{
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "barr"), start.Add(-lookbackDelta), end, step, factor(5)),
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "bazz"), start.Add(-lookbackDelta), end, step, factor(7)),
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(12)),
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bozz"), start.Add(-lookbackDelta), end, step, factor(11)),
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(8)),
newSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bazz"), start.Add(-lookbackDelta), end, step, arithmeticSequence(10)),
})
testQueryShardingFunctionCorrectness(t, queryableFloats, append(testsForBoth, testsForFloatsOnly...), testsForNativeHistogramsOnly)
})
testQueryShardingFunctionCorrectness(t, queryableFloats, append(testsForBoth, testsForFloatsOnly...), testsForNativeHistogramsOnly)
queryableNativeHistograms := storageSeriesQueryable([]*promql.StorageSeries{
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "barr"), start.Add(-lookbackDelta), end, step, factor(5)),
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "bazz"), start.Add(-lookbackDelta), end, step, factor(7)),
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(12)),
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bozz"), start.Add(-lookbackDelta), end, step, factor(11)),
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(8)),
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bazz"), start.Add(-lookbackDelta), end, step, arithmeticSequence(10)),

t.Run("native histograms", func(t *testing.T) {
queryableNativeHistograms := storageSeriesQueryable([]*promql.StorageSeries{
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "barr"), start.Add(-lookbackDelta), end, step, factor(5)),
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "bazz"), start.Add(-lookbackDelta), end, step, factor(7)),
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(12)),
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bozz"), start.Add(-lookbackDelta), end, step, factor(11)),
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blop", "foo", "buzz"), start.Add(-lookbackDelta), end, step, factor(8)),
newNativeHistogramSeries(labels.FromStrings("__name__", "bar1", "baz", "blip", "bar", "blap", "foo", "bazz"), start.Add(-lookbackDelta), end, step, arithmeticSequence(10)),
})

testQueryShardingFunctionCorrectness(t, queryableNativeHistograms, append(testsForBoth, testsForNativeHistogramsOnly...), testsForFloatsOnly)
})
testQueryShardingFunctionCorrectness(t, queryableNativeHistograms, append(testsForBoth, testsForNativeHistogramsOnly...), testsForFloatsOnly)
}

func testQueryShardingFunctionCorrectness(t *testing.T, queryable storage.Queryable, tests []queryShardingFunctionCorrectnessTest, testsToIgnore []queryShardingFunctionCorrectnessTest) {
Expand Down Expand Up @@ -1778,16 +1792,41 @@ func TestQuerySharding_Annotations(t *testing.T) {
reg,
)
downstream := &downstreamHandler{
engine: engine,
queryable: queryable,
engine: engine,
queryable: queryable,
includePositionInformationInAnnotations: true,
}

type template struct {
query string
isWarning bool
isSharded bool
}

templates := []template{{"quantile(10, %s)", true}, {"quantile(10, sum(%s))", true}, {"rate(%s[1m])", false}, {"increase(%s[1m])", false}}
templates := []template{
{
query: "quantile(10, %s)",
isWarning: true,
},
{
query: "quantile(10, sum(%s))",
isWarning: true,
isSharded: true,
},
{
query: "rate(%s[1m])",
isWarning: false,
},
{
query: "increase(%s[1m])",
isWarning: false,
},
{
query: "sum(rate(%s[1m]))",
isWarning: false,
isSharded: true,
},
}
for _, template := range templates {
t.Run(template.query, func(t *testing.T) {
query := fmt.Sprintf(template.query, seriesName)
Expand All @@ -1804,6 +1843,7 @@ func TestQuerySharding_Annotations(t *testing.T) {
// Run the query without sharding.
expectedRes, err := downstream.Do(injectedContext, req)
require.Nil(t, err)
expectedPrometheusRes := expectedRes.(*PrometheusResponse)

// Ensure the query produces some results.
require.NotEmpty(t, expectedRes.(*PrometheusResponse).Data.Result)
Expand All @@ -1822,19 +1862,25 @@ func TestQuerySharding_Annotations(t *testing.T) {
// Ensure the query produces some results.
require.NotEmpty(t, splitRes.(*PrometheusResponse).Data.Result)

expected := expectedPrometheusRes.Infos
actualSharded := shardedRes.(*PrometheusResponse).Infos
actualSplit := splitRes.(*PrometheusResponse).Infos

if template.isWarning {
require.NotEmpty(t, expectedRes.(*PrometheusResponse).Warnings)
require.NotEmpty(t, shardedRes.(*PrometheusResponse).Warnings)
require.NotEmpty(t, splitRes.(*PrometheusResponse).Warnings)
require.Equal(t, expectedRes.(*PrometheusResponse).Warnings, shardedRes.(*PrometheusResponse).Warnings)
require.Equal(t, expectedRes.(*PrometheusResponse).Warnings, splitRes.(*PrometheusResponse).Warnings)
} else {
require.NotEmpty(t, expectedRes.(*PrometheusResponse).Infos)
require.NotEmpty(t, shardedRes.(*PrometheusResponse).Infos)
require.NotEmpty(t, splitRes.(*PrometheusResponse).Infos)
require.Equal(t, expectedRes.(*PrometheusResponse).Infos, shardedRes.(*PrometheusResponse).Infos)
require.Equal(t, expectedRes.(*PrometheusResponse).Infos, splitRes.(*PrometheusResponse).Infos)
expected = expectedPrometheusRes.Warnings
actualSharded = shardedRes.(*PrometheusResponse).Warnings
actualSplit = splitRes.(*PrometheusResponse).Warnings
}

require.NotEmpty(t, expected)
require.Equal(t, expected, actualSplit)

if template.isSharded {
// Remove position information from annotations generated with the unsharded query, to mirror what we expect from the sharded query.
removeAllAnnotationPositionInformation(expected)
}

require.Equal(t, expected, actualSharded)
})
}
}
Expand Down Expand Up @@ -2159,8 +2205,9 @@ func TestLongestRegexpMatcherBytes(t *testing.T) {
}

type downstreamHandler struct {
engine *promql.Engine
queryable storage.Queryable
engine *promql.Engine
queryable storage.Queryable
includePositionInformationInAnnotations bool
}

func (h *downstreamHandler) Do(ctx context.Context, r MetricsQueryRequest) (Response, error) {
Expand All @@ -2182,7 +2229,14 @@ func (h *downstreamHandler) Do(ctx context.Context, r MetricsQueryRequest) (Resp
Result: extracted,
},
}
warnings, infos := res.Warnings.AsStrings("", 0, 0)

qs := ""

if h.includePositionInformationInAnnotations {
qs = r.GetQuery()
}

warnings, infos := res.Warnings.AsStrings(qs, 0, 0)
if len(warnings) > 0 {
resp.Warnings = warnings
}
Expand Down Expand Up @@ -2464,3 +2518,24 @@ func newEngine() *promql.Engine {
},
})
}

func TestRemoveAnnotationPositionInformation(t *testing.T) {
testCases := map[string]string{
"": "",
"foo": "foo",
"foo (1:1)": "foo",
"foo (123:456)": "foo",
"foo (1:1) (2:2)": "foo (1:1)",
"foo (1:1": "foo (1:1",
"foo (1:": "foo (1:",
"foo (1": "foo (1",
"foo (": "foo (",
"foo(1:1)": "foo(1:1)",
}

for input, expectedOutput := range testCases {
t.Run(input, func(t *testing.T) {
require.Equal(t, expectedOutput, removeAnnotationPositionInformation(input))
})
}
}
6 changes: 4 additions & 2 deletions pkg/frontend/querymiddleware/split_by_instant_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,11 @@ func (s *splitInstantQueryByIntervalMiddleware) Do(ctx context.Context, req Metr
warn, info := res.Warnings.AsStrings("", 0, 0)

// Add any annotations returned by the sharded queries, and remove any duplicates.
// We remove any position information for the same reason as above: the position information
// relates to the rewritten expression sent to queriers, not the original expression provided by the user.
accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll()
warn = append(warn, accumulatedWarnings...)
info = append(info, accumulatedInfos...)
warn = append(warn, removeAllAnnotationPositionInformation(accumulatedWarnings)...)
info = append(info, removeAllAnnotationPositionInformation(accumulatedInfos)...)
warn = removeDuplicates(warn)
info = removeDuplicates(info)

Expand Down
11 changes: 9 additions & 2 deletions pkg/frontend/querymiddleware/split_by_instant_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,9 @@ func TestInstantQuerySplittingCorrectness(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
engine := newEngine()
downstream := &downstreamHandler{
engine: engine,
queryable: queryable,
engine: engine,
queryable: queryable,
includePositionInformationInAnnotations: true,
}

// Run the query with the normal engine
Expand All @@ -531,6 +532,12 @@ func TestInstantQuerySplittingCorrectness(t *testing.T) {
require.NotEmpty(t, expectedPrometheusRes.Data.Result)
requireValidSamples(t, expectedPrometheusRes.Data.Result)

if testData.expectedSplitQueries > 0 {
// Remove position information from annotations, to mirror what we expect from the split queries below.
removeAllAnnotationPositionInformation(expectedPrometheusRes.Infos)
removeAllAnnotationPositionInformation(expectedPrometheusRes.Warnings)
}

splittingware := newSplitInstantQueryByIntervalMiddleware(mockLimits{splitInstantQueriesInterval: 1 * time.Minute}, log.NewNopLogger(), engine, reg)

// Run the query with splitting
Expand Down

0 comments on commit cabc3ee

Please sign in to comment.