From ed7f98891fb93812b9e5924795e24b07139aa13f Mon Sep 17 00:00:00 2001 From: Yonghwan SO Date: Sat, 14 May 2022 13:52:32 +0900 Subject: [PATCH] fixed racing condition of the simple worker --- worker/simple.go | 28 ++++++++++++++++++++++++++-- worker/worker.go | 12 ++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/worker/simple.go b/worker/simple.go index 76d9ef673..6097cbb3a 100644 --- a/worker/simple.go +++ b/worker/simple.go @@ -36,6 +36,7 @@ func NewSimpleWithContext(ctx context.Context) *Simple { cancel: cancel, handlers: map[string]Handler{}, moot: &sync.Mutex{}, + started: false, } } @@ -48,6 +49,7 @@ type Simple struct { handlers map[string]Handler moot *sync.Mutex wg sync.WaitGroup + started bool } // Register Handler with the worker @@ -70,7 +72,11 @@ func (w *Simple) Start(ctx context.Context) error { // TODO(sio4): #road-to-v1 - define the purpose of Start clearly w.Logger.Info("starting Simple background worker") + w.moot.Lock() + defer w.moot.Unlock() + w.ctx, w.cancel = context.WithCancel(ctx) + w.started = true return nil } @@ -91,6 +97,13 @@ func (w *Simple) Stop() error { // Perform a job as soon as possibly using a goroutine. func (w *Simple) Perform(job Job) error { + w.moot.Lock() + defer w.moot.Unlock() + + if !w.started { + return fmt.Errorf("worker is not yet started") + } + // Perform should not allow a job submission if the worker is not running if err := w.ctx.Err(); err != nil { return fmt.Errorf("worker is not ready to perform a job: %v", err) @@ -104,8 +117,6 @@ func (w *Simple) Perform(job Job) error { return err } - w.moot.Lock() - defer w.moot.Unlock() if h, ok := w.handlers[job.Handler]; ok { // TODO(sio4): #road-to-v1 - consider timeout and/or cancellation w.wg.Add(1) @@ -145,6 +156,19 @@ func (w *Simple) PerformIn(job Job, d time.Duration) error { go func() { defer w.wg.Done() + for { + w.moot.Lock() + if w.started { + w.moot.Unlock() + break + } + w.moot.Unlock() + + waiting := 100 * time.Millisecond + time.Sleep(waiting) + d = d - waiting + } + select { case <-time.After(d): w.Perform(job) diff --git a/worker/worker.go b/worker/worker.go index 084f5edd7..36fc0262a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -25,3 +25,15 @@ type Worker interface { // Register a Handler Register(string, Handler) error } + +/* TODO(sio4): #road-to-v1 - redefine Worker interface clearer +1. The Start() functions of current implementations including Simple, + Gocraft Work Adapter do not block and immediately return the error. + However, App.Serve() calls them within a go routine. +2. The Perform() family of functions can be called before the worker + was started once the worker configured. Could be fine but there should + be some guidiance for its usage. +3. The Perform() function could be interpreted as "Do it" by its name but + their actual job is "Enqueue it" even though Simple worker has no clear + boundary between them. It could make confusion. +*/