Skip to content

Commit 6c2fcba

Browse files
committed
Add native histograms ingestion rate limit
Signed-off-by: Paurush Garg <[email protected]>
1 parent 844fa55 commit 6c2fcba

File tree

6 files changed

+169
-57
lines changed

6 files changed

+169
-57
lines changed

pkg/distributor/distributor.go

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ type Distributor struct {
9595
HATracker *ha.HATracker
9696

9797
// Per-user rate limiter.
98-
ingestionRateLimiter *limiter.RateLimiter
98+
ingestionRateLimiter *limiter.RateLimiter
99+
nativeHistogramsIngestionRateLimiter *limiter.RateLimiter
99100

100101
// Manager for subservices (HA Tracker, distributor ring and client pool)
101102
subservices *services.Manager
@@ -267,11 +268,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
267268
// it's an internal dependency and can't join the distributors ring, we skip rate
268269
// limiting.
269270
var ingestionRateStrategy limiter.RateLimiterStrategy
271+
var nativeHistogramsIngestionRateStrategy limiter.RateLimiterStrategy
270272
var distributorsLifeCycler *ring.Lifecycler
271273
var distributorsRing *ring.Ring
272274

273275
if !canJoinDistributorsRing {
274276
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
277+
nativeHistogramsIngestionRateStrategy = newInfiniteIngestionRateStrategy()
275278
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
276279
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
277280
if err != nil {
@@ -285,21 +288,24 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
285288
subservices = append(subservices, distributorsLifeCycler, distributorsRing)
286289

287290
ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler)
291+
nativeHistogramsIngestionRateStrategy = newGlobalNativeHistogramsIngestionRateStrategy(limits, distributorsLifeCycler)
288292
} else {
289293
ingestionRateStrategy = newLocalIngestionRateStrategy(limits)
294+
nativeHistogramsIngestionRateStrategy = newLocalNativeHistogramsIngestionRateStrategy(limits)
290295
}
291296

292297
d := &Distributor{
293-
cfg: cfg,
294-
log: log,
295-
ingestersRing: ingestersRing,
296-
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
297-
distributorsLifeCycler: distributorsLifeCycler,
298-
distributorsRing: distributorsRing,
299-
limits: limits,
300-
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
301-
HATracker: haTracker,
302-
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
298+
cfg: cfg,
299+
log: log,
300+
ingestersRing: ingestersRing,
301+
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
302+
distributorsLifeCycler: distributorsLifeCycler,
303+
distributorsRing: distributorsRing,
304+
limits: limits,
305+
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
306+
nativeHistogramsIngestionRateLimiter: limiter.NewRateLimiter(nativeHistogramsIngestionRateStrategy, 10*time.Second),
307+
HATracker: haTracker,
308+
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
303309

304310
queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
305311
Namespace: "cortex",
@@ -774,6 +780,20 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
774780

775781
totalSamples := validatedFloatSamples + validatedHistogramSamples
776782
totalN := totalSamples + validatedExemplars + len(validatedMetadata)
783+
784+
if !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) {
785+
// Ensure the request slice is reused if the request is rate limited.
786+
cortexpb.ReuseSlice(req.Timeseries)
787+
788+
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(totalSamples))
789+
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars))
790+
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata)))
791+
// Return a 429 here to tell the client it is going too fast.
792+
// Client may discard the data or slow down and re-send.
793+
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
794+
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
795+
}
796+
777797
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
778798
// Ensure the request slice is reused if the request is rate limited.
779799
cortexpb.ReuseSlice(req.Timeseries)

pkg/distributor/distributor_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func TestConfig_Validate(t *testing.T) {
134134
func TestDistributor_Push(t *testing.T) {
135135
t.Parallel()
136136
// Metrics to assert on.
137-
lastSeenTimestamp := "cortex_distributor_latest_seen_sample_timestamp_seconds"
137+
lastSeenTimestamp := "cortex_distributor_latest_seen_sample_timestamp_seconds"
138138
distributorAppend := "cortex_distributor_ingester_appends_total"
139139
distributorAppendFailure := "cortex_distributor_ingester_append_failures_total"
140140
distributorReceivedSamples := "cortex_distributor_received_samples_total"
@@ -351,6 +351,15 @@ func TestDistributor_Push(t *testing.T) {
351351
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25
352352
`,
353353
},
354+
"A push not exceeding burst size but exceeding nativeHistograms burst size should fail, histograms": {
355+
numIngesters: 3,
356+
happyIngesters: 3,
357+
samples: samplesIn{num: 15, startTimestampMs: 123456789000},
358+
histogramSamples: true,
359+
metadata: 5,
360+
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (10) exceeded while adding 15 samples and 5 metadata"),
361+
metricNames: []string{lastSeenTimestamp, distributorReceivedSamples},
362+
},
354363
} {
355364
for _, useStreamPush := range []bool{false, true} {
356365
for _, shardByAllLabels := range []bool{true, false} {
@@ -364,6 +373,8 @@ func TestDistributor_Push(t *testing.T) {
364373
flagext.DefaultValues(limits)
365374
limits.IngestionRate = 20
366375
limits.IngestionBurstSize = 20
376+
limits.NativeHistogramsIngestionRate = 10
377+
limits.NativeHistogramsIngestionBurstSize = 10
367378

368379
ds, _, regs, _ := prepare(t, prepConfig{
369380
numIngesters: tc.numIngesters,

pkg/distributor/ingestion_rate_strategy.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,49 @@ func (s *infiniteStrategy) Burst(tenantID string) int {
7272
// Burst is ignored when limit = rate.Inf
7373
return 0
7474
}
75+
76+
type localStrategyNativeHistograms struct {
77+
limits *validation.Overrides
78+
}
79+
80+
func newLocalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy {
81+
return &localStrategyNativeHistograms{
82+
limits: limits,
83+
}
84+
}
85+
86+
func (s *localStrategyNativeHistograms) Limit(tenantID string) float64 {
87+
return s.limits.NativeHistogramsIngestionRate(tenantID)
88+
}
89+
90+
func (s *localStrategyNativeHistograms) Burst(tenantID string) int {
91+
return s.limits.NativeHistogramsIngestionBurstSize(tenantID)
92+
}
93+
94+
type globalStrategyNativeHistograms struct {
95+
limits *validation.Overrides
96+
ring ReadLifecycler
97+
}
98+
99+
func newGlobalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy {
100+
return &globalStrategyNativeHistograms{
101+
limits: limits,
102+
ring: ring,
103+
}
104+
}
105+
106+
func (s *globalStrategyNativeHistograms) Limit(tenantID string) float64 {
107+
numDistributors := s.ring.HealthyInstancesCount()
108+
109+
if numDistributors == 0 {
110+
return s.limits.NativeHistogramsIngestionRate(tenantID)
111+
}
112+
113+
return s.limits.NativeHistogramsIngestionRate(tenantID) / float64(numDistributors)
114+
}
115+
116+
func (s *globalStrategyNativeHistograms) Burst(tenantID string) int {
117+
// The meaning of burst doesn't change for the global strategy, in order
118+
// to keep it easier to understand for users / operators.
119+
return s.limits.NativeHistogramsIngestionBurstSize(tenantID)
120+
}

pkg/distributor/ingestion_rate_strategy_test.go

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,44 +15,58 @@ import (
1515
func TestIngestionRateStrategy(t *testing.T) {
1616
t.Parallel()
1717
tests := map[string]struct {
18-
limits validation.Limits
19-
ring ReadLifecycler
20-
expectedLimit float64
21-
expectedBurst int
18+
limits validation.Limits
19+
ring ReadLifecycler
20+
expectedLimit float64
21+
expectedBurst int
22+
expectedNativeHistogramsLimit float64
23+
expectedNativeHistogramsBurst int
2224
}{
2325
"local rate limiter should just return configured limits": {
2426
limits: validation.Limits{
25-
IngestionRateStrategy: validation.LocalIngestionRateStrategy,
26-
IngestionRate: float64(1000),
27-
IngestionBurstSize: 10000,
27+
IngestionRateStrategy: validation.LocalIngestionRateStrategy,
28+
IngestionRate: float64(1000),
29+
IngestionBurstSize: 10000,
30+
NativeHistogramsIngestionRate: float64(100),
31+
NativeHistogramsIngestionBurstSize: 100,
2832
},
29-
ring: nil,
30-
expectedLimit: float64(1000),
31-
expectedBurst: 10000,
33+
ring: nil,
34+
expectedLimit: float64(1000),
35+
expectedBurst: 10000,
36+
expectedNativeHistogramsLimit: float64(100),
37+
expectedNativeHistogramsBurst: 100,
3238
},
3339
"global rate limiter should share the limit across the number of distributors": {
3440
limits: validation.Limits{
35-
IngestionRateStrategy: validation.GlobalIngestionRateStrategy,
36-
IngestionRate: float64(1000),
37-
IngestionBurstSize: 10000,
41+
IngestionRateStrategy: validation.GlobalIngestionRateStrategy,
42+
IngestionRate: float64(1000),
43+
IngestionBurstSize: 10000,
44+
NativeHistogramsIngestionRate: float64(100),
45+
NativeHistogramsIngestionBurstSize: 100,
3846
},
3947
ring: func() ReadLifecycler {
4048
ring := newReadLifecyclerMock()
4149
ring.On("HealthyInstancesCount").Return(2)
4250
return ring
4351
}(),
44-
expectedLimit: float64(500),
45-
expectedBurst: 10000,
52+
expectedLimit: float64(500),
53+
expectedBurst: 10000,
54+
expectedNativeHistogramsLimit: float64(50),
55+
expectedNativeHistogramsBurst: 100,
4656
},
4757
"infinite rate limiter should return unlimited settings": {
4858
limits: validation.Limits{
49-
IngestionRateStrategy: "infinite",
50-
IngestionRate: float64(1000),
51-
IngestionBurstSize: 10000,
59+
IngestionRateStrategy: "infinite",
60+
IngestionRate: float64(1000),
61+
IngestionBurstSize: 10000,
62+
NativeHistogramsIngestionRate: float64(100),
63+
NativeHistogramsIngestionBurstSize: 100,
5264
},
53-
ring: nil,
54-
expectedLimit: float64(rate.Inf),
55-
expectedBurst: 0,
65+
ring: nil,
66+
expectedLimit: float64(rate.Inf),
67+
expectedBurst: 0,
68+
expectedNativeHistogramsLimit: float64(rate.Inf),
69+
expectedNativeHistogramsBurst: 0,
5670
},
5771
}
5872

@@ -62,6 +76,7 @@ func TestIngestionRateStrategy(t *testing.T) {
6276
t.Run(testName, func(t *testing.T) {
6377
t.Parallel()
6478
var strategy limiter.RateLimiterStrategy
79+
var nativeHistogramsStrategy limiter.RateLimiterStrategy
6580

6681
// Init limits overrides
6782
overrides, err := validation.NewOverrides(testData.limits, nil)
@@ -71,16 +86,21 @@ func TestIngestionRateStrategy(t *testing.T) {
7186
switch testData.limits.IngestionRateStrategy {
7287
case validation.LocalIngestionRateStrategy:
7388
strategy = newLocalIngestionRateStrategy(overrides)
89+
nativeHistogramsStrategy = newLocalNativeHistogramsIngestionRateStrategy(overrides)
7490
case validation.GlobalIngestionRateStrategy:
7591
strategy = newGlobalIngestionRateStrategy(overrides, testData.ring)
92+
nativeHistogramsStrategy = newGlobalNativeHistogramsIngestionRateStrategy(overrides, testData.ring)
7693
case "infinite":
7794
strategy = newInfiniteIngestionRateStrategy()
95+
nativeHistogramsStrategy = newInfiniteIngestionRateStrategy()
7896
default:
7997
require.Fail(t, "Unknown strategy")
8098
}
8199

82100
assert.Equal(t, strategy.Limit("test"), testData.expectedLimit)
83101
assert.Equal(t, strategy.Burst("test"), testData.expectedBurst)
102+
assert.Equal(t, nativeHistogramsStrategy.Limit("test"), testData.expectedNativeHistogramsLimit)
103+
assert.Equal(t, nativeHistogramsStrategy.Burst("test"), testData.expectedNativeHistogramsBurst)
84104
})
85105
}
86106
}

0 commit comments

Comments
 (0)