diff --git a/CHANGELOG.md b/CHANGELOG.md index 92d64401e42..655339e6ab1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ * [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802 * [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854 * [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8932 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8925 #8932 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index e275bea4af8..f361d66ce4c 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -2010,6 +2010,17 @@ "fieldFlag": "querier.mimir-query-engine.enable-binary-operations", "fieldType": "boolean", "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "enable_over_time_functions", + "required": false, + "desc": "Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use.", + "fieldValue": null, + "fieldDefaultValue": true, + "fieldFlag": "querier.mimir-query-engine.enable-over-time-functions", + "fieldType": "boolean", + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 7e312d77f37..25b94ba74e3 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1895,6 +1895,8 @@ Usage of ./cmd/mimir/mimir: Maximum number of samples a single query can load into memory. This config option should be set on query-frontend too when query sharding is enabled. (default 50000000) -querier.mimir-query-engine.enable-binary-operations [experimental] Enable support for binary operations in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true) + -querier.mimir-query-engine.enable-over-time-functions + [experimental] Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true) -querier.minimize-ingester-requests If true, when querying ingesters, only the minimum required ingesters required to reach quorum will be queried initially, with other ingesters queried only if needed due to failures from the initial set of ingesters. Enabling this option reduces resource consumption for the happy path at the cost of increased latency for the unhappy path. (default true) -querier.minimize-ingester-requests-hedging-delay duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 386d36e7c4e..7720e5aa979 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1520,6 +1520,11 @@ mimir_query_engine: # Only applies if the Mimir query engine is in use. # CLI flag: -querier.mimir-query-engine.enable-binary-operations [enable_binary_operations: | default = true] + + # (experimental) Enable support for ..._over_time functions in Mimir's query + # engine. Only applies if the Mimir query engine is in use. + # CLI flag: -querier.mimir-query-engine.enable-over-time-functions + [enable_over_time_functions: | default = true] ``` ### frontend diff --git a/pkg/streamingpromql/config.go b/pkg/streamingpromql/config.go index 9f5ce87aca9..b0bbaaa3238 100644 --- a/pkg/streamingpromql/config.go +++ b/pkg/streamingpromql/config.go @@ -14,15 +14,24 @@ type EngineOpts struct { } type FeatureToggles struct { - EnableBinaryOperations bool `yaml:"enable_binary_operations" category:"experimental"` + EnableBinaryOperations bool `yaml:"enable_binary_operations" category:"experimental"` + EnableOverTimeFunctions bool `yaml:"enable_over_time_functions" category:"experimental"` +} + +var overTimeFunctionNames = []string{ + "count_over_time", + "last_over_time", + "present_over_time", } // EnableAllFeatures enables all features supported by MQE, including experimental or incomplete features. var EnableAllFeatures = FeatureToggles{ // Note that we deliberately use a keyless literal here to force a compilation error if we don't keep this in sync with new fields added to FeatureToggles. true, + true, } func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&t.EnableBinaryOperations, "querier.mimir-query-engine.enable-binary-operations", true, "Enable support for binary operations in Mimir's query engine. Only applies if the Mimir query engine is in use.") + f.BoolVar(&t.EnableOverTimeFunctions, "querier.mimir-query-engine.enable-over-time-functions", true, "Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use.") } diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 63ec485351a..62da170f4bd 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -86,6 +86,14 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) { requireRangeQueryIsUnsupported(t, featureToggles, "metric{} + other_metric{}", "binary expressions") requireInstantQueryIsUnsupported(t, featureToggles, "metric{} + other_metric{}", "binary expressions") }) + + t.Run("..._over_time functions", func(t *testing.T) { + featureToggles := EnableAllFeatures + featureToggles.EnableOverTimeFunctions = false + + requireRangeQueryIsUnsupported(t, featureToggles, "count_over_time(metric[1m])", "'count_over_time' function") + requireInstantQueryIsUnsupported(t, featureToggles, "count_over_time(metric[1m])", "'count_over_time' function") + }) } func requireRangeQueryIsUnsupported(t *testing.T, featureToggles FeatureToggles, expression string, expectedError string) { diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 8033576a2b5..aa09a2b0369 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -59,7 +59,7 @@ func InstantVectorTransformationFunctionOperatorFactory(name string, seriesDataF // - name: The name of the function // - metadataFunc: The function for handling metadata func InstantVectorLabelManipulationFunctionOperatorFactory(name string, metadataFunc functions.SeriesMetadataFunction) InstantVectorFunctionOperatorFactory { - return SingleInputVectorFunctionOperatorFactory(name, metadataFunc, functions.Passthrough) + return SingleInputVectorFunctionOperatorFactory(name, metadataFunc, functions.PassthroughData) } // FunctionOverRangeVectorOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions @@ -90,32 +90,37 @@ func FunctionOverRangeVectorOperatorFactory( // These functions return an instant-vector. var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOperatorFactory{ - "abs": InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs), - "acos": InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos), - "acosh": InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh), - "asin": InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin), - "asinh": InstantVectorTransformationFunctionOperatorFactory("asinh", functions.Asinh), - "atan": InstantVectorTransformationFunctionOperatorFactory("atan", functions.Atan), - "atanh": InstantVectorTransformationFunctionOperatorFactory("atanh", functions.Atanh), - "ceil": InstantVectorTransformationFunctionOperatorFactory("ceil", functions.Ceil), - "cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos), - "cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh), - "deg": InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg), - "exp": InstantVectorTransformationFunctionOperatorFactory("exp", functions.Exp), - "floor": InstantVectorTransformationFunctionOperatorFactory("floor", functions.Floor), - "histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount), - "histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum), - "ln": InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln), - "log10": InstantVectorTransformationFunctionOperatorFactory("log10", functions.Log10), - "log2": InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2), - "rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad), - "rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate), - "sgn": InstantVectorTransformationFunctionOperatorFactory("sgn", functions.Sgn), - "sin": InstantVectorTransformationFunctionOperatorFactory("sin", functions.Sin), - "sinh": InstantVectorTransformationFunctionOperatorFactory("sinh", functions.Sinh), - "sqrt": InstantVectorTransformationFunctionOperatorFactory("sqrt", functions.Sqrt), - "tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan), - "tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh), + // Please keep this list sorted alphabetically. + + "abs": InstantVectorTransformationFunctionOperatorFactory("abs", functions.Abs), + "acos": InstantVectorTransformationFunctionOperatorFactory("acos", functions.Acos), + "acosh": InstantVectorTransformationFunctionOperatorFactory("acosh", functions.Acosh), + "asin": InstantVectorTransformationFunctionOperatorFactory("asin", functions.Asin), + "asinh": InstantVectorTransformationFunctionOperatorFactory("asinh", functions.Asinh), + "atan": InstantVectorTransformationFunctionOperatorFactory("atan", functions.Atan), + "atanh": InstantVectorTransformationFunctionOperatorFactory("atanh", functions.Atanh), + "ceil": InstantVectorTransformationFunctionOperatorFactory("ceil", functions.Ceil), + "cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos), + "cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh), + "count_over_time": FunctionOverRangeVectorOperatorFactory("count_over_time", functions.CountOverTime), + "deg": InstantVectorTransformationFunctionOperatorFactory("deg", functions.Deg), + "exp": InstantVectorTransformationFunctionOperatorFactory("exp", functions.Exp), + "floor": InstantVectorTransformationFunctionOperatorFactory("floor", functions.Floor), + "histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount), + "histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum), + "last_over_time": FunctionOverRangeVectorOperatorFactory("last_over_time", functions.LastOverTime), + "ln": InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln), + "log10": InstantVectorTransformationFunctionOperatorFactory("log10", functions.Log10), + "log2": InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2), + "present_over_time": FunctionOverRangeVectorOperatorFactory("present_over_time", functions.PresentOverTime), + "rad": InstantVectorTransformationFunctionOperatorFactory("rad", functions.Rad), + "rate": FunctionOverRangeVectorOperatorFactory("rate", functions.Rate), + "sgn": InstantVectorTransformationFunctionOperatorFactory("sgn", functions.Sgn), + "sin": InstantVectorTransformationFunctionOperatorFactory("sin", functions.Sin), + "sinh": InstantVectorTransformationFunctionOperatorFactory("sinh", functions.Sinh), + "sqrt": InstantVectorTransformationFunctionOperatorFactory("sqrt", functions.Sqrt), + "tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan), + "tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh), } func RegisterInstantVectorFunctionOperatorFactory(functionName string, factory InstantVectorFunctionOperatorFactory) error { diff --git a/pkg/streamingpromql/functions/common.go b/pkg/streamingpromql/functions/common.go index c3f57c8adba..adca6d07b58 100644 --- a/pkg/streamingpromql/functions/common.go +++ b/pkg/streamingpromql/functions/common.go @@ -13,6 +13,10 @@ import ( // SeriesMetadataFunction is a function to operate on the metadata across series. type SeriesMetadataFunction func(seriesMetadata []types.SeriesMetadata, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) +func PassthroughSeriesMetadata(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) { + return seriesMetadata, nil +} + func DropSeriesName(seriesMetadata []types.SeriesMetadata, _ *limiting.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) { for i := range seriesMetadata { seriesMetadata[i].Labels = seriesMetadata[i].Labels.DropMetricName() @@ -45,7 +49,7 @@ func FloatTransformationDropHistogramsFunc(transform func(f float64) float64) In } } -func Passthrough(seriesData types.InstantVectorSeriesData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { +func PassthroughData(seriesData types.InstantVectorSeriesData, _ *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, error) { return seriesData, nil } diff --git a/pkg/streamingpromql/functions/range_vectors.go b/pkg/streamingpromql/functions/range_vectors.go new file mode 100644 index 00000000000..7ce38ba265e --- /dev/null +++ b/pkg/streamingpromql/functions/range_vectors.go @@ -0,0 +1,59 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package functions + +import ( + "github.com/prometheus/prometheus/model/histogram" + + "github.com/grafana/mimir/pkg/streamingpromql/types" +) + +var CountOverTime = FunctionOverRangeVector{ + SeriesMetadataFunc: DropSeriesName, + StepFunc: countOverTime, +} + +func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error) { + fPointCount := fPoints.CountAtOrBefore(step.RangeEnd) + hPointCount := hPoints.CountAtOrBefore(step.RangeEnd) + + if fPointCount == 0 && hPointCount == 0 { + return 0, false, nil, nil + } + + return float64(fPointCount + hPointCount), true, nil, nil +} + +var LastOverTime = FunctionOverRangeVector{ + SeriesMetadataFunc: PassthroughSeriesMetadata, + StepFunc: lastOverTime, +} + +func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error) { + lastFloat, floatAvailable := fPoints.LastAtOrBefore(step.RangeEnd) + lastHistogram, histogramAvailable := hPoints.LastAtOrBefore(step.RangeEnd) + + if !floatAvailable && !histogramAvailable { + return 0, false, nil, nil + } + + if floatAvailable && (!histogramAvailable || lastFloat.T > lastHistogram.T) { + return lastFloat.F, true, nil, nil + } + + // We must make a copy of the histogram before returning it, as the ring buffer may reuse the FloatHistogram instance on subsequent steps. + return 0, false, lastHistogram.H.Copy(), nil +} + +var PresentOverTime = FunctionOverRangeVector{ + SeriesMetadataFunc: DropSeriesName, + StepFunc: presentOverTime, +} + +func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error) { + if fPoints.AnyAtOrBefore(step.RangeEnd) || hPoints.AnyAtOrBefore(step.RangeEnd) { + return 1, true, nil, nil + } + + return 0, false, nil, nil +} diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index e5e12b4fb78..a443018bb55 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -204,6 +204,10 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV } func (q *Query) convertFunctionCallToOperator(e *parser.Call) (types.InstantVectorOperator, error) { + if !q.engine.featureToggles.EnableOverTimeFunctions && slices.Contains(overTimeFunctionNames, e.Func.Name) { + return nil, compat.NewNotSupportedError(fmt.Sprintf("'%s' function", e.Func.Name)) + } + factory, ok := instantVectorFunctionOperatorFactories[e.Func.Name] if !ok { return nil, compat.NewNotSupportedError(fmt.Sprintf("'%s' function", e.Func.Name)) diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index a5d689ea4bf..6f99c0881d3 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -63,3 +63,26 @@ eval range from 0 to 6m step 1m ceil(some_metric) eval range from 0 to 6m step 1m floor(some_metric) {env="prod"} 0 0 -1 NaN -NaN 2 -3 + +clear + +load 1m + some_metric 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}} + +eval range from 0 to 7m step 1m count_over_time(some_metric[3m]) + {} 1 2 3 4 3 2 2 2 + +eval range from 0 to 7m step 1m count_over_time(some_metric[5s]) + {} 1 1 1 1 _ _ 1 1 + +eval range from 0 to 7m step 1m last_over_time(some_metric[3m]) + some_metric 0 1 2 3 3 3 {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}} + +eval range from 0 to 7m step 1m last_over_time(some_metric[5s]) + some_metric 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}} + +eval range from 0 to 7m step 1m present_over_time(some_metric[3m]) + {} 1 1 1 1 1 1 1 1 + +eval range from 0 to 7m step 1m present_over_time(some_metric[5s]) + {} 1 1 1 1 _ _ 1 1 diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index 6f428f842eb..f29d8e2ae20 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -1135,13 +1135,12 @@ load 10s # {type="some_nan3"} 1 # {type="only_nan"} NaN -# Unsupported by streaming engine. -# eval instant at 1m last_over_time(data[1m]) -# data{type="numbers"} 3 -# data{type="some_nan"} NaN -# data{type="some_nan2"} 1 -# data{type="some_nan3"} 1 -# data{type="only_nan"} NaN +eval instant at 1m last_over_time(data[1m]) + data{type="numbers"} 3 + data{type="some_nan"} NaN + data{type="some_nan2"} 1 + data{type="some_nan3"} 1 + data{type="only_nan"} NaN clear @@ -1292,23 +1291,18 @@ load 1m clear # Testdata for present_over_time() -# Unsupported by streaming engine. -# eval instant at 1m present_over_time(http_requests[5m]) +eval instant at 1m present_over_time(http_requests[5m]) -# Unsupported by streaming engine. -# eval instant at 1m present_over_time(http_requests{handler="/foo"}[5m]) +eval instant at 1m present_over_time(http_requests{handler="/foo"}[5m]) -# Unsupported by streaming engine. -# eval instant at 1m present_over_time(http_requests{handler!="/foo"}[5m]) +eval instant at 1m present_over_time(http_requests{handler!="/foo"}[5m]) -# Unsupported by streaming engine. -# eval instant at 1m present_over_time(http_requests{handler="/foo", handler="/bar", handler="/foobar"}[5m]) +eval instant at 1m present_over_time(http_requests{handler="/foo", handler="/bar", handler="/foobar"}[5m]) # Unsupported by streaming engine. # eval instant at 1m present_over_time(rate(nonexistant[5m])[5m:]) -# Unsupported by streaming engine. -# eval instant at 1m present_over_time(http_requests{handler="/foo", handler="/bar", instance="127.0.0.1"}[5m]) +eval instant at 1m present_over_time(http_requests{handler="/foo", handler="/bar", instance="127.0.0.1"}[5m]) load 1m http_requests{path="/foo",instance="127.0.0.1",job="httpd"} 1+1x10 @@ -1317,56 +1311,45 @@ load 1m httpd_log_lines_total{instance="127.0.0.1",job="node"} 1 ssl_certificate_expiry_seconds{job="ingress"} NaN NaN NaN NaN NaN -# Unsupported by streaming engine. -# eval instant at 5m present_over_time(http_requests[5m]) -# {instance="127.0.0.1", job="httpd", path="/bar"} 1 -# {instance="127.0.0.1", job="httpd", path="/foo"} 1 +eval instant at 5m present_over_time(http_requests[5m]) + {instance="127.0.0.1", job="httpd", path="/bar"} 1 + {instance="127.0.0.1", job="httpd", path="/foo"} 1 # Unsupported by streaming engine. # eval instant at 5m present_over_time(rate(http_requests[5m])[5m:1m]) # {instance="127.0.0.1", job="httpd", path="/bar"} 1 # {instance="127.0.0.1", job="httpd", path="/foo"} 1 -# Unsupported by streaming engine. -# eval instant at 0m present_over_time(httpd_log_lines_total[30s]) -# {instance="127.0.0.1",job="node"} 1 +eval instant at 0m present_over_time(httpd_log_lines_total[30s]) + {instance="127.0.0.1",job="node"} 1 -# Unsupported by streaming engine. -# eval instant at 1m present_over_time(httpd_log_lines_total[30s]) +eval instant at 1m present_over_time(httpd_log_lines_total[30s]) -# Unsupported by streaming engine. -# eval instant at 15m present_over_time(http_requests[5m]) -# {instance="127.0.0.1", job="httpd", path="/bar"} 1 -# {instance="127.0.0.1", job="httpd", path="/foo"} 1 +eval instant at 15m present_over_time(http_requests[5m]) + {instance="127.0.0.1", job="httpd", path="/bar"} 1 + {instance="127.0.0.1", job="httpd", path="/foo"} 1 -# Unsupported by streaming engine. -# eval instant at 16m present_over_time(http_requests[5m]) +eval instant at 16m present_over_time(http_requests[5m]) -# Unsupported by streaming engine. -# eval instant at 16m present_over_time(http_requests[6m]) -# {instance="127.0.0.1", job="httpd", path="/bar"} 1 -# {instance="127.0.0.1", job="httpd", path="/foo"} 1 +eval instant at 16m present_over_time(http_requests[6m]) + {instance="127.0.0.1", job="httpd", path="/bar"} 1 + {instance="127.0.0.1", job="httpd", path="/foo"} 1 -# Unsupported by streaming engine. -# eval instant at 16m present_over_time(httpd_handshake_failures_total[1m]) -# {instance="127.0.0.1", job="node"} 1 +eval instant at 16m present_over_time(httpd_handshake_failures_total[1m]) + {instance="127.0.0.1", job="node"} 1 -# Unsupported by streaming engine. -# eval instant at 16m present_over_time({instance="127.0.0.1"}[5m]) -# {instance="127.0.0.1",job="node"} 1 +eval instant at 16m present_over_time({instance="127.0.0.1"}[5m]) + {instance="127.0.0.1",job="node"} 1 -# Unsupported by streaming engine. -# eval instant at 21m present_over_time({job="grok"}[20m]) +eval instant at 21m present_over_time({job="grok"}[20m]) # Unsupported by streaming engine. # eval instant at 30m present_over_time({instance="127.0.0.1"}[5m:5s]) -# Unsupported by streaming engine. -# eval instant at 5m present_over_time({job="ingress"}[4m]) -# {job="ingress"} 1 +eval instant at 5m present_over_time({job="ingress"}[4m]) + {job="ingress"} 1 -# Unsupported by streaming engine. -# eval instant at 10m present_over_time({job="ingress"}[4m]) +eval instant at 10m present_over_time({job="ingress"}[4m]) clear diff --git a/pkg/streamingpromql/testdata/upstream/native_histograms.test b/pkg/streamingpromql/testdata/upstream/native_histograms.test index 86e86af7d5b..fa4578714e7 100644 --- a/pkg/streamingpromql/testdata/upstream/native_histograms.test +++ b/pkg/streamingpromql/testdata/upstream/native_histograms.test @@ -303,13 +303,11 @@ load 5m incr_sum_histogram{number="1"} {{schema:0 sum:0 count:0 buckets:[1]}}+{{schema:0 sum:1 count:1 buckets:[1]}}x10 incr_sum_histogram{number="2"} {{schema:0 sum:0 count:0 buckets:[1]}}+{{schema:0 sum:2 count:1 buckets:[1]}}x10 -# Unsupported by streaming engine. -# eval instant at 50m histogram_sum(sum(incr_sum_histogram)) -# {} 30 +eval instant at 50m histogram_sum(sum(incr_sum_histogram)) + {} 30 -# Unsupported by streaming engine. -# eval instant at 50m histogram_sum(sum(last_over_time(incr_sum_histogram[5m]))) -# {} 30 +eval instant at 50m histogram_sum(sum(last_over_time(incr_sum_histogram[5m]))) + {} 30 # Apply rate function to histogram. load 15s diff --git a/pkg/streamingpromql/testdata/upstream/staleness.test b/pkg/streamingpromql/testdata/upstream/staleness.test index 9f6081638ba..76224047a3c 100644 --- a/pkg/streamingpromql/testdata/upstream/staleness.test +++ b/pkg/streamingpromql/testdata/upstream/staleness.test @@ -26,24 +26,19 @@ eval instant at 331s metric # Range vector ignores stale sample. -# Unsupported by streaming engine. -# eval instant at 30s count_over_time(metric[1m]) -# {} 3 +eval instant at 30s count_over_time(metric[1m]) + {} 3 -# Unsupported by streaming engine. -# eval instant at 10s count_over_time(metric[1s]) -# {} 1 +eval instant at 10s count_over_time(metric[1s]) + {} 1 -# Unsupported by streaming engine. -# eval instant at 20s count_over_time(metric[1s]) +eval instant at 20s count_over_time(metric[1s]) -# Unsupported by streaming engine. -# eval instant at 20s count_over_time(metric[10s]) -# {} 1 +eval instant at 20s count_over_time(metric[10s]) + {} 1 -# Unsupported by streaming engine. -# eval instant at 20s count_over_time(metric[10]) -# {} 1 +eval instant at 20s count_over_time(metric[10]) + {} 1 clear diff --git a/pkg/streamingpromql/types/fpoint_ring_buffer.go b/pkg/streamingpromql/types/fpoint_ring_buffer.go index cff0182d1ed..fc1335567a3 100644 --- a/pkg/streamingpromql/types/fpoint_ring_buffer.go +++ b/pkg/streamingpromql/types/fpoint_ring_buffer.go @@ -190,6 +190,32 @@ func (b *FPointRingBuffer) LastAtOrBefore(maxT int64) (promql.FPoint, bool) { return promql.FPoint{}, false } +// CountAtOrBefore returns the number of points in this ring buffer with timestamp less than or equal to maxT. +func (b *FPointRingBuffer) CountAtOrBefore(maxT int64) int { + count := b.size + + for count > 0 { + p := b.points[(b.firstIndex+count-1)%len(b.points)] + + if p.T <= maxT { + return count + } + + count-- + } + + return count +} + +// AnyAtOrBefore returns true if this ring buffer contains any points with timestamp less than or equal to maxT. +func (b *FPointRingBuffer) AnyAtOrBefore(maxT int64) bool { + if b.size == 0 { + return false + } + + return b.points[b.firstIndex].T <= maxT +} + // These hooks exist so we can override them during unit tests. var getFPointSliceForRingBuffer = FPointSlicePool.Get var putFPointSliceForRingBuffer = FPointSlicePool.Put diff --git a/pkg/streamingpromql/types/hpoint_ring_buffer.go b/pkg/streamingpromql/types/hpoint_ring_buffer.go index 2ef33438ffa..945db554d16 100644 --- a/pkg/streamingpromql/types/hpoint_ring_buffer.go +++ b/pkg/streamingpromql/types/hpoint_ring_buffer.go @@ -241,6 +241,32 @@ func (b *HPointRingBuffer) LastAtOrBefore(maxT int64) (promql.HPoint, bool) { return promql.HPoint{}, false } +// CountAtOrBefore returns the number of points in this ring buffer with timestamp less than or equal to maxT. +func (b *HPointRingBuffer) CountAtOrBefore(maxT int64) int { + count := b.size + + for count > 0 { + p := b.points[(b.firstIndex+count-1)%len(b.points)] + + if p.T <= maxT { + return count + } + + count-- + } + + return count +} + +// AnyAtOrBefore returns true if this ring buffer contains any points with timestamp less than or equal to maxT. +func (b *HPointRingBuffer) AnyAtOrBefore(maxT int64) bool { + if b.size == 0 { + return false + } + + return b.points[b.firstIndex].T <= maxT +} + // These hooks exist so we can override them during unit tests. var getHPointSliceForRingBuffer = HPointSlicePool.Get var putHPointSliceForRingBuffer = HPointSlicePool.Put diff --git a/pkg/streamingpromql/types/ring_buffer_test.go b/pkg/streamingpromql/types/ring_buffer_test.go index ff4c7367ea4..1e34b0d24c9 100644 --- a/pkg/streamingpromql/types/ring_buffer_test.go +++ b/pkg/streamingpromql/types/ring_buffer_test.go @@ -22,6 +22,8 @@ type ringBuffer[T any] interface { UnsafePoints(maxT int64) (head []T, tail []T) CopyPoints(maxT int64) ([]T, error) LastAtOrBefore(maxT int64) (T, bool) + CountAtOrBefore(maxT int64) int + AnyAtOrBefore(maxT int64) bool First() T Reset() GetPoints() []T @@ -113,7 +115,7 @@ func testRingBuffer[T any](t *testing.T, buf ringBuffer[T], points []T) { func TestRingBuffer_DiscardPointsBefore_ThroughWrapAround(t *testing.T) { setupRingBufferTestingPools(t) - t.Run("test FPoint ring buffer", func(t *testing.T) { + t.Run("test FPointRingBuffer", func(t *testing.T) { points := []promql.FPoint{ {T: 1, F: 100}, {T: 2, F: 200}, @@ -126,7 +128,7 @@ func TestRingBuffer_DiscardPointsBefore_ThroughWrapAround(t *testing.T) { testDiscardPointsBeforeThroughWrapAround(t, buf, points) }) - t.Run("test HPoint ring buffer", func(t *testing.T) { + t.Run("test HPointRingBuffer", func(t *testing.T) { points := []promql.HPoint{ {T: 1, H: &histogram.FloatHistogram{Count: 100}}, {T: 2, H: &histogram.FloatHistogram{Count: 200}}, @@ -170,7 +172,7 @@ func testDiscardPointsBeforeThroughWrapAround[T any](t *testing.T, buf ringBuffe shouldHavePoints(t, buf, points[5]) } -func TestRemoveLastPoint(t *testing.T) { +func TestRingBuffer_RemoveLastPoint(t *testing.T) { setupRingBufferTestingPools(t) points := []promql.HPoint{ @@ -300,10 +302,14 @@ func shouldHavePointsAtOrBeforeTime[T any](t *testing.T, buf ringBuffer[T], ts i if len(expected) == 0 { require.Len(t, combinedPoints, 0) + require.False(t, buf.AnyAtOrBefore(ts)) } else { require.Equal(t, expected, combinedPoints) + require.True(t, buf.AnyAtOrBefore(ts)) } + require.Equal(t, len(expected), buf.CountAtOrBefore(ts)) + copiedPoints, err := buf.CopyPoints(ts) require.NoError(t, err) require.Equal(t, expected, copiedPoints)