From 06e3418e26fe4be8708f155c281cf6d43bac4fe2 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Tue, 18 Feb 2025 15:47:32 -0500 Subject: [PATCH 01/13] changed engineCleanup from best effort based on service health to definitive --- core/services/workflows/engine.go | 2 +- core/services/workflows/syncer/handler.go | 30 +++++++++++------------ 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index abf4e59579d..d803988730a 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -1358,7 +1358,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { engine = &Engine{ cma: cma, - logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), + logger: cfg.Lggr.Named("WorkflowEngine").With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()), metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()), *em}, registry: cfg.Registry, workflow: workflow, diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index b5813740134..8ff3e062026 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -622,7 +622,7 @@ func (h *eventHandler) workflowUpdatedEvent( payload WorkflowRegistryWorkflowUpdatedV1, ) error { // Remove the old workflow engine from the local registry if it exists - if err := h.tryEngineCleanup(hex.EncodeToString(payload.OldWorkflowID[:])); err != nil { + if err := h.engineCleanup(hex.EncodeToString(payload.OldWorkflowID[:])); err != nil { return err } @@ -646,7 +646,7 @@ func (h *eventHandler) workflowPausedEvent( payload WorkflowRegistryWorkflowPausedV1, ) error { // Remove the workflow engine from the local registry if it exists - if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil { + if err := h.engineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil { return err } @@ -707,7 +707,7 @@ func (h *eventHandler) workflowDeletedEvent( ctx context.Context, payload WorkflowRegistryWorkflowDeletedV1, ) error { - if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil { + if err := h.engineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil { return err } @@ -752,20 +752,18 @@ func (h *eventHandler) forceUpdateSecretsEvent( return string(secrets), nil } -// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the -// workflow engine is not running. -func (h *eventHandler) tryEngineCleanup(wfID string) error { - if h.engineRegistry.IsRunning(wfID) { - // Remove the engine from the registry - e, err := h.engineRegistry.Pop(wfID) - if err != nil { - return fmt.Errorf("failed to get workflow engine: %w", err) - } +// engineCleanup attempts to pop the engine from the registry, +// then stop + close it +func (h *eventHandler) engineCleanup(wfID string) error { + // Remove the engine from the registry + e, err := h.engineRegistry.Pop(wfID) + if err != nil { + return fmt.Errorf("failed to get workflow engine: %w", err) + } - // Stop the engine - if err := e.Close(); err != nil { - return fmt.Errorf("failed to close workflow engine: %w", err) - } + // Stop the engine + if err := e.Close(); err != nil { + return fmt.Errorf("failed to close workflow engine: %w", err) } return nil } From b1b54d40971f72461d5b58dabd4a03e762210db0 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Tue, 18 Feb 2025 16:47:38 -0500 Subject: [PATCH 02/13] removing total executions running metric --- core/services/workflows/engine.go | 3 +-- core/services/workflows/monitoring.go | 22 ++++++---------------- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index d803988730a..efe20f12ea3 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -1153,7 +1153,6 @@ func (e *Engine) heartbeat(ctx context.Context) { return case <-ticker.C: e.metrics.incrementEngineHeartbeatCounter(ctx) - e.metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len()) logCustMsg(ctx, e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger) } } @@ -1266,7 +1265,7 @@ const ( defaultQueueSize = 100000 defaultNewWorkerTimeout = 2 * time.Second defaultMaxExecutionDuration = 10 * time.Minute - defaultHeartbeatCadence = 5 * time.Minute + defaultHeartbeatCadence = 1 * time.Minute defaultStepTimeout = 2 * time.Minute ) diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go index e0a136e7e9d..85362b8b561 100644 --- a/core/services/workflows/monitoring.go +++ b/core/services/workflows/monitoring.go @@ -16,12 +16,12 @@ import ( // em AKA "engine metrics" is to locally scope these instruments to avoid // data races in testing type engineMetrics struct { - registerTriggerFailureCounter metric.Int64Counter - triggerWorkflowStarterErrorCounter metric.Int64Counter - workflowsRunningGauge metric.Int64Gauge - capabilityInvocationCounter metric.Int64Counter - capabilityFailureCounter metric.Int64Counter - workflowRegisteredCounter metric.Int64Counter + registerTriggerFailureCounter metric.Int64Counter + triggerWorkflowStarterErrorCounter metric.Int64Counter + capabilityInvocationCounter metric.Int64Counter + capabilityFailureCounter metric.Int64Counter + workflowRegisteredCounter metric.Int64Counter + // workflowUnregisteredCounter can also be viewed as workflow deletions workflowUnregisteredCounter metric.Int64Counter workflowExecutionRateLimitGlobalCounter metric.Int64Counter workflowExecutionRateLimitPerUserCounter metric.Int64Counter @@ -59,11 +59,6 @@ func initMonitoringResources() (em *engineMetrics, err error) { return nil, fmt.Errorf("failed to register trigger workflow starter error counter: %w", err) } - em.workflowsRunningGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_count") - if err != nil { - return nil, fmt.Errorf("failed to register workflows running gauge: %w", err) - } - em.capabilityInvocationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_capabilities_count") if err != nil { return nil, fmt.Errorf("failed to register capability invocation counter: %w", err) @@ -226,11 +221,6 @@ func (c workflowsMetricLabeler) incrementTotalWorkflowStepErrorsCounter(ctx cont c.em.workflowStepErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateTotalWorkflowsGauge(ctx context.Context, val int64) { - otelLabels := monutils.KvMapToOtelAttributes(c.Labels) - c.em.workflowsRunningGauge.Record(ctx, val, metric.WithAttributes(otelLabels...)) -} - func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) From f928260d19fce0d3fc0b98a7aef3626ae09a60be Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Tue, 18 Feb 2025 17:23:05 -0500 Subject: [PATCH 03/13] removing test in syncer relying on engine.Ready before Close --- .../services/workflows/syncer/handler_test.go | 58 ------------------- 1 file changed, 58 deletions(-) diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index e9b75e5a162..d6c0c56d0ad 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -617,64 +617,6 @@ func Test_workflowDeletedHandler(t *testing.T) { _, err = h.engineRegistry.Get(wfIDs) require.Error(t, err) }) - t.Run("success deleting non-existing workflow spec", func(t *testing.T) { - var ( - ctx = testutils.Context(t) - lggr = logger.TestLogger(t) - db = pgtest.NewSqlxDB(t) - orm = NewWorkflowRegistryDS(db, lggr) - emitter = custmsg.NewLabeler() - - binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) - encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) - config = []byte("") - secretsURL = "http://example.com" - binaryURL = "http://example.com/binary" - configURL = "http://example.com/config" - wfOwner = []byte("0xOwner") - - fetcher = newMockFetcher(map[string]mockFetchResp{ - binaryURL: {Body: encodedBinary, Err: nil}, - configURL: {Body: config, Err: nil}, - secretsURL: {Body: []byte("secrets"), Err: nil}, - }) - ) - - giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, secretsURL) - require.NoError(t, err) - - er := NewEngineRegistry() - store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) - registry := capabilities.NewRegistry(lggr) - registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) - rl, err := ratelimiter.NewRateLimiter(rlConfig) - require.NoError(t, err) - h := NewEventHandler( - lggr, - orm, - fetcher, - store, - registry, - emitter, - clockwork.NewFakeClock(), - workflowkey.Key{}, - rl, - WithEngineRegistry(er), - ) - - deleteEvent := WorkflowRegistryWorkflowDeletedV1{ - WorkflowID: giveWFID, - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - DonID: 1, - } - err = h.workflowDeletedEvent(ctx, deleteEvent) - require.NoError(t, err) - - // Verify the record is deleted in the database - _, err = orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") - require.Error(t, err) - }) } func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { From b7c5a58f62462305ed9b7ca9bd4d7d90784cc969 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Fri, 21 Feb 2025 19:13:10 -0500 Subject: [PATCH 04/13] Adding metrics to workflow registry syncer and lacking test coverage --- .../workflows/syncer/engine_registry.go | 12 +- .../workflows/syncer/engine_registry_test.go | 157 ++++++++++++++++++ core/services/workflows/syncer/handler.go | 56 +++++++ core/services/workflows/syncer/monitoring.go | 119 +++++++++++++ .../workflows/syncer/monitoring_test.go | 20 +++ 5 files changed, 360 insertions(+), 4 deletions(-) create mode 100644 core/services/workflows/syncer/engine_registry_test.go create mode 100644 core/services/workflows/syncer/monitoring.go create mode 100644 core/services/workflows/syncer/monitoring_test.go diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/syncer/engine_registry.go index ecea3b98c2f..f3441823c93 100644 --- a/core/services/workflows/syncer/engine_registry.go +++ b/core/services/workflows/syncer/engine_registry.go @@ -70,11 +70,15 @@ func (r *EngineRegistry) Close() error { defer r.mu.Unlock() var err error for id, engine := range r.engines { - closeErr := engine.Close() - if closeErr != nil { - err = errors.Join(err, closeErr) - } + err = errors.Join(err, engine.Close()) delete(r.engines, id) } return err } + +func (r *EngineRegistry) Size() int { + r.mu.RLock() + defer r.mu.RUnlock() + + return len(r.engines) +} diff --git a/core/services/workflows/syncer/engine_registry_test.go b/core/services/workflows/syncer/engine_registry_test.go new file mode 100644 index 00000000000..360076243de --- /dev/null +++ b/core/services/workflows/syncer/engine_registry_test.go @@ -0,0 +1,157 @@ +package syncer_test + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" +) + +// mockService implements the services.Service interface for testing. +type mockService struct { + mock.Mock +} + +func (m *mockService) Start(ctx context.Context) error { + return nil +} + +func (m *mockService) HealthReport() map[string]error { + return nil +} + +func (m *mockService) Name() string { + return "MockEngine" +} + +func (m *mockService) Ready() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockService) Close() error { + args := m.Called() + return args.Error(0) +} + +func TestEngineRegistry_AddAndGet(t *testing.T) { + registry := syncer.NewEngineRegistry() + + svc := mockService{} + svc.On("Ready").Return(nil) + + // Add the service + err := registry.Add("engine1", &svc) + require.NoError(t, err) + + // Try adding the same ID again; expect an error + err = registry.Add("engine1", &svc) + require.Error(t, err) + require.Contains(t, err.Error(), "duplicate") + + // Get the service + gotSvc, err := registry.Get("engine1") + require.NoError(t, err) + require.Equal(t, svc.Name(), gotSvc.Name()) + + // Try getting a non-existent engine + _, err = registry.Get("non-existent") + require.Error(t, err) + require.Contains(t, err.Error(), "not found") +} + +func TestEngineRegistry_IsRunning(t *testing.T) { + registry := syncer.NewEngineRegistry() + + runningSvc := &mockService{} + runningSvc.On("Ready").Return(nil) + + notRunningSvc := &mockService{} + notRunningSvc.On("Ready").Return(errors.New("not ready")) + + _ = registry.Add("running", runningSvc) + _ = registry.Add("notRunning", notRunningSvc) + + require.True(t, registry.IsRunning("running"), "expected running to be true if Ready() == nil") + require.False(t, registry.IsRunning("notRunning"), "expected running to be false if Ready() != nil") + require.False(t, registry.IsRunning("non-existent"), "expected false for non-existent engine") +} + +func TestEngineRegistry_Pop(t *testing.T) { + registry := syncer.NewEngineRegistry() + + svc := &mockService{} + err := registry.Add("id1", svc) + require.NoError(t, err) + + // Pop should remove and return the service + poppedSvc, err := registry.Pop("id1") + require.NoError(t, err) + require.Equal(t, svc.Name(), poppedSvc.Name()) + + // After popping, engine should not exist in registry + _, err = registry.Get("id1") + require.Error(t, err) + + // Popping a non-existent engine + _, err = registry.Pop("non-existent") + require.Error(t, err) +} + +func TestEngineRegistry_Close(t *testing.T) { + registry := syncer.NewEngineRegistry() + + // Set up multiple services to test aggregated errors + svc1 := &mockService{} + svc2 := &mockService{} + svc3 := &mockService{} + + // We want to track Close calls and possibly return different errors + svc1.On("Close").Return(nil).Once() + svc2.On("Close").Return(errors.New("close error on svc2")).Once() + svc3.On("Close").Return(errors.New("close error on svc3")).Once() + + _ = registry.Add("svc1", svc1) + _ = registry.Add("svc2", svc2) + _ = registry.Add("svc3", svc3) + + // Call Close + err := registry.Close() + // We expect both errors from svc2 and svc3 to be joined + require.Error(t, err, "expected an aggregated error from multiple closes") + // Check that the error message contains both parts + require.True(t, strings.Contains(err.Error(), "svc2") && strings.Contains(err.Error(), "svc3"), + "expected error to contain all relevant messages") + + // After Close, registry should be empty + require.Equal(t, 0, registry.Size(), "registry should be empty after Close") + + // Additional Close calls should not error because registry is empty + err = registry.Close() + require.NoError(t, err, "expected no error on subsequent Close calls when registry is empty") + + // Ensure the mock expectations have been met (all calls accounted for) + svc1.AssertExpectations(t) + svc2.AssertExpectations(t) + svc3.AssertExpectations(t) +} + +func TestEngineRegistry_Size(t *testing.T) { + registry := syncer.NewEngineRegistry() + require.Equal(t, 0, registry.Size(), "initial registry should have size 0") + + _ = registry.Add("id1", new(mockService)) + _ = registry.Add("id2", new(mockService)) + require.Equal(t, 2, registry.Size()) + + _, _ = registry.Pop("id1") + require.Equal(t, 1, registry.Size()) + + _ = registry.Close() + require.Equal(t, 0, registry.Size()) +} diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 8ff3e062026..7c74ec76b7b 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -169,6 +169,7 @@ type eventHandler struct { workflowStore store.Store capRegistry core.CapabilitiesRegistry engineRegistry *EngineRegistry + metrics workflowRegistryMetricsLabeler emitter custmsg.MessageEmitter lastFetchedAtMap *lastFetchedAtMap clock clockwork.Clock @@ -216,6 +217,12 @@ func NewEventHandler( ratelimiter *ratelimiter.RateLimiter, opts ...func(*eventHandler), ) *eventHandler { + + m, err := initMonitoringResources() + if err != nil { + lggr.Fatalw("Failed to initialize monitoring resources", "err", err) + } + eh := &eventHandler{ lggr: lggr, orm: orm, @@ -223,6 +230,7 @@ func NewEventHandler( capRegistry: capRegistry, fetchFn: fetchFn, engineRegistry: NewEngineRegistry(), + metrics: newWorkflowRegistryMetricsLabeler(m), emitter: emitter, lastFetchedAtMap: newLastFetchedAtMap(), clock: clock, @@ -326,11 +334,17 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner), ) + metrics := h.metrics.with( + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner), + ) + if _, err := h.forceUpdateSecretsEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr) return err } + metrics.incrementForceUpdateSecretsCounter(ctx) h.lggr.Debugw("handled force update secrets events for URL hash", "urlHash", payload.SecretsURLHash) return nil case WorkflowRegisteredEvent: @@ -346,11 +360,21 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) + metrics := h.metrics.with( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowRegisteredEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow registered event: %v", err), h.lggr) return err } + metrics.incrementRegisterCounter(ctx) + + // intentionally without workflow specific labels + h.metrics.updateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow registration event", "workflowID", wfID) return nil case WorkflowUpdatedEvent: @@ -366,11 +390,18 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) + metrics := h.metrics.with( + platform.KeyWorkflowID, newWorkflowID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowUpdatedEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow updated event: %v", err), h.lggr) return err } + metrics.incrementUpdateCounter(ctx) h.lggr.Debugw("handled workflow updated event", "workflowID", newWorkflowID) return nil case WorkflowPausedEvent: @@ -387,10 +418,18 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) + metrics := h.metrics.with( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowPausedEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr) return err } + + metrics.incrementPauseCounter(ctx) h.lggr.Debugw("handled workflow paused event", "workflowID", wfID) return nil case WorkflowActivatedEvent: @@ -406,11 +445,19 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowName, payload.WorkflowName, platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) + + metrics := h.metrics.with( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowActivatedEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr) return err } + metrics.incrementActivateCounter(ctx) h.lggr.Debugw("handled workflow activated event", "workflowID", wfID) return nil case WorkflowDeletedEvent: @@ -427,11 +474,20 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) + metrics := h.metrics.with( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowDeletedEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", err), h.lggr) return err } + metrics.incrementDeleteCounter(ctx) + // intentionally without workflow specific labels + h.metrics.updateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow deleted event", "workflowID", wfID) return nil default: diff --git a/core/services/workflows/syncer/monitoring.go b/core/services/workflows/syncer/monitoring.go new file mode 100644 index 00000000000..e8f762a97e9 --- /dev/null +++ b/core/services/workflows/syncer/monitoring.go @@ -0,0 +1,119 @@ +package syncer + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/metrics" + monutils "github.com/smartcontractkit/chainlink/v2/core/monitoring" +) + +// workflow registry metrics is to locally scope instruments to avoid +// data races in testing +type workflowRegistryMetrics struct { + activateCounter metric.Int64Counter + deleteCounter metric.Int64Counter + forceUpdateSecretsCounter metric.Int64Counter + pauseCounter metric.Int64Counter + registerCounter metric.Int64Counter + updateCounter metric.Int64Counter + + totalWorkflows metric.Int64Gauge +} + +func initMonitoringResources() (m *workflowRegistryMetrics, err error) { + m = &workflowRegistryMetrics{} + + m.activateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_register") + if err != nil { + return nil, fmt.Errorf("error initializing activate counter: %w", err) + } + + m.deleteCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_delete") + if err != nil { + return nil, fmt.Errorf("error initializing delete counter: %w", err) + } + + m.forceUpdateSecretsCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_force_update_secrets") + if err != nil { + return nil, fmt.Errorf("error initializing force update secrets counter: %w", err) + } + + m.pauseCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_pause") + if err != nil { + return nil, fmt.Errorf("error initializing pause counter: %w", err) + } + + m.registerCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_register") + if err != nil { + return nil, fmt.Errorf("error initializing register counter: %w", err) + } + + m.updateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_update") + if err != nil { + return nil, fmt.Errorf("error initializing update counter: %w", err) + } + + m.totalWorkflows, err = beholder.GetMeter().Int64Gauge("platform_workflow_syncer_total") + if err != nil { + return nil, fmt.Errorf("error initializing total workflows: %w", err) + } + + return m, nil +} + +// workflowRegistryMetricsLabeler wraps m to provide utilities for +// monitoring instrumentation +type workflowRegistryMetricsLabeler struct { + metrics.Labeler + m *workflowRegistryMetrics +} + +func newWorkflowRegistryMetricsLabeler(m *workflowRegistryMetrics) workflowRegistryMetricsLabeler { + return workflowRegistryMetricsLabeler{ + metrics.NewLabeler(), + m, + } +} + +func (l workflowRegistryMetricsLabeler) with(keyValues ...string) workflowRegistryMetricsLabeler { + return workflowRegistryMetricsLabeler{l.With(keyValues...), l.m} +} + +func (l workflowRegistryMetricsLabeler) incrementActivateCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.activateCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) incrementDeleteCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.deleteCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) incrementForceUpdateSecretsCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.forceUpdateSecretsCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) incrementPauseCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.pauseCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) incrementRegisterCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.registerCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) incrementUpdateCounter(ctx context.Context) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.updateCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + +func (l workflowRegistryMetricsLabeler) updateTotalWorkflowsGauge(ctx context.Context, val int64) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.m.totalWorkflows.Record(ctx, val, metric.WithAttributes(otelLabels...)) +} diff --git a/core/services/workflows/syncer/monitoring_test.go b/core/services/workflows/syncer/monitoring_test.go new file mode 100644 index 00000000000..7870f2d3999 --- /dev/null +++ b/core/services/workflows/syncer/monitoring_test.go @@ -0,0 +1,20 @@ +package syncer + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/metrics" +) + +func Test_InitMonitoringResources(t *testing.T) { + _, err := initMonitoringResources() + require.NoError(t, err) +} + +func Test_WorkflowMetricsLabeler(t *testing.T) { + testWorkflowsMetricLabeler := workflowRegistryMetricsLabeler{metrics.NewLabeler(), &workflowRegistryMetrics{}} + testWorkflowsMetricLabeler2 := testWorkflowsMetricLabeler.with("foo", "baz") + require.EqualValues(t, "baz", testWorkflowsMetricLabeler2.Labels["foo"]) +} From d76fe74c83b47d03cf6459a660bf7105feb5b35c Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Fri, 21 Feb 2025 21:45:37 -0500 Subject: [PATCH 05/13] fixing test --- .../workflows/syncer/engine_registry_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/services/workflows/syncer/engine_registry_test.go b/core/services/workflows/syncer/engine_registry_test.go index 360076243de..faf92ef8b4d 100644 --- a/core/services/workflows/syncer/engine_registry_test.go +++ b/core/services/workflows/syncer/engine_registry_test.go @@ -142,11 +142,19 @@ func TestEngineRegistry_Close(t *testing.T) { } func TestEngineRegistry_Size(t *testing.T) { + // Set up multiple services to test aggregated errors + svc1 := &mockService{} + svc2 := &mockService{} + + // We want to track Close calls and possibly return different errors + svc1.On("Close").Return(nil) + svc2.On("Close").Return(nil) + registry := syncer.NewEngineRegistry() require.Equal(t, 0, registry.Size(), "initial registry should have size 0") - _ = registry.Add("id1", new(mockService)) - _ = registry.Add("id2", new(mockService)) + _ = registry.Add("id1", svc1) + _ = registry.Add("id2", svc2) require.Equal(t, 2, registry.Size()) _, _ = registry.Pop("id1") From e75d11740ae4fe08f6a9f83751c2c9a54565ce19 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Mon, 24 Feb 2025 16:57:08 -0500 Subject: [PATCH 06/13] separating engine from workflows metrics. wiring an engineRegistry around to make that possible --- core/services/chainlink/application.go | 9 +++ core/services/workflows/delegate.go | 28 ++++++- core/services/workflows/engine.go | 7 +- core/services/workflows/monitoring.go | 81 ++++++++++++++------ core/services/workflows/monitoring_test.go | 4 +- core/services/workflows/syncer/handler.go | 35 +++++---- core/services/workflows/syncer/monitoring.go | 12 --- 7 files changed, 120 insertions(+), 56 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 55ca46828d5..822400fff8e 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -488,6 +488,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { opts.CapabilitiesRegistry, workflowORM, creServices.workflowRateLimiter, + creServices.engineRegistry, ) // Flux monitor requires ethereum just to boot, silence errors with a null delegate @@ -720,6 +721,10 @@ type CREServices struct { // gatewayConnectorWrapper is the wrapper for the gateway connector // it is exposed because there are contingent services in the application gatewayConnectorWrapper *gatewayconnector.ServiceWrapper + // engineRegistry is exposed so that both the delegate + // and syncer paths share an underlying store of Engine + // instances + engineRegistry *syncer.EngineRegistry // srvs are all the services that are created, including those that are explicitly exposed srvs []services.ServiceCtx } @@ -733,6 +738,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { opts = cscfg.CREOpts ds = cscfg.DS ) + var engineRegistry *syncer.EngineRegistry var srvcs []services.ServiceCtx workflowRateLimiter, err := ratelimiter.NewRateLimiter(ratelimiter.Config{ GlobalRPS: capCfg.RateLimit().GlobalRPS(), @@ -839,12 +845,14 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { return nil, fmt.Errorf("expected 1 key, got %d", len(keys)) } + engineRegistry = syncer.NewEngineRegistry() eventHandler := syncer.NewEventHandler( lggr, syncer.NewWorkflowRegistryDS(ds, globalLogger), fetcherFunc, workflowstore.NewDBStore(ds, lggr, clockwork.NewRealClock()), opts.CapabilitiesRegistry, + engineRegistry, custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0], @@ -887,6 +895,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { return &CREServices{ workflowRateLimiter: workflowRateLimiter, gatewayConnectorWrapper: gatewayConnectorWrapper, + engineRegistry: engineRegistry, srvs: srvcs, }, nil } diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 9e74a999795..945f8246459 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -8,6 +8,7 @@ import ( "github.com/pelletier/go-toml" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink-common/pkg/types/core" @@ -24,6 +25,8 @@ type Delegate struct { logger logger.Logger store store.Store ratelimiter *ratelimiter.RateLimiter + engineRegistry *syncer.EngineRegistry + metrics WorkflowMetricLabeler } var _ job.Delegate = (*Delegate)(nil) @@ -38,11 +41,20 @@ func (d *Delegate) AfterJobCreated(jb job.Job) {} func (d *Delegate) BeforeJobDeleted(spec job.Job) {} -func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil } +func (d *Delegate) OnDeleteJob(ctx context.Context, spec job.Job) error { + _, err := d.engineRegistry.Pop(spec.WorkflowSpec.WorkflowID) + if err != nil { + d.logger.Errorf("delegate failed to unregister workflow engine for workflow name: %s id: %s: %v", spec.WorkflowSpec.WorkflowName, spec.WorkflowSpec.WorkflowName, err) + return nil + } + d.metrics.UpdateTotalWorkflowsGauge(ctx, int64(d.engineRegistry.Size())) + return nil +} // ServicesForSpec satisfies the job.Delegate interface. func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) { cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, spec.WorkflowSpec.WorkflowID, platform.KeyWorkflowOwner, spec.WorkflowSpec.WorkflowOwner, platform.KeyWorkflowName, spec.WorkflowSpec.WorkflowName) + sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx) if err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger) @@ -80,6 +92,12 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser if err != nil { return nil, err } + + err = d.engineRegistry.Add(spec.WorkflowSpec.WorkflowID, engine) + if err != nil { + d.logger.Errorf("delegate failed to register workflow engine for workflow name: %s id: %s: %v", cfg.WorkflowName.String(), cfg.WorkflowID, err) + } + d.metrics.UpdateTotalWorkflowsGauge(ctx, int64(d.engineRegistry.Size())) d.logger.Infow("Creating Workflow Engine for workflow spec", "workflowID", spec.WorkflowSpec.WorkflowID, "workflowOwner", spec.WorkflowSpec.WorkflowOwner, "workflowName", spec.WorkflowSpec.WorkflowName, "jobName", spec.Name) return []job.ServiceCtx{engine}, nil } @@ -99,13 +117,21 @@ func NewDelegate( registry core.CapabilitiesRegistry, store store.Store, ratelimiter *ratelimiter.RateLimiter, + engineRegistry *syncer.EngineRegistry, ) *Delegate { + metrics, err := initWorkflowMonitoringResources() + if err != nil { + logger.Criticalw("Failed to initialize workflow monitoring resources", "err", err) + } + return &Delegate{ logger: logger, registry: registry, + engineRegistry: engineRegistry, secretsFetcher: newNoopSecretsFetcher(), store: store, ratelimiter: ratelimiter, + metrics: *metrics, } } diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index efe20f12ea3..a47f7ea4210 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -13,7 +13,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" - "github.com/smartcontractkit/chainlink-common/pkg/metrics" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/values" @@ -104,7 +103,7 @@ type secretsFetcher interface { type Engine struct { services.StateMachine cma custmsg.MessageEmitter - metrics workflowsMetricLabeler + metrics engineMetricLabeler logger logger.Logger registry core.CapabilitiesRegistry workflow *workflow @@ -1339,7 +1338,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { // - etc. // spin up monitoring resources - em, err := initMonitoringResources() + engineMetrics, err := initEngineMonitoringResources() if err != nil { return nil, fmt.Errorf("could not initialize monitoring resources: %w", err) } @@ -1358,7 +1357,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) { engine = &Engine{ cma: cma, logger: cfg.Lggr.Named("WorkflowEngine").With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()), - metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()), *em}, + metrics: engineMetrics.with(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()), registry: cfg.Registry, workflow: workflow, secretsFetcher: cfg.SecretsFetcher, diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go index 85362b8b561..4c68dfb16ea 100644 --- a/core/services/workflows/monitoring.go +++ b/core/services/workflows/monitoring.go @@ -13,6 +13,39 @@ import ( monutils "github.com/smartcontractkit/chainlink/v2/core/monitoring" ) +// wm AKA "workflow metrics" is to locally scope these instruments to avoid +// data races in testing +type workflowMetrics struct { + totalWorkflows metric.Int64Gauge +} + +func initWorkflowMonitoringResources() (l *WorkflowMetricLabeler, err error) { + wm := &workflowMetrics{} + + wm.totalWorkflows, err = beholder.GetMeter().Int64Gauge("platform_workflow_engines_total") + if err != nil { + return nil, fmt.Errorf("error initializing total workflows: %w", err) + } + + l = &WorkflowMetricLabeler{ + metrics.NewLabeler(), + *wm, + } + return l, nil +} + +// WorkflowMetricLabeler wraps a Labeler to provide workflow specific utilities +// for monitoring resources +type WorkflowMetricLabeler struct { + metrics.Labeler + wm workflowMetrics +} + +func (l WorkflowMetricLabeler) UpdateTotalWorkflowsGauge(ctx context.Context, val int64) { + otelLabels := monutils.KvMapToOtelAttributes(l.Labels) + l.wm.totalWorkflows.Record(ctx, val, metric.WithAttributes(otelLabels...)) +} + // em AKA "engine metrics" is to locally scope these instruments to avoid // data races in testing type engineMetrics struct { @@ -36,8 +69,8 @@ type engineMetrics struct { workflowStepDurationSeconds metric.Int64Histogram } -func initMonitoringResources() (em *engineMetrics, err error) { - em = &engineMetrics{} +func initEngineMonitoringResources() (m *engineMetricLabeler, err error) { + em := engineMetrics{} em.workflowExecutionRateLimitGlobalCounter, err = beholder.GetMeter().Int64Counter("platform_engine_execution_ratelimit_global") if err != nil { @@ -141,7 +174,7 @@ func initMonitoringResources() (em *engineMetrics, err error) { return nil, fmt.Errorf("failed to register step execution time histogram: %w", err) } - return em, nil + return &engineMetricLabeler{metrics.NewLabeler(), em}, nil } // Note: due to the OTEL specification, all histogram buckets @@ -175,98 +208,98 @@ func MetricViews() []sdkmetric.View { } } -// workflowsMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities +// engineMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities // for monitoring resources -type workflowsMetricLabeler struct { +type engineMetricLabeler struct { metrics.Labeler em engineMetrics } -func (c workflowsMetricLabeler) with(keyValues ...string) workflowsMetricLabeler { - return workflowsMetricLabeler{c.With(keyValues...), c.em} +func (c engineMetricLabeler) with(keyValues ...string) engineMetricLabeler { + return engineMetricLabeler{c.With(keyValues...), c.em} } -func (c workflowsMetricLabeler) incrementWorkflowExecutionRateLimitGlobalCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowExecutionRateLimitGlobalCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowExecutionRateLimitGlobalCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementWorkflowExecutionRateLimitPerUserCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowExecutionRateLimitPerUserCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowExecutionRateLimitPerUserCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementRegisterTriggerFailureCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementRegisterTriggerFailureCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementTriggerWorkflowStarterErrorCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementTriggerWorkflowStarterErrorCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.triggerWorkflowStarterErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementCapabilityInvocationCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementCapabilityInvocationCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.capabilityInvocationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowExecutionLatencyGauge(ctx context.Context, val int64) { +func (c engineMetricLabeler) updateWorkflowExecutionLatencyGauge(ctx context.Context, val int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowExecutionLatencyGauge.Record(ctx, val, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementTotalWorkflowStepErrorsCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementTotalWorkflowStepErrorsCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowStepErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementCapabilityFailureCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementCapabilityFailureCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.capabilityFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementWorkflowRegisteredCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowRegisteredCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowRegisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementWorkflowUnregisteredCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowUnregisteredCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowUnregisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) incrementWorkflowInitializationCounter(ctx context.Context) { +func (c engineMetricLabeler) incrementWorkflowInitializationCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowInitializationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowCompletedDurationHistogram(ctx context.Context, duration int64) { +func (c engineMetricLabeler) updateWorkflowCompletedDurationHistogram(ctx context.Context, duration int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowCompletedDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowEarlyExitDurationHistogram(ctx context.Context, duration int64) { +func (c engineMetricLabeler) updateWorkflowEarlyExitDurationHistogram(ctx context.Context, duration int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowEarlyExitDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowErrorDurationHistogram(ctx context.Context, duration int64) { +func (c engineMetricLabeler) updateWorkflowErrorDurationHistogram(ctx context.Context, duration int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowErrorDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowTimeoutDurationHistogram(ctx context.Context, duration int64) { +func (c engineMetricLabeler) updateWorkflowTimeoutDurationHistogram(ctx context.Context, duration int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowTimeoutDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) } -func (c workflowsMetricLabeler) updateWorkflowStepDurationHistogram(ctx context.Context, duration int64) { +func (c engineMetricLabeler) updateWorkflowStepDurationHistogram(ctx context.Context, duration int64) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowStepDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...)) } diff --git a/core/services/workflows/monitoring_test.go b/core/services/workflows/monitoring_test.go index 5b7177e51dc..536a02ca4ba 100644 --- a/core/services/workflows/monitoring_test.go +++ b/core/services/workflows/monitoring_test.go @@ -9,12 +9,12 @@ import ( ) func Test_InitMonitoringResources(t *testing.T) { - _, err := initMonitoringResources() + _, err := initEngineMonitoringResources() require.NoError(t, err) } func Test_WorkflowMetricsLabeler(t *testing.T) { - testWorkflowsMetricLabeler := workflowsMetricLabeler{metrics.NewLabeler(), engineMetrics{}} + testWorkflowsMetricLabeler := engineMetricLabeler{metrics.NewLabeler(), engineMetrics{}} testWorkflowsMetricLabeler2 := testWorkflowsMetricLabeler.with("foo", "baz") require.EqualValues(t, testWorkflowsMetricLabeler2.Labels["foo"], "baz") } diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 7c74ec76b7b..20267468cc6 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -169,7 +169,8 @@ type eventHandler struct { workflowStore store.Store capRegistry core.CapabilitiesRegistry engineRegistry *EngineRegistry - metrics workflowRegistryMetricsLabeler + registryMetrics workflowRegistryMetricsLabeler + workflowMetrics workflows.WorkflowMetricLabeler emitter custmsg.MessageEmitter lastFetchedAtMap *lastFetchedAtMap clock clockwork.Clock @@ -211,6 +212,7 @@ func NewEventHandler( fetchFn FetcherFunc, workflowStore store.Store, capRegistry core.CapabilitiesRegistry, + engineRegistry *EngineRegistry, emitter custmsg.MessageEmitter, clock clockwork.Clock, encryptionKey workflowkey.Key, @@ -220,7 +222,7 @@ func NewEventHandler( m, err := initMonitoringResources() if err != nil { - lggr.Fatalw("Failed to initialize monitoring resources", "err", err) + lggr.Criticalw("Failed to initialize monitoring resources", "err", err) } eh := &eventHandler{ @@ -229,8 +231,8 @@ func NewEventHandler( workflowStore: workflowStore, capRegistry: capRegistry, fetchFn: fetchFn, - engineRegistry: NewEngineRegistry(), - metrics: newWorkflowRegistryMetricsLabeler(m), + engineRegistry: engineRegistry, + registryMetrics: newWorkflowRegistryMetricsLabeler(m), emitter: emitter, lastFetchedAtMap: newLastFetchedAtMap(), clock: clock, @@ -334,7 +336,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner), ) - metrics := h.metrics.with( + metrics := h.registryMetrics.with( platform.KeyWorkflowName, payload.WorkflowName, platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner), ) @@ -360,7 +362,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) - metrics := h.metrics.with( + metrics := h.registryMetrics.with( platform.KeyWorkflowID, wfID, platform.KeyWorkflowName, payload.WorkflowName, platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), @@ -374,7 +376,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { metrics.incrementRegisterCounter(ctx) // intentionally without workflow specific labels - h.metrics.updateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) + h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow registration event", "workflowID", wfID) return nil case WorkflowUpdatedEvent: @@ -390,7 +392,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) - metrics := h.metrics.with( + metrics := h.registryMetrics.with( platform.KeyWorkflowID, newWorkflowID, platform.KeyWorkflowName, payload.WorkflowName, platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), @@ -418,7 +420,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) - metrics := h.metrics.with( + metrics := h.registryMetrics.with( platform.KeyWorkflowID, wfID, platform.KeyWorkflowName, payload.WorkflowName, platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), @@ -430,6 +432,9 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { } metrics.incrementPauseCounter(ctx) + + // intentionally without workflow specific labels + h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow paused event", "workflowID", wfID) return nil case WorkflowActivatedEvent: @@ -446,7 +451,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) - metrics := h.metrics.with( + metrics := h.registryMetrics.with( platform.KeyWorkflowID, wfID, platform.KeyWorkflowName, payload.WorkflowName, platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), @@ -458,6 +463,9 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { } metrics.incrementActivateCounter(ctx) + + // intentionally without workflow specific labels + h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow activated event", "workflowID", wfID) return nil case WorkflowDeletedEvent: @@ -474,7 +482,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), ) - metrics := h.metrics.with( + metrics := h.registryMetrics.with( platform.KeyWorkflowID, wfID, platform.KeyWorkflowName, payload.WorkflowName, platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), @@ -487,7 +495,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { metrics.incrementDeleteCounter(ctx) // intentionally without workflow specific labels - h.metrics.updateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) + h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow deleted event", "workflowID", wfID) return nil default: @@ -814,7 +822,8 @@ func (h *eventHandler) engineCleanup(wfID string) error { // Remove the engine from the registry e, err := h.engineRegistry.Pop(wfID) if err != nil { - return fmt.Errorf("failed to get workflow engine: %w", err) + // no engine, nothing to cleanup + return nil } // Stop the engine diff --git a/core/services/workflows/syncer/monitoring.go b/core/services/workflows/syncer/monitoring.go index e8f762a97e9..2c4effcdcb3 100644 --- a/core/services/workflows/syncer/monitoring.go +++ b/core/services/workflows/syncer/monitoring.go @@ -20,8 +20,6 @@ type workflowRegistryMetrics struct { pauseCounter metric.Int64Counter registerCounter metric.Int64Counter updateCounter metric.Int64Counter - - totalWorkflows metric.Int64Gauge } func initMonitoringResources() (m *workflowRegistryMetrics, err error) { @@ -57,11 +55,6 @@ func initMonitoringResources() (m *workflowRegistryMetrics, err error) { return nil, fmt.Errorf("error initializing update counter: %w", err) } - m.totalWorkflows, err = beholder.GetMeter().Int64Gauge("platform_workflow_syncer_total") - if err != nil { - return nil, fmt.Errorf("error initializing total workflows: %w", err) - } - return m, nil } @@ -112,8 +105,3 @@ func (l workflowRegistryMetricsLabeler) incrementUpdateCounter(ctx context.Conte otelLabels := monutils.KvMapToOtelAttributes(l.Labels) l.m.updateCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } - -func (l workflowRegistryMetricsLabeler) updateTotalWorkflowsGauge(ctx context.Context, val int64) { - otelLabels := monutils.KvMapToOtelAttributes(l.Labels) - l.m.totalWorkflows.Record(ctx, val, metric.WithAttributes(otelLabels...)) -} From 8dc77169fbbca74141aecd43e91805bb6e7e7b2d Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Wed, 26 Feb 2025 12:16:40 -0500 Subject: [PATCH 07/13] fixing import cycle. workflow -/-> syncer --- core/services/chainlink/application.go | 7 +++--- .../workflows/syncer/workflow_syncer_test.go | 5 ++-- core/services/workflows/delegate.go | 7 +++--- .../{syncer => registry}/engine_registry.go | 2 +- .../engine_registry_test.go | 14 +++++------ core/services/workflows/syncer/handler.go | 7 +++--- .../services/workflows/syncer/handler_test.go | 24 ++++++++++++------- 7 files changed, 37 insertions(+), 29 deletions(-) rename core/services/workflows/{syncer => registry}/engine_registry.go (99%) rename core/services/workflows/{syncer => registry}/engine_registry_test.go (94%) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 822400fff8e..91c8987f3b9 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -76,6 +76,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/webhook" "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry" workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/sessions" @@ -724,7 +725,7 @@ type CREServices struct { // engineRegistry is exposed so that both the delegate // and syncer paths share an underlying store of Engine // instances - engineRegistry *syncer.EngineRegistry + engineRegistry *registry.EngineRegistry // srvs are all the services that are created, including those that are explicitly exposed srvs []services.ServiceCtx } @@ -738,7 +739,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { opts = cscfg.CREOpts ds = cscfg.DS ) - var engineRegistry *syncer.EngineRegistry + var engineRegistry *registry.EngineRegistry var srvcs []services.ServiceCtx workflowRateLimiter, err := ratelimiter.NewRateLimiter(ratelimiter.Config{ GlobalRPS: capCfg.RateLimit().GlobalRPS(), @@ -845,7 +846,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { return nil, fmt.Errorf("expected 1 key, got %d", len(keys)) } - engineRegistry = syncer.NewEngineRegistry() + engineRegistry = registry.NewEngineRegistry() eventHandler := syncer.NewEventHandler( lggr, syncer.NewWorkflowRegistryDS(ds, globalLogger), diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index ca9674700c9..fbddeb3ca13 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -32,6 +32,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils" "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" @@ -435,7 +436,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) { require.NoError(t, err) giveWorkflow.ID = id - er := syncer.NewEngineRegistry() + er := registry.NewEngineRegistry() rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, @@ -535,7 +536,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) { giveWorkflow.ID = id mf := &mockEngineFactory{} - er := syncer.NewEngineRegistry() + er := registry.NewEngineRegistry() rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) handler := syncer.NewEventHandler( diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 945f8246459..20587a92a32 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -8,9 +8,8 @@ import ( "github.com/pelletier/go-toml" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" - "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/platform" @@ -25,7 +24,7 @@ type Delegate struct { logger logger.Logger store store.Store ratelimiter *ratelimiter.RateLimiter - engineRegistry *syncer.EngineRegistry + engineRegistry *registry.EngineRegistry metrics WorkflowMetricLabeler } @@ -117,7 +116,7 @@ func NewDelegate( registry core.CapabilitiesRegistry, store store.Store, ratelimiter *ratelimiter.RateLimiter, - engineRegistry *syncer.EngineRegistry, + engineRegistry *registry.EngineRegistry, ) *Delegate { metrics, err := initWorkflowMonitoringResources() if err != nil { diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/registry/engine_registry.go similarity index 99% rename from core/services/workflows/syncer/engine_registry.go rename to core/services/workflows/registry/engine_registry.go index f3441823c93..7625f92583d 100644 --- a/core/services/workflows/syncer/engine_registry.go +++ b/core/services/workflows/registry/engine_registry.go @@ -1,4 +1,4 @@ -package syncer +package registry import ( "errors" diff --git a/core/services/workflows/syncer/engine_registry_test.go b/core/services/workflows/registry/engine_registry_test.go similarity index 94% rename from core/services/workflows/syncer/engine_registry_test.go rename to core/services/workflows/registry/engine_registry_test.go index faf92ef8b4d..d67407cba60 100644 --- a/core/services/workflows/syncer/engine_registry_test.go +++ b/core/services/workflows/registry/engine_registry_test.go @@ -1,4 +1,4 @@ -package syncer_test +package registry_test import ( "context" @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry" ) // mockService implements the services.Service interface for testing. @@ -40,7 +40,7 @@ func (m *mockService) Close() error { } func TestEngineRegistry_AddAndGet(t *testing.T) { - registry := syncer.NewEngineRegistry() + registry := registry.NewEngineRegistry() svc := mockService{} svc.On("Ready").Return(nil) @@ -66,7 +66,7 @@ func TestEngineRegistry_AddAndGet(t *testing.T) { } func TestEngineRegistry_IsRunning(t *testing.T) { - registry := syncer.NewEngineRegistry() + registry := registry.NewEngineRegistry() runningSvc := &mockService{} runningSvc.On("Ready").Return(nil) @@ -83,7 +83,7 @@ func TestEngineRegistry_IsRunning(t *testing.T) { } func TestEngineRegistry_Pop(t *testing.T) { - registry := syncer.NewEngineRegistry() + registry := registry.NewEngineRegistry() svc := &mockService{} err := registry.Add("id1", svc) @@ -104,7 +104,7 @@ func TestEngineRegistry_Pop(t *testing.T) { } func TestEngineRegistry_Close(t *testing.T) { - registry := syncer.NewEngineRegistry() + registry := registry.NewEngineRegistry() // Set up multiple services to test aggregated errors svc1 := &mockService{} @@ -150,7 +150,7 @@ func TestEngineRegistry_Size(t *testing.T) { svc1.On("Close").Return(nil) svc2.On("Close").Return(nil) - registry := syncer.NewEngineRegistry() + registry := registry.NewEngineRegistry() require.Equal(t, 0, registry.Size(), "initial registry should have size 0") _ = registry.Add("id1", svc1) diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 20267468cc6..6ea86f313db 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -27,6 +27,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -168,7 +169,7 @@ type eventHandler struct { limits *ArtifactConfig workflowStore store.Store capRegistry core.CapabilitiesRegistry - engineRegistry *EngineRegistry + engineRegistry *registry.EngineRegistry registryMetrics workflowRegistryMetricsLabeler workflowMetrics workflows.WorkflowMetricLabeler emitter custmsg.MessageEmitter @@ -187,7 +188,7 @@ type Event interface { var defaultSecretsFreshnessDuration = 24 * time.Hour -func WithEngineRegistry(er *EngineRegistry) func(*eventHandler) { +func WithEngineRegistry(er *registry.EngineRegistry) func(*eventHandler) { return func(e *eventHandler) { e.engineRegistry = er } @@ -212,7 +213,7 @@ func NewEventHandler( fetchFn FetcherFunc, workflowStore store.Store, capRegistry core.CapabilitiesRegistry, - engineRegistry *EngineRegistry, + engineRegistry *registry.EngineRegistry, emitter custmsg.MessageEmitter, clock clockwork.Clock, encryptionKey workflowkey.Key, diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index d6c0c56d0ad..5914ff61a19 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -24,6 +24,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter" + ereg "github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry" wfstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/mocks" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" @@ -106,7 +107,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, ereg.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) err = h.Handle(ctx, giveEvent) require.NoError(t, err) }) @@ -122,7 +123,7 @@ func Test_Handler(t *testing.T) { return []byte("contents"), nil } - h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, ereg.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.Contains(t, err.Error(), "event type unsupported") @@ -134,7 +135,7 @@ func Test_Handler(t *testing.T) { rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) - h := NewEventHandler(lggr, mockORM, nil, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) + h := NewEventHandler(lggr, mockORM, nil, nil, nil, ereg.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -175,7 +176,7 @@ func Test_Handler(t *testing.T) { return nil, assert.AnError } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, ereg.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -204,7 +205,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, ereg.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -508,7 +509,7 @@ func testRunningWorkflow(t *testing.T, tc testCase) { event := tc.Event(giveWFID[:]) - er := NewEngineRegistry() + er := ereg.NewEngineRegistry() opts := []func(*eventHandler){ WithEngineRegistry(er), } @@ -520,7 +521,7 @@ func testRunningWorkflow(t *testing.T, tc testCase) { registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) - h := NewEventHandler(lggr, orm, fetcher, store, registry, emitter, clockwork.NewFakeClock(), + h := NewEventHandler(lggr, orm, fetcher, store, registry, er, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, opts...) tc.validationFn(t, ctx, event, h, wfOwner, "workflow-name", wfID) @@ -566,7 +567,7 @@ func Test_workflowDeletedHandler(t *testing.T) { SecretsURL: secretsURL, } - er := NewEngineRegistry() + er := ereg.NewEngineRegistry() store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) registry := capabilities.NewRegistry(lggr) registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) @@ -578,6 +579,7 @@ func Test_workflowDeletedHandler(t *testing.T) { fetcher, store, registry, + er, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, @@ -667,7 +669,7 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { SecretsURL: secretsURL, } - er := NewEngineRegistry() + er := ereg.NewEngineRegistry() store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) registry := capabilities.NewRegistry(lggr) registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) @@ -679,6 +681,7 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { fetcher, store, registry, + er, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, @@ -828,6 +831,7 @@ func Test_Handler_SecretsFor(t *testing.T) { fetcher.Fetch, wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()), capabilities.NewRegistry(lggr), + ereg.NewEngineRegistry(), custmsg.NewLabeler(), clockwork.NewFakeClock(), encryptionKey, @@ -893,6 +897,7 @@ func Test_Handler_SecretsFor_RefreshesSecrets(t *testing.T) { fetcher.Fetch, wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()), capabilities.NewRegistry(lggr), + ereg.NewEngineRegistry(), custmsg.NewLabeler(), clockwork.NewFakeClock(), encryptionKey, @@ -959,6 +964,7 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) { fetcher.Fetch, wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()), capabilities.NewRegistry(lggr), + ereg.NewEngineRegistry(), custmsg.NewLabeler(), clock, encryptionKey, From 47c6f8730dcd40bcda757bd0503e82b9e9eec378 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Wed, 26 Feb 2025 14:12:43 -0500 Subject: [PATCH 08/13] fixing web test --- core/services/chainlink/application.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 91c8987f3b9..2a0519f3b29 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -739,7 +739,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { opts = cscfg.CREOpts ds = cscfg.DS ) - var engineRegistry *registry.EngineRegistry + engineRegistry := registry.NewEngineRegistry() // must be created even w/o workflow registry var srvcs []services.ServiceCtx workflowRateLimiter, err := ratelimiter.NewRateLimiter(ratelimiter.Config{ GlobalRPS: capCfg.RateLimit().GlobalRPS(), @@ -846,7 +846,6 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { return nil, fmt.Errorf("expected 1 key, got %d", len(keys)) } - engineRegistry = registry.NewEngineRegistry() eventHandler := syncer.NewEventHandler( lggr, syncer.NewWorkflowRegistryDS(ds, globalLogger), From c25ebb8de0930b046ad013923e7cd82993e9e5b7 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Wed, 26 Feb 2025 14:48:09 -0500 Subject: [PATCH 09/13] fixing other tests in unexpected place --- core/services/chainlink/application.go | 4 +++- .../capabilities/workflows/syncer/workflow_syncer_test.go | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 2a0519f3b29..b231ca7b2b8 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -739,7 +739,9 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) { opts = cscfg.CREOpts ds = cscfg.DS ) - engineRegistry := registry.NewEngineRegistry() // must be created even w/o workflow registry + // engineRegistry is consumed by the workflow job delegate and by the handler + // for the workflow registry for a unified backend store of engine instances + engineRegistry := registry.NewEngineRegistry() var srvcs []services.ServiceCtx workflowRateLimiter, err := ratelimiter.NewRateLimiter(ratelimiter.Config{ GlobalRPS: capCfg.RateLimit().GlobalRPS(), diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index fbddeb3ca13..7777463df92 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -357,7 +357,7 @@ func Test_SecretsWorker(t *testing.T) { require.NoError(t, err) handler := &testSecretsWorkEventHandler{ wrappedHandler: syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, - emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl), + registry.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl), registeredCh: make(chan syncer.Event, 1), } @@ -440,7 +440,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) { rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, - emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, syncer.WithEngineRegistry(er)) + registry.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, syncer.WithEngineRegistry(er)) worker := syncer.NewWorkflowRegistry( lggr, @@ -545,6 +545,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) { fetcherFn, nil, nil, + er, emitter, clockwork.NewFakeClock(), workflowkey.Key{}, From 1f466d383e53751a4a21d44cce4f8ba07ad48cb9 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Wed, 26 Feb 2025 15:57:16 -0500 Subject: [PATCH 10/13] missing initialization of metrics struct in handler --- core/services/workflows/delegate.go | 2 +- core/services/workflows/monitoring.go | 8 +++++--- core/services/workflows/syncer/handler.go | 12 +++++++++--- core/services/workflows/syncer/monitoring.go | 10 +++++----- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 20587a92a32..67f19ce9e80 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -118,7 +118,7 @@ func NewDelegate( ratelimiter *ratelimiter.RateLimiter, engineRegistry *registry.EngineRegistry, ) *Delegate { - metrics, err := initWorkflowMonitoringResources() + metrics, err := InitWorkflowMonitoringResources() if err != nil { logger.Criticalw("Failed to initialize workflow monitoring resources", "err", err) } diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go index 4c68dfb16ea..61161465622 100644 --- a/core/services/workflows/monitoring.go +++ b/core/services/workflows/monitoring.go @@ -19,8 +19,10 @@ type workflowMetrics struct { totalWorkflows metric.Int64Gauge } -func initWorkflowMonitoringResources() (l *WorkflowMetricLabeler, err error) { - wm := &workflowMetrics{} +// InitWorkflowMonitoringResources is exported to allow the syncer to report on +// totalWorkflows, as well as the delegate in this package +func InitWorkflowMonitoringResources() (l *WorkflowMetricLabeler, err error) { + wm := workflowMetrics{} wm.totalWorkflows, err = beholder.GetMeter().Int64Gauge("platform_workflow_engines_total") if err != nil { @@ -29,7 +31,7 @@ func initWorkflowMonitoringResources() (l *WorkflowMetricLabeler, err error) { l = &WorkflowMetricLabeler{ metrics.NewLabeler(), - *wm, + wm, } return l, nil } diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 6ea86f313db..111605d0005 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -221,9 +221,14 @@ func NewEventHandler( opts ...func(*eventHandler), ) *eventHandler { - m, err := initMonitoringResources() + registryMetrics, err := initMonitoringResources() if err != nil { - lggr.Criticalw("Failed to initialize monitoring resources", "err", err) + lggr.Criticalw("Failed to initialize registry monitoring resources", "err", err) + } + + workflowMetrics, err := workflows.InitWorkflowMonitoringResources() + if err != nil { + lggr.Criticalw("Failed to initialize workflow monitoring resources", "err", err) } eh := &eventHandler{ @@ -233,7 +238,8 @@ func NewEventHandler( capRegistry: capRegistry, fetchFn: fetchFn, engineRegistry: engineRegistry, - registryMetrics: newWorkflowRegistryMetricsLabeler(m), + registryMetrics: *registryMetrics, + workflowMetrics: *workflowMetrics, emitter: emitter, lastFetchedAtMap: newLastFetchedAtMap(), clock: clock, diff --git a/core/services/workflows/syncer/monitoring.go b/core/services/workflows/syncer/monitoring.go index 2c4effcdcb3..f72868bb957 100644 --- a/core/services/workflows/syncer/monitoring.go +++ b/core/services/workflows/syncer/monitoring.go @@ -22,8 +22,8 @@ type workflowRegistryMetrics struct { updateCounter metric.Int64Counter } -func initMonitoringResources() (m *workflowRegistryMetrics, err error) { - m = &workflowRegistryMetrics{} +func initMonitoringResources() (l *workflowRegistryMetricsLabeler, err error) { + m := &workflowRegistryMetrics{} m.activateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_register") if err != nil { @@ -55,7 +55,7 @@ func initMonitoringResources() (m *workflowRegistryMetrics, err error) { return nil, fmt.Errorf("error initializing update counter: %w", err) } - return m, nil + return newWorkflowRegistryMetricsLabeler(m), nil } // workflowRegistryMetricsLabeler wraps m to provide utilities for @@ -65,8 +65,8 @@ type workflowRegistryMetricsLabeler struct { m *workflowRegistryMetrics } -func newWorkflowRegistryMetricsLabeler(m *workflowRegistryMetrics) workflowRegistryMetricsLabeler { - return workflowRegistryMetricsLabeler{ +func newWorkflowRegistryMetricsLabeler(m *workflowRegistryMetrics) *workflowRegistryMetricsLabeler { + return &workflowRegistryMetricsLabeler{ metrics.NewLabeler(), m, } From 908059236b3c55aa4eb8bc11442ea4f7b417e9df Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Wed, 26 Feb 2025 16:12:15 -0500 Subject: [PATCH 11/13] lint --- core/services/workflows/monitoring.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go index 61161465622..707c8028606 100644 --- a/core/services/workflows/monitoring.go +++ b/core/services/workflows/monitoring.go @@ -60,7 +60,6 @@ type engineMetrics struct { workflowUnregisteredCounter metric.Int64Counter workflowExecutionRateLimitGlobalCounter metric.Int64Counter workflowExecutionRateLimitPerUserCounter metric.Int64Counter - workflowExecutionLatencyGauge metric.Int64Gauge // ms workflowStepErrorCounter metric.Int64Counter workflowInitializationCounter metric.Int64Counter engineHeartbeatCounter metric.Int64Counter @@ -114,13 +113,6 @@ func initEngineMonitoringResources() (m *engineMetricLabeler, err error) { return nil, fmt.Errorf("failed to register workflow unregistered counter: %w", err) } - em.workflowExecutionLatencyGauge, err = beholder.GetMeter().Int64Gauge( - "platform_engine_workflow_time", - metric.WithUnit("ms")) - if err != nil { - return nil, fmt.Errorf("failed to register workflow execution latency gauge: %w", err) - } - em.workflowInitializationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_initializations") if err != nil { return nil, fmt.Errorf("failed to register workflow initialization counter: %w", err) @@ -246,11 +238,6 @@ func (c engineMetricLabeler) incrementCapabilityInvocationCounter(ctx context.Co c.em.capabilityInvocationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } -func (c engineMetricLabeler) updateWorkflowExecutionLatencyGauge(ctx context.Context, val int64) { - otelLabels := monutils.KvMapToOtelAttributes(c.Labels) - c.em.workflowExecutionLatencyGauge.Record(ctx, val, metric.WithAttributes(otelLabels...)) -} - func (c engineMetricLabeler) incrementTotalWorkflowStepErrorsCounter(ctx context.Context) { otelLabels := monutils.KvMapToOtelAttributes(c.Labels) c.em.workflowStepErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) From 039cb79b40f437ee9dac059ab83b6b787e20b8bf Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Thu, 27 Feb 2025 11:17:36 -0500 Subject: [PATCH 12/13] engine registry failures now exposed to consumers --- .../workflows/syncer/workflow_syncer_test.go | 3 +- core/services/workflows/delegate.go | 7 +- core/services/workflows/syncer/handler.go | 25 +++---- .../services/workflows/syncer/handler_test.go | 69 +++++++++++++++++-- core/services/workflows/syncer/monitoring.go | 12 ++-- 5 files changed, 82 insertions(+), 34 deletions(-) diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 7777463df92..48dfef57724 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -440,7 +440,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) { rl, err := ratelimiter.NewRateLimiter(rlConfig) require.NoError(t, err) handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, - registry.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, syncer.WithEngineRegistry(er)) + registry.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl) worker := syncer.NewWorkflowRegistry( lggr, @@ -550,7 +550,6 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) { clockwork.NewFakeClock(), workflowkey.Key{}, rl, - syncer.WithEngineRegistry(er), syncer.WithEngineFactoryFn(mf.new), ) diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 67f19ce9e80..5ca5ce3dfae 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -40,11 +40,12 @@ func (d *Delegate) AfterJobCreated(jb job.Job) {} func (d *Delegate) BeforeJobDeleted(spec job.Job) {} +// OnDeleteJob removes the engine instance from the registry. +// Engine instance will be closed in the job spawner. func (d *Delegate) OnDeleteJob(ctx context.Context, spec job.Job) error { _, err := d.engineRegistry.Pop(spec.WorkflowSpec.WorkflowID) if err != nil { - d.logger.Errorf("delegate failed to unregister workflow engine for workflow name: %s id: %s: %v", spec.WorkflowSpec.WorkflowName, spec.WorkflowSpec.WorkflowName, err) - return nil + return fmt.Errorf("delegate failed to unregister workflow engine for workflow name: %s id: %s: %v", spec.WorkflowSpec.WorkflowName, spec.WorkflowSpec.WorkflowName, err) } d.metrics.UpdateTotalWorkflowsGauge(ctx, int64(d.engineRegistry.Size())) return nil @@ -94,7 +95,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser err = d.engineRegistry.Add(spec.WorkflowSpec.WorkflowID, engine) if err != nil { - d.logger.Errorf("delegate failed to register workflow engine for workflow name: %s id: %s: %v", cfg.WorkflowName.String(), cfg.WorkflowID, err) + return nil, fmt.Errorf("delegate failed to register workflow engine for workflow name: %s id: %s: %v", cfg.WorkflowName.String(), cfg.WorkflowID, err) } d.metrics.UpdateTotalWorkflowsGauge(ctx, int64(d.engineRegistry.Size())) d.logger.Infow("Creating Workflow Engine for workflow spec", "workflowID", spec.WorkflowSpec.WorkflowID, "workflowOwner", spec.WorkflowSpec.WorkflowOwner, "workflowName", spec.WorkflowSpec.WorkflowName, "jobName", spec.Name) diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 111605d0005..d2089d86086 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -188,12 +188,6 @@ type Event interface { var defaultSecretsFreshnessDuration = 24 * time.Hour -func WithEngineRegistry(er *registry.EngineRegistry) func(*eventHandler) { - return func(e *eventHandler) { - e.engineRegistry = er - } -} - func WithEngineFactoryFn(efn engineFactoryFn) func(*eventHandler) { return func(e *eventHandler) { e.engineFactory = efn @@ -382,7 +376,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { metrics.incrementRegisterCounter(ctx) - // intentionally without workflow specific labels + // intentionally without workflow specific labels. measures total workflows on this node. h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow registration event", "workflowID", wfID) return nil @@ -440,7 +434,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { metrics.incrementPauseCounter(ctx) - // intentionally without workflow specific labels + // intentionally without workflow specific labels. measures total workflows on this node. h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow paused event", "workflowID", wfID) return nil @@ -471,7 +465,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { metrics.incrementActivateCounter(ctx) - // intentionally without workflow specific labels + // intentionally without workflow specific labels. measures total workflows on this node. h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow activated event", "workflowID", wfID) return nil @@ -501,7 +495,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error { } metrics.incrementDeleteCounter(ctx) - // intentionally without workflow specific labels + // intentionally without workflow specific labels. measures total workflows on this node. h.workflowMetrics.UpdateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size())) h.lggr.Debugw("handled workflow deleted event", "workflowID", wfID) return nil @@ -612,14 +606,13 @@ func (h *eventHandler) workflowRegisteredEvent( return fmt.Errorf("failed to create workflow engine: %w", err) } - if err := engine.Start(ctx); err != nil { - return fmt.Errorf("failed to start workflow engine: %w", err) + // add the engine to the registry before we start so it can be deleted on close + if err := h.engineRegistry.Add(wfID, engine); err != nil { + return fmt.Errorf("failed to add new engine to registry: %w", err) } - // This shouldn't happen because we call the handler serially and - // check for running engines above, see the call to engineRegistry.IsRunning. - if err := h.engineRegistry.Add(wfID, engine); err != nil { - return fmt.Errorf("invariant violation: %w", err) + if err := engine.Start(ctx); err != nil { + return fmt.Errorf("failed to start workflow engine: %w", err) } return nil diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index 5914ff61a19..48218f7d12b 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -370,8 +370,9 @@ func Test_workflowRegisteredHandler(t *testing.T) { }, validationFn: func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) { me := &mockEngine{} - h.engineRegistry.Add(wfID, me) - err := h.workflowRegisteredEvent(ctx, event) + err := h.engineRegistry.Add(wfID, me) + require.NoError(t, err) + err = h.workflowRegisteredEvent(ctx, event) require.Error(t, err) require.ErrorContains(t, err, "workflow is already running") }, @@ -510,9 +511,7 @@ func testRunningWorkflow(t *testing.T, tc testCase) { event := tc.Event(giveWFID[:]) er := ereg.NewEngineRegistry() - opts := []func(*eventHandler){ - WithEngineRegistry(er), - } + var opts []func(*eventHandler) if tc.engineFactoryFn != nil { opts = append(opts, WithEngineFactoryFn(tc.engineFactoryFn)) } @@ -584,7 +583,6 @@ func Test_workflowDeletedHandler(t *testing.T) { clockwork.NewFakeClock(), workflowkey.Key{}, rl, - WithEngineRegistry(er), ) err = h.workflowRegisteredEvent(ctx, active) require.NoError(t, err) @@ -619,6 +617,64 @@ func Test_workflowDeletedHandler(t *testing.T) { _, err = h.engineRegistry.Get(wfIDs) require.Error(t, err) }) + t.Run("success deleting non-existing workflow spec", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + db = pgtest.NewSqlxDB(t) + orm = NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) + encodedBinary = []byte(base64.StdEncoding.EncodeToString(binary)) + config = []byte("") + secretsURL = "http://example.com" + binaryURL = "http://example.com/binary" + configURL = "http://example.com/config" + wfOwner = []byte("0xOwner") + + fetcher = newMockFetcher(map[string]mockFetchResp{ + binaryURL: {Body: encodedBinary, Err: nil}, + configURL: {Body: config, Err: nil}, + secretsURL: {Body: []byte("secrets"), Err: nil}, + }) + ) + + giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, "workflow-name", binary, config, secretsURL) + require.NoError(t, err) + + er := ereg.NewEngineRegistry() + store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + rl, err := ratelimiter.NewRateLimiter(rlConfig) + require.NoError(t, err) + h := NewEventHandler( + lggr, + orm, + fetcher, + store, + registry, + er, + emitter, + clockwork.NewFakeClock(), + workflowkey.Key{}, + rl, + ) + + deleteEvent := WorkflowRegistryWorkflowDeletedV1{ + WorkflowID: giveWFID, + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + DonID: 1, + } + err = h.workflowDeletedEvent(ctx, deleteEvent) + require.NoError(t, err) + + // Verify the record is deleted in the database + _, err = orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.Error(t, err) + }) } func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { @@ -686,7 +742,6 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { clockwork.NewFakeClock(), workflowkey.Key{}, rl, - WithEngineRegistry(er), ) err = h.workflowRegisteredEvent(ctx, active) require.NoError(t, err) diff --git a/core/services/workflows/syncer/monitoring.go b/core/services/workflows/syncer/monitoring.go index f72868bb957..b08f8802a59 100644 --- a/core/services/workflows/syncer/monitoring.go +++ b/core/services/workflows/syncer/monitoring.go @@ -25,32 +25,32 @@ type workflowRegistryMetrics struct { func initMonitoringResources() (l *workflowRegistryMetricsLabeler, err error) { m := &workflowRegistryMetrics{} - m.activateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_register") + m.activateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_activate") if err != nil { return nil, fmt.Errorf("error initializing activate counter: %w", err) } - m.deleteCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_delete") + m.deleteCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_delete") if err != nil { return nil, fmt.Errorf("error initializing delete counter: %w", err) } - m.forceUpdateSecretsCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_force_update_secrets") + m.forceUpdateSecretsCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_force_update_secrets") if err != nil { return nil, fmt.Errorf("error initializing force update secrets counter: %w", err) } - m.pauseCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_pause") + m.pauseCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_pause") if err != nil { return nil, fmt.Errorf("error initializing pause counter: %w", err) } - m.registerCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_register") + m.registerCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_register") if err != nil { return nil, fmt.Errorf("error initializing register counter: %w", err) } - m.updateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_update") + m.updateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_events_update") if err != nil { return nil, fmt.Errorf("error initializing update counter: %w", err) } From 893d060f7c3cbb8408b4ec5d607d6ba5ef3dbbf7 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Thu, 27 Feb 2025 15:54:02 -0500 Subject: [PATCH 13/13] adding comments --- core/services/job/spawner.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/services/job/spawner.go b/core/services/job/spawner.go index 0727333d7ee..8042b8384c8 100644 --- a/core/services/job/spawner.go +++ b/core/services/job/spawner.go @@ -56,7 +56,10 @@ type ( lbDependentAwaiters []utils.DependentAwaiter } - // TODO(spook): I can't wait for Go generics + // Delegate allows for multiple types of jobs and their related + // services + subservices to be managed by a Spawner. + // Errors should not be silenced; partial creation or + // deletion states should be bubbled out to be handled by the Spawner. Delegate interface { JobType() Type // BeforeJobCreated is only called once on first time job create. @@ -73,6 +76,7 @@ type ( // non-db side effects. This is required in order to guarantee mutual atomicity between // all tasks intended to happen during job deletion. For the same reason, the job will // not show up in the db within OnDeleteJob(), even though it is still actively running. + // A failure in OnDeleteJob will cause a total rollback of the delete operation. OnDeleteJob(ctx context.Context, jb Job) error }