From cb229aba70c02718b569637f58b31c05405664ec Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 22 Nov 2024 13:29:30 +0900 Subject: [PATCH 1/4] fix: remove ws delay checker --- node/pkg/checker/dal/app.go | 82 ++----------------------------------- 1 file changed, 3 insertions(+), 79 deletions(-) diff --git a/node/pkg/checker/dal/app.go b/node/pkg/checker/dal/app.go index 20a4f1c33..96ba03c4f 100644 --- a/node/pkg/checker/dal/app.go +++ b/node/pkg/checker/dal/app.go @@ -94,7 +94,6 @@ func Start(ctx context.Context) error { } go wsHelper.Run(ctx, handleWsMessage) - go filterDelayedWsResponse() ticker := time.NewTicker(interval) defer ticker.Stop() @@ -111,7 +110,6 @@ func Start(ctx context.Context) error { defer pool.Close() alarmCount := map[string]int{} - wsPushAlarmCount := map[string]int{} wsDelayAlarmCount := map[string]int{} for range ticker.C { @@ -120,7 +118,7 @@ func Start(ctx context.Context) error { log.Error().Str("Player", "DalChecker").Err(err).Msg("error in checkDal") } log.Debug().Msg("checking DAL WebSocket") - checkDalWs(ctx, wsPushAlarmCount, wsDelayAlarmCount) + checkDalWs(wsDelayAlarmCount) log.Debug().Msg("checked DAL WebSocket") if err := checkDalTraffic(ctx, pool); err != nil { @@ -194,71 +192,13 @@ func checkDal(endpoint string, key string, alarmCount map[string]int) error { return nil } -func checkDalWs(ctx context.Context, wsPushAlarmCount, wsDelayAlarmCount map[string]int) { - log.Debug().Msg("checking WebSocket message delays") - - if msgs := extractWsDelayAlarms(ctx, wsDelayAlarmCount); len(msgs) > 0 { - alert.SlackAlert(strings.Join(msgs, "\n")) - } - +func checkDalWs(wsPushAlarmCount map[string]int) { log.Debug().Msg("checking WebSocket message push") if msgs := updateTimes.CheckLastUpdateOffsets(wsPushAlarmCount); len(msgs) > 0 { alert.SlackAlert(strings.Join(msgs, "\n")) } } -func extractWsDelayAlarms(ctx context.Context, alarmCount map[string]int) []string { - log.Debug().Msg("extracting WebSocket alarms") - - var rawMsgs = []string{} - - select { - case <-ctx.Done(): - return nil - case entry := <-wsMsgChan: - rawMsgs = append(rawMsgs, entry) - loop: - for { - select { - case entry := <-wsMsgChan: - rawMsgs = append(rawMsgs, entry) - default: - break loop - } - } - default: - return nil - } - - delayedSymbolCount := 0 - delayedSymbols := map[string]any{} - resultMsgs := []string{} - for _, entry := range rawMsgs { - match := re.FindStringSubmatch(entry) - symbol := match[1] - delayedSymbols[symbol] = struct{}{} - alarmCount[symbol]++ - if alarmCount[symbol] > AlarmOffsetPerPair { - resultMsgs = append(resultMsgs, entry) - alarmCount[symbol] = 0 - } else if alarmCount[symbol] > AlarmOffsetInTotal { - delayedSymbolCount++ - } - } - - if delayedSymbolCount > 0 { - resultMsgs = append(resultMsgs, fmt.Sprintf("Websocket delayed for %d symbols", delayedSymbolCount)) - } - - for symbol := range alarmCount { - if _, exists := delayedSymbols[symbol]; !exists { - alarmCount[symbol] = 0 - } - } - - return resultMsgs -} - func isDataEmpty(data *OutgoingSubmissionData) bool { return data.Symbol == "" || data.Value == "" || data.AggregateTime == "" || data.Proof == "" || data.FeedHash == "" || data.Decimals == "" } @@ -287,26 +227,10 @@ func handleWsMessage(ctx context.Context, data map[string]interface{}) error { if err != nil { return err } - defer updateTimes.Store(wsData.Symbol, time.Now()) - wsChan <- wsData + updateTimes.Store(wsData.Symbol, time.Now()) return nil } -func filterDelayedWsResponse() { - log.Debug().Msg("filtering WebSocket responses") - for entry := range wsChan { - timestamp, err := strconv.ParseInt(entry.AggregateTime, 10, 64) - if err != nil { - log.Error().Err(err).Msg("failed to parse timestamp for WebSocket response") - continue - } - - if diff := time.Since(time.UnixMilli(timestamp)); diff > WsDelayThreshold { - wsMsgChan <- fmt.Sprintf("(%s) ws delayed by %v sec", entry.Symbol, diff.Seconds()) - } - } -} - func checkDalTraffic(ctx context.Context, pool *pgxpool.Pool) error { select { case <-ctx.Done(): From 32a5bb255811b6749147b1cd2f015294725a2a9c Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 22 Nov 2024 13:33:10 +0900 Subject: [PATCH 2/4] test: remove deprecated tests --- node/pkg/checker/dal/app_test.go | 97 -------------------------------- 1 file changed, 97 deletions(-) diff --git a/node/pkg/checker/dal/app_test.go b/node/pkg/checker/dal/app_test.go index 1f88fc419..4644b142f 100644 --- a/node/pkg/checker/dal/app_test.go +++ b/node/pkg/checker/dal/app_test.go @@ -2,55 +2,12 @@ package dal import ( "context" - "strconv" "testing" "time" "github.com/stretchr/testify/assert" ) -func TestExtractWsAlarms(t *testing.T) { - tests := []struct { - name string - messages []string - expectedAlert string - }{ - { - name: "Single message", - messages: []string{"(BTC) ws delayed by 6(sec)"}, - expectedAlert: "(BTC) ws delayed by 6(sec)", - }, - { - name: "Multiple messages", - messages: []string{"(BTC) ws delayed by 6(sec)", "(ETH) ws delayed by 7(sec)"}, - expectedAlert: "(BTC) ws delayed by 6(sec)\n(ETH) ws delayed by 7(sec)", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - // Set up context with timeout - for _, msg := range tt.messages { - wsMsgChan <- msg - } - alarmCountMap := map[string]int{ - "BTC": 10, - "ETH": 10, - } - - // Call the function - msgs := extractWsDelayAlarms(ctx, alarmCountMap) - - assert.Equal(t, 0, len(wsMsgChan)) - - for i, entry := range tt.messages { - assert.Equal(t, entry, msgs[i]) - } - }) - } -} - func TestHandleWsMessage(t *testing.T) { // Create a mock context ctx := context.Background() @@ -112,57 +69,3 @@ func TestHandleWsMessage(t *testing.T) { }) } } - -func TestFilterWsReponses(t *testing.T) { - tests := []struct { - name string - wsResponse WsResponse - expectedAlert bool - }{ - { - name: "No delay", - wsResponse: WsResponse{ - Symbol: "BTC", - AggregateTime: strconv.FormatInt(time.Now().UnixMilli(), 10), - }, - expectedAlert: false, - }, - { - name: "Delayed response", - wsResponse: WsResponse{ - Symbol: "ETH", - AggregateTime: strconv.FormatInt(time.Now().Add(-10*time.Second).UnixMilli(), 10), - }, - expectedAlert: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Reset channels - wsChan = make(chan WsResponse, 1) - wsMsgChan = make(chan string, 1) - - // Send test data to wsChan - wsChan <- tt.wsResponse - - // Run filterWsReponses in a goroutine - go filterDelayedWsResponse() - - // Allow some time for the function to process - time.Sleep(100 * time.Millisecond) - - // Check if an alert was sent - select { - case msg := <-wsMsgChan: - if !tt.expectedAlert { - t.Errorf("unexpected alert received: %s", msg) - } - default: - if tt.expectedAlert { - t.Error("expected alert not received") - } - } - }) - } -} From 6a0ad28402428330f36b35e5e709751bbbc95d73 Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 22 Nov 2024 13:33:52 +0900 Subject: [PATCH 3/4] fix: remove deprecated variables --- node/pkg/checker/dal/types.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/node/pkg/checker/dal/types.go b/node/pkg/checker/dal/types.go index 43b621d71..c6ff8ebf8 100644 --- a/node/pkg/checker/dal/types.go +++ b/node/pkg/checker/dal/types.go @@ -1,7 +1,6 @@ package dal import ( - "regexp" "sync" "time" ) @@ -26,11 +25,9 @@ api_key NOT IN (SELECT key from keys WHERE description IN (%s))` var ( wsChan = make(chan WsResponse, 30000) - wsMsgChan = make(chan string, 10000) updateTimes = &UpdateTimes{ lastUpdates: make(map[string]time.Time), } - re = regexp.MustCompile(`\(([^)]+)\)`) ) type WsResponse struct { From ad722e3292532ed128ae28755e5c85d9162f048e Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 22 Nov 2024 13:42:26 +0900 Subject: [PATCH 4/4] fix: remove deprecated async tests --- node/pkg/checker/dal/app_test.go | 14 -------------- node/pkg/checker/dal/types.go | 1 - 2 files changed, 15 deletions(-) diff --git a/node/pkg/checker/dal/app_test.go b/node/pkg/checker/dal/app_test.go index 4644b142f..879f62a38 100644 --- a/node/pkg/checker/dal/app_test.go +++ b/node/pkg/checker/dal/app_test.go @@ -3,7 +3,6 @@ package dal import ( "context" "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -44,11 +43,6 @@ func TestHandleWsMessage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Clear the channel before each test - for len(wsChan) > 0 { - <-wsChan - } - // Call the function err := handleWsMessage(ctx, tt.inputData) @@ -57,14 +51,6 @@ func TestHandleWsMessage(t *testing.T) { assert.Error(t, err) } else { assert.NoError(t, err) - - // Check the channel for the expected data - select { - case result := <-wsChan: - assert.Equal(t, tt.expected, result) - case <-time.After(1 * time.Second): - t.Fatal("Expected data not received in channel") - } } }) } diff --git a/node/pkg/checker/dal/types.go b/node/pkg/checker/dal/types.go index c6ff8ebf8..3f62600c2 100644 --- a/node/pkg/checker/dal/types.go +++ b/node/pkg/checker/dal/types.go @@ -24,7 +24,6 @@ api_key NOT IN (SELECT key from keys WHERE description IN (%s))` ) var ( - wsChan = make(chan WsResponse, 30000) updateTimes = &UpdateTimes{ lastUpdates: make(map[string]time.Time), }