diff --git a/CHANGELOG.md b/CHANGELOG.md index f78f1b445c3..d1f51b35df8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ * [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766 * [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769 * [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778 +* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histograms. #6794 * [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 4460ae63021..720505498d2 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3427,6 +3427,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ingestion-rate-limit [ingestion_rate: | default = 25000] +# Per-user native histograms ingestion rate limit in samples per second. +# Disabled by default +# CLI flag: -distributor.native-histograms-ingestion-rate-limit +[native_histograms_ingestion_rate: | default = 1.7976931348623157e+308] + # Whether the ingestion rate limit should be applied individually to each # distributor instance (local), or evenly shared across the cluster (global). 
# CLI flag: -distributor.ingestion-rate-limit-strategy @@ -3436,6 +3441,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ingestion-burst-size [ingestion_burst_size: | default = 50000] +# Per-user allowed native histograms ingestion burst size (in number of samples) +# CLI flag: -distributor.native-histograms-ingestion-burst-size +[native_histograms_ingestion_burst_size: | default = 0] + # Flag to enable, for all users, handling of samples with external labels # identifying replicas in an HA Prometheus setup. # CLI flag: -distributor.ha-tracker.enable-for-all-users diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6843bc27fcc..b798a27dcb3 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "io" + "math" "net/http" "sort" "strings" @@ -95,7 +96,8 @@ type Distributor struct { HATracker *ha.HATracker // Per-user rate limiter. - ingestionRateLimiter *limiter.RateLimiter + ingestionRateLimiter *limiter.RateLimiter + nativeHistogramsIngestionRateLimiter *limiter.RateLimiter // Manager for subservices (HA Tracker, distributor ring and client pool) subservices *services.Manager @@ -267,11 +269,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove // it's an internal dependency and can't join the distributors ring, we skip rate // limiting. 
var ingestionRateStrategy limiter.RateLimiterStrategy + var nativeHistogramsIngestionRateStrategy limiter.RateLimiterStrategy var distributorsLifeCycler *ring.Lifecycler var distributorsRing *ring.Ring if !canJoinDistributorsRing { ingestionRateStrategy = newInfiniteIngestionRateStrategy() + nativeHistogramsIngestionRateStrategy = newInfiniteIngestionRateStrategy() } else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg)) if err != nil { @@ -285,21 +289,24 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove subservices = append(subservices, distributorsLifeCycler, distributorsRing) ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler) + nativeHistogramsIngestionRateStrategy = newGlobalNativeHistogramsIngestionRateStrategy(limits, distributorsLifeCycler) } else { ingestionRateStrategy = newLocalIngestionRateStrategy(limits) + nativeHistogramsIngestionRateStrategy = newLocalNativeHistogramsIngestionRateStrategy(limits) } d := &Distributor{ - cfg: cfg, - log: log, - ingestersRing: ingestersRing, - ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log), - distributorsLifeCycler: distributorsLifeCycler, - distributorsRing: distributorsRing, - limits: limits, - ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), - HATracker: haTracker, - ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), + cfg: cfg, + log: log, + ingestersRing: ingestersRing, + ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log), + distributorsLifeCycler: distributorsLifeCycler, + distributorsRing: distributorsRing, + limits: limits, + ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 
10*time.Second), + nativeHistogramsIngestionRateLimiter: limiter.NewRateLimiter(nativeHistogramsIngestionRateStrategy, 10*time.Second), + HATracker: haTracker, + ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", @@ -774,16 +781,32 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co totalSamples := validatedFloatSamples + validatedHistogramSamples totalN := totalSamples + validatedExemplars + len(validatedMetadata) - if !d.ingestionRateLimiter.AllowN(now, userID, totalN) { + + nhRateLimited := false + if limits.NativeHistogramsIngestionRate != math.MaxFloat64 { + nhRateLimited = !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) + } + rateLimited := !d.ingestionRateLimiter.AllowN(now, userID, totalN) + + // Return a 429 here to tell the client it is going too fast. + // Client may discard the data or slow down and re-send. + // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. + if nhRateLimited { // Ensure the request slice is reused if the request is rate limited. 
cortexpb.ReuseSlice(req.Timeseries) + d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(totalSamples)) + d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars)) + d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata))) + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) + } + if rateLimited { + // Ensure the request slice is reused if the request is rate limited. + cortexpb.ReuseSlice(req.Timeseries) d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples)) d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars)) d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata))) - // Return a 429 here to tell the client it is going too fast. - // Client may discard the data or slow down and re-send. - // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. 
+ return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2d3ceb62c1d..30d91212a10 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -351,6 +351,15 @@ func TestDistributor_Push(t *testing.T) { cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25 `, }, + "A push not exceeding burst size but exceeding native histograms burst size should fail, histograms": { + numIngesters: 3, + happyIngesters: 3, + samples: samplesIn{num: 15, startTimestampMs: 123456789000}, + histogramSamples: true, + metadata: 5, + expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (10) exceeded while adding 15 samples and 5 metadata"), + metricNames: []string{lastSeenTimestamp, distributorReceivedSamples}, + }, } { for _, useStreamPush := range []bool{false, true} { for _, shardByAllLabels := range []bool{true, false} { @@ -364,6 +373,8 @@ func TestDistributor_Push(t *testing.T) { flagext.DefaultValues(limits) limits.IngestionRate = 20 limits.IngestionBurstSize = 20 + limits.NativeHistogramsIngestionRate = 10 + limits.NativeHistogramsIngestionBurstSize = 10 ds, _, regs, _ := prepare(t, prepConfig{ numIngesters: tc.numIngesters, @@ -681,6 +692,114 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { } } +// TestDistributor_PushIngestionRateLimiter_Histograms sends a sequence of pushes to the first distributor and compares each returned error with the rate-limit error expected for the local and global strategies. +func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { + t.Parallel() + type pushCase struct { + samples int + metadata int + expectedError error + } + + ctx := user.InjectOrgID(context.Background(), "user") + scenarios := map[string]struct { + distributors int + ingestionRateStrategy string + ingestionRate float64 + ingestionBurstSize int + nativeHistogramsIngestionRate float64 + 
nativeHistogramsIngestionBurstSize int + pushes []pushCase + }{ + "local strategy: limit should be set to each distributor: histograms": { + distributors: 2, + ingestionRateStrategy: validation.LocalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 10, + nativeHistogramsIngestionRate: 5, + nativeHistogramsIngestionBurstSize: 5, + pushes: []pushCase{ + {samples: 2}, + {metadata: 1}, + {samples: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (5) exceeded while adding 4 samples and 0 metadata")}, + {samples: 2, metadata: 1}, + {samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (5) exceeded while adding 3 samples and 0 metadata")}, + {metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 0 samples and 1 metadata")}, + }, + }, + "global strategy: limit should be evenly shared across distributors: histograms": { + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 5, + nativeHistogramsIngestionRate: 6, + nativeHistogramsIngestionBurstSize: 3, + pushes: []pushCase{ + {samples: 2}, + {samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 2 samples and 1 metadata")}, + {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")}, + {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 1 samples and 0 metadata")}, + }, + }, + "global strategy: burst should set to each distributor: histograms": { + distributors: 2, + ingestionRateStrategy: 
validation.GlobalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 20, + nativeHistogramsIngestionRate: 6, + nativeHistogramsIngestionBurstSize: 10, + pushes: []pushCase{ + {samples: 3}, + {samples: 1}, + {samples: 7, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 7 samples and 1 metadata")}, + {samples: 5}, + {samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 3 samples and 0 metadata")}, + {metadata: 12, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 12 metadata")}, + }, + }, + } + + for name, tc := range scenarios { + tc := tc + + t.Run(name, func(t *testing.T) { + t.Parallel() + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.IngestionRateStrategy = tc.ingestionRateStrategy + limits.IngestionRate = tc.ingestionRate + limits.IngestionBurstSize = tc.ingestionBurstSize + limits.NativeHistogramsIngestionRate = tc.nativeHistogramsIngestionRate + limits.NativeHistogramsIngestionBurstSize = tc.nativeHistogramsIngestionBurstSize + + // Bring up as many distributors as the scenario asks for + ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: tc.distributors, + shardByAllLabels: true, + limits: limits, + }) + + // Every push goes to the first distributor, one request per entry and in order + for _, p := range tc.pushes { + req := makeWriteRequest(0, 0, p.metadata, p.samples) + + resp, err := ds[0].Push(ctx, req) + + if p.expectedError != nil { + assert.Nil(t, resp) + assert.Equal(t, p.expectedError, err) + } else { + assert.Equal(t, emptyResponse, resp) + assert.Nil(t, err) + } + } + }) + } + +} + 
func TestPush_QuorumError(t *testing.T) { t.Parallel() diff --git a/pkg/distributor/ingestion_rate_strategy.go b/pkg/distributor/ingestion_rate_strategy.go index cc3e5dd2402..09629011af6 100644 --- a/pkg/distributor/ingestion_rate_strategy.go +++ b/pkg/distributor/ingestion_rate_strategy.go @@ -72,3 +72,46 @@ func (s *infiniteStrategy) Burst(tenantID string) int { // Burst is ignored when limit = rate.Inf return 0 } + +// localStrategyNativeHistograms reports the tenant's configured native histograms rate and burst unchanged. +type localStrategyNativeHistograms struct { + limits *validation.Overrides +} + +func newLocalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy { + return &localStrategyNativeHistograms{limits: limits} +} + +func (s *localStrategyNativeHistograms) Limit(tenantID string) float64 { + return s.limits.NativeHistogramsIngestionRate(tenantID) +} + +func (s *localStrategyNativeHistograms) Burst(tenantID string) int { + return s.limits.NativeHistogramsIngestionBurstSize(tenantID) +} + +// globalStrategyNativeHistograms divides the tenant's configured native histograms rate by the healthy instance count reported by ring. +type globalStrategyNativeHistograms struct { + limits *validation.Overrides + ring ReadLifecycler +} + +func newGlobalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy { + return &globalStrategyNativeHistograms{limits: limits, ring: ring} +} + +func (s *globalStrategyNativeHistograms) Limit(tenantID string) float64 { + healthy := s.ring.HealthyInstancesCount() + configured := s.limits.NativeHistogramsIngestionRate(tenantID) + if healthy != 0 { + return configured / float64(healthy) + } + // No healthy instance reported: hand back the configured rate undivided. + return configured +} + +func (s *globalStrategyNativeHistograms) Burst(tenantID string) int { + // The burst is returned exactly as configured, never divided across instances, + // which keeps its meaning the same as under the local strategy. 
+ return s.limits.NativeHistogramsIngestionBurstSize(tenantID) +} diff --git a/pkg/distributor/ingestion_rate_strategy_test.go b/pkg/distributor/ingestion_rate_strategy_test.go index 2820c2fb59d..baf47d7d796 100644 --- a/pkg/distributor/ingestion_rate_strategy_test.go +++ b/pkg/distributor/ingestion_rate_strategy_test.go @@ -15,44 +15,58 @@ import ( func TestIngestionRateStrategy(t *testing.T) { t.Parallel() tests := map[string]struct { - limits validation.Limits - ring ReadLifecycler - expectedLimit float64 - expectedBurst int + limits validation.Limits + ring ReadLifecycler + expectedLimit float64 + expectedBurst int + expectedNativeHistogramsLimit float64 + expectedNativeHistogramsBurst int }{ "local rate limiter should just return configured limits": { limits: validation.Limits{ - IngestionRateStrategy: validation.LocalIngestionRateStrategy, - IngestionRate: float64(1000), - IngestionBurstSize: 10000, + IngestionRateStrategy: validation.LocalIngestionRateStrategy, + IngestionRate: float64(1000), + IngestionBurstSize: 10000, + NativeHistogramsIngestionRate: float64(100), + NativeHistogramsIngestionBurstSize: 100, }, - ring: nil, - expectedLimit: float64(1000), - expectedBurst: 10000, + ring: nil, + expectedLimit: float64(1000), + expectedBurst: 10000, + expectedNativeHistogramsLimit: float64(100), + expectedNativeHistogramsBurst: 100, }, "global rate limiter should share the limit across the number of distributors": { limits: validation.Limits{ - IngestionRateStrategy: validation.GlobalIngestionRateStrategy, - IngestionRate: float64(1000), - IngestionBurstSize: 10000, + IngestionRateStrategy: validation.GlobalIngestionRateStrategy, + IngestionRate: float64(1000), + IngestionBurstSize: 10000, + NativeHistogramsIngestionRate: float64(100), + NativeHistogramsIngestionBurstSize: 100, }, ring: func() ReadLifecycler { ring := newReadLifecyclerMock() ring.On("HealthyInstancesCount").Return(2) return ring }(), - expectedLimit: float64(500), - expectedBurst: 10000, + 
expectedLimit: float64(500), + expectedBurst: 10000, + expectedNativeHistogramsLimit: float64(50), + expectedNativeHistogramsBurst: 100, }, "infinite rate limiter should return unlimited settings": { limits: validation.Limits{ - IngestionRateStrategy: "infinite", - IngestionRate: float64(1000), - IngestionBurstSize: 10000, + IngestionRateStrategy: "infinite", + IngestionRate: float64(1000), + IngestionBurstSize: 10000, + NativeHistogramsIngestionRate: float64(100), + NativeHistogramsIngestionBurstSize: 100, }, - ring: nil, - expectedLimit: float64(rate.Inf), - expectedBurst: 0, + ring: nil, + expectedLimit: float64(rate.Inf), + expectedBurst: 0, + expectedNativeHistogramsLimit: float64(rate.Inf), + expectedNativeHistogramsBurst: 0, }, } @@ -62,6 +76,7 @@ func TestIngestionRateStrategy(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() var strategy limiter.RateLimiterStrategy + var nativeHistogramsStrategy limiter.RateLimiterStrategy // Init limits overrides overrides, err := validation.NewOverrides(testData.limits, nil) @@ -71,16 +86,21 @@ func TestIngestionRateStrategy(t *testing.T) { switch testData.limits.IngestionRateStrategy { case validation.LocalIngestionRateStrategy: strategy = newLocalIngestionRateStrategy(overrides) + nativeHistogramsStrategy = newLocalNativeHistogramsIngestionRateStrategy(overrides) case validation.GlobalIngestionRateStrategy: strategy = newGlobalIngestionRateStrategy(overrides, testData.ring) + nativeHistogramsStrategy = newGlobalNativeHistogramsIngestionRateStrategy(overrides, testData.ring) case "infinite": strategy = newInfiniteIngestionRateStrategy() + nativeHistogramsStrategy = newInfiniteIngestionRateStrategy() default: require.Fail(t, "Unknown strategy") } assert.Equal(t, strategy.Limit("test"), testData.expectedLimit) assert.Equal(t, strategy.Burst("test"), testData.expectedBurst) + assert.Equal(t, nativeHistogramsStrategy.Limit("test"), testData.expectedNativeHistogramsLimit) + assert.Equal(t, 
nativeHistogramsStrategy.Burst("test"), testData.expectedNativeHistogramsBurst) }) } } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 6419dc6ba89..87a6a2b7b98 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -123,29 +123,31 @@ type LimitsPerLabelSet struct { // limits via flags, or per-user limits via yaml config. type Limits struct { // Distributor enforced limits. - IngestionRate float64 `yaml:"ingestion_rate" json:"ingestion_rate"` - IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` - IngestionBurstSize int `yaml:"ingestion_burst_size" json:"ingestion_burst_size"` - AcceptHASamples bool `yaml:"accept_ha_samples" json:"accept_ha_samples"` - AcceptMixedHASamples bool `yaml:"accept_mixed_ha_samples" json:"accept_mixed_ha_samples"` - HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"` - HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"` - HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"` - DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels"` - MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"` - MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` - MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"` - MaxLabelsSizeBytes int `yaml:"max_labels_size_bytes" json:"max_labels_size_bytes"` - MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"` - RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"` - RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"` - CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` - EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name"` - EnforceMetricName bool 
`yaml:"enforce_metric_name" json:"enforce_metric_name"` - IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"` - MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` - MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` - PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` + IngestionRate float64 `yaml:"ingestion_rate" json:"ingestion_rate"` + NativeHistogramsIngestionRate float64 `yaml:"native_histograms_ingestion_rate" json:"native_histograms_ingestion_rate"` + IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` + IngestionBurstSize int `yaml:"ingestion_burst_size" json:"ingestion_burst_size"` + NativeHistogramsIngestionBurstSize int `yaml:"native_histograms_ingestion_burst_size" json:"native_histograms_ingestion_burst_size"` + AcceptHASamples bool `yaml:"accept_ha_samples" json:"accept_ha_samples"` + AcceptMixedHASamples bool `yaml:"accept_mixed_ha_samples" json:"accept_mixed_ha_samples"` + HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"` + HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"` + HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"` + DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels"` + MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"` + MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` + MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"` + MaxLabelsSizeBytes int 
`yaml:"max_labels_size_bytes" json:"max_labels_size_bytes"` + MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"` + RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"` + RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"` + CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` + EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name"` + EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"` + IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"` + MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` + MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` + PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` // Ingester enforced limits. // Series @@ -240,8 +242,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set both on ingesters and distributors. 
When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") + f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", math.MaxFloat64, "Per-user native histograms ingestion rate limit in samples per second. Disabled by default") f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).") f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") + f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 0, "Per-user allowed native histograms ingestion burst size (in number of samples)") f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.") f.BoolVar(&l.AcceptMixedHASamples, "experimental.distributor.ha-tracker.mixed-ha-samples", false, "[Experimental] Flag to enable handling of samples with mixed external labels identifying replicas in an HA Prometheus setup. Supported only if -distributor.ha-tracker.enable-for-all-users is true.") f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") @@ -577,6 +581,11 @@ func (o *Overrides) IngestionRate(userID string) float64 { return o.GetOverridesForUser(userID).IngestionRate } +// NativeHistogramsIngestionRate returns the per-user native histograms ingestion rate limit (samples per second). 
+func (o *Overrides) NativeHistogramsIngestionRate(userID string) float64 { + return o.GetOverridesForUser(userID).NativeHistogramsIngestionRate +} + // IngestionRateStrategy returns whether the ingestion rate limit should be individually applied // to each distributor instance (local) or evenly shared across the cluster (global). func (o *Overrides) IngestionRateStrategy() string { @@ -589,6 +598,11 @@ func (o *Overrides) IngestionBurstSize(userID string) int { return o.GetOverridesForUser(userID).IngestionBurstSize } +// NativeHistogramsIngestionBurstSize returns the per-user allowed native histograms ingestion burst size (in number of samples). +func (o *Overrides) NativeHistogramsIngestionBurstSize(userID string) int { + return o.GetOverridesForUser(userID).NativeHistogramsIngestionBurstSize +} + // AcceptHASamples returns whether the distributor should track and accept samples from HA replicas for this user. func (o *Overrides) AcceptHASamples(userID string) bool { return o.GetOverridesForUser(userID).AcceptHASamples diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 557c8bdb41b..7c8b3ee82df 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -61,7 +61,8 @@ const ( // RateLimited is one of the values for the reason to discard samples. // Declared here to avoid duplication in ingester and distributor. - RateLimited = "rate_limited" + RateLimited = "rate_limited" + NativeHistogramsRateLimited = "native_histograms_rate_limited" // Too many HA clusters is one of the reasons for discarding samples. TooManyHAClusters = "too_many_ha_clusters"