Add worker pool to WASM capability #15088

Merged · 6 commits · Nov 13, 2024 · Changes from 1 commit
138 changes: 123 additions & 15 deletions core/capabilities/compute/compute.go
@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/google/uuid"
@@ -18,7 +19,9 @@ import (
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
@@ -70,7 +73,8 @@
var _ capabilities.ActionCapability = (*Compute)(nil)

type Compute struct {
log logger.Logger
stopCh services.StopChan
log logger.Logger

// emitter is used to emit messages from the WASM module to a configured collector.
emitter custmsg.MessageEmitter
@@ -82,6 +86,10 @@ type Compute struct {
transformer ConfigTransformer
outgoingConnectorHandler *webapi.OutgoingConnectorHandler
idGenerator func() string

numWorkers int
queue chan request
wg sync.WaitGroup
}

func (c *Compute) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand All @@ -97,35 +105,96 @@ func generateID(binary []byte) string {
return fmt.Sprintf("%x", id)
}

func copyRequest(req capabilities.CapabilityRequest) capabilities.CapabilityRequest {
return capabilities.CapabilityRequest{
Metadata: req.Metadata,
Inputs: req.Inputs.CopyMap(),
Config: req.Config.CopyMap(),
func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
ch, err := c.enqueueRequest(ctx, request)
Contributor:
Do we need to worry about the capability timing out in the engine if this queue gets too long?

Contributor (Author):
We do -- this is why I added the step-level timeout; we'll wait for a maximum of 2 minutes (which is incredibly generous) before interrupting a step (and 10 minutes for the whole workflow).

Contributor:
Sorry, I'll be more precise: I'm not convinced the timer in the engine should start until the step is actually running. Users shouldn't be penalized when other tasks can block them. I'm worried that I could DoS the compute capability by making N compute steps that have infinite loops and intentionally time out.

For now, maybe this is OK; we can re-evaluate before we open compute for general use...

Contributor (Author):
> I'm worried that I can DoS the compute capability by making N compute steps that have infinite loops and intentionally time out.

This shouldn't be possible, because we apply a lower-level timeout to the individual WASM call; the default setting for this is 2s. I set the step-level timeout to be very large partly as an attempt to compensate for this.

What solution would you propose? We could ignore the engine timeout, I suppose, but that feels dangerous IMO.

Contributor:
I was thinking of ignoring the engine timeout for this capability. For now, we don't need to block on this. We can think it out more later.

if err != nil {
return capabilities.CapabilityResponse{}, err
}

select {
case <-c.stopCh:
return capabilities.CapabilityResponse{}, errors.New("service shutting down, aborting request")
case <-ctx.Done():
return capabilities.CapabilityResponse{}, fmt.Errorf("request cancelled by upstream: %w", ctx.Err())
case resp := <-ch:
return resp.resp, resp.err
}
}
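
To put rough numbers on the timeout discussion above: with this PR's defaults (3 workers, the 2s host-level WASM call timeout mentioned in the thread, and the 2-minute step timeout added to the engine below), roughly 180 worst-case calls can drain ahead of a queued request before its step times out. A back-of-the-envelope sketch; the constant names are illustrative, not taken from the code:

package main

import (
    "fmt"
    "time"
)

func main() {
    const (
        numWorkers     = 3               // defaultNumWorkers in this PR
        perCallTimeout = 2 * time.Second // host-level WASM call timeout cited in the thread
        stepTimeout    = 2 * time.Minute // defaultStepTimeout in engine.go
    )
    // Worst case, every call ahead of us burns its full per-call timeout, so a
    // request admitted behind n others waits roughly n/numWorkers * perCallTimeout.
    maxAhead := int(stepTimeout/perCallTimeout) * numWorkers
    fmt.Println(maxAhead) // 180: queued calls that can drain before the step timeout fires
}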

func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
copied := copyRequest(request)
type request struct {
ch chan response
req capabilities.CapabilityRequest
ctx func() context.Context
}

type response struct {
resp capabilities.CapabilityResponse
err error
}

func (c *Compute) enqueueRequest(ctx context.Context, req capabilities.CapabilityRequest) (<-chan response, error) {
ch := make(chan response)
r := request{
ch: ch,
req: req,
ctx: func() context.Context { return ctx },
}
select {
case <-c.stopCh:
return nil, errors.New("service shutting down, aborting request")
case <-ctx.Done():
return nil, fmt.Errorf("could not enqueue request: %w", ctx.Err())
case c.queue <- r:
return ch, nil
}
}

func shallowCopy(m *values.Map) *values.Map {
to := values.EmptyMap()

for k, v := range m.Underlying {
to.Underlying[k] = v
}

return to
}
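
shallowCopy duplicates only the top-level map; the values themselves are shared. That is safe here because, per the comment in execute below, only the request's overall shape is mutated, not the values. A minimal illustration of the distinction, using a plain map rather than values.Map:

package main

import "fmt"

func main() {
    orig := map[string]any{"binary": []byte{1, 2}, "config": "x"}

    // Shallow copy: a new top-level map whose entries point at the same values.
    cp := make(map[string]any, len(orig))
    for k, v := range orig {
        cp[k] = v
    }

    delete(cp, "binary")            // reshaping the copy...
    fmt.Println(len(orig), len(cp)) // 2 1: ...leaves the original's shape intact
}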

cfg, err := c.transformer.Transform(copied.Config)
func (c *Compute) execute(ctx context.Context, respCh chan response, req capabilities.CapabilityRequest) {
// Shallow copy the request.
// This is because we mutate its overall shape.
req = capabilities.CapabilityRequest{
Config: shallowCopy(req.Config),

// These aren't mutated so we ignore them.
Metadata: req.Metadata,
Inputs: req.Inputs,
}

cfg, err := c.transformer.Transform(req.Config)
if err != nil {
return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: could not transform config: %w", err)
respCh <- response{err: fmt.Errorf("invalid request: could not transform config: %w", err)}
return
}

id := generateID(cfg.Binary)

m, ok := c.modules.get(id)
if !ok {
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.WorkflowExecutionID, request.Metadata.ReferenceID)
if err != nil {
return capabilities.CapabilityResponse{}, err
mod, innerErr := c.initModule(id, cfg.ModuleConfig, cfg.Binary, req.Metadata.WorkflowID, req.Metadata.WorkflowExecutionID, req.Metadata.ReferenceID)
if innerErr != nil {
respCh <- response{err: innerErr}
return
}

m = mod
}

return c.executeWithModule(ctx, m.module, cfg.Config, request)
resp, err := c.executeWithModule(ctx, m.module, cfg.Config, req)
select {
case <-c.stopCh:
case <-ctx.Done():
case respCh <- response{resp: resp, err: err}:
}
}
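
Note the guarded send that ends execute: the response channel is unbuffered, and by the time a worker finishes, the caller in Execute may already have returned (context expired or service stopping). A bare respCh <- ... would then park the worker forever; selecting against stopCh and ctx.Done() drops the result instead. The same idiom in isolation, with illustrative names:

package pool

import "context"

// sendOrDrop delivers v unless the receiver has gone away or the pool is
// shutting down; it never blocks a worker forever on an unbuffered channel.
func sendOrDrop[T any](ctx context.Context, stopCh <-chan struct{}, ch chan<- T, v T) {
    select {
    case <-stopCh: // pool closing: drop the result
    case <-ctx.Done(): // caller gone: drop the result
    case ch <- v: // caller still listening: deliver
    }
}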

func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) {
@@ -193,11 +262,35 @@ func (c *Compute) Info(ctx context.Context) (capabilities.CapabilityInfo, error)

func (c *Compute) Start(ctx context.Context) error {
c.modules.start()

c.wg.Add(c.numWorkers)
for i := 0; i < c.numWorkers; i++ {
go func() {
innerCtx, cancel := c.stopCh.NewCtx()
defer cancel()

defer c.wg.Done()
c.worker(innerCtx)
}()
}
return c.registry.Add(ctx, c)
}

func (c *Compute) worker(ctx context.Context) {
for {
select {
case <-c.stopCh:
return
case req := <-c.queue:
c.execute(req.ctx(), req.ch, req.req)
}
}
}

func (c *Compute) Close() error {
c.modules.close()
close(c.stopCh)
c.wg.Wait()
return nil
}
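
Start, worker, and Close together form the usual Go worker-pool lifecycle: an unbuffered queue for backpressure, a stop channel to wake blocked workers, and a WaitGroup so Close returns only once every worker has exited. A minimal self-contained sketch of the same shape, independent of the capability types used here:

package main

import (
    "fmt"
    "sync"
)

type pool struct {
    stopCh chan struct{} // closed on shutdown, like services.StopChan
    queue  chan int      // unbuffered: senders block until a worker is free
    wg     sync.WaitGroup
}

func (p *pool) start(n int) {
    p.wg.Add(n)
    for i := 0; i < n; i++ {
        go func() {
            defer p.wg.Done()
            for {
                select {
                case <-p.stopCh:
                    return
                case job := <-p.queue:
                    fmt.Println("processed", job)
                }
            }
        }()
    }
}

func (p *pool) close() {
    close(p.stopCh) // wakes every worker blocked in select
    p.wg.Wait()     // close returns only after all workers exit
}

func main() {
    p := &pool{stopCh: make(chan struct{}), queue: make(chan int)}
    p.start(3)
    for i := 0; i < 5; i++ {
        p.queue <- i // blocks while all workers are busy: backpressure
    }
    p.close()
}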

@@ -249,25 +342,40 @@ func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx
}
}

const (
defaultNumWorkers = 3
Contributor:
I'm worried about how low this is. We should be able to run a lot more. I don't get what takes so much memory. I'm not going to block the PR for now, since it'll fix some OOMs, but we need to get to the bottom of it...

Contributor (Author):
As discussed offline, we agree to revisit WASM performance generally once we've hit the external audit.

)

type Config struct {
webapi.ServiceConfig
NumWorkers int
}
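
Since delegate.go (below) unmarshals the standard-capabilities spec straight into this struct, NumWorkers should be settable alongside the existing webapi fields. A hypothetical sketch of that round trip: the struct mirrors are trimmed copies, the TOML key names are assumed from the field names (no struct tags are visible in this diff), and go-toml/v2's flattening of embedded structs is assumed:

package main

import (
    "fmt"

    "github.com/pelletier/go-toml/v2"
)

// Trimmed mirrors of compute.Config / webapi.ServiceConfig from this PR.
type RateLimiterConfig struct {
    GlobalRPS      float64
    GlobalBurst    int
    PerSenderRPS   float64
    PerSenderBurst int
}

type ServiceConfig struct {
    RateLimiter RateLimiterConfig
}

type Config struct {
    ServiceConfig
    NumWorkers int
}

func main() {
    // Hypothetical spec config; an absent or zero NumWorkers falls back to defaultNumWorkers (3).
    spec := `
NumWorkers = 10

[RateLimiter]
GlobalRPS = 100.0
GlobalBurst = 100
PerSenderRPS = 100.0
PerSenderBurst = 100
`
    var cfg Config
    if err := toml.Unmarshal([]byte(spec), &cfg); err != nil {
        panic(err)
    }
    fmt.Println(cfg.NumWorkers, cfg.RateLimiter.GlobalRPS) // 10 100
}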

func NewAction(
config webapi.ServiceConfig,
config Config,
log logger.Logger,
registry coretypes.CapabilitiesRegistry,
handler *webapi.OutgoingConnectorHandler,
idGenerator func() string,
opts ...func(*Compute),
) *Compute {
if config.NumWorkers == 0 {
config.NumWorkers = defaultNumWorkers
}
var (
lggr = logger.Named(log, "CustomCompute")
labeler = custmsg.NewLabeler()
compute = &Compute{
stopCh: make(services.StopChan),
log: lggr,
emitter: labeler,
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(lggr, labeler),
outgoingConnectorHandler: handler,
idGenerator: idGenerator,
queue: make(chan request),
Contributor:
Should the queue be non-blocking?

Contributor (Author):
I left this as blocking so that we backpressure onto the engine itself; if we don't succeed after 2 minutes, we'll interrupt the request altogether.

numWorkers: defaultNumWorkers,
}
)

20 changes: 11 additions & 9 deletions core/capabilities/compute/compute_test.go
@@ -32,30 +32,32 @@ const (
validRequestUUID = "d2fe6db9-beb4-47c9-b2d6-d3065ace111e"
)

var defaultConfig = webapi.ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
var defaultConfig = Config{
ServiceConfig: webapi.ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
},
}

type testHarness struct {
registry *corecapabilities.Registry
connector *gcmocks.GatewayConnector
log logger.Logger
config webapi.ServiceConfig
config Config
connectorHandler *webapi.OutgoingConnectorHandler
compute *Compute
}

func setup(t *testing.T, config webapi.ServiceConfig) testHarness {
func setup(t *testing.T, config Config) testHarness {
log := logger.TestLogger(t)
registry := capabilities.NewRegistry(log)
connector := gcmocks.NewGatewayConnector(t)
idGeneratorFn := func() string { return validRequestUUID }
connectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, config, ghcapabilities.MethodComputeAction, log)
connectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, config.ServiceConfig, ghcapabilities.MethodComputeAction, log)
require.NoError(t, err)

compute := NewAction(config, log, registry, connectorHandler, idGeneratorFn)
2 changes: 1 addition & 1 deletion core/scripts/go.mod
@@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241104110737-3072d4cf1ba4
github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
4 changes: 2 additions & 2 deletions core/scripts/go.sum
@@ -1092,8 +1092,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241104130643-4b7e196370c4 h1:GWjim4uGGFbye4XbJP0cPAbARhc8u3cAJU8jLYy0mXM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241104130643-4b7e196370c4/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 h1:AGi0kAtMRW1zl1h7sGw+3CKO4Nlev6iA08YfEcgJCGs=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241104110737-3072d4cf1ba4 h1:nJ7sns0iO2FTEUivMaPee5KvCBTn7qcNSXRoie/Ik5Q=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241104110737-3072d4cf1ba4/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
8 changes: 4 additions & 4 deletions core/services/standardcapabilities/delegate.go
@@ -237,14 +237,14 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
return nil, errors.New("config is empty")
}

var fetchCfg webapi.ServiceConfig
err := toml.Unmarshal([]byte(spec.StandardCapabilitiesSpec.Config), &fetchCfg)
var cfg compute.Config
err := toml.Unmarshal([]byte(spec.StandardCapabilitiesSpec.Config), &cfg)
if err != nil {
return nil, err
}
lggr := d.logger.Named("ComputeAction")

handler, err := webapi.NewOutgoingConnectorHandler(d.gatewayConnectorWrapper.GetGatewayConnector(), fetchCfg, capabilities.MethodComputeAction, lggr)
handler, err := webapi.NewOutgoingConnectorHandler(d.gatewayConnectorWrapper.GetGatewayConnector(), cfg.ServiceConfig, capabilities.MethodComputeAction, lggr)
if err != nil {
return nil, err
}
@@ -253,7 +253,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
return uuid.New().String()
}

computeSrvc := compute.NewAction(fetchCfg, log, d.registry, handler, idGeneratorFn)
computeSrvc := compute.NewAction(cfg, log, d.registry, handler, idGeneratorFn)
return []job.ServiceCtx{computeSrvc}, nil
}

13 changes: 12 additions & 1 deletion core/services/workflows/engine.go
@@ -112,6 +112,7 @@ type Engine struct {
newWorkerTimeout time.Duration
maxExecutionDuration time.Duration
heartbeatCadence time.Duration
stepTimeoutDuration time.Duration

// testing lifecycle hook to signal when an execution is finished.
onExecutionFinished func(string)
@@ -754,7 +755,10 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// TODO ks-462 inputs
logCustMsg(ctx, cma, "executing step", l)

inputs, outputs, err := e.executeStep(ctx, l, msg)
stepCtx, cancel := context.WithTimeout(ctx, e.stepTimeoutDuration)
defer cancel()

inputs, outputs, err := e.executeStep(stepCtx, l, msg)
var stepStatus string
switch {
case errors.Is(capabilities.ErrStopExecution, err):
@@ -1136,6 +1140,7 @@ type Config struct {
Binary []byte
SecretsFetcher secretsFetcher
HeartbeatCadence time.Duration
StepTimeout time.Duration

// For testing purposes only
maxRetries int
@@ -1151,6 +1156,7 @@ const (
defaultNewWorkerTimeout = 2 * time.Second
defaultMaxExecutionDuration = 10 * time.Minute
defaultHeartbeatCadence = 5 * time.Minute
defaultStepTimeout = 2 * time.Minute
)

func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
@@ -1182,6 +1188,10 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
cfg.HeartbeatCadence = defaultHeartbeatCadence
}

if cfg.StepTimeout == 0 {
cfg.StepTimeout = defaultStepTimeout
}

if cfg.retryMs == 0 {
cfg.retryMs = 5000
}
@@ -1234,6 +1244,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
triggerEvents: make(chan capabilities.TriggerResponse),
stopCh: make(chan struct{}),
newWorkerTimeout: cfg.NewWorkerTimeout,
stepTimeoutDuration: cfg.StepTimeout,
maxExecutionDuration: cfg.MaxExecutionDuration,
heartbeatCadence: cfg.HeartbeatCadence,
onExecutionFinished: cfg.onExecutionFinished,