Add worker pool to WASM capability #15088
@@ -7,6 +7,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"time"

 	"github.com/google/uuid"

@@ -18,7 +19,9 @@ import (
 	capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
 	"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
+	"github.com/smartcontractkit/chainlink-common/pkg/services"
 	coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
+	"github.com/smartcontractkit/chainlink-common/pkg/values"
 	"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
 	wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
 	"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"

@@ -70,7 +73,8 @@ var ( | |
var _ capabilities.ActionCapability = (*Compute)(nil) | ||
|
||
type Compute struct { | ||
log logger.Logger | ||
stopCh services.StopChan | ||
log logger.Logger | ||
|
||
// emitter is used to emit messages from the WASM module to a configured collector. | ||
emitter custmsg.MessageEmitter | ||
|
@@ -82,6 +86,10 @@ type Compute struct {
 	transformer              ConfigTransformer
 	outgoingConnectorHandler *webapi.OutgoingConnectorHandler
 	idGenerator              func() string
+
+	numWorkers int
+	queue      chan request
+	wg         sync.WaitGroup
 }

 func (c *Compute) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {

@@ -97,35 +105,96 @@ func generateID(binary []byte) string {
 	return fmt.Sprintf("%x", id)
 }

-func copyRequest(req capabilities.CapabilityRequest) capabilities.CapabilityRequest {
-	return capabilities.CapabilityRequest{
-		Metadata: req.Metadata,
-		Inputs:   req.Inputs.CopyMap(),
-		Config:   req.Config.CopyMap(),
+func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
+	ch, err := c.enqueueRequest(ctx, request)
+	if err != nil {
+		return capabilities.CapabilityResponse{}, err
 	}

+	select {
+	case <-c.stopCh:
+		return capabilities.CapabilityResponse{}, errors.New("service shutting down, aborting request")
+	case <-ctx.Done():
+		return capabilities.CapabilityResponse{}, fmt.Errorf("request cancelled by upstream: %w", ctx.Err())
+	case resp := <-ch:
+		return resp.resp, resp.err
+	}
 }

-func (c *Compute) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
-	copied := copyRequest(request)
+type request struct {
+	ch  chan response
+	req capabilities.CapabilityRequest
+	ctx func() context.Context
+}

+type response struct {
+	resp capabilities.CapabilityResponse
+	err  error
+}

+func (c *Compute) enqueueRequest(ctx context.Context, req capabilities.CapabilityRequest) (<-chan response, error) {
+	ch := make(chan response)
+	r := request{
+		ch:  ch,
+		req: req,
+		ctx: func() context.Context { return ctx },
+	}
+	select {
+	case <-c.stopCh:
+		return nil, errors.New("service shutting down, aborting request")
+	case <-ctx.Done():
+		return nil, fmt.Errorf("could not enqueue request: %w", ctx.Err())
+	case c.queue <- r:
+		return ch, nil
+	}
+}

+func shallowCopy(m *values.Map) *values.Map {
+	to := values.EmptyMap()

+	for k, v := range m.Underlying {
+		to.Underlying[k] = v
+	}

+	return to
+}

-	cfg, err := c.transformer.Transform(copied.Config)
+func (c *Compute) execute(ctx context.Context, respCh chan response, req capabilities.CapabilityRequest) {
+	// Shallow copy the request.
+	// This is because we mutate its overall shape.
+	req = capabilities.CapabilityRequest{
+		Config: shallowCopy(req.Config),

+		// These aren't mutated so we ignore them.
+		Metadata: req.Metadata,
+		Inputs:   req.Inputs,
+	}

+	cfg, err := c.transformer.Transform(req.Config)
 	if err != nil {
-		return capabilities.CapabilityResponse{}, fmt.Errorf("invalid request: could not transform config: %w", err)
+		respCh <- response{err: fmt.Errorf("invalid request: could not transform config: %w", err)}
+		return
 	}

 	id := generateID(cfg.Binary)

 	m, ok := c.modules.get(id)
 	if !ok {
-		mod, err := c.initModule(id, cfg.ModuleConfig, cfg.Binary, request.Metadata.WorkflowID, request.Metadata.WorkflowExecutionID, request.Metadata.ReferenceID)
-		if err != nil {
-			return capabilities.CapabilityResponse{}, err
+		mod, innerErr := c.initModule(id, cfg.ModuleConfig, cfg.Binary, req.Metadata.WorkflowID, req.Metadata.WorkflowExecutionID, req.Metadata.ReferenceID)
+		if innerErr != nil {
+			respCh <- response{err: innerErr}
+			return
 		}

 		m = mod
 	}

-	return c.executeWithModule(ctx, m.module, cfg.Config, request)
+	resp, err := c.executeWithModule(ctx, m.module, cfg.Config, req)
+	select {
+	case <-c.stopCh:
+	case <-ctx.Done():
+	case respCh <- response{resp: resp, err: err}:
+	}
 }

 func (c *Compute) initModule(id string, cfg *host.ModuleConfig, binary []byte, workflowID, workflowExecutionID, referenceID string) (*module, error) {

@@ -193,11 +262,35 @@ func (c *Compute) Info(ctx context.Context) (capabilities.CapabilityInfo, error)

 func (c *Compute) Start(ctx context.Context) error {
 	c.modules.start()

+	c.wg.Add(c.numWorkers)
+	for i := 0; i < c.numWorkers; i++ {
+		go func() {
+			innerCtx, cancel := c.stopCh.NewCtx()
+			defer cancel()

+			defer c.wg.Done()
+			c.worker(innerCtx)
+		}()
+	}
 	return c.registry.Add(ctx, c)
 }

+func (c *Compute) worker(ctx context.Context) {
+	for {
+		select {
+		case <-c.stopCh:
+			return
+		case req := <-c.queue:
+			c.execute(req.ctx(), req.ch, req.req)
+		}
+	}
+}

 func (c *Compute) Close() error {
 	c.modules.close()
+	close(c.stopCh)
+	c.wg.Wait()
 	return nil
 }

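Taken together, Execute, Start, worker, and Close above form a fixed-size worker pool: Execute hands a request to a worker over an unbuffered channel and waits for the reply, the workers drain the queue, and Close signals shutdown via stopCh and waits on the WaitGroup. A minimal standalone sketch of the same pattern (generic names and a toy string payload rather than the production types, not code from this PR) could look like this:

package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
)

// Pool runs jobs on a fixed number of workers. Submit blocks until a worker
// accepts the job (backpressure) and then until the result is ready.
type Pool struct {
	queue  chan job
	stopCh chan struct{}
	wg     sync.WaitGroup
}

type job struct {
	input string
	out   chan string
}

func NewPool(numWorkers int) *Pool {
	p := &Pool{queue: make(chan job), stopCh: make(chan struct{})}
	p.wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-p.stopCh:
					return
				case j := <-p.queue:
					// Stand-in for the real work; the buffered out channel
					// means the worker never blocks on a caller that gave up.
					j.out <- strings.ToUpper(j.input)
				}
			}
		}()
	}
	return p
}

func (p *Pool) Submit(ctx context.Context, input string) (string, error) {
	j := job{input: input, out: make(chan string, 1)}
	// First select: wait for a free worker (or give up).
	select {
	case <-p.stopCh:
		return "", errors.New("pool shutting down")
	case <-ctx.Done():
		return "", ctx.Err()
	case p.queue <- j:
	}
	// Second select: wait for the result (or give up).
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case res := <-j.out:
		return res, nil
	}
}

func (p *Pool) Close() {
	close(p.stopCh)
	p.wg.Wait()
}

func main() {
	p := NewPool(3)
	defer p.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	res, err := p.Submit(ctx, "hello")
	fmt.Println(res, err) // HELLO <nil>
}

The two selects in Submit mirror the structure of enqueueRequest and Execute above: the first bounds how long we wait for a free worker, the second how long we wait for the result.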
@@ -249,25 +342,40 @@ func (c *Compute) createFetcher(workflowID, workflowExecutionID string) func(ctx
 	}
 }

+const (
+	defaultNumWorkers = 3
+)
Review comment: I'm worried about how low this is. We should be able to run a lot more; I don't get what takes so much memory. I'm not going to block the PR for now, since it'll fix some OOMs, but we need to get to the bottom of it.

Reply: As discussed offline, we agreed to revisit WASM performance generally once we've hit the external audit.

+type Config struct {
+	webapi.ServiceConfig
+	NumWorkers int
+}

 func NewAction(
-	config webapi.ServiceConfig,
+	config Config,
 	log logger.Logger,
 	registry coretypes.CapabilitiesRegistry,
 	handler *webapi.OutgoingConnectorHandler,
 	idGenerator func() string,
 	opts ...func(*Compute),
 ) *Compute {
+	if config.NumWorkers == 0 {
+		config.NumWorkers = defaultNumWorkers
+	}
 	var (
 		lggr    = logger.Named(log, "CustomCompute")
 		labeler = custmsg.NewLabeler()
 		compute = &Compute{
+			stopCh:                   make(services.StopChan),
 			log:                      lggr,
 			emitter:                  labeler,
 			registry:                 registry,
 			modules:                  newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
 			transformer:              NewTransformer(lggr, labeler),
 			outgoingConnectorHandler: handler,
 			idGenerator:              idGenerator,
+			queue:                    make(chan request),
Review comment: Should the queue be non-blocking?

Reply: I left this as blocking so that we backpressure onto the engine itself; if we don't succeed after 2 minutes we'll interrupt the request altogether.
+			numWorkers:               defaultNumWorkers,
 		}
 	)

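To make the blocking-queue trade-off from the thread above concrete: because the queue channel is unbuffered, the send in enqueueRequest only succeeds once a worker is ready, so a saturated pool pushes the wait back onto the calling engine, bounded by the caller's context deadline; a buffered queue would accept the request immediately and hide that wait. A small standalone sketch of the difference (toy types and durations, not the production code):

package main

import (
	"context"
	"fmt"
	"time"
)

// trySend enqueues onto q but gives up when ctx expires, mirroring the
// select in enqueueRequest above.
func trySend(ctx context.Context, q chan<- int, v int) error {
	select {
	case q <- v:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("could not enqueue request: %w", ctx.Err())
	}
}

func main() {
	// Unbuffered (blocking) queue with no worker draining it: the caller's
	// deadline is what ends the wait -- this is the backpressure behaviour.
	blocking := make(chan int)
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println(trySend(ctx, blocking, 1)) // could not enqueue request: context deadline exceeded

	// Buffered queue: the send succeeds immediately even though nothing is
	// processing it, so the caller learns nothing about how busy the workers are.
	buffered := make(chan int, 16)
	ctx2, cancel2 := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel2()
	fmt.Println(trySend(ctx2, buffered, 1)) // <nil>
}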
Review comment: Do we need to worry about the capability timing out in the engine if this queue gets too long?

Reply: We do -- this is why I added the step-level timeout; we'll wait for a maximum of 2 minutes (which is incredibly generous) before interrupting a step (and 10 minutes for the whole workflow).

Review comment: Sorry, I'll be more precise: I'm not convinced the timer in the engine should start until the request is actually running. Users shouldn't be penalized if other tasks can block them. I'm worried that I could DoS the compute capability by making N compute steps that have infinite loops and intentionally time out. For now, maybe this is OK; we can re-evaluate before we open compute for general use.

Reply: This shouldn't be possible, because we apply a lower-level timeout to the individual WASM call; the default setting for this is 2s. I set the step-level timeout to be very large partly as an attempt to compensate for this. What solution would you propose? We could ignore the engine timeout, I suppose, but that feels dangerous IMO.

Review comment: I was thinking of ignoring the engine timeout for this capability. For now, we don't need to block on this; we can think it out more later.
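For reference on the layered timeouts discussed in this thread (roughly 2s per WASM call, 2 minutes per step, 10 minutes per workflow, per the comments above): when deadlines are nested via contexts, the tightest one wins, which is why the call-level limit caps a misbehaving module even though the step-level budget is generous. A standalone sketch of that composition (illustrative durations only, not the actual engine or host wiring):

package main

import (
	"context"
	"fmt"
	"time"
)

// runCall simulates a single bounded invocation: a call-level timeout nested
// inside the caller's (step-level) context.
func runCall(stepCtx context.Context, callTimeout, workDuration time.Duration) error {
	callCtx, cancel := context.WithTimeout(stepCtx, callTimeout)
	defer cancel()

	select {
	case <-time.After(workDuration): // stand-in for the module doing work
		return nil
	case <-callCtx.Done():
		return callCtx.Err()
	}
}

func main() {
	// Generous step-level budget.
	stepCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// A well-behaved call finishes inside the tight call budget.
	fmt.Println(runCall(stepCtx, 2*time.Second, 10*time.Millisecond)) // <nil>

	// A runaway call is cut off by the tighter call budget long before the
	// step budget would fire.
	fmt.Println(runCall(stepCtx, 2*time.Second, time.Hour)) // context deadline exceeded
}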