Skip to content

Commit

Permalink
fix: call errfunc when task is running on singleton mode
Browse files Browse the repository at this point in the history
Signed-off-by: rfyiamcool <[email protected]>
  • Loading branch information
rfyiamcool committed Nov 21, 2023
1 parent bef788b commit 1816e99
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 11 deletions.
10 changes: 10 additions & 0 deletions tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ package tasks

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -190,6 +191,9 @@ type Scheduler struct {
var (
// ErrIDInUse is returned when a Task ID is specified but already used.
ErrIDInUse = fmt.Errorf("ID already used")

// ErrTaskRunning is returned when the previous task is running.
ErrTaskRunning = errors.New("the previous task is still running")
)

// New will create a new scheduler instance that allows users to create and manage tasks.
Expand Down Expand Up @@ -366,6 +370,12 @@ func (schd *Scheduler) execTask(t *Task) {
go func() {
t.Lock()
if t.Singleton && t.isRunning {
if t.ErrFuncWithTaskContext != nil {
go t.ErrFuncWithTaskContext(t.TaskContext, ErrTaskRunning)
} else {
go t.ErrFunc(ErrTaskRunning)
}

t.Unlock()
return
}
Expand Down
69 changes: 58 additions & 11 deletions tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,12 +604,9 @@ func TestSchedulerSingleton(t *testing.T) {

defer scheduler.Del(id)

select {
case <-time.After(3 * time.Second):
if len(doneCh) > 4 {
t.Log("failed to run with singleton mode")
}
return
<-time.After(3 * time.Second)
if len(doneCh) > 4 {
t.Log("failed to run with singleton mode")
}
})

Expand All @@ -634,12 +631,62 @@ func TestSchedulerSingleton(t *testing.T) {

defer scheduler.Del(id)

select {
case <-time.After(3 * time.Second):
if atomic.LoadInt32(&incr) > 4 {
t.Log("failed to run with singleton mode")
<-time.After(3 * time.Second)
if atomic.LoadInt32(&incr) > 4 {
t.Log("failed to run with singleton mode")
}
})

t.Run("Verify singleton by errFunc", func(t *testing.T) {
// Channel for orchestrating when the task ran
var incr int32
var errchan = make(chan error, 1)

// Setup A task
id, err := scheduler.Add(&Task{
Interval: time.Duration(100 * time.Millisecond),
Singleton: true,
TaskFunc: func() error {
fmt.Println("run")
atomic.AddInt32(&incr, 1)
time.Sleep(1 * time.Second)
return nil
},
ErrFunc: func(e error) {
fmt.Println("err")
errchan <- e
},
})
if err != nil {
t.Errorf("Unexpected errors when scheduling a valid task - %s", err)
}

defer scheduler.Del(id)

var (
timeout = time.After(3 * time.Second)
errIncr = 0
)

for {
select {
case err := <-errchan:
if err != ErrTaskRunning {
t.Error("the err must be ErrTaskRunning")
}
errIncr++

case <-timeout:
if atomic.LoadInt32(&incr) > 4 {
t.Error("failed to run with singleton mode")
}

if errIncr < 20 {
t.Error("failed to run with singleton mode")
}

return
}
return
}
})
}
Expand Down

0 comments on commit 1816e99

Please sign in to comment.