Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Workflow Registry Metrics and remove executions running counter #16463

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
11 changes: 11 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry"
workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
Expand Down Expand Up @@ -488,6 +489,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
opts.CapabilitiesRegistry,
workflowORM,
creServices.workflowRateLimiter,
creServices.engineRegistry,
)

// Flux monitor requires ethereum just to boot, silence errors with a null delegate
Expand Down Expand Up @@ -720,6 +722,10 @@ type CREServices struct {
// gatewayConnectorWrapper is the wrapper for the gateway connector
// it is exposed because there are contingent services in the application
gatewayConnectorWrapper *gatewayconnector.ServiceWrapper
// engineRegistry is exposed so that both the delegate
// and syncer paths share an underlying store of Engine
// instances
engineRegistry *registry.EngineRegistry
// srvs are all the services that are created, including those that are explicitly exposed
srvs []services.ServiceCtx
}
Expand All @@ -733,6 +739,9 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) {
opts = cscfg.CREOpts
ds = cscfg.DS
)
// engineRegistry is consumed by the workflow job delegate and by the handler
// for the workflow registry for a unified backend store of engine instances
engineRegistry := registry.NewEngineRegistry()
var srvcs []services.ServiceCtx
workflowRateLimiter, err := ratelimiter.NewRateLimiter(ratelimiter.Config{
GlobalRPS: capCfg.RateLimit().GlobalRPS(),
Expand Down Expand Up @@ -845,6 +854,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) {
fetcherFunc,
workflowstore.NewDBStore(ds, lggr, clockwork.NewRealClock()),
opts.CapabilitiesRegistry,
engineRegistry,
custmsg.NewLabeler(),
clockwork.NewRealClock(),
keys[0],
Expand Down Expand Up @@ -887,6 +897,7 @@ func newCREServices(cscfg creServiceConfig) (*CREServices, error) {
return &CREServices{
workflowRateLimiter: workflowRateLimiter,
gatewayConnectorWrapper: gatewayConnectorWrapper,
engineRegistry: engineRegistry,
srvs: srvcs,
}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"

Expand Down Expand Up @@ -356,7 +357,7 @@ func Test_SecretsWorker(t *testing.T) {
require.NoError(t, err)
handler := &testSecretsWorkEventHandler{
wrappedHandler: syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl),
registry.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl),
registeredCh: make(chan syncer.Event, 1),
}

Expand Down Expand Up @@ -435,11 +436,11 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) {
require.NoError(t, err)
giveWorkflow.ID = id

er := syncer.NewEngineRegistry()
er := registry.NewEngineRegistry()
rl, err := ratelimiter.NewRateLimiter(rlConfig)
require.NoError(t, err)
handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, syncer.WithEngineRegistry(er))
registry.NewEngineRegistry(), emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl)

worker := syncer.NewWorkflowRegistry(
lggr,
Expand Down Expand Up @@ -535,7 +536,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) {
giveWorkflow.ID = id

mf := &mockEngineFactory{}
er := syncer.NewEngineRegistry()
er := registry.NewEngineRegistry()
rl, err := ratelimiter.NewRateLimiter(rlConfig)
require.NoError(t, err)
handler := syncer.NewEventHandler(
Expand All @@ -544,11 +545,11 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) {
fetcherFn,
nil,
nil,
er,
emitter,
clockwork.NewFakeClock(),
workflowkey.Key{},
rl,
syncer.WithEngineRegistry(er),
syncer.WithEngineFactoryFn(mf.new),
)

Expand Down
30 changes: 28 additions & 2 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/pelletier/go-toml"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"

"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/registry"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
Expand All @@ -24,6 +24,8 @@ type Delegate struct {
logger logger.Logger
store store.Store
ratelimiter *ratelimiter.RateLimiter
engineRegistry *registry.EngineRegistry
metrics WorkflowMetricLabeler
}

var _ job.Delegate = (*Delegate)(nil)
Expand All @@ -38,11 +40,21 @@ func (d *Delegate) AfterJobCreated(jb job.Job) {}

func (d *Delegate) BeforeJobDeleted(spec job.Job) {}

func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil }
// OnDeleteJob removes the engine instance for the deleted job's workflow from
// the engine registry and then refreshes the total-workflows gauge with the
// registry's new size.
//
// The popped engine is intentionally discarded rather than closed here: the
// engine instance will be closed in the job spawner.
//
// It returns an error, wrapping the registry's error, when the registry cannot
// pop an engine for the workflow ID; the gauge is not updated in that case.
func (d *Delegate) OnDeleteJob(ctx context.Context, spec job.Job) error {
	if _, err := d.engineRegistry.Pop(spec.WorkflowSpec.WorkflowID); err != nil {
		// Report the workflow ID in the "id" slot (not the name a second time)
		// and wrap with %w so callers can inspect the cause with errors.Is/As.
		return fmt.Errorf("delegate failed to unregister workflow engine for workflow name: %s id: %s: %w", spec.WorkflowSpec.WorkflowName, spec.WorkflowSpec.WorkflowID, err)
	}
	d.metrics.UpdateTotalWorkflowsGauge(ctx, int64(d.engineRegistry.Size()))
	return nil
}

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, spec.WorkflowSpec.WorkflowID, platform.KeyWorkflowOwner, spec.WorkflowSpec.WorkflowOwner, platform.KeyWorkflowName, spec.WorkflowSpec.WorkflowName)

sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger)
Expand Down Expand Up @@ -80,6 +92,12 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
if err != nil {
return nil, err
}

err = d.engineRegistry.Add(spec.WorkflowSpec.WorkflowID, engine)
if err != nil {
return nil, fmt.Errorf("delegate failed to register workflow engine for workflow name: %s id: %s: %v", cfg.WorkflowName.String(), cfg.WorkflowID, err)
}
d.metrics.UpdateTotalWorkflowsGauge(ctx, int64(d.engineRegistry.Size()))
d.logger.Infow("Creating Workflow Engine for workflow spec", "workflowID", spec.WorkflowSpec.WorkflowID, "workflowOwner", spec.WorkflowSpec.WorkflowOwner, "workflowName", spec.WorkflowSpec.WorkflowName, "jobName", spec.Name)
return []job.ServiceCtx{engine}, nil
}
Expand All @@ -99,13 +117,21 @@ func NewDelegate(
registry core.CapabilitiesRegistry,
store store.Store,
ratelimiter *ratelimiter.RateLimiter,
engineRegistry *registry.EngineRegistry,
) *Delegate {
metrics, err := InitWorkflowMonitoringResources()
if err != nil {
logger.Criticalw("Failed to initialize workflow monitoring resources", "err", err)
}

return &Delegate{
logger: logger,
registry: registry,
engineRegistry: engineRegistry,
secretsFetcher: newNoopSecretsFetcher(),
store: store,
ratelimiter: ratelimiter,
metrics: *metrics,
}
}

Expand Down
12 changes: 5 additions & 7 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/metrics"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
Expand Down Expand Up @@ -104,7 +103,7 @@ type secretsFetcher interface {
type Engine struct {
services.StateMachine
cma custmsg.MessageEmitter
metrics workflowsMetricLabeler
metrics engineMetricLabeler
logger logger.Logger
registry core.CapabilitiesRegistry
workflow *workflow
Expand Down Expand Up @@ -1153,7 +1152,6 @@ func (e *Engine) heartbeat(ctx context.Context) {
return
case <-ticker.C:
e.metrics.incrementEngineHeartbeatCounter(ctx)
e.metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am removing this metric entirely. See the PR description for the reasoning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... this still feels like it could be useful @patrickhuie19 -- I can imagine scenarios where the number of executions is just increasing, and not going down such that we'll eventually exhaust the queue. In that case this metric would help us diagnose what's going on.

You mentioned that this relates to deletes of the workflow engine -- can you say more about how? I'm not following entirely

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that one source of confusion here is that the name of this metric (total workflows running) doesn't correspond to what it's measuring (the number of executions running in an instance of the workflow engine); maybe that's the source of confusion here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be more confusing than helpful. The name is part of it. The metric is updated on a heartbeat, which is async to engine.Start() and engine.Close().

> I can imagine scenarios where the number of executions is just increasing, and not going down such that we'll eventually exhaust the queue

Thanks for the example use case. Would we be able to satisfy this by looking at the completed duration histogram metrics and observing no executions are finishing?

logCustMsg(ctx, e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger)
}
}
Expand Down Expand Up @@ -1266,7 +1264,7 @@ const (
defaultQueueSize = 100000
defaultNewWorkerTimeout = 2 * time.Second
defaultMaxExecutionDuration = 10 * time.Minute
defaultHeartbeatCadence = 5 * time.Minute
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 minutes is anemic

defaultHeartbeatCadence = 1 * time.Minute
defaultStepTimeout = 2 * time.Minute
)

Expand Down Expand Up @@ -1340,7 +1338,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
// - etc.

// spin up monitoring resources
em, err := initMonitoringResources()
engineMetrics, err := initEngineMonitoringResources()
if err != nil {
return nil, fmt.Errorf("could not initialize monitoring resources: %w", err)
}
Expand All @@ -1358,8 +1356,8 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {

engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()), *em},
logger: cfg.Lggr.Named("WorkflowEngine").With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()),
metrics: engineMetrics.with(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName.String()),
registry: cfg.Registry,
workflow: workflow,
secretsFetcher: cfg.SecretsFetcher,
Expand Down
Loading
Loading