Skip to content

Commit

Permalink
fix: do the broadcast backlog length check at the correct location (#…
Browse files Browse the repository at this point in the history
…1336) (#1338)

* fix: do the broadcast backlog length check at the correct location

* Update cmd/axelard/cmd/vald/broadcaster/broadcast.go

* debug logs

* fix Len() blocking

* remove debug logs

Co-authored-by: Christian Gorenflo <[email protected]>
  • Loading branch information
fish-sammy and cgorenflo authored Feb 24, 2022
1 parent 9ef9f45 commit a1d594a
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions cmd/axelard/cmd/vald/broadcaster/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,21 @@ func (b *Broadcaster) processBacklog() {
msgCount := 0
for {
// we cannot split a single task, so take at least one task and then fill up the batch
// until the size limit is reached or no new tasks are in the backlog
// until the size limit is reached
batchWouldBeTooLarge := len(batch) > 0 && msgCount+len(b.backlog.Peek().Msgs) > b.batchSizeLimit
if batchWouldBeTooLarge || b.backlog.Len() == 0 {
if batchWouldBeTooLarge {
break
}

task := b.backlog.Pop()

batch = append(batch, task)
msgCount += len(task.Msgs)

// if there are no new tasks in the backlog, stop filling up the batch
if b.backlog.Len() == 0 {
break
}
}

b.logger.Debug("high traffic; merging batches", "batch_size", msgCount)
Expand Down Expand Up @@ -369,10 +374,10 @@ func (bl *backlog) Peek() broadcastTask {
}

func (bl *backlog) Len() int {
bl.loadHead()

// do not block in this function because it might be used to inform other calls like Peek()
if len(bl.head.Msgs) == 0 {
return 0
// head is not currently loaded
return len(bl.tail)
}

return 1 + len(bl.tail)
Expand Down

0 comments on commit a1d594a

Please sign in to comment.