Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Sentinel) DAL request traffic checker #2078

Merged
merged 4 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 34 additions & 48 deletions node/pkg/checker/dal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,65 +6,19 @@ import (
"errors"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"

"bisonai.com/orakl/node/pkg/alert"
"bisonai.com/orakl/node/pkg/db"
"bisonai.com/orakl/node/pkg/secrets"
"bisonai.com/orakl/node/pkg/utils/request"
"bisonai.com/orakl/node/pkg/wss"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog/log"
)

const (
DefaultDalCheckInterval = 10 * time.Second
DelayOffset = 5 * time.Second
AlarmOffset = 3
WsDelayThreshold = 9 * time.Second
WsPushThreshold = 5 * time.Second
)

var (
wsChan = make(chan WsResponse, 30000)
wsMsgChan = make(chan string, 10000)
updateTimes = &UpdateTimes{
lastUpdates: make(map[string]time.Time),
}
re = regexp.MustCompile(`\(([^)]+)\)`)
)

type WsResponse struct {
Symbol string `json:"symbol"`
AggregateTime string `json:"aggregateTime"`
}

type Subscription struct {
Method string `json:"method"`
Params []string `json:"params"`
}

type Config struct {
Name string `json:"name"`
SubmitInterval *int `json:"submitInterval"`
}

type OutgoingSubmissionData struct {
Symbol string `json:"symbol"`
Value string `json:"value"`
AggregateTime string `json:"aggregateTime"`
Proof string `json:"proof"`
FeedHash string `json:"feedHash"`
Decimals string `json:"decimals"`
}

type UpdateTimes struct {
lastUpdates map[string]time.Time
mu sync.RWMutex
}

func (u *UpdateTimes) Store(symbol string, time time.Time) {
u.mu.Lock()
defer u.mu.Unlock()
Expand Down Expand Up @@ -137,6 +91,17 @@ func Start(ctx context.Context) error {
ticker := time.NewTicker(interval)
defer ticker.Stop()

dalDBConnectionUrl := secrets.GetSecret("DAL_DB_CONNECTION_URL")
if dalDBConnectionUrl == "" {
return errors.New("DAL_DB_CONNECTION_URL not found")
}

pool, err := db.GetTransientPool(ctx, dalDBConnectionUrl)
if err != nil {
return err
}
defer pool.Close()

alarmCount := map[string]int{}
wsPushAlarmCount := map[string]int{}
wsDelayAlarmCount := map[string]int{}
Expand All @@ -149,6 +114,10 @@ func Start(ctx context.Context) error {
log.Debug().Msg("checking DAL WebSocket")
checkDalWs(ctx, wsPushAlarmCount, wsDelayAlarmCount)
log.Debug().Msg("checked DAL WebSocket")

if err := checkDalTraffic(ctx, pool); err != nil {
log.Error().Err(err).Msg("error in checkDalTraffic")
}
}
return nil
}
Expand Down Expand Up @@ -314,3 +283,20 @@ func filterDelayedWsResponse() {
}
}
}

func checkDalTraffic(ctx context.Context, pool *pgxpool.Pool) error {
select {
case <-ctx.Done():
return nil
default:
result, err := db.QueryRowTransient[Count](ctx, pool, TrafficCheckQuery, map[string]any{})
if err != nil {
log.Error().Err(err).Msg("failed to check DAL traffic")
return err
}
if result.Count > TrafficThreshold {
alert.SlackAlert(fmt.Sprintf("DAL traffic exceeded threshold: %d", result.Count))
}
return nil
}
}
61 changes: 61 additions & 0 deletions node/pkg/checker/dal/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package dal

import (
"regexp"
"sync"
"time"
)

const (
DefaultDalCheckInterval = 10 * time.Second
DelayOffset = 5 * time.Second
AlarmOffset = 3
WsDelayThreshold = 9 * time.Second
WsPushThreshold = 5 * time.Second
TrafficCheckQuery = `select count(1) from rest_calls where
timestamp > current_timestamp - INTERVAL '10 seconds' AND
api_key NOT IN (SELECT key from keys WHERE description IN ('test', 'sentinel', 'orakl_reporter'))`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the hard coded values can get lost easily. I wonder if it's possible to filter these in a more dynamic way... but cannot think of a better way atm. But I guess shouldn't be a problem to leave them like this either 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now const with those specified keys can be declared and used to generate query, will make an update

TrafficThreshold = 100
)

var (
wsChan = make(chan WsResponse, 30000)
wsMsgChan = make(chan string, 10000)
updateTimes = &UpdateTimes{
lastUpdates: make(map[string]time.Time),
}
re = regexp.MustCompile(`\(([^)]+)\)`)
)

type WsResponse struct {
Symbol string `json:"symbol"`
AggregateTime string `json:"aggregateTime"`
}

type Subscription struct {
Method string `json:"method"`
Params []string `json:"params"`
}

type Config struct {
Name string `json:"name"`
SubmitInterval *int `json:"submitInterval"`
}

type OutgoingSubmissionData struct {
Symbol string `json:"symbol"`
Value string `json:"value"`
AggregateTime string `json:"aggregateTime"`
Proof string `json:"proof"`
FeedHash string `json:"feedHash"`
Decimals string `json:"decimals"`
}

type UpdateTimes struct {
lastUpdates map[string]time.Time
mu sync.RWMutex
}

type Count struct {
Count int `db:"count"`
}