diff --git a/options.go b/options.go index 840a681..3e90aa1 100644 --- a/options.go +++ b/options.go @@ -3,6 +3,7 @@ package queue import ( "context" "runtime" + "time" "github.com/golang-queue/queue/core" ) @@ -80,26 +81,35 @@ func WithAfterFn(afterFn func()) Option { }) } +// WithRetryInterval sets the retry interval +func WithRetryInterval(d time.Duration) Option { + return OptionFunc(func(q *Options) { + q.retryInterval = d + }) +} + // Options for custom args in Queue type Options struct { - workerCount int64 - logger Logger - queueSize int - worker core.Worker - fn func(context.Context, core.TaskMessage) error - afterFn func() - metric Metric + workerCount int64 + logger Logger + queueSize int + worker core.Worker + fn func(context.Context, core.TaskMessage) error + afterFn func() + metric Metric + retryInterval time.Duration } // NewOptions initialize the default value for the options func NewOptions(opts ...Option) *Options { o := &Options{ - workerCount: defaultWorkerCount, - queueSize: defaultCapacity, - logger: defaultNewLogger, - worker: nil, - fn: defaultFn, - metric: defaultMetric, + workerCount: defaultWorkerCount, + queueSize: defaultCapacity, + logger: defaultNewLogger, + worker: nil, + fn: defaultFn, + metric: defaultMetric, + retryInterval: time.Second, } // Loop through each option diff --git a/options_test.go b/options_test.go new file mode 100644 index 0000000..f41b4aa --- /dev/null +++ b/options_test.go @@ -0,0 +1,44 @@ +package queue + +import ( + "testing" + "time" +) + +func TestWithRetryInterval(t *testing.T) { + tests := []struct { + name string + duration time.Duration + want time.Duration + }{ + { + name: "Set 2 seconds retry interval", + duration: 2 * time.Second, + want: 2 * time.Second, + }, + { + name: "Set 500ms retry interval", + duration: 500 * time.Millisecond, + want: 500 * time.Millisecond, + }, + { + name: "Set zero retry interval", + duration: 0, + want: 0, + }, + { + name: "Set negative retry interval", + duration: -1 * time.Second, + want: -1 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := NewOptions(WithRetryInterval(tt.duration)) + if opts.retryInterval != tt.want { + t.Errorf("WithRetryInterval() = %v, want %v", opts.retryInterval, tt.want) + } + }) + } +} diff --git a/queue.go b/queue.go index a502821..a0a72d1 100644 --- a/queue.go +++ b/queue.go @@ -20,16 +20,17 @@ type ( // A Queue is a message queue. Queue struct { sync.Mutex - metric *metric - logger Logger - workerCount int64 - routineGroup *routineGroup - quit chan struct{} - ready chan struct{} - worker core.Worker - stopOnce sync.Once - stopFlag int32 - afterFn func() + metric *metric + logger Logger + workerCount int64 + routineGroup *routineGroup + quit chan struct{} + ready chan struct{} + worker core.Worker + stopOnce sync.Once + stopFlag int32 + afterFn func() + retryInterval time.Duration } ) @@ -40,14 +41,15 @@ var ErrMissingWorker = errors.New("missing worker module") func NewQueue(opts ...Option) (*Queue, error) { o := NewOptions(opts...) q := &Queue{ - routineGroup: newRoutineGroup(), - quit: make(chan struct{}), - ready: make(chan struct{}, 1), - workerCount: o.workerCount, - logger: o.logger, - worker: o.worker, - metric: &metric{}, - afterFn: o.afterFn, + routineGroup: newRoutineGroup(), + quit: make(chan struct{}), + ready: make(chan struct{}, 1), + workerCount: o.workerCount, + logger: o.logger, + worker: o.worker, + metric: &metric{}, + afterFn: o.afterFn, + retryInterval: o.retryInterval, } if q.worker == nil { @@ -296,6 +298,8 @@ func (q *Queue) schedule() { // start to start all worker func (q *Queue) start() { tasks := make(chan core.TaskMessage, 1) + ticker := time.NewTicker(q.retryInterval) + defer ticker.Stop() for { // check worker number @@ -320,8 +324,7 @@ func (q *Queue) start() { close(tasks) return } - case <-time.After(time.Second): - // sleep 1 second to fetch new task + case <-ticker.C: } } }