From 50444bf9777253480c7790d92bf17108398a6c73 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Thu, 8 Aug 2024 16:41:10 +1000
Subject: [PATCH] Mimir query engine: improve performance of binary operations
 (#8933)

* Initial improvement

* Add fast path for common case

* Add changelog entry

---
 CHANGELOG.md                      |  2 +-
 .../operators/binary_operation.go | 54 ++++++++++++++-----
 2 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 73a2948a6d0..58a057b3d90 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,7 +22,7 @@
 * [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
 * [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854
 * [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933
 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886
   * What it is:
     * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
diff --git a/pkg/streamingpromql/operators/binary_operation.go b/pkg/streamingpromql/operators/binary_operation.go
index a589e34c490..9cbe729c338 100644
--- a/pkg/streamingpromql/operators/binary_operation.go
+++ b/pkg/streamingpromql/operators/binary_operation.go
@@ -9,6 +9,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"slices"
 	"sort"
 	"time"
 
@@ -180,7 +181,8 @@ func (b *BinaryOperation) loadSeriesMetadata(ctx context.Context) (bool, error)
 // - a list indicating which series from the left side are needed to compute the output
 // - a list indicating which series from the right side are needed to compute the output
 func (b *BinaryOperation) computeOutputSeries() ([]types.SeriesMetadata, []*binaryOperationOutputSeries, []bool, []bool, error) {
-	labelsFunc := b.labelsFunc()
+	labelsFunc := b.groupLabelsFunc()
+	groupKeyFunc := b.groupKeyFunc()
 	outputSeriesMap := map[string]*binaryOperationOutputSeries{}
 
 	// Use the smaller side to populate the map of possible output series first.
@@ -197,12 +199,12 @@
 	}
 
 	for idx, s := range smallerSide {
-		groupLabels := labelsFunc(s.Labels).String()
-		series, exists := outputSeriesMap[groupLabels]
+		groupKey := groupKeyFunc(s.Labels)
+		series, exists := outputSeriesMap[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it.
 
 		if !exists {
 			series = &binaryOperationOutputSeries{}
-			outputSeriesMap[groupLabels] = series
+			outputSeriesMap[string(groupKey)] = series
 		}
 
 		if smallerSideIsLeftSide {
@@ -213,9 +215,10 @@
 	}
 
 	for idx, s := range largerSide {
-		groupLabels := labelsFunc(s.Labels).String()
+		groupKey := groupKeyFunc(s.Labels)
 
-		if series, exists := outputSeriesMap[groupLabels]; exists {
+		// Important: don't extract the string(...) call below - passing it directly allows us to avoid allocating it.
+		if series, exists := outputSeriesMap[string(groupKey)]; exists {
 			if smallerSideIsLeftSide {
 				// Currently iterating through right side.
 				series.rightSeriesIndices = append(series.rightSeriesIndices, idx)
@@ -348,8 +351,8 @@ func (g favourRightSideSorter) Less(i, j int) bool {
 	return g.series[i].latestLeftSeries() < g.series[j].latestLeftSeries()
 }
 
-// labelsFunc returns a function that computes the labels of the output group this series belongs to.
-func (b *BinaryOperation) labelsFunc() func(labels.Labels) labels.Labels {
+// groupLabelsFunc returns a function that computes the labels of the output group this series belongs to.
+func (b *BinaryOperation) groupLabelsFunc() func(labels.Labels) labels.Labels {
 	lb := labels.NewBuilder(labels.EmptyLabels())
 
 	if b.VectorMatching.On {
@@ -362,12 +365,39 @@
 
 	return func(l labels.Labels) labels.Labels {
 		lb.Reset(l)
-		lb.Del(b.VectorMatching.MatchingLabels...)
 		lb.Del(labels.MetricName)
+		lb.Del(b.VectorMatching.MatchingLabels...)
 		return lb.Labels()
 	}
 }
 
+// groupKeyFunc returns a function that computes the grouping key of the output group this series belongs to.
+func (b *BinaryOperation) groupKeyFunc() func(labels.Labels) []byte {
+	buf := make([]byte, 0, 1024)
+
+	if b.VectorMatching.On {
+		return func(l labels.Labels) []byte {
+			return l.BytesWithLabels(buf, b.VectorMatching.MatchingLabels...)
+		}
+	}
+
+	if len(b.VectorMatching.MatchingLabels) == 0 {
+		// Fast path for the common case of expressions like "a + b" with no 'on' or 'without' labels.
+		return func(l labels.Labels) []byte {
+			return l.BytesWithoutLabels(buf, labels.MetricName)
+		}
+	}
+
+	lbls := make([]string, 0, len(b.VectorMatching.MatchingLabels)+1)
+	lbls = append(lbls, labels.MetricName)
+	lbls = append(lbls, b.VectorMatching.MatchingLabels...)
+	slices.Sort(lbls)
+
+	return func(l labels.Labels) []byte {
+		return l.BytesWithoutLabels(buf, lbls...)
+	}
+}
+
 func (b *BinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
 	if len(b.remainingSeries) == 0 {
 		return types.InstantVectorSeriesData{}, types.EOS
@@ -447,7 +477,7 @@ func (b *BinaryOperation) mergeOneSide(data []types.InstantVectorSeriesData, sou
 		if floats[idxFloats].T == histograms[idxHistograms].T {
 			// Conflict found
 			firstConflictingSeriesLabels := sourceSeriesMetadata[0].Labels
-			groupLabels := b.labelsFunc()(firstConflictingSeriesLabels)
+			groupLabels := b.groupLabelsFunc()(firstConflictingSeriesLabels)
 
 			return types.InstantVectorSeriesData{}, fmt.Errorf("found both float and histogram samples for the match group %s on the %s side of the operation at timestamp %s", groupLabels, side, timestamp.Time(floats[idxFloats].T).Format(time.RFC3339Nano))
 		}
@@ -563,7 +593,7 @@ func (b *BinaryOperation) mergeOneSideFloats(seriesGroupSide seriesForOneGroupSi
 				// Another series has a point with the same timestamp. We have a conflict.
 				firstConflictingSeriesLabels := sourceSeriesMetadata[seriesGroupSide.sourceSeriesIndices[sourceSeriesIndexInData]].Labels
 				secondConflictingSeriesLabels := sourceSeriesMetadata[seriesGroupSide.sourceSeriesIndices[seriesIndexInData]].Labels
-				groupLabels := b.labelsFunc()(firstConflictingSeriesLabels)
+				groupLabels := b.groupLabelsFunc()(firstConflictingSeriesLabels)
 
 				return nil, fmt.Errorf("found duplicate series for the match group %s on the %s side of the operation at timestamp %s: %s and %s", groupLabels, side, timestamp.Time(nextT).Format(time.RFC3339Nano), firstConflictingSeriesLabels, secondConflictingSeriesLabels)
 			}
@@ -684,7 +714,7 @@ func (b *BinaryOperation) mergeOneSideHistograms(seriesGroupSide seriesForOneGro
 				// Another series has a point with the same timestamp. We have a conflict.
 				firstConflictingSeriesLabels := sourceSeriesMetadata[seriesGroupSide.sourceSeriesIndices[sourceSeriesIndexInData]].Labels
 				secondConflictingSeriesLabels := sourceSeriesMetadata[seriesGroupSide.sourceSeriesIndices[seriesIndexInData]].Labels
-				groupLabels := b.labelsFunc()(firstConflictingSeriesLabels)
+				groupLabels := b.groupLabelsFunc()(firstConflictingSeriesLabels)
 
 				return nil, fmt.Errorf("found duplicate series for the match group %s on the %s side of the operation at timestamp %s: %s and %s", groupLabels, side, timestamp.Time(nextT).Format(time.RFC3339Nano), firstConflictingSeriesLabels, secondConflictingSeriesLabels)
 			}
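
Note on the "don't extract the string(...) call" comments in computeOutputSeries: they rely on a Go compiler optimisation where a []byte-to-string conversion written directly inside a map index expression is recognised, and the lookup runs against the byte slice without allocating a new string; hoisting the conversion into a variable defeats this and costs an allocation per series. A minimal standalone sketch of the effect (not part of this patch; names are illustrative):

package main

import "fmt"

func main() {
	m := map[string]int{"series-a": 1}
	key := []byte("series-a")

	// Allocation-free lookup: the conversion appears directly in the index
	// expression, so the runtime compares against the bytes in place.
	if v, ok := m[string(key)]; ok {
		fmt.Println("direct:", v)
	}

	// Allocates: the hoisted conversion must build a real string first.
	s := string(key)
	if v, ok := m[s]; ok {
		fmt.Println("hoisted:", v)
	}
}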
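
Note on groupKeyFunc: it replaces the old labelsFunc(...).String() keys with byte slices appended into a single reused buffer, so computing a key typically allocates nothing once buf has grown to size. Because every call reuses buf, the returned slice is only valid until the next call, which is a further reason the call sites convert it to a string immediately. BytesWithoutLabels expects its label names in ascending order, hence __name__ is sorted together with the matching labels up front. A minimal sketch of the same keying pattern (assumes github.com/prometheus/prometheus/model/labels is available; the metric and label values are illustrative):

package main

import (
	"fmt"
	"slices"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	l := labels.FromStrings(labels.MetricName, "http_requests_total", "env", "prod", "pod", "api-0")

	// BytesWithoutLabels requires the names to drop in ascending order.
	drop := []string{labels.MetricName, "pod"}
	slices.Sort(drop)

	buf := make([]byte, 0, 1024) // reused across every series in the real operator
	key := l.BytesWithoutLabels(buf, drop...)
	fmt.Printf("group key: %q\n", key) // identifies the group {env="prod"}
}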