Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add beholder logging for custom compute #15122

Merged
merged 15 commits into from
Nov 11, 2024
43 changes: 32 additions & 11 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"

Expand All @@ -21,8 +22,10 @@ import (
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/platform"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
)

Expand Down Expand Up @@ -117,7 +120,7 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe

m, ok := c.modules.get(id)
if !ok {
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.WorkflowExecutionID, request.Metadata.ReferenceID)
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
Expand All @@ -128,10 +131,10 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe
return c.executeWithModule(ctx, m.module, cfg.Config, request)
}

func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) {
func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, requestMetadata capabilities.RequestMetadata) (*module, error) {
initStart := time.Now()

cfg.Fetch = c.createFetcher(workflowID, workflowExecutionID)
cfg.Fetch = c.createFetcher()
mod, err := host.NewModule(cfg, binary)
if err != nil {
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
Expand All @@ -140,7 +143,7 @@ func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, w
mod.Start()

initDuration := time.Since(initStart)
computeWASMInit.WithLabelValues(workflowID, referenceID).Observe(float64(initDuration))
computeWASMInit.WithLabelValues(requestMetadata.WorkflowID, requestMetadata.ReferenceID).Observe(float64(initDuration))

m := &module{module: mod}
c.modules.add(id, m)
Expand Down Expand Up @@ -201,18 +204,26 @@ func (c *Compute) Close() error {
return nil
}

func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err)
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowId); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", req.Metadata.WorkflowId, err)
}
if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err)
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionId); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.Metadata.WorkflowExecutionId, err)
}

cma := c.emitter.With(
platform.KeyWorkflowID, req.Metadata.WorkflowId,
platform.KeyWorkflowName, req.Metadata.WorkflowName,
platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner,
platform.KeyWorkflowExecutionID, req.Metadata.WorkflowExecutionId,
timestampKey, time.Now().UTC().Format(time.RFC3339Nano),
)

messageID := strings.Join([]string{
workflowID,
workflowExecutionID,
req.Metadata.WorkflowId,
req.Metadata.WorkflowExecutionId,
ghcapabilities.MethodComputeAction,
c.idGenerator(),
}, "/")
Expand Down Expand Up @@ -245,6 +256,16 @@ func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx
if err != nil {
return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err)
}

// Only log if the response is not in the 200 range
if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode)
err = cma.Emit(ctx, msg)
if err != nil {
c.log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}

return &response, nil
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/capabilities/compute/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package compute

const timestampKey = "computeTimestamp"
15 changes: 15 additions & 0 deletions core/platform/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package platform

// Observability keys
const (
KeyCapabilityID = "capabilityID"
KeyTriggerID = "triggerID"
KeyWorkflowID = "workflowID"
KeyWorkflowExecutionID = "workflowExecutionID"
KeyWorkflowName = "workflowName"
KeyWorkflowOwner = "workflowOwner"
KeyStepID = "stepID"
KeyStepRef = "stepRef"
)

var OrderedLabelKeys = []string{KeyStepRef, KeyStepID, KeyTriggerID, KeyCapabilityID, KeyWorkflowExecutionID, KeyWorkflowID}
3 changes: 2 additions & 1 deletion core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)
Expand Down Expand Up @@ -39,7 +40,7 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName)
cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, spec.WorkflowSpec.WorkflowID, platform.KeyWorkflowOwner, spec.WorkflowSpec.WorkflowOwner, platform.KeyWorkflowName, spec.WorkflowSpec.WorkflowName)
sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger)
Expand Down
75 changes: 38 additions & 37 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

Expand Down Expand Up @@ -167,9 +168,9 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
for _, t := range e.workflow.triggers {
tg, err := e.registry.GetTrigger(ctx, t.ID)
if err != nil {
log := e.logger.With(cIDKey, t.ID)
log := e.logger.With(platform.KeyCapabilityID, t.ID)
log.Errorf("failed to get trigger capability: %s", err)
logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
logCustMsg(ctx, e.cma.With(platform.KeyCapabilityID, t.ID), fmt.Sprintf("failed to resolve trigger: %s", err), log)
// we don't immediately return here, since we want to retry all triggers
// to notify the user of all errors at once.
triggersInitialized = false
Expand All @@ -179,7 +180,7 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
}
if !triggersInitialized {
return &workflowError{reason: "failed to resolve triggers", labels: map[string]string{
wIDKey: e.workflow.id,
platform.KeyWorkflowID: e.workflow.id,
}}
}

Expand All @@ -201,15 +202,15 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
if err != nil {
logCustMsg(
ctx,
e.cma.With(wIDKey, e.workflow.id, sIDKey, s.ID, sRKey, s.Ref),
e.cma.With(platform.KeyWorkflowID, e.workflow.id, platform.KeyStepID, s.ID, platform.KeyStepRef, s.Ref),
fmt.Sprintf("failed to initialize capability for step: %s", err),
e.logger,
)
return &workflowError{err: err, reason: "failed to initialize capability for step",
labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: s.ID,
sRKey: s.Ref,
platform.KeyWorkflowID: e.workflow.id,
platform.KeyStepID: s.ID,
platform.KeyStepRef: s.Ref,
}}
}

Expand All @@ -231,8 +232,8 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
}

return &workflowError{reason: reason, err: err, labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: step.ID,
platform.KeyWorkflowID: e.workflow.id,
platform.KeyStepID: step.ID,
}}
}

Expand Down Expand Up @@ -318,7 +319,7 @@ func (e *Engine) init(ctx context.Context) {
if err != nil {
return &workflowError{err: err, reason: "failed to resolve workflow capabilities",
labels: map[string]string{
wIDKey: e.workflow.id,
platform.KeyWorkflowID: e.workflow.id,
}}
}
return nil
Expand All @@ -341,9 +342,9 @@ func (e *Engine) init(ctx context.Context) {
for idx, t := range e.workflow.triggers {
terr := e.registerTrigger(ctx, t, idx)
if terr != nil {
log := e.logger.With(cIDKey, t.ID)
log := e.logger.With(platform.KeyCapabilityID, t.ID)
log.Errorf("failed to register trigger: %s", terr)
logCustMsg(ctx, e.cma.With(cIDKey, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
logCustMsg(ctx, e.cma.With(platform.KeyCapabilityID, t.ID), fmt.Sprintf("failed to register trigger: %s", terr), log)
}
}

Expand Down Expand Up @@ -451,9 +452,9 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
// and triggerID might be "wf_123_trigger_0"
return &workflowError{err: err, reason: fmt.Sprintf("failed to register trigger: %+v", triggerRegRequest),
labels: map[string]string{
wIDKey: e.workflow.id,
cIDKey: t.ID,
tIDKey: triggerID,
platform.KeyWorkflowID: e.workflow.id,
platform.KeyCapabilityID: t.ID,
platform.KeyTriggerID: triggerID,
}}
}

Expand Down Expand Up @@ -491,7 +492,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
// `executionState`.
func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpdateCh chan store.WorkflowExecutionStep, workflowCreatedAt *time.Time) {
defer e.wg.Done()
lggr := e.logger.With(eIDKey, executionID)
lggr := e.logger.With(platform.KeyWorkflowExecutionID, executionID)
e.logger.Debugf("running stepUpdateLoop for execution %s", executionID)
for {
select {
Expand All @@ -505,11 +506,11 @@ func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpd
}
// Executed synchronously to ensure we correctly schedule subsequent tasks.
e.logger.Debugw(fmt.Sprintf("received step update for execution %s", stepUpdate.ExecutionID),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref)
err := e.handleStepUpdate(ctx, stepUpdate, workflowCreatedAt)
if err != nil {
e.logger.Errorf(fmt.Sprintf("failed to update step state: %+v, %s", stepUpdate, err),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref)
}
}
}
Expand All @@ -532,7 +533,7 @@ func generateExecutionID(workflowID, eventID string) (string, error) {

// startExecution kicks off a new workflow execution when a trigger event is received.
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
lggr := e.logger.With("event", event, eIDKey, executionID)
lggr := e.logger.With("event", event, platform.KeyWorkflowExecutionID, executionID)
lggr.Debug("executing on a trigger event")
ec := &store.WorkflowExecution{
Steps: map[string]*store.WorkflowExecutionStep{
Expand Down Expand Up @@ -584,8 +585,8 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event *
}

func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.WorkflowExecutionStep, workflowCreatedAt *time.Time) error {
l := e.logger.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
cma := e.cma.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
l := e.logger.With(platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref)
cma := e.cma.With(platform.KeyWorkflowExecutionID, stepUpdate.ExecutionID, platform.KeyStepRef, stepUpdate.Ref)

// If we've been executing for too long, let's time the workflow step out and continue.
if workflowCreatedAt != nil && e.clock.Since(*workflowCreatedAt) > e.maxExecutionDuration {
Expand Down Expand Up @@ -658,7 +659,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {

// If all dependencies are completed, enqueue the step.
if !waitingOnDependencies {
e.logger.With(sRKey, step.Ref, eIDKey, state.ExecutionID, "state", copyState(state)).
e.logger.With(platform.KeyStepRef, step.Ref, platform.KeyWorkflowExecutionID, state.ExecutionID, "state", copyState(state)).
Debug("step request enqueued")
e.pendingStepRequests <- stepRequest{
state: copyState(state),
Expand All @@ -668,7 +669,7 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {
}

func (e *Engine) finishExecution(ctx context.Context, executionID string, status string) error {
e.logger.With(eIDKey, executionID, "status", status).Info("finishing execution")
e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status).Info("finishing execution")
metrics := e.metrics.with("status", status)
err := e.executionStates.UpdateStatus(ctx, executionID, status)
if err != nil {
Expand Down Expand Up @@ -713,23 +714,23 @@ func (e *Engine) worker(ctx context.Context) {
te := resp.Event

if te.ID == "" {
e.logger.With(tIDKey, te.TriggerType).Error("trigger event ID is empty; not executing")
e.logger.With(platform.KeyTriggerID, te.TriggerType).Error("trigger event ID is empty; not executing")
continue
}

executionID, err := generateExecutionID(e.workflow.id, te.ID)
if err != nil {
e.logger.With(tIDKey, te.ID).Errorf("could not generate execution ID: %v", err)
e.logger.With(platform.KeyTriggerID, te.ID).Errorf("could not generate execution ID: %v", err)
continue
}

cma := e.cma.With(eIDKey, executionID)
cma := e.cma.With(platform.KeyWorkflowExecutionID, executionID)
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
if err != nil {
e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err)
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
} else {
e.logger.With(eIDKey, executionID).Debug("execution started")
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Debug("execution started")
logCustMsg(ctx, cma, "execution started", e.logger)
}
case <-ctx.Done():
Expand All @@ -741,8 +742,8 @@ func (e *Engine) worker(ctx context.Context) {
func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// Instantiate a child logger; in addition to the WorkflowID field the workflow
// logger will already have, this adds the `stepRef` and `executionID`
l := e.logger.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
cma := e.cma.With(sRKey, msg.stepRef, eIDKey, msg.state.ExecutionID)
l := e.logger.With(platform.KeyStepRef, msg.stepRef, platform.KeyWorkflowExecutionID, msg.state.ExecutionID)
cma := e.cma.With(platform.KeyStepRef, msg.stepRef, platform.KeyWorkflowExecutionID, msg.state.ExecutionID)

l.Debug("executing on a step event")
stepState := &store.WorkflowExecutionStep{
Expand Down Expand Up @@ -1104,9 +1105,9 @@ func (e *Engine) Close() error {
return &workflowError{err: innerErr,
reason: fmt.Sprintf("failed to unregister capability from workflow: %+v", reg),
labels: map[string]string{
wIDKey: e.workflow.id,
sIDKey: s.ID,
sRKey: s.Ref,
platform.KeyWorkflowID: e.workflow.id,
platform.KeyStepID: s.ID,
platform.KeyStepRef: s.Ref,
}}
}

Expand Down Expand Up @@ -1157,7 +1158,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
if cfg.Store == nil {
return nil, &workflowError{reason: "store is nil",
labels: map[string]string{
wIDKey: cfg.WorkflowID,
platform.KeyWorkflowID: cfg.WorkflowID,
},
}
}
Expand Down Expand Up @@ -1206,7 +1207,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
// - etc.

cma := custmsg.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, cfg.WorkflowName)
cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName)
workflow, err := Parse(cfg.Workflow)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to parse workflow: %s", err), cfg.Lggr)
Expand All @@ -1220,7 +1221,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(wIDKey, cfg.WorkflowID, woIDKey, cfg.WorkflowOwner, wnKey, workflow.name)},
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, workflow.name)},
registry: cfg.Registry,
workflow: workflow,
secretsFetcher: cfg.SecretsFetcher,
Expand Down Expand Up @@ -1268,7 +1269,7 @@ func (e *workflowError) Error() string {
}

// prefix the error with the labels
for _, label := range orderedLabelKeys {
for _, label := range platform.OrderedLabelKeys {
// This will silently ignore any labels that are not present in the map
// are we ok with this?
if value, ok := e.labels[label]; ok {
Expand Down
Loading
Loading