From 4c5244c513bcc110a86cae4fab93d3bb0834c687 Mon Sep 17 00:00:00 2001 From: nicolaasuni-vonage Date: Wed, 11 Sep 2024 14:22:13 +0100 Subject: [PATCH] New Valkey client package --- VERSION | 2 +- pkg/valkey/client.go | 206 ++++++++++ pkg/valkey/client_test.go | 818 +++++++++++++++++++++++++++++++++++++ pkg/valkey/config.go | 47 +++ pkg/valkey/config_test.go | 62 +++ pkg/valkey/options.go | 36 ++ pkg/valkey/options_test.go | 61 +++ pkg/valkey/valkey.go | 14 + 8 files changed, 1245 insertions(+), 1 deletion(-) create mode 100644 pkg/valkey/client.go create mode 100644 pkg/valkey/client_test.go create mode 100644 pkg/valkey/config.go create mode 100644 pkg/valkey/config_test.go create mode 100644 pkg/valkey/options.go create mode 100644 pkg/valkey/options_test.go create mode 100644 pkg/valkey/valkey.go diff --git a/VERSION b/VERSION index 3d94b42a..7a9fecd3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.99.8 +1.100.0 diff --git a/pkg/valkey/client.go b/pkg/valkey/client.go new file mode 100644 index 00000000..8ae365d4 --- /dev/null +++ b/pkg/valkey/client.go @@ -0,0 +1,206 @@ +package valkey + +import ( + "context" + "fmt" + "time" + + "github.com/Vonage/gosrvlib/pkg/encode" + libvalkey "github.com/valkey-io/valkey-go" +) + +// TEncodeFunc is the type of function used to replace the default message encoding function used by SendData(). +type TEncodeFunc func(ctx context.Context, data any) (string, error) + +// TDecodeFunc is the type of function used to replace the default message decoding function used by ReceiveData(). +type TDecodeFunc func(ctx context.Context, msg string, data any) error + +// SrvOptions is an alias for the parent library client options. +type SrvOptions = libvalkey.ClientOption + +// VKMessage is an alias for the parent library Message type. +type VKMessage = libvalkey.PubSubMessage + +// VKClient represents the mockable functions in the parent Valkey Client. +type VKClient = libvalkey.Client + +// VKPubSub represents the mockable functions in the parent Valkey PubSub. +type VKPubSub = libvalkey.Completed + +// Client is a wrapper for the Valkey Client. +type Client struct { + // vkclient is the upstream Client. + vkclient VKClient + + // vkpubsub is the upstream PubSub completed command. + vkpubsub VKPubSub + + // messageEncodeFunc is the function used by SendData() + // to encode and serialize the input data to a string compatible with Valkey. + messageEncodeFunc TEncodeFunc + + // messageDecodeFunc is the function used by ReceiveData() + // to decode a message encoded with messageEncodeFunc to the provided data object. + // The value underlying data must be a pointer to the correct type for the next data item received. + messageDecodeFunc TDecodeFunc +} + +// New creates a new instance of the Valkey client wrapper. +func New(ctx context.Context, srvopt SrvOptions, opts ...Option) (*Client, error) { + cfg, err := loadConfig(ctx, srvopt, opts...) + if err != nil { + return nil, fmt.Errorf("cannot create a new valkey client: %w", err) + } + + if cfg.vkclient == nil { + vkc, err := libvalkey.NewClient(cfg.srvOpts) + if err != nil { + return nil, fmt.Errorf("failed to create Valkey client: %w", err) + } + + cfg.vkclient = &vkc + } + + return &Client{ + vkclient: (*cfg.vkclient), + vkpubsub: (*cfg.vkclient).B().Subscribe().Channel(cfg.channels...).Build().Pin(), + messageEncodeFunc: cfg.messageEncodeFunc, + messageDecodeFunc: cfg.messageDecodeFunc, + }, nil +} + +// Close closes the client. +// All pending calls will be finished. +func (c *Client) Close() { + c.vkclient.Close() +} + +// Set a raw string value for the specified key with an expiration time. +func (c *Client) Set(ctx context.Context, key string, value string, exp time.Duration) error { + err := c.vkclient.Do(ctx, c.vkclient.B().Set().Key(key).Value(value).Ex(exp).Build()).Error() + if err != nil { + return fmt.Errorf("cannot set key: %s %w", key, err) + } + + return nil +} + +// Get retrieves the raw string value of the specified key. +func (c *Client) Get(ctx context.Context, key string) (string, error) { + value, err := c.vkclient.Do(ctx, c.vkclient.B().Get().Key(key).Build()).ToString() + if err != nil { + return "", fmt.Errorf("cannot retrieve key %s: %w", key, err) + } + + return value, nil +} + +// Del deletes the specified key from the datastore. +func (c *Client) Del(ctx context.Context, key string) error { + err := c.vkclient.Do(ctx, c.vkclient.B().Del().Key(key).Build()).Error() + if err != nil { + return fmt.Errorf("cannot delete key: %s %w", key, err) + } + + return nil +} + +// Send publish a raw string value to the specified channel. +func (c *Client) Send(ctx context.Context, channel string, message string) error { + err := c.vkclient.Do(ctx, c.vkclient.B().Publish().Channel(channel).Message(message).Build()).Error() + if err != nil { + return fmt.Errorf("cannot send message to %s channel: %w", channel, err) + } + + return nil +} + +// Receive receives a raw string message from a subscribed channel. +// Returns the channel name and the message value. +func (c *Client) Receive(ctx context.Context) (string, string, error) { + data := VKMessage{} + + err := c.vkclient.Receive(ctx, c.vkpubsub, func(msg VKMessage) { + data = msg + }) + if err != nil { + return "", "", fmt.Errorf("error receiving message: %w", err) + } + + return data.Channel, data.Message, nil +} + +// MessageEncode encodes and serialize the input data to a string. +func MessageEncode(data any) (string, error) { + return encode.Encode(data) //nolint:wrapcheck +} + +// MessageDecode decodes a message encoded with MessageEncode to the provided data object. +// The value underlying data must be a pointer to the correct type for the next data item received. +func MessageDecode(msg string, data any) error { + return encode.Decode(msg, data) //nolint:wrapcheck +} + +// DefaultMessageEncodeFunc is the default function to encode and serialize the input data for SendData(). +func DefaultMessageEncodeFunc(_ context.Context, data any) (string, error) { + return MessageEncode(data) +} + +// DefaultMessageDecodeFunc is the default function to decode a message for ReceiveData(). +// The value underlying data must be a pointer to the correct type for the next data item received. +func DefaultMessageDecodeFunc(_ context.Context, msg string, data any) error { + return MessageDecode(msg, data) +} + +// SetData sets an encoded value for the specified key with an expiration time. +// Zero expiration means the key has no expiration time. +func (c *Client) SetData(ctx context.Context, key string, data any, exp time.Duration) error { + value, err := c.messageEncodeFunc(ctx, data) + if err != nil { + return err + } + + return c.Set(ctx, key, value, exp) +} + +// GetData retrieves an encoded value of the specified key and extract its content in the data parameter. +func (c *Client) GetData(ctx context.Context, key string, data any) error { + value, err := c.Get(ctx, key) + if err != nil { + return err + } + + return c.messageDecodeFunc(ctx, value, data) +} + +// SendData publish an encoded value to the specified channel. +func (c *Client) SendData(ctx context.Context, channel string, data any) error { + message, err := c.messageEncodeFunc(ctx, data) + if err != nil { + return err + } + + return c.Send(ctx, channel, message) +} + +// ReceiveData receives an encoded message from a subscribed channel, +// and extract its content in the data parameter. +// Returns the channel name in case of success. +func (c *Client) ReceiveData(ctx context.Context, data any) (string, error) { + channel, value, err := c.Receive(ctx) + if err != nil { + return "", err + } + + return channel, c.messageDecodeFunc(ctx, value, data) +} + +// HealthCheck checks if the current data-store is alive. +func (c *Client) HealthCheck(ctx context.Context) error { + err := c.vkclient.Do(ctx, c.vkclient.B().Ping().Build()).Error() + if err != nil { + return fmt.Errorf("unable to connect to Valkey: %w", err) + } + + return nil +} diff --git a/pkg/valkey/client_test.go b/pkg/valkey/client_test.go new file mode 100644 index 00000000..a2af002f --- /dev/null +++ b/pkg/valkey/client_test.go @@ -0,0 +1,818 @@ +package valkey + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/valkey-io/valkey-go/mock" + "go.uber.org/mock/gomock" +) + +func getTestSrvOptions() SrvOptions { + return SrvOptions{ + InitAddress: []string{"test.valkey.invalid:6379"}, + Username: "test_user", + Password: "test_password", + SelectDB: 0, + } +} + +func TestNew(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + got, err := New( + context.TODO(), + srvOpts, + WithMessageEncodeFunc(nil), + ) + + require.Error(t, err) + require.Nil(t, got) + + got, err = New( + context.TODO(), + srvOpts, + ) + + require.Error(t, err) + require.Nil(t, got) + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + + got, err = New( + context.TODO(), + srvOpts, + WithValkeyClient(vkc), + ) + + require.NoError(t, err) + require.NotNil(t, got) + + vkc.EXPECT().Close() + + got.Close() +} + +func TestSet(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + ctx := context.TODO() + + cli, err := New( + ctx, + srvOpts, + WithValkeyClient(vkc), + ) + + require.NoError(t, err) + require.NotNil(t, cli) + + tests := []struct { + name string + key string + val string + exp time.Duration + mock func() + wantErr bool + }{ + { + name: "success", + key: "key1", + val: "val1", + exp: time.Second, + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("SET", "key1", "val1", "EX", "1"), + ) + }, + wantErr: false, + }, + { + name: "error", + key: "key2", + val: "val2", + exp: 2 * time.Second, + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("SET", "key2", "val2", "EX", "2"), + ).Return(mock.ErrorResult(errors.New("error"))) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tt.mock() + + err := cli.Set(ctx, tt.key, tt.val, tt.exp) + if tt.wantErr { + require.Error(t, err) + + return + } + + require.NoError(t, err) + }) + } +} + +func TestGet(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + ctx := context.TODO() + + cli, err := New( + ctx, + srvOpts, + WithValkeyClient(vkc), + ) + + require.NoError(t, err) + require.NotNil(t, cli) + + tests := []struct { + name string + key string + val string + mock func() + wantErr bool + }{ + { + name: "success", + key: "key1", + val: "val1", + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("GET", "key1"), + ).Return(mock.Result(mock.ValkeyString("val1"))) + }, + wantErr: false, + }, + { + name: "error", + key: "key2", + val: "val2", + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("GET", "key2"), + ).Return(mock.ErrorResult(errors.New("error"))) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tt.mock() + + val, err := cli.Get(ctx, tt.key) + if tt.wantErr { + require.Error(t, err) + require.Empty(t, val) + + return + } + + require.NoError(t, err) + require.Equal(t, tt.val, val) + }) + } +} + +func TestDel(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + ctx := context.TODO() + + cli, err := New( + ctx, + srvOpts, + WithValkeyClient(vkc), + ) + + require.NoError(t, err) + require.NotNil(t, cli) + + tests := []struct { + name string + key string + mock func() + wantErr bool + }{ + { + name: "success", + key: "key1", + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("DEL", "key1"), + ) + }, + wantErr: false, + }, + { + name: "error", + key: "key2", + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("DEL", "key2"), + ).Return(mock.ErrorResult(errors.New("error"))) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tt.mock() + + err := cli.Del(ctx, tt.key) + if tt.wantErr { + require.Error(t, err) + + return + } + + require.NoError(t, err) + }) + } +} + +func TestSend(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + ctx := context.TODO() + + cli, err := New( + ctx, + srvOpts, + WithValkeyClient(vkc), + ) + + require.NoError(t, err) + require.NotNil(t, cli) + + tests := []struct { + name string + channel string + message string + mock func() + wantErr bool + }{ + { + name: "success", + channel: "ch1", + message: "msg1", + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("PUBLISH", "ch1", "msg1"), + ) + }, + wantErr: false, + }, + { + name: "error", + channel: "ch2", + message: "msg2", + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("PUBLISH", "ch2", "msg2"), + ).Return(mock.ErrorResult(errors.New("error"))) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tt.mock() + + err := cli.Send(ctx, tt.channel, tt.message) + if tt.wantErr { + require.Error(t, err) + + return + } + + require.NoError(t, err) + }) + } +} + +func TestReceive(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + ctx := context.TODO() + + cli, err := New( + ctx, + srvOpts, + WithValkeyClient(vkc), + WithChannels("ch1", "ch2"), + ) + + require.NoError(t, err) + require.NotNil(t, cli) + + tests := []struct { + name string + channel string + message string + mock func() + wantErr bool + }{ + { + name: "success", + channel: "ch1", + message: "msg1", + mock: func() { + vkc.EXPECT().Receive( + ctx, + mock.Match("SUBSCRIBE", "ch1", "ch2"), + gomock.Any(), + ).Do(func(_, _ any, fn func(message VKMessage)) { + fn(VKMessage{Channel: "ch1", Message: "msg1"}) + }) + }, + wantErr: false, + }, + { + name: "error", + channel: "ch2", + message: "msg2", + mock: func() { + vkc.EXPECT().Receive( + ctx, + mock.Match("SUBSCRIBE", "ch1", "ch2"), + gomock.Any(), + ).Do(func(_, _ any, fn func(message VKMessage)) { + fn(VKMessage{Channel: "ch2", Message: "msg2"}) + }).Return(errors.New("error")) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tt.mock() + + channel, message, err := cli.Receive(ctx) + if tt.wantErr { + require.Error(t, err) + require.Empty(t, channel) + require.Empty(t, message) + + return + } + + require.NoError(t, err) + require.Equal(t, tt.channel, channel) + require.Equal(t, tt.message, message) + }) + } +} + +func TestSetData(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + ctx := context.TODO() + + cli, err := New( + ctx, + srvOpts, + WithValkeyClient(vkc), + ) + + require.NoError(t, err) + require.NotNil(t, cli) + + type TestData struct { + Alpha string + Beta int + } + + testMsg := TestData{Alpha: "abc123", Beta: -567} + testEncMsg, err := MessageEncode(testMsg) + + require.NoError(t, err) + + tests := []struct { + name string + key string + val any + exp time.Duration + mock func() + wantErr bool + }{ + { + name: "success", + key: "key1", + val: testMsg, + exp: 2 * time.Second, + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("SET", "key1", testEncMsg, "EX", "2"), + ) + }, + wantErr: false, + }, + { + name: "error", + key: "key2", + val: nil, + exp: time.Second, + mock: func() {}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tt.mock() + + err := cli.SetData(ctx, tt.key, tt.val, tt.exp) + if tt.wantErr { + require.Error(t, err) + + return + } + + require.NoError(t, err) + }) + } +} + +func TestGetData(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + ctx := context.TODO() + + cli, err := New( + ctx, + srvOpts, + WithValkeyClient(vkc), + ) + + require.NoError(t, err) + require.NotNil(t, cli) + + type TestData struct { + Alpha string + Beta int + } + + tests := []struct { + name string + key string + val any + mock func() + wantErr bool + }{ + { + name: "success", + key: "key1", + val: TestData{Alpha: "abc123", Beta: -567}, + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("GET", "key1"), + ).Return(mock.Result(mock.ValkeyString("KH8DAQEIVGVzdERhdGEB/4AAAQIBBUFscGhhAQwAAQRCZXRhAQQAAAAP/4ABBmFiYzEyMwH+BG0A"))) + }, + wantErr: false, + }, + { + name: "error", + key: "key2", + val: nil, + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("GET", "key2"), + ).Return(mock.Result(mock.ValkeyString("INVALID-CORRUPT-DATA"))) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tt.mock() + + var data TestData + + err := cli.GetData(ctx, tt.key, &data) + if tt.wantErr { + require.Error(t, err) + require.Empty(t, data) + + return + } + + require.NoError(t, err) + require.Equal(t, tt.val, data) + }) + } +} + +func TestSendData(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + ctx := context.TODO() + + cli, err := New( + ctx, + srvOpts, + WithValkeyClient(vkc), + ) + + require.NoError(t, err) + require.NotNil(t, cli) + + type TestData struct { + Alpha string + Beta int + } + + testMsg := TestData{Alpha: "abc123", Beta: -567} + testEncMsg, err := MessageEncode(testMsg) + + require.NoError(t, err) + + tests := []struct { + name string + channel string + message any + mock func() + wantErr bool + }{ + { + name: "success", + channel: "ch1", + message: testMsg, + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("PUBLISH", "ch1", testEncMsg), + ) + }, + wantErr: false, + }, + { + name: "error", + channel: "ch2", + message: nil, + mock: func() {}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tt.mock() + + err := cli.SendData(ctx, tt.channel, tt.message) + if tt.wantErr { + require.Error(t, err) + + return + } + + require.NoError(t, err) + }) + } +} + +func TestReceiveData(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + ctx := context.TODO() + + cli, err := New( + ctx, + srvOpts, + WithValkeyClient(vkc), + WithChannels("ch1", "ch2"), + ) + + require.NoError(t, err) + require.NotNil(t, cli) + + type TestData struct { + Alpha string + Beta int + } + + tests := []struct { + name string + channel string + message any + mock func() + wantErr bool + }{ + { + name: "success", + channel: "ch1", + message: TestData{Alpha: "abc123", Beta: -567}, + mock: func() { + vkc.EXPECT().Receive( + ctx, + mock.Match("SUBSCRIBE", "ch1", "ch2"), + gomock.Any(), + ).Do(func(_, _ any, fn func(message VKMessage)) { + fn(VKMessage{Channel: "ch1", Message: "KH8DAQEIVGVzdERhdGEB/4AAAQIBBUFscGhhAQwAAQRCZXRhAQQAAAAP/4ABBmFiYzEyMwH+BG0A"}) + }) + }, + wantErr: false, + }, + { + name: "error", + channel: "ch2", + message: nil, + mock: func() { + vkc.EXPECT().Receive( + ctx, + mock.Match("SUBSCRIBE", "ch1", "ch2"), + gomock.Any(), + ).Do(func(_, _ any, fn func(message VKMessage)) { + fn(VKMessage{Channel: "ch2", Message: "INVALID-CORRUPT-DATA"}) + }).Return(errors.New("error")) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tt.mock() + + var data TestData + + channel, err := cli.ReceiveData(ctx, &data) + if tt.wantErr { + require.Error(t, err) + require.Empty(t, channel) + require.Empty(t, data) + + return + } + + require.NoError(t, err) + require.Equal(t, tt.channel, channel) + require.Equal(t, tt.message, data) + }) + } +} + +func TestHealthCheck(t *testing.T) { + t.Parallel() + + srvOpts := getTestSrvOptions() + + ctrl := gomock.NewController(t) + t.Cleanup(func() { ctrl.Finish() }) + + vkc := mock.NewClient(ctrl) + ctx := context.TODO() + + cli, err := New( + ctx, + srvOpts, + WithValkeyClient(vkc), + ) + + require.NoError(t, err) + require.NotNil(t, cli) + + tests := []struct { + name string + mock func() + wantErr bool + }{ + { + name: "success", + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("PING"), + ) + }, + wantErr: false, + }, + { + name: "error", + mock: func() { + vkc.EXPECT().Do( + ctx, + mock.Match("PING"), + ).Return(mock.ErrorResult(errors.New("error"))) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tt.mock() + + err := cli.HealthCheck(ctx) + if tt.wantErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + }) + } +} diff --git a/pkg/valkey/config.go b/pkg/valkey/config.go new file mode 100644 index 00000000..92284514 --- /dev/null +++ b/pkg/valkey/config.go @@ -0,0 +1,47 @@ +package valkey + +import ( + "context" + "errors" + "regexp" +) + +const ( + regexPatternHostPort = `^[^\:]*:[0-9]{2,5}$` +) + +var regexHostPort = regexp.MustCompile(regexPatternHostPort) + +type cfg struct { + messageEncodeFunc TEncodeFunc + messageDecodeFunc TDecodeFunc + srvOpts SrvOptions + channels []string + vkclient *VKClient +} + +func loadConfig(_ context.Context, srvOpts SrvOptions, opts ...Option) (*cfg, error) { + c := &cfg{ + messageEncodeFunc: DefaultMessageEncodeFunc, + messageDecodeFunc: DefaultMessageDecodeFunc, + srvOpts: srvOpts, + } + + if (len(srvOpts.InitAddress) == 0) || (!regexHostPort.MatchString(srvOpts.InitAddress[0])) { + return nil, errors.New("missing or invalid valkey client options") + } + + for _, apply := range opts { + apply(c) + } + + if c.messageEncodeFunc == nil { + return nil, errors.New("missing message encoding function") + } + + if c.messageDecodeFunc == nil { + return nil, errors.New("missing message decoding function") + } + + return c, nil +} diff --git a/pkg/valkey/config_test.go b/pkg/valkey/config_test.go new file mode 100644 index 00000000..3465e686 --- /dev/null +++ b/pkg/valkey/config_test.go @@ -0,0 +1,62 @@ +package valkey + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_loadConfig(t *testing.T) { + t.Parallel() + + srvOpts := SrvOptions{ + InitAddress: []string{"test.valkey.invalid:6379"}, + Username: "test_user", + Password: "test_password", + SelectDB: 0, + } + + got, err := loadConfig( + context.TODO(), + srvOpts, + WithMessageEncodeFunc(DefaultMessageEncodeFunc), + WithMessageDecodeFunc(DefaultMessageDecodeFunc), + WithChannels("test_channel_1", "test_channel_2"), + ) + + require.NoError(t, err) + require.NotNil(t, got) + require.Equal(t, srvOpts.InitAddress, got.srvOpts.InitAddress) + require.Equal(t, srvOpts.Username, got.srvOpts.Username) + require.Equal(t, srvOpts.Password, got.srvOpts.Password) + require.Equal(t, srvOpts.SelectDB, got.srvOpts.SelectDB) + require.NotNil(t, got.messageEncodeFunc) + require.NotNil(t, got.messageDecodeFunc) + + got, err = loadConfig( + context.TODO(), + SrvOptions{}, + ) + + require.Error(t, err) + require.Nil(t, got) + + got, err = loadConfig( + context.TODO(), + srvOpts, + WithMessageEncodeFunc(nil), + ) + + require.Error(t, err) + require.Nil(t, got) + + got, err = loadConfig( + context.TODO(), + srvOpts, + WithMessageDecodeFunc(nil), + ) + + require.Error(t, err) + require.Nil(t, got) +} diff --git a/pkg/valkey/options.go b/pkg/valkey/options.go new file mode 100644 index 00000000..98fbd444 --- /dev/null +++ b/pkg/valkey/options.go @@ -0,0 +1,36 @@ +package valkey + +// Option is a type to allow setting custom client options. +type Option func(*cfg) + +// WithMessageEncodeFunc allow to replace DefaultMessageEncodeFunc. +// This function used by SendData() to encode and serialize the input data to a string. +func WithMessageEncodeFunc(f TEncodeFunc) Option { + return func(c *cfg) { + c.messageEncodeFunc = f + } +} + +// WithMessageDecodeFunc allow to replace DefaultMessageDecodeFunc(). +// This function used by ReceiveData() to decode a message encoded with messageEncodeFunc to the provided data object. +// The value underlying data must be a pointer to the correct type for the next data item received. +func WithMessageDecodeFunc(f TDecodeFunc) Option { + return func(c *cfg) { + c.messageDecodeFunc = f + } +} + +// WithChannels sets the channels to subscribe to and receive data from. +func WithChannels(channels ...string) Option { + return func(c *cfg) { + c.channels = channels + } +} + +// WithValkeyClient overrides the default Valkey client. +// This function is mainly used for testing. +func WithValkeyClient(vkclient VKClient) Option { + return func(c *cfg) { + c.vkclient = &vkclient + } +} diff --git a/pkg/valkey/options_test.go b/pkg/valkey/options_test.go new file mode 100644 index 00000000..69c93249 --- /dev/null +++ b/pkg/valkey/options_test.go @@ -0,0 +1,61 @@ +package valkey + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/valkey-io/valkey-go/mock" + "go.uber.org/mock/gomock" +) + +func Test_WithMessageEncodeFunc(t *testing.T) { + t.Parallel() + + ret := "test_data_001" + f := func(_ context.Context, _ any) (string, error) { + return ret, nil + } + + conf := &cfg{} + WithMessageEncodeFunc(f)(conf) + + d, err := conf.messageEncodeFunc(context.TODO(), "") + require.NoError(t, err) + require.Equal(t, ret, d) +} + +func Test_WithMessageDecodeFunc(t *testing.T) { + t.Parallel() + + f := func(_ context.Context, _ string, _ any) error { + return nil + } + + conf := &cfg{} + WithMessageDecodeFunc(f)(conf) + require.NoError(t, conf.messageDecodeFunc(context.TODO(), "", "")) +} + +func Test_WithChannels(t *testing.T) { + t.Parallel() + + chns := []string{"alpha", "beta", "gamma"} + + conf := &cfg{} + WithChannels(chns...)(conf) + require.Len(t, conf.channels, 3) +} + +func Test_WithValkeyClient(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + client := mock.NewClient(ctrl) + + conf := &cfg{} + WithValkeyClient(client)(conf) + require.Equal(t, client, *conf.vkclient) +} diff --git a/pkg/valkey/valkey.go b/pkg/valkey/valkey.go new file mode 100644 index 00000000..bab4601a --- /dev/null +++ b/pkg/valkey/valkey.go @@ -0,0 +1,14 @@ +/* +Package valkey provides a simple and basic wrapper client for interacting with +Valkey (https://valkey.io), an open source in-memory data store. + +Based on https://github.com/valkey-io/valkey-go, it abstracts away the complexities +of the valkey protocol and provides a simplified interface. + +This package includes functions for setting, getting, and deleting key/value +entries. Additionally, it supports sending and receiving messages from channels. + +It allows to specify custom message encoding and decoding functions, including +serialization and encryption. +*/ +package valkey