diff --git a/node/pkg/checker/dal/app.go b/node/pkg/checker/dal/app.go index 20a4f1c33..96ba03c4f 100644 --- a/node/pkg/checker/dal/app.go +++ b/node/pkg/checker/dal/app.go @@ -94,7 +94,6 @@ func Start(ctx context.Context) error { } go wsHelper.Run(ctx, handleWsMessage) - go filterDelayedWsResponse() ticker := time.NewTicker(interval) defer ticker.Stop() @@ -111,7 +110,6 @@ func Start(ctx context.Context) error { defer pool.Close() alarmCount := map[string]int{} - wsPushAlarmCount := map[string]int{} wsDelayAlarmCount := map[string]int{} for range ticker.C { @@ -120,7 +118,7 @@ func Start(ctx context.Context) error { log.Error().Str("Player", "DalChecker").Err(err).Msg("error in checkDal") } log.Debug().Msg("checking DAL WebSocket") - checkDalWs(ctx, wsPushAlarmCount, wsDelayAlarmCount) + checkDalWs(wsDelayAlarmCount) log.Debug().Msg("checked DAL WebSocket") if err := checkDalTraffic(ctx, pool); err != nil { @@ -194,71 +192,13 @@ func checkDal(endpoint string, key string, alarmCount map[string]int) error { return nil } -func checkDalWs(ctx context.Context, wsPushAlarmCount, wsDelayAlarmCount map[string]int) { - log.Debug().Msg("checking WebSocket message delays") - - if msgs := extractWsDelayAlarms(ctx, wsDelayAlarmCount); len(msgs) > 0 { - alert.SlackAlert(strings.Join(msgs, "\n")) - } - +func checkDalWs(wsPushAlarmCount map[string]int) { log.Debug().Msg("checking WebSocket message push") if msgs := updateTimes.CheckLastUpdateOffsets(wsPushAlarmCount); len(msgs) > 0 { alert.SlackAlert(strings.Join(msgs, "\n")) } } -func extractWsDelayAlarms(ctx context.Context, alarmCount map[string]int) []string { - log.Debug().Msg("extracting WebSocket alarms") - - var rawMsgs = []string{} - - select { - case <-ctx.Done(): - return nil - case entry := <-wsMsgChan: - rawMsgs = append(rawMsgs, entry) - loop: - for { - select { - case entry := <-wsMsgChan: - rawMsgs = append(rawMsgs, entry) - default: - break loop - } - } - default: - return nil - } - - delayedSymbolCount := 0 - delayedSymbols := map[string]any{} - resultMsgs := []string{} - for _, entry := range rawMsgs { - match := re.FindStringSubmatch(entry) - symbol := match[1] - delayedSymbols[symbol] = struct{}{} - alarmCount[symbol]++ - if alarmCount[symbol] > AlarmOffsetPerPair { - resultMsgs = append(resultMsgs, entry) - alarmCount[symbol] = 0 - } else if alarmCount[symbol] > AlarmOffsetInTotal { - delayedSymbolCount++ - } - } - - if delayedSymbolCount > 0 { - resultMsgs = append(resultMsgs, fmt.Sprintf("Websocket delayed for %d symbols", delayedSymbolCount)) - } - - for symbol := range alarmCount { - if _, exists := delayedSymbols[symbol]; !exists { - alarmCount[symbol] = 0 - } - } - - return resultMsgs -} - func isDataEmpty(data *OutgoingSubmissionData) bool { return data.Symbol == "" || data.Value == "" || data.AggregateTime == "" || data.Proof == "" || data.FeedHash == "" || data.Decimals == "" } @@ -287,26 +227,10 @@ func handleWsMessage(ctx context.Context, data map[string]interface{}) error { if err != nil { return err } - defer updateTimes.Store(wsData.Symbol, time.Now()) - wsChan <- wsData + updateTimes.Store(wsData.Symbol, time.Now()) return nil } -func filterDelayedWsResponse() { - log.Debug().Msg("filtering WebSocket responses") - for entry := range wsChan { - timestamp, err := strconv.ParseInt(entry.AggregateTime, 10, 64) - if err != nil { - log.Error().Err(err).Msg("failed to parse timestamp for WebSocket response") - continue - } - - if diff := time.Since(time.UnixMilli(timestamp)); diff > WsDelayThreshold { - wsMsgChan <- fmt.Sprintf("(%s) ws delayed by %v sec", entry.Symbol, diff.Seconds()) - } - } -} - func checkDalTraffic(ctx context.Context, pool *pgxpool.Pool) error { select { case <-ctx.Done(): diff --git a/node/pkg/checker/dal/app_test.go b/node/pkg/checker/dal/app_test.go index 1f88fc419..879f62a38 100644 --- a/node/pkg/checker/dal/app_test.go +++ b/node/pkg/checker/dal/app_test.go @@ -2,55 +2,11 @@ package dal import ( "context" - "strconv" "testing" - "time" "github.com/stretchr/testify/assert" ) -func TestExtractWsAlarms(t *testing.T) { - tests := []struct { - name string - messages []string - expectedAlert string - }{ - { - name: "Single message", - messages: []string{"(BTC) ws delayed by 6(sec)"}, - expectedAlert: "(BTC) ws delayed by 6(sec)", - }, - { - name: "Multiple messages", - messages: []string{"(BTC) ws delayed by 6(sec)", "(ETH) ws delayed by 7(sec)"}, - expectedAlert: "(BTC) ws delayed by 6(sec)\n(ETH) ws delayed by 7(sec)", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - // Set up context with timeout - for _, msg := range tt.messages { - wsMsgChan <- msg - } - alarmCountMap := map[string]int{ - "BTC": 10, - "ETH": 10, - } - - // Call the function - msgs := extractWsDelayAlarms(ctx, alarmCountMap) - - assert.Equal(t, 0, len(wsMsgChan)) - - for i, entry := range tt.messages { - assert.Equal(t, entry, msgs[i]) - } - }) - } -} - func TestHandleWsMessage(t *testing.T) { // Create a mock context ctx := context.Background() @@ -87,11 +43,6 @@ func TestHandleWsMessage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Clear the channel before each test - for len(wsChan) > 0 { - <-wsChan - } - // Call the function err := handleWsMessage(ctx, tt.inputData) @@ -100,68 +51,6 @@ func TestHandleWsMessage(t *testing.T) { assert.Error(t, err) } else { assert.NoError(t, err) - - // Check the channel for the expected data - select { - case result := <-wsChan: - assert.Equal(t, tt.expected, result) - case <-time.After(1 * time.Second): - t.Fatal("Expected data not received in channel") - } - } - }) - } -} - -func TestFilterWsReponses(t *testing.T) { - tests := []struct { - name string - wsResponse WsResponse - expectedAlert bool - }{ - { - name: "No delay", - wsResponse: WsResponse{ - Symbol: "BTC", - AggregateTime: strconv.FormatInt(time.Now().UnixMilli(), 10), - }, - expectedAlert: false, - }, - { - name: "Delayed response", - wsResponse: WsResponse{ - Symbol: "ETH", - AggregateTime: strconv.FormatInt(time.Now().Add(-10*time.Second).UnixMilli(), 10), - }, - expectedAlert: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Reset channels - wsChan = make(chan WsResponse, 1) - wsMsgChan = make(chan string, 1) - - // Send test data to wsChan - wsChan <- tt.wsResponse - - // Run filterWsReponses in a goroutine - go filterDelayedWsResponse() - - // Allow some time for the function to process - time.Sleep(100 * time.Millisecond) - - // Check if an alert was sent - select { - case msg := <-wsMsgChan: - if !tt.expectedAlert { - t.Errorf("unexpected alert received: %s", msg) - } - default: - if tt.expectedAlert { - t.Error("expected alert not received") - } } }) } diff --git a/node/pkg/checker/dal/types.go b/node/pkg/checker/dal/types.go index 43b621d71..3f62600c2 100644 --- a/node/pkg/checker/dal/types.go +++ b/node/pkg/checker/dal/types.go @@ -1,7 +1,6 @@ package dal import ( - "regexp" "sync" "time" ) @@ -25,12 +24,9 @@ api_key NOT IN (SELECT key from keys WHERE description IN (%s))` ) var ( - wsChan = make(chan WsResponse, 30000) - wsMsgChan = make(chan string, 10000) updateTimes = &UpdateTimes{ lastUpdates: make(map[string]time.Time), } - re = regexp.MustCompile(`\(([^)]+)\)`) ) type WsResponse struct {