(Sentinel) reduce spamming from dal checks, offset checks, and event checks #2236

Merged (2 commits) on Aug 27, 2024
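This PR targets alert fatigue in Sentinel. Previously each checker posted a Slack line for every pair as soon as one shared counter crossed AlarmOffset = 3, and the offset checker alerted on every delayed cycle with no counter at all. The diff replaces that single threshold with two: a pair now earns its own line only after AlarmOffsetPerPair = 10 consecutive bad checks, while pairs past AlarmOffsetInTotal = 3 are folded into a single aggregate line per alert (e.g. "Websocket not being pushed for %d symbols").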
23 changes: 19 additions & 4 deletions node/pkg/checker/dal/app.go
@@ -29,20 +29,28 @@ func (u *UpdateTimes) CheckLastUpdateOffsets(alarmCount map[string]int) []string
 	u.mu.RLock()
 	defer u.mu.RUnlock()
 
+	websocketNotPushedCount := 0
 	var messages []string
 	for symbol, updateTime := range u.lastUpdates {
 		elapsedTime := time.Since(updateTime)
 		if elapsedTime > WsPushThreshold {
 			alarmCount[symbol]++
-			if alarmCount[symbol] > AlarmOffset {
-				message := fmt.Sprintf("(%s) WebSocket not pushed for %v seconds", symbol, elapsedTime.Seconds())
+			if alarmCount[symbol] > AlarmOffsetPerPair {
+				message := fmt.Sprintf("(%s) Websocket not pushed for %v seconds", symbol, elapsedTime.Seconds())
 				messages = append(messages, message)
 				alarmCount[symbol] = 0
+			} else if alarmCount[symbol] > AlarmOffsetInTotal {
+				websocketNotPushedCount++
 			}
 		} else {
 			alarmCount[symbol] = 0
 		}
 	}
 
+	if websocketNotPushedCount > 0 {
+		messages = append(messages, fmt.Sprintf("Websocket not being pushed for %d symbols", websocketNotPushedCount))
+	}
+
 	return messages
 }

@@ -145,6 +153,7 @@ func checkDal(endpoint string, key string, alarmCount map[string]int) error {
 		return err
 	}
 
+	totalDelayed := 0
 	for _, data := range resp {
 		rawTimestamp, err := strconv.ParseInt(data.AggregateTime, 10, 64)
 		if err != nil {
@@ -162,16 +171,22 @@ func checkDal(endpoint string, key string, alarmCount map[string]int) error {
 
 		if offset > DelayOffset+networkDelay {
 			alarmCount[data.Symbol]++
-			if alarmCount[data.Symbol] > AlarmOffset {
+			if alarmCount[data.Symbol] > AlarmOffsetPerPair {
 				msg += fmt.Sprintf("(DAL) %s price update delayed by %s\n", data.Symbol, offset)
 				alarmCount[data.Symbol] = 0
+			} else if alarmCount[data.Symbol] > AlarmOffsetInTotal {
+				totalDelayed++
 			}
 		} else {
 			alarmCount[data.Symbol] = 0
 		}
 
 	}
 
+	if totalDelayed > 0 {
+		msg += fmt.Sprintf("DAL price update delayed by %d symbols\n", totalDelayed)
+	}
+
 	if msg != "" {
 		alert.SlackAlert(msg)
 	}
@@ -222,7 +237,7 @@ func extractWsDelayAlarms(ctx context.Context, alarmCount map[string]int) []string
 			symbol := match[1]
 			delayedSymbols[symbol] = struct{}{}
 			alarmCount[symbol]++
-			if alarmCount[symbol] > AlarmOffset {
+			if alarmCount[symbol] > AlarmOffsetPerPair {
 				resultMsgs = append(resultMsgs, entry)
 				alarmCount[symbol] = 0
 			}
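The same two-tier throttle recurs in every checker below, so here is a compact, self-contained sketch of the pattern. This is an illustration, not code from the PR: the file name, the Tick helper, and its map arguments are hypothetical; only the two thresholds and the reset rules mirror the diff.

// throttle_sketch.go: standalone illustration of the two-tier alarm throttle.
// Tick and the map shapes are hypothetical; the thresholds mirror the PR.
package main

import "fmt"

const (
	alarmOffsetPerPair = 10 // consecutive bad checks before a pair gets its own alert line
	alarmOffsetInTotal = 3  // consecutive bad checks before a pair joins the aggregate count
)

// Tick runs one check cycle. delayed says which symbols failed this cycle;
// alarmCount carries per-symbol counters across cycles. It returns the alert lines to send.
func Tick(delayed map[string]bool, alarmCount map[string]int) []string {
	var messages []string
	aggregated := 0
	for symbol, isDelayed := range delayed {
		if !isDelayed {
			alarmCount[symbol] = 0 // recovery resets the counter
			continue
		}
		alarmCount[symbol]++
		switch {
		case alarmCount[symbol] > alarmOffsetPerPair:
			// Long-lived delay: dedicated line, then the cycle restarts.
			messages = append(messages, fmt.Sprintf("(%s) delayed", symbol))
			alarmCount[symbol] = 0
		case alarmCount[symbol] > alarmOffsetInTotal:
			// Medium-lived delay: only bumps the shared count.
			aggregated++
		}
	}
	if aggregated > 0 {
		messages = append(messages, fmt.Sprintf("%d symbols delayed", aggregated))
	}
	return messages
}

func main() {
	counts := map[string]int{}
	for cycle := 1; cycle <= 12; cycle++ {
		// BTC stays delayed every cycle; ETH is healthy.
		fmt.Println(cycle, Tick(map[string]bool{"BTC": true, "ETH": false}, counts))
	}
}

Running it prints an empty message list for the first three cycles, one aggregate line per cycle for cycles 4 through 10, and a dedicated (BTC) line on cycle 11, after which the counter restarts.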
4 changes: 2 additions & 2 deletions node/pkg/checker/dal/app_test.go
@@ -35,8 +35,8 @@ func TestExtractWsAlarms(t *testing.T) {
 		wsMsgChan <- msg
 	}
 	alarmCountMap := map[string]int{
-		"BTC": 3,
-		"ETH": 3,
+		"BTC": 10,
+		"ETH": 10,
 	}
 
 	// Call the function
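The fixture's seed counts move from 3 to 10 in step with the threshold rename: the counters sit at the new AlarmOffsetPerPair boundary, so the next delayed message pushes them to 11 and still drives the per-pair alarm branch the test asserts on.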
3 changes: 2 additions & 1 deletion node/pkg/checker/dal/types.go
@@ -9,7 +9,8 @@ import (
 const (
 	DefaultDalCheckInterval = 10 * time.Second
 	DelayOffset             = 5 * time.Second
-	AlarmOffset             = 3
+	AlarmOffsetPerPair      = 10
+	AlarmOffsetInTotal      = 3
 	WsDelayThreshold        = 9 * time.Second
 	WsPushThreshold         = 5 * time.Second
 	IgnoreKeys              = "test,sentinel,orakl_reporter"
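With DefaultDalCheckInterval at 10 seconds, the new constants work out as follows: a pair must stay delayed for more than 10 consecutive checks, roughly 100 seconds, before it earns a dedicated Slack line, while anything past 3 consecutive checks, roughly 30 seconds, only increments the aggregate count. Under the old single AlarmOffset = 3, every delayed pair produced its own line at about the 30-second mark, which is the spam this PR trims.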
43 changes: 28 additions & 15 deletions node/pkg/checker/event/app.go
@@ -13,7 +13,8 @@ import (
 	"github.com/rs/zerolog/log"
 )
 
-const AlarmOffset = 3
+const AlarmOffsetPerPair = 10
+const AlarmOffsetInTotal = 3
 const VRF_EVENT = "vrf_random_words_fulfilled"
 
 var EventCheckInterval time.Duration
@@ -144,24 +145,35 @@ func checkGroupedFeeds(ctx context.Context, feedsByInterval map[int][]FeedToCheck
 
 func checkFeeds(ctx context.Context, feedsToCheck []FeedToCheck) {
 	msg := ""
+	totalDelayed := 0
 	for i := range feedsToCheck {
-		msg += checkEachFeed(ctx, &feedsToCheck[i])
-	}
-	if msg != "" {
-		alert.SlackAlert(msg)
+		// msg += checkEachFeed(ctx, &feedsToCheck[i])
+		offset, err := timeSinceLastFeedEvent(ctx, feedsToCheck[i])
+		if err != nil {
+			log.Error().Err(err).Str("feed", feedsToCheck[i].FeedName).Msg("Failed to check feed")
+			continue
+		}
+
+		if offset > time.Duration(feedsToCheck[i].ExpectedInterval)*time.Millisecond*2 {
+			log.Warn().Str("feed", feedsToCheck[i].FeedName).Msg(fmt.Sprintf("%s delayed by %s", feedsToCheck[i].FeedName, offset-time.Duration(feedsToCheck[i].ExpectedInterval)*time.Millisecond))
+			if feedsToCheck[i].LatencyChecked > AlarmOffsetPerPair {
+				msg += fmt.Sprintf("(REPORTER) %s delayed by %s\n", feedsToCheck[i].FeedName, offset-time.Duration(feedsToCheck[i].ExpectedInterval)*time.Millisecond)
+				feedsToCheck[i].LatencyChecked = 0
+			} else if feedsToCheck[i].LatencyChecked > AlarmOffsetInTotal {
+				totalDelayed++
+			}
+		} else {
+			feedsToCheck[i].LatencyChecked = 0
+		}
 	}
-}
 
-func checkEachFeed(ctx context.Context, feed *FeedToCheck) string {
-	result := ""
-	offset, err := timeSinceLastFeedEvent(ctx, *feed)
-	if err == nil {
-		result += handleFeedSubmissionDelay(offset, feed)
-	} else {
-		log.Error().Err(err).Str("feed", feed.FeedName).Msg("Failed to check feed")
+	if totalDelayed > 0 {
+		msg += fmt.Sprintf("(REPORTER) %d feeds are delayed\n", totalDelayed)
 	}
 
-	return result
+	if msg != "" {
+		alert.SlackAlert(msg)
+	}
 }
 
 func checkPorAndVrf(ctx context.Context, checkList *CheckList) {
@@ -291,12 +303,13 @@ func handleFeedSubmissionDelay(offset time.Duration, feed *FeedToCheck) string {
 	if offset > time.Duration(feed.ExpectedInterval)*time.Millisecond*2 {
 		log.Warn().Str("feed", feed.FeedName).Msg(fmt.Sprintf("%s delayed by %s", feed.FeedName, offset-time.Duration(feed.ExpectedInterval)*time.Millisecond))
 		feed.LatencyChecked++
-		if feed.LatencyChecked > AlarmOffset {
+		if feed.LatencyChecked > AlarmOffsetPerPair {
 			msg += fmt.Sprintf("(REPORTER) %s delayed by %s\n", feed.FeedName, offset-time.Duration(feed.ExpectedInterval)*time.Millisecond)
 			feed.LatencyChecked = 0
 		}
 	} else {
 		feed.LatencyChecked = 0
 	}
 
 	return msg
 }
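The escalation boundaries are easy to get off by one, so a small test against the hypothetical Tick sketch above (again, illustrative, not part of this PR) can pin down exactly when each tier fires:

// throttle_sketch_test.go: boundary test for the hypothetical Tick helper above.
package main

import "testing"

func TestThrottleBoundaries(t *testing.T) {
	counts := map[string]int{}
	delayed := map[string]bool{"BTC": true}

	perPair, aggregate := 0, 0
	for cycle := 1; cycle <= alarmOffsetPerPair+1; cycle++ { // 11 cycles
		for _, m := range Tick(delayed, counts) {
			if m == "(BTC) delayed" {
				perPair++
			} else {
				aggregate++
			}
		}
	}
	// The counter reads 4..10 on cycles 4..10 (seven aggregate lines) and 11 on
	// cycle 11, the first value above alarmOffsetPerPair (one per-pair line).
	if perPair != 1 || aggregate != 7 {
		t.Fatalf("got %d per-pair and %d aggregate lines, want 1 and 7", perPair, aggregate)
	}
}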
40 changes: 37 additions & 3 deletions node/pkg/checker/offset/app.go
@@ -20,6 +20,9 @@ import (
 )
 
 const (
+	AlarmOffsetPerPair = 10
+	AlarmOffsetInTotal = 3
+
 	PriceDifferenceThreshold = 0.1
 	DelayThreshold           = 5 * time.Second
 	DefaultCheckInterval     = 5 * time.Minute
@@ -87,7 +90,13 @@ type LatestAggregateEach struct {
 	GlobalAggregate float64 `db:"global_aggregate"`
 }
 
+var localAggregateAlarmCount map[int32]int
+var globalAggregateAlarmCount map[int32]int
+
 func Start(ctx context.Context) error {
+	localAggregateAlarmCount = make(map[int32]int)
+	globalAggregateAlarmCount = make(map[int32]int)
+
 	serviceDBUrl := secrets.GetSecret("SERVICE_DB_URL")
 	if serviceDBUrl == "" {
 		log.Error().Msg("Missing SERVICE_DB_URL")
@@ -113,7 +122,6 @@ func Start(ctx context.Context) error {
 			checkOffsets(ctx, serviceDB)
 		}
 	}
-
 }
 
 func checkOffsets(ctx context.Context, serviceDB *pgxpool.Pool) {
@@ -125,6 +133,8 @@ func checkOffsets(ctx context.Context, serviceDB *pgxpool.Pool) {
 		return
 	}
 
+	localAggregateDelayedOffsetCount := 0
+	globalAggregateDelayedOffsetCount := 0
 	for _, config := range loadedConfigs {
 		log.Debug().Int32("id", config.ID).Str("name", config.Name).Msg("checking config offset")
 		localAggregateOffsetResult, err := db.QueryRowTransient[OffsetResultEach](ctx, serviceDB, fmt.Sprintf(GetSingleLocalAggregateOffset, config.ID), nil)
@@ -133,7 +143,15 @@ func checkOffsets(ctx context.Context, serviceDB *pgxpool.Pool) {
 			return
 		}
 		if localAggregateOffsetResult.Delay > DelayThreshold.Seconds() {
-			msg += fmt.Sprintf("(local aggregate offset delayed) %s: %v seconds\n", config.Name, localAggregateOffsetResult.Delay)
+			localAggregateAlarmCount[config.ID]++
+			if localAggregateAlarmCount[config.ID] > AlarmOffsetPerPair {
+				msg += fmt.Sprintf("(local aggregate offset delayed) %s: %v seconds\n", config.Name, localAggregateOffsetResult.Delay)
+				localAggregateAlarmCount[config.ID] = 0
+			} else if localAggregateAlarmCount[config.ID] > AlarmOffsetInTotal {
+				localAggregateDelayedOffsetCount++
+			}
+		} else {
+			localAggregateAlarmCount[config.ID] = 0
 		}
 
 		globalAggregateOffsetResult, err := db.QueryRowTransient[OffsetResultEach](ctx, serviceDB, fmt.Sprintf(GetSingleGlobalAggregateOffset, config.ID), nil)
@@ -142,7 +160,15 @@ func checkOffsets(ctx context.Context, serviceDB *pgxpool.Pool) {
 			return
 		}
 		if globalAggregateOffsetResult.Delay > DelayThreshold.Seconds() {
-			msg += fmt.Sprintf("(global aggregate offset delayed) %s: %v seconds\n", config.Name, globalAggregateOffsetResult.Delay)
+			globalAggregateAlarmCount[config.ID]++
+			if globalAggregateAlarmCount[config.ID] > AlarmOffsetPerPair {
+				msg += fmt.Sprintf("(global aggregate offset delayed) %s: %v seconds\n", config.Name, globalAggregateOffsetResult.Delay)
+				globalAggregateAlarmCount[config.ID] = 0
+			} else if globalAggregateAlarmCount[config.ID] > AlarmOffsetInTotal {
+				globalAggregateDelayedOffsetCount++
+			}
+		} else {
+			globalAggregateAlarmCount[config.ID] = 0
 		}
 
 		latestAggregateResult, err := db.QueryRowTransient[LatestAggregateEach](ctx, serviceDB, fmt.Sprintf(GetSingleLatestAggregates, config.ID, config.ID), nil)
@@ -158,6 +184,14 @@ func checkOffsets(ctx context.Context, serviceDB *pgxpool.Pool) {
 		time.Sleep(500 * time.Millisecond) // sleep to reduce pgsql stress
 	}
 
+	if localAggregateDelayedOffsetCount > 0 {
+		msg += fmt.Sprintf("%d local aggregate offsets delayed\n", localAggregateDelayedOffsetCount)
+	}
+
+	if globalAggregateDelayedOffsetCount > 0 {
+		msg += fmt.Sprintf("%d global aggregate offsets delayed\n", globalAggregateDelayedOffsetCount)
+	}
+
 	if msg != "" {
 		alert.SlackAlert(msg)
 	}
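One design note that holds across the dal, event, and offset checkers: a pair's counter resets both when it recovers and when its per-pair alarm fires, so a continuously delayed pair produces a dedicated line at most once every AlarmOffsetPerPair + 1 cycles. For this checker, with DefaultCheckInterval at 5 minutes, that is one per-config line roughly every 55 minutes, with the cycles in between surfacing only through the two aggregate count lines above.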