Skip to content

Commit

Permalink
feat: restore redis io
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Jul 22, 2024
1 parent bf665da commit 4b4b5e7
Show file tree
Hide file tree
Showing 11 changed files with 1,442 additions and 2 deletions.
29 changes: 29 additions & 0 deletions internal/binder/io/ext_redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build redisdb || !core

package io

import (
"github.com/lf-edge/ekuiper/v2/internal/io/redis"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
)

func init() {
modules.RegisterLookupSource("redis", redis.GetLookupSource)
modules.RegisterSink("redis", redis.GetSink)
modules.RegisterSink("redisPub", redis.RedisPub)
modules.RegisterSource("redisSub", redis.RedisSub)
}
141 changes: 141 additions & 0 deletions internal/io/redis/lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package redis

import (
"encoding/json"
"errors"
"fmt"
"strconv"

"github.com/redis/go-redis/v9"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
)

type conf struct {
// host:port address.
Addr string `json:"addr,omitempty"`
Username string `json:"username,omitempty"`
// Optional password. Must match the password specified in the
Password string `json:"password,omitempty"`
DataType string `json:"dataType,omitempty"`
DB string `json:"datasource,omitempty"`
}

type lookupSource struct {
c *conf
db int
cli *redis.Client
}

func (s *lookupSource) Provision(ctx api.StreamContext, props map[string]any) error {
return s.Validate(props)
}

func (s *lookupSource) Connect(ctx api.StreamContext) error {
logger := ctx.GetLogger()
logger.Debug("Opening redis lookup source")

s.cli = redis.NewClient(&redis.Options{
Addr: s.c.Addr,
Username: s.c.Username,
Password: s.c.Password,
DB: s.db, // use default DB
})
_, err := s.cli.Ping(ctx).Result()
return err
}

func (s *lookupSource) Lookup(ctx api.StreamContext, _ []string, keys []string, values []any) ([]map[string]any, error) {
ctx.GetLogger().Debugf("Lookup redis %v", keys)
if len(keys) != 1 {
return nil, fmt.Errorf("redis lookup only support one key, but got %v", keys)

Check warning on line 66 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L66

Added line #L66 was not covered by tests
}
v := fmt.Sprintf("%v", values[0])
if s.c.DataType == "string" {
res, err := s.cli.Get(ctx, v).Result()
if err != nil {
if err == redis.Nil {
return []map[string]any{}, nil
}
return nil, err

Check warning on line 75 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L75

Added line #L75 was not covered by tests
}
m := make(map[string]any)
err = json.Unmarshal(cast.StringToBytes(res), &m)
if err != nil {
return nil, err

Check warning on line 80 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L80

Added line #L80 was not covered by tests
}
return []map[string]any{m}, nil
} else {
res, err := s.cli.LRange(ctx, v, 0, -1).Result()
if err != nil {
if err == redis.Nil {
return []map[string]any{}, nil

Check warning on line 87 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L86-L87

Added lines #L86 - L87 were not covered by tests
}
return nil, err

Check warning on line 89 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L89

Added line #L89 was not covered by tests
}
ret := make([]map[string]any, 0, len(res))
for _, r := range res {
m := make(map[string]any)
err = json.Unmarshal(cast.StringToBytes(r), &m)
if err != nil {
return nil, err

Check warning on line 96 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L96

Added line #L96 was not covered by tests
}
ret = append(ret, m)
}
return ret, nil
}
}

func (s *lookupSource) Validate(props map[string]any) error {
cfg := &conf{}
err := cast.MapToStruct(props, cfg)
if err != nil {
return err

Check warning on line 108 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L108

Added line #L108 was not covered by tests
}
if cfg.Addr == "" {
return errors.New("redis addr is null")

Check warning on line 111 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L111

Added line #L111 was not covered by tests
}
if cfg.DataType != "string" && cfg.DataType != "list" {
return errors.New("redis dataType must be string or list")

Check warning on line 114 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L114

Added line #L114 was not covered by tests
}
s.db, err = strconv.Atoi(cfg.DB)
if err != nil {
return fmt.Errorf("datasource %s is invalid", cfg.DB)

Check warning on line 118 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L118

Added line #L118 was not covered by tests
}
if s.db < 0 || s.db > 15 {
return fmt.Errorf("redis lookup source db should be in range 0-15")
}
s.c = cfg
return nil
}

func (s *lookupSource) Open(ctx api.StreamContext) error {
ctx.GetLogger().Infof("Opening redis lookup source with conf %v", s.c)
return nil

Check warning on line 129 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L127-L129

Added lines #L127 - L129 were not covered by tests
}

func (s *lookupSource) Close(ctx api.StreamContext) error {
ctx.GetLogger().Infof("Closing redis lookup source")
return s.cli.Close()

Check warning on line 134 in internal/io/redis/lookup.go

View check run for this annotation

Codecov / codecov/patch

internal/io/redis/lookup.go#L132-L134

Added lines #L132 - L134 were not covered by tests
}

func GetLookupSource() api.Source {
return &lookupSource{}
}

var _ api.LookupSource = &lookupSource{}
136 changes: 136 additions & 0 deletions internal/io/redis/lookup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package redis

import (
"fmt"
"testing"

"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/contract/v2/api"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

func init() {
s, err := miniredis.Run()
if err != nil {
panic(err)
}
addr = "localhost:" + s.Port()
// Mock id key data
s.Set("1", `{"id":1,"name":"John","address":34,"mobile":"334433"}`)
s.Set("2", `{"id":2,"name":"Susan","address":22,"mobile":"666433"}`)
// Mock group key list data
s.Lpush("group1", `{"id":1,"name":"John"}`)
s.Lpush("group1", `{"id":2,"name":"Susan"}`)
s.Lpush("group2", `{"id":3,"name":"Nancy"}`)
s.Lpush("group3", `{"id":4,"name":"Tom"}`)
mr = s
}

// TestSingle test lookup value of a single map
func TestSingle(t *testing.T) {
ctx := mockContext.NewMockContext("test", "tt")
ls := GetLookupSource()
err := ls.Provision(ctx, map[string]any{"addr": addr, "datatype": "string", "datasource": "0"})
if err != nil {
t.Error(err)
return
}
err = ls.Connect(ctx)
if err != nil {
t.Error(err)
return
}
tests := []struct {
value int
result []map[string]any
}{
{
value: 1,
result: []map[string]any{
{"id": float64(1), "name": "John", "address": float64(34), "mobile": "334433"},
},
}, {
value: 2,
result: []map[string]any{
{"id": float64(2), "name": "Susan", "address": float64(22), "mobile": "666433"},
},
}, {
value: 3,
result: []map[string]any{},
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
actual, err := ls.(api.LookupSource).Lookup(ctx, []string{}, []string{"id"}, []any{tt.value})
assert.NoError(t, err)
assert.Equal(t, tt.result, actual)
})
}
}

func TestList(t *testing.T) {
ctx := mockContext.NewMockContext("test", "tt")
ls := GetLookupSource()
err := ls.Provision(ctx, map[string]any{"addr": addr, "datatype": "list", "datasource": "0"})
if err != nil {
t.Error(err)
return
}
err = ls.Connect(ctx)
if err != nil {
t.Error(err)
return
}
tests := []struct {
value string
result []map[string]any
}{
{
value: "group1",
result: []map[string]any{
{"id": float64(2), "name": "Susan"},
{"id": float64(1), "name": "John"},
},
}, {
value: "group2",
result: []map[string]any{
{"id": float64(3), "name": "Nancy"},
},
}, {
value: "group4",
result: []map[string]any{},
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
actual, err := ls.(api.LookupSource).Lookup(ctx, []string{}, []string{"id"}, []any{tt.value})
assert.NoError(t, err)
assert.Equal(t, tt.result, actual)
})
}
}

func TestLookupSourceDB(t *testing.T) {
ctx := mockContext.NewMockContext("test", "tt")
s := &lookupSource{}
err := s.Provision(ctx, map[string]any{"addr": addr, "datatype": "string", "datasource": "199"})
require.Error(t, err)
require.Equal(t, "redis lookup source db should be in range 0-15", err.Error())
}
Loading

0 comments on commit 4b4b5e7

Please sign in to comment.