Skip to content
This repository has been archived by the owner on Feb 24, 2024. It is now read-only.

fixed racing condition of the simple worker #2266

Merged
merged 1 commit into from
May 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions worker/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewSimpleWithContext(ctx context.Context) *Simple {
cancel: cancel,
handlers: map[string]Handler{},
moot: &sync.Mutex{},
started: false,
}
}

Expand All @@ -48,6 +49,7 @@ type Simple struct {
handlers map[string]Handler
moot *sync.Mutex
wg sync.WaitGroup
started bool
}

// Register Handler with the worker
Expand All @@ -70,7 +72,11 @@ func (w *Simple) Start(ctx context.Context) error {
// TODO(sio4): #road-to-v1 - define the purpose of Start clearly
w.Logger.Info("starting Simple background worker")

w.moot.Lock()
defer w.moot.Unlock()

w.ctx, w.cancel = context.WithCancel(ctx)
w.started = true
return nil
}

Expand All @@ -91,6 +97,13 @@ func (w *Simple) Stop() error {

// Perform a job as soon as possibly using a goroutine.
func (w *Simple) Perform(job Job) error {
w.moot.Lock()
defer w.moot.Unlock()

if !w.started {
return fmt.Errorf("worker is not yet started")
}

// Perform should not allow a job submission if the worker is not running
if err := w.ctx.Err(); err != nil {
return fmt.Errorf("worker is not ready to perform a job: %v", err)
Expand All @@ -104,8 +117,6 @@ func (w *Simple) Perform(job Job) error {
return err
}

w.moot.Lock()
defer w.moot.Unlock()
if h, ok := w.handlers[job.Handler]; ok {
// TODO(sio4): #road-to-v1 - consider timeout and/or cancellation
w.wg.Add(1)
Expand Down Expand Up @@ -145,6 +156,19 @@ func (w *Simple) PerformIn(job Job, d time.Duration) error {
go func() {
defer w.wg.Done()

for {
w.moot.Lock()
if w.started {
w.moot.Unlock()
break
}
w.moot.Unlock()

waiting := 100 * time.Millisecond
time.Sleep(waiting)
d = d - waiting
}

select {
case <-time.After(d):
w.Perform(job)
Expand Down
12 changes: 12 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,15 @@ type Worker interface {
// Register a Handler
Register(string, Handler) error
}

/* TODO(sio4): #road-to-v1 - redefine Worker interface clearer
1. The Start() functions of current implementations including Simple,
Gocraft Work Adapter do not block and immediately return the error.
However, App.Serve() calls them within a go routine.
2. The Perform() family of functions can be called before the worker
was started once the worker configured. Could be fine but there should
be some guidiance for its usage.
3. The Perform() function could be interpreted as "Do it" by its name but
their actual job is "Enqueue it" even though Simple worker has no clear
boundary between them. It could make confusion.
*/