Skip to content

Commit

Permalink
fix: redisPubSub ping (#2739)
Browse files Browse the repository at this point in the history
Signed-off-by: XinTong Zhou <[email protected]>
  • Loading branch information
retoool authored and ngjaying committed Mar 29, 2024
1 parent 7404c57 commit 94a7190
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 23 deletions.
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,7 @@ github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=
github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0=
github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle/v2 v2.1.2 h1:0f7vaaXINONKTsxYDn4otOAiJanX/BMeAtY//BXqzlg=
github.com/jackc/puddle/v2 v2.1.2/go.mod h1:2lpufsF5mRHO6SuZkm0fNYxM6SWHfvyFj62KwNzgels=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
Expand Down Expand Up @@ -1440,6 +1441,7 @@ golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
Expand Down
24 changes: 18 additions & 6 deletions internal/io/redis/pubsub/redisPub.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023-2023 [email protected]
// Copyright 2023-2024 [email protected]
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
package pubsub

import (
"context"
"fmt"

"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -67,6 +68,22 @@ func (r *redisPub) Validate(props map[string]interface{}) error {
return nil
}

func (r *redisPub) Ping(_ string, props map[string]interface{}) error {
if err := r.Configure(props); err != nil {
return err
}
r.conn = redis.NewClient(&redis.Options{
Addr: r.conf.Address,
Username: r.conf.Username,
Password: r.conf.Password,
DB: r.conf.Db,
})
if err := r.conn.Ping(context.Background()).Err(); err != nil {
return fmt.Errorf("Ping Redis failed with error: %v", err)
}
return nil
}

func (r *redisPub) Configure(props map[string]interface{}) error {
return r.Validate(props)
}
Expand All @@ -79,11 +96,6 @@ func (r *redisPub) Open(ctx api.StreamContext) error {
Password: r.conf.Password,
DB: r.conf.Db,
})
// Ping Redis to check if the connection is alive
err := r.conn.Ping(ctx).Err()
if err != nil {
return fmt.Errorf("Ping Redis failed with error: %v", err)
}
return nil
}

Expand Down
12 changes: 4 additions & 8 deletions internal/io/redis/pubsub/redisPub_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023-2023 [email protected]
// Copyright 2023-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -26,6 +26,7 @@ import (

"github.com/lf-edge/ekuiper/internal/io/mock"
mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context"
"github.com/lf-edge/ekuiper/internal/pkg/util"
"github.com/lf-edge/ekuiper/pkg/errorx"
)

Expand Down Expand Up @@ -170,19 +171,14 @@ func TestSinkDecompressorError(t *testing.T) {
}

func TestSinkPingRedisError(t *testing.T) {
s := RedisPub()
s := RedisPub().(util.PingableConn)
prop := map[string]interface{}{
"address": "127.0.0.1:6379",
"db": 0,
"channel": DefaultChannel,
}
expErrStr := fmt.Sprintf("Ping Redis failed with error")
err := s.Configure(prop)
if err != nil {
t.Error(err)
}
ctx := mockContext.NewMockContext("ruleSink", "op1")
err = s.Open(ctx)
err := s.Ping("", prop)
if err == nil {
t.Errorf("should have error")
return
Expand Down
14 changes: 8 additions & 6 deletions internal/io/redis/pubsub/redisSub.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023-2023 [email protected]
// Copyright 2023-2024 [email protected]
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,18 +72,20 @@ func (r *redisSub) Validate(props map[string]interface{}) error {
return nil
}

func (r *redisSub) Configure(_ string, props map[string]interface{}) error {
if err := r.Validate(props); err != nil {
func (r *redisSub) Ping(dataSource string, props map[string]interface{}) error {
if err := r.Configure(dataSource, props); err != nil {
return err
}
// Ping Redis to check if the connection is alive
err := r.conn.Ping(context.Background()).Err()
if err != nil {
if err := r.conn.Ping(context.Background()).Err(); err != nil {
return fmt.Errorf("Ping Redis failed with error: %v", err)
}
return nil
}

func (r *redisSub) Configure(_ string, props map[string]interface{}) error {
return r.Validate(props)
}

func (r *redisSub) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
logger := ctx.GetLogger()
logger.Infof("redisSub sink Opening")
Expand Down
7 changes: 4 additions & 3 deletions internal/io/redis/pubsub/redisSub_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023-2023 EMQ Technologies Co., Ltd.
// Copyright 2023-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@ import (
_ "go.nanomsg.org/mangos/v3/transport/ipc"

mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context"
"github.com/lf-edge/ekuiper/internal/pkg/util"
"github.com/lf-edge/ekuiper/pkg/api"
)

Expand Down Expand Up @@ -104,14 +105,14 @@ func TestSourceDecompressorError(t *testing.T) {
}

func TestSourcePingRedisError(t *testing.T) {
s := RedisSub()
s := RedisSub().(util.PingableConn)
prop := map[string]interface{}{
"address": "",
"db": 0,
"channels": []string{DefaultChannel},
}
expErrStr := fmt.Sprintf("Ping Redis failed with error")
err := s.Configure("new", prop)
err := s.Ping("new", prop)
if err == nil {
t.Errorf("should have error")
return
Expand Down

0 comments on commit 94a7190

Please sign in to comment.