Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce volume of log errors on transmission failure #16498

Merged
merged 1 commit into from
Feb 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions core/services/llo/mercurytransmitter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package mercurytransmitter
import (
"context"
"fmt"
"maps"
"slices"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -92,7 +94,10 @@ type server struct {
transmitQueuePushErrorCount prometheus.Counter
transmitConcurrentTransmitGauge prometheus.Gauge

transmitThreadBusyCount atomic.Int32
transmitThreadBusyCount atomic.Int32
consecutiveTransmitErrorCount int
consecutiveTransmitUniqueErrors map[string]struct{}
consecutiveTransmitErrorMu sync.Mutex
}

type QueueConfig interface {
Expand All @@ -103,7 +108,7 @@ type QueueConfig interface {

func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client grpc.Client, orm ORM, serverURL string) *server {
pm := NewPersistenceManager(lggr, orm, serverURL, int(cfg.TransmitQueueMaxSize()), FlushDeletesFrequency, PruneFrequency, cfg.ReaperMaxAge().Duration())
donIDStr := fmt.Sprintf("%d", pm.DonID())
donIDStr := strconv.FormatUint(uint64(pm.DonID()), 10)
var codecLggr logger.Logger
if verboseLogging {
codecLggr = lggr
Expand All @@ -128,6 +133,9 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client
promTransmitQueuePushErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitConcurrentTransmitGauge.WithLabelValues(donIDStr, serverURL),
atomic.Int32{},
0,
make(map[string]struct{}),
sync.Mutex{},
}

return s
Expand Down Expand Up @@ -199,7 +207,7 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
return false
} else if err != nil {
s.transmitConnectionErrorCount.Inc()
lggr.Errorw("Transmit report failed", "err", err)
s.rateLimitedLogError(lggr, "Transmit report failed", err.Error())
if ok := s.q.Push(t); !ok {
s.lggr.Error("Failed to push report to transmit queue; queue is closed")
return false
Expand All @@ -217,6 +225,7 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
b.Reset()
if res.Error == "" {
s.transmitSuccessCount.Inc()
s.resetConsecutiveTransmitFailures()
lggr.Debug("Transmit report success")
} else {
// We don't need to retry here because the mercury server
Expand All @@ -226,10 +235,11 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
case DuplicateReport:
s.transmitSuccessCount.Inc()
s.transmitDuplicateCount.Inc()
s.resetConsecutiveTransmitFailures()
lggr.Debug("Transmit report success; duplicate report")
default:
promTransmitServerErrorCount.WithLabelValues(donIDStr, s.url, strconv.FormatInt(int64(res.Code), 10)).Inc()
lggr.Errorw("Transmit report failed; mercury server returned error", "err", res.Error, "code", res.Code)
s.rateLimitedLogError(lggr, "Transmit report failed; mercury server returned error", fmt.Sprintf("mercury server returned error: %q, statusCode: %d", res.Error, res.Code))
}
}

Expand All @@ -239,6 +249,43 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
}
}

func (s *server) rateLimitedLogError(lggr logger.Logger, msg string, err string) {
cnt, uniqueErrors := s.incConsecutiveTransmitErrorCount(err)
switch {
case cnt < 10:
// Log first 10 errors individually
lggr.Errorw(msg, "nErrs", 1, "err", err)
return
case cnt < 10_000:
// Log errors up to 10k in batches of 100
if cnt%100 == 0 {
lggr.Errorw(msg+" (100 failures)", "nErrs", 100, "uniqueErrors", uniqueErrors)
}
return
default:
// After that, log every 10k errors
if cnt%10_000 == 0 {
lggr.Errorw(msg+" (10,000 failures)", "nErrs", 10_000, "uniqueErrors", uniqueErrors)
}
return
}
}

func (s *server) incConsecutiveTransmitErrorCount(errStr string) (int, []string) {
s.consecutiveTransmitErrorMu.Lock()
defer s.consecutiveTransmitErrorMu.Unlock()
s.consecutiveTransmitErrorCount++
s.consecutiveTransmitUniqueErrors[errStr] = struct{}{}
return s.consecutiveTransmitErrorCount, slices.Sorted(maps.Keys(s.consecutiveTransmitUniqueErrors))
}

func (s *server) resetConsecutiveTransmitFailures() {
s.consecutiveTransmitErrorMu.Lock()
s.consecutiveTransmitErrorCount = 0
s.consecutiveTransmitUniqueErrors = make(map[string]struct{})
s.consecutiveTransmitErrorMu.Unlock()
}

func (s *server) transmit(ctx context.Context, t *Transmission) (*rpc.TransmitRequest, *rpc.TransmitResponse, error) {
var payload []byte
var err error
Expand Down
Loading