Mimir query engine: add support for `count_over_time`, `present_over_time` and `last_over_time` (#8925)

* Add `count_over_time` function

* Add feature toggle for functions

* Add `present_over_time` function

* Add `last_over_time` function

* Add changelog entry

* Update CLI flag description and enable by default
charleskorn authored Aug 8, 2024
1 parent 37ce120 commit b5098c5
Showing 17 changed files with 266 additions and 102 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -22,7 +22,7 @@
* [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
* [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854
* [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8932
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8925 #8932
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -2010,6 +2010,17 @@
"fieldFlag": "querier.mimir-query-engine.enable-binary-operations",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "enable_over_time_functions",
"required": false,
"desc": "Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "querier.mimir-query-engine.enable-over-time-functions",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1895,6 +1895,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of samples a single query can load into memory. This config option should be set on query-frontend too when query sharding is enabled. (default 50000000)
-querier.mimir-query-engine.enable-binary-operations
[experimental] Enable support for binary operations in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
-querier.mimir-query-engine.enable-over-time-functions
[experimental] Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
-querier.minimize-ingester-requests
If true, when querying ingesters, only the minimum required ingesters required to reach quorum will be queried initially, with other ingesters queried only if needed due to failures from the initial set of ingesters. Enabling this option reduces resource consumption for the happy path at the cost of increased latency for the unhappy path. (default true)
-querier.minimize-ingester-requests-hedging-delay duration
@@ -1520,6 +1520,11 @@ mimir_query_engine:
# Only applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-binary-operations
[enable_binary_operations: <boolean> | default = true]
# (experimental) Enable support for ..._over_time functions in Mimir's query
# engine. Only applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-over-time-functions
[enable_over_time_functions: <boolean> | default = true]
```

### frontend
11 changes: 10 additions & 1 deletion pkg/streamingpromql/config.go
@@ -14,15 +14,24 @@ type EngineOpts struct {
}

type FeatureToggles struct {
-	EnableBinaryOperations bool `yaml:"enable_binary_operations" category:"experimental"`
+	EnableBinaryOperations  bool `yaml:"enable_binary_operations" category:"experimental"`
+	EnableOverTimeFunctions bool `yaml:"enable_over_time_functions" category:"experimental"`
}

var overTimeFunctionNames = []string{
	"count_over_time",
	"last_over_time",
	"present_over_time",
}

// EnableAllFeatures enables all features supported by MQE, including experimental or incomplete features.
var EnableAllFeatures = FeatureToggles{
	// Note that we deliberately use a keyless literal here to force a compilation error if we don't keep this in sync with new fields added to FeatureToggles.
	true,
	true,
}

func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&t.EnableBinaryOperations, "querier.mimir-query-engine.enable-binary-operations", true, "Enable support for binary operations in Mimir's query engine. Only applies if the Mimir query engine is in use.")
	f.BoolVar(&t.EnableOverTimeFunctions, "querier.mimir-query-engine.enable-over-time-functions", true, "Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use.")
}
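To see the new toggle in isolation, the following is a minimal sketch of how a flag registered this way behaves with Go's standard `flag` package. It assumes a locally redeclared `FeatureToggles` with shortened help strings, and `parseToggles` is a hypothetical helper that does not exist in Mimir.

```go
package main

import (
	"flag"
	"fmt"
)

// FeatureToggles is redeclared here only so the sketch compiles on its own.
type FeatureToggles struct {
	EnableBinaryOperations  bool
	EnableOverTimeFunctions bool
}

// RegisterFlags registers both toggles with a default of true, as in the diff.
// The help strings are shortened for this sketch.
func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&t.EnableBinaryOperations, "querier.mimir-query-engine.enable-binary-operations", true, "Enable support for binary operations.")
	f.BoolVar(&t.EnableOverTimeFunctions, "querier.mimir-query-engine.enable-over-time-functions", true, "Enable support for ..._over_time functions.")
}

// parseToggles is a hypothetical helper: it parses args into a fresh FeatureToggles.
func parseToggles(args []string) (FeatureToggles, error) {
	var t FeatureToggles
	fs := flag.NewFlagSet("sketch", flag.ContinueOnError)
	t.RegisterFlags(fs)
	err := fs.Parse(args)
	return t, err
}

func main() {
	// With no arguments, both toggles keep their default of true.
	defaults, _ := parseToggles(nil)

	// Passing =false switches off only the ..._over_time functions.
	disabled, _ := parseToggles([]string{"-querier.mimir-query-engine.enable-over-time-functions=false"})

	fmt.Println(defaults.EnableOverTimeFunctions, disabled.EnableOverTimeFunctions)
}
```

Because the default is `true`, an operator only needs to pass the flag explicitly to opt out.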
8 changes: 8 additions & 0 deletions pkg/streamingpromql/engine_test.go
@@ -86,6 +86,14 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) {
requireRangeQueryIsUnsupported(t, featureToggles, "metric{} + other_metric{}", "binary expressions")
requireInstantQueryIsUnsupported(t, featureToggles, "metric{} + other_metric{}", "binary expressions")
})

t.Run("..._over_time functions", func(t *testing.T) {
featureToggles := EnableAllFeatures
featureToggles.EnableOverTimeFunctions = false

requireRangeQueryIsUnsupported(t, featureToggles, "count_over_time(metric[1m])", "'count_over_time' function")
requireInstantQueryIsUnsupported(t, featureToggles, "count_over_time(metric[1m])", "'count_over_time' function")
})
}

func requireRangeQueryIsUnsupported(t *testing.T, featureToggles FeatureToggles, expression string, expectedError string) {
59 changes: 32 additions & 27 deletions pkg/streamingpromql/functions.go
@@ -59,7 +59,7 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF
// - name: The name of the function
// - metadataFunc: The function for handling metadata
func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunction) InstantVectorFunctionOperatorFactory {
-	return SingleInputVectorFunctionOperatorFactory(name, metadataFunc, functions.Passthrough)
+	return SingleInputVectorFunctionOperatorFactory(name, metadataFunc, functions.PassthroughData)
}

// FunctionOverRangeVectorOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions
@@ -90,32 +90,37 @@ func FunctionOverRangeVectorOperatorFactory(

// These functions return an instant-vector.
var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{
-	"abs":             InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs),
-	"acos":            InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos),
-	"acosh":           InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh),
-	"asin":            InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin),
-	"asinh":           InstantVectorTransformationFunctionOperatorFactory("asinh", functions.Asinh),
-	"atan":            InstantVectorTransformationFunctionOperatorFactory("atan", functions.Atan),
-	"atanh":           InstantVectorTransformationFunctionOperatorFactory("atanh", functions.Atanh),
-	"ceil":            InstantVectorTransformationFunctionOperatorFactory("ceil", functions.Ceil),
-	"cos":             InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos),
-	"cosh":            InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh),
-	"deg":             InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg),
-	"exp":             InstantVectorTransformationFunctionOperatorFactory("exp", functions.Exp),
-	"floor":           InstantVectorTransformationFunctionOperatorFactory("floor", functions.Floor),
-	"histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount),
-	"histogram_sum":   InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum),
-	"ln":              InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln),
-	"log10":           InstantVectorTransformationFunctionOperatorFactory("log10", functions.Log10),
-	"log2":            InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2),
-	"rad":             InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad),
-	"rate":            FunctionOverRangeVectorOperatorFactory("rate", functions.Rate),
-	"sgn":             InstantVectorTransformationFunctionOperatorFactory("sgn", functions.Sgn),
-	"sin":             InstantVectorTransformationFunctionOperatorFactory("sin", functions.Sin),
-	"sinh":            InstantVectorTransformationFunctionOperatorFactory("sinh", functions.Sinh),
-	"sqrt":            InstantVectorTransformationFunctionOperatorFactory("sqrt", functions.Sqrt),
-	"tan":             InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan),
-	"tanh":            InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh),
+	// Please keep this list sorted alphabetically.
+
+	"abs":               InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs),
+	"acos":              InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos),
+	"acosh":             InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh),
+	"asin":              InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin),
+	"asinh":             InstantVectorTransformationFunctionOperatorFactory("asinh", functions.Asinh),
+	"atan":              InstantVectorTransformationFunctionOperatorFactory("atan", functions.Atan),
+	"atanh":             InstantVectorTransformationFunctionOperatorFactory("atanh", functions.Atanh),
+	"ceil":              InstantVectorTransformationFunctionOperatorFactory("ceil", functions.Ceil),
+	"cos":               InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos),
+	"cosh":              InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh),
+	"count_over_time":   FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime),
+	"deg":               InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg),
+	"exp":               InstantVectorTransformationFunctionOperatorFactory("exp", functions.Exp),
+	"floor":             InstantVectorTransformationFunctionOperatorFactory("floor", functions.Floor),
+	"histogram_count":   InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount),
+	"histogram_sum":     InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum),
+	"last_over_time":    FunctionOverRangeVectorOperatorFactory("last_over_time", functions.LastOverTime),
+	"ln":                InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln),
+	"log10":             InstantVectorTransformationFunctionOperatorFactory("log10", functions.Log10),
+	"log2":              InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2),
+	"present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime),
+	"rad":               InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad),
+	"rate":              FunctionOverRangeVectorOperatorFactory("rate", functions.Rate),
+	"sgn":               InstantVectorTransformationFunctionOperatorFactory("sgn", functions.Sgn),
+	"sin":               InstantVectorTransformationFunctionOperatorFactory("sin", functions.Sin),
+	"sinh":              InstantVectorTransformationFunctionOperatorFactory("sinh", functions.Sinh),
+	"sqrt":              InstantVectorTransformationFunctionOperatorFactory("sqrt", functions.Sqrt),
+	"tan":               InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan),
+	"tanh":              InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh),
}

func RegisterInstantVectorFunctionOperatorFactory(functionName string, factory InstantVectorFunctionOperatorFactory) error {
6 changes: 5 additions & 1 deletion pkg/streamingpromql/functions/common.go
@@ -13,6 +13,10 @@ import (
// SeriesMetadataFunction is a function to operate on the metadata across series.
type SeriesMetadataFunction func(seriesMetadata []types.SeriesMetadata, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error)

func PassthroughSeriesMetadata(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) {
return seriesMetadata, nil
}

func DropSeriesName(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) {
for i := range seriesMetadata {
seriesMetadata[i].Labels = seriesMetadata[i].Labels.DropMetricName()
@@ -45,7 +49,7 @@ func FloatTransformationDropHistogramsFunc(transform func(f float64) float64) In
}
}

-func Passthrough(seriesData types.InstantVectorSeriesData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
+func PassthroughData(seriesData types.InstantVectorSeriesData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) {
return seriesData, nil
}

59 changes: 59 additions & 0 deletions pkg/streamingpromql/functions/range_vectors.go
@@ -0,0 +1,59 @@
// SPDX-License-Identifier: AGPL-3.0-only

package functions

import (
	"github.com/prometheus/prometheus/model/histogram"

	"github.com/grafana/mimir/pkg/streamingpromql/types"
)

var CountOverTime = FunctionOverRangeVector{
	SeriesMetadataFunc: DropSeriesName,
	StepFunc:           countOverTime,
}

func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error) {
	fPointCount := fPoints.CountAtOrBefore(step.RangeEnd)
	hPointCount := hPoints.CountAtOrBefore(step.RangeEnd)

	if fPointCount == 0 && hPointCount == 0 {
		return 0, false, nil, nil
	}

	return float64(fPointCount + hPointCount), true, nil, nil
}

var LastOverTime = FunctionOverRangeVector{
	SeriesMetadataFunc: PassthroughSeriesMetadata,
	StepFunc:           lastOverTime,
}

func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error) {
	lastFloat, floatAvailable := fPoints.LastAtOrBefore(step.RangeEnd)
	lastHistogram, histogramAvailable := hPoints.LastAtOrBefore(step.RangeEnd)

	if !floatAvailable && !histogramAvailable {
		return 0, false, nil, nil
	}

	if floatAvailable && (!histogramAvailable || lastFloat.T > lastHistogram.T) {
		return lastFloat.F, true, nil, nil
	}

	// We must make a copy of the histogram before returning it, as the ring buffer may reuse the FloatHistogram instance on subsequent steps.
	return 0, false, lastHistogram.H.Copy(), nil
}

var PresentOverTime = FunctionOverRangeVector{
	SeriesMetadataFunc: DropSeriesName,
	StepFunc:           presentOverTime,
}

func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error) {
	if fPoints.AnyAtOrBefore(step.RangeEnd) || hPoints.AnyAtOrBefore(step.RangeEnd) {
		return 1, true, nil, nil
	}

	return 0, false, nil, nil
}
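The float-versus-histogram selection in `lastOverTime` can be exercised without the engine. The sketch below is an illustration only. `fPoint`, `hPoint` and the two lookup helpers are hypothetical simplifications of the ring buffer types, a single `Sum` field stands in for a full histogram, and the input slices are assumed to be sorted by timestamp and already limited to the start of the range.

```go
package main

import "fmt"

// fPoint and hPoint are simplified, hypothetical stand-ins for the float and
// histogram points held in the engine's ring buffers.
type fPoint struct {
	T int64 // timestamp in milliseconds
	F float64
}

type hPoint struct {
	T   int64   // timestamp in milliseconds
	Sum float64 // stands in for a full histogram
}

// lastFloatAtOrBefore returns the newest float point with T <= maxT.
func lastFloatAtOrBefore(points []fPoint, maxT int64) (fPoint, bool) {
	for i := len(points) - 1; i >= 0; i-- {
		if points[i].T <= maxT {
			return points[i], true
		}
	}
	return fPoint{}, false
}

// lastHistogramAtOrBefore returns the newest histogram point with T <= maxT.
func lastHistogramAtOrBefore(points []hPoint, maxT int64) (hPoint, bool) {
	for i := len(points) - 1; i >= 0; i-- {
		if points[i].T <= maxT {
			return points[i], true
		}
	}
	return hPoint{}, false
}

// lastOverTime follows the same branching as the function in the diff:
// no points means no result, otherwise the most recent point wins.
func lastOverTime(floats []fPoint, histograms []hPoint, rangeEnd int64) (f float64, hasFloat bool, h *hPoint) {
	lastFloat, floatAvailable := lastFloatAtOrBefore(floats, rangeEnd)
	lastHistogram, histogramAvailable := lastHistogramAtOrBefore(histograms, rangeEnd)

	if !floatAvailable && !histogramAvailable {
		return 0, false, nil
	}

	if floatAvailable && (!histogramAvailable || lastFloat.T > lastHistogram.T) {
		return lastFloat.F, true, nil
	}

	// lastHistogram is already a copy of the slice element, so the returned
	// pointer does not alias the caller's slice.
	return 0, false, &lastHistogram
}

func main() {
	floats := []fPoint{{T: 0, F: 0}, {T: 60_000, F: 1}}
	histograms := []hPoint{{T: 120_000, Sum: 4}}

	// At 1m only floats are visible, so a float is returned.
	f, hasFloat, h := lastOverTime(floats, histograms, 60_000)
	fmt.Println(f, hasFloat, h == nil)

	// At 2m the histogram is newer than the last float, so it wins.
	_, hasFloat, h = lastOverTime(floats, histograms, 120_000)
	fmt.Println(hasFloat, h.Sum)
}
```

In this sketch the copy comes for free from Go's value semantics. The real function has to call `Copy()` explicitly because, as its comment notes, the ring buffer may reuse the `FloatHistogram` instance.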
4 changes: 4 additions & 0 deletions pkg/streamingpromql/query.go
@@ -204,6 +204,10 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV
}

func (q *Query) convertFunctionCallToOperator(e *parser.Call) (types.InstantVectorOperator, error) {
	if !q.engine.featureToggles.EnableOverTimeFunctions && slices.Contains(overTimeFunctionNames, e.Func.Name) {
		return nil, compat.NewNotSupportedError(fmt.Sprintf("'%s' function", e.Func.Name))
	}

	factory, ok := instantVectorFunctionOperatorFactories[e.Func.Name]
	if !ok {
		return nil, compat.NewNotSupportedError(fmt.Sprintf("'%s' function", e.Func.Name))
23 changes: 23 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
@@ -63,3 +63,26 @@ eval range from 0 to 6m step 1m ceil(some_metric)

eval range from 0 to 6m step 1m floor(some_metric)
{env="prod"} 0 0 -1 NaN -NaN 2 -3

clear

load 1m
some_metric 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}}

eval range from 0 to 7m step 1m count_over_time(some_metric[3m])
{} 1 2 3 4 3 2 2 2

eval range from 0 to 7m step 1m count_over_time(some_metric[5s])
{} 1 1 1 1 _ _ 1 1

eval range from 0 to 7m step 1m last_over_time(some_metric[3m])
some_metric 0 1 2 3 3 3 {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}}

eval range from 0 to 7m step 1m last_over_time(some_metric[5s])
some_metric 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}}

eval range from 0 to 7m step 1m present_over_time(some_metric[3m])
{} 1 1 1 1 1 1 1 1

eval range from 0 to 7m step 1m present_over_time(some_metric[5s])
{} 1 1 1 1 _ _ 1 1
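The expected `count_over_time` and `present_over_time` rows above can be reproduced by hand from the sample timestamps. The sketch below does that arithmetic, assuming both ends of the range are inclusive. That assumption is inferred from the expected value of 4 at the 3m step for a `[3m]` range, not from the engine code, and `countInRange` and `evalOverTime` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const minute = int64(60_000) // milliseconds

// sampleTimes holds the timestamps of the series loaded in the test:
// floats at 0m..3m, nothing at 4m and 5m, histograms at 6m and 7m.
var sampleTimes = []int64{0, 1 * minute, 2 * minute, 3 * minute, 6 * minute, 7 * minute}

// countInRange counts samples with rangeStart <= t <= rangeEnd.
// Treating both ends as inclusive is an assumption (see the text above).
func countInRange(times []int64, rangeStart, rangeEnd int64) int {
	n := 0
	for _, t := range times {
		if t >= rangeStart && t <= rangeEnd {
			n++
		}
	}
	return n
}

// evalOverTime evaluates count_over_time (or present_over_time when present
// is true) at every step from 0 to 7m, using "_" for steps with no samples.
func evalOverTime(rangeMs int64, present bool) string {
	var cols []string
	for step := int64(0); step <= 7*minute; step += minute {
		n := countInRange(sampleTimes, step-rangeMs, step)
		switch {
		case n == 0:
			cols = append(cols, "_")
		case present:
			cols = append(cols, "1")
		default:
			cols = append(cols, strconv.Itoa(n))
		}
	}
	return strings.Join(cols, " ")
}

func main() {
	fmt.Println(evalOverTime(3*minute, false)) // count_over_time(some_metric[3m])
	fmt.Println(evalOverTime(5_000, false))    // count_over_time(some_metric[5s])
	fmt.Println(evalOverTime(3*minute, true))  // present_over_time(some_metric[3m])
	fmt.Println(evalOverTime(5_000, true))     // present_over_time(some_metric[5s])
}
```

Float and histogram samples are counted together here, which mirrors `countOverTime` summing the two ring buffer counts.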