-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatcher.go
108 lines (86 loc) · 1.88 KB
/
batcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package batcher
import (
"errors"
"sync"
"time"
)
// Batcher is a structure that collects items and flushes buffer
// either when the buffer fills up or when timeout is reached.
// See examples/batcher/batcher.go for usage example.
type Batcher[T any] struct {
ticker *time.Ticker
capacity int
buffer []T
output chan []T
mu sync.Mutex
closed bool
}
// New creates a new instance of Batcher.
func New[T any](d time.Duration, capacity int) *Batcher[T] {
if capacity <= 0 {
panic("capacity must be greater than 0")
}
obj := &Batcher[T]{}
obj.ticker = time.NewTicker(d)
obj.capacity = capacity
obj.buffer = make([]T, 0, obj.capacity)
obj.output = make(chan []T, 10) // number of batches to keep
go func() {
for range obj.ticker.C {
obj.mu.Lock()
// the object may be closed while we were waiting for the lock
// it is safe to exit, the last batch has been already processed
if obj.closed {
obj.mu.Unlock()
return
}
obj.makeBatch()
obj.mu.Unlock()
}
}()
return obj
}
// Add adds an item to the batcher.
func (b *Batcher[T]) Add(item T) {
err := b.AddE(item)
if err != nil {
panic(err)
}
}
// AddE adds an item to the batcher.
// It returns an error if the batcher is closed.
func (b *Batcher[T]) AddE(item T) error {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return errors.New("batcher is closed")
}
b.buffer = append(b.buffer, item)
if len(b.buffer) >= b.capacity {
b.makeBatch()
}
return nil
}
// C returns a channel that will receive batches.
func (b *Batcher[T]) C() <-chan []T {
return b.output
}
// Close closes the batcher.
func (b *Batcher[T]) Close() {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return
}
b.ticker.Stop()
b.closed = true
b.makeBatch()
close(b.output)
}
func (b *Batcher[T]) makeBatch() {
if len(b.buffer) == 0 {
return
}
b.output <- b.buffer
b.buffer = make([]T, 0, b.capacity)
}