Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAPPL-399] Syncer limits #16234

Merged
merged 18 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/config/app_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type AppConfig interface {
AuditLogger() AuditLogger
AutoPprof() AutoPprof
Capabilities() Capabilities
Workflows() Workflows
Database() Database
Feature() Feature
FluxMonitor() FluxMonitor
Expand Down
7 changes: 7 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,13 @@ MaxEncryptedSecretsSize = '26.40kb' # Default
# MaxConfigSize is the maximum size of a config that can be fetched from the given config url.
MaxConfigSize = '50.00kb' # Default

[Workflows]
[Workflows.Limits]
# Global is the maximum number of workflows that can be registered globally.
Global = 200 # Default
# PerOwner is the maximum number of workflows that can be registered per owner.
PerOwner = 200 # Default

[Capabilities.ExternalRegistry]
# Address is the address for the capabilities registry contract.
Address = '0x0' # Example
Expand Down
25 changes: 25 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Core struct {
Mercury Mercury `toml:",omitempty"`
Capabilities Capabilities `toml:",omitempty"`
Telemetry Telemetry `toml:",omitempty"`
Workflows Workflows `toml:",omitempty"`
}

// SetFrom updates c with any non-nil values from f. (currently TOML field only!)
Expand Down Expand Up @@ -87,6 +88,7 @@ func (c *Core) SetFrom(f *Core) {
c.Keeper.setFrom(&f.Keeper)
c.Mercury.setFrom(&f.Mercury)
c.Capabilities.setFrom(&f.Capabilities)
c.Workflows.setFrom(&f.Workflows)

c.AutoPprof.setFrom(&f.AutoPprof)
c.Pyroscope.setFrom(&f.Pyroscope)
Expand Down Expand Up @@ -1487,6 +1489,29 @@ func (r *ExternalRegistry) setFrom(f *ExternalRegistry) {
}
}

type Workflows struct {
Limits Limits
}

type Limits struct {
Global *int32
PerOwner *int32
}

func (r *Workflows) setFrom(f *Workflows) {
r.Limits.setFrom(&f.Limits)
}

func (r *Limits) setFrom(f *Limits) {
if f.Global != nil {
r.Global = f.Global
}

if f.PerOwner != nil {
r.PerOwner = f.PerOwner
}
}

type WorkflowRegistry struct {
Address *string
NetworkID *string
Expand Down
10 changes: 10 additions & 0 deletions core/config/workflows_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package config

type Workflows interface {
Limits() WorkflowsLimits
}

type WorkflowsLimits interface {
Global() int32
PerOwner() int32
}
20 changes: 20 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth"
"github.com/smartcontractkit/chainlink/v2/core/sessions/localauth"
Expand Down Expand Up @@ -284,6 +285,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
DS: opts.DS,
CREOpts: opts.CREOpts,
capabilityCfg: cfg.Capabilities(),
workflowsCfg: cfg.Workflows(),
logger: globalLogger,
relayerChainInterops: relayerChainInterops,
keystore: keyStore,
Expand Down Expand Up @@ -488,6 +490,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
opts.CapabilitiesRegistry,
workflowORM,
creServices.workflowRateLimiter,
creServices.workflowLimits,
)

// Flux monitor requires ethereum just to boot, silence errors with a null delegate
Expand Down Expand Up @@ -707,6 +710,7 @@ type creServiceConfig struct {
CREOpts

capabilityCfg config.Capabilities
workflowsCfg config.Workflows
keystore creKeystore
logger logger.Logger
relayerChainInterops *CoreRelayerChainInteroperators
Expand All @@ -717,6 +721,11 @@ type CREServices struct {
// workflowRateLimiter is the rate limiter for workflows
// it is exposed because there are contingent services in the application
workflowRateLimiter *ratelimiter.RateLimiter

// workflowLimits is the syncer limiter for workflows
// it will specify the amount of global an per owner workflows that can be registered
workflowLimits *syncerlimiter.Limits

// gatewayConnectorWrapper is the wrapper for the gateway connector
// it is exposed because there are contingent services in the application
gatewayConnectorWrapper *gatewayconnector.ServiceWrapper
Expand All @@ -727,6 +736,7 @@ type CREServices struct {
func newCREServices(cscfg creServiceConfig) (*CREServices, error) {
var (
capCfg = cscfg.capabilityCfg
wCfg = cscfg.workflowsCfg
globalLogger = cscfg.logger
keyStore = cscfg.keystore
relayerChainInterops = cscfg.relayerChainInterops
Expand All @@ -744,6 +754,14 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) {
return nil, fmt.Errorf("could not instantiate workflow rate limiter: %w", err)
}

workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{
Global: wCfg.Limits().Global(),
PerOwner: wCfg.Limits().PerOwner(),
})
if err != nil {
return nil, fmt.Errorf("could not instantiate workflow syncer limiter: %w", err)
}

var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper
if capCfg.GatewayConnector().DonID() != "" {
globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", capCfg.GatewayConnector().DonID())
Expand Down Expand Up @@ -849,6 +867,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) {
clockwork.NewRealClock(),
keys[0],
workflowRateLimiter,
workflowLimits,
syncer.WithMaxArtifactSize(
syncer.ArtifactConfig{
MaxBinarySize: uint64(capCfg.WorkflowRegistry().MaxBinarySize()),
Expand Down Expand Up @@ -886,6 +905,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) {
}
return &CREServices{
workflowRateLimiter: workflowRateLimiter,
workflowLimits: workflowLimits,
gatewayConnectorWrapper: gatewayConnectorWrapper,
srvs: srvcs,
}, nil
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_general.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,10 @@ func (g *generalConfig) Capabilities() config.Capabilities {
return &capabilitiesConfig{c: g.c.Capabilities}
}

func (g *generalConfig) Workflows() config.Workflows {
return &workflowsConfig{c: g.c.Workflows}
}

func (g *generalConfig) Database() coreconfig.Database {
return &databaseConfig{c: g.c.Database, s: g.secrets.Secrets.Database, logSQL: g.logSQL}
}
Expand Down
14 changes: 14 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ var (
AutoPprof: toml.AutoPprof{
CPUProfileRate: ptr[int64](7),
},
Workflows: toml.Workflows{
Limits: toml.Limits{
Global: ptr(int32(200)),
PerOwner: ptr(int32(200)),
},
},
},
EVM: []*evmcfg.EVMConfig{
{
Expand Down Expand Up @@ -448,6 +454,7 @@ func TestConfig_Marshal(t *testing.T) {
PerSenderRPS: ptr(100.0),
PerSenderBurst: ptr(100),
},

Peering: toml.P2P{
IncomingMessageBufferSize: ptr[int64](13),
OutgoingMessageBufferSize: ptr[int64](17),
Expand Down Expand Up @@ -500,6 +507,12 @@ func TestConfig_Marshal(t *testing.T) {
},
},
}
full.Workflows = toml.Workflows{
Limits: toml.Limits{
Global: ptr(int32(200)),
PerOwner: ptr(int32(200)),
},
}
full.Keeper = toml.Keeper{
DefaultTransactionQueueDepth: ptr[uint32](17),
GasPriceBufferPercent: ptr[uint16](12),
Expand Down Expand Up @@ -1691,6 +1704,7 @@ func TestConfig_setDefaults(t *testing.T) {
c.Solana = solcfg.TOMLConfigs{{ChainID: ptr("unknown solana chain")}}
c.Starknet = RawConfigs{{"ChainID": ptr("unknown starknet chain")}}
c.setDefaults()

if s, err := c.TOMLString(); assert.NoError(t, err) {
t.Log(s, err)
}
Expand Down
30 changes: 30 additions & 0 deletions core/services/chainlink/config_workflows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package chainlink

import (
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/config/toml"
)

var _ config.Workflows = (*workflowsConfig)(nil)

type workflowsConfig struct {
c toml.Workflows
}

func (w *workflowsConfig) Limits() config.WorkflowsLimits {
return &limits{
l: w.c.Limits,
}
}

type limits struct {
l toml.Limits
}

func (l *limits) Global() int32 {
return *l.l.Global
}

func (l *limits) PerOwner() int32 {
return *l.l.PerOwner
}
20 changes: 20 additions & 0 deletions core/services/chainlink/config_workflows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package chainlink

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWorkflowsConfig(t *testing.T) {
opts := GeneralConfigOpts{
ConfigStrings: []string{fullTOML},
}
cfg, err := opts.New()
require.NoError(t, err)

w := cfg.Workflows()
assert.Equal(t, int32(200), w.Limits().Global())
assert.Equal(t, int32(200), w.Limits().PerOwner())
}
47 changes: 47 additions & 0 deletions core/services/chainlink/mocks/general_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,8 @@ InsecureConnection = false
TraceSampleRatio = 0.01
EmitterBatchProcessor = true
EmitterExportTimeout = '1s'

[Workflows]
[Workflows.Limits]
Global = 200
PerOwner = 200
5 changes: 5 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ EmitterExportTimeout = '1s'
Baz = 'test'
Foo = 'bar'

[Workflows]
[Workflows.Limits]
Global = 200
PerOwner = 200

[[EVM]]
ChainID = '1'
Enabled = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@ TraceSampleRatio = 0.01
EmitterBatchProcessor = true
EmitterExportTimeout = '1s'

[Workflows]
[Workflows.Limits]
Global = 200
PerOwner = 200

[[EVM]]
ChainID = '1'
AutoCreateKey = true
Expand Down
5 changes: 5 additions & 0 deletions core/services/chainlink/testdata/config-multi-chain.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ GasPriceBufferPercent = 10
[AutoPprof]
CPUProfileRate = 7

[Workflows]
[Workflows.Limits]
Global = 200
PerOwner = 200

[[EVM]]
ChainID = '1'
FinalityDepth = 26
Expand Down
Loading
Loading