Skip to content

Commit

Permalink
(OraklNode) Async fetcher (#1523)
Browse files Browse the repository at this point in the history
* feat: redis list functionalities

* fix: fix shadow

* fix: update based on feedback

* feat: separation of logics

* fix: remove unused func

* feat: app control update

* feat: add missing `startAll` usage

* fix: fix vet shadowing

* feat: use generic for redis funcs

* fix: remove duplicate functions

* docs: fix comment

* test: wip

* test: test code for fetcher utils

* test: wip

* fix: run skipped tests

* test: nolint test codes

* fix: fix vet err

* feat: implement expiration for latest feed data

* fix: fix shadowing

* fix: remove continue inside redis iteration
  • Loading branch information
nick-bisonai authored May 27, 2024
1 parent 4e28a3c commit fa8ec13
Show file tree
Hide file tree
Showing 15 changed files with 1,198 additions and 828 deletions.
3 changes: 3 additions & 0 deletions node/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ DELEGATOR_URL=
SIGNER_PK=
ENCRYPT_PASSWORD=


# `baobab` or `cypress`, defaults to baobab
CHAIN=

Expand All @@ -19,6 +20,8 @@ VAULT_ROLE=
VAULT_SECRET_PATH=
VAULT_KEY_NAME=

# (optional) interval for streaming feed_data from redis -> pgsql, defaults to 10s
FEED_DATA_STREAM_INTERVAL=
# (optional) required if wallets table is empty
KLAYTN_REPORTER_PK=
# (optional) required to run klaytn_helper test
Expand Down
2 changes: 1 addition & 1 deletion node/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ ENCRYPT_PASSWORD=anything
CHAIN=test
KLAYTN_REPORTER_PK=
SIGNER_PK=

FEED_DATA_STREAM_INTERVAL=10s

# this address is dummy contract in baobab
SUBMISSION_PROXY_CONTRACT=0x284E7E442d64108Bd593Ec4b41538dCE5aEdA858
Expand Down
67 changes: 57 additions & 10 deletions node/pkg/db/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,55 @@ func MSet(ctx context.Context, values map[string]string) error {
return err
}

var pairs []interface{}
var pairs []any
for key, value := range values {
pairs = append(pairs, key, value)
}
return rdbConn.MSet(ctx, pairs...).Err()
}

func MSetObject(ctx context.Context, values map[string]any) error {
stringMap := make(map[string]string)
rdbConn, err := GetRedisConn(ctx)
if err != nil {
return err
}

var pairs []any
for key, value := range values {
data, err := json.Marshal(value)
if err != nil {
log.Error().Err(err).Msg("Error marshalling object")
return err
}
stringMap[key] = string(data)
pairs = append(pairs, key, string(data))
}
return MSet(ctx, stringMap)

return rdbConn.MSet(ctx, pairs...).Err()
}

func MSetObjectWithExp(ctx context.Context, values map[string]any, exp time.Duration) error {
rdbConn, err := GetRedisConn(ctx)
if err != nil {
return err
}

var pairs []any
for key, value := range values {
data, jsonMarshalErr := json.Marshal(value)
if jsonMarshalErr != nil {
log.Error().Err(jsonMarshalErr).Msg("Error marshalling object")
return jsonMarshalErr
}
pairs = append(pairs, key, string(data))
}

pipe := rdbConn.TxPipeline()
pipe.MSet(ctx, pairs...)
for key := range values {
pipe.Expire(ctx, key, exp)
}
_, err = pipe.Exec(ctx)
return err
}

func Set(ctx context.Context, key string, value string, exp time.Duration) error {
Expand Down Expand Up @@ -160,6 +191,24 @@ func LRange(ctx context.Context, key string, start int64, end int64) ([]string,
return rdbConn.LRange(ctx, key, start, end).Result()
}

func LRangeObject[T any](ctx context.Context, key string, start int64, end int64) ([]T, error) {
data, err := LRange(ctx, key, start, end)
if err != nil {
log.Error().Err(err).Msg("Error getting range")
return nil, err
}

results := make([]T, len(data))
for i, d := range data {
err = json.Unmarshal([]byte(d), &results[i])
if err != nil {
log.Error().Err(err).Msg("Error unmarshalling object")
return nil, err
}
}
return results, nil
}

func LPush(ctx context.Context, key string, values ...any) error {
rdbConn, err := GetRedisConn(ctx)
if err != nil {
Expand All @@ -169,7 +218,7 @@ func LPush(ctx context.Context, key string, values ...any) error {
return rdbConn.LPush(ctx, key, values...).Err()
}

func LPushObject(ctx context.Context, key string, values []any) error {
func LPushObject[T any](ctx context.Context, key string, values []T) error {
stringValues := make([]interface{}, len(values))
for i, v := range values {
data, err := json.Marshal(v)
Expand Down Expand Up @@ -208,15 +257,13 @@ func PopAllObject[T any](ctx context.Context, key string) ([]T, error) {
return nil, err
}

results := []T{}
for _, d := range data {
var t T
err = json.Unmarshal([]byte(d), &t)
results := make([]T, len(data))
for i, d := range data {
err = json.Unmarshal([]byte(d), &results[i])
if err != nil {
log.Error().Err(err).Msg("Error unmarshalling object")
return nil, err
}
results = append(results, t)
}
return results, nil
}
Expand Down
60 changes: 52 additions & 8 deletions node/pkg/db/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,10 @@ func TestLPushObject(t *testing.T) {
}

key := "testKey"
values := []any{
TestStruct{ID: 1, Name: "Test1"},
TestStruct{ID: 2, Name: "Test2"},
TestStruct{ID: 3, Name: "Test3"},
values := []TestStruct{
{ID: 1, Name: "Test1"},
{ID: 2, Name: "Test2"},
{ID: 3, Name: "Test3"},
}

err := LPushObject(ctx, key, values)
Expand Down Expand Up @@ -443,10 +443,10 @@ func TestPopAllObject(t *testing.T) {
}

key := "testKey"
values := []any{
TestStruct{ID: 1, Name: "Test1"},
TestStruct{ID: 2, Name: "Test2"},
TestStruct{ID: 3, Name: "Test3"},
values := []TestStruct{
{ID: 1, Name: "Test1"},
{ID: 2, Name: "Test2"},
{ID: 3, Name: "Test3"},
}

// Push objects to the list
Expand Down Expand Up @@ -480,3 +480,47 @@ func TestPopAllObject(t *testing.T) {
t.Errorf("Expected empty list, got %+v", result)
}
}
func TestMSetObjectWithExp(t *testing.T) {
ctx := context.Background()

values := map[string]any{
"key1": "value1",
"key2": "value2",
}

exp := 1 * time.Second

err := MSetObjectWithExp(ctx, values, exp)
if err != nil {
t.Errorf("Error setting objects with expiration: %v", err)
}

// Check if the values were set correctly
for key, value := range values {
gotValue, getValueErr := Get(ctx, key)
if getValueErr != nil {
t.Errorf("Error getting key: %v", getValueErr)
}
expectedValue, marshalErr := json.Marshal(value)
if marshalErr != nil {
t.Errorf("Error marshalling value: %v", marshalErr)
continue
}
if gotValue != string(expectedValue) {
t.Errorf("Value did not match expected. Got %v, expected %v", gotValue, string(expectedValue))
}
}

time.Sleep(1001 * time.Millisecond)

// Check if the values were expired
for key := range values {
gotValue, getValueErr := Get(ctx, key)
if getValueErr == nil || !strings.Contains(getValueErr.Error(), "redis: nil") {
t.Errorf("Expected to have err")
}
if gotValue != "" {
t.Errorf("Expected empty value, got %v", gotValue)
}
}
}
2 changes: 2 additions & 0 deletions node/pkg/error/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ var (
ErrFetcherConvertToBigInt = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to convert to big.Int"}
ErrFetcherInvalidInput = &CustomError{Service: Fetcher, Code: InvalidInputError, Message: "Invalid input"}
ErrFetcherDivisionByZero = &CustomError{Service: Fetcher, Code: InternalError, Message: "Division by zero"}
ErrCollectorCancelNotFound = &CustomError{Service: Fetcher, Code: InternalError, Message: "Collector cancel function not found"}
ErrStreamerCancelNotFound = &CustomError{Service: Fetcher, Code: InternalError, Message: "Streamer cancel function not found"}

ErrLibP2pEmptyNonLocalAddress = &CustomError{Service: Others, Code: InternalError, Message: "Host has no non-local addresses"}
ErrLibP2pAddressSplitFail = &CustomError{Service: Others, Code: InternalError, Message: "Failed to split address"}
Expand Down
Loading

0 comments on commit fa8ec13

Please sign in to comment.