diff --git a/extensions/sinks/influx/ext/influx.go b/extensions/sinks/influx/ext/influx.go index 4c42b7baab..4ad5451f65 100644 --- a/extensions/sinks/influx/ext/influx.go +++ b/extensions/sinks/influx/ext/influx.go @@ -118,6 +118,20 @@ func (m *influxSink) Open(ctx api.StreamContext) error { return err } +func (m *influxSink) Ping(_ string, props map[string]interface{}) error { + if err := m.Configure(props); err != nil { + return err + } + defer func() { + if m.cli != nil { + m.cli.Close() + } + }() + // Test connection. Put it here to avoid server connection when running test in Configure + _, _, err := m.cli.Ping(time.Second * 10) + return err +} + func (m *influxSink) Collect(ctx api.StreamContext, data any) error { logger := ctx.GetLogger() err := m.transformPoints(ctx, data) diff --git a/extensions/sinks/influx2/ext/influx2.go b/extensions/sinks/influx2/ext/influx2.go index d452d72ea8..9e915b2b98 100644 --- a/extensions/sinks/influx2/ext/influx2.go +++ b/extensions/sinks/influx2/ext/influx2.go @@ -62,7 +62,9 @@ func (m *influxSink2) Ping(_ string, props map[string]interface{}) error { return err } defer func() { - m.cli.Close() + if m.cli != nil { + m.cli.Close() + } }() pingable, err := m.cli.Ping(context.Background()) if err != nil || !pingable { diff --git a/internal/io/redis/lookup.go b/internal/io/redis/lookup.go index 117ffe7323..a4a8bd444e 100644 --- a/internal/io/redis/lookup.go +++ b/internal/io/redis/lookup.go @@ -44,6 +44,16 @@ type lookupSource struct { cli *redis.Client } +func (s *lookupSource) Ping(dataSource string, props map[string]interface{}) error { + err := s.Configure(dataSource, props) + defer func() { + if s.cli != nil { + s.cli.Close() + } + }() + return err +} + func (s *lookupSource) Validate(props map[string]interface{}) error { cfg := &conf{} err := cast.MapToStruct(props, cfg) diff --git a/internal/io/redis/lookup_test.go b/internal/io/redis/lookup_test.go index 4751d20f4c..aced2a0b7f 100644 --- a/internal/io/redis/lookup_test.go +++ b/internal/io/redis/lookup_test.go @@ -162,3 +162,12 @@ func TestLookupSourceDB(t *testing.T) { require.Error(t, err) require.Equal(t, "redis lookup source db should be in range 0-15", err.Error()) } + +func TestLookUpPingRedis(t *testing.T) { + s := &lookupSource{} + prop := map[string]interface{}{ + "addr": addr, + "datatype": "string", + } + require.NoError(t, s.Ping("1", prop)) +} diff --git a/internal/io/redis/sink.go b/internal/io/redis/sink.go index 6111999d2c..f2ddd545a3 100644 --- a/internal/io/redis/sink.go +++ b/internal/io/redis/sink.go @@ -17,6 +17,7 @@ package redis import ( + "context" "encoding/json" "errors" "fmt" @@ -78,6 +79,23 @@ func (r *RedisSink) Validate(props map[string]interface{}) error { return nil } +func (r *RedisSink) Ping(dataSource string, props map[string]interface{}) error { + if err := r.Configure(props); err != nil { + return err + } + cli := redis.NewClient(&redis.Options{ + Addr: r.c.Addr, + Username: r.c.Username, + Password: r.c.Password, + DB: r.c.Db, // use default DB + }) + _, err := cli.Ping(context.Background()).Result() + defer func() { + cli.Close() + }() + return err +} + func (r *RedisSink) Configure(props map[string]interface{}) error { return r.Validate(props) }