(Sentinel) reduce spamming from dal checks, offset checks, and event checks (#2236)

* fix: reduce spamming

* fix: fix test
nick-bisonai authored Aug 27, 2024
1 parent 7e77f89 commit cb690d1
Showing 5 changed files with 88 additions and 25 deletions.
23 changes: 19 additions & 4 deletions node/pkg/checker/dal/app.go
@@ -29,20 +29,28 @@ func (u *UpdateTimes) CheckLastUpdateOffsets(alarmCount map[string]int) []string
u.mu.RLock()
defer u.mu.RUnlock()

websocketNotPushedCount := 0
var messages []string
for symbol, updateTime := range u.lastUpdates {
elapsedTime := time.Since(updateTime)
if elapsedTime > WsPushThreshold {
alarmCount[symbol]++
if alarmCount[symbol] > AlarmOffset {
message := fmt.Sprintf("(%s) WebSocket not pushed for %v seconds", symbol, elapsedTime.Seconds())
if alarmCount[symbol] > AlarmOffsetPerPair {
message := fmt.Sprintf("(%s) Websocket not pushed for %v seconds", symbol, elapsedTime.Seconds())
messages = append(messages, message)
alarmCount[symbol] = 0
} else if alarmCount[symbol] > AlarmOffsetInTotal {
websocketNotPushedCount++
}
} else {
alarmCount[symbol] = 0
}
}

if websocketNotPushedCount > 0 {
messages = append(messages, fmt.Sprintf("Websocket not being pushed for %d symbols", websocketNotPushedCount))
}

return messages
}

@@ -145,6 +153,7 @@ func checkDal(endpoint string, key string, alarmCount map[string]int) error {
return err
}

totalDelayed := 0
for _, data := range resp {
rawTimestamp, err := strconv.ParseInt(data.AggregateTime, 10, 64)
if err != nil {
@@ -162,16 +171,22 @@ func checkDal(endpoint string, key string, alarmCount map[string]int) error {

if offset > DelayOffset+networkDelay {
alarmCount[data.Symbol]++
if alarmCount[data.Symbol] > AlarmOffset {
if alarmCount[data.Symbol] > AlarmOffsetPerPair {
msg += fmt.Sprintf("(DAL) %s price update delayed by %s\n", data.Symbol, offset)
alarmCount[data.Symbol] = 0
} else if alarmCount[data.Symbol] > AlarmOffsetInTotal {
totalDelayed++
}
} else {
alarmCount[data.Symbol] = 0
}

}

if totalDelayed > 0 {
msg += fmt.Sprintf("DAL price update delayed by %d symbols\n", totalDelayed)
}

if msg != "" {
alert.SlackAlert(msg)
}
@@ -222,7 +237,7 @@ func extractWsDelayAlarms(ctx context.Context, alarmCount map[string]int) []stri
symbol := match[1]
delayedSymbols[symbol] = struct{}{}
alarmCount[symbol]++
if alarmCount[symbol] > AlarmOffset {
if alarmCount[symbol] > AlarmOffsetPerPair {
resultMsgs = append(resultMsgs, entry)
alarmCount[symbol] = 0
}
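The dal changes above replace the single AlarmOffset with a two-tier rule: every delayed check increments a per-symbol counter, symbols whose counter has passed AlarmOffsetInTotal are folded into one summary line, and only symbols whose counter exceeds AlarmOffsetPerPair get a dedicated message plus a counter reset. Below is a minimal, self-contained sketch of that decision rule; the classify helper and alarmAction type are illustrative names, not code from this repository.

package main

import "fmt"

const (
	alarmOffsetPerPair = 10 // mirrors AlarmOffsetPerPair in dal/types.go
	alarmOffsetInTotal = 3  // mirrors AlarmOffsetInTotal in dal/types.go
)

type alarmAction string

const (
	actionNone      alarmAction = "silent"           // below both thresholds
	actionAggregate alarmAction = "count in summary" // folded into the single summary line
	actionPerPair   alarmAction = "per-pair alert"   // dedicated message, counter reset
)

// classify increments the counter for symbol when the check is delayed and
// decides what, if anything, should be reported, resetting the counter when
// the symbol recovers or when a per-pair alert fires.
func classify(counts map[string]int, symbol string, delayed bool) alarmAction {
	if !delayed {
		counts[symbol] = 0
		return actionNone
	}
	counts[symbol]++
	switch {
	case counts[symbol] > alarmOffsetPerPair:
		counts[symbol] = 0
		return actionPerPair
	case counts[symbol] > alarmOffsetInTotal:
		return actionAggregate
	default:
		return actionNone
	}
}

func main() {
	counts := map[string]int{}
	// Twelve consecutive delayed checks: ticks 1-3 stay silent, ticks 4-10
	// only count toward the summary line, tick 11 fires the per-pair alert
	// and resets the counter, and tick 12 starts the cycle again.
	for tick := 1; tick <= 12; tick++ {
		fmt.Printf("tick %2d: %s\n", tick, classify(counts, "BTC-USDT", true))
	}
}

With the defaults above, a symbol needs more than ten consecutive delayed checks before it produces its own alert line, while it starts contributing to the aggregate count after more than three.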
4 changes: 2 additions & 2 deletions node/pkg/checker/dal/app_test.go
@@ -35,8 +35,8 @@ func TestExtractWsAlarms(t *testing.T) {
wsMsgChan <- msg
}
alarmCountMap := map[string]int{
"BTC": 3,
"ETH": 3,
"BTC": 10,
"ETH": 10,
}

// Call the function
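The test change likely just tracks the new constant: the per-pair message in extractWsDelayAlarms only fires once a counter, after being incremented, exceeds AlarmOffsetPerPair (now 10 instead of 3), so seeding the BTC and ETH counters at 10 lets a single delayed observation trigger the alert. A tiny standalone illustration, not the repository's test code:

package main

import "fmt"

const alarmOffsetPerPair = 10 // mirrors AlarmOffsetPerPair in dal/types.go

func main() {
	counter := 10                             // seeded the way the test map seeds "BTC" and "ETH"
	counter++                                 // one delayed observation during the test
	fmt.Println(counter > alarmOffsetPerPair) // true: the per-pair alert fires
}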
3 changes: 2 additions & 1 deletion node/pkg/checker/dal/types.go
@@ -9,7 +9,8 @@ import (
const (
DefaultDalCheckInterval = 10 * time.Second
DelayOffset = 5 * time.Second
AlarmOffset = 3
AlarmOffsetPerPair = 10
AlarmOffsetInTotal = 3
WsDelayThreshold = 9 * time.Second
WsPushThreshold = 5 * time.Second
IgnoreKeys = "test,sentinel,orakl_reporter"
43 changes: 28 additions & 15 deletions node/pkg/checker/event/app.go
@@ -13,7 +13,8 @@ import (
"github.com/rs/zerolog/log"
)

const AlarmOffset = 3
const AlarmOffsetPerPair = 10
const AlarmOffsetInTotal = 3
const VRF_EVENT = "vrf_random_words_fulfilled"

var EventCheckInterval time.Duration
@@ -144,24 +145,35 @@ func checkGroupedFeeds(ctx context.Context, feedsByInterval map[int][]FeedToChec

func checkFeeds(ctx context.Context, feedsToCheck []FeedToCheck) {
msg := ""
totalDelayed := 0
for i := range feedsToCheck {
msg += checkEachFeed(ctx, &feedsToCheck[i])
}
if msg != "" {
alert.SlackAlert(msg)
// msg += checkEachFeed(ctx, &feedsToCheck[i])
offset, err := timeSinceLastFeedEvent(ctx, feedsToCheck[i])
if err != nil {
log.Error().Err(err).Str("feed", feedsToCheck[i].FeedName).Msg("Failed to check feed")
continue
}

if offset > time.Duration(feedsToCheck[i].ExpectedInterval)*time.Millisecond*2 {
log.Warn().Str("feed", feedsToCheck[i].FeedName).Msg(fmt.Sprintf("%s delayed by %s", feedsToCheck[i].FeedName, offset-time.Duration(feedsToCheck[i].ExpectedInterval)*time.Millisecond))
if feedsToCheck[i].LatencyChecked > AlarmOffsetPerPair {
msg += fmt.Sprintf("(REPORTER) %s delayed by %s\n", feedsToCheck[i].FeedName, offset-time.Duration(feedsToCheck[i].ExpectedInterval)*time.Millisecond)
feedsToCheck[i].LatencyChecked = 0
} else if feedsToCheck[i].LatencyChecked > AlarmOffsetInTotal {
totalDelayed++
}
} else {
feedsToCheck[i].LatencyChecked = 0
}
}
}

func checkEachFeed(ctx context.Context, feed *FeedToCheck) string {
result := ""
offset, err := timeSinceLastFeedEvent(ctx, *feed)
if err == nil {
result += handleFeedSubmissionDelay(offset, feed)
} else {
log.Error().Err(err).Str("feed", feed.FeedName).Msg("Failed to check feed")
if totalDelayed > 0 {
msg += fmt.Sprintf("(REPORTER) %d feeds are delayed\n", totalDelayed)
}

return result
if msg != "" {
alert.SlackAlert(msg)
}
}

func checkPorAndVrf(ctx context.Context, checkList *CheckList) {
@@ -291,12 +303,13 @@ func handleFeedSubmissionDelay(offset time.Duration, feed *FeedToCheck) string {
if offset > time.Duration(feed.ExpectedInterval)*time.Millisecond*2 {
log.Warn().Str("feed", feed.FeedName).Msg(fmt.Sprintf("%s delayed by %s", feed.FeedName, offset-time.Duration(feed.ExpectedInterval)*time.Millisecond))
feed.LatencyChecked++
if feed.LatencyChecked > AlarmOffset {
if feed.LatencyChecked > AlarmOffsetPerPair {
msg += fmt.Sprintf("(REPORTER) %s delayed by %s\n", feed.FeedName, offset-time.Duration(feed.ExpectedInterval)*time.Millisecond)
feed.LatencyChecked = 0
}
} else {
feed.LatencyChecked = 0
}

return msg
}
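In the reworked checkFeeds, a feed counts as delayed when the time since its last event exceeds twice its ExpectedInterval (stored in milliseconds), and the reported lateness is that offset minus one interval. A small standalone sketch of just that criterion; the isDelayed helper is an illustrative name, not a function in the repository:

package main

import (
	"fmt"
	"time"
)

// isDelayed reports whether a feed whose last event was `offset` ago should be
// treated as delayed given its expected submission interval in milliseconds,
// and, if so, the lateness an alert would report (offset minus one interval).
func isDelayed(offset time.Duration, expectedIntervalMs int) (bool, time.Duration) {
	expected := time.Duration(expectedIntervalMs) * time.Millisecond
	if offset > 2*expected {
		return true, offset - expected
	}
	return false, 0
}

func main() {
	// A feed expected every 15s that last reported 40s ago is delayed by 25s.
	delayed, lateness := isDelayed(40*time.Second, 15000)
	fmt.Println(delayed, lateness) // true 25s
}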
40 changes: 37 additions & 3 deletions node/pkg/checker/offset/app.go
@@ -20,6 +20,9 @@ import (
)

const (
AlarmOffsetPerPair = 10
AlarmOffsetInTotal = 3

PriceDifferenceThreshold = 0.1
DelayThreshold = 5 * time.Second
DefaultCheckInterval = 5 * time.Minute
@@ -87,7 +90,13 @@ type LatestAggregateEach struct {
GlobalAggregate float64 `db:"global_aggregate"`
}

var localAggregateAlarmCount map[int32]int
var globalAggregateAlarmCount map[int32]int

func Start(ctx context.Context) error {
localAggregateAlarmCount = make(map[int32]int)
globalAggregateAlarmCount = make(map[int32]int)

serviceDBUrl := secrets.GetSecret("SERVICE_DB_URL")
if serviceDBUrl == "" {
log.Error().Msg("Missing SERVICE_DB_URL")
@@ -113,7 +122,6 @@ func Start(ctx context.Context) error {
checkOffsets(ctx, serviceDB)
}
}

}

func checkOffsets(ctx context.Context, serviceDB *pgxpool.Pool) {
@@ -125,6 +133,8 @@ func checkOffsets(ctx context.Context, serviceDB *pgxpool.Pool) {
return
}

localAggregateDelayedOffsetCount := 0
globalAggregateDelayedOffsetCount := 0
for _, config := range loadedConfigs {
log.Debug().Int32("id", config.ID).Str("name", config.Name).Msg("checking config offset")
localAggregateOffsetResult, err := db.QueryRowTransient[OffsetResultEach](ctx, serviceDB, fmt.Sprintf(GetSingleLocalAggregateOffset, config.ID), nil)
@@ -133,7 +143,15 @@
return
}
if localAggregateOffsetResult.Delay > DelayThreshold.Seconds() {
msg += fmt.Sprintf("(local aggregate offset delayed) %s: %v seconds\n", config.Name, localAggregateOffsetResult.Delay)
localAggregateAlarmCount[config.ID]++
if localAggregateAlarmCount[config.ID] > AlarmOffsetPerPair {
msg += fmt.Sprintf("(local aggregate offset delayed) %s: %v seconds\n", config.Name, localAggregateOffsetResult.Delay)
localAggregateAlarmCount[config.ID] = 0
} else if localAggregateAlarmCount[config.ID] > AlarmOffsetInTotal {
localAggregateDelayedOffsetCount++
}
} else {
localAggregateAlarmCount[config.ID] = 0
}

globalAggregateOffsetResult, err := db.QueryRowTransient[OffsetResultEach](ctx, serviceDB, fmt.Sprintf(GetSingleGlobalAggregateOffset, config.ID), nil)
@@ -142,7 +160,15 @@
return
}
if globalAggregateOffsetResult.Delay > DelayThreshold.Seconds() {
msg += fmt.Sprintf("(global aggregate offset delayed) %s: %v seconds\n", config.Name, globalAggregateOffsetResult.Delay)
globalAggregateAlarmCount[config.ID]++
if globalAggregateAlarmCount[config.ID] > AlarmOffsetPerPair {
msg += fmt.Sprintf("(global aggregate offset delayed) %s: %v seconds\n", config.Name, globalAggregateOffsetResult.Delay)
globalAggregateAlarmCount[config.ID] = 0
} else if globalAggregateAlarmCount[config.ID] > AlarmOffsetInTotal {
globalAggregateDelayedOffsetCount++
}
} else {
globalAggregateAlarmCount[config.ID] = 0
}

latestAggregateResult, err := db.QueryRowTransient[LatestAggregateEach](ctx, serviceDB, fmt.Sprintf(GetSingleLatestAggregates, config.ID, config.ID), nil)
@@ -158,6 +184,14 @@ func checkOffsets(ctx context.Context, serviceDB *pgxpool.Pool) {
time.Sleep(500 * time.Millisecond) // sleep to reduce pgsql stress
}

if localAggregateDelayedOffsetCount > 0 {
msg += fmt.Sprintf("%d local aggregate offsets delayed\n", localAggregateDelayedOffsetCount)
}

if globalAggregateDelayedOffsetCount > 0 {
msg += fmt.Sprintf("%d global aggregate offsets delayed\n", globalAggregateDelayedOffsetCount)
}

if msg != "" {
alert.SlackAlert(msg)
}
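The offset checker now keeps its counters in two package-level maps keyed by config ID, so counts persist across successive checkOffsets ticks and are zeroed only when a config is back on time or after its per-config alert fires. A standalone sketch of that lifecycle under the same thresholds; the step helper, config ID, and delay pattern are made up for illustration:

package main

import "fmt"

const (
	alarmOffsetPerPair = 10 // mirrors AlarmOffsetPerPair in checker/offset
	alarmOffsetInTotal = 3  // mirrors AlarmOffsetInTotal in checker/offset
)

// step applies one check result for a config and reports whether a per-config
// message would be emitted this tick and whether the config would be counted
// into the "N offsets delayed" summary line.
func step(counts map[int32]int, id int32, delayed bool) (perConfig bool, inSummary bool) {
	if !delayed {
		counts[id] = 0
		return false, false
	}
	counts[id]++
	if counts[id] > alarmOffsetPerPair {
		counts[id] = 0
		return true, false
	}
	return false, counts[id] > alarmOffsetInTotal
}

func main() {
	counts := map[int32]int{}
	// Five delayed ticks, one on-time tick, then five more delayed ticks:
	// the on-time tick resets the counter, so no per-config alert ever fires
	// and only ticks 4, 5, 10 and 11 contribute to the summary line.
	pattern := []bool{true, true, true, true, true, false, true, true, true, true, true}
	for i, delayed := range pattern {
		perConfig, inSummary := step(counts, 42, delayed)
		fmt.Printf("tick %2d: perConfig=%v inSummary=%v\n", i+1, perConfig, inSummary)
	}
}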
