Reduce volume of log errors on transmission failure (#16498)
- Since we run 100 transmit threads and retry on a very short backoff
  (starting at 40ms), transmit errors can be extremely verbose even during
  short network interruptions.
- This PR adds batching: after the first few errors are logged individually,
  subsequent errors are logged only on every 100th and then every 10,000th
  occurrence (sketched below), to avoid overloading log ingestion.
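
For illustration only (this snippet is not part of the commit): a minimal, standalone sketch of the logging cadence described above. The thresholds (10, 100, 10,000) mirror rateLimitedLogError in the diff below; the logger is replaced by a plain counter, and the 150,000-attempt figure is just a rough upper bound that assumes every retry stayed at the initial 40ms backoff.

package main

import "fmt"

// shouldLog reports whether the nth consecutive transmit error (1-based)
// would be emitted under the cadence this commit introduces: the first few
// errors individually, then every 100th up to 10,000, then every 10,000th.
func shouldLog(n int) bool {
	switch {
	case n < 10:
		return true
	case n < 10_000:
		return n%100 == 0
	default:
		return n%10_000 == 0
	}
}

func main() {
	// Rough upper bound for a one-minute outage: 100 threads retrying every
	// 40ms would make 100 * 60 / 0.04 = 150,000 attempts (in practice the
	// growing backoff makes this far smaller).
	const attempts = 150_000
	logged := 0
	for n := 1; n <= attempts; n++ {
		if shouldLog(n) {
			logged++
		}
	}
	fmt.Printf("%d consecutive errors -> %d log lines instead of %d\n", attempts, logged, attempts)
}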
samsondav authored Feb 20, 2025
1 parent b7a6083 commit c822f92
55 changes: 51 additions & 4 deletions core/services/llo/mercurytransmitter/server.go
@@ -3,6 +3,8 @@ package mercurytransmitter
import (
"context"
"fmt"
"maps"
"slices"
"strconv"
"sync"
"sync/atomic"
@@ -92,7 +94,10 @@ type server struct {
transmitQueuePushErrorCount prometheus.Counter
transmitConcurrentTransmitGauge prometheus.Gauge

transmitThreadBusyCount atomic.Int32
transmitThreadBusyCount atomic.Int32
consecutiveTransmitErrorCount int
consecutiveTransmitUniqueErrors map[string]struct{}
consecutiveTransmitErrorMu sync.Mutex
}

type QueueConfig interface {
@@ -103,7 +108,7 @@ type QueueConfig interface {

func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client grpc.Client, orm ORM, serverURL string) *server {
pm := NewPersistenceManager(lggr, orm, serverURL, int(cfg.TransmitQueueMaxSize()), FlushDeletesFrequency, PruneFrequency, cfg.ReaperMaxAge().Duration())
donIDStr := fmt.Sprintf("%d", pm.DonID())
donIDStr := strconv.FormatUint(uint64(pm.DonID()), 10)
var codecLggr logger.Logger
if verboseLogging {
codecLggr = lggr
@@ -128,6 +133,9 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client
promTransmitQueuePushErrorCount.WithLabelValues(donIDStr, serverURL),
promTransmitConcurrentTransmitGauge.WithLabelValues(donIDStr, serverURL),
atomic.Int32{},
0,
make(map[string]struct{}),
sync.Mutex{},
}

return s
@@ -199,7 +207,7 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
return false
} else if err != nil {
s.transmitConnectionErrorCount.Inc()
lggr.Errorw("Transmit report failed", "err", err)
s.rateLimitedLogError(lggr, "Transmit report failed", err.Error())
if ok := s.q.Push(t); !ok {
s.lggr.Error("Failed to push report to transmit queue; queue is closed")
return false
@@ -217,6 +225,7 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
b.Reset()
if res.Error == "" {
s.transmitSuccessCount.Inc()
s.resetConsecutiveTransmitFailures()
lggr.Debug("Transmit report success")
} else {
// We don't need to retry here because the mercury server
@@ -226,10 +235,11 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
case DuplicateReport:
s.transmitSuccessCount.Inc()
s.transmitDuplicateCount.Inc()
s.resetConsecutiveTransmitFailures()
lggr.Debug("Transmit report success; duplicate report")
default:
promTransmitServerErrorCount.WithLabelValues(donIDStr, s.url, strconv.FormatInt(int64(res.Code), 10)).Inc()
lggr.Errorw("Transmit report failed; mercury server returned error", "err", res.Error, "code", res.Code)
s.rateLimitedLogError(lggr, "Transmit report failed; mercury server returned error", fmt.Sprintf("mercury server returned error: %q, statusCode: %d", res.Error, res.Code))
}
}

@@ -239,6 +249,43 @@ func (s *server) spawnTransmitLoop(stopCh services.StopChan, wg *sync.WaitGroup,
}
}

func (s *server) rateLimitedLogError(lggr logger.Logger, msg string, err string) {
cnt, uniqueErrors := s.incConsecutiveTransmitErrorCount(err)
switch {
case cnt < 10:
// Log first 10 errors individually
lggr.Errorw(msg, "nErrs", 1, "err", err)
return
case cnt < 10_000:
// Log errors up to 10k in batches of 100
if cnt%100 == 0 {
lggr.Errorw(msg+" (100 failures)", "nErrs", 100, "uniqueErrors", uniqueErrors)
}
return
default:
// After that, log every 10k errors
if cnt%10_000 == 0 {
lggr.Errorw(msg+" (10,000 failures)", "nErrs", 10_000, "uniqueErrors", uniqueErrors)
}
return
}
}

func (s *server) incConsecutiveTransmitErrorCount(errStr string) (int, []string) {
s.consecutiveTransmitErrorMu.Lock()
defer s.consecutiveTransmitErrorMu.Unlock()
s.consecutiveTransmitErrorCount++
s.consecutiveTransmitUniqueErrors[errStr] = struct{}{}
return s.consecutiveTransmitErrorCount, slices.Sorted(maps.Keys(s.consecutiveTransmitUniqueErrors))
}

func (s *server) resetConsecutiveTransmitFailures() {
s.consecutiveTransmitErrorMu.Lock()
s.consecutiveTransmitErrorCount = 0
s.consecutiveTransmitUniqueErrors = make(map[string]struct{})
s.consecutiveTransmitErrorMu.Unlock()
}

func (s *server) transmit(ctx context.Context, t *Transmission) (*rpc.TransmitRequest, *rpc.TransmitResponse, error) {
var payload []byte
var err error
