Skip to content

Commit 9939cb4

Browse files
authored
chore: Support redis cluster (#6)
* chore: Support redis cluster Signed-off-by: Bo-Yi Wu <[email protected]> * update Signed-off-by: Bo-Yi Wu <[email protected]> * update Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 920b0ec commit 9939cb4

File tree

4 files changed

+80
-36
lines changed

4 files changed

+80
-36
lines changed

.github/workflows/testing.yml

+5-15
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,15 @@ jobs:
77
# You must use a Linux environment when using service containers or container jobs
88
runs-on: ubuntu-latest
99

10-
# Service containers to run with `container-job`
11-
services:
12-
# Label used to access the service container
13-
redis:
14-
# Docker Hub image
15-
image: redis
16-
# Set health checks to wait until redis has started
17-
options: >-
18-
--health-cmd "redis-cli ping"
19-
--health-interval 10s
20-
--health-timeout 5s
21-
--health-retries 5
22-
ports:
23-
- 6379:6379
24-
2510
env:
2611
GO111MODULE: on
2712
GOPROXY: https://proxy.golang.org
2813
steps:
14+
- uses: vishnudxb/[email protected]
15+
with:
16+
master1-port: 6379
17+
slave1-port: 6380
18+
2919
- name: Set up Go ${{ matrix.go }}
3020
uses: actions/setup-go@v2
3121

conf/redis.conf

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
protected-mode no
2+
cluster-enabled yes
3+
appendonly yes

redis.go

+38-15
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"strings"
78
"sync"
89
"sync/atomic"
910
"time"
@@ -20,7 +21,7 @@ type Option func(*Worker)
2021
// Worker for Redis
2122
type Worker struct {
2223
// redis config
23-
rdb *redis.Client
24+
rdb redis.Cmdable
2425
pubsub *redis.PubSub
2526
channel <-chan *redis.Message
2627
addr string
@@ -29,6 +30,7 @@ type Worker struct {
2930
password string
3031
channelName string
3132
channelSize int
33+
cluster bool
3234

3335
stopOnce sync.Once
3436
stop chan struct{}
@@ -52,6 +54,13 @@ func WithDB(db int) Option {
5254
}
5355
}
5456

57+
// WithCluster redis cluster
58+
func WithCluster(enable bool) Option {
59+
return func(w *Worker) {
60+
w.cluster = enable
61+
}
62+
}
63+
5564
// WithChannelSize redis channel size
5665
func WithChannelSize(size int) Option {
5766
return func(w *Worker) {
@@ -114,32 +123,41 @@ func NewWorker(opts ...Option) *Worker {
114123
opt(w)
115124
}
116125

117-
var options *redis.Options
118-
119126
if w.connectionString != "" {
120-
options, err = redis.ParseURL(w.connectionString)
127+
options, err := redis.ParseURL(w.connectionString)
121128
if err != nil {
122129
w.logger.Fatal(err)
123130
}
131+
w.rdb = redis.NewClient(options)
124132
} else if w.addr != "" {
125-
options = &redis.Options{
126-
Addr: w.addr,
127-
Password: w.password,
128-
DB: w.db,
133+
if w.cluster {
134+
w.rdb = redis.NewClusterClient(&redis.ClusterOptions{
135+
Addrs: strings.Split(w.addr, ","),
136+
Password: w.password,
137+
})
138+
} else {
139+
options := &redis.Options{
140+
Addr: w.addr,
141+
Password: w.password,
142+
DB: w.db,
143+
}
144+
w.rdb = redis.NewClient(options)
129145
}
130146
}
131147

132-
rdb := redis.NewClient(options)
133-
134-
_, err = rdb.Ping(context.Background()).Result()
148+
_, err = w.rdb.Ping(context.Background()).Result()
135149
if err != nil {
136150
w.logger.Fatal(err)
137151
}
138152

139-
w.rdb = rdb
140-
141153
ctx := context.Background()
142-
w.pubsub = w.rdb.Subscribe(ctx, w.channelName)
154+
155+
switch v := w.rdb.(type) {
156+
case *redis.Client:
157+
w.pubsub = v.Subscribe(ctx, w.channelName)
158+
case *redis.ClusterClient:
159+
w.pubsub = v.Subscribe(ctx, w.channelName)
160+
}
143161

144162
var ropts []redis.ChannelOption
145163

@@ -235,7 +253,12 @@ func (w *Worker) Shutdown() error {
235253

236254
w.stopOnce.Do(func() {
237255
w.pubsub.Close()
238-
w.rdb.Close()
256+
switch v := w.rdb.(type) {
257+
case *redis.Client:
258+
v.Close()
259+
case *redis.ClusterClient:
260+
v.Close()
261+
}
239262
close(w.stop)
240263
})
241264
return nil

redis_test.go

+34-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"log"
88
"runtime"
9+
"strings"
910
"testing"
1011
"time"
1112

@@ -77,20 +78,47 @@ func TestCustomFuncAndWait(t *testing.T) {
7778
return nil
7879
}),
7980
)
80-
q, err := queue.NewQueue(
81+
q := queue.NewPool(
82+
5,
8183
queue.WithWorker(w),
82-
queue.WithWorkerCount(10),
8384
)
84-
assert.NoError(t, err)
85-
q.Start()
8685
time.Sleep(100 * time.Millisecond)
8786
assert.NoError(t, q.Queue(m))
8887
assert.NoError(t, q.Queue(m))
8988
assert.NoError(t, q.Queue(m))
9089
assert.NoError(t, q.Queue(m))
9190
time.Sleep(1000 * time.Millisecond)
92-
q.Shutdown()
93-
q.Wait()
91+
q.Release()
92+
// you will see the execute time > 1000ms
93+
}
94+
95+
func TestRedisCluster(t *testing.T) {
96+
m := &mockMessage{
97+
Message: "foo",
98+
}
99+
100+
hosts := []string{host + ":6379", host + ":6380"}
101+
102+
w := NewWorker(
103+
WithAddr(strings.Join(hosts, ",")),
104+
WithChannel("testCluster"),
105+
WithCluster(true),
106+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
107+
time.Sleep(500 * time.Millisecond)
108+
return nil
109+
}),
110+
)
111+
q := queue.NewPool(
112+
5,
113+
queue.WithWorker(w),
114+
)
115+
time.Sleep(100 * time.Millisecond)
116+
assert.NoError(t, q.Queue(m))
117+
assert.NoError(t, q.Queue(m))
118+
assert.NoError(t, q.Queue(m))
119+
assert.NoError(t, q.Queue(m))
120+
time.Sleep(1000 * time.Millisecond)
121+
q.Release()
94122
// you will see the execute time > 1000ms
95123
}
96124

0 commit comments

Comments
 (0)