From 265b652d8672af589567f96ef22e6df7779f566e Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 13 Aug 2024 13:39:03 +0900 Subject: [PATCH 1/4] feat: dal request traffic checker --- node/pkg/checker/dal/app.go | 74 ++++++++++++----------------------- node/pkg/checker/dal/types.go | 61 +++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 48 deletions(-) create mode 100644 node/pkg/checker/dal/types.go diff --git a/node/pkg/checker/dal/app.go b/node/pkg/checker/dal/app.go index 824b32dd5..15017ebdc 100644 --- a/node/pkg/checker/dal/app.go +++ b/node/pkg/checker/dal/app.go @@ -6,65 +6,19 @@ import ( "errors" "fmt" "os" - "regexp" "strconv" "strings" - "sync" "time" "bisonai.com/orakl/node/pkg/alert" + "bisonai.com/orakl/node/pkg/db" "bisonai.com/orakl/node/pkg/secrets" "bisonai.com/orakl/node/pkg/utils/request" "bisonai.com/orakl/node/pkg/wss" + "github.com/jackc/pgx/v5/pgxpool" "github.com/rs/zerolog/log" ) -const ( - DefaultDalCheckInterval = 10 * time.Second - DelayOffset = 5 * time.Second - AlarmOffset = 3 - WsDelayThreshold = 9 * time.Second - WsPushThreshold = 5 * time.Second -) - -var ( - wsChan = make(chan WsResponse, 30000) - wsMsgChan = make(chan string, 10000) - updateTimes = &UpdateTimes{ - lastUpdates: make(map[string]time.Time), - } - re = regexp.MustCompile(`\(([^)]+)\)`) -) - -type WsResponse struct { - Symbol string `json:"symbol"` - AggregateTime string `json:"aggregateTime"` -} - -type Subscription struct { - Method string `json:"method"` - Params []string `json:"params"` -} - -type Config struct { - Name string `json:"name"` - SubmitInterval *int `json:"submitInterval"` -} - -type OutgoingSubmissionData struct { - Symbol string `json:"symbol"` - Value string `json:"value"` - AggregateTime string `json:"aggregateTime"` - Proof string `json:"proof"` - FeedHash string `json:"feedHash"` - Decimals string `json:"decimals"` -} - -type UpdateTimes struct { - lastUpdates map[string]time.Time - mu sync.RWMutex -} - func (u *UpdateTimes) Store(symbol string, time time.Time) { u.mu.Lock() defer u.mu.Unlock() @@ -137,6 +91,17 @@ func Start(ctx context.Context) error { ticker := time.NewTicker(interval) defer ticker.Stop() + dalDBConnectionUrl := secrets.GetSecret("DAL_DB_CONNECTION_URL") + if dalDBConnectionUrl == "" { + return errors.New("DAL_DB_CONNECTION_URL not found") + } + + pool, err := db.GetTransientPool(ctx, dalDBConnectionUrl) + if err != nil { + return err + } + defer pool.Close() + alarmCount := map[string]int{} wsPushAlarmCount := map[string]int{} wsDelayAlarmCount := map[string]int{} @@ -149,6 +114,8 @@ func Start(ctx context.Context) error { log.Debug().Msg("checking DAL WebSocket") checkDalWs(ctx, wsPushAlarmCount, wsDelayAlarmCount) log.Debug().Msg("checked DAL WebSocket") + + checkDalTraffic(ctx, pool) } return nil } @@ -314,3 +281,14 @@ func filterDelayedWsResponse() { } } } + +func checkDalTraffic(ctx context.Context, pool *pgxpool.Pool) { + result, err := db.QueryRowTransient[Count](ctx, pool, trafficCheckQuery, map[string]any{}) + if err != nil { + log.Error().Err(err).Msg("failed to check DAL traffic") + return + } + if result.Count > trafficThreshold { + alert.SlackAlert(fmt.Sprintf("DAL traffic exceeded threshold: %d", result.Count)) + } +} diff --git a/node/pkg/checker/dal/types.go b/node/pkg/checker/dal/types.go new file mode 100644 index 000000000..14cd2bad2 --- /dev/null +++ b/node/pkg/checker/dal/types.go @@ -0,0 +1,61 @@ +package dal + +import ( + "regexp" + "sync" + "time" +) + +const ( + DefaultDalCheckInterval = 10 * time.Second + DelayOffset = 5 * time.Second + AlarmOffset = 3 + WsDelayThreshold = 9 * time.Second + WsPushThreshold = 5 * time.Second +) + +var ( + wsChan = make(chan WsResponse, 30000) + wsMsgChan = make(chan string, 10000) + updateTimes = &UpdateTimes{ + lastUpdates: make(map[string]time.Time), + } + re = regexp.MustCompile(`\(([^)]+)\)`) + trafficCheckQuery = `select count(1) from rest_calls where +timestamp > current_timestamp - INTERVAL '10 seconds' AND +api_key NOT IN (SELECT key from keys WHERE description IN ('test', 'sentinel', 'orakl_reporter'))` + trafficThreshold = 100 +) + +type WsResponse struct { + Symbol string `json:"symbol"` + AggregateTime string `json:"aggregateTime"` +} + +type Subscription struct { + Method string `json:"method"` + Params []string `json:"params"` +} + +type Config struct { + Name string `json:"name"` + SubmitInterval *int `json:"submitInterval"` +} + +type OutgoingSubmissionData struct { + Symbol string `json:"symbol"` + Value string `json:"value"` + AggregateTime string `json:"aggregateTime"` + Proof string `json:"proof"` + FeedHash string `json:"feedHash"` + Decimals string `json:"decimals"` +} + +type UpdateTimes struct { + lastUpdates map[string]time.Time + mu sync.RWMutex +} + +type Count struct { + Count int `db:"count"` +} From c8ffe3ddc3bd8b5c95a7c8a579d3949fd857ee16 Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 13 Aug 2024 13:42:48 +0900 Subject: [PATCH 2/4] feat: update var to const type --- node/pkg/checker/dal/app.go | 4 ++-- node/pkg/checker/dal/types.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/node/pkg/checker/dal/app.go b/node/pkg/checker/dal/app.go index 15017ebdc..35c965f00 100644 --- a/node/pkg/checker/dal/app.go +++ b/node/pkg/checker/dal/app.go @@ -283,12 +283,12 @@ func filterDelayedWsResponse() { } func checkDalTraffic(ctx context.Context, pool *pgxpool.Pool) { - result, err := db.QueryRowTransient[Count](ctx, pool, trafficCheckQuery, map[string]any{}) + result, err := db.QueryRowTransient[Count](ctx, pool, TrafficCheckQuery, map[string]any{}) if err != nil { log.Error().Err(err).Msg("failed to check DAL traffic") return } - if result.Count > trafficThreshold { + if result.Count > TrafficThreshold { alert.SlackAlert(fmt.Sprintf("DAL traffic exceeded threshold: %d", result.Count)) } } diff --git a/node/pkg/checker/dal/types.go b/node/pkg/checker/dal/types.go index 14cd2bad2..eeacbeeac 100644 --- a/node/pkg/checker/dal/types.go +++ b/node/pkg/checker/dal/types.go @@ -12,6 +12,10 @@ const ( AlarmOffset = 3 WsDelayThreshold = 9 * time.Second WsPushThreshold = 5 * time.Second + TrafficCheckQuery = `select count(1) from rest_calls where +timestamp > current_timestamp - INTERVAL '10 seconds' AND +api_key NOT IN (SELECT key from keys WHERE description IN ('test', 'sentinel', 'orakl_reporter'))` + TrafficThreshold = 100 ) var ( @@ -20,11 +24,7 @@ var ( updateTimes = &UpdateTimes{ lastUpdates: make(map[string]time.Time), } - re = regexp.MustCompile(`\(([^)]+)\)`) - trafficCheckQuery = `select count(1) from rest_calls where -timestamp > current_timestamp - INTERVAL '10 seconds' AND -api_key NOT IN (SELECT key from keys WHERE description IN ('test', 'sentinel', 'orakl_reporter'))` - trafficThreshold = 100 + re = regexp.MustCompile(`\(([^)]+)\)`) ) type WsResponse struct { From e4b524f35ae6bcb52e8c7e763553d88a88f56695 Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 13 Aug 2024 13:51:27 +0900 Subject: [PATCH 3/4] fix: update based on coderabbit feedback --- node/pkg/checker/dal/app.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/node/pkg/checker/dal/app.go b/node/pkg/checker/dal/app.go index 35c965f00..d9d452b9f 100644 --- a/node/pkg/checker/dal/app.go +++ b/node/pkg/checker/dal/app.go @@ -115,7 +115,9 @@ func Start(ctx context.Context) error { checkDalWs(ctx, wsPushAlarmCount, wsDelayAlarmCount) log.Debug().Msg("checked DAL WebSocket") - checkDalTraffic(ctx, pool) + if err := checkDalTraffic(ctx, pool); err != nil { + log.Error().Err(err).Msg("error in checkDalTraffic") + } } return nil } @@ -282,13 +284,19 @@ func filterDelayedWsResponse() { } } -func checkDalTraffic(ctx context.Context, pool *pgxpool.Pool) { - result, err := db.QueryRowTransient[Count](ctx, pool, TrafficCheckQuery, map[string]any{}) - if err != nil { - log.Error().Err(err).Msg("failed to check DAL traffic") - return - } - if result.Count > TrafficThreshold { - alert.SlackAlert(fmt.Sprintf("DAL traffic exceeded threshold: %d", result.Count)) +func checkDalTraffic(ctx context.Context, pool *pgxpool.Pool) error { + select { + case <-ctx.Done(): + return nil + default: + result, err := db.QueryRowTransient[Count](ctx, pool, TrafficCheckQuery, map[string]any{}) + if err != nil { + log.Error().Err(err).Msg("failed to check DAL traffic") + return err + } + if result.Count > TrafficThreshold { + alert.SlackAlert(fmt.Sprintf("DAL traffic exceeded threshold: %d", result.Count)) + } + return nil } } From fd7a4247cc12df898946f674d1c8cae69880aaf3 Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 13 Aug 2024 14:30:22 +0900 Subject: [PATCH 4/4] feat: update query load --- node/pkg/checker/dal/app.go | 12 +++++++++++- node/pkg/checker/dal/types.go | 3 ++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/node/pkg/checker/dal/app.go b/node/pkg/checker/dal/app.go index d9d452b9f..78c9a9216 100644 --- a/node/pkg/checker/dal/app.go +++ b/node/pkg/checker/dal/app.go @@ -289,7 +289,7 @@ func checkDalTraffic(ctx context.Context, pool *pgxpool.Pool) error { case <-ctx.Done(): return nil default: - result, err := db.QueryRowTransient[Count](ctx, pool, TrafficCheckQuery, map[string]any{}) + result, err := db.QueryRowTransient[Count](ctx, pool, getTrafficCheckQuery(), map[string]any{}) if err != nil { log.Error().Err(err).Msg("failed to check DAL traffic") return err @@ -300,3 +300,13 @@ func checkDalTraffic(ctx context.Context, pool *pgxpool.Pool) error { return nil } } + +func getTrafficCheckQuery() string { + keysToIgnore := strings.Split(IgnoreKeys, ",") + modified := []string{} + for _, desc := range keysToIgnore { + modified = append(modified, fmt.Sprintf("'%s'", desc)) + } + + return fmt.Sprintf(TrafficCheckQuery, strings.Join(modified, ", ")) +} diff --git a/node/pkg/checker/dal/types.go b/node/pkg/checker/dal/types.go index eeacbeeac..80af00be0 100644 --- a/node/pkg/checker/dal/types.go +++ b/node/pkg/checker/dal/types.go @@ -12,9 +12,10 @@ const ( AlarmOffset = 3 WsDelayThreshold = 9 * time.Second WsPushThreshold = 5 * time.Second + IgnoreKeys = "test,sentinel,orakl_reporter" TrafficCheckQuery = `select count(1) from rest_calls where timestamp > current_timestamp - INTERVAL '10 seconds' AND -api_key NOT IN (SELECT key from keys WHERE description IN ('test', 'sentinel', 'orakl_reporter'))` +api_key NOT IN (SELECT key from keys WHERE description IN (%s))` TrafficThreshold = 100 )