diff --git a/core/config/app_config.go b/core/config/app_config.go index d4ef27b6305..db54fd278be 100644 --- a/core/config/app_config.go +++ b/core/config/app_config.go @@ -36,6 +36,7 @@ type AppConfig interface { AuditLogger() AuditLogger AutoPprof() AutoPprof Capabilities() Capabilities + Workflows() Workflows Database() Database Feature() Feature FluxMonitor() FluxMonitor diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index 7579b7133e5..014c8dbae73 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -470,6 +470,13 @@ MaxEncryptedSecretsSize = '26.40kb' # Default # MaxConfigSize is the maximum size of a config that can be fetched from the given config url. MaxConfigSize = '50.00kb' # Default +[Workflows] +[Workflows.Limits] +# Global is the maximum number of workflows that can be registered globally. +Global = 200 # Default +# PerOwner is the maximum number of workflows that can be registered per owner. +PerOwner = 200 # Default + [Capabilities.ExternalRegistry] # Address is the address for the capabilities registry contract. Address = '0x0' # Example diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 4a34384e74d..79cb8cc29bb 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -57,6 +57,7 @@ type Core struct { Mercury Mercury `toml:",omitempty"` Capabilities Capabilities `toml:",omitempty"` Telemetry Telemetry `toml:",omitempty"` + Workflows Workflows `toml:",omitempty"` } // SetFrom updates c with any non-nil values from f. (currently TOML field only!) 
@@ -87,6 +88,7 @@ func (c *Core) SetFrom(f *Core) { c.Keeper.setFrom(&f.Keeper) c.Mercury.setFrom(&f.Mercury) c.Capabilities.setFrom(&f.Capabilities) + c.Workflows.setFrom(&f.Workflows) c.AutoPprof.setFrom(&f.AutoPprof) c.Pyroscope.setFrom(&f.Pyroscope) @@ -1487,6 +1489,29 @@ func (r *ExternalRegistry) setFrom(f *ExternalRegistry) { } } +type Workflows struct { + Limits Limits +} + +type Limits struct { + Global *int32 + PerOwner *int32 +} + +func (r *Workflows) setFrom(f *Workflows) { + r.Limits.setFrom(&f.Limits) +} + +func (r *Limits) setFrom(f *Limits) { + if f.Global != nil { + r.Global = f.Global + } + + if f.PerOwner != nil { + r.PerOwner = f.PerOwner + } +} + type WorkflowRegistry struct { Address *string NetworkID *string diff --git a/core/config/workflows_config.go b/core/config/workflows_config.go new file mode 100644 index 00000000000..2c58755e9b5 --- /dev/null +++ b/core/config/workflows_config.go @@ -0,0 +1,10 @@ +package config + +type Workflows interface { + Limits() WorkflowsLimits +} + +type WorkflowsLimits interface { + Global() int32 + PerOwner() int32 +} diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 55ca46828d5..801c66ca7c5 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -78,6 +78,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" "github.com/smartcontractkit/chainlink/v2/core/sessions" "github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth" "github.com/smartcontractkit/chainlink/v2/core/sessions/localauth" @@ -284,6 +285,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { DS: opts.DS, CREOpts: opts.CREOpts, capabilityCfg: 
cfg.Capabilities(), + workflowsCfg: cfg.Workflows(), logger: globalLogger, relayerChainInterops: relayerChainInterops, keystore: keyStore, @@ -488,6 +490,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { opts.CapabilitiesRegistry, workflowORM, creServices.workflowRateLimiter, + creServices.workflowLimits, ) // Flux monitor requires ethereum just to boot, silence errors with a null delegate @@ -707,6 +710,7 @@ type creServiceConfig struct { CREOpts capabilityCfg config.Capabilities + workflowsCfg config.Workflows keystore creKeystore logger logger.Logger relayerChainInterops *CoreRelayerChainInteroperators @@ -717,6 +721,11 @@ type CREServices struct { // workflowRateLimiter is the rate limiter for workflows // it is exposed because there are contingent services in the application workflowRateLimiter *ratelimiter.RateLimiter + + // workflowLimits is the syncer limiter for workflows + // it will specify the amount of global and per owner workflows that can be registered + workflowLimits *syncerlimiter.Limits + // gatewayConnectorWrapper is the wrapper for the gateway connector // it is exposed because there are contingent services in the application gatewayConnectorWrapper *gatewayconnector.ServiceWrapper @@ -727,6 +736,7 @@ type CREServices struct { func newCREServices(cscfg creServiceConfig) (*CREServices, error) { var ( capCfg = cscfg.capabilityCfg + wCfg = cscfg.workflowsCfg globalLogger = cscfg.logger keyStore = cscfg.keystore relayerChainInterops = cscfg.relayerChainInterops @@ -744,6 +754,14 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { return nil, fmt.Errorf("could not instantiate workflow rate limiter: %w", err) } + + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{ + Global: wCfg.Limits().Global(), + PerOwner: wCfg.Limits().PerOwner(), + }) + if err != nil { + return nil, fmt.Errorf("could not instantiate workflow syncer limiter: %w", err) + } + + var gatewayConnectorWrapper 
*gatewayconnector.ServiceWrapper if capCfg.GatewayConnector().DonID() != "" { globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", capCfg.GatewayConnector().DonID()) @@ -849,6 +867,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { clockwork.NewRealClock(), keys[0], workflowRateLimiter, + workflowLimits, syncer.WithMaxArtifactSize( syncer.ArtifactConfig{ MaxBinarySize: uint64(capCfg.WorkflowRegistry().MaxBinarySize()), @@ -886,6 +905,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { } return &CREServices{ workflowRateLimiter: workflowRateLimiter, + workflowLimits: workflowLimits, gatewayConnectorWrapper: gatewayConnectorWrapper, srvs: srvcs, }, nil diff --git a/core/services/chainlink/config_general.go b/core/services/chainlink/config_general.go index 0f6fea7518e..daeff2910e0 100644 --- a/core/services/chainlink/config_general.go +++ b/core/services/chainlink/config_general.go @@ -412,6 +412,10 @@ func (g *generalConfig) Capabilities() config.Capabilities { return &capabilitiesConfig{c: g.c.Capabilities} } +func (g *generalConfig) Workflows() config.Workflows { + return &workflowsConfig{c: g.c.Workflows} +} + func (g *generalConfig) Database() coreconfig.Database { return &databaseConfig{c: g.c.Database, s: g.secrets.Secrets.Database, logSQL: g.logSQL} } diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 1041e1bd97f..5235d1a6257 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -97,6 +97,12 @@ var ( AutoPprof: toml.AutoPprof{ CPUProfileRate: ptr[int64](7), }, + Workflows: toml.Workflows{ + Limits: toml.Limits{ + Global: ptr(int32(200)), + PerOwner: ptr(int32(200)), + }, + }, }, EVM: []*evmcfg.EVMConfig{ { @@ -448,6 +454,7 @@ func TestConfig_Marshal(t *testing.T) { PerSenderRPS: ptr(100.0), PerSenderBurst: ptr(100), }, + Peering: toml.P2P{ IncomingMessageBufferSize: ptr[int64](13), OutgoingMessageBufferSize: 
ptr[int64](17), @@ -500,6 +507,12 @@ func TestConfig_Marshal(t *testing.T) { }, }, } + full.Workflows = toml.Workflows{ + Limits: toml.Limits{ + Global: ptr(int32(200)), + PerOwner: ptr(int32(200)), + }, + } full.Keeper = toml.Keeper{ DefaultTransactionQueueDepth: ptr[uint32](17), GasPriceBufferPercent: ptr[uint16](12), @@ -1691,6 +1704,7 @@ func TestConfig_setDefaults(t *testing.T) { c.Solana = solcfg.TOMLConfigs{{ChainID: ptr("unknown solana chain")}} c.Starknet = RawConfigs{{"ChainID": ptr("unknown starknet chain")}} c.setDefaults() + if s, err := c.TOMLString(); assert.NoError(t, err) { t.Log(s, err) } diff --git a/core/services/chainlink/config_workflows.go b/core/services/chainlink/config_workflows.go new file mode 100644 index 00000000000..a8b4231d1d9 --- /dev/null +++ b/core/services/chainlink/config_workflows.go @@ -0,0 +1,30 @@ +package chainlink + +import ( + "github.com/smartcontractkit/chainlink/v2/core/config" + "github.com/smartcontractkit/chainlink/v2/core/config/toml" +) + +var _ config.Workflows = (*workflowsConfig)(nil) + +type workflowsConfig struct { + c toml.Workflows +} + +func (w *workflowsConfig) Limits() config.WorkflowsLimits { + return &limits{ + l: w.c.Limits, + } +} + +type limits struct { + l toml.Limits +} + +func (l *limits) Global() int32 { + return *l.l.Global +} + +func (l *limits) PerOwner() int32 { + return *l.l.PerOwner +} diff --git a/core/services/chainlink/config_workflows_test.go b/core/services/chainlink/config_workflows_test.go new file mode 100644 index 00000000000..83ecca18662 --- /dev/null +++ b/core/services/chainlink/config_workflows_test.go @@ -0,0 +1,20 @@ +package chainlink + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWorkflowsConfig(t *testing.T) { + opts := GeneralConfigOpts{ + ConfigStrings: []string{fullTOML}, + } + cfg, err := opts.New() + require.NoError(t, err) + + w := cfg.Workflows() + assert.Equal(t, int32(200), 
w.Limits().Global()) + assert.Equal(t, int32(200), w.Limits().PerOwner()) +} diff --git a/core/services/chainlink/mocks/general_config.go b/core/services/chainlink/mocks/general_config.go index 8f9e3f46816..0a376ec1703 100644 --- a/core/services/chainlink/mocks/general_config.go +++ b/core/services/chainlink/mocks/general_config.go @@ -2139,6 +2139,53 @@ func (_c *GeneralConfig_WebServer_Call) RunAndReturn(run func() config.WebServer return _c } +// Workflows provides a mock function with no fields +func (_m *GeneralConfig) Workflows() config.Workflows { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Workflows") + } + + var r0 config.Workflows + if rf, ok := ret.Get(0).(func() config.Workflows); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(config.Workflows) + } + } + + return r0 +} + +// GeneralConfig_Workflows_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Workflows' +type GeneralConfig_Workflows_Call struct { + *mock.Call +} + +// Workflows is a helper method to define mock.On call +func (_e *GeneralConfig_Expecter) Workflows() *GeneralConfig_Workflows_Call { + return &GeneralConfig_Workflows_Call{Call: _e.mock.On("Workflows")} +} + +func (_c *GeneralConfig_Workflows_Call) Run(run func()) *GeneralConfig_Workflows_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *GeneralConfig_Workflows_Call) Return(_a0 config.Workflows) *GeneralConfig_Workflows_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *GeneralConfig_Workflows_Call) RunAndReturn(run func() config.Workflows) *GeneralConfig_Workflows_Call { + _c.Call.Return(run) + return _c +} + // NewGeneralConfig creates a new instance of GeneralConfig. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. 
func NewGeneralConfig(t interface { diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index c592e0e5fd5..0d4cb2c4794 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -307,3 +307,8 @@ InsecureConnection = false TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' + +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index 9d13abb29bc..c6c68c5d3bc 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -322,6 +322,11 @@ EmitterExportTimeout = '1s' Baz = 'test' Foo = 'bar' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' Enabled = false diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 60d38ad85e0..2a47e263783 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -308,6 +308,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/core/services/chainlink/testdata/config-multi-chain.toml b/core/services/chainlink/testdata/config-multi-chain.toml index 5d9d8913b0b..6bdb3e642f2 100644 --- a/core/services/chainlink/testdata/config-multi-chain.toml +++ b/core/services/chainlink/testdata/config-multi-chain.toml @@ -35,6 +35,11 @@ GasPriceBufferPercent = 10 [AutoPprof] CPUProfileRate = 7 +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' FinalityDepth = 26 
diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index ca9674700c9..99f3d40509d 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -33,6 +33,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/stretchr/testify/require" @@ -47,6 +48,11 @@ var rlConfig = ratelimiter.Config{ PerSenderBurst: 30, } +var wlConfig = syncerlimiter.Config{ + Global: 200, + PerOwner: 200, +} + type testEvtHandler struct { events []syncer.Event mux sync.Mutex @@ -354,9 +360,13 @@ func Test_SecretsWorker(t *testing.T) { rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + + wl, err := syncerlimiter.NewWorkflowLimits(wlConfig) + require.NoError(t, err) + handler := &testSecretsWorkEventHandler{ wrappedHandler: syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, - emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl), + emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, wl), registeredCh: make(chan syncer.Event, 1), } @@ -438,8 +448,12 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) { er := syncer.NewEngineRegistry() rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + + wl, err := syncerlimiter.NewWorkflowLimits(wlConfig) + require.NoError(t, err) + handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, - emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, syncer.WithEngineRegistry(er)) + emitter, 
clockwork.NewFakeClock(), workflowkey.Key{}, rl, wl, syncer.WithEngineRegistry(er)) worker := syncer.NewWorkflowRegistry( lggr, @@ -538,6 +552,8 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) { er := syncer.NewEngineRegistry() rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + wl, err := syncerlimiter.NewWorkflowLimits(wlConfig) + require.NoError(t, err) handler := syncer.NewEventHandler( lggr, orm, @@ -548,6 +564,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) { clockwork.NewFakeClock(), workflowkey.Key{}, rl, + wl, syncer.WithEngineRegistry(er), syncer.WithEngineFactoryFn(mf.new), ) diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 9e74a999795..7266376b18c 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -16,6 +16,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" ) type Delegate struct { @@ -24,6 +25,7 @@ type Delegate struct { logger logger.Logger store store.Store ratelimiter *ratelimiter.RateLimiter + workflowLimits *syncerlimiter.Limits } var _ job.Delegate = (*Delegate)(nil) @@ -75,6 +77,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser Binary: binary, SecretsFetcher: d.secretsFetcher, RateLimiter: d.ratelimiter, + WorkflowLimits: d.workflowLimits, } engine, err := NewEngine(ctx, cfg) if err != nil { @@ -99,6 +102,7 @@ func NewDelegate( registry core.CapabilitiesRegistry, store store.Store, ratelimiter *ratelimiter.RateLimiter, + workflowLimits *syncerlimiter.Limits, ) *Delegate { return &Delegate{ logger: logger, @@ -106,6 +110,7 @@ func NewDelegate( secretsFetcher: newNoopSecretsFetcher(), 
store: store, ratelimiter: ratelimiter, + workflowLimits: workflowLimits, } } diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index abf4e59579d..2444acc507f 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -26,6 +26,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/platform" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" ) const ( @@ -34,6 +35,11 @@ const ( maxStepTimeoutOverrideSec = 10 * 60 // 10 minutes ) +var ( + errGlobalWorkflowCountLimitReached = errors.New("global workflow count limit reached") + errPerOwnerWorkflowCountLimitReached = errors.New("per owner workflow count limit reached") +) + type stepRequest struct { stepRef string state store.WorkflowExecution @@ -139,8 +145,9 @@ type Engine struct { maxWorkerLimit int - clock clockwork.Clock - ratelimiter *ratelimiter.RateLimiter + clock clockwork.Clock + ratelimiter *ratelimiter.RateLimiter + workflowLimits *syncerlimiter.Limits } func (e *Engine) Start(_ context.Context) error { @@ -148,6 +155,20 @@ func (e *Engine) Start(_ context.Context) error { // create a new context, since the one passed in via Start is short-lived. 
ctx, _ := e.stopCh.NewCtx() + // validate if adding another workflow would exceed either the global or per owner engine count limit + ownerAllow, globalAllow := e.workflowLimits.Allow(e.workflow.owner) + if !globalAllow { + e.metrics.with(platform.KeyWorkflowID, e.workflow.id, platform.KeyWorkflowOwner, e.workflow.owner).incrementWorkflowLimitGlobalCounter(ctx) + logCustMsg(ctx, e.cma.With(platform.KeyWorkflowID, e.workflow.id, platform.KeyWorkflowOwner, e.workflow.owner), errGlobalWorkflowCountLimitReached.Error(), e.logger) + return errGlobalWorkflowCountLimitReached + } + + if !ownerAllow { + e.metrics.with(platform.KeyWorkflowID, e.workflow.id, platform.KeyWorkflowOwner, e.workflow.owner).incrementWorkflowLimitPerOwnerCounter(ctx) + logCustMsg(ctx, e.cma.With(platform.KeyWorkflowID, e.workflow.id, platform.KeyWorkflowOwner, e.workflow.owner), errPerOwnerWorkflowCountLimitReached.Error(), e.logger) + return errPerOwnerWorkflowCountLimitReached + } + e.metrics.incrementWorkflowInitializationCounter(ctx) e.wg.Add(e.maxWorkerLimit) @@ -1219,6 +1240,9 @@ func (e *Engine) Close() error { if err != nil { return err } + // decrement the global and per owner engine counter + e.workflowLimits.Decrement(e.workflow.owner) + logCustMsg(ctx, e.cma, "workflow unregistered", e.logger) e.metrics.incrementWorkflowUnregisteredCounter(ctx) return nil @@ -1251,6 +1275,7 @@ type Config struct { HeartbeatCadence time.Duration StepTimeout time.Duration RateLimiter *ratelimiter.RateLimiter + WorkflowLimits *syncerlimiter.Limits // For testing purposes only maxRetries int @@ -1331,6 +1356,14 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { } } + if cfg.WorkflowLimits == nil { + return nil, &workflowError{reason: "workflowLimits must be provided", + labels: map[string]string{ + platform.KeyWorkflowID: cfg.WorkflowID, + }, + } + } + // TODO: validation of the workflow spec // We'll need to check, among other things: // - that there are no step `ref` called 
`trigger` as this is reserved for any triggers @@ -1384,6 +1417,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { maxWorkerLimit: cfg.MaxWorkerLimit, clock: cfg.clock, ratelimiter: cfg.RateLimiter, + workflowLimits: cfg.WorkflowLimits, } return engine, nil diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 59b95c94538..2af0da4816b 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -38,6 +38,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" ) const ( @@ -154,7 +155,10 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string, }).SDKSpec(testutils.Context(t)) require.NoError(t, err) - return newTestEngine(t, reg, sdkSpec, opts...) + eng, testHooks, err := newTestEngine(t, reg, sdkSpec, opts...) + require.NoError(t, err) + + return eng, testHooks } type mockSecretsFetcher struct{} @@ -164,7 +168,7 @@ func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWo } // newTestEngine creates a new engine with some test defaults. 
-func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec, opts ...func(c *Config)) (*Engine, *testHooks) { +func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec, opts ...func(c *Config)) (*Engine, *testHooks, error) { initFailed := make(chan struct{}) initSuccessful := make(chan struct{}) executionFinished := make(chan string, 100) @@ -178,6 +182,12 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec }) require.NoError(t, err) + sl, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{ + Global: 200, + PerOwner: 200, + }) + require.NoError(t, err) + reg.SetLocalRegistry(&testConfigProvider{}) cfg := Config{ WorkflowID: testWorkflowID, @@ -206,6 +216,7 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec SecretsFetcher: mockSecretsFetcher{}, clock: clock, RateLimiter: rl, + WorkflowLimits: sl, } for _, o := range opts { o(&cfg) @@ -215,8 +226,7 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec cfg.Store = newTestDBStore(t, cfg.clock) } eng, err := NewEngine(testutils.Context(t), cfg) - require.NoError(t, err) - return eng, &testHooks{initSuccessful: initSuccessful, initFailed: initFailed, executionFinished: executionFinished, rateLimited: rateLimited} + return eng, &testHooks{initSuccessful: initSuccessful, initFailed: initFailed, executionFinished: executionFinished, rateLimited: rateLimited}, err } // getExecutionId returns the execution id of the workflow that is @@ -612,6 +622,110 @@ func TestEngine_RateLimit(t *testing.T) { t.FailNow() } }) + + t.Run("global workflow limit", func(t *testing.T) { + ctx := testutils.Context(t) + reg := coreCap.NewRegistry(logger.TestLogger(t)) + + trigger, _ := mockTrigger(t) + require.NoError(t, reg.Add(ctx, trigger)) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) + target1 := mockTarget("") + require.NoError(t, reg.Add(ctx, target1)) + + target2 := newMockCapability( 
+ capabilities.MustNewCapabilityInfo( + "write_ethereum-testnet-sepolia@1.0.0", + capabilities.CapabilityTypeTarget, + "a write capability targeting ethereum sepolia testnet", + ), + func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { + m := req.Inputs.Underlying["report"].(*values.Map) + return capabilities.CapabilityResponse{ + Value: m, + }, nil + }, + ) + require.NoError(t, reg.Add(ctx, target2)) + + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{ + Global: 1, + PerOwner: 5, + }) + require.NoError(t, err) + + setWorkflowLimits := func(c *Config) { + c.WorkflowLimits = workflowLimits + } + + // we allow one owner, so the second one should be rate limited + ownerAllow, globalAllow := workflowLimits.Allow("some-previous-owner") + require.True(t, ownerAllow) + require.True(t, globalAllow) + + eng, _ := newTestEngineWithYAMLSpec( + t, + reg, + hardcodedWorkflow, + setWorkflowLimits, + ) + + err = eng.Start(context.Background()) + require.Error(t, err) + assert.ErrorIs(t, err, errGlobalWorkflowCountLimitReached) + }) + + t.Run("per owner workflow limit", func(t *testing.T) { + ctx := testutils.Context(t) + reg := coreCap.NewRegistry(logger.TestLogger(t)) + + trigger, _ := mockTrigger(t) + require.NoError(t, reg.Add(ctx, trigger)) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) + target1 := mockTarget("") + require.NoError(t, reg.Add(ctx, target1)) + + target2 := newMockCapability( + capabilities.MustNewCapabilityInfo( + "write_ethereum-testnet-sepolia@1.0.0", + capabilities.CapabilityTypeTarget, + "a write capability targeting ethereum sepolia testnet", + ), + func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { + m := req.Inputs.Underlying["report"].(*values.Map) + return capabilities.CapabilityResponse{ + Value: m, + }, nil + }, + ) + require.NoError(t, reg.Add(ctx, target2)) + + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{ + Global: 10, 
+ PerOwner: 1, + }) + require.NoError(t, err) + + setWorkflowLimits := func(c *Config) { + c.WorkflowLimits = workflowLimits + } + + // we allow one workflow for this particular owner, so the second one should be rate limited + ownerAllow, globalAllow := workflowLimits.Allow(testWorkflowOwner) + require.True(t, ownerAllow) + require.True(t, globalAllow) + + eng, _ := newTestEngineWithYAMLSpec( + t, + reg, + hardcodedWorkflow, + setWorkflowLimits, + ) + + err = eng.Start(context.Background()) + require.Error(t, err) + assert.ErrorIs(t, err, errPerOwnerWorkflowCountLimitReached) + }) } func TestEngine_ErrorsTheWorkflowIfAStepErrors(t *testing.T) { @@ -1614,7 +1728,7 @@ func TestEngine_WithCustomComputeStep(t *testing.T) { nil, // config ) require.NoError(t, err) - eng, testHooks := newTestEngine( + eng, testHooks, err := newTestEngine( t, reg, *spec, @@ -1623,6 +1737,7 @@ func TestEngine_WithCustomComputeStep(t *testing.T) { c.Config = nil }, ) + require.NoError(t, err) reg.SetLocalRegistry(testConfigProvider{}) servicetest.Run(t, eng) @@ -1683,7 +1798,7 @@ func TestEngine_CustomComputePropagatesBreaks(t *testing.T) { nil, // config ) require.NoError(t, err) - eng, testHooks := newTestEngine( + eng, testHooks, err := newTestEngine( t, reg, *spec, @@ -1692,6 +1807,7 @@ func TestEngine_CustomComputePropagatesBreaks(t *testing.T) { c.Config = nil }, ) + require.NoError(t, err) reg.SetLocalRegistry(testConfigProvider{}) servicetest.Run(t, eng) diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go index e0a136e7e9d..bffe3ec3275 100644 --- a/core/services/workflows/monitoring.go +++ b/core/services/workflows/monitoring.go @@ -25,6 +25,8 @@ type engineMetrics struct { workflowUnregisteredCounter metric.Int64Counter workflowExecutionRateLimitGlobalCounter metric.Int64Counter workflowExecutionRateLimitPerUserCounter metric.Int64Counter + workflowLimitGlobalCounter metric.Int64Counter + workflowLimitPerOwnerCounter metric.Int64Counter 
workflowExecutionLatencyGauge metric.Int64Gauge // ms workflowStepErrorCounter metric.Int64Counter workflowInitializationCounter metric.Int64Counter @@ -49,6 +51,16 @@ func initMonitoringResources() (em *engineMetrics, err error) { return nil, fmt.Errorf("failed to register execution rate limit per user counter: %w", err) } + em.workflowLimitGlobalCounter, err = beholder.GetMeter().Int64Counter("platform_engine_limit_global") + if err != nil { + return nil, fmt.Errorf("failed to register execution limit global counter: %w", err) + } + + em.workflowLimitPerOwnerCounter, err = beholder.GetMeter().Int64Counter("platform_engine_limit_perowner") + if err != nil { + return nil, fmt.Errorf("failed to register execution limit per owner counter: %w", err) + } + em.registerTriggerFailureCounter, err = beholder.GetMeter().Int64Counter("platform_engine_registertrigger_failures") if err != nil { return nil, fmt.Errorf("failed to register trigger failure counter: %w", err) @@ -201,6 +213,16 @@ func (c workflowsMetricLabeler) incrementWorkflowExecutionRateLimitPerUserCounte c.em.workflowExecutionRateLimitPerUserCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } +func (c workflowsMetricLabeler) incrementWorkflowLimitGlobalCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowLimitGlobalCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (c workflowsMetricLabeler) incrementWorkflowLimitPerOwnerCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(c.Labels) + c.em.workflowLimitPerOwnerCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + func (c workflowsMetricLabeler) incrementRegisterTriggerFailureCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 
b5813740134..e263f76847b 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -28,6 +28,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -176,6 +177,7 @@ type eventHandler struct { encryptionKey workflowkey.Key engineFactory engineFactoryFn ratelimiter *ratelimiter.RateLimiter + workflowLimits *syncerlimiter.Limits } type Event interface { @@ -214,6 +216,7 @@ func NewEventHandler( clock clockwork.Clock, encryptionKey workflowkey.Key, ratelimiter *ratelimiter.RateLimiter, + workflowLimits *syncerlimiter.Limits, opts ...func(*eventHandler), ) *eventHandler { eh := &eventHandler{ @@ -230,9 +233,11 @@ func NewEventHandler( secretsFreshnessDuration: defaultSecretsFreshnessDuration, encryptionKey: encryptionKey, ratelimiter: ratelimiter, + workflowLimits: workflowLimits, } eh.engineFactory = eh.engineFactoryFn eh.limits.ApplyDefaults() + for _, o := range opts { o(eh) } @@ -517,6 +522,7 @@ func (h *eventHandler) workflowRegisteredEvent( BinaryURL: payload.BinaryURL, ConfigURL: payload.ConfigURL, } + if _, err = h.orm.UpsertWorkflowSpecWithSecrets(ctx, entry, payload.SecretsURL, hex.EncodeToString(urlHash), string(secrets)); err != nil { return fmt.Errorf("failed to upsert workflow spec with secrets: %w", err) } @@ -610,6 +616,7 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner str Binary: binary, SecretsFetcher: h, RateLimiter: h.ratelimiter, + WorkflowLimits: h.workflowLimits, } return workflows.NewEngine(ctx, cfg) } diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index e9b75e5a162..e1d4b61731f 100644 --- 
a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -26,6 +26,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" wfstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/smartcontractkit/chainlink/v2/core/utils/matches" @@ -88,6 +89,9 @@ func Test_Handler(t *testing.T) { ctx := testutils.Context(t) rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) + giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -106,7 +110,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) err = h.Handle(ctx, giveEvent) require.NoError(t, err) }) @@ -116,13 +120,15 @@ func Test_Handler(t *testing.T) { ctx := testutils.Context(t) rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) giveEvent := WorkflowRegistryEvent{} fetcher := func(_ context.Context, _ string, _ uint32) ([]byte, error) { return []byte("contents"), nil } - h := NewEventHandler(lggr, mockORM, fetcher, nil, 
nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.Contains(t, err.Error(), "event type unsupported") @@ -133,8 +139,10 @@ func Test_Handler(t *testing.T) { ctx := testutils.Context(t) rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) - h := NewEventHandler(lggr, mockORM, nil, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) + h := NewEventHandler(lggr, mockORM, nil, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -158,6 +166,9 @@ func Test_Handler(t *testing.T) { ctx := testutils.Context(t) rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) + giveURL := "http://example.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -175,7 +186,7 @@ func Test_Handler(t *testing.T) { return nil, assert.AnError } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -186,6 +197,9 @@ func Test_Handler(t *testing.T) { ctx := testutils.Context(t) rl, err := ratelimiter.NewRateLimiter(rlConfig) 
require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) + giveURL := "http://example.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -204,7 +218,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -515,14 +529,16 @@ func testRunningWorkflow(t *testing.T, tc testCase) { if tc.engineFactoryFn != nil { opts = append(opts, WithEngineFactoryFn(tc.engineFactoryFn)) } + store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) registry := capabilities.NewRegistry(lggr) registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) h := NewEventHandler(lggr, orm, fetcher, store, registry, emitter, clockwork.NewFakeClock(), - workflowkey.Key{}, rl, opts...) - + workflowkey.Key{}, rl, workflowLimits, opts...) 
tc.validationFn(t, ctx, event, h, wfOwner, "workflow-name", wfID) }) } @@ -572,6 +588,8 @@ func Test_workflowDeletedHandler(t *testing.T) { registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) h := NewEventHandler( lggr, orm, @@ -582,6 +600,7 @@ func Test_workflowDeletedHandler(t *testing.T) { clockwork.NewFakeClock(), workflowkey.Key{}, rl, + workflowLimits, WithEngineRegistry(er), ) err = h.workflowRegisteredEvent(ctx, active) @@ -649,6 +668,8 @@ func Test_workflowDeletedHandler(t *testing.T) { registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) h := NewEventHandler( lggr, orm, @@ -659,6 +680,7 @@ func Test_workflowDeletedHandler(t *testing.T) { clockwork.NewFakeClock(), workflowkey.Key{}, rl, + workflowLimits, WithEngineRegistry(er), ) @@ -731,6 +753,8 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) h := NewEventHandler( lggr, orm, @@ -741,6 +765,7 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { clockwork.NewFakeClock(), workflowkey.Key{}, rl, + workflowLimits, WithEngineRegistry(er), ) err = h.workflowRegisteredEvent(ctx, active) @@ -880,6 +905,8 @@ func Test_Handler_SecretsFor(t *testing.T) { } rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := 
syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) h := NewEventHandler( lggr, orm, @@ -890,6 +917,7 @@ func Test_Handler_SecretsFor(t *testing.T) { clockwork.NewFakeClock(), encryptionKey, rl, + workflowLimits, ) gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID) @@ -945,6 +973,8 @@ func Test_Handler_SecretsFor_RefreshesSecrets(t *testing.T) { } rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) h := NewEventHandler( lggr, orm, @@ -955,6 +985,7 @@ func Test_Handler_SecretsFor_RefreshesSecrets(t *testing.T) { clockwork.NewFakeClock(), encryptionKey, rl, + workflowLimits, ) gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID) @@ -1011,6 +1042,8 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) { clock := clockwork.NewFakeClock() rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) + require.NoError(t, err) h := NewEventHandler( lggr, orm, @@ -1021,6 +1054,7 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) { clock, encryptionKey, rl, + workflowLimits, ) gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID) diff --git a/core/services/workflows/syncerlimiter/limiter.go b/core/services/workflows/syncerlimiter/limiter.go new file mode 100644 index 00000000000..647f828b58e --- /dev/null +++ b/core/services/workflows/syncerlimiter/limiter.go @@ -0,0 +1,72 @@ +package syncerlimiter + +import ( + "sync" +) + +const ( + defaultGlobal = 200 + defaultPerOwner = 200 +) + +type Limits struct { + global *int32 + perOwner map[string]*int32 + 
config Config + mu sync.Mutex +} + +type Config struct { + Global int32 `json:"global"` + PerOwner int32 `json:"perOwner"` +} + +func NewWorkflowLimits(config Config) (*Limits, error) { + if config.Global <= 0 || config.PerOwner <= 0 { + config.Global = defaultGlobal + config.PerOwner = defaultPerOwner + } + + return &Limits{ + global: new(int32), + perOwner: make(map[string]*int32), + config: config, + }, nil +} + +func (l *Limits) Allow(owner string) (ownerAllow bool, globalAllow bool) { + l.mu.Lock() + defer l.mu.Unlock() + ownerLimiter, ok := l.perOwner[owner] + if !ok { + l.perOwner[owner] = new(int32) + ownerLimiter = l.perOwner[owner] + } + + if *ownerLimiter < l.config.PerOwner { + ownerAllow = true + } + + if *l.global < l.config.Global { + globalAllow = true + } + + if ownerAllow && globalAllow { + *ownerLimiter++ + *l.global++ + } + + return ownerAllow, globalAllow +} + +func (l *Limits) Decrement(owner string) { + l.mu.Lock() + defer l.mu.Unlock() + ownerLimiter, ok := l.perOwner[owner] + if !ok || *ownerLimiter <= 0 { + return + } + + *ownerLimiter-- + *l.global-- +} diff --git a/core/services/workflows/syncerlimiter/limiter_test.go b/core/services/workflows/syncerlimiter/limiter_test.go new file mode 100644 index 00000000000..6c5165f4b15 --- /dev/null +++ b/core/services/workflows/syncerlimiter/limiter_test.go @@ -0,0 +1,54 @@ +package syncerlimiter_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" +) + +func TestWorkflowLimits(t *testing.T) { + t.Parallel() + + config := syncerlimiter.Config{ + Global: 3, + PerOwner: 1, + } + wsl, err := syncerlimiter.NewWorkflowLimits(config) + require.NoError(t, err) + allowOwner, allowGlobal := wsl.Allow("user1") + require.True(t, allowOwner && allowGlobal) + // Global 1/3, PerOwner 1/1 + + allowOwner, allowGlobal = wsl.Allow("user2") + require.True(t, allowOwner && allowGlobal) + // Global 2/3, PerOwner 1/1 + 
+ allowOwner, allowGlobal = wsl.Allow("user1") + require.True(t, allowGlobal) + require.False(t, allowOwner) + // Global 2/3, PerOwner 1/1 exceeded + + allowOwner, allowGlobal = wsl.Allow("user3") + require.True(t, allowOwner && allowGlobal) + // Global 3/3, PerOwner 1/1 (one each user) + + allowOwner, allowGlobal = wsl.Allow("user2") + require.False(t, allowOwner) + require.False(t, allowGlobal) + // Global 3/3, PerOwner 1/1 Global and PerOwner exceeded + + wsl.Decrement("user2") + // Global 2/3, User2 PerOwner 0/1 + + allowOwner, allowGlobal = wsl.Allow("user2") + require.True(t, allowOwner && allowGlobal) + // Global 3/3, PerOwner 1/1 (one each user) + + wsl.Decrement("non-existent-user") + allowOwner, allowGlobal = wsl.Allow("non-existent-user") + require.True(t, allowOwner) + require.False(t, allowGlobal) + // Global 3/3, PerOwner 0/1 Global exceeded +} diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index c592e0e5fd5..0d4cb2c4794 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -307,3 +307,8 @@ InsecureConnection = false TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' + +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index 9888e2583fe..8b1161aed35 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -322,6 +322,11 @@ EmitterExportTimeout = '1s' Baz = 'test' Foo = 'bar' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' Enabled = false diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 54d9527801f..7b6529bcedd 100644 --- 
a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -308,6 +308,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/core/web/resolver/testdata/config-multi-chain.toml b/core/web/resolver/testdata/config-multi-chain.toml index a3776cbc981..484b0ccdbd7 100644 --- a/core/web/resolver/testdata/config-multi-chain.toml +++ b/core/web/resolver/testdata/config-multi-chain.toml @@ -35,6 +35,11 @@ GasPriceBufferPercent = 10 [AutoPprof] CPUProfileRate = 7 +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' FinalityDepth = 26 diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 8657f1d6328..a08813e26a4 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1302,6 +1302,32 @@ MaxConfigSize = '50.00kb' # Default ``` MaxConfigSize is the maximum size of a config that can be fetched from the given config url. +## Workflows +```toml +[Workflows] +``` + + +## Workflows.Limits +```toml +[Workflows.Limits] +Global = 200 # Default +PerOwner = 200 # Default +``` + + +### Global +```toml +Global = 200 # Default +``` +Global is the maximum number of workflows that can be registered globally. + +### PerOwner +```toml +PerOwner = 200 # Default +``` +PerOwner is the maximum number of workflows that can be registered per owner. 
+ ## Capabilities.ExternalRegistry ```toml [Capabilities.ExternalRegistry] diff --git a/testdata/scripts/config/merge_raw_configs.txtar b/testdata/scripts/config/merge_raw_configs.txtar index 629c900062f..b82ca060fed 100644 --- a/testdata/scripts/config/merge_raw_configs.txtar +++ b/testdata/scripts/config/merge_raw_configs.txtar @@ -455,6 +455,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[Aptos]] ChainID = '1' Enabled = false diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 0fac0605378..de892dfb329 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -320,6 +320,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + Invalid configuration: invalid secrets: 2 errors: - Database.URL: empty: must be provided and non-empty - Password.Keystore: empty: must be provided and non-empty diff --git a/testdata/scripts/node/validate/defaults-override.txtar b/testdata/scripts/node/validate/defaults-override.txtar index a8f6e49d7ad..d43ca12fc1a 100644 --- a/testdata/scripts/node/validate/defaults-override.txtar +++ b/testdata/scripts/node/validate/defaults-override.txtar @@ -381,6 +381,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index 1710fcbda3c..1f033ef877b 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -364,6 +364,11 @@ TraceSampleRatio = 0.01 
EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index c7130459520..7d4d65933b6 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -364,6 +364,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index 38b1a862264..0ef0f48e368 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -364,6 +364,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/fallback-override.txtar b/testdata/scripts/node/validate/fallback-override.txtar index 5d2b722a524..4b74b353d79 100644 --- a/testdata/scripts/node/validate/fallback-override.txtar +++ b/testdata/scripts/node/validate/fallback-override.txtar @@ -458,6 +458,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 8f88cce9eed..a8bb73d20be 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -349,6 +349,11 @@ 
TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + Invalid configuration: invalid configuration: P2P.V2.Enabled: invalid value (false): P2P required for OCR or OCR2. Please enable P2P or disable OCR/OCR2. -- err.txt -- diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 87628948b54..f339874ab69 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -354,6 +354,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index 4695a4ac99b..f0e706731d7 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -361,6 +361,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index ce0a094de79..9bed6b84eca 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -343,6 +343,11 @@ TraceSampleRatio = 0.01 EmitterBatchProcessor = true EmitterExportTimeout = '1s' +[Workflows] +[Workflows.Limits] +Global = 200 +PerOwner = 200 + # Configuration warning: Tracing.TLSCertPath: invalid value (something): must be empty when Tracing.Mode is 'unencrypted' Valid configuration.