Skip to content

Commit

Permalink
[chore] Add worker pool to compute capability
Browse files Browse the repository at this point in the history
- Also add step-level timeout to engine. This was removed when we moved
  away from ExecuteSync().
  • Loading branch information
cedric-cordenier committed Nov 4, 2024
1 parent 1b555ff commit 09493e4
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 58 deletions.
138 changes: 123 additions & 15 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/google/uuid"
Expand All @@ -18,7 +19,9 @@ import (
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
Expand Down Expand Up @@ -70,7 +73,8 @@ var (
var _ capabilities.ActionCapability = (*Compute)(nil)

type Compute struct {
log logger.Logger
stopCh services.StopChan
log logger.Logger

// emitter is used to emit messages from the WASM module to a configured collector.
emitter custmsg.MessageEmitter
Expand All @@ -82,6 +86,10 @@ type Compute struct {
transformer ConfigTransformer
outgoingConnectorHandler *webapi.OutgoingConnectorHandler
idGenerator func() string

numWorkers int
queue chan request
wg sync.WaitGroup
}

func (c *Compute) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand All @@ -97,35 +105,96 @@ func generateID(binary []byte) string {
return fmt.Sprintf("%x", id)
}

func copyRequest(req capabilities.CapabilityRequest) capabilities.CapabilityRequest {
return capabilities.CapabilityRequest{
Metadata: req.Metadata,
Inputs: req.Inputs.CopyMap(),
Config: req.Config.CopyMap(),
func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
ch, err := c.enqueueRequest(ctx, request)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

select {
case <-c.stopCh:
return capabilities.CapabilityResponse{}, errors.New("service shutting down, aborting request")
case <-ctx.Done():
return capabilities.CapabilityResponse{}, fmt.Errorf("request cancelled by upstream: %w", ctx.Err())
case resp := <-ch:
return resp.resp, resp.err
}
}

func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
copied := copyRequest(request)
type request struct {
ch chan response
req capabilities.CapabilityRequest
ctx func() context.Context
}

type response struct {
resp capabilities.CapabilityResponse
err error
}

func (c *Compute) enqueueRequest(ctx context.Context, req capabilities.CapabilityRequest) (<-chan response, error) {
ch := make(chan response)
r := request{
ch: ch,
req: req,
ctx: func() context.Context { return ctx },
}
select {
case <-c.stopCh:
return nil, errors.New("service shutting down, aborting request")
case <-ctx.Done():
return nil, fmt.Errorf("could not enqueue request: %w", ctx.Err())
case c.queue <- r:
return ch, nil
}
}

func shallowCopy(m *values.Map) *values.Map {
to := values.EmptyMap()

for k, v := range m.Underlying {
to.Underlying[k] = v
}

return to
}

cfg, err := c.transformer.Transform(copied.Config)
func (c *Compute) execute(ctx context.Context, respCh chan response, req capabilities.CapabilityRequest) {
// Shallow copy the request.
// This is because we mutate its overall shape.
req = capabilities.CapabilityRequest{
Config: shallowCopy(req.Config),

// These aren't mutated so we ignore them.
Metadata: req.Metadata,
Inputs: req.Inputs,
}

cfg, err := c.transformer.Transform(req.Config)
if err != nil {
return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: could not transform config: %w", err)
respCh <- response{err: fmt.Errorf("invalid request: could not transform config: %w", err)}
return
}

id := generateID(cfg.Binary)

m, ok := c.modules.get(id)
if !ok {
mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.WorkflowExecutionID, request.Metadata.ReferenceID)
if err != nil {
return capabilities.CapabilityResponse{}, err
mod, innerErr := c.initModule(id, cfg.ModuleConfig, cfg.Binary, req.Metadata.WorkflowID, req.Metadata.WorkflowExecutionID, req.Metadata.ReferenceID)
if innerErr != nil {
respCh <- response{err: innerErr}
return
}

m = mod
}

return c.executeWithModule(ctx, m.module, cfg.Config, request)
resp, err := c.executeWithModule(ctx, m.module, cfg.Config, req)
select {
case <-c.stopCh:
case <-ctx.Done():
case respCh <- response{resp: resp, err: err}:
}
}

func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) {
Expand Down Expand Up @@ -193,11 +262,35 @@ func (c *Compute) Info(ctx context.Context) (capabilities.CapabilityInfo, error)

func (c *Compute) Start(ctx context.Context) error {
c.modules.start()

c.wg.Add(c.numWorkers)
for i := 0; i < c.numWorkers; i++ {
go func() {
innerCtx, cancel := c.stopCh.NewCtx()
defer cancel()

defer c.wg.Done()
c.worker(innerCtx)
}()
}
return c.registry.Add(ctx, c)
}

func (c *Compute) worker(ctx context.Context) {
for {
select {
case <-c.stopCh:
return
case req := <-c.queue:
c.execute(req.ctx(), req.ch, req.req)
}
}
}

func (c *Compute) Close() error {
c.modules.close()
close(c.stopCh)
c.wg.Wait()
return nil
}

Expand Down Expand Up @@ -249,25 +342,40 @@ func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx
}
}

const (
defaultNumWorkers = 3
)

type Config struct {
webapi.ServiceConfig
NumWorkers int
}

func NewAction(
config webapi.ServiceConfig,
config Config,
log logger.Logger,
registry coretypes.CapabilitiesRegistry,
handler *webapi.OutgoingConnectorHandler,
idGenerator func() string,
opts ...func(*Compute),
) *Compute {
if config.NumWorkers == 0 {
config.NumWorkers = defaultNumWorkers
}
var (
lggr = logger.Named(log, "CustomCompute")
labeler = custmsg.NewLabeler()
compute = &Compute{
stopCh: make(services.StopChan),
log: lggr,
emitter: labeler,
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(lggr, labeler),
outgoingConnectorHandler: handler,
idGenerator: idGenerator,
queue: make(chan request),
numWorkers: defaultNumWorkers,
}
)

Expand Down
20 changes: 11 additions & 9 deletions core/capabilities/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,32 @@ const (
validRequestUUID = "d2fe6db9-beb4-47c9-b2d6-d3065ace111e"
)

var defaultConfig = webapi.ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
var defaultConfig = Config{
ServiceConfig: webapi.ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
},
}

type testHarness struct {
registry *corecapabilities.Registry
connector *gcmocks.GatewayConnector
log logger.Logger
config webapi.ServiceConfig
config Config
connectorHandler *webapi.OutgoingConnectorHandler
compute *Compute
}

func setup(t *testing.T, config webapi.ServiceConfig) testHarness {
func setup(t *testing.T, config Config) testHarness {
log := logger.TestLogger(t)
registry := capabilities.NewRegistry(log)
connector := gcmocks.NewGatewayConnector(t)
idGeneratorFn := func() string { return validRequestUUID }
connectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, config, ghcapabilities.MethodComputeAction, log)
connectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, config.ServiceConfig, ghcapabilities.MethodComputeAction, log)
require.NoError(t, err)

compute := NewAction(config, log, registry, connectorHandler, idGeneratorFn)
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241104110737-3072d4cf1ba4
github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1092,8 +1092,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB
github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241104130643-4b7e196370c4 h1:GWjim4uGGFbye4XbJP0cPAbARhc8u3cAJU8jLYy0mXM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20241104130643-4b7e196370c4/go.mod h1:4adKaHNaxFsRvV/lYfqtbsWyyvIPUMLR0FdOJN/ljis=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7 h1:AGi0kAtMRW1zl1h7sGw+3CKO4Nlev6iA08YfEcgJCGs=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241101093830-33711d0c3de7/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241104110737-3072d4cf1ba4 h1:nJ7sns0iO2FTEUivMaPee5KvCBTn7qcNSXRoie/Ik5Q=
github.com/smartcontractkit/chainlink-common v0.3.1-0.20241104110737-3072d4cf1ba4/go.mod h1:TQ9/KKXZ9vr8QAlUquqGpSvDCpR+DtABKPXZY4CiRns=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f h1:BwrIaQIx5Iy6eT+DfLhFfK2XqjxRm74mVdlX8gbu4dw=
github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241017133723-5277829bd53f/go.mod h1:wHtwSR3F1CQSJJZDQKuqaqFYnvkT+kMyget7dl8Clvo=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241018134907-a00ba3729b5e h1:JiETqdNM0bktAUGMc62COwXIaw3rR3M77Me6bBLG0Fg=
Expand Down
8 changes: 4 additions & 4 deletions core/services/standardcapabilities/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,14 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
return nil, errors.New("config is empty")
}

var fetchCfg webapi.ServiceConfig
err := toml.Unmarshal([]byte(spec.StandardCapabilitiesSpec.Config), &fetchCfg)
var cfg compute.Config
err := toml.Unmarshal([]byte(spec.StandardCapabilitiesSpec.Config), &cfg)
if err != nil {
return nil, err
}
lggr := d.logger.Named("ComputeAction")

handler, err := webapi.NewOutgoingConnectorHandler(d.gatewayConnectorWrapper.GetGatewayConnector(), fetchCfg, capabilities.MethodComputeAction, lggr)
handler, err := webapi.NewOutgoingConnectorHandler(d.gatewayConnectorWrapper.GetGatewayConnector(), cfg.ServiceConfig, capabilities.MethodComputeAction, lggr)
if err != nil {
return nil, err
}
Expand All @@ -253,7 +253,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
return uuid.New().String()
}

computeSrvc := compute.NewAction(fetchCfg, log, d.registry, handler, idGeneratorFn)
computeSrvc := compute.NewAction(cfg, log, d.registry, handler, idGeneratorFn)
return []job.ServiceCtx{computeSrvc}, nil
}

Expand Down
13 changes: 12 additions & 1 deletion core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type Engine struct {
newWorkerTimeout time.Duration
maxExecutionDuration time.Duration
heartbeatCadence time.Duration
stepTimeoutDuration time.Duration

// testing lifecycle hook to signal when an execution is finished.
onExecutionFinished func(string)
Expand Down Expand Up @@ -754,7 +755,10 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
// TODO ks-462 inputs
logCustMsg(ctx, cma, "executing step", l)

inputs, outputs, err := e.executeStep(ctx, l, msg)
stepCtx, cancel := context.WithTimeout(ctx, e.stepTimeoutDuration)
defer cancel()

inputs, outputs, err := e.executeStep(stepCtx, l, msg)
var stepStatus string
switch {
case errors.Is(capabilities.ErrStopExecution, err):
Expand Down Expand Up @@ -1136,6 +1140,7 @@ type Config struct {
Binary []byte
SecretsFetcher secretsFetcher
HeartbeatCadence time.Duration
StepTimeout time.Duration

// For testing purposes only
maxRetries int
Expand All @@ -1151,6 +1156,7 @@ const (
defaultNewWorkerTimeout = 2 * time.Second
defaultMaxExecutionDuration = 10 * time.Minute
defaultHeartbeatCadence = 5 * time.Minute
defaultStepTimeout = 2 * time.Minute
)

func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
Expand Down Expand Up @@ -1182,6 +1188,10 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
cfg.HeartbeatCadence = defaultHeartbeatCadence
}

if cfg.StepTimeout == 0 {
cfg.StepTimeout = defaultStepTimeout
}

if cfg.retryMs == 0 {
cfg.retryMs = 5000
}
Expand Down Expand Up @@ -1234,6 +1244,7 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
triggerEvents: make(chan capabilities.TriggerResponse),
stopCh: make(chan struct{}),
newWorkerTimeout: cfg.NewWorkerTimeout,
stepTimeoutDuration: cfg.StepTimeout,
maxExecutionDuration: cfg.MaxExecutionDuration,
heartbeatCadence: cfg.HeartbeatCadence,
onExecutionFinished: cfg.onExecutionFinished,
Expand Down
Loading

0 comments on commit 09493e4

Please sign in to comment.