Skip to content

Commit

Permalink
Adding metrics to workflow registry syncer and lacking test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickhuie19 committed Feb 22, 2025
1 parent f928260 commit 86e8af2
Show file tree
Hide file tree
Showing 5 changed files with 360 additions and 4 deletions.
12 changes: 8 additions & 4 deletions core/services/workflows/syncer/engine_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,15 @@ func (r *EngineRegistry) Close() error {
defer r.mu.Unlock()
var err error
for id, engine := range r.engines {
closeErr := engine.Close()
if closeErr != nil {
err = errors.Join(err, closeErr)
}
err = errors.Join(err, engine.Close())
delete(r.engines, id)
}
return err
}

func (r *EngineRegistry) Size() int {
r.mu.RLock()
defer r.mu.RUnlock()

return len(r.engines)
}
157 changes: 157 additions & 0 deletions core/services/workflows/syncer/engine_registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package syncer_test

import (
"context"
"errors"
"strings"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
)

// mockService implements the services.Service interface for testing.
type mockService struct {
mock.Mock
}

func (m *mockService) Start(ctx context.Context) error {
return nil
}

func (m *mockService) HealthReport() map[string]error {
return nil
}

func (m *mockService) Name() string {
return "MockEngine"
}

func (m *mockService) Ready() error {
args := m.Called()
return args.Error(0)
}

func (m *mockService) Close() error {
args := m.Called()
return args.Error(0)
}

func TestEngineRegistry_AddAndGet(t *testing.T) {
registry := syncer.NewEngineRegistry()

svc := mockService{}
svc.On("Ready").Return(nil)

// Add the service
err := registry.Add("engine1", &svc)
require.NoError(t, err)

// Try adding the same ID again; expect an error
err = registry.Add("engine1", &svc)
require.Error(t, err)
require.Contains(t, err.Error(), "duplicate")

// Get the service
gotSvc, err := registry.Get("engine1")
require.NoError(t, err)
require.Equal(t, svc.Name(), gotSvc.Name())

// Try getting a non-existent engine
_, err = registry.Get("non-existent")
require.Error(t, err)
require.Contains(t, err.Error(), "not found")
}

func TestEngineRegistry_IsRunning(t *testing.T) {
registry := syncer.NewEngineRegistry()

runningSvc := &mockService{}
runningSvc.On("Ready").Return(nil)

notRunningSvc := &mockService{}
notRunningSvc.On("Ready").Return(errors.New("not ready"))

_ = registry.Add("running", runningSvc)
_ = registry.Add("notRunning", notRunningSvc)

require.True(t, registry.IsRunning("running"), "expected running to be true if Ready() == nil")
require.False(t, registry.IsRunning("notRunning"), "expected running to be false if Ready() != nil")
require.False(t, registry.IsRunning("non-existent"), "expected false for non-existent engine")
}

func TestEngineRegistry_Pop(t *testing.T) {
registry := syncer.NewEngineRegistry()

svc := &mockService{}
err := registry.Add("id1", svc)
require.NoError(t, err)

// Pop should remove and return the service
poppedSvc, err := registry.Pop("id1")
require.NoError(t, err)
require.Equal(t, svc.Name(), poppedSvc.Name())

// After popping, engine should not exist in registry
_, err = registry.Get("id1")
require.Error(t, err)

// Popping a non-existent engine
_, err = registry.Pop("non-existent")
require.Error(t, err)
}

func TestEngineRegistry_Close(t *testing.T) {
registry := syncer.NewEngineRegistry()

// Set up multiple services to test aggregated errors
svc1 := &mockService{}
svc2 := &mockService{}
svc3 := &mockService{}

// We want to track Close calls and possibly return different errors
svc1.On("Close").Return(nil).Once()
svc2.On("Close").Return(errors.New("close error on svc2")).Once()
svc3.On("Close").Return(errors.New("close error on svc3")).Once()

_ = registry.Add("svc1", svc1)
_ = registry.Add("svc2", svc2)
_ = registry.Add("svc3", svc3)

// Call Close
err := registry.Close()
// We expect both errors from svc2 and svc3 to be joined
require.Error(t, err, "expected an aggregated error from multiple closes")
// Check that the error message contains both parts
require.True(t, strings.Contains(err.Error(), "svc2") && strings.Contains(err.Error(), "svc3"),
"expected error to contain all relevant messages")

// After Close, registry should be empty
require.Equal(t, 0, registry.Size(), "registry should be empty after Close")

// Additional Close calls should not error because registry is empty
err = registry.Close()
require.NoError(t, err, "expected no error on subsequent Close calls when registry is empty")

// Ensure the mock expectations have been met (all calls accounted for)
svc1.AssertExpectations(t)
svc2.AssertExpectations(t)
svc3.AssertExpectations(t)
}

func TestEngineRegistry_Size(t *testing.T) {
registry := syncer.NewEngineRegistry()
require.Equal(t, 0, registry.Size(), "initial registry should have size 0")

_ = registry.Add("id1", new(mockService))
_ = registry.Add("id2", new(mockService))
require.Equal(t, 2, registry.Size())

_, _ = registry.Pop("id1")
require.Equal(t, 1, registry.Size())

_ = registry.Close()
require.Equal(t, 0, registry.Size())
}
56 changes: 56 additions & 0 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ type eventHandler struct {
workflowStore store.Store
capRegistry core.CapabilitiesRegistry
engineRegistry *EngineRegistry
metrics workflowRegistryMetricsLabeler
emitter custmsg.MessageEmitter
lastFetchedAtMap *lastFetchedAtMap
clock clockwork.Clock
Expand Down Expand Up @@ -216,13 +217,20 @@ func NewEventHandler(
ratelimiter *ratelimiter.RateLimiter,
opts ...func(*eventHandler),
) *eventHandler {

m, err := initMonitoringResources()
if err != nil {
lggr.Fatalw("Failed to initialize monitoring resources", "err", err)
}

eh := &eventHandler{
lggr: lggr,
orm: orm,
workflowStore: workflowStore,
capRegistry: capRegistry,
fetchFn: fetchFn,
engineRegistry: NewEngineRegistry(),
metrics: newWorkflowRegistryMetricsLabeler(m),
emitter: emitter,
lastFetchedAtMap: newLastFetchedAtMap(),
clock: clock,
Expand Down Expand Up @@ -326,11 +334,17 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner),
)

metrics := h.metrics.with(
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner),
)

if _, err := h.forceUpdateSecretsEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr)
return err
}

metrics.incrementForceUpdateSecretsCounter(ctx)
h.lggr.Debugw("handled force update secrets events for URL hash", "urlHash", payload.SecretsURLHash)
return nil
case WorkflowRegisteredEvent:
Expand All @@ -346,11 +360,21 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

metrics := h.metrics.with(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowRegisteredEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow registered event: %v", err), h.lggr)
return err
}

metrics.incrementRegisterCounter(ctx)

// intentionally without workflow specific labels
h.metrics.updateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size()))
h.lggr.Debugw("handled workflow registration event", "workflowID", wfID)
return nil
case WorkflowUpdatedEvent:
Expand All @@ -366,11 +390,18 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

metrics := h.metrics.with(
platform.KeyWorkflowID, newWorkflowID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowUpdatedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow updated event: %v", err), h.lggr)
return err
}

metrics.incrementUpdateCounter(ctx)
h.lggr.Debugw("handled workflow updated event", "workflowID", newWorkflowID)
return nil
case WorkflowPausedEvent:
Expand All @@ -387,10 +418,18 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

metrics := h.metrics.with(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowPausedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
return err
}

metrics.incrementPauseCounter(ctx)
h.lggr.Debugw("handled workflow paused event", "workflowID", wfID)
return nil
case WorkflowActivatedEvent:
Expand All @@ -406,11 +445,19 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

metrics := h.metrics.with(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowActivatedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr)
return err
}

metrics.incrementActivateCounter(ctx)
h.lggr.Debugw("handled workflow activated event", "workflowID", wfID)
return nil
case WorkflowDeletedEvent:
Expand All @@ -427,11 +474,20 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

metrics := h.metrics.with(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowDeletedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", err), h.lggr)
return err
}

metrics.incrementDeleteCounter(ctx)
// intentionally without workflow specific labels
h.metrics.updateTotalWorkflowsGauge(ctx, int64(h.engineRegistry.Size()))
h.lggr.Debugw("handled workflow deleted event", "workflowID", wfID)
return nil
default:
Expand Down
Loading

0 comments on commit 86e8af2

Please sign in to comment.