Skip to content

Commit dc443d8

Browse files
committed
chore: refactor message handling and update dependencies
- Change `QueuedMessage` to `TaskMessage` in various function signatures - Update dependency `github.com/golang-queue/queue` to version `v0.3.0` - Add error handling for JSON unmarshal in `redis.go` Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 0fce981 commit dc443d8

File tree

6 files changed

+21
-16
lines changed

6 files changed

+21
-16
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func main() {
6363
w := redisdb.NewWorker(
6464
redisdb.WithAddr("127.0.0.1:6379"),
6565
redisdb.WithChannel("foobar"),
66-
redisdb.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
66+
redisdb.WithRunFunc(func(ctx context.Context, m queue.TaskMessage) error {
6767
v, ok := m.(*job)
6868
if !ok {
6969
if err := json.Unmarshal(m.Bytes(), &v); err != nil {

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/golang-queue/redisdb
33
go 1.22
44

55
require (
6-
github.com/golang-queue/queue v0.2.2-0.20250119152159-18502599f7a6
6+
github.com/golang-queue/queue v0.3.0
77
github.com/redis/go-redis/v9 v9.7.0
88
github.com/stretchr/testify v1.10.0
99
github.com/testcontainers/testcontainers-go v0.35.0

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
5252
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
5353
github.com/golang-queue/queue v0.2.2-0.20250119152159-18502599f7a6 h1:ifL43cgrzjheIJqb6dU4poR+4U1jhTwzkXwlh29nC8w=
5454
github.com/golang-queue/queue v0.2.2-0.20250119152159-18502599f7a6/go.mod h1:SkjMwz1TjxZOrF7kABvbar1CagcMxwRtXt5Tx00wb4g=
55+
github.com/golang-queue/queue v0.3.0 h1:gyBLNT9EDOsChazYScp8iLiwLfG0SdnCDmNUybcHig4=
56+
github.com/golang-queue/queue v0.3.0/go.mod h1:SkjMwz1TjxZOrF7kABvbar1CagcMxwRtXt5Tx00wb4g=
5557
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
5658
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
5759
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=

options.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
type Option func(*options)
1313

1414
type options struct {
15-
runFunc func(context.Context, core.QueuedMessage) error
15+
runFunc func(context.Context, core.TaskMessage) error
1616
logger queue.Logger
1717
addr string
1818
db int
@@ -127,7 +127,7 @@ func WithChannel(channel string) Option {
127127
}
128128

129129
// WithRunFunc setup the run func of queue
130-
func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option {
130+
func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option {
131131
return func(w *options) {
132132
w.runFunc = fn
133133
}
@@ -153,7 +153,7 @@ func newOptions(opts ...Option) options {
153153
// default channel size in go-redis package
154154
channelSize: 100,
155155
logger: queue.NewLogger(),
156-
runFunc: func(context.Context, core.QueuedMessage) error {
156+
runFunc: func(context.Context, core.TaskMessage) error {
157157
return nil
158158
},
159159
}

redis.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func NewWorker(opts ...Option) *Worker {
112112
}
113113

114114
// Run to execute new task
115-
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
115+
func (w *Worker) Run(ctx context.Context, task core.TaskMessage) error {
116116
return w.opts.runFunc(ctx, task)
117117
}
118118

@@ -136,7 +136,7 @@ func (w *Worker) Shutdown() error {
136136
}
137137

138138
// Queue send notification to queue
139-
func (w *Worker) Queue(job core.QueuedMessage) error {
139+
func (w *Worker) Queue(job core.TaskMessage) error {
140140
if atomic.LoadInt32(&w.stopFlag) == 1 {
141141
return queue.ErrQueueShutdown
142142
}
@@ -153,7 +153,7 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
153153
}
154154

155155
// Request a new task
156-
func (w *Worker) Request() (core.QueuedMessage, error) {
156+
func (w *Worker) Request() (core.TaskMessage, error) {
157157
clock := 0
158158
loop:
159159
for {
@@ -163,7 +163,10 @@ loop:
163163
return nil, queue.ErrQueueHasBeenClosed
164164
}
165165
var data job.Message
166-
_ = json.Unmarshal([]byte(task.Payload), &data)
166+
err := json.Unmarshal([]byte(task.Payload), &data)
167+
if err != nil {
168+
return nil, err
169+
}
167170
return &data, nil
168171
case <-time.After(1 * time.Second):
169172
if clock == 5 {

redis_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func TestCustomFuncAndWait(t *testing.T) {
179179
w := NewWorker(
180180
WithAddr(endpoint),
181181
WithChannel("test3"),
182-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
182+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
183183
time.Sleep(500 * time.Millisecond)
184184
return nil
185185
}),
@@ -227,7 +227,7 @@ func TestRedisCluster(t *testing.T) {
227227
WithAddr(strings.Join(hosts, ",")),
228228
WithChannel("testCluster"),
229229
WithCluster(),
230-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
230+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
231231
time.Sleep(500 * time.Millisecond)
232232
return nil
233233
}),
@@ -281,7 +281,7 @@ func TestRedisSentinel(t *testing.T) {
281281
WithMasterName("mymaster"),
282282
WithChannel("testSentinel"),
283283
WithSentinel(),
284-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
284+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
285285
time.Sleep(500 * time.Millisecond)
286286
return nil
287287
}),
@@ -335,7 +335,7 @@ func TestJobReachTimeout(t *testing.T) {
335335
w := NewWorker(
336336
WithAddr(endpoint),
337337
WithChannel("timeout"),
338-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
338+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
339339
for {
340340
select {
341341
case <-ctx.Done():
@@ -378,7 +378,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
378378
WithAddr(endpoint),
379379
WithChannel("cancel"),
380380
WithLogger(queue.NewLogger()),
381-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
381+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
382382
for {
383383
select {
384384
case <-ctx.Done():
@@ -421,7 +421,7 @@ func TestGoroutineLeak(t *testing.T) {
421421
WithAddr(endpoint),
422422
WithChannel("GoroutineLeak"),
423423
WithLogger(queue.NewEmptyLogger()),
424-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
424+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
425425
for {
426426
select {
427427
case <-ctx.Done():
@@ -468,7 +468,7 @@ func TestGoroutinePanic(t *testing.T) {
468468
w := NewWorker(
469469
WithAddr(endpoint),
470470
WithChannel("GoroutinePanic"),
471-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
471+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
472472
panic("missing something")
473473
}),
474474
)

0 commit comments

Comments
 (0)