From 86e8af22bb5163fd3249c6fe37d1947443615714 Mon Sep 17 00:00:00 2001
From: patrickhuie19
Date: Fri, 21 Feb 2025 19:13:10 -0500
Subject: [PATCH] Adding metrics to workflow registry syncer and missing test coverage

---
 .../workflows/syncer/engine_registry.go       |  12 +-
 .../workflows/syncer/engine_registry_test.go  | 157 ++++++++++++++++++
 core/services/workflows/syncer/handler.go     |  56 +++++++
 core/services/workflows/syncer/monitoring.go  | 119 +++++++++++++
 .../workflows/syncer/monitoring_test.go       |  20 +++
 5 files changed, 360 insertions(+), 4 deletions(-)
 create mode 100644 core/services/workflows/syncer/engine_registry_test.go
 create mode 100644 core/services/workflows/syncer/monitoring.go
 create mode 100644 core/services/workflows/syncer/monitoring_test.go

diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/syncer/engine_registry.go
index ecea3b98c2f..f3441823c93 100644
--- a/core/services/workflows/syncer/engine_registry.go
+++ b/core/services/workflows/syncer/engine_registry.go
@@ -70,11 +70,15 @@ func (r *EngineRegistry) Close() error {
 	defer r.mu.Unlock()
 	var err error
 	for id, engine := range r.engines {
-		closeErr := engine.Close()
-		if closeErr != nil {
-			err = errors.Join(err, closeErr)
-		}
+		err = errors.Join(err, engine.Close())
 		delete(r.engines, id)
 	}
 	return err
 }
+
+func (r *EngineRegistry) Size() int {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+
+	return len(r.engines)
+}
diff --git a/core/services/workflows/syncer/engine_registry_test.go b/core/services/workflows/syncer/engine_registry_test.go
new file mode 100644
index 00000000000..360076243de
--- /dev/null
+++ b/core/services/workflows/syncer/engine_registry_test.go
@@ -0,0 +1,157 @@
+package syncer_test
+
+import (
+	"context"
+	"errors"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+
+	"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
+)
+
+// mockService implements the services.Service interface for testing.
+type mockService struct {
+	mock.Mock
+}
+
+func (m *mockService) Start(ctx context.Context) error {
+	return nil
+}
+
+func (m *mockService) HealthReport() map[string]error {
+	return nil
+}
+
+func (m *mockService) Name() string {
+	return "MockEngine"
+}
+
+func (m *mockService) Ready() error {
+	args := m.Called()
+	return args.Error(0)
+}
+
+func (m *mockService) Close() error {
+	args := m.Called()
+	return args.Error(0)
+}
+
+func TestEngineRegistry_AddAndGet(t *testing.T) {
+	registry := syncer.NewEngineRegistry()
+
+	svc := mockService{}
+	svc.On("Ready").Return(nil)
+
+	// Add the service
+	err := registry.Add("engine1", &svc)
+	require.NoError(t, err)
+
+	// Try adding the same ID again; expect an error
+	err = registry.Add("engine1", &svc)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "duplicate")
+
+	// Get the service
+	gotSvc, err := registry.Get("engine1")
+	require.NoError(t, err)
+	require.Equal(t, svc.Name(), gotSvc.Name())
+
+	// Try getting a non-existent engine
+	_, err = registry.Get("non-existent")
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "not found")
+}
+
+func TestEngineRegistry_IsRunning(t *testing.T) {
+	registry := syncer.NewEngineRegistry()
+
+	runningSvc := &mockService{}
+	runningSvc.On("Ready").Return(nil)
+
+	notRunningSvc := &mockService{}
+	notRunningSvc.On("Ready").Return(errors.New("not ready"))
+
+	_ = registry.Add("running", runningSvc)
+	_ = registry.Add("notRunning", notRunningSvc)
+
+	require.True(t, registry.IsRunning("running"), "expected running to be true if Ready() == nil")
+	require.False(t, registry.IsRunning("notRunning"), "expected running to be false if Ready() != nil")
+	require.False(t, registry.IsRunning("non-existent"), "expected false for non-existent engine")
+}
+
+func TestEngineRegistry_Pop(t *testing.T) {
+	registry := syncer.NewEngineRegistry()
+
+	svc := &mockService{}
+	err := registry.Add("id1", svc)
+	require.NoError(t, err)
+
+	// Pop should remove and return the service
+	poppedSvc, err := registry.Pop("id1")
+	require.NoError(t, err)
+	require.Equal(t, svc.Name(), poppedSvc.Name())
+
+	// After popping, engine should not exist in registry
+	_, err = registry.Get("id1")
+	require.Error(t, err)
+
+	// Popping a non-existent engine
+	_, err = registry.Pop("non-existent")
+	require.Error(t, err)
+}
+
+func TestEngineRegistry_Close(t *testing.T) {
+	registry := syncer.NewEngineRegistry()
+
+	// Set up multiple services to test aggregated errors
+	svc1 := &mockService{}
+	svc2 := &mockService{}
+	svc3 := &mockService{}
+
+	// We want to track Close calls and possibly return different errors
+	svc1.On("Close").Return(nil).Once()
+	svc2.On("Close").Return(errors.New("close error on svc2")).Once()
+	svc3.On("Close").Return(errors.New("close error on svc3")).Once()
+
+	_ = registry.Add("svc1", svc1)
+	_ = registry.Add("svc2", svc2)
+	_ = registry.Add("svc3", svc3)
+
+	// Call Close
+	err := registry.Close()
+	// We expect both errors from svc2 and svc3 to be joined
+	require.Error(t, err, "expected an aggregated error from multiple closes")
+	// Check that the error message contains both parts
+	require.True(t, strings.Contains(err.Error(), "svc2") && strings.Contains(err.Error(), "svc3"),
+		"expected error to contain all relevant messages")
+
+	// After Close, registry should be empty
+	require.Equal(t, 0, registry.Size(), "registry should be empty after Close")
+
+	// Additional Close calls should not error because registry is empty
+	err = registry.Close()
+	require.NoError(t, err, "expected no error on subsequent Close calls when registry is empty")
+
+	// Ensure the mock expectations have been met (all calls accounted for)
+	svc1.AssertExpectations(t)
+	svc2.AssertExpectations(t)
+	svc3.AssertExpectations(t)
+}
+
+func TestEngineRegistry_Size(t *testing.T) {
+	registry := syncer.NewEngineRegistry()
+	require.Equal(t, 0, registry.Size(), "initial registry should have size 0")
+
+	_ = registry.Add("id1", new(mockService))
+	_ = registry.Add("id2", new(mockService))
+	require.Equal(t, 2, registry.Size())
+
+	_, _ = registry.Pop("id1")
+	require.Equal(t, 1, registry.Size())
+
+	_ = registry.Close()
+	require.Equal(t, 0, registry.Size())
+}
diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go
index 8ff3e062026..7c74ec76b7b 100644
--- a/core/services/workflows/syncer/handler.go
+++ b/core/services/workflows/syncer/handler.go
@@ -169,6 +169,7 @@ type eventHandler struct {
 	workflowStore    store.Store
 	capRegistry      core.CapabilitiesRegistry
 	engineRegistry   *EngineRegistry
+	metrics          workflowRegistryMetricsLabeler
 	emitter          custmsg.MessageEmitter
 	lastFetchedAtMap *lastFetchedAtMap
 	clock            clockwork.Clock
@@ -216,6 +217,12 @@ func NewEventHandler(
 	ratelimiter *ratelimiter.RateLimiter,
 	opts ...func(*eventHandler),
 ) *eventHandler {
+
+	m, err := initMonitoringResources()
+	if err != nil {
+		lggr.Fatalw("Failed to initialize monitoring resources", "err", err)
+	}
+
 	eh := &eventHandler{
 		lggr:             lggr,
 		orm:              orm,
@@ -223,6 +230,7 @@
 		capRegistry:      capRegistry,
 		fetchFn:          fetchFn,
 		engineRegistry:   NewEngineRegistry(),
+		metrics:          newWorkflowRegistryMetricsLabeler(m),
 		emitter:          emitter,
 		lastFetchedAtMap: newLastFetchedAtMap(),
 		clock:            clock,
@@ -326,11 +334,17 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
 			platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner),
 		)
 
+		metrics := h.metrics.with(
+			platform.KeyWorkflowName, payload.WorkflowName,
+			platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner),
+		)
+
 		if _, err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
 			logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr)
 			return err
 		}
+		metrics.incrementForceUpdateSecretsCounter(ctx)
 
 		h.lggr.Debugw("handled force update secrets events for URL hash", "urlHash", payload.SecretsURLHash)
 		return nil
 	case WorkflowRegisteredEvent:
@@ -346,11 +360,21 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
 			platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
 		)
 
+		metrics := h.metrics.with(
+			platform.KeyWorkflowID, wfID,
+			platform.KeyWorkflowName, payload.WorkflowName,
+			platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
+		)
+
 		if err := h.workflowRegisteredEvent(ctx, payload); err != nil {
 			logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow registered event: %v", err), h.lggr)
 			return err
 		}
+		metrics.incrementRegisterCounter(ctx)
+
+		// intentionally without workflow specific labels
+		h.metrics.updateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size()))
 
 		h.lggr.Debugw("handled workflow registration event", "workflowID", wfID)
 		return nil
 	case WorkflowUpdatedEvent:
@@ -366,11 +390,18 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
 			platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
 		)
 
+		metrics := h.metrics.with(
+			platform.KeyWorkflowID, newWorkflowID,
+			platform.KeyWorkflowName, payload.WorkflowName,
+			platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
+		)
+
 		if err := h.workflowUpdatedEvent(ctx, payload); err != nil {
 			logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow updated event: %v", err), h.lggr)
 			return err
 		}
+		metrics.incrementUpdateCounter(ctx)
 
 		h.lggr.Debugw("handled workflow updated event", "workflowID", newWorkflowID)
 		return nil
 	case WorkflowPausedEvent:
@@ -387,10 +418,18 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
 			platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
 		)
 
+		metrics := h.metrics.with(
+			platform.KeyWorkflowID, wfID,
+			platform.KeyWorkflowName, payload.WorkflowName,
+			platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
+		)
+
 		if err := h.workflowPausedEvent(ctx, payload); err != nil {
 			logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
 			return err
 		}
+
+		metrics.incrementPauseCounter(ctx)
 		h.lggr.Debugw("handled workflow paused event", "workflowID", wfID)
 		return nil
 	case WorkflowActivatedEvent:
@@ -406,11 +445,19 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
 			platform.KeyWorkflowName, payload.WorkflowName,
 			platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
 		)
+
+		metrics := h.metrics.with(
+			platform.KeyWorkflowID, wfID,
+			platform.KeyWorkflowName, payload.WorkflowName,
+			platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
+		)
+
 		if err := h.workflowActivatedEvent(ctx, payload); err != nil {
 			logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr)
 			return err
 		}
+		metrics.incrementActivateCounter(ctx)
 
 		h.lggr.Debugw("handled workflow activated event", "workflowID", wfID)
 		return nil
 	case WorkflowDeletedEvent:
@@ -427,11 +474,20 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
 			platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
 		)
 
+		metrics := h.metrics.with(
+			platform.KeyWorkflowID, wfID,
+			platform.KeyWorkflowName, payload.WorkflowName,
+			platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
+		)
+
 		if err := h.workflowDeletedEvent(ctx, payload); err != nil {
 			logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", err), h.lggr)
 			return err
 		}
+		metrics.incrementDeleteCounter(ctx)
+		// intentionally without workflow specific labels
+		h.metrics.updateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size()))
 
 		h.lggr.Debugw("handled workflow deleted event", "workflowID", wfID)
 		return nil
 	default:
diff --git a/core/services/workflows/syncer/monitoring.go b/core/services/workflows/syncer/monitoring.go
new file mode 100644
index 00000000000..27dbdfc49b1
--- /dev/null
+++ b/core/services/workflows/syncer/monitoring.go
@@ -0,0 +1,119 @@
+package syncer
+
+import (
+	"context"
+	"fmt"
+
+	"go.opentelemetry.io/otel/metric"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
+	"github.com/smartcontractkit/chainlink-common/pkg/metrics"
+	monutils "github.com/smartcontractkit/chainlink/v2/core/monitoring"
+)
+
+// workflowRegistryMetrics locally scopes instruments to avoid
+// data races in testing
+type workflowRegistryMetrics struct {
+	activateCounter           metric.Int64Counter
+	deleteCounter             metric.Int64Counter
+	forceUpdateSecretsCounter metric.Int64Counter
+	pauseCounter              metric.Int64Counter
+	registerCounter           metric.Int64Counter
+	updateCounter             metric.Int64Counter
+
+	totalWorkflows metric.Int64Gauge
+}
+
+func initMonitoringResources() (m *workflowRegistryMetrics, err error) {
+	m = &workflowRegistryMetrics{}
+
+	m.activateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_activate")
+	if err != nil {
+		return nil, fmt.Errorf("error initializing activate counter: %v", err)
+	}
+
+	m.deleteCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_delete")
+	if err != nil {
+		return nil, fmt.Errorf("error initializing delete counter: %v", err)
+	}
+
+	m.forceUpdateSecretsCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_force_update_secrets")
+	if err != nil {
+		return nil, fmt.Errorf("error initializing force update secrets counter: %v", err)
+	}
+
+	m.pauseCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_pause")
+	if err != nil {
+		return nil, fmt.Errorf("error initializing pause counter: %v", err)
+	}
+
+	m.registerCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_register")
+	if err != nil {
+		return nil, fmt.Errorf("error initializing register counter: %v", err)
+	}
+
+	m.updateCounter, err = beholder.GetMeter().Int64Counter("platform_workflow_syncer_update")
+	if err != nil {
+		return nil, fmt.Errorf("error initializing update counter: %v", err)
+	}
+
+	m.totalWorkflows, err = beholder.GetMeter().Int64Gauge("platform_workflow_syncer_total")
+	if err != nil {
+		return nil, fmt.Errorf("error initializing total workflows: %v", err)
+	}
+
+	return m, nil
+}
+
+// workflowRegistryMetricsLabeler wraps workflowRegistryMetrics to provide utilities for
+// monitoring instrumentation
+type workflowRegistryMetricsLabeler struct {
+	metrics.Labeler
+	m *workflowRegistryMetrics
+}
+
+func newWorkflowRegistryMetricsLabeler(m *workflowRegistryMetrics) workflowRegistryMetricsLabeler {
+	return workflowRegistryMetricsLabeler{
+		metrics.NewLabeler(),
+		m,
+	}
+}
+
+func (l workflowRegistryMetricsLabeler) with(keyValues ...string) workflowRegistryMetricsLabeler {
+	return workflowRegistryMetricsLabeler{l.With(keyValues...), l.m}
+}
+
+func (l workflowRegistryMetricsLabeler) incrementActivateCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(l.Labels)
+	l.m.activateCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (l workflowRegistryMetricsLabeler) incrementDeleteCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(l.Labels)
+	l.m.deleteCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (l workflowRegistryMetricsLabeler) incrementForceUpdateSecretsCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(l.Labels)
+	l.m.forceUpdateSecretsCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (l workflowRegistryMetricsLabeler) incrementPauseCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(l.Labels)
+	l.m.pauseCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (l workflowRegistryMetricsLabeler) incrementRegisterCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(l.Labels)
+	l.m.registerCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (l workflowRegistryMetricsLabeler) incrementUpdateCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(l.Labels)
+	l.m.updateCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (l workflowRegistryMetricsLabeler) updateTotalWorkflowsGauge(ctx context.Context, val int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(l.Labels)
+	l.m.totalWorkflows.Record(ctx, val, metric.WithAttributes(otelLabels...))
+}
diff --git a/core/services/workflows/syncer/monitoring_test.go b/core/services/workflows/syncer/monitoring_test.go
new file mode 100644
index 00000000000..763f25929c3
--- /dev/null
+++ b/core/services/workflows/syncer/monitoring_test.go
@@ -0,0 +1,20 @@
+package syncer
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/metrics"
+)
+
+func Test_InitMonitoringResources(t *testing.T) {
+	_, err := initMonitoringResources()
+	require.NoError(t, err)
+}
+
+func Test_WorkflowMetricsLabeler(t *testing.T) {
+	testWorkflowsMetricLabeler := workflowRegistryMetricsLabeler{metrics.NewLabeler(), &workflowRegistryMetrics{}}
+	testWorkflowsMetricLabeler2 := testWorkflowsMetricLabeler.with("foo", "baz")
+	require.EqualValues(t, "baz", testWorkflowsMetricLabeler2.Labels["foo"])
+}