Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add beholder logging for custom compute #15122

Merged
merged 15 commits into from
Nov 11, 2024
Merged
43 changes: 32 additions & 11 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"

Expand All @@ -21,8 +22,10 @@ import (
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/platform"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
)

Expand Down Expand Up @@ -117,7 +120,7 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe

m, ok := c.modules.get(id)
if !ok {
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.WorkflowExecutionID, request.Metadata.ReferenceID)
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
Expand All @@ -128,10 +131,10 @@ func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRe
return c.executeWithModule(ctx, m.module, cfg.Config, request)
}

func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) {
func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, requestMetadata capabilities.RequestMetadata) (*module, error) {
initStart := time.Now()

cfg.Fetch = c.createFetcher(workflowID, workflowExecutionID)
cfg.Fetch = c.createFetcher()
mod, err := host.NewModule(cfg, binary)
if err != nil {
return nil, fmt.Errorf("failed to instantiate WASM module: %w", err)
Expand All @@ -140,7 +143,7 @@ func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, w
mod.Start()

initDuration := time.Since(initStart)
computeWASMInit.WithLabelValues(workflowID, referenceID).Observe(float64(initDuration))
computeWASMInit.WithLabelValues(requestMetadata.WorkflowID, requestMetadata.ReferenceID).Observe(float64(initDuration))

m := &module{module: mod}
c.modules.add(id, m)
Expand Down Expand Up @@ -201,18 +204,26 @@ func (c *Compute) Close() error {
return nil
}

func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
return func(ctx context.Context, req *wasmpb.FetchRequest) (*wasmpb.FetchResponse, error) {
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err)
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowId); err != nil {
return nil, fmt.Errorf("workflow ID %q is invalid: %w", req.Metadata.WorkflowId, err)
}
if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err)
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionId); err != nil {
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.Metadata.WorkflowExecutionId, err)
}

cma := c.emitter.With(
platform.WorkflowIDKey, req.Metadata.WorkflowId,
platform.WorkflowNameKey, req.Metadata.WorkflowName,
platform.WorkflowOwnerKey, req.Metadata.WorkflowOwner,
platform.WorkflowExecutionIDKey, req.Metadata.WorkflowExecutionId,
timestampKey, time.Now().UTC().Format(time.RFC3339Nano),
)

messageID := strings.Join([]string{
workflowID,
workflowExecutionID,
req.Metadata.WorkflowId,
req.Metadata.WorkflowExecutionId,
ghcapabilities.MethodComputeAction,
c.idGenerator(),
}, "/")
Expand Down Expand Up @@ -245,6 +256,16 @@ func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx
if err != nil {
return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err)
}

// Only log if the response is not in the 200 range
if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode)
err = cma.Emit(ctx, msg)
if err != nil {
c.log.Errorf("failed to send custom message with msg: %s, err: %v", msg, err)
}
}

return &response, nil
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/capabilities/compute/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package compute

const timestampKey = "computeTimestamp"
15 changes: 15 additions & 0 deletions core/platform/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package platform

// Observability keys
const (
CapabilityIDKey = "capabilityID"
TriggerIDKey = "triggerID"
WorkflowIDKey = "workflowID"
WorkflowExecutionIDKey = "workflowExecutionID"
WorkflowNameKey = "workflowName"
WorkflowOwnerKey = "workflowOwner"
StepIDKey = "stepID"
StepRefKey = "stepRef"
)

var OrderedLabelKeys = []string{StepRefKey, StepIDKey, TriggerIDKey, CapabilityIDKey, WorkflowExecutionIDKey, WorkflowIDKey}
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241106142051-c7bded1c08ae
github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1092,8 +1092,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241106140121-4c9ee21ab422 h1:VfH/AW5NtTmroY9zz6OYCPFbFTqpMyJ2ubgT9ahYf3U=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241106140121-4c9ee21ab422/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 h1:AGi0kAtMRW1zl1h7sGw+3CKO4Nlev6iA08YfEcgJCGs=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241106142051-c7bded1c08ae h1:uqce0bjNVYzFrrVLafXgyn8SVNdfOtZekLfAwQihHiA=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241106142051-c7bded1c08ae/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
3 changes: 2 additions & 1 deletion core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)
Expand Down Expand Up @@ -39,7 +40,7 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
cma := custmsg.NewLabeler().With(wIDKey, spec.WorkflowSpec.WorkflowID, woIDKey, spec.WorkflowSpec.WorkflowOwner, wnKey, spec.WorkflowSpec.WorkflowName)
cma := custmsg.NewLabeler().With(platform.WorkflowIDKey, spec.WorkflowSpec.WorkflowID, platform.WorkflowOwnerKey, spec.WorkflowSpec.WorkflowOwner, platform.WorkflowNameKey, spec.WorkflowSpec.WorkflowName)
sdkSpec, err := spec.WorkflowSpec.SDKSpec(ctx)
if err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to start workflow engine: failed to get workflow sdk spec: %v", err), d.logger)
Expand Down
Loading
Loading