Add active series limit for nativeHistograms samples #6796

Draft · wants to merge 4 commits into base: master
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [ENHANCEMENT] Ingester: Add a per-user active series limit specifically for native histograms. #6796
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
* [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727
11 changes: 11 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3544,6 +3544,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.max-series-per-metric
[max_series_per_metric: <int> | default = 50000]

# The maximum number of active nativeHistograms series per user, per ingester. 0
# to disable.
# CLI flag: -ingester.max-native-histograms-series-per-user
[max_native_histograms_series_per_user: <int> | default = 5000000]

# The maximum number of active series per user, across the cluster before
# replication. 0 to disable. Supported only if -distributor.shard-by-all-labels
# is true.
@@ -3555,6 +3560,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.max-global-series-per-metric
[max_global_series_per_metric: <int> | default = 0]

# The maximum number of active nativeHistograms series per user, across the
# cluster before replication. 0 to disable. Supported only if
# -distributor.shard-by-all-labels is true.
# CLI flag: -ingester.max-global-native-histograms-series-per-user
[max_global_native_histograms_series_per_user: <int> | default = 0]

# [Experimental] Enable limits per LabelSet. Supported limits per labelSet:
# [max_series]
[limits_per_label_set: <list of LimitsPerLabelSet> | default = []]
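
For reference, here is a minimal sketch of how the two new options could be set together in a tenant's `limits` block (the values and the pairing are illustrative only, not recommendations from this PR):

```yaml
# Hypothetical limits configuration exercising the new native-histogram options.
limits:
  # Per-ingester cap on active native-histogram series for a user (0 disables it).
  max_native_histograms_series_per_user: 100000
  # Cluster-wide cap before replication; only enforced when
  # -distributor.shard-by-all-labels is true.
  max_global_native_histograms_series_per_user: 1000000
```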
45 changes: 30 additions & 15 deletions pkg/ingester/ingester.go
@@ -448,6 +448,11 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
}
}

// Total nativeHistograms series limit.
if err := u.limiter.AssertMaxNativeHistogramsSeriesPerUser(u.userID, u.activeSeries.ActiveNativeHistogram()); err != nil {
return err
}

// Total series limit.
if err := u.limiter.AssertMaxSeriesPerUser(u.userID, int(u.Head().NumSeries())); err != nil {
return err
@@ -1219,21 +1224,22 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// Keep track of some stats which are tracked only if the samples will be
// successfully committed
var (
succeededSamplesCount = 0
failedSamplesCount = 0
succeededHistogramsCount = 0
failedHistogramsCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perLabelSetSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
discardedNativeHistogramCount = 0
succeededSamplesCount = 0
failedSamplesCount = 0
succeededHistogramsCount = 0
failedHistogramsCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perUserNativeHistogramsSeriesLimitCount = 0
perLabelSetSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
discardedNativeHistogramCount = 0

updateFirstPartial = func(errFn func() error) {
if firstPartialErr == nil {
@@ -1269,6 +1275,12 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
})

case errors.Is(cause, errMaxNativeHistogramsSeriesPerUserLimitExceeded):
perUserNativeHistogramsSeriesLimitCount++
updateFirstPartial(func() error {
return makeLimitError(perUserNativeHistogramsSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
})

case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
perMetricSeriesLimitCount++
updateFirstPartial(func() error {
@@ -1512,6 +1524,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if perUserSeriesLimitCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID).Add(float64(perUserSeriesLimitCount))
}
if perUserNativeHistogramsSeriesLimitCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserNativeHistogramsSeriesLimit, userID).Add(float64(perUserNativeHistogramsSeriesLimitCount))
}
if perMetricSeriesLimitCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
}
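
When the native-histogram series limit trips inside `Push`, the offending samples are rejected with an HTTP 400 and counted under the `perUserNativeHistogramsSeriesLimit` reason in `cortex_discarded_samples_total`. A hedged sketch of what a write caller observes, mirroring the assertions in the test below (`inspectLimitError` and its arguments are illustrative, not code from this PR):

```go
// Sketch only: req is a write request whose native-histogram series would
// exceed the per-user limit on the running ingester ing.
func inspectLimitError(ctx context.Context, ing *Ingester, req *cortexpb.WriteRequest) {
	_, err := ing.Push(ctx, req)
	if httpResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
		fmt.Println(httpResp.Code)         // 400 (http.StatusBadRequest)
		fmt.Println(string(httpResp.Body)) // "per-user nativeHistograms series limit of N exceeded, ..."
	}
}
```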
87 changes: 87 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -868,6 +868,93 @@ func TestIngesterUserLimitExceeded(t *testing.T) {

}

func TestIngesterUserLimitExceededForNativeHistograms(t *testing.T) {
limits := defaultLimitsTestConfig()
limits.EnableNativeHistograms = true
limits.MaxLocalNativeHistogramsSeriesPerUser = 1
limits.MaxLocalSeriesPerUser = 1
limits.MaxLocalMetricsWithMetadataPerUser = 1

userID := "1"
// Series
labels1 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}
labels3 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "biz"}}
sampleNativeHistogram1 := cortexpb.HistogramToHistogramProto(0, tsdbutil.GenerateTestHistogram(1))
sampleNativeHistogram2 := cortexpb.HistogramToHistogramProto(1, tsdbutil.GenerateTestHistogram(2))
sampleNativeHistogram3 := cortexpb.HistogramToHistogramProto(0, tsdbutil.GenerateTestHistogram(3))

// Metadata
metadata1 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric", Help: "a help for testmetric", Type: cortexpb.COUNTER}
metadata2 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric2", Help: "a help for testmetric2", Type: cortexpb.COUNTER}

dir := t.TempDir()

chunksDir := filepath.Join(dir, "chunks")
blocksDir := filepath.Join(dir, "blocks")
require.NoError(t, os.Mkdir(chunksDir, os.ModePerm))
require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))

blocksIngesterGenerator := func(reg prometheus.Registerer) *Ingester {
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, nil, blocksDir, reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
// Wait until it's ACTIVE
test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
return ing.lifecycler.GetState()
})

return ing
}

tests := []string{"blocks"}
for i, ingGenerator := range []func(reg prometheus.Registerer) *Ingester{blocksIngesterGenerator} {
t.Run(tests[i], func(t *testing.T) {
reg := prometheus.NewRegistry()
ing := ingGenerator(reg)

// Append only one series and one metadata first, expect no error.
ctx := user.InjectOrgID(context.Background(), userID)
_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1}, nil, []*cortexpb.MetricMetadata{metadata1}, []cortexpb.Histogram{sampleNativeHistogram1}, cortexpb.API))
require.NoError(t, err)

testLimits := func(reg prometheus.Gatherer) {
// Append to two series, expect series-exceeded error.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1, labels3}, nil, nil, []cortexpb.Histogram{sampleNativeHistogram2, sampleNativeHistogram3}, cortexpb.API))
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
require.True(t, ok, "returned error is not an httpgrpc response")
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
assert.Equal(t, wrapWithUser(makeLimitError(perUserNativeHistogramsSeriesLimit, ing.limiter.FormatError(userID, errMaxNativeHistogramsSeriesPerUserLimitExceeded, labels1)), userID).Error(), string(httpResp.Body))

// Append two metadata, expect no error since metadata is a best effort approach.
_, err = ing.Push(ctx, cortexpb.ToWriteRequest(nil, nil, []*cortexpb.MetricMetadata{metadata1, metadata2}, nil, cortexpb.API))
require.NoError(t, err)

// Read samples back via ingester queries.
res, _, err := runTestQuery(ctx, t, ing, labels.MatchEqual, model.MetricNameLabel, "testmetric")
require.NoError(t, err)
require.NotNil(t, res)

// Verify metadata
m, err := ing.MetricsMetadata(ctx, &client.MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""})
require.NoError(t, err)
assert.Equal(t, []*cortexpb.MetricMetadata{metadata1}, m.Metadata)
}

testLimits(reg)

// Limits should hold after restart.
services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
// Use new registry to prevent metrics registration panic.
reg = prometheus.NewRegistry()
ing = ingGenerator(reg)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

testLimits(reg)
})
}

}

func benchmarkData(nSeries int) (allLabels []labels.Labels, allSamples []cortexpb.Sample) {
for j := 0; j < nSeries; j++ {
labels := chunk.BenchmarkLabels.Copy()
38 changes: 34 additions & 4 deletions pkg/ingester/limiter.go
@@ -12,10 +12,11 @@ import (
)

var (
errMaxSeriesPerMetricLimitExceeded = errors.New("per-metric series limit exceeded")
errMaxMetadataPerMetricLimitExceeded = errors.New("per-metric metadata limit exceeded")
errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded")
errMaxMetadataPerUserLimitExceeded = errors.New("per-user metric metadata limit exceeded")
errMaxSeriesPerMetricLimitExceeded = errors.New("per-metric series limit exceeded")
errMaxMetadataPerMetricLimitExceeded = errors.New("per-metric metadata limit exceeded")
errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded")
errMaxNativeHistogramsSeriesPerUserLimitExceeded = errors.New("per-user nativeHistograms series limit exceeded")
errMaxMetadataPerUserLimitExceeded = errors.New("per-user metric metadata limit exceeded")
)

type errMaxSeriesPerLabelSetLimitExceeded struct {
@@ -95,6 +96,16 @@ func (l *Limiter) AssertMaxSeriesPerUser(userID string, series int) error {
return errMaxSeriesPerUserLimitExceeded
}

// AssertMaxNativeHistogramsSeriesPerUser limit has not been reached compared to the current
// number of nativeHistograms series in input and returns an error if so.
func (l *Limiter) AssertMaxNativeHistogramsSeriesPerUser(userID string, series int) error {
if actualLimit := l.maxNativeHistogramsSeriesPerUser(userID); series < actualLimit {
return nil
}

return errMaxNativeHistogramsSeriesPerUserLimitExceeded
}

// AssertMaxMetricsWithMetadataPerUser limit has not been reached compared to the current
// number of metrics with metadata in input and returns an error if so.
func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int) error {
@@ -134,6 +145,8 @@ func (l *Limiter) FormatError(userID string, err error, lbls labels.Labels) erro
switch {
case errors.Is(err, errMaxSeriesPerUserLimitExceeded):
return l.formatMaxSeriesPerUserError(userID)
case errors.Is(err, errMaxNativeHistogramsSeriesPerUserLimitExceeded):
return l.formatMaxNativeHistogramsSeriesPerUserError(userID)
case errors.Is(err, errMaxSeriesPerMetricLimitExceeded):
return l.formatMaxSeriesPerMetricError(userID, lbls.Get(labels.MetricName))
case errors.Is(err, errMaxMetadataPerUserLimitExceeded):
@@ -158,6 +171,15 @@ func (l *Limiter) formatMaxSeriesPerUserError(userID string) error {
minNonZero(localLimit, globalLimit), l.AdminLimitMessage, localLimit, globalLimit, actualLimit)
}

func (l *Limiter) formatMaxNativeHistogramsSeriesPerUserError(userID string) error {
actualLimit := l.maxNativeHistogramsSeriesPerUser(userID)
localLimit := l.limits.MaxLocalNativeHistogramsSeriesPerUser(userID)
globalLimit := l.limits.MaxGlobalNativeHistogramsSeriesPerUser(userID)

return fmt.Errorf("per-user nativeHistograms series limit of %d exceeded, %s (local limit: %d global limit: %d actual local limit: %d)",
minNonZero(localLimit, globalLimit), l.AdminLimitMessage, localLimit, globalLimit, actualLimit)
}

func (l *Limiter) formatMaxSeriesPerMetricError(userID string, metric string) error {
actualLimit := l.maxSeriesPerMetric(userID)
localLimit := l.limits.MaxLocalSeriesPerMetric(userID)
@@ -248,6 +270,14 @@ func (l *Limiter) maxSeriesPerUser(userID string) int {
)
}

func (l *Limiter) maxNativeHistogramsSeriesPerUser(userID string) int {
return l.maxByLocalAndGlobal(
userID,
l.limits.MaxLocalNativeHistogramsSeriesPerUser,
l.limits.MaxGlobalNativeHistogramsSeriesPerUser,
)
}

func (l *Limiter) maxMetadataPerUser(userID string) int {
return l.maxByLocalAndGlobal(
userID,
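
The effective limit comes from `maxByLocalAndGlobal` (not shown in this hunk), which picks the stricter of the local limit and a per-ingester value derived from the global one. Below is a hedged sketch of that derivation under the default sharding strategy with `-distributor.shard-by-all-labels` enabled, written only to match the numbers in the limiter test that follows (a global limit of 1000 across 10 healthy ingesters with replication factor 3 gives a local limit of 300, so 299 series pass and 300 trip the limit); the function name is illustrative, not the PR's code:

```go
// Illustrative only: approximates how a global limit is scaled down to a
// per-ingester local limit when series are sharded by all labels.
func approxLocalLimit(localLimit, globalLimit, healthyIngesters, replicationFactor int) int {
	if globalLimit > 0 && healthyIngesters > 0 {
		// Each ingester is expected to hold roughly (global / ingesters) * RF series.
		derived := int(float64(globalLimit) / float64(healthyIngesters) * float64(replicationFactor))
		if localLimit == 0 || derived < localLimit {
			return derived
		}
	}
	return localLimit // 0 means the check is disabled
}
```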
88 changes: 84 additions & 4 deletions pkg/ingester/limiter_test.go
@@ -54,6 +54,19 @@ func TestLimiter_maxSeriesPerUser(t *testing.T) {
runLimiterMaxFunctionTest(t, applyLimits, runMaxFn, false)
}

func TestLimiter_maxNativeHistogramsSeriesPerUser(t *testing.T) {
applyLimits := func(limits *validation.Limits, localLimit, globalLimit int) {
limits.MaxLocalNativeHistogramsSeriesPerUser = localLimit
limits.MaxGlobalNativeHistogramsSeriesPerUser = globalLimit
}

runMaxFn := func(limiter *Limiter) int {
return limiter.maxNativeHistogramsSeriesPerUser("test")
}

runLimiterMaxFunctionTest(t, applyLimits, runMaxFn, false)
}

func TestLimiter_maxMetadataPerUser(t *testing.T) {
applyLimits := func(limits *validation.Limits, localLimit, globalLimit int) {
limits.MaxLocalMetricsWithMetadataPerUser = localLimit
@@ -425,6 +438,69 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) {
}
}

func TestLimiter_AssertMaxNativeHistogramsSeriesPerUser(t *testing.T) {
tests := map[string]struct {
maxLocalNativeHistogramsSeriesPerUser int
maxGlobalNativeHistogramsSeriesPerUser int
ringReplicationFactor int
ringIngesterCount int
shardByAllLabels bool
series int
expected error
}{
"both local and global limit are disabled": {
maxLocalNativeHistogramsSeriesPerUser: 0,
maxGlobalNativeHistogramsSeriesPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
shardByAllLabels: false,
series: 100,
expected: nil,
},
"current number of series is below the limit": {
maxLocalNativeHistogramsSeriesPerUser: 0,
maxGlobalNativeHistogramsSeriesPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
shardByAllLabels: true,
series: 299,
expected: nil,
},
"current number of series is above the limit": {
maxLocalNativeHistogramsSeriesPerUser: 0,
maxGlobalNativeHistogramsSeriesPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
shardByAllLabels: true,
series: 300,
expected: errMaxNativeHistogramsSeriesPerUserLimitExceeded,
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
// Mock the ring
ring := &ringCountMock{}
ring.On("HealthyInstancesCount").Return(testData.ringIngesterCount)
ring.On("ZonesCount").Return(1)

// Mock limits
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalNativeHistogramsSeriesPerUser: testData.maxLocalNativeHistogramsSeriesPerUser,
MaxGlobalNativeHistogramsSeriesPerUser: testData.maxGlobalNativeHistogramsSeriesPerUser,
}, nil)
require.NoError(t, err)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
actual := limiter.AssertMaxNativeHistogramsSeriesPerUser("test", testData.series)

assert.Equal(t, testData.expected, actual)
})
}
}

func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {

tests := map[string]struct {
@@ -580,10 +656,11 @@ func TestLimiter_FormatError(t *testing.T) {

// Mock limits
limits, err := validation.NewOverrides(validation.Limits{
MaxGlobalSeriesPerUser: 100,
MaxGlobalSeriesPerMetric: 20,
MaxGlobalMetricsWithMetadataPerUser: 10,
MaxGlobalMetadataPerMetric: 3,
MaxGlobalSeriesPerUser: 100,
MaxGlobalNativeHistogramsSeriesPerUser: 100,
MaxGlobalSeriesPerMetric: 20,
MaxGlobalMetricsWithMetadataPerUser: 10,
MaxGlobalMetadataPerMetric: 3,
}, nil)
require.NoError(t, err)

@@ -593,6 +670,9 @@
actual := limiter.FormatError("user-1", errMaxSeriesPerUserLimitExceeded, lbls)
assert.EqualError(t, actual, "per-user series limit of 100 exceeded, please contact administrator to raise it (local limit: 0 global limit: 100 actual local limit: 100)")

actual = limiter.FormatError("user-1", errMaxNativeHistogramsSeriesPerUserLimitExceeded, lbls)
assert.EqualError(t, actual, "per-user nativeHistograms series limit of 100 exceeded, please contact administrator to raise it (local limit: 0 global limit: 100 actual local limit: 100)")

actual = limiter.FormatError("user-1", errMaxSeriesPerMetricLimitExceeded, lbls)
assert.EqualError(t, actual, "per-metric series limit of 20 exceeded for metric testMetric, please contact administrator to raise it (local limit: 0 global limit: 20 actual local limit: 20)")
