Skip to content

Commit

Permalink
Add support for ingesting out-of-order native histograms (#7175)
Browse files Browse the repository at this point in the history
* Add ooo native histograms flag

Update CHANGELOG and versioning doc
  • Loading branch information
fionaliao authored Sep 30, 2024
1 parent a8a52dc commit f2cff61
Show file tree
Hide file tree
Showing 19 changed files with 1,075 additions and 187 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* `-ruler.client.grpc-compression=s2`
* `-ruler.query-frontend.grpc-client-config.grpc-compression=s2`
* [FEATURE] Alertmanager: limit added for maximum size of the Grafana configuration (`-alertmanager.max-config-size-bytes`). #9402
* [FEATURE] Ingester: Experimental support for ingesting out-of-order native histograms. This is disabled by default and can be enabled by setting `-ingester.ooo-native-histograms-ingestion-enabled` to `true`. #7175
* [ENHANCEMENT] Ruler: Support `exclude_alerts` parameter in `<prometheus-http-prefix>/api/v1/rules` endpoint. #9300
* [ENHANCEMENT] Distributor: add a metric to track tenants who are sending newlines in their label values called `cortex_distributor_label_values_with_newlines_total`. #9400

Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -3940,6 +3940,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "ooo_native_histograms_ingestion_enabled",
"required": false,
"desc": "Enable experimental out-of-order native histogram ingestion. This only takes effect if the `-ingester.out-of-order-time-window` value is greater than zero and if `-ingester.native-histograms-ingestion-enabled = true`",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "ingester.ooo-native-histograms-ingestion-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "active_series_custom_trackers",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,8 @@ Usage of ./cmd/mimir/mimir:
Period at which metadata we have not seen will remain in memory before being deleted. (default 10m0s)
-ingester.native-histograms-ingestion-enabled
[experimental] Enable ingestion of native histogram samples. If false, native histogram samples are ignored without an error. To query native histograms with query-sharding enabled make sure to set -query-frontend.query-result-response-format to 'protobuf'.
-ingester.ooo-native-histograms-ingestion-enabled
[experimental] Enable experimental out-of-order native histogram ingestion. This only takes effect if the `-ingester.out-of-order-time-window` value is greater than zero and if `-ingester.native-histograms-ingestion-enabled = true`
-ingester.out-of-order-blocks-external-label-enabled
[experimental] Whether the shipper should label out-of-order blocks with an external label before uploading them. Setting this label will compact out-of-order blocks separately from non-out-of-order blocks
-ingester.out-of-order-time-window duration
Expand Down
3 changes: 2 additions & 1 deletion docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ The following features are currently experimental:
- Ingester
- Add variance to chunks end time to spread writing across time (`-blocks-storage.tsdb.head-chunks-end-time-variance`)
- Snapshotting of in-memory TSDB data on disk when shutting down (`-blocks-storage.tsdb.memory-snapshot-on-shutdown`)
- Out-of-order samples ingestion (`-ingester.out-of-order-time-window`)
- Out-of-order samples ingestion (`-ingester.ooo-native-histograms-ingestion-enabled`)
- Out-of-order native histogram samples ingestion (`-ingester.out-of-order-time-window`)
- Shipper labeling out-of-order blocks before upload to cloud storage (`-ingester.out-of-order-blocks-external-label-enabled`)
- Postings for matchers cache configuration:
- `-blocks-storage.tsdb.head-postings-for-matchers-cache-ttl`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3268,6 +3268,13 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -ingester.native-histograms-ingestion-enabled
[native_histograms_ingestion_enabled: <boolean> | default = false]
# (experimental) Enable experimental out-of-order native histogram ingestion.
# This only takes effect if the `-ingester.out-of-order-time-window` value is
# greater than zero and if `-ingester.native-histograms-ingestion-enabled =
# true`
# CLI flag: -ingester.ooo-native-histograms-ingestion-enabled
[ooo_native_histograms_ingestion_enabled: <boolean> | default = false]

# (advanced) Additional custom trackers for active metrics. If there are active
# series matching a provided matcher (map value), the count will be exposed in
# the custom trackers metric labeled using the tracker name (map key). Zero
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ require (
)

// Using a fork of Prometheus with Mimir-specific changes.
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240920170818-87bc81f0005c
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240925112120-6046bf43c9b2

// client_golang v1.20.3 has some data races in histogram exemplars.
// Stick to v1.19.1 until they are fixed.
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1262,8 +1262,8 @@ github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wp
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20240920170818-87bc81f0005c h1:+Bs4f9RDbRpBAzq50oCnbkc9yZNFkKZl53uchHvCwmc=
github.com/grafana/mimir-prometheus v0.0.0-20240920170818-87bc81f0005c/go.mod h1:oyDm7JaLUh+QGuGkC7iXC8IyTUq5rlh1ba2CRm9DlVg=
github.com/grafana/mimir-prometheus v0.0.0-20240925112120-6046bf43c9b2 h1:440vf/oyTq3xubH0iMHveDfEM8p+t/lzLMHiJNQRfY0=
github.com/grafana/mimir-prometheus v0.0.0-20240925112120-6046bf43c9b2/go.mod h1:oyDm7JaLUh+QGuGkC7iXC8IyTUq5rlh1ba2CRm9DlVg=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6 h1:nT8QXdJo6wHMBcF0xEoXxEWkoUZOyzV/jyi/u9l7YEk=
Expand Down
227 changes: 227 additions & 0 deletions integration/ooo_ingestion_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// SPDX-License-Identifier: AGPL-3.0-only
//go:build requires_docker

package integration

import (
"net/http"
"testing"
"time"

"github.com/grafana/e2e"
e2edb "github.com/grafana/e2e/db"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/integration/e2ehistograms"
"github.com/grafana/mimir/integration/e2emimir"
)

func TestOOOIngestion(t *testing.T) {
t.Helper()

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
minio := e2edb.NewMinio(9000, blocksBucketName)
require.NoError(t, s.StartAndWaitReady(minio))

// Start Mimir components.
require.NoError(t, copyFileToSharedDir(s, "docs/configurations/single-process-config-blocks.yaml", mimirConfigFile))

// Start Mimir in single binary mode, reading the config from file and overwriting
// the backend config to make it work with Minio.
flags := mergeFlags(
DefaultSingleBinaryFlags(),
BlocksStorageFlags(),
BlocksStorageS3Flags(),
map[string]string{
"-ingester.out-of-order-time-window": "10m",
"-ingester.native-histograms-ingestion-enabled": "true",
"-ingester.ooo-native-histograms-ingestion-enabled": "true",
},
)

mimir := e2emimir.NewSingleBinary("mimir-1", flags, e2emimir.WithConfigFile(mimirConfigFile), e2emimir.WithPorts(9009, 9095))
require.NoError(t, s.StartAndWaitReady(mimir))

c, err := e2emimir.NewClient(mimir.HTTPEndpoint(), mimir.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

nowTS := time.Now()
oooTS := nowTS.Add(-time.Minute)
tooOldTS := nowTS.Add(-time.Hour)

var expectedMatrix model.Matrix
var expectedVector model.Vector

// Push float series.
floatSeriesName := "ooo_float_series"

// Push in-order sample.
series, expectedVector, _ := generateFloatSeries(floatSeriesName, nowTS)
res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Push out-of-order sample.
series, _, expectedMatrix = generateFloatSeries(floatSeriesName, oooTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Push sample that's older than the out-of-order time window.
series, _, _ = generateFloatSeries(floatSeriesName, tooOldTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, res.StatusCode)

expectedMatrix[0].Values = append(expectedMatrix[0].Values, model.SamplePair{Timestamp: expectedVector[0].Timestamp, Value: expectedVector[0].Value})

// Query float series.
rangeResult, err := c.QueryRange(floatSeriesName, nowTS.Add(-time.Minute), nowTS, time.Minute)
require.NoError(t, err)
require.Equal(t, model.ValMatrix, rangeResult.Type())
require.Equal(t, expectedMatrix, rangeResult.(model.Matrix))

// Push int histogram series.
intHistogramSeriesName := "ooo_int_histogram_series"

// Push in-order sample.
series, expectedVector, _ = e2ehistograms.GenerateHistogramSeries(intHistogramSeriesName, nowTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Push out-of-order sample.
series, _, expectedMatrix = e2ehistograms.GenerateHistogramSeries(intHistogramSeriesName, oooTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Push sample that's older than the out-of-order time window.
series, _, _ = e2ehistograms.GenerateHistogramSeries(intHistogramSeriesName, tooOldTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, res.StatusCode)

expectedMatrix[0].Histograms = append(expectedMatrix[0].Histograms, model.SampleHistogramPair{Timestamp: expectedVector[0].Timestamp, Histogram: expectedVector[0].Histogram})

// Query int histogram series.
rangeResult, err = c.QueryRange(intHistogramSeriesName, nowTS.Add(-time.Minute), nowTS, time.Minute)
require.NoError(t, err)
require.Equal(t, model.ValMatrix, rangeResult.Type())
require.Equal(t, expectedMatrix, rangeResult.(model.Matrix))

// Push float histogram series.
floatHistogramSeriesName := "ooo_float_histogram_series"

// Push in-order sample.
series, expectedVector, _ = e2ehistograms.GenerateFloatHistogramSeries(floatHistogramSeriesName, nowTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Push out-of-order sample.
series, _, expectedMatrix = e2ehistograms.GenerateFloatHistogramSeries(floatHistogramSeriesName, oooTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Push sample that's older than the out-of-order time window.
series, _, _ = generateHistogramSeries(floatHistogramSeriesName, tooOldTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, res.StatusCode)

expectedMatrix[0].Histograms = append(expectedMatrix[0].Histograms, model.SampleHistogramPair{Timestamp: expectedVector[0].Timestamp, Histogram: expectedVector[0].Histogram})

// Query float histogram series.
rangeResult, err = c.QueryRange(floatHistogramSeriesName, nowTS.Add(-time.Minute), nowTS, time.Minute)
require.NoError(t, err)
require.Equal(t, model.ValMatrix, rangeResult.Type())
require.Equal(t, expectedMatrix, rangeResult.(model.Matrix))
}

func TestOOOHistogramIngestionDisabled(t *testing.T) {
t.Helper()

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
minio := e2edb.NewMinio(9000, blocksBucketName)
require.NoError(t, s.StartAndWaitReady(minio))

// Start Mimir components.
require.NoError(t, copyFileToSharedDir(s, "docs/configurations/single-process-config-blocks.yaml", mimirConfigFile))

// Start Mimir in single binary mode, reading the config from file and overwriting
// the backend config to make it work with Minio.
flags := mergeFlags(
DefaultSingleBinaryFlags(),
BlocksStorageFlags(),
BlocksStorageS3Flags(),
map[string]string{
"-ingester.out-of-order-time-window": "10m",
"-ingester.native-histograms-ingestion-enabled": "true",
},
)

mimir := e2emimir.NewSingleBinary("mimir-1", flags, e2emimir.WithConfigFile(mimirConfigFile), e2emimir.WithPorts(9009, 9095))
require.NoError(t, s.StartAndWaitReady(mimir))

c, err := e2emimir.NewClient(mimir.HTTPEndpoint(), mimir.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

nowTS := time.Now()
oooTS := nowTS.Add(-time.Minute)

// Push float series.
floatSeriesName := "ooo_float_series"

// Push in-order sample.
series, _, _ := generateFloatSeries(floatSeriesName, nowTS)
res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Push out-of-order sample.
series, _, _ = generateFloatSeries(floatSeriesName, oooTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Push int histogram series.
intHistogramSeriesName := "ooo_int_histogram_series"

// Push in-order sample.
series, _, _ = e2ehistograms.GenerateHistogramSeries(intHistogramSeriesName, nowTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Push out-of-order sample.
series, _, _ = e2ehistograms.GenerateHistogramSeries(intHistogramSeriesName, oooTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, res.StatusCode)

// Push float histogram series.
floatHistogramSeriesName := "ooo_float_histogram_series"

// Push in-order sample.
series, _, _ = e2ehistograms.GenerateFloatHistogramSeries(floatHistogramSeriesName, nowTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// Push out-of-order sample.
series, _, _ = e2ehistograms.GenerateFloatHistogramSeries(floatHistogramSeriesName, oooTS)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, res.StatusCode)
}
6 changes: 6 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,11 @@ func (i *Ingester) applyTSDBSettings() {
} else {
db.db.DisableNativeHistograms()
}
if i.limits.OOONativeHistogramsIngestionEnabled(userID) {
db.db.EnableOOONativeHistograms()
} else {
db.db.DisableOOONativeHistograms()
}
}
}

Expand Down Expand Up @@ -2682,6 +2687,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
BlockPostingsForMatchersCacheMaxBytes: i.cfg.BlocksStorageConfig.TSDB.BlockPostingsForMatchersCacheMaxBytes,
BlockPostingsForMatchersCacheForce: i.cfg.BlocksStorageConfig.TSDB.BlockPostingsForMatchersCacheForce,
EnableNativeHistograms: i.limits.NativeHistogramsIngestionEnabled(userID),
EnableOOONativeHistograms: i.limits.OOONativeHistogramsIngestionEnabled(userID),
SecondaryHashFunction: secondaryTSDBHashFunctionForUser(userID),
}, nil)
if err != nil {
Expand Down
Loading

0 comments on commit f2cff61

Please sign in to comment.