Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Sentinel] Remove ws delay checker #2319

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 3 additions & 79 deletions node/pkg/checker/dal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func Start(ctx context.Context) error {
}

go wsHelper.Run(ctx, handleWsMessage)
go filterDelayedWsResponse()

ticker := time.NewTicker(interval)
defer ticker.Stop()
Expand All @@ -111,7 +110,6 @@ func Start(ctx context.Context) error {
defer pool.Close()

alarmCount := map[string]int{}
wsPushAlarmCount := map[string]int{}
wsDelayAlarmCount := map[string]int{}

for range ticker.C {
Expand All @@ -120,7 +118,7 @@ func Start(ctx context.Context) error {
log.Error().Str("Player", "DalChecker").Err(err).Msg("error in checkDal")
}
log.Debug().Msg("checking DAL WebSocket")
checkDalWs(ctx, wsPushAlarmCount, wsDelayAlarmCount)
checkDalWs(wsDelayAlarmCount)
log.Debug().Msg("checked DAL WebSocket")

if err := checkDalTraffic(ctx, pool); err != nil {
Expand Down Expand Up @@ -194,71 +192,13 @@ func checkDal(endpoint string, key string, alarmCount map[string]int) error {
return nil
}

func checkDalWs(ctx context.Context, wsPushAlarmCount, wsDelayAlarmCount map[string]int) {
log.Debug().Msg("checking WebSocket message delays")

if msgs := extractWsDelayAlarms(ctx, wsDelayAlarmCount); len(msgs) > 0 {
alert.SlackAlert(strings.Join(msgs, "\n"))
}

func checkDalWs(wsPushAlarmCount map[string]int) {
log.Debug().Msg("checking WebSocket message push")
if msgs := updateTimes.CheckLastUpdateOffsets(wsPushAlarmCount); len(msgs) > 0 {
alert.SlackAlert(strings.Join(msgs, "\n"))
}
}

func extractWsDelayAlarms(ctx context.Context, alarmCount map[string]int) []string {
log.Debug().Msg("extracting WebSocket alarms")

var rawMsgs = []string{}

select {
case <-ctx.Done():
return nil
case entry := <-wsMsgChan:
rawMsgs = append(rawMsgs, entry)
loop:
for {
select {
case entry := <-wsMsgChan:
rawMsgs = append(rawMsgs, entry)
default:
break loop
}
}
default:
return nil
}

delayedSymbolCount := 0
delayedSymbols := map[string]any{}
resultMsgs := []string{}
for _, entry := range rawMsgs {
match := re.FindStringSubmatch(entry)
symbol := match[1]
delayedSymbols[symbol] = struct{}{}
alarmCount[symbol]++
if alarmCount[symbol] > AlarmOffsetPerPair {
resultMsgs = append(resultMsgs, entry)
alarmCount[symbol] = 0
} else if alarmCount[symbol] > AlarmOffsetInTotal {
delayedSymbolCount++
}
}

if delayedSymbolCount > 0 {
resultMsgs = append(resultMsgs, fmt.Sprintf("Websocket delayed for %d symbols", delayedSymbolCount))
}

for symbol := range alarmCount {
if _, exists := delayedSymbols[symbol]; !exists {
alarmCount[symbol] = 0
}
}

return resultMsgs
}

func isDataEmpty(data *OutgoingSubmissionData) bool {
return data.Symbol == "" || data.Value == "" || data.AggregateTime == "" || data.Proof == "" || data.FeedHash == "" || data.Decimals == ""
}
Expand Down Expand Up @@ -287,26 +227,10 @@ func handleWsMessage(ctx context.Context, data map[string]interface{}) error {
if err != nil {
return err
}
defer updateTimes.Store(wsData.Symbol, time.Now())
wsChan <- wsData
updateTimes.Store(wsData.Symbol, time.Now())
return nil
}

func filterDelayedWsResponse() {
log.Debug().Msg("filtering WebSocket responses")
for entry := range wsChan {
timestamp, err := strconv.ParseInt(entry.AggregateTime, 10, 64)
if err != nil {
log.Error().Err(err).Msg("failed to parse timestamp for WebSocket response")
continue
}

if diff := time.Since(time.UnixMilli(timestamp)); diff > WsDelayThreshold {
wsMsgChan <- fmt.Sprintf("(%s) ws delayed by %v sec", entry.Symbol, diff.Seconds())
}
}
}

func checkDalTraffic(ctx context.Context, pool *pgxpool.Pool) error {
select {
case <-ctx.Done():
Expand Down
111 changes: 0 additions & 111 deletions node/pkg/checker/dal/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,11 @@ package dal

import (
"context"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestExtractWsAlarms(t *testing.T) {
tests := []struct {
name string
messages []string
expectedAlert string
}{
{
name: "Single message",
messages: []string{"(BTC) ws delayed by 6(sec)"},
expectedAlert: "(BTC) ws delayed by 6(sec)",
},
{
name: "Multiple messages",
messages: []string{"(BTC) ws delayed by 6(sec)", "(ETH) ws delayed by 7(sec)"},
expectedAlert: "(BTC) ws delayed by 6(sec)\n(ETH) ws delayed by 7(sec)",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
// Set up context with timeout
for _, msg := range tt.messages {
wsMsgChan <- msg
}
alarmCountMap := map[string]int{
"BTC": 10,
"ETH": 10,
}

// Call the function
msgs := extractWsDelayAlarms(ctx, alarmCountMap)

assert.Equal(t, 0, len(wsMsgChan))

for i, entry := range tt.messages {
assert.Equal(t, entry, msgs[i])
}
})
}
}

func TestHandleWsMessage(t *testing.T) {
// Create a mock context
ctx := context.Background()
Expand Down Expand Up @@ -87,11 +43,6 @@ func TestHandleWsMessage(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Clear the channel before each test
for len(wsChan) > 0 {
<-wsChan
}

// Call the function
err := handleWsMessage(ctx, tt.inputData)

Expand All @@ -100,68 +51,6 @@ func TestHandleWsMessage(t *testing.T) {
assert.Error(t, err)
} else {
assert.NoError(t, err)

// Check the channel for the expected data
select {
case result := <-wsChan:
assert.Equal(t, tt.expected, result)
case <-time.After(1 * time.Second):
t.Fatal("Expected data not received in channel")
}
}
})
}
}

func TestFilterWsReponses(t *testing.T) {
tests := []struct {
name string
wsResponse WsResponse
expectedAlert bool
}{
{
name: "No delay",
wsResponse: WsResponse{
Symbol: "BTC",
AggregateTime: strconv.FormatInt(time.Now().UnixMilli(), 10),
},
expectedAlert: false,
},
{
name: "Delayed response",
wsResponse: WsResponse{
Symbol: "ETH",
AggregateTime: strconv.FormatInt(time.Now().Add(-10*time.Second).UnixMilli(), 10),
},
expectedAlert: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Reset channels
wsChan = make(chan WsResponse, 1)
wsMsgChan = make(chan string, 1)

// Send test data to wsChan
wsChan <- tt.wsResponse

// Run filterWsReponses in a goroutine
go filterDelayedWsResponse()

// Allow some time for the function to process
time.Sleep(100 * time.Millisecond)

// Check if an alert was sent
select {
case msg := <-wsMsgChan:
if !tt.expectedAlert {
t.Errorf("unexpected alert received: %s", msg)
}
default:
if tt.expectedAlert {
t.Error("expected alert not received")
}
}
})
}
Expand Down
4 changes: 0 additions & 4 deletions node/pkg/checker/dal/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dal

import (
"regexp"
"sync"
"time"
)
Expand All @@ -25,12 +24,9 @@ api_key NOT IN (SELECT key from keys WHERE description IN (%s))`
)

var (
wsChan = make(chan WsResponse, 30000)
wsMsgChan = make(chan string, 10000)
updateTimes = &UpdateTimes{
lastUpdates: make(map[string]time.Time),
}
re = regexp.MustCompile(`\(([^)]+)\)`)
)

type WsResponse struct {
Expand Down