-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaggregator.go
43 lines (41 loc) · 965 Bytes
/
aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package pipeline
import (
"context"
)
// AggregatorFunc uses the given function to aggregate n Inputs to one Output.
func AggregatorFunc[Input, Output any](ctx context.Context, input <-chan Input, count int, f func(ctx context.Context, input ...Input) Output) <-chan Output {
out := make(chan Output)
go func(ctx context.Context, input <-chan Input, out chan<- Output, count int, f func(ctx context.Context, input ...Input) Output) {
defer close(out)
var inputs []Input
dump := func() {
if len(inputs) > 0 {
out <- f(ctx, inputs...)
}
}
for {
select {
case <-ctx.Done():
dump()
return
case val, ok := <-input:
if !ok {
dump()
return
}
inputs = append(inputs, val)
if len(inputs) == count {
output := f(ctx, inputs...)
select {
case <-ctx.Done():
dump()
return
case out <- output:
inputs = nil
}
}
}
}
}(ctx, input, out, count, f)
return out
}