diff --git a/buffer/buffer.go b/buffer/buffer.go index f4198ba..7d19d0c 100644 --- a/buffer/buffer.go +++ b/buffer/buffer.go @@ -44,7 +44,7 @@ func (b *buffer) clear() { b.items = make([]interface{}, b.cap) go func() { b.chunks <- entry{ - items: events, + items: events, retries: cap(b.chunks), } }() @@ -75,7 +75,7 @@ type Config struct { } type entry struct { - items []interface{} + items []interface{} retries int } @@ -115,15 +115,18 @@ func (b *buffer) consumer(c Config) { } }() for events := range b.chunks { - err := c.OnOverflow(events.items) - if err != nil { - go func(events entry) { + go func(events entry) { + err := c.OnOverflow(events.items) + if err != nil { + go func(events entry) { events.retries-- if events.retries >= 0 { time.Sleep(b.backoff) b.chunks <- events } - }(events) - } + }(events) + } + }(events) + } }