Ruler: add option to skip writing result to ingesters (#9060)
* Ruler: add option to skip writing result to ingesters

Signed-off-by: Marco Pracucci <[email protected]>

* Expose new structs so that we can use them in GEM + update doc / changelog

Signed-off-by: Marco Pracucci <[email protected]>

* Add TestDefaultManagerFactory_ShouldNotWriteRecordingRuleResultsWhenDisabled

Signed-off-by: Marco Pracucci <[email protected]>

* Fixed linter

Signed-off-by: Marco Pracucci <[email protected]>

* Fix race in tests

Signed-off-by: Marco Pracucci <[email protected]>

* Apply suggestions from code review

Co-authored-by: Taylor C <[email protected]>

* Updated doc

Signed-off-by: Marco Pracucci <[email protected]>

* Update doc

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
Co-authored-by: Taylor C <[email protected]>
pracucci and tacole02 authored Aug 21, 2024
1 parent 702feb1 commit bd874de
Showing 8 changed files with 161 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@
* `-ingester.partition-ring.*`: configures partitions ring backend.
* [FEATURE] Querier: added support for `limitk()` and `limit_ratio()` experimental PromQL functions. Experimental functions are disabled by default, but can be enabled setting `-querier.promql-experimental-functions-enabled=true` in the query-frontend and querier. #8632
* [FEATURE] Querier: experimental support for `X-Mimir-Chunk-Info-Logger` header that triggers logging information about TSDB chunks loaded from ingesters and store-gateways in the querier. The header should contain the comma separated list of labels for which their value will be included in the logs. #8599
* [FEATURE] Ruler: added experimental configuration, `-ruler.rule-evaluation-write-enabled`, to disable writing the result of rule evaluation to ingesters. This feature can be used for testing purposes. #9060
* [ENHANCEMENT] Compactor: Add `cortex_compactor_compaction_job_duration_seconds` and `cortex_compactor_compaction_job_blocks` histogram metrics to track duration of individual compaction jobs and number of blocks per job. #8371
* [ENHANCEMENT] Rules: Added per namespace max rules per rule group limit. The maximum number of rules per rule groups for all namespaces continues to be configured by `-ruler.max-rules-per-rule-group`, but now, this can be superseded by the new `-ruler.max-rules-per-rule-group-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8378
* [ENHANCEMENT] Rules: Added per namespace max rule groups per tenant limit. The maximum number of rule groups per rule tenant for all namespaces continues to be configured by `-ruler.max-rule-groups-per-tenant`, but now, this can be superseded by the new `-ruler.max-rule-groups-per-tenant-by-namespace` option on a per namespace basis. This new limit can be overridden using the overrides mechanism to be applied per-tenant. #8425
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -12642,6 +12642,17 @@
"fieldFlag": "ruler.independent-rule-evaluation-concurrency-min-duration-percentage",
"fieldType": "float",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "rule_evaluation_write_enabled",
"required": false,
"desc": "Writes the results of rule evaluation to ingesters or ingest storage when enabled. Use this option for testing purposes. To disable, set to false.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "ruler.rule-evaluation-write-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2783,6 +2783,8 @@ Usage of ./cmd/mimir/mimir:
The prefix for the keys in the store. Should end with a /. (default "rulers/")
-ruler.ring.store string
Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist")
-ruler.rule-evaluation-write-enabled
[experimental] Writes the results of rule evaluation to ingesters or ingest storage when enabled. Use this option for testing purposes. To disable, set to false. (default true)
-ruler.rule-path string
Directory to store temporary rule files loaded by the Prometheus rule managers. This directory is not required to be persisted between restarts. (default "./data-ruler/")
-ruler.sync-rules-on-changes-enabled
1 change: 1 addition & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -67,6 +67,7 @@ The following features are currently experimental:
- `-ruler.max-independent-rule-evaluation-concurrency`
- `-ruler.max-independent-rule-evaluation-concurrency-per-tenant`
- `-ruler.independent-rule-evaluation-concurrency-min-duration-percentage`
- `-ruler.rule-evaluation-write-enabled`
- Distributor
- Metrics relabeling
- `-distributor.metric-relabeling-enabled`
@@ -2118,6 +2118,12 @@ tenant_federation:
# group runtime duration must exceed 50.0% of the evaluation interval.
# CLI flag: -ruler.independent-rule-evaluation-concurrency-min-duration-percentage
[independent_rule_evaluation_concurrency_min_duration_percentage: <float> | default = 50]
# (experimental) Writes the results of rule evaluation to ingesters or ingest
# storage when enabled. Use this option for testing purposes. To disable, set to
# false.
# CLI flag: -ruler.rule-evaluation-write-enabled
[rule_evaluation_write_enabled: <boolean> | default = true]
```

### ruler_storage
52 changes: 50 additions & 2 deletions pkg/ruler/compat.go
@@ -141,6 +141,47 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender {
}
}

type NoopAppender struct{}

func (a *NoopAppender) Append(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) {
	return 0, nil
}

func (a *NoopAppender) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) {
	return 0, errors.New("exemplars are unsupported")
}

func (a *NoopAppender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
	return 0, errors.New("metadata updates are unsupported")
}

func (a *NoopAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
	return 0, nil
}

func (a *NoopAppender) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) {
	return 0, errors.New("CT zero samples are unsupported")
}

func (a *NoopAppender) Commit() error {
	return nil
}

func (a *NoopAppender) Rollback() error {
	return nil
}

type NoopAppendable struct{}

func NewNoopAppendable() *NoopAppendable {
	return &NoopAppendable{}
}

// Appender returns a storage.Appender.
func (t *NoopAppendable) Appender(_ context.Context) storage.Appender {
	return &NoopAppender{}
}

// RulesLimits defines limits used by Ruler.
type RulesLimits interface {
EvaluationDelay(userID string) time.Duration
@@ -267,7 +308,7 @@ type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.

 func DefaultTenantManagerFactory(
 	cfg Config,
-	p Pusher,
+	pusher Pusher,
 	queryable storage.Queryable,
 	queryFunc rules.QueryFunc,
 	concurrencyController MultiTenantRuleConcurrencyController,
@@ -319,8 +360,15 @@ func DefaultTenantManagerFactory(
 		// Wrap the queryable with our custom logic.
 		wrappedQueryable := WrapQueryableWithReadConsistency(queryable, logger)

+		var appendeable storage.Appendable
+		if cfg.RuleEvaluationWriteEnabled {
+			appendeable = NewPusherAppendable(pusher, userID, totalWrites, failedWrites)
+		} else {
+			appendeable = NewNoopAppendable()
+		}
+
 		return rules.NewManager(&rules.ManagerOptions{
-			Appendable: NewPusherAppendable(p, userID, totalWrites, failedWrites),
+			Appendable: appendeable,
 			Queryable: wrappedQueryable,
 			QueryFunc: wrappedQueryFunc,
 			Context: user.InjectOrgID(ctx, userID),
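The change to `pkg/ruler/compat.go` selects the appendable once, when the tenant manager is built, so the evaluation path never has to check the flag again. The following is a minimal, self-contained sketch of that pattern. It assumes a simplified, hypothetical `Appender` interface and hypothetical `recordingAppender`, `noopAppender`, and `newAppender` names; it does not use the Prometheus `storage.Appender` interface or any Mimir type.

```go
package main

import (
	"errors"
	"fmt"
)

// Appender is a hypothetical, simplified stand-in for a sample appender.
// It is NOT the Prometheus storage.Appender interface.
type Appender interface {
	Append(series string, ts int64, value float64) error
	Commit() error
}

// recordingAppender buffers samples and flushes them to sink on Commit.
type recordingAppender struct {
	pending []float64
	sink    *[]float64
}

func (a *recordingAppender) Append(_ string, _ int64, v float64) error {
	a.pending = append(a.pending, v)
	return nil
}

func (a *recordingAppender) Commit() error {
	if a.sink == nil {
		return errors.New("no sink configured")
	}
	*a.sink = append(*a.sink, a.pending...)
	a.pending = nil
	return nil
}

// noopAppender accepts every sample and silently discards it.
type noopAppender struct{}

func (noopAppender) Append(string, int64, float64) error { return nil }
func (noopAppender) Commit() error                       { return nil }

// newAppender chooses the implementation once, based on the flag value.
func newAppender(writeEnabled bool, sink *[]float64) Appender {
	if writeEnabled {
		return &recordingAppender{sink: sink}
	}
	return noopAppender{}
}

func main() {
	for _, enabled := range []bool{true, false} {
		var written []float64
		app := newAppender(enabled, &written)
		_ = app.Append("test", 0, 1)
		_ = app.Commit()
		fmt.Printf("write enabled: %t, samples written: %d\n", enabled, len(written))
	}
}
```

Because the no-op implementation returns success for `Append` and `Commit`, callers behave as if the write succeeded, which is what makes the option usable for testing rule evaluation without producing data.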
86 changes: 86 additions & 0 deletions pkg/ruler/compat_test.go
@@ -32,6 +32,7 @@ import (
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
@@ -525,11 +526,90 @@ func TestDefaultManagerFactory_CorrectQueryableUsed(t *testing.T) {
			case <-time.NewTimer(time.Second).C:
				require.Fail(t, "neither of the queryables was called within the timeout")
			}

			// Ensure the result has been written.
			require.EventuallyWithT(t, func(collect *assert.CollectT) {
				pusher.AssertCalled(&collectWithLogf{collect}, "Push", mock.Anything, mock.Anything)
			}, 5*time.Second, 100*time.Millisecond)

			manager.Stop()
		})
	}
}

func TestDefaultManagerFactory_ShouldNotWriteRecordingRuleResultsWhenDisabled(t *testing.T) {
	const userID = "tenant-1"

	for _, writeEnabled := range []bool{false, true} {
		writeEnabled := writeEnabled

		t.Run(fmt.Sprintf("write enabled: %t", writeEnabled), func(t *testing.T) {
			t.Parallel()

			// Create a test recording rule.
			ruleGroup := rulespb.RuleGroupDesc{
				Name:     "test",
				Interval: time.Second,
				Rules: []*rulespb.RuleDesc{{
					Record: "test",
					Expr:   "1",
				}},
			}

			// Set up the ruler with rule evaluation writes enabled or disabled, depending on the test case.
			cfg := defaultRulerConfig(t)
			cfg.RuleEvaluationWriteEnabled = writeEnabled

			var (
				options         = applyPrepareOptions(t, cfg.Ring.Common.InstanceID)
				notifierManager = notifier.NewManager(&notifier.Options{Do: func(_ context.Context, _ *http.Client, _ *http.Request) (*http.Response, error) { return nil, nil }}, options.logger)
				ruleFiles       = writeRuleGroupToFiles(t, cfg.RulePath, options.logger, userID, ruleGroup)
				queryable       = newMockQueryable()
				tracker         = promql.NewActiveQueryTracker(t.TempDir(), 20, log.NewNopLogger())
				eng             = promql.NewEngine(promql.EngineOpts{
					MaxSamples:         1e6,
					ActiveQueryTracker: tracker,
					Timeout:            2 * time.Minute,
				})
				queryFunc = rules.EngineQueryFunc(eng, queryable)
			)

			pusher := newPusherMock()
			pusher.MockPush(&mimirpb.WriteResponse{}, nil)

			factory := DefaultTenantManagerFactory(cfg, pusher, queryable, queryFunc, &NoopMultiTenantConcurrencyController{}, options.limits, nil)
			manager := factory(context.Background(), userID, notifierManager, options.logger, nil)

			// Load rules into manager and start it.
			require.NoError(t, manager.Update(time.Millisecond, ruleFiles, labels.EmptyLabels(), "", nil))
			go manager.Run()

			// Wait until the query has been executed.
			select {
			case <-queryable.called:
				t.Log("query executed")
			case <-time.NewTimer(time.Second).C:
				require.Fail(t, "no query executed")
			}

			if writeEnabled {
				// Ensure the result has been written.
				require.EventuallyWithT(t, func(collect *assert.CollectT) {
					pusher.AssertCalled(&collectWithLogf{collect}, "Push", mock.Anything, mock.Anything)
				}, 5*time.Second, 100*time.Millisecond)
			} else {
				// Ensure no write occurred within a reasonable amount of time.
				time.Sleep(time.Second)
				pusher.AssertNumberOfCalls(t, "Push", 0)
			}

			manager.Stop()

		})
	}

}

func TestDefaultManagerFactory_ShouldInjectReadConsistencyToContextBasedOnRuleDetail(t *testing.T) {
const userID = "tenant-1"

@@ -768,3 +848,9 @@ func mustStatusWithDetails(code codes.Code, cause mimirpb.ErrorCause) *status.St
}
return s
}

type collectWithLogf struct {
	*assert.CollectT
}

func (c *collectWithLogf) Logf(_ string, _ ...interface{}) {}
4 changes: 4 additions & 0 deletions pkg/ruler/ruler.go
@@ -140,6 +140,8 @@ type Config struct {

	MaxIndependentRuleEvaluationConcurrency int64 `yaml:"max_independent_rule_evaluation_concurrency" category:"experimental"`
	IndependentRuleEvaluationConcurrencyMinDurationPercentage float64 `yaml:"independent_rule_evaluation_concurrency_min_duration_percentage" category:"experimental"`

	RuleEvaluationWriteEnabled bool `yaml:"rule_evaluation_write_enabled" category:"experimental"`
}

// Validate config and returns error on failure
@@ -200,6 +202,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.Int64Var(&cfg.MaxIndependentRuleEvaluationConcurrency, "ruler.max-independent-rule-evaluation-concurrency", 0, "Number of rules rules that don't have dependencies that we allow to be evaluated concurrently across all tenants. 0 to disable.")
f.Float64Var(&cfg.IndependentRuleEvaluationConcurrencyMinDurationPercentage, "ruler.independent-rule-evaluation-concurrency-min-duration-percentage", 50.0, "Minimum threshold of the interval to last rule group runtime duration to allow a rule to be evaluated concurrency. By default, the rule group runtime duration must exceed 50.0% of the evaluation interval.")

f.BoolVar(&cfg.RuleEvaluationWriteEnabled, "ruler.rule-evaluation-write-enabled", true, "Writes the results of rule evaluation to ingesters or ingest storage when enabled. Use this option for testing purposes. To disable, set to false.")

cfg.RingCheckPeriod = 5 * time.Second
}
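
The new option is a boolean flag registered with a default of `true`, so existing deployments keep writing rule results unless the flag is explicitly turned off. The following sketch reproduces that registration with the standard library `flag` package. The `config` struct, `registerFlags`, and `parse` are hypothetical names used for illustration, and the help string is shortened.

```go
package main

import (
	"flag"
	"fmt"
)

// config is a hypothetical, trimmed-down stand-in for the ruler Config struct.
type config struct {
	RuleEvaluationWriteEnabled bool
}

// registerFlags registers the option with a default of true.
func (c *config) registerFlags(f *flag.FlagSet) {
	f.BoolVar(&c.RuleEvaluationWriteEnabled, "ruler.rule-evaluation-write-enabled", true,
		"Writes the results of rule evaluation when enabled.")
}

// parse builds a fresh flag set, registers the option, and parses args.
func parse(args []string) (config, error) {
	var c config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	c.registerFlags(fs)
	err := fs.Parse(args)
	return c, err
}

func main() {
	def, _ := parse(nil)
	off, _ := parse([]string{"-ruler.rule-evaluation-write-enabled=false"})
	fmt.Println(def.RuleEvaluationWriteEnabled, off.RuleEvaluationWriteEnabled) // prints: true false
}
```

With Go's `flag` package, a boolean flag must be disabled using the `-name=false` form. Writing `-name false` sets the flag to `true` and leaves `false` as a positional argument.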
