diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go
index 801c66ca7c5..aa1ade4f319 100644
--- a/core/services/chainlink/application.go
+++ b/core/services/chainlink/application.go
@@ -76,6 +76,7 @@ import (
 	"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
+	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry"
 	workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter"
@@ -490,6 +491,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
 		opts.CapabilitiesRegistry,
 		workflowORM,
 		creServices.workflowRateLimiter,
+		creServices.engineRegistry,
 		creServices.workflowLimits,
 	)
@@ -729,6 +731,10 @@ type CREServices struct {
 	// gatewayConnectorWrapper is the wrapper for the gateway connector
 	// it is exposed because there are contingent services in the application
 	gatewayConnectorWrapper *gatewayconnector.ServiceWrapper
+	// engineRegistry is exposed so that both the delegate
+	// and syncer paths share an underlying store of Engine
+	// instances
+	engineRegistry *registry.EngineRegistry
 	// srvs are all the services that are created, including those that are explicitly exposed
 	srvs []services.ServiceCtx
 }
@@ -743,6 +749,9 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) {
 		opts   = cscfg.CREOpts
 		ds     = cscfg.DS
 	)
+	// engineRegistry is consumed by the workflow job delegate and by the workflow
+	// registry handler, giving both a unified backing store of engine instances
+	engineRegistry := registry.NewEngineRegistry()
 	var srvcs []services.ServiceCtx
 	workflowRateLimiter, err := ratelimiter.NewRateLimiter(ratelimiter.Config{
 		GlobalRPS: capCfg.RateLimit().GlobalRPS(),
@@ -863,6 +872,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) {
 		fetcherFunc,
 		workflowstore.NewDBStore(ds, lggr, clockwork.NewRealClock()),
 		opts.CapabilitiesRegistry,
+		engineRegistry,
 		custmsg.NewLabeler(),
 		clockwork.NewRealClock(),
 		keys[0],
@@ -907,6 +917,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) {
 		workflowRateLimiter:     workflowRateLimiter,
 		workflowLimits:          workflowLimits,
 		gatewayConnectorWrapper: gatewayConnectorWrapper,
+		engineRegistry:          engineRegistry,
 		srvs:                    srvcs,
 	}, nil
 }
diff --git a/core/services/job/spawner.go b/core/services/job/spawner.go
index 0727333d7ee..8042b8384c8 100644
--- a/core/services/job/spawner.go
+++ b/core/services/job/spawner.go
@@ -56,7 +56,10 @@ type (
 		lbDependentAwaiters []utils.DependentAwaiter
 	}
 
-	// TODO(spook): I can't wait for Go generics
+	// Delegate allows a Spawner to manage many types of jobs, together with
+	// their related services and subservices.
+	// Errors should not be silenced; partial creation or
+	// deletion states should be bubbled up to be handled by the Spawner.
 	Delegate interface {
 		JobType() Type
 		// BeforeJobCreated is only called once on first time job create.
@@ -73,6 +76,7 @@ type (
 		// non-db side effects. This is required in order to guarantee mutual atomicity between
 		// all tasks intended to happen during job deletion. For the same reason, the job will
 		// not show up in the db within OnDeleteJob(), even though it is still actively running.
+		// A failure in OnDeleteJob will cause a total rollback of the delete operation.
 		OnDeleteJob(ctx context.Context, jb Job) error
 	}
diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go
index 526fd9db7de..5f1cb51d634 100644
--- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go
+++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
+	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter"
 	"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"
@@ -366,7 +367,7 @@ func Test_SecretsWorker(t *testing.T) {
 	handler := &testSecretsWorkEventHandler{
 		wrappedHandler: syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
-			emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, wl),
+			registry.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, wl),
 		registeredCh: make(chan syncer.Event, 1),
 	}
@@ -522,7 +523,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) {
 	require.NoError(t, err)
 	giveWorkflow.ID = id
-	er := syncer.NewEngineRegistry()
+	er := registry.NewEngineRegistry()
 	rl, err := ratelimiter.NewRateLimiter(rlConfig)
 	require.NoError(t, err)
@@ -530,7 +531,7 @@
 	require.NoError(t, err)
 	handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
-		emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, wl, syncer.WithEngineRegistry(er))
+		er, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, wl)
 	worker := syncer.NewWorkflowRegistry(
 		lggr,
@@ -626,7 +627,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) {
 	giveWorkflow.ID = id
 	mf := &mockEngineFactory{}
-	er := syncer.NewEngineRegistry()
+	er := registry.NewEngineRegistry()
 	rl, err := ratelimiter.NewRateLimiter(rlConfig)
 	require.NoError(t, err)
 	wl, err := syncerlimiter.NewWorkflowLimits(wlConfig)
 	require.NoError(t, err)
@@ -637,12 +638,12 @@
 		fetcherFn,
 		nil,
 		nil,
+		er,
 		emitter,
 		clockwork.NewFakeClock(),
 		workflowkey.Key{},
 		rl,
 		wl,
-		syncer.WithEngineRegistry(er),
 		syncer.WithEngineFactoryFn(mf.new),
 	)
diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go
index 7266376b18c..0ea948a8999 100644
--- a/core/services/workflows/delegate.go
+++ b/core/services/workflows/delegate.go
@@ -8,8 +8,10 @@ import (
 	"github.com/pelletier/go-toml"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
 	"github.com/smartcontractkit/chainlink-common/pkg/types/core"
+
+	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry"
 	"github.com/smartcontractkit/chainlink/v2/core/logger"
 	"github.com/smartcontractkit/chainlink/v2/core/platform"
@@ -25,6 +25,8 @@ type Delegate struct {
 	logger         logger.Logger
 	store          store.Store
 	ratelimiter    *ratelimiter.RateLimiter
+	engineRegistry *registry.EngineRegistry
+	metrics        WorkflowMetricLabeler
 	workflowLimits *syncerlimiter.Limits
 }
@@ -40,11 +42,21 @@ func (d *Delegate) AfterJobCreated(jb job.Job) {}
 
 func (d *Delegate) BeforeJobDeleted(spec job.Job) {}
 
-func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil }
+// OnDeleteJob removes the engine instance from the registry; the engine
+// itself is closed by the job spawner.
+func (d *Delegate) OnDeleteJob(ctx context.Context, spec job.Job) error {
+	_, err := d.engineRegistry.Pop(spec.WorkflowSpec.WorkflowID)
+	if err != nil {
+		return fmt.Errorf("delegate failed to unregister workflow engine for workflow name: %s id: %s: %w", spec.WorkflowSpec.WorkflowName, spec.WorkflowSpec.WorkflowID, err)
+	}
+	d.metrics.UpdateTotalWorkflowsGauge(ctx, int64(d.engineRegistry.Size()))
+	return nil
+}
 
 // ServicesForSpec satisfies the job.Delegate interface.
 func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
 	cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, spec.WorkflowSpec.WorkflowID, platform.KeyWorkflowOwner, spec.WorkflowSpec.WorkflowOwner, platform.KeyWorkflowName, spec.WorkflowSpec.WorkflowName)
+
 	sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx)
 	if err != nil {
 		logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger)
@@ -83,6 +95,12 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
 	if err != nil {
 		return nil, err
 	}
+
+	err = d.engineRegistry.Add(spec.WorkflowSpec.WorkflowID, engine)
+	if err != nil {
+		return nil, fmt.Errorf("delegate failed to register workflow engine for workflow name: %s id: %s: %w", cfg.WorkflowName.String(), cfg.WorkflowID, err)
+	}
+	d.metrics.UpdateTotalWorkflowsGauge(ctx, int64(d.engineRegistry.Size()))
 	d.logger.Infow("Creating Workflow Engine for workflow spec", "workflowID", spec.WorkflowSpec.WorkflowID, "workflowOwner", spec.WorkflowSpec.WorkflowOwner, "workflowName", spec.WorkflowSpec.WorkflowName, "jobName", spec.Name)
 	return []job.ServiceCtx{engine}, nil
 }
@@ -102,14 +120,22 @@ func NewDelegate(
 	registry core.CapabilitiesRegistry,
 	store store.Store,
 	ratelimiter *ratelimiter.RateLimiter,
+	engineRegistry *registry.EngineRegistry,
 	workflowLimits *syncerlimiter.Limits,
 ) *Delegate {
+	metrics, err := InitWorkflowMonitoringResources()
+	if err != nil {
+		logger.Criticalw("Failed to initialize workflow monitoring resources", "err", err)
+	}
+
 	return &Delegate{
 		logger:         logger,
 		registry:       registry,
+		engineRegistry: engineRegistry,
 		secretsFetcher: newNoopSecretsFetcher(),
 		store:          store,
 		ratelimiter:    ratelimiter,
+		metrics:        *metrics,
 		workflowLimits: workflowLimits,
 	}
 }
diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go
index 7b9741d9592..6d53d3906a3 100644
--- a/core/services/workflows/engine.go
+++ b/core/services/workflows/engine.go
@@ -13,7 +13,6 @@ import (
 	"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
 	"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
-	"github.com/smartcontractkit/chainlink-common/pkg/metrics"
 	"github.com/smartcontractkit/chainlink-common/pkg/services"
 	"github.com/smartcontractkit/chainlink-common/pkg/types/core"
 	"github.com/smartcontractkit/chainlink-common/pkg/values"
@@ -110,7 +109,7 @@ type secretsFetcher interface {
 type Engine struct {
 	services.StateMachine
 	cma      custmsg.MessageEmitter
-	metrics  workflowsMetricLabeler
+	metrics  engineMetricLabeler
 	logger   logger.Logger
 	registry core.CapabilitiesRegistry
 	workflow *workflow
@@ -1174,7 +1173,6 @@ func (e *Engine) heartbeat(ctx context.Context) {
 			return
 		case <-ticker.C:
 			e.metrics.incrementEngineHeartbeatCounter(ctx)
-			e.metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
 			logCustMsg(ctx, e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger)
 		}
 	}
@@ -1292,7 +1290,7 @@ const (
 	defaultQueueSize            = 100000
 	defaultNewWorkerTimeout     = 2 * time.Second
 	defaultMaxExecutionDuration = 10 * time.Minute
-	defaultHeartbeatCadence     = 5 * time.Minute
+	defaultHeartbeatCadence     = 1 * time.Minute
 	defaultStepTimeout          = 2 * time.Minute
 )
@@ -1374,7 +1372,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
 	//  - etc.
 
 	// spin up monitoring resources
-	em, err := initMonitoringResources()
+	engineMetrics, err := initEngineMonitoringResources()
 	if err != nil {
 		return nil, fmt.Errorf("could not initialize monitoring resources: %w", err)
 	}
@@ -1392,8 +1390,8 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
 	engine = &Engine{
 		cma:     cma,
-		logger:  cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
-		metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()), *em},
+		logger:  cfg.Lggr.Named("WorkflowEngine").With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()),
+		metrics: engineMetrics.with(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()),
 		registry:       cfg.Registry,
 		workflow:       workflow,
 		secretsFetcher: cfg.SecretsFetcher,
diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go
index db6ffe29296..65e65518caa 100644
--- a/core/services/workflows/monitoring.go
+++ b/core/services/workflows/monitoring.go
@@ -13,21 +13,55 @@ import (
 	monutils "github.com/smartcontractkit/chainlink/v2/core/monitoring"
 )
 
+// wm AKA "workflow metrics" is to locally scope these instruments to avoid
+// data races in testing
+type workflowMetrics struct {
+	totalWorkflows metric.Int64Gauge
+}
+
+// InitWorkflowMonitoringResources is exported so that both the syncer and the
+// delegate in this package can report on totalWorkflows
+func InitWorkflowMonitoringResources() (l *WorkflowMetricLabeler, err error) {
+	wm := workflowMetrics{}
+
+	wm.totalWorkflows, err = beholder.GetMeter().Int64Gauge("platform_workflow_engines_total")
+	if err != nil {
+		return nil, fmt.Errorf("error initializing total workflows: %w", err)
+	}
+
+	l = &WorkflowMetricLabeler{
+		metrics.NewLabeler(),
+		wm,
+	}
+	return l, nil
+}
+
+// WorkflowMetricLabeler wraps a Labeler to provide workflow specific utilities
+// for monitoring resources
+type WorkflowMetricLabeler struct {
+	metrics.Labeler
+	wm workflowMetrics
+}
+
+func (l WorkflowMetricLabeler) UpdateTotalWorkflowsGauge(ctx context.Context, val int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(l.Labels)
+	l.wm.totalWorkflows.Record(ctx, val, metric.WithAttributes(otelLabels...))
+}
+
 // em AKA "engine metrics" is to locally scope these instruments to avoid
 // data races in testing
 type engineMetrics struct {
-	registerTriggerFailureCounter      metric.Int64Counter
-	triggerWorkflowStarterErrorCounter metric.Int64Counter
-	workflowsRunningGauge              metric.Int64Gauge
-	capabilityInvocationCounter        metric.Int64Counter
-	capabilityFailureCounter           metric.Int64Counter
-	workflowRegisteredCounter
metric.Int64Counter + registerTriggerFailureCounter metric.Int64Counter + triggerWorkflowStarterErrorCounter metric.Int64Counter + capabilityInvocationCounter metric.Int64Counter + capabilityFailureCounter metric.Int64Counter + workflowRegisteredCounter metric.Int64Counter + // workflowUnregisteredCounter can also be viewed as workflow deletions workflowUnregisteredCounter metric.Int64Counter workflowExecutionRateLimitGlobalCounter metric.Int64Counter workflowExecutionRateLimitPerUserCounter metric.Int64Counter workflowLimitGlobalCounter metric.Int64Counter workflowLimitPerOwnerCounter metric.Int64Counter - workflowExecutionLatencyGauge metric.Int64Gauge // ms workflowStepErrorCounter metric.Int64Counter workflowInitializationCounter metric.Int64Counter engineHeartbeatCounter metric.Int64Counter @@ -38,8 +72,8 @@ type engineMetrics struct { workflowStepDurationSeconds metric.Int64Histogram } -func initMonitoringResources() (em *engineMetrics, err error) { - em = &engineMetrics{} +func initEngineMonitoringResources() (m *engineMetricLabeler, err error) { + em := engineMetrics{} em.workflowExecutionRateLimitGlobalCounter, err = beholder.GetMeter().Int64Counter("platform_engine_execution_ratelimit_global") if err != nil { @@ -71,11 +105,6 @@ func initMonitoringResources() (em *engineMetrics, err error) { return nil, fmt.Errorf("failed to register trigger workflow starter error counter: %w", err) } - em.workflowsRunningGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_count") - if err != nil { - return nil, fmt.Errorf("failed to register workflows running gauge: %w", err) - } - em.capabilityInvocationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_capabilities_count") if err != nil { return nil, fmt.Errorf("failed to register capability invocation counter: %w", err) @@ -96,13 +125,6 @@ func initMonitoringResources() (em *engineMetrics, err error) { return nil, fmt.Errorf("failed to register workflow unregistered counter: %w", err) } - em.workflowExecutionLatencyGauge, err = beholder.GetMeter().Int64Gauge( - "platform_engine_workflow_time", - metric.WithUnit("ms")) - if err != nil { - return nil, fmt.Errorf("failed to register workflow execution latency gauge: %w", err) - } - em.workflowInitializationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_initializations") if err != nil { return nil, fmt.Errorf("failed to register workflow initialization counter: %w", err) @@ -158,7 +180,7 @@ func initMonitoringResources() (em *engineMetrics, err error) { return nil, fmt.Errorf("failed to register step execution time histogram: %w", err) } - return em, nil + return &engineMetricLabeler{metrics.NewLabeler(), em}, nil } // Note: due to the OTEL specification, all histogram buckets @@ -193,113 +215,103 @@ func MetricViews() []sdkmetric.View { } } -// workflowsMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities +// engineMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities // for monitoring resources -type workflowsMetricLabeler struct { +type engineMetricLabeler struct { metrics.Labeler em engineMetrics } -func (c workflowsMetricLabeler) with(keyValues ...string) workflowsMetricLabeler { - return workflowsMetricLabeler{c.With(keyValues...), c.em} +func (c engineMetricLabeler) with(keyValues ...string) engineMetricLabeler { + return engineMetricLabeler{c.With(keyValues...), c.em} } -func (c workflowsMetricLabeler) incrementWorkflowExecutionRateLimitGlobalCounter(ctx 
context.Context) { +func (c engineMetricLabeler) incrementWorkflowExecutionRateLimitGlobalCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowExecutionRateLimitGlobalCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementWorkflowExecutionRateLimitPerUserCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowExecutionRateLimitPerUserCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowExecutionRateLimitPerUserCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementWorkflowLimitGlobalCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementRegisterTriggerFailureCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) - c.em.workflowLimitGlobalCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) + c.em.registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementWorkflowLimitPerOwnerCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowLimitGlobalCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) - c.em.workflowLimitPerOwnerCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) + c.em.workflowLimitGlobalCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementRegisterTriggerFailureCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowLimitPerOwnerCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) - c.em.registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) + c.em.workflowLimitPerOwnerCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementTriggerWorkflowStarterErrorCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementTriggerWorkflowStarterErrorCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.triggerWorkflowStarterErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementCapabilityInvocationCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementCapabilityInvocationCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.capabilityInvocationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowExecutionLatencyGauge(ctx context.Context, val int64) { - otelLabels := monutils.KvMapToOtelAttributes(c.Labels) - c.em.workflowExecutionLatencyGauge.Record(ctx, val, metric.WithAttributes(otelLabels...)) -} - -func (c workflowsMetricLabeler) incrementTotalWorkflowStepErrorsCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementTotalWorkflowStepErrorsCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowStepErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateTotalWorkflowsGauge(ctx context.Context, val int64) { - otelLabels := monutils.KvMapToOtelAttributes(c.Labels) - c.em.workflowsRunningGauge.Record(ctx, val, metric.WithAttributes(otelLabels...)) -} - -func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) { otelLabels := 
monutils.KvMapToOtelAttributes(c.Labels) c.em.engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementCapabilityFailureCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementCapabilityFailureCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.capabilityFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementWorkflowRegisteredCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowRegisteredCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowRegisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementWorkflowUnregisteredCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowUnregisteredCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowUnregisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementWorkflowInitializationCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowInitializationCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowInitializationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowCompletedDurationHistogram(ctx context.Context, duration int64) { +func (c engineMetricLabeler) updateWorkflowCompletedDurationHistogram(ctx context.Context, duration int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowCompletedDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowEarlyExitDurationHistogram(ctx context.Context, duration int64) { +func (c engineMetricLabeler) updateWorkflowEarlyExitDurationHistogram(ctx context.Context, duration int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowEarlyExitDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowErrorDurationHistogram(ctx context.Context, duration int64) { +func (c engineMetricLabeler) updateWorkflowErrorDurationHistogram(ctx context.Context, duration int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowErrorDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowTimeoutDurationHistogram(ctx context.Context, duration int64) { +func (c engineMetricLabeler) updateWorkflowTimeoutDurationHistogram(ctx context.Context, duration int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowTimeoutDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowStepDurationHistogram(ctx context.Context, duration int64) { +func (c engineMetricLabeler) updateWorkflowStepDurationHistogram(ctx context.Context, duration int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowStepDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) } diff --git a/core/services/workflows/monitoring_test.go b/core/services/workflows/monitoring_test.go index 5b7177e51dc..536a02ca4ba 100644 --- a/core/services/workflows/monitoring_test.go +++ b/core/services/workflows/monitoring_test.go @@ -9,12 +9,12 @@ import ( ) func 
Test_InitMonitoringResources(t *testing.T) { - _, err := initMonitoringResources() + _, err := initEngineMonitoringResources() require.NoError(t, err) } func Test_WorkflowMetricsLabeler(t *testing.T) { - testWorkflowsMetricLabeler := workflowsMetricLabeler{metrics.NewLabeler(), engineMetrics{}} + testWorkflowsMetricLabeler := engineMetricLabeler{metrics.NewLabeler(), engineMetrics{}} testWorkflowsMetricLabeler2 := testWorkflowsMetricLabeler.with("foo", "baz") require.EqualValues(t, testWorkflowsMetricLabeler2.Labels["foo"], "baz") } diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/registry/engine_registry.go similarity index 91% rename from core/services/workflows/syncer/engine_registry.go rename to core/services/workflows/registry/engine_registry.go index ecea3b98c2f..7625f92583d 100644 --- a/core/services/workflows/syncer/engine_registry.go +++ b/core/services/workflows/registry/engine_registry.go @@ -1,4 +1,4 @@ -package syncer +package registry import ( "errors" @@ -70,11 +70,15 @@ func (r *EngineRegistry) Close() error { defer r.mu.Unlock() var err error for id, engine := range r.engines { - closeErr := engine.Close() - if closeErr != nil { - err = errors.Join(err, closeErr) - } + err = errors.Join(err, engine.Close()) delete(r.engines, id) } return err } + +func (r *EngineRegistry) Size() int { + r.mu.RLock() + defer r.mu.RUnlock() + + return len(r.engines) +} diff --git a/core/services/workflows/registry/engine_registry_test.go b/core/services/workflows/registry/engine_registry_test.go new file mode 100644 index 00000000000..d67407cba60 --- /dev/null +++ b/core/services/workflows/registry/engine_registry_test.go @@ -0,0 +1,165 @@ +package registry_test + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry" +) + +// mockService implements the services.Service interface for testing. 
+type mockService struct { + mock.Mock +} + +func (m *mockService) Start(ctx context.Context) error { + return nil +} + +func (m *mockService) HealthReport() map[string]error { + return nil +} + +func (m *mockService) Name() string { + return "MockEngine" +} + +func (m *mockService) Ready() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockService) Close() error { + args := m.Called() + return args.Error(0) +} + +func TestEngineRegistry_AddAndGet(t *testing.T) { + registry := registry.NewEngineRegistry() + + svc := mockService{} + svc.On("Ready").Return(nil) + + // Add the service + err := registry.Add("engine1", &svc) + require.NoError(t, err) + + // Try adding the same ID again; expect an error + err = registry.Add("engine1", &svc) + require.Error(t, err) + require.Contains(t, err.Error(), "duplicate") + + // Get the service + gotSvc, err := registry.Get("engine1") + require.NoError(t, err) + require.Equal(t, svc.Name(), gotSvc.Name()) + + // Try getting a non-existent engine + _, err = registry.Get("non-existent") + require.Error(t, err) + require.Contains(t, err.Error(), "not found") +} + +func TestEngineRegistry_IsRunning(t *testing.T) { + registry := registry.NewEngineRegistry() + + runningSvc := &mockService{} + runningSvc.On("Ready").Return(nil) + + notRunningSvc := &mockService{} + notRunningSvc.On("Ready").Return(errors.New("not ready")) + + _ = registry.Add("running", runningSvc) + _ = registry.Add("notRunning", notRunningSvc) + + require.True(t, registry.IsRunning("running"), "expected running to be true if Ready() == nil") + require.False(t, registry.IsRunning("notRunning"), "expected running to be false if Ready() != nil") + require.False(t, registry.IsRunning("non-existent"), "expected false for non-existent engine") +} + +func TestEngineRegistry_Pop(t *testing.T) { + registry := registry.NewEngineRegistry() + + svc := &mockService{} + err := registry.Add("id1", svc) + require.NoError(t, err) + + // Pop should remove and return the service + poppedSvc, err := registry.Pop("id1") + require.NoError(t, err) + require.Equal(t, svc.Name(), poppedSvc.Name()) + + // After popping, engine should not exist in registry + _, err = registry.Get("id1") + require.Error(t, err) + + // Popping a non-existent engine + _, err = registry.Pop("non-existent") + require.Error(t, err) +} + +func TestEngineRegistry_Close(t *testing.T) { + registry := registry.NewEngineRegistry() + + // Set up multiple services to test aggregated errors + svc1 := &mockService{} + svc2 := &mockService{} + svc3 := &mockService{} + + // We want to track Close calls and possibly return different errors + svc1.On("Close").Return(nil).Once() + svc2.On("Close").Return(errors.New("close error on svc2")).Once() + svc3.On("Close").Return(errors.New("close error on svc3")).Once() + + _ = registry.Add("svc1", svc1) + _ = registry.Add("svc2", svc2) + _ = registry.Add("svc3", svc3) + + // Call Close + err := registry.Close() + // We expect both errors from svc2 and svc3 to be joined + require.Error(t, err, "expected an aggregated error from multiple closes") + // Check that the error message contains both parts + require.True(t, strings.Contains(err.Error(), "svc2") && strings.Contains(err.Error(), "svc3"), + "expected error to contain all relevant messages") + + // After Close, registry should be empty + require.Equal(t, 0, registry.Size(), "registry should be empty after Close") + + // Additional Close calls should not error because registry is empty + err = registry.Close() + require.NoError(t, err, 
"expected no error on subsequent Close calls when registry is empty") + + // Ensure the mock expectations have been met (all calls accounted for) + svc1.AssertExpectations(t) + svc2.AssertExpectations(t) + svc3.AssertExpectations(t) +} + +func TestEngineRegistry_Size(t *testing.T) { + // Set up multiple services to test aggregated errors + svc1 := &mockService{} + svc2 := &mockService{} + + // We want to track Close calls and possibly return different errors + svc1.On("Close").Return(nil) + svc2.On("Close").Return(nil) + + registry := registry.NewEngineRegistry() + require.Equal(t, 0, registry.Size(), "initial registry should have size 0") + + _ = registry.Add("id1", svc1) + _ = registry.Add("id2", svc2) + require.Equal(t, 2, registry.Size()) + + _, _ = registry.Pop("id1") + require.Equal(t, 1, registry.Size()) + + _ = registry.Close() + require.Equal(t, 0, registry.Size()) +} diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index e263f76847b..403552ff961 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -27,6 +27,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -169,7 +170,9 @@ type eventHandler struct { limits *ArtifactConfig workflowStore store.Store capRegistry core.CapabilitiesRegistry - engineRegistry *EngineRegistry + engineRegistry *registry.EngineRegistry + registryMetrics workflowRegistryMetricsLabeler + workflowMetrics workflows.WorkflowMetricLabeler emitter custmsg.MessageEmitter lastFetchedAtMap *lastFetchedAtMap clock clockwork.Clock @@ -187,12 +190,6 @@ type Event interface { var defaultSecretsFreshnessDuration = 24 * time.Hour -func WithEngineRegistry(er *EngineRegistry) func(*eventHandler) { - return func(e *eventHandler) { - e.engineRegistry = er - } -} - func WithEngineFactoryFn(efn engineFactoryFn) func(*eventHandler) { return func(e *eventHandler) { e.engineFactory = efn @@ -212,6 +209,7 @@ func NewEventHandler( fetchFn FetcherFunc, workflowStore store.Store, capRegistry core.CapabilitiesRegistry, + engineRegistry *registry.EngineRegistry, emitter custmsg.MessageEmitter, clock clockwork.Clock, encryptionKey workflowkey.Key, @@ -219,13 +217,26 @@ func NewEventHandler( workflowLimits *syncerlimiter.Limits, opts ...func(*eventHandler), ) *eventHandler { + + registryMetrics, err := initMonitoringResources() + if err != nil { + lggr.Criticalw("Failed to initialize registry monitoring resources", "err", err) + } + + workflowMetrics, err := workflows.InitWorkflowMonitoringResources() + if err != nil { + lggr.Criticalw("Failed to initialize workflow monitoring resources", "err", err) + } + eh := &eventHandler{ lggr: lggr, orm: orm, workflowStore: workflowStore, capRegistry: capRegistry, fetchFn: fetchFn, - engineRegistry: NewEngineRegistry(), + engineRegistry: engineRegistry, + registryMetrics: *registryMetrics, + workflowMetrics: *workflowMetrics, emitter: emitter, lastFetchedAtMap: newLastFetchedAtMap(), clock: clock, @@ -331,11 +342,17 @@ func (h *eventHandler) Handle(ctx context.Context, event 
Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner), ) + metrics := h.registryMetrics.with( + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner), + ) + if _, err := h.forceUpdateSecretsEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr) return err } + metrics.incrementForceUpdateSecretsCounter(ctx) h.lggr.Debugw("handled force update secrets events for URL hash", "urlHash", payload.SecretsURLHash) return nil case WorkflowRegisteredEvent: @@ -351,11 +368,21 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) + metrics := h.registryMetrics.with( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowRegisteredEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow registered event: %v", err), h.lggr) return err } + metrics.incrementRegisterCounter(ctx) + + // intentionally without workflow specific labels. measures total workflows on this node. + h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow registration event", "workflowID", wfID) return nil case WorkflowUpdatedEvent: @@ -371,11 +398,18 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) + metrics := h.registryMetrics.with( + platform.KeyWorkflowID, newWorkflowID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowUpdatedEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow updated event: %v", err), h.lggr) return err } + metrics.incrementUpdateCounter(ctx) h.lggr.Debugw("handled workflow updated event", "workflowID", newWorkflowID) return nil case WorkflowPausedEvent: @@ -392,10 +426,21 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) + metrics := h.registryMetrics.with( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowPausedEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr) return err } + + metrics.incrementPauseCounter(ctx) + + // intentionally without workflow specific labels. measures total workflows on this node. 
+ h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow paused event", "workflowID", wfID) return nil case WorkflowActivatedEvent: @@ -411,11 +456,22 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowName, payload.WorkflowName, platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) + + metrics := h.registryMetrics.with( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowActivatedEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr) return err } + metrics.incrementActivateCounter(ctx) + + // intentionally without workflow specific labels. measures total workflows on this node. + h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow activated event", "workflowID", wfID) return nil case WorkflowDeletedEvent: @@ -432,11 +488,20 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) + metrics := h.registryMetrics.with( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowDeletedEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", err), h.lggr) return err } + metrics.incrementDeleteCounter(ctx) + // intentionally without workflow specific labels. measures total workflows on this node. + h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow deleted event", "workflowID", wfID) return nil default: @@ -547,14 +612,13 @@ func (h *eventHandler) workflowRegisteredEvent( return fmt.Errorf("failed to create workflow engine: %w", err) } - if err := engine.Start(ctx); err != nil { - return fmt.Errorf("failed to start workflow engine: %w", err) + // add the engine to the registry before we start so it can be deleted on close + if err := h.engineRegistry.Add(wfID, engine); err != nil { + return fmt.Errorf("failed to add new engine to registry: %w", err) } - // This shouldn't happen because we call the handler serially and - // check for running engines above, see the call to engineRegistry.IsRunning. 
-	if err := h.engineRegistry.Add(wfID, engine); err != nil {
-		return fmt.Errorf("invariant violation: %w", err)
+	if err := engine.Start(ctx); err != nil {
+		return fmt.Errorf("failed to start workflow engine: %w", err)
 	}
 
 	return nil
 }
@@ -629,7 +693,7 @@ func (h *eventHandler) workflowUpdatedEvent(
 	payload WorkflowRegistryWorkflowUpdatedV1,
 ) error {
 	// Remove the old workflow engine from the local registry if it exists
-	if err := h.tryEngineCleanup(hex.EncodeToString(payload.OldWorkflowID[:])); err != nil {
+	if err := h.engineCleanup(hex.EncodeToString(payload.OldWorkflowID[:])); err != nil {
 		return err
 	}
@@ -653,7 +717,7 @@ func (h *eventHandler) workflowPausedEvent(
 	payload WorkflowRegistryWorkflowPausedV1,
 ) error {
 	// Remove the workflow engine from the local registry if it exists
-	if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil {
+	if err := h.engineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil {
 		return err
 	}
@@ -714,7 +778,7 @@ func (h *eventHandler) workflowDeletedEvent(
 	ctx context.Context,
 	payload WorkflowRegistryWorkflowDeletedV1,
 ) error {
-	if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil {
+	if err := h.engineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil {
 		return err
 	}
@@ -759,20 +823,19 @@ func (h *eventHandler) forceUpdateSecretsEvent(
 	return string(secrets), nil
 }
 
-// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the
-// workflow engine is not running.
-func (h *eventHandler) tryEngineCleanup(wfID string) error {
-	if h.engineRegistry.IsRunning(wfID) {
-		// Remove the engine from the registry
-		e, err := h.engineRegistry.Pop(wfID)
-		if err != nil {
-			return fmt.Errorf("failed to get workflow engine: %w", err)
-		}
+// engineCleanup pops the engine from the registry, if present, and closes it.
+// A missing engine is not an error; there is simply nothing to clean up.
+func (h *eventHandler) engineCleanup(wfID string) error {
+	// Remove the engine from the registry
+	e, err := h.engineRegistry.Pop(wfID)
+	if err != nil {
+		// no engine, nothing to clean up
+		return nil
+	}
 
-		// Stop the engine
-		if err := e.Close(); err != nil {
-			return fmt.Errorf("failed to close workflow engine: %w", err)
-		}
+	// Stop the engine
+	if err := e.Close(); err != nil {
+		return fmt.Errorf("failed to close workflow engine: %w", err)
 	}
 	return nil
 }
diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go
index e1d4b61731f..c9f107c0b29 100644
--- a/core/services/workflows/syncer/handler_test.go
+++ b/core/services/workflows/syncer/handler_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
+	ereg "github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry"
 	wfstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/mocks"
 	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncerlimiter"
@@ -110,7 +111,7 @@ func Test_Handler(t *testing.T) {
 	}
 	mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil)
 	mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil)
-	h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{},
rl, workflowLimits) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, ereg.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) err = h.Handle(ctx, giveEvent) require.NoError(t, err) }) @@ -128,7 +129,7 @@ func Test_Handler(t *testing.T) { return []byte("contents"), nil } - h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, ereg.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.Contains(t, err.Error(), "event type unsupported") @@ -142,7 +143,7 @@ func Test_Handler(t *testing.T) { workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) require.NoError(t, err) - h := NewEventHandler(lggr, mockORM, nil, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) + h := NewEventHandler(lggr, mockORM, nil, nil, nil, ereg.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -186,7 +187,7 @@ func Test_Handler(t *testing.T) { return nil, assert.AnError } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, ereg.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -218,7 +219,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, ereg.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -383,8 +384,9 @@ func Test_workflowRegisteredHandler(t *testing.T) { }, validationFn: func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) { me := &mockEngine{} - h.engineRegistry.Add(wfID, me) - err := h.workflowRegisteredEvent(ctx, event) + err := h.engineRegistry.Add(wfID, me) + require.NoError(t, err) + err = h.workflowRegisteredEvent(ctx, event) require.Error(t, err) require.ErrorContains(t, err, "workflow is already running") }, @@ -522,10 +524,8 @@ func testRunningWorkflow(t *testing.T, tc testCase) { event := tc.Event(giveWFID[:]) - er := NewEngineRegistry() - opts := []func(*eventHandler){ - WithEngineRegistry(er), - } + er := ereg.NewEngineRegistry() + var opts []func(*eventHandler) if tc.engineFactoryFn != nil { opts = append(opts, WithEngineFactoryFn(tc.engineFactoryFn)) } @@ -535,9 +535,10 @@ func testRunningWorkflow(t *testing.T, tc testCase) { registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) rl, err := ratelimiter.NewRateLimiter(rlConfig) 
require.NoError(t, err) + workflowLimits, err := syncerlimiter.NewWorkflowLimits(syncerlimiter.Config{Global: 200, PerOwner: 200}) require.NoError(t, err) - h := NewEventHandler(lggr, orm, fetcher, store, registry, emitter, clockwork.NewFakeClock(), + h := NewEventHandler(lggr, orm, fetcher, store, registry, er, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits, opts...) tc.validationFn(t, ctx, event, h, wfOwner, "workflow-name", wfID) }) @@ -582,7 +583,7 @@ func Test_workflowDeletedHandler(t *testing.T) { SecretsURL: secretsURL, } - er := NewEngineRegistry() + er := ereg.NewEngineRegistry() store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) registry := capabilities.NewRegistry(lggr) registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) @@ -596,12 +597,12 @@ func Test_workflowDeletedHandler(t *testing.T) { fetcher, store, registry, + er, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits, - WithEngineRegistry(er), ) err = h.workflowRegisteredEvent(ctx, active) require.NoError(t, err) @@ -662,7 +663,7 @@ func Test_workflowDeletedHandler(t *testing.T) { giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, secretsURL) require.NoError(t, err) - er := NewEngineRegistry() + er := ereg.NewEngineRegistry() store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) registry := capabilities.NewRegistry(lggr) registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) @@ -676,12 +677,12 @@ func Test_workflowDeletedHandler(t *testing.T) { fetcher, store, registry, + er, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits, - WithEngineRegistry(er), ) deleteEvent := WorkflowRegistryWorkflowDeletedV1{ @@ -747,7 +748,7 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { SecretsURL: secretsURL, } - er := NewEngineRegistry() + er := ereg.NewEngineRegistry() store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) registry := capabilities.NewRegistry(lggr) registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) @@ -761,12 +762,12 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { fetcher, store, registry, + er, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, workflowLimits, - WithEngineRegistry(er), ) err = h.workflowRegisteredEvent(ctx, active) require.NoError(t, err) @@ -913,6 +914,7 @@ func Test_Handler_SecretsFor(t *testing.T) { fetcher.Fetch, wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()), capabilities.NewRegistry(lggr), + ereg.NewEngineRegistry(), custmsg.NewLabeler(), clockwork.NewFakeClock(), encryptionKey, @@ -981,6 +983,7 @@ func Test_Handler_SecretsFor_RefreshesSecrets(t *testing.T) { fetcher.Fetch, wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()), capabilities.NewRegistry(lggr), + ereg.NewEngineRegistry(), custmsg.NewLabeler(), clockwork.NewFakeClock(), encryptionKey, @@ -1050,6 +1053,7 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) { fetcher.Fetch, wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()), capabilities.NewRegistry(lggr), + ereg.NewEngineRegistry(), custmsg.NewLabeler(), clock, encryptionKey, diff --git a/core/services/workflows/syncer/monitoring.go b/core/services/workflows/syncer/monitoring.go new file mode 100644 index 00000000000..b08f8802a59 --- /dev/null +++ b/core/services/workflows/syncer/monitoring.go @@ -0,0 +1,107 @@ +package syncer + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/metric" + + 
"github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/metrics" + monutils "github.com/smartcontractkit/chainlink/v2/core/monitoring" +) + +// workflow registry metrics is to locally scope instruments to avoid +// data races in testing +type workflowRegistryMetrics struct { + activateCounter metric.Int64Counter + deleteCounter metric.Int64Counter + forceUpdateSecretsCounter metric.Int64Counter + pauseCounter metric.Int64Counter + registerCounter metric.Int64Counter + updateCounter metric.Int64Counter +} + +func initMonitoringResources() (l *workflowRegistryMetricsLabeler, err error) { + m := &workflowRegistryMetrics{} + + m.activateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_activate") + if err != nil { + return nil, fmt.Errorf("error initializing activate counter: %w", err) + } + + m.deleteCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_delete") + if err != nil { + return nil, fmt.Errorf("error initializing delete counter: %w", err) + } + + m.forceUpdateSecretsCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_force_update_secrets") + if err != nil { + return nil, fmt.Errorf("error initializing force update secrets counter: %w", err) + } + + m.pauseCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_pause") + if err != nil { + return nil, fmt.Errorf("error initializing pause counter: %w", err) + } + + m.registerCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_register") + if err != nil { + return nil, fmt.Errorf("error initializing register counter: %w", err) + } + + m.updateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_update") + if err != nil { + return nil, fmt.Errorf("error initializing update counter: %w", err) + } + + return newWorkflowRegistryMetricsLabeler(m), nil +} + +// workflowRegistryMetricsLabeler wraps m to provide utilities for +// monitoring instrumentation +type workflowRegistryMetricsLabeler struct { + metrics.Labeler + m *workflowRegistryMetrics +} + +func newWorkflowRegistryMetricsLabeler(m *workflowRegistryMetrics) *workflowRegistryMetricsLabeler { + return &workflowRegistryMetricsLabeler{ + metrics.NewLabeler(), + m, + } +} + +func (l workflowRegistryMetricsLabeler) with(keyValues ...string) workflowRegistryMetricsLabeler { + return workflowRegistryMetricsLabeler{l.With(keyValues...), l.m} +} + +func (l workflowRegistryMetricsLabeler) incrementActivateCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.activateCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) incrementDeleteCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.deleteCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) incrementForceUpdateSecretsCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.forceUpdateSecretsCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) incrementPauseCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.pauseCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) incrementRegisterCounter(ctx context.Context) { + otelLabels := 
monutils.KvMapToOtelAttributes(l.Labels) + l.m.registerCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) incrementUpdateCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.updateCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} diff --git a/core/services/workflows/syncer/monitoring_test.go b/core/services/workflows/syncer/monitoring_test.go new file mode 100644 index 00000000000..7870f2d3999 --- /dev/null +++ b/core/services/workflows/syncer/monitoring_test.go @@ -0,0 +1,20 @@ +package syncer + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/metrics" +) + +func Test_InitMonitoringResources(t *testing.T) { + _, err := initMonitoringResources() + require.NoError(t, err) +} + +func Test_WorkflowMetricsLabeler(t *testing.T) { + testWorkflowsMetricLabeler := workflowRegistryMetricsLabeler{metrics.NewLabeler(), &workflowRegistryMetrics{}} + testWorkflowsMetricLabeler2 := testWorkflowsMetricLabeler.with("foo", "baz") + require.EqualValues(t, "baz", testWorkflowsMetricLabeler2.Labels["foo"]) +}
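
A note for reviewers on the core structure: the mutex-guarded engine store that moved from `syncer` into its own `registry` package is what the job delegate and the workflow registry handler now share. Below is a minimal, self-contained sketch of the pattern; the method names follow the diff, but the internals shown here are assumptions for illustration, and `Service` stands in for chainlink-common's `services.Service`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Service stands in for the services.Service interface that real engines
// implement; the registry only needs Ready (for IsRunning) and Close.
type Service interface {
	Ready() error
	Close() error
}

// EngineRegistry sketches the store now shared by the job delegate and the
// workflow registry handler.
type EngineRegistry struct {
	mu      sync.RWMutex
	engines map[string]Service
}

func NewEngineRegistry() *EngineRegistry {
	return &EngineRegistry{engines: map[string]Service{}}
}

// Add rejects duplicate workflow IDs, which lets callers treat
// "already registered" as "already running".
func (r *EngineRegistry) Add(id string, e Service) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.engines[id]; ok {
		return errors.New("duplicate engine for id " + id)
	}
	r.engines[id] = e
	return nil
}

// Pop removes and returns the engine so exactly one caller owns Close.
func (r *EngineRegistry) Pop(id string) (Service, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	e, ok := r.engines[id]
	if !ok {
		return nil, errors.New("engine not found for id " + id)
	}
	delete(r.engines, id)
	return e, nil
}

// Size backs the platform_workflow_engines_total gauge.
func (r *EngineRegistry) Size() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.engines)
}

type noopEngine struct{}

func (noopEngine) Ready() error { return nil }
func (noopEngine) Close() error { return nil }

func main() {
	r := NewEngineRegistry()
	_ = r.Add("wf-1", noopEngine{})
	fmt.Println("engines:", r.Size()) // engines: 1

	// Cleanup is pop-then-close: removal and shutdown stay atomic per ID.
	if e, err := r.Pop("wf-1"); err == nil {
		_ = e.Close()
	}
	fmt.Println("engines:", r.Size()) // engines: 0
}
```

The pop-then-close ownership model is what lets the delegate's OnDeleteJob only unregister (the spawner closes the engine) while the syncer's cleanup path closes engines itself.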
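The metrics split (per-engine `engineMetricLabeler` versus the shared, exported `WorkflowMetricLabeler`) reuses the same copy-on-`with` labeler pattern throughout: a flat label map plus locally scoped instruments, converted to OTel attributes at record time. Here is a hedged sketch against the plain OpenTelemetry API; the diff itself goes through `beholder.GetMeter()` and chainlink-common's `metrics.Labeler`, so the types below are simplified stand-ins, not the real ones.

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// workflowMetrics mirrors the locally scoped instrument set in the diff.
type workflowMetrics struct {
	totalWorkflows metric.Int64Gauge
}

// labeler carries a flat label map plus the shared instruments, like
// chainlink-common's metrics.Labeler wrappers.
type labeler struct {
	labels map[string]string
	wm     workflowMetrics
}

// with copies the map, so per-event labels never mutate a shared labeler.
func (l labeler) with(kvs ...string) labeler {
	out := labeler{labels: make(map[string]string, len(l.labels)+len(kvs)/2), wm: l.wm}
	for k, v := range l.labels {
		out.labels[k] = v
	}
	for i := 0; i+1 < len(kvs); i += 2 {
		out.labels[kvs[i]] = kvs[i+1]
	}
	return out
}

// updateTotalWorkflowsGauge converts labels to OTel attributes at record
// time, as the KvMapToOtelAttributes helper does in the diff.
func (l labeler) updateTotalWorkflowsGauge(ctx context.Context, val int64) {
	attrs := make([]attribute.KeyValue, 0, len(l.labels))
	for k, v := range l.labels {
		attrs = append(attrs, attribute.String(k, v))
	}
	l.wm.totalWorkflows.Record(ctx, val, metric.WithAttributes(attrs...))
}

func main() {
	// The global meter is a no-op unless a meter provider is installed.
	gauge, err := otel.Meter("sketch").Int64Gauge("platform_workflow_engines_total")
	if err != nil {
		panic(err)
	}
	base := labeler{labels: map[string]string{}, wm: workflowMetrics{totalWorkflows: gauge}}
	base.with("workflowOwner", "0xabc").updateTotalWorkflowsGauge(context.Background(), 3)
}
```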
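Finally, a behavioral nuance in the handler worth calling out: `tryEngineCleanup` used to gate on `IsRunning`, while the new `engineCleanup` treats a failed `Pop` as nothing-to-do. That makes pause, update, and delete events idempotent even for workflows whose engines were never started (for example, a workflow registered while paused), while a failing `Close` still surfaces. A small sketch of that contract follows, with `popper` as a hypothetical stand-in for the slice of the registry API the helper needs.

```go
package sketch

import "fmt"

// closer is the part of an engine that cleanup needs.
type closer interface{ Close() error }

// popper is a hypothetical stand-in for the registry API used by cleanup.
type popper interface {
	Pop(id string) (closer, error)
}

// engineCleanup mirrors the reworked helper in handler.go: a missing engine
// is not an error, so repeated or out-of-order events stay harmless, while a
// failed Close still propagates to the caller.
func engineCleanup(r popper, wfID string) error {
	e, err := r.Pop(wfID)
	if err != nil {
		return nil // not registered: nothing to clean up
	}
	if err := e.Close(); err != nil {
		return fmt.Errorf("failed to close workflow engine: %w", err)
	}
	return nil
}
```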