Skip to content

Commit

Permalink
feat: support redis/influx ping (#2850)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer authored and ngjaying committed May 24, 2024
1 parent 56111c5 commit c121003
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 1 deletion.
14 changes: 14 additions & 0 deletions extensions/sinks/influx/ext/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ func (m *influxSink) Open(ctx api.StreamContext) error {
return err
}

func (m *influxSink) Ping(_ string, props map[string]interface{}) error {
if err := m.Configure(props); err != nil {
return err
}
defer func() {
if m.cli != nil {
m.cli.Close()
}
}()
// Test connection. Put it here to avoid server connection when running test in Configure
_, _, err := m.cli.Ping(time.Second * 10)
return err
}

func (m *influxSink) Collect(ctx api.StreamContext, data any) error {
logger := ctx.GetLogger()
err := m.transformPoints(ctx, data)
Expand Down
4 changes: 3 additions & 1 deletion extensions/sinks/influx2/ext/influx2.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ func (m *influxSink2) Ping(_ string, props map[string]interface{}) error {
return err
}
defer func() {
m.cli.Close()
if m.cli != nil {
m.cli.Close()
}
}()
pingable, err := m.cli.Ping(context.Background())
if err != nil || !pingable {
Expand Down
10 changes: 10 additions & 0 deletions internal/io/redis/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ type lookupSource struct {
cli *redis.Client
}

func (s *lookupSource) Ping(dataSource string, props map[string]interface{}) error {
err := s.Configure(dataSource, props)
defer func() {
if s.cli != nil {
s.cli.Close()
}
}()
return err
}

func (s *lookupSource) Validate(props map[string]interface{}) error {
cfg := &conf{}
err := cast.MapToStruct(props, cfg)
Expand Down
9 changes: 9 additions & 0 deletions internal/io/redis/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,12 @@ func TestLookupSourceDB(t *testing.T) {
require.Error(t, err)
require.Equal(t, "redis lookup source db should be in range 0-15", err.Error())
}

func TestLookUpPingRedis(t *testing.T) {
s := &lookupSource{}
prop := map[string]interface{}{
"addr": addr,
"datatype": "string",
}
require.NoError(t, s.Ping("1", prop))
}
18 changes: 18 additions & 0 deletions internal/io/redis/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package redis

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -78,6 +79,23 @@ func (r *RedisSink) Validate(props map[string]interface{}) error {
return nil
}

func (r *RedisSink) Ping(dataSource string, props map[string]interface{}) error {
if err := r.Configure(props); err != nil {
return err
}
cli := redis.NewClient(&redis.Options{
Addr: r.c.Addr,
Username: r.c.Username,
Password: r.c.Password,
DB: r.c.Db, // use default DB
})
_, err := cli.Ping(context.Background()).Result()
defer func() {
cli.Close()
}()
return err
}

func (r *RedisSink) Configure(props map[string]interface{}) error {
return r.Validate(props)
}
Expand Down

0 comments on commit c121003

Please sign in to comment.