Skip to content

Commit

Permalink
Make work queue push based
Browse files Browse the repository at this point in the history
The work item queue was pull based, meaning that if the backed had no
work to do, the processor would both spin wheels doing no work as well
as processing the next work item slower as there was a backoff delay to
when there was no work.

Updates the queue to be pull based, calling a function which hangs until
there is either work to do or the context is cancelled.

Signed-off-by: joshvanl <[email protected]>
  • Loading branch information
JoshVanL committed Jan 14, 2025
1 parent 3c1cc0f commit 78eae56
Show file tree
Hide file tree
Showing 16 changed files with 332 additions and 480 deletions.
24 changes: 10 additions & 14 deletions backend/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ type ActivityExecutor interface {
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
}

func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker {
func NewActivityTaskWorker(be Backend, executor ActivityExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker[*ActivityWorkItem] {
processor := newActivityProcessor(be, executor)
return NewTaskWorker(processor, logger, opts...)
}

func newActivityProcessor(be Backend, executor ActivityExecutor) TaskProcessor {
func newActivityProcessor(be Backend, executor ActivityExecutor) TaskProcessor[*ActivityWorkItem] {
return &activityProcessor{
be: be,
executor: executor,
Expand All @@ -38,15 +38,13 @@ func (*activityProcessor) Name() string {
return "activity-processor"
}

// FetchWorkItem implements TaskDispatcher
func (ap *activityProcessor) FetchWorkItem(ctx context.Context) (WorkItem, error) {
return ap.be.GetActivityWorkItem(ctx)
// NextWorkItem implements TaskDispatcher
func (ap *activityProcessor) NextWorkItem(ctx context.Context) (*ActivityWorkItem, error) {
return ap.be.NextActivityWorkItem(ctx)
}

// ProcessWorkItem implements TaskDispatcher
func (p *activityProcessor) ProcessWorkItem(ctx context.Context, wi WorkItem) error {
awi := wi.(*ActivityWorkItem)

func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWorkItem) error {
ts := awi.NewEvent.GetTaskScheduled()
if ts == nil {
return fmt.Errorf("%v: invalid TaskScheduled event", awi.InstanceID)
Expand Down Expand Up @@ -83,20 +81,18 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, wi WorkItem) er
}

// CompleteWorkItem implements TaskDispatcher
func (ap *activityProcessor) CompleteWorkItem(ctx context.Context, wi WorkItem) error {
awi := wi.(*ActivityWorkItem)
func (ap *activityProcessor) CompleteWorkItem(ctx context.Context, awi *ActivityWorkItem) error {
if awi.Result == nil {
return fmt.Errorf("can't complete work item '%s' with nil result", wi)
return fmt.Errorf("can't complete work item '%s' with nil result", awi)
}
if awi.Result.GetTaskCompleted() == nil && awi.Result.GetTaskFailed() == nil {
return fmt.Errorf("can't complete work item '%s', which isn't TaskCompleted or TaskFailed", wi)
return fmt.Errorf("can't complete work item '%s', which isn't TaskCompleted or TaskFailed", awi)
}

return ap.be.CompleteActivityWorkItem(ctx, awi)
}

// AbandonWorkItem implements TaskDispatcher
func (ap *activityProcessor) AbandonWorkItem(ctx context.Context, wi WorkItem) error {
awi := wi.(*ActivityWorkItem)
func (ap *activityProcessor) AbandonWorkItem(ctx context.Context, awi *ActivityWorkItem) error {
return ap.be.AbandonActivityWorkItem(ctx, awi)
}
12 changes: 6 additions & 6 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ type Backend interface {
// AddNewEvent adds a new orchestration event to the specified orchestration instance.
AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error

// GetOrchestrationWorkItem gets a pending work item from the task hub or returns [ErrNoOrchWorkItems]
// if there are no pending work items.
GetOrchestrationWorkItem(context.Context) (*OrchestrationWorkItem, error)
// NextOrchestrationWorkItem blocks and returns the next orchestration work
// item from the task hub. Should only return an error when shutting down.
NextOrchestrationWorkItem(context.Context) (*OrchestrationWorkItem, error)

// GetOrchestrationRuntimeState gets the runtime state of an orchestration instance.
GetOrchestrationRuntimeState(context.Context, *OrchestrationWorkItem) (*OrchestrationRuntimeState, error)
Expand All @@ -97,9 +97,9 @@ type Backend interface {
// completes with a failure is still considered a successfully processed work item).
AbandonOrchestrationWorkItem(context.Context, *OrchestrationWorkItem) error

// GetActivityWorkItem gets a pending activity work item from the task hub or returns [ErrNoWorkItems]
// if there are no pending activity work items.
GetActivityWorkItem(context.Context) (*ActivityWorkItem, error)
// NextActivityWorkItem blocks and returns the next activity work item from
// the task hub. Should only return an error when shutting down.
NextActivityWorkItem(context.Context) (*ActivityWorkItem, error)

// CompleteActivityWorkItem sends a message to the parent orchestration indicating activity completion.
//
Expand Down
23 changes: 10 additions & 13 deletions backend/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,27 @@ type orchestratorProcessor struct {
logger Logger
}

func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker {
func NewOrchestrationWorker(be Backend, executor OrchestratorExecutor, logger Logger, opts ...NewTaskWorkerOptions) TaskWorker[*OrchestrationWorkItem] {
processor := &orchestratorProcessor{
be: be,
executor: executor,
logger: logger,
}
return NewTaskWorker(processor, logger, opts...)
return NewTaskWorker[*OrchestrationWorkItem](processor, logger, opts...)
}

// Name implements TaskProcessor
func (*orchestratorProcessor) Name() string {
return "orchestration-processor"
}

// FetchWorkItem implements TaskProcessor
func (p *orchestratorProcessor) FetchWorkItem(ctx context.Context) (WorkItem, error) {
return p.be.GetOrchestrationWorkItem(ctx)
// NextWorkItem implements TaskProcessor
func (p *orchestratorProcessor) NextWorkItem(ctx context.Context) (*OrchestrationWorkItem, error) {
return p.be.NextOrchestrationWorkItem(ctx)
}

// ProcessWorkItem implements TaskProcessor
func (w *orchestratorProcessor) ProcessWorkItem(ctx context.Context, cwi WorkItem) error {
wi := cwi.(*OrchestrationWorkItem)
func (w *orchestratorProcessor) ProcessWorkItem(ctx context.Context, wi *OrchestrationWorkItem) error {
w.logger.Debugf("%v: received work item with %d new event(s): %v", wi.InstanceID, len(wi.NewEvents), helpers.HistoryListSummary(wi.NewEvents))

// TODO: Caching
Expand Down Expand Up @@ -131,15 +130,13 @@ func (w *orchestratorProcessor) ProcessWorkItem(ctx context.Context, cwi WorkIte
}

// CompleteWorkItem implements TaskProcessor
func (p *orchestratorProcessor) CompleteWorkItem(ctx context.Context, wi WorkItem) error {
owi := wi.(*OrchestrationWorkItem)
return p.be.CompleteOrchestrationWorkItem(ctx, owi)
func (p *orchestratorProcessor) CompleteWorkItem(ctx context.Context, wi *OrchestrationWorkItem) error {
return p.be.CompleteOrchestrationWorkItem(ctx, wi)
}

// AbandonWorkItem implements TaskProcessor
func (p *orchestratorProcessor) AbandonWorkItem(ctx context.Context, wi WorkItem) error {
owi := wi.(*OrchestrationWorkItem)
return p.be.AbandonOrchestrationWorkItem(ctx, owi)
func (p *orchestratorProcessor) AbandonWorkItem(ctx context.Context, wi *OrchestrationWorkItem) error {
return p.be.AbandonOrchestrationWorkItem(ctx, wi)
}

func (w *orchestratorProcessor) applyWorkItem(ctx context.Context, wi *OrchestrationWorkItem) (context.Context, trace.Span, bool) {
Expand Down
81 changes: 76 additions & 5 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var schema string

var emptyString string = ""

var errNoWorkItems = errors.New("no work items were found")

type SqliteOptions struct {
OrchestrationLockTimeout time.Duration
ActivityLockTimeout time.Duration
Expand All @@ -40,6 +42,9 @@ type sqliteBackend struct {
workerName string
logger backend.Logger
options *SqliteOptions

activityWorker *backend.TaskWorker[*backend.ActivityWorkItem]
orchestrationWorker *backend.TaskWorker[*backend.OrchestrationWorkItem]
}

// NewSqliteOptions creates a new options object for the sqlite backend provider.
Expand Down Expand Up @@ -778,8 +783,8 @@ func (be *sqliteBackend) GetOrchestrationRuntimeState(ctx context.Context, wi *b
return state, nil
}

// GetOrchestrationWorkItem implements backend.Backend
func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend.OrchestrationWorkItem, error) {
// getOrchestrationWorkItem implements backend.Backend
func (be *sqliteBackend) getOrchestrationWorkItem(ctx context.Context) (*backend.OrchestrationWorkItem, error) {
if err := be.ensureDB(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -819,7 +824,7 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend
if err := row.Scan(&instanceID); err != nil {
if err == sql.ErrNoRows {
// No new events to process
return nil, backend.ErrNoWorkItems
return nil, errNoWorkItems
}

return nil, fmt.Errorf("failed to scan the orchestration work-item: %w", err)
Expand Down Expand Up @@ -878,7 +883,73 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend
return wi, nil
}

func (be *sqliteBackend) GetActivityWorkItem(ctx context.Context) (*backend.ActivityWorkItem, error) {
func (be *sqliteBackend) NextOrchestrationWorkItem(ctx context.Context) (*backend.OrchestrationWorkItem, error) {
b := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 50 * time.Millisecond,
MaxInterval: 5 * time.Second,
Multiplier: 1.05,
RandomizationFactor: 0.05,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)

for {
wi, err := be.getOrchestrationWorkItem(ctx)
if err == nil {
return wi, nil
}

if !errors.Is(err, errNoWorkItems) {
return nil, err
}

t := time.NewTimer(b.NextBackOff())
select {
case <-t.C:
case <-ctx.Done():
if !t.Stop() {
<-t.C
}
be.logger.Info("Activity: received cancellation signal")
return nil, ctx.Err()
}
}
}

func (be *sqliteBackend) NextActivityWorkItem(ctx context.Context) (*backend.ActivityWorkItem, error) {
b := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 50 * time.Millisecond,
MaxInterval: 5 * time.Second,
Multiplier: 1.05,
RandomizationFactor: 0.05,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)

for {
wi, err := be.getActivityWorkItem(ctx)
if err == nil {
return wi, nil
}

if !errors.Is(err, errNoWorkItems) {
return nil, err
}

t := time.NewTimer(b.NextBackOff())
select {
case <-t.C:
case <-ctx.Done():
if !t.Stop() {
<-t.C
}
be.logger.Info("Activity: received cancellation signal")
return nil, ctx.Err()
}
}
}

func (be *sqliteBackend) getActivityWorkItem(ctx context.Context) (*backend.ActivityWorkItem, error) {
if err := be.ensureDB(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -910,7 +981,7 @@ func (be *sqliteBackend) GetActivityWorkItem(ctx context.Context) (*backend.Acti
if err := row.Scan(&sequenceNumber, &instanceID, &eventPayload); err != nil {
if err == sql.ErrNoRows {
// No new activity tasks to process
return nil, backend.ErrNoWorkItems
return nil, errNoWorkItems
}

return nil, fmt.Errorf("failed to scan the activity work-item: %w", err)
Expand Down
11 changes: 6 additions & 5 deletions backend/taskhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ type TaskHubWorker interface {

type taskHubWorker struct {
backend Backend
orchestrationWorker TaskWorker
activityWorker TaskWorker
orchestrationWorker TaskWorker[*OrchestrationWorkItem]
activityWorker TaskWorker[*ActivityWorkItem]
logger Logger
}

func NewTaskHubWorker(be Backend, orchestrationWorker TaskWorker, activityWorker TaskWorker, logger Logger) TaskHubWorker {
func NewTaskHubWorker(be Backend, orchestrationWorker TaskWorker[*OrchestrationWorkItem], activityWorker TaskWorker[*ActivityWorkItem], logger Logger) TaskHubWorker {
return &taskHubWorker{
backend: be,
orchestrationWorker: orchestrationWorker,
Expand All @@ -30,13 +30,14 @@ func NewTaskHubWorker(be Backend, orchestrationWorker TaskWorker, activityWorker
}

func (w *taskHubWorker) Start(ctx context.Context) error {
// TODO: Check for already started worker
if err := w.backend.CreateTaskHub(ctx); err != nil && err != ErrTaskHubExists {
return err
}

if err := w.backend.Start(ctx); err != nil {
return err
}

w.logger.Infof("worker started with backend %v", w.backend)

w.orchestrationWorker.Start(ctx)
Expand All @@ -53,7 +54,7 @@ func (w *taskHubWorker) Shutdown(ctx context.Context) error {
w.logger.Info("workers stopping and draining...")
defer w.logger.Info("finished stopping and draining workers!")

wg := sync.WaitGroup{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
Loading

0 comments on commit 78eae56

Please sign in to comment.