diff --git a/cmd/axelard/cmd/vald/broadcaster/broadcast.go b/cmd/axelard/cmd/vald/broadcaster/broadcast.go index ac80faecb..81b642650 100644 --- a/cmd/axelard/cmd/vald/broadcaster/broadcast.go +++ b/cmd/axelard/cmd/vald/broadcaster/broadcast.go @@ -62,9 +62,9 @@ func (b *Broadcaster) processBacklog() { msgCount := 0 for { // we cannot split a single task, so take at least one task and then fill up the batch - // until the size limit is reached or no new tasks are in the backlog + // until the size limit is reached batchWouldBeTooLarge := len(batch) > 0 && msgCount+len(b.backlog.Peek().Msgs) > b.batchSizeLimit - if batchWouldBeTooLarge || b.backlog.Len() == 0 { + if batchWouldBeTooLarge { break } @@ -72,6 +72,11 @@ func (b *Broadcaster) processBacklog() { batch = append(batch, task) msgCount += len(task.Msgs) + + // if there are no new tasks in the backlog, stop filling up the batch + if b.backlog.Len() == 0 { + break + } } b.logger.Debug("high traffic; merging batches", "batch_size", msgCount) @@ -369,10 +374,10 @@ func (bl *backlog) Peek() broadcastTask { } func (bl *backlog) Len() int { - bl.loadHead() - + // do not block in this function because it might be used to inform other calls like Peek() if len(bl.head.Msgs) == 0 { - return 0 + // head is not currently loaded + return len(bl.tail) } return 1 + len(bl.tail)