diff --git a/go.work.sum b/go.work.sum index 6c0aa0eda4..5c16766c70 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1075,6 +1075,7 @@ github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/ github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= github.com/jackc/puddle v1.3.0 h1:eHK/5clGOatcjX3oWGBO/MpxpbHzSwud5EWTSCI+MX0= +github.com/jackc/puddle v1.3.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle/v2 v2.1.2 h1:0f7vaaXINONKTsxYDn4otOAiJanX/BMeAtY//BXqzlg= github.com/jackc/puddle/v2 v2.1.2/go.mod h1:2lpufsF5mRHO6SuZkm0fNYxM6SWHfvyFj62KwNzgels= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= @@ -1440,6 +1441,7 @@ golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0 golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= diff --git a/internal/io/redis/pubsub/redisPub.go b/internal/io/redis/pubsub/redisPub.go index 396a5114b7..83d2bc1109 100644 --- a/internal/io/redis/pubsub/redisPub.go +++ b/internal/io/redis/pubsub/redisPub.go @@ -1,4 +1,4 @@ -// Copyright 2023-2023 emy120115@gmail.com +// Copyright 2023-2024 emy120115@gmail.com // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ package pubsub import ( + "context" "fmt" "github.com/redis/go-redis/v9" @@ -67,6 +68,22 @@ func (r *redisPub) Validate(props map[string]interface{}) error { return nil } +func (r *redisPub) Ping(_ string, props map[string]interface{}) error { + if err := r.Configure(props); err != nil { + return err + } + r.conn = redis.NewClient(&redis.Options{ + Addr: r.conf.Address, + Username: r.conf.Username, + Password: r.conf.Password, + DB: r.conf.Db, + }) + if err := r.conn.Ping(context.Background()).Err(); err != nil { + return fmt.Errorf("Ping Redis failed with error: %v", err) + } + return nil +} + func (r *redisPub) Configure(props map[string]interface{}) error { return r.Validate(props) } @@ -79,11 +96,6 @@ func (r *redisPub) Open(ctx api.StreamContext) error { Password: r.conf.Password, DB: r.conf.Db, }) - // Ping Redis to check if the connection is alive - err := r.conn.Ping(ctx).Err() - if err != nil { - return fmt.Errorf("Ping Redis failed with error: %v", err) - } return nil } diff --git a/internal/io/redis/pubsub/redisPub_test.go b/internal/io/redis/pubsub/redisPub_test.go index dbbf56b841..c91e7dc097 100644 --- a/internal/io/redis/pubsub/redisPub_test.go +++ b/internal/io/redis/pubsub/redisPub_test.go @@ -1,4 +1,4 @@ -// Copyright 2023-2023 emy120115@gmail.com +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import ( "github.com/lf-edge/ekuiper/internal/io/mock" mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" + "github.com/lf-edge/ekuiper/internal/pkg/util" "github.com/lf-edge/ekuiper/pkg/errorx" ) @@ -170,19 +171,14 @@ func TestSinkDecompressorError(t *testing.T) { } func TestSinkPingRedisError(t *testing.T) { - s := RedisPub() + s := RedisPub().(util.PingableConn) prop := map[string]interface{}{ "address": "127.0.0.1:6379", "db": 0, "channel": DefaultChannel, } expErrStr := fmt.Sprintf("Ping Redis failed with error") - err := s.Configure(prop) - if err != nil { - t.Error(err) - } - ctx := mockContext.NewMockContext("ruleSink", "op1") - err = s.Open(ctx) + err := s.Ping("", prop) if err == nil { t.Errorf("should have error") return diff --git a/internal/io/redis/pubsub/redisSub.go b/internal/io/redis/pubsub/redisSub.go index 3b66f5f887..d4ebae51f8 100644 --- a/internal/io/redis/pubsub/redisSub.go +++ b/internal/io/redis/pubsub/redisSub.go @@ -1,4 +1,4 @@ -// Copyright 2023-2023 emy120115@gmail.com +// Copyright 2023-2024 emy120115@gmail.com // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -72,18 +72,20 @@ func (r *redisSub) Validate(props map[string]interface{}) error { return nil } -func (r *redisSub) Configure(_ string, props map[string]interface{}) error { - if err := r.Validate(props); err != nil { +func (r *redisSub) Ping(dataSource string, props map[string]interface{}) error { + if err := r.Configure(dataSource, props); err != nil { return err } - // Ping Redis to check if the connection is alive - err := r.conn.Ping(context.Background()).Err() - if err != nil { + if err := r.conn.Ping(context.Background()).Err(); err != nil { return fmt.Errorf("Ping Redis failed with error: %v", err) } return nil } +func (r *redisSub) Configure(_ string, props map[string]interface{}) error { + return r.Validate(props) +} + func (r *redisSub) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) { logger := ctx.GetLogger() logger.Infof("redisSub sink Opening") diff --git a/internal/io/redis/pubsub/redisSub_test.go b/internal/io/redis/pubsub/redisSub_test.go index cb59a88891..9b019344d6 100644 --- a/internal/io/redis/pubsub/redisSub_test.go +++ b/internal/io/redis/pubsub/redisSub_test.go @@ -1,4 +1,4 @@ -// Copyright 2023-2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import ( _ "go.nanomsg.org/mangos/v3/transport/ipc" mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" + "github.com/lf-edge/ekuiper/internal/pkg/util" "github.com/lf-edge/ekuiper/pkg/api" ) @@ -104,14 +105,14 @@ func TestSourceDecompressorError(t *testing.T) { } func TestSourcePingRedisError(t *testing.T) { - s := RedisSub() + s := RedisSub().(util.PingableConn) prop := map[string]interface{}{ "address": "", "db": 0, "channels": []string{DefaultChannel}, } expErrStr := fmt.Sprintf("Ping Redis failed with error") - err := s.Configure("new", prop) + err := s.Ping("new", prop) if err == nil { t.Errorf("should have error") return