Skip to content

Add nativeHistograms IngestionRate limit #6794

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histograms. #6794
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
Expand Down
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3427,6 +3427,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.ingestion-rate-limit
[ingestion_rate: <float> | default = 25000]

# Per-user native histograms ingestion rate limit in samples per second.
# Disabled by default
# CLI flag: -distributor.native-histograms-ingestion-rate-limit
[native_histograms_ingestion_rate: <float> | default = 1.7976931348623157e+308]

# Whether the ingestion rate limit should be applied individually to each
# distributor instance (local), or evenly shared across the cluster (global).
# CLI flag: -distributor.ingestion-rate-limit-strategy
Expand All @@ -3436,6 +3441,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.ingestion-burst-size
[ingestion_burst_size: <int> | default = 50000]

# Per-user allowed native histograms ingestion burst size (in number of samples)
# CLI flag: -distributor.native-histograms-ingestion-burst-size
[native_histograms_ingestion_burst_size: <int> | default = 0]

# Flag to enable, for all users, handling of samples with external labels
# identifying replicas in an HA Prometheus setup.
# CLI flag: -distributor.ha-tracker.enable-for-all-users
Expand Down
53 changes: 38 additions & 15 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
"math"
"net/http"
"sort"
"strings"
Expand Down Expand Up @@ -95,7 +96,8 @@ type Distributor struct {
HATracker *ha.HATracker

// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
ingestionRateLimiter *limiter.RateLimiter
nativeHistogramsIngestionRateLimiter *limiter.RateLimiter

// Manager for subservices (HA Tracker, distributor ring and client pool)
subservices *services.Manager
Expand Down Expand Up @@ -267,11 +269,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
// it's an internal dependency and can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy limiter.RateLimiterStrategy
var nativeHistogramsIngestionRateStrategy limiter.RateLimiterStrategy
var distributorsLifeCycler *ring.Lifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
nativeHistogramsIngestionRateStrategy = newInfiniteIngestionRateStrategy()
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
Expand All @@ -285,21 +289,24 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices = append(subservices, distributorsLifeCycler, distributorsRing)

ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler)
nativeHistogramsIngestionRateStrategy = newGlobalNativeHistogramsIngestionRateStrategy(limits, distributorsLifeCycler)
} else {
ingestionRateStrategy = newLocalIngestionRateStrategy(limits)
nativeHistogramsIngestionRateStrategy = newLocalNativeHistogramsIngestionRateStrategy(limits)
}

d := &Distributor{
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
nativeHistogramsIngestionRateLimiter: limiter.NewRateLimiter(nativeHistogramsIngestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Expand Down Expand Up @@ -774,16 +781,32 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

totalSamples := validatedFloatSamples + validatedHistogramSamples
totalN := totalSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {

nhRateLimited := false
if limits.NativeHistogramsIngestionRate != math.MaxFloat64 {
nhRateLimited = !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples)
}
rateLimited := !d.ingestionRateLimiter.AllowN(now, userID, totalN)

// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
if nhRateLimited {
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(totalSamples))
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars))
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata)))
Comment on lines +797 to +799
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we always returning NativeHistogramsRateLimited? Can't this be trigger only by rateLimited?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks very much.
I needed to set label value validation.RateLimited in case it is rateLimited due to IngestionRate limit, and set label value validation.NativeHistogramsRateLimited in case it is nhRateLimited due to nativeHistogramsIngestionRate limit.
Updated now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to drop all samples, exemplars and metadata if native histograms are rate limited?
The default ingestion rate drop everything today because it passes all to the rate limiter. But NH limiter we only check native histograms so it should only throttle native histograms.

I think it doesn't make sense for this limit to impact the existing ingestion rate limit if NH limit is set very small but there is still big room for the default ingestion rate

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. we can just block NH


return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
}
if rateLimited {
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples))
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.

return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR. We should mention number of dropped exemplars as well.

}

Expand Down
118 changes: 118 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,15 @@ func TestDistributor_Push(t *testing.T) {
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25
`,
},
"A push not exceeding burst size but exceeding native histograms burst size should fail, histograms": {
numIngesters: 3,
happyIngesters: 3,
samples: samplesIn{num: 15, startTimestampMs: 123456789000},
histogramSamples: true,
metadata: 5,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (10) exceeded while adding 15 samples and 5 metadata"),
metricNames: []string{lastSeenTimestamp, distributorReceivedSamples},
},
} {
for _, useStreamPush := range []bool{false, true} {
for _, shardByAllLabels := range []bool{true, false} {
Expand All @@ -364,6 +373,8 @@ func TestDistributor_Push(t *testing.T) {
flagext.DefaultValues(limits)
limits.IngestionRate = 20
limits.IngestionBurstSize = 20
limits.NativeHistogramsIngestionRate = 10
limits.NativeHistogramsIngestionBurstSize = 10

ds, _, regs, _ := prepare(t, prepConfig{
numIngesters: tc.numIngesters,
Expand Down Expand Up @@ -681,6 +692,113 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
}
}

func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
t.Parallel()
type testPush struct {
samples int
metadata int
expectedError error
}

ctx := user.InjectOrgID(context.Background(), "user")
tests := map[string]struct {
distributors int
ingestionRateStrategy string
ingestionRate float64
ingestionBurstSize int
nativeHistogramsIngestionRate float64
nativeHistogramsIngestionBurstSize int
pushes []testPush
}{
"local strategy: limit should be set to each distributor: histograms": {
distributors: 2,
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
ingestionRate: 10,
ingestionBurstSize: 10,
nativeHistogramsIngestionRate: 5,
nativeHistogramsIngestionBurstSize: 5,
pushes: []testPush{
{samples: 2, expectedError: nil},
{metadata: 1, expectedError: nil},
{samples: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (5) exceeded while adding 4 samples and 0 metadata")},
{samples: 2, metadata: 1, expectedError: nil},
{samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (5) exceeded while adding 3 samples and 0 metadata")},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 0 samples and 1 metadata")},
},
},
"global strategy: limit should be evenly shared across distributors: histograms": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRate: 10,
ingestionBurstSize: 5,
nativeHistogramsIngestionRate: 6,
nativeHistogramsIngestionBurstSize: 3,
pushes: []testPush{
{samples: 2, expectedError: nil},
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 2 samples and 1 metadata")},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 1 samples and 0 metadata")},
},
},
"global strategy: burst should set to each distributor: histograms": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRate: 10,
ingestionBurstSize: 20,
nativeHistogramsIngestionRate: 6,
nativeHistogramsIngestionBurstSize: 10,
pushes: []testPush{
{samples: 3, expectedError: nil},
{samples: 1, expectedError: nil},
{samples: 7, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 7 samples and 1 metadata")},
{samples: 5, expectedError: nil},
{samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 3 samples and 0 metadata")},
{metadata: 12, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 12 metadata")},
},
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
t.Parallel()
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.IngestionRateStrategy = testData.ingestionRateStrategy
limits.IngestionRate = testData.ingestionRate
limits.IngestionBurstSize = testData.ingestionBurstSize
limits.NativeHistogramsIngestionRate = testData.nativeHistogramsIngestionRate
limits.NativeHistogramsIngestionBurstSize = testData.nativeHistogramsIngestionBurstSize

// Start all expected distributors
distributors, _, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: testData.distributors,
shardByAllLabels: true,
limits: limits,
})

// Push samples in multiple requests to the first distributor
for _, push := range testData.pushes {
var request = makeWriteRequest(0, 0, push.metadata, push.samples)

response, err := distributors[0].Push(ctx, request)

if push.expectedError == nil {
assert.Equal(t, emptyResponse, response)
assert.Nil(t, err)
} else {
assert.Nil(t, response)
assert.Equal(t, push.expectedError, err)
}
}
})
}

}

func TestPush_QuorumError(t *testing.T) {
t.Parallel()

Expand Down
46 changes: 46 additions & 0 deletions pkg/distributor/ingestion_rate_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,49 @@ func (s *infiniteStrategy) Burst(tenantID string) int {
// Burst is ignored when limit = rate.Inf
return 0
}

type localStrategyNativeHistograms struct {
limits *validation.Overrides
}

func newLocalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy {
return &localStrategyNativeHistograms{
limits: limits,
}
}

func (s *localStrategyNativeHistograms) Limit(tenantID string) float64 {
return s.limits.NativeHistogramsIngestionRate(tenantID)
}

func (s *localStrategyNativeHistograms) Burst(tenantID string) int {
return s.limits.NativeHistogramsIngestionBurstSize(tenantID)
}

type globalStrategyNativeHistograms struct {
limits *validation.Overrides
ring ReadLifecycler
}

func newGlobalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy {
return &globalStrategyNativeHistograms{
limits: limits,
ring: ring,
}
}

func (s *globalStrategyNativeHistograms) Limit(tenantID string) float64 {
numDistributors := s.ring.HealthyInstancesCount()

if numDistributors == 0 {
return s.limits.NativeHistogramsIngestionRate(tenantID)
}

return s.limits.NativeHistogramsIngestionRate(tenantID) / float64(numDistributors)
}

func (s *globalStrategyNativeHistograms) Burst(tenantID string) int {
// The meaning of burst doesn't change for the global strategy, in order
// to keep it easier to understand for users / operators.
return s.limits.NativeHistogramsIngestionBurstSize(tenantID)
}
Loading
Loading