From ba6813c6e3ac5975ed459aa2f635f3655695f83c Mon Sep 17 00:00:00 2001
From: Sam Davies
Date: Thu, 20 Feb 2025 10:58:10 -0500
Subject: [PATCH] Reduce volume of log errors on transmission failure

- Since we run 100 transmit threads and retry on a very short backoff
  (starting at 40ms), transmit errors can be extremely verbose even during
  short network interruptions.
- This PR batches the error logs to avoid overloading log ingestion: after
  the first few failures we log only every 100th error, and past 10,000
  failures only every 10,000th.
---
 .../services/llo/mercurytransmitter/server.go | 55 +++++++++++++++++--
 1 file changed, 51 insertions(+), 4 deletions(-)

diff --git a/core/services/llo/mercurytransmitter/server.go b/core/services/llo/mercurytransmitter/server.go
index c572d61d6c1..ebfdd69933d 100644
--- a/core/services/llo/mercurytransmitter/server.go
+++ b/core/services/llo/mercurytransmitter/server.go
@@ -3,6 +3,8 @@ package mercurytransmitter
 import (
 	"context"
 	"fmt"
+	"maps"
+	"slices"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -92,7 +94,10 @@ type server struct {
 	transmitQueuePushErrorCount     prometheus.Counter
 	transmitConcurrentTransmitGauge prometheus.Gauge
 
-	transmitThreadBusyCount atomic.Int32
+	transmitThreadBusyCount         atomic.Int32
+	consecutiveTransmitErrorCount   int
+	consecutiveTransmitUniqueErrors map[string]struct{}
+	consecutiveTransmitErrorMu      sync.Mutex
 }
 
 type QueueConfig interface {
@@ -103,7 +108,7 @@
 
 func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client grpc.Client, orm ORM, serverURL string) *server {
 	pm := NewPersistenceManager(lggr, orm, serverURL, int(cfg.TransmitQueueMaxSize()), FlushDeletesFrequency, PruneFrequency, cfg.ReaperMaxAge().Duration())
-	donIDStr := fmt.Sprintf("%d", pm.DonID())
+	donIDStr := strconv.FormatUint(uint64(pm.DonID()), 10)
 	var codecLggr logger.Logger
 	if verboseLogging {
 		codecLggr = lggr
@@ -128,6 +133,9 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client
 		promTransmitQueuePushErrorCount.WithLabelValues(donIDStr, serverURL),
 		promTransmitConcurrentTransmitGauge.WithLabelValues(donIDStr, serverURL),
 		atomic.Int32{},
+		0,
+		make(map[string]struct{}),
+		sync.Mutex{},
 	}
 
 	return s
@@ -199,7 +207,7 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
 				return false
 			} else if err != nil {
 				s.transmitConnectionErrorCount.Inc()
-				lggr.Errorw("Transmit report failed", "err", err)
+				s.rateLimitedLogError(lggr, "Transmit report failed", err.Error())
 				if ok := s.q.Push(t); !ok {
 					s.lggr.Error("Failed to push report to transmit queue; queue is closed")
 					return false
@@ -217,6 +225,7 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
 			b.Reset()
 			if res.Error == "" {
 				s.transmitSuccessCount.Inc()
+				s.resetConsecutiveTransmitFailures()
 				lggr.Debug("Transmit report success")
 			} else {
 				// We don't need to retry here because the mercury server
@@ -226,10 +235,11 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
 				case DuplicateReport:
 					s.transmitSuccessCount.Inc()
 					s.transmitDuplicateCount.Inc()
+					s.resetConsecutiveTransmitFailures()
 					lggr.Debug("Transmit report success; duplicate report")
 				default:
 					promTransmitServerErrorCount.WithLabelValues(donIDStr, s.url, strconv.FormatInt(int64(res.Code), 10)).Inc()
-					lggr.Errorw("Transmit report failed; mercury server returned error", "err", res.Error, "code", res.Code)
+					s.rateLimitedLogError(lggr, "Transmit report failed; mercury server returned error", fmt.Sprintf("mercury server returned error: %q, statusCode: %d", res.Error, res.Code))
 				}
 			}
 
@@ -239,6 +249,43 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
 	}
 }
 
+func (s *server) rateLimitedLogError(lggr logger.Logger, msg string, err string) {
+	cnt, uniqueErrors := s.incConsecutiveTransmitErrorCount(err)
+	switch {
+	case cnt < 10:
+		// Log first 10 errors individually
+		lggr.Errorw(msg, "nErrs", 1, "err", err)
+		return
+	case cnt < 10_000:
+		// Log errors up to 10k in batches of 100
+		if cnt%100 == 0 {
+			lggr.Errorw(msg+" (100 failures)", "nErrs", 100, "uniqueErrors", uniqueErrors)
+		}
+		return
+	default:
+		// After that, log every 10k errors
+		if cnt%10_000 == 0 {
+			lggr.Errorw(msg+" (10,000 failures)", "nErrs", 10_000, "uniqueErrors", uniqueErrors)
+		}
+		return
+	}
+}
+
+func (s *server) incConsecutiveTransmitErrorCount(errStr string) (int, []string) {
+	s.consecutiveTransmitErrorMu.Lock()
+	defer s.consecutiveTransmitErrorMu.Unlock()
+	s.consecutiveTransmitErrorCount++
+	s.consecutiveTransmitUniqueErrors[errStr] = struct{}{}
+	return s.consecutiveTransmitErrorCount, slices.Sorted(maps.Keys(s.consecutiveTransmitUniqueErrors))
+}
+
+func (s *server) resetConsecutiveTransmitFailures() {
+	s.consecutiveTransmitErrorMu.Lock()
+	s.consecutiveTransmitErrorCount = 0
+	s.consecutiveTransmitUniqueErrors = make(map[string]struct{})
+	s.consecutiveTransmitErrorMu.Unlock()
+}
+
 func (s *server) transmit(ctx context.Context, t *Transmission) (*rpc.TransmitRequest, *rpc.TransmitResponse, error) {
 	var payload []byte
 	var err error
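
Illustrative sketch (not part of the patch): the standalone Go program below replays the thresholds used by rateLimitedLogError with a plain counter, to show how sharply the batching cuts log volume during a sustained outage. The shouldLog helper is a hypothetical stand-in for the bookkeeping that incConsecutiveTransmitErrorCount performs on *server.

// Illustrative sketch only; not part of the patch. shouldLog is a
// hypothetical stand-in for the counting done by rateLimitedLogError /
// incConsecutiveTransmitErrorCount above.
package main

import "fmt"

// shouldLog mirrors the batching policy: the first few consecutive errors
// are logged individually, then only every 100th up to 10,000 errors, then
// only every 10,000th.
func shouldLog(cnt int) bool {
	switch {
	case cnt < 10:
		return true
	case cnt < 10_000:
		return cnt%100 == 0
	default:
		return cnt%10_000 == 0
	}
}

func main() {
	for _, n := range []int{50, 1_000, 25_000} {
		logged := 0
		for cnt := 1; cnt <= n; cnt++ {
			if shouldLog(cnt) {
				logged++
			}
		}
		fmt.Printf("%d consecutive failures -> %d log lines\n", n, logged)
	}
	// Output:
	// 50 consecutive failures -> 9 log lines
	// 1000 consecutive failures -> 19 log lines
	// 25000 consecutive failures -> 110 log lines
}

Even a burst of 25,000 consecutive failures collapses to roughly 110 log lines, while the successful-transmit paths reset the counter so normal operation keeps full per-error logging.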