Skip to content
This repository has been archived by the owner on Feb 24, 2024. It is now read-only.

added input/status checks, testcases, and comments for road-to-v1 #2243

Merged
merged 2 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions worker/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ var _ Worker = &Simple{}
// NewSimple creates a basic implementation of the Worker interface
// that is backed using just the standard library and goroutines.
func NewSimple() *Simple {
// TODO(sio4): #road-to-v1 - how to check if the worker is ready to work
// when worker should be initialized? how to check if worker is ready?
// and purpose of the context
return NewSimpleWithContext(context.Background())
}

Expand Down Expand Up @@ -49,6 +52,10 @@ type Simple struct {

// Register Handler with the worker
func (w *Simple) Register(name string, h Handler) error {
if name == "" || h == nil {
return fmt.Errorf("name or handler cannot be empty/nil")
}

w.moot.Lock()
defer w.moot.Unlock()
if _, ok := w.handlers[name]; ok {
Expand All @@ -60,6 +67,7 @@ func (w *Simple) Register(name string, h Handler) error {

// Start the worker
func (w *Simple) Start(ctx context.Context) error {
// TODO(sio4): #road-to-v1 - define the purpose of Start clearly
w.Logger.Info("starting Simple background worker")

w.ctx, w.cancel = context.WithCancel(ctx)
Expand All @@ -68,28 +76,38 @@ func (w *Simple) Start(ctx context.Context) error {

// Stop the worker
func (w *Simple) Stop() error {
// prevent job submission when stopping
w.moot.Lock()
defer w.moot.Unlock()

w.Logger.Info("stopping Simple background worker")

w.cancel()

w.wg.Wait()
w.Logger.Info("all background jobs stopped completely")
w.cancel()
return nil
}

// Perform a job as soon as possibly using a goroutine.
func (w *Simple) Perform(job Job) error {
// Perform should not allow a job submission if the worker is not running
if err := w.ctx.Err(); err != nil {
return fmt.Errorf("worker is not ready to perform a job: %v", err)
}

w.Logger.Debugf("performing job %s", job)

if job.Handler == "" {
err := fmt.Errorf("no handler name given for %s", job)
err := fmt.Errorf("no handler name given: %s", job)
w.Logger.Error(err)
return err
}

w.moot.Lock()
defer w.moot.Unlock()
if h, ok := w.handlers[job.Handler]; ok {
// TODO: consider to implement timeout and/or cancellation
// TODO(sio4): #road-to-v1 - consider timeout and/or cancellation
w.wg.Add(1)
go func() {
defer w.wg.Done()
Expand Down Expand Up @@ -118,11 +136,20 @@ func (w *Simple) PerformAt(job Job, t time.Time) error {
// PerformIn performs a job after waiting for a specified amount
// using a goroutine.
func (w *Simple) PerformIn(job Job, d time.Duration) error {
// Perform should not allow a job submission if the worker is not running
if err := w.ctx.Err(); err != nil {
return fmt.Errorf("worker is not ready to perform a job: %v", err)
}

w.wg.Add(1) // waiting job also should be counted
go func() {
defer w.wg.Done()

select {
case <-time.After(d):
w.Perform(job)
case <-w.ctx.Done():
// TODO(sio4): #road-to-v1 - it should be guaranteed to be performed
w.cancel()
}
}()
Expand Down
206 changes: 183 additions & 23 deletions worker/simple_test.go
Original file line number Diff line number Diff line change
@@ -1,83 +1,196 @@
package worker

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func sampleHandler(Args) error {
return nil
}

func Test_Simple_RegisterEmpty(t *testing.T) {
r := require.New(t)

w := NewSimple()
err := w.Register("", sampleHandler)
r.Error(err)
}

func Test_Simple_RegisterNil(t *testing.T) {
r := require.New(t)

w := NewSimple()
err := w.Register("sample", nil)
r.Error(err)
}

func Test_Simple_RegisterEmptyNil(t *testing.T) {
r := require.New(t)

w := NewSimple()
err := w.Register("", nil)
r.Error(err)
}

func Test_Simple_RegisterExisting(t *testing.T) {
r := require.New(t)

w := NewSimple()
err := w.Register("sample", sampleHandler)
r.NoError(err)

err = w.Register("sample", sampleHandler)
r.Error(err)
}

func Test_Simple_StartStop(t *testing.T) {
r := require.New(t)

w := NewSimple()
ctx := context.Background()
err := w.Start(ctx)
r.NoError(err)
r.NotNil(w.ctx)
r.Nil(w.ctx.Err())

err = w.Stop()
r.NoError(err)
r.NotNil(w.ctx)
r.NotNil(w.ctx.Err())
}

func Test_Simple_Perform(t *testing.T) {
r := require.New(t)

var hit bool
wg := &sync.WaitGroup{}
wg.Add(1)
w := NewSimple()
r.NoError(w.Start(context.Background()))

w.Register("x", func(Args) error {
hit = true
wg.Done()
return nil
})
w.Perform(Job{
Handler: "x",
})
wg.Wait()

// the worker should guarantee the job is finished before the worker stopped
r.NoError(w.Stop())
r.True(hit)
}

func Test_Simple_PerformAt(t *testing.T) {
func Test_Simple_PerformBroken(t *testing.T) {
r := require.New(t)

var hit bool
wg := &sync.WaitGroup{}
wg.Add(1)
w := NewSimple()
r.NoError(w.Start(context.Background()))

w.Register("x", func(Args) error {
hit = true
wg.Done()

//Index out of bounds on purpose
println([]string{}[0])

return nil
})
w.PerformAt(Job{
w.Perform(Job{
Handler: "x",
}, time.Now().Add(5*time.Millisecond))
wg.Wait()
})

r.NoError(w.Stop())
r.True(hit)
}

func Test_Simple_PerformBroken(t *testing.T) {
func Test_Simple_PerformWithEmptyJob(t *testing.T) {
r := require.New(t)

w := NewSimple()
r.NoError(w.Start(context.Background()))
defer w.Stop()

err := w.Perform(Job{})
r.Error(err)
}

func Test_Simple_PerformWithUnknownJob(t *testing.T) {
r := require.New(t)

w := NewSimple()
r.NoError(w.Start(context.Background()))
defer w.Stop()

err := w.Perform(Job{Handler: "unknown"})
r.Error(err)
}

/* TODO(sio4): #road-to-v1 - define the purpose of Start clearly
consider to make Perform to work only when the worker is started.
func Test_Simple_PerformBeforeStart(t *testing.T) {
r := require.New(t)

w := NewSimple()
r.NoError(w.Register("sample", sampleHandler))

err := w.Perform(Job{Handler: "sample"})
r.Error(err)
}
*/

func Test_Simple_PerformAfterStop(t *testing.T) {
r := require.New(t)

w := NewSimple()
r.NoError(w.Register("sample", sampleHandler))
r.NoError(w.Start(context.Background()))
r.NoError(w.Stop())

err := w.Perform(Job{Handler: "sample"})
r.Error(err)
}

func Test_Simple_PerformAt(t *testing.T) {
r := require.New(t)

var hit bool
w := NewSimple()
r.NoError(w.Start(context.Background()))

wg := &sync.WaitGroup{}
wg.Add(1)

w := NewSimple()
w.Register("x", func(Args) error {
hit = true
wg.Done()

//Index out of bounds on purpose
println([]string{}[0])

return nil
})

w.Perform(Job{
w.PerformAt(Job{
Handler: "x",
})
}, time.Now().Add(5*time.Millisecond))

// how long does the handler take for assignment? hmm,
time.Sleep(100 * time.Millisecond)
wg.Wait()
r.True(hit)

r.NoError(w.Stop())
}

func Test_Simple_PerformIn(t *testing.T) {
r := require.New(t)

var hit bool
w := NewSimple()
r.NoError(w.Start(context.Background()))

wg := &sync.WaitGroup{}
wg.Add(1)
w := NewSimple()

w.Register("x", func(Args) error {
hit = true
wg.Done()
Expand All @@ -86,14 +199,61 @@ func Test_Simple_PerformIn(t *testing.T) {
w.PerformIn(Job{
Handler: "x",
}, 5*time.Millisecond)

// how long does the handler take for assignment? hmm,
time.Sleep(100 * time.Millisecond)
wg.Wait()
r.True(hit)

r.NoError(w.Stop())
}

func Test_Simple_NoHandler(t *testing.T) {
/* TODO(sio4): #road-to-v1 - define the purpose of Start clearly
consider to make Perform to work only when the worker is started.
func Test_Simple_PerformInBeforeStart(t *testing.T) {
r := require.New(t)

w := NewSimple()
err := w.Perform(Job{})
r.NoError(w.Register("sample", sampleHandler))

err := w.PerformIn(Job{Handler: "sample"}, 5*time.Millisecond)
r.Error(err)
}
*/

func Test_Simple_PerformInAfterStop(t *testing.T) {
r := require.New(t)

w := NewSimple()
r.NoError(w.Register("sample", sampleHandler))
r.NoError(w.Start(context.Background()))
r.NoError(w.Stop())

err := w.PerformIn(Job{Handler: "sample"}, 5*time.Millisecond)
r.Error(err)
}

/* TODO(sio4): #road-to-v1 - it should be guaranteed to be performed
consider to make PerformIn to guarantee the job execution
func Test_Simple_PerformInFollowedByStop(t *testing.T) {
r := require.New(t)

var hit bool
w := NewSimple()
r.NoError(w.Start(context.Background()))

w.Register("x", func(Args) error {
hit = true
return nil
})
err := w.PerformIn(Job{
Handler: "x",
}, 5*time.Millisecond)
r.NoError(err)

// stop the worker immediately after PerformIn
r.NoError(w.Stop())

r.True(hit)
}
*/