Skip to content

Commit 603cfe4

Browse files
authored
feat: add configurable retry interval for queue retries (#144)
- Import `time` package - Add `WithRetryInterval` function to set retry intervals - Add `retryInterval` field to `Options` struct with default value of 1 second - New file `options_test.go` for testing `WithRetryInterval` functionality with various durations - Add `retryInterval` field to `Queue` struct - Initialize ticker with `retryInterval` in `Queue` and update its usage in `start` method Signed-off-by: appleboy <[email protected]>
1 parent e2fce4f commit 603cfe4

File tree

3 files changed

+90
-33
lines changed

3 files changed

+90
-33
lines changed

options.go

+23-13
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package queue
33
import (
44
"context"
55
"runtime"
6+
"time"
67

78
"github.com/golang-queue/queue/core"
89
)
@@ -80,26 +81,35 @@ func WithAfterFn(afterFn func()) Option {
8081
})
8182
}
8283

84+
// WithRetryInterval sets the retry interval
85+
func WithRetryInterval(d time.Duration) Option {
86+
return OptionFunc(func(q *Options) {
87+
q.retryInterval = d
88+
})
89+
}
90+
8391
// Options for custom args in Queue
8492
type Options struct {
85-
workerCount int64
86-
logger Logger
87-
queueSize int
88-
worker core.Worker
89-
fn func(context.Context, core.TaskMessage) error
90-
afterFn func()
91-
metric Metric
93+
workerCount int64
94+
logger Logger
95+
queueSize int
96+
worker core.Worker
97+
fn func(context.Context, core.TaskMessage) error
98+
afterFn func()
99+
metric Metric
100+
retryInterval time.Duration
92101
}
93102

94103
// NewOptions initialize the default value for the options
95104
func NewOptions(opts ...Option) *Options {
96105
o := &Options{
97-
workerCount: defaultWorkerCount,
98-
queueSize: defaultCapacity,
99-
logger: defaultNewLogger,
100-
worker: nil,
101-
fn: defaultFn,
102-
metric: defaultMetric,
106+
workerCount: defaultWorkerCount,
107+
queueSize: defaultCapacity,
108+
logger: defaultNewLogger,
109+
worker: nil,
110+
fn: defaultFn,
111+
metric: defaultMetric,
112+
retryInterval: time.Second,
103113
}
104114

105115
// Loop through each option

options_test.go

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package queue
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestWithRetryInterval(t *testing.T) {
9+
tests := []struct {
10+
name string
11+
duration time.Duration
12+
want time.Duration
13+
}{
14+
{
15+
name: "Set 2 seconds retry interval",
16+
duration: 2 * time.Second,
17+
want: 2 * time.Second,
18+
},
19+
{
20+
name: "Set 500ms retry interval",
21+
duration: 500 * time.Millisecond,
22+
want: 500 * time.Millisecond,
23+
},
24+
{
25+
name: "Set zero retry interval",
26+
duration: 0,
27+
want: 0,
28+
},
29+
{
30+
name: "Set negative retry interval",
31+
duration: -1 * time.Second,
32+
want: -1 * time.Second,
33+
},
34+
}
35+
36+
for _, tt := range tests {
37+
t.Run(tt.name, func(t *testing.T) {
38+
opts := NewOptions(WithRetryInterval(tt.duration))
39+
if opts.retryInterval != tt.want {
40+
t.Errorf("WithRetryInterval() = %v, want %v", opts.retryInterval, tt.want)
41+
}
42+
})
43+
}
44+
}

queue.go

+23-20
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,17 @@ type (
2020
// A Queue is a message queue.
2121
Queue struct {
2222
sync.Mutex
23-
metric *metric
24-
logger Logger
25-
workerCount int64
26-
routineGroup *routineGroup
27-
quit chan struct{}
28-
ready chan struct{}
29-
worker core.Worker
30-
stopOnce sync.Once
31-
stopFlag int32
32-
afterFn func()
23+
metric *metric
24+
logger Logger
25+
workerCount int64
26+
routineGroup *routineGroup
27+
quit chan struct{}
28+
ready chan struct{}
29+
worker core.Worker
30+
stopOnce sync.Once
31+
stopFlag int32
32+
afterFn func()
33+
retryInterval time.Duration
3334
}
3435
)
3536

@@ -40,14 +41,15 @@ var ErrMissingWorker = errors.New("missing worker module")
4041
func NewQueue(opts ...Option) (*Queue, error) {
4142
o := NewOptions(opts...)
4243
q := &Queue{
43-
routineGroup: newRoutineGroup(),
44-
quit: make(chan struct{}),
45-
ready: make(chan struct{}, 1),
46-
workerCount: o.workerCount,
47-
logger: o.logger,
48-
worker: o.worker,
49-
metric: &metric{},
50-
afterFn: o.afterFn,
44+
routineGroup: newRoutineGroup(),
45+
quit: make(chan struct{}),
46+
ready: make(chan struct{}, 1),
47+
workerCount: o.workerCount,
48+
logger: o.logger,
49+
worker: o.worker,
50+
metric: &metric{},
51+
afterFn: o.afterFn,
52+
retryInterval: o.retryInterval,
5153
}
5254

5355
if q.worker == nil {
@@ -296,6 +298,8 @@ func (q *Queue) schedule() {
296298
// start to start all worker
297299
func (q *Queue) start() {
298300
tasks := make(chan core.TaskMessage, 1)
301+
ticker := time.NewTicker(q.retryInterval)
302+
defer ticker.Stop()
299303

300304
for {
301305
// check worker number
@@ -320,8 +324,7 @@ func (q *Queue) start() {
320324
close(tasks)
321325
return
322326
}
323-
case <-time.After(time.Second):
324-
// sleep 1 second to fetch new task
327+
case <-ticker.C:
325328
}
326329
}
327330
}

0 commit comments

Comments
 (0)