Mimir query engine: improve performance of binary operations (#8933)
* Initial improvement

* Add fast path for common case

* Add changelog entry
charleskorn authored Aug 8, 2024
1 parent 1c0ad91 commit 50444bf
Showing 2 changed files with 43 additions and 13 deletions.
CHANGELOG.md (2 changes: 1 addition & 1 deletion)

```diff
@@ -22,7 +22,7 @@
 * [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
 * [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854
 * [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933
 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886
   * What it is:
     * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
```
pkg/streamingpromql/operators/binary_operation.go (54 changes: 42 additions & 12 deletions)

```diff
@@ -9,6 +9,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"slices"
 	"sort"
 	"time"
 
@@ -180,7 +181,8 @@ func (b *BinaryOperation) loadSeriesMetadata(ctx context.Context) (bool, error)
 // - a list indicating which series from the left side are needed to compute the output
 // - a list indicating which series from the right side are needed to compute the output
 func (b *BinaryOperation) computeOutputSeries() ([]types.SeriesMetadata, []*binaryOperationOutputSeries, []bool, []bool, error) {
-	labelsFunc := b.labelsFunc()
+	labelsFunc := b.groupLabelsFunc()
+	groupKeyFunc := b.groupKeyFunc()
 	outputSeriesMap := map[string]*binaryOperationOutputSeries{}
 
 	// Use the smaller side to populate the map of possible output series first.
```
```diff
@@ -197,12 +199,12 @@
 	}
 
 	for idx, s := range smallerSide {
-		groupLabels := labelsFunc(s.Labels).String()
-		series, exists := outputSeriesMap[groupLabels]
+		groupKey := groupKeyFunc(s.Labels)
+		series, exists := outputSeriesMap[string(groupKey)] // Important: don't extract the string(...) call here - passing it directly allows us to avoid allocating it.
 
 		if !exists {
 			series = &binaryOperationOutputSeries{}
-			outputSeriesMap[groupLabels] = series
+			outputSeriesMap[string(groupKey)] = series
 		}
 
 		if smallerSideIsLeftSide {
```
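The comment on the lookup above refers to a Go compiler optimisation: when a map is indexed with an expression of the exact form `m[string(byteSlice)]`, the lookup is performed without allocating a temporary string for the key. A minimal standalone sketch (illustrative only, not part of this commit):

```go
package main

import "fmt"

func main() {
	m := map[string]int{"series-key": 42}
	key := []byte("series-key")

	// Indexing the map directly with string(key) lets the compiler skip
	// allocating a temporary string for the lookup.
	if v, ok := m[string(key)]; ok {
		fmt.Println(v) // 42
	}

	// Hoisting the conversion into a variable generally forces a real
	// allocation on every iteration, which is what the comment in the
	// diff warns against.
	s := string(key)
	_ = m[s]
}
```

Note that an insertion like `outputSeriesMap[string(groupKey)] = series` still allocates the key string once when the entry is created, since the map must store it; the saving is on the far more frequent lookups.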
```diff
@@ -213,9 +215,10 @@ func (b *BinaryOperation) computeOutputSeries() ([]types.SeriesMetadata, []*binaryOperationOutputSeries, []bool, []bool, error) {
 	}
 
 	for idx, s := range largerSide {
-		groupLabels := labelsFunc(s.Labels).String()
+		groupKey := groupKeyFunc(s.Labels)
 
-		if series, exists := outputSeriesMap[groupLabels]; exists {
+		// Important: don't extract the string(...) call below - passing it directly allows us to avoid allocating it.
+		if series, exists := outputSeriesMap[string(groupKey)]; exists {
 			if smallerSideIsLeftSide {
 				// Currently iterating through right side.
 				series.rightSeriesIndices = append(series.rightSeriesIndices, idx)
@@ -348,8 +351,8 @@ func (g favourRightSideSorter) Less(i, j int) bool {
 	return g.series[i].latestLeftSeries() < g.series[j].latestLeftSeries()
 }
 
-// labelsFunc returns a function that computes the labels of the output group this series belongs to.
-func (b *BinaryOperation) labelsFunc() func(labels.Labels) labels.Labels {
+// groupLabelsFunc returns a function that computes the labels of the output group this series belongs to.
+func (b *BinaryOperation) groupLabelsFunc() func(labels.Labels) labels.Labels {
 	lb := labels.NewBuilder(labels.EmptyLabels())
 
 	if b.VectorMatching.On {
```
```diff
@@ -362,12 +365,39 @@ func (b *BinaryOperation) labelsFunc() func(labels.Labels) labels.Labels {
 
 	return func(l labels.Labels) labels.Labels {
 		lb.Reset(l)
-		lb.Del(b.VectorMatching.MatchingLabels...)
 		lb.Del(labels.MetricName)
+		lb.Del(b.VectorMatching.MatchingLabels...)
 		return lb.Labels()
 	}
 }
 
+// groupKeyFunc returns a function that computes the grouping key of the output group this series belongs to.
+func (b *BinaryOperation) groupKeyFunc() func(labels.Labels) []byte {
+	buf := make([]byte, 0, 1024)
+
+	if b.VectorMatching.On {
+		return func(l labels.Labels) []byte {
+			return l.BytesWithLabels(buf, b.VectorMatching.MatchingLabels...)
+		}
+	}
+
+	if len(b.VectorMatching.MatchingLabels) == 0 {
+		// Fast path for common case for expressions like "a + b" with no 'on' or 'without' labels.
+		return func(l labels.Labels) []byte {
+			return l.BytesWithoutLabels(buf, labels.MetricName)
+		}
+	}
+
+	lbls := make([]string, 0, len(b.VectorMatching.MatchingLabels)+1)
+	lbls = append(lbls, labels.MetricName)
+	lbls = append(lbls, b.VectorMatching.MatchingLabels...)
+	slices.Sort(lbls)
+
+	return func(l labels.Labels) []byte {
+		return l.BytesWithoutLabels(buf, lbls...)
+	}
+}
+
 func (b *BinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
 	if len(b.remainingSeries) == 0 {
 		return types.InstantVectorSeriesData{}, types.EOS
```
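For a sense of how the new `groupKeyFunc` behaves, here is a small sketch (illustrative only, not part of the commit) of the `a + b` fast path: two series that differ only in metric name produce identical key bytes, so they fall into the same output group. It also hints at why the general case above calls `slices.Sort`: `BytesWithLabels` and `BytesWithoutLabels` expect their label names in sorted order.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	buf := make([]byte, 0, 1024)

	left := labels.FromStrings("__name__", "a", "env", "prod", "instance", "1")
	right := labels.FromStrings("__name__", "b", "env", "prod", "instance", "1")

	// Fast path for "a + b": drop only the metric name. The string(...)
	// conversion copies the bytes, which matters here because buf is
	// reused between calls, so each returned slice is only valid until
	// the next call.
	keyLeft := string(left.BytesWithoutLabels(buf, labels.MetricName))
	keyRight := string(right.BytesWithoutLabels(buf, labels.MetricName))

	fmt.Println(keyLeft == keyRight) // true: both series map to the same output group
}
```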
```diff
@@ -447,7 +477,7 @@ func (b *BinaryOperation) mergeOneSide(data []types.InstantVectorSeriesData, sou
 		if floats[idxFloats].T == histograms[idxHistograms].T {
 			// Conflict found
 			firstConflictingSeriesLabels := sourceSeriesMetadata[0].Labels
-			groupLabels := b.labelsFunc()(firstConflictingSeriesLabels)
+			groupLabels := b.groupLabelsFunc()(firstConflictingSeriesLabels)
 
 			return types.InstantVectorSeriesData{}, fmt.Errorf("found both float and histogram samples for the match group %s on the %s side of the operation at timestamp %s", groupLabels, side, timestamp.Time(floats[idxFloats].T).Format(time.RFC3339Nano))
 		}
@@ -563,7 +593,7 @@ func (b *BinaryOperation) mergeOneSideFloats(seriesGroupSide seriesForOneGroupSi
 			// Another series has a point with the same timestamp. We have a conflict.
 			firstConflictingSeriesLabels := sourceSeriesMetadata[seriesGroupSide.sourceSeriesIndices[sourceSeriesIndexInData]].Labels
 			secondConflictingSeriesLabels := sourceSeriesMetadata[seriesGroupSide.sourceSeriesIndices[seriesIndexInData]].Labels
-			groupLabels := b.labelsFunc()(firstConflictingSeriesLabels)
+			groupLabels := b.groupLabelsFunc()(firstConflictingSeriesLabels)
 
 			return nil, fmt.Errorf("found duplicate series for the match group %s on the %s side of the operation at timestamp %s: %s and %s", groupLabels, side, timestamp.Time(nextT).Format(time.RFC3339Nano), firstConflictingSeriesLabels, secondConflictingSeriesLabels)
 		}
```
```diff
@@ -684,7 +714,7 @@ func (b *BinaryOperation) mergeOneSideHistograms(seriesGroupSide seriesForOneGro
 			// Another series has a point with the same timestamp. We have a conflict.
 			firstConflictingSeriesLabels := sourceSeriesMetadata[seriesGroupSide.sourceSeriesIndices[sourceSeriesIndexInData]].Labels
 			secondConflictingSeriesLabels := sourceSeriesMetadata[seriesGroupSide.sourceSeriesIndices[seriesIndexInData]].Labels
-			groupLabels := b.labelsFunc()(firstConflictingSeriesLabels)
+			groupLabels := b.groupLabelsFunc()(firstConflictingSeriesLabels)
 
 			return nil, fmt.Errorf("found duplicate series for the match group %s on the %s side of the operation at timestamp %s: %s and %s", groupLabels, side, timestamp.Time(nextT).Format(time.RFC3339Nano), firstConflictingSeriesLabels, secondConflictingSeriesLabels)
 		}
```
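A design detail visible in these last three hunks: the opaque byte keys are confined to the hot grouping loop, while the error paths keep using `groupLabelsFunc`, because a conflict message needs human-readable group labels rather than raw key bytes. A side-by-side sketch of the two representations (illustrative only, not from the commit):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	buf := make([]byte, 0, 1024)
	l := labels.FromStrings("__name__", "a", "env", "prod")

	// Hot path: opaque byte key, cheap to compute and compare.
	key := l.BytesWithoutLabels(buf, labels.MetricName)

	// Error path: readable group labels, built only when reporting a conflict.
	lb := labels.NewBuilder(l)
	lb.Del(labels.MetricName)
	group := lb.Labels()

	fmt.Printf("key=%q\ngroup=%s\n", key, group)
}
```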
