forked from harlow/kinesis-consumer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
allgroup.go
84 lines (70 loc) · 2.18 KB
/
allgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package consumer
import (
"context"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)
// NewAllGroup returns an intitialized AllGroup for consuming
// all shards on a stream
func NewAllGroup(ksis kinesisClient, store Store, streamName string, logger Logger) *AllGroup {
return &AllGroup{
ksis: ksis,
shards: make(map[string]types.Shard),
streamName: streamName,
logger: logger,
Store: store,
}
}
// AllGroup is used to consume all shards from a single consumer. It
// caches a local list of the shards we are already processing
// and routinely polls the stream looking for new shards to process.
type AllGroup struct {
ksis kinesisClient
streamName string
logger Logger
Store
shardMu sync.Mutex
shards map[string]types.Shard
}
// Start is a blocking operation which will loop and attempt to find new
// shards on a regular cadence.
func (g *AllGroup) Start(ctx context.Context, shardc chan types.Shard) {
// Note: while ticker is a rather naive approach to this problem,
// it actually simplifies a few things. i.e. If we miss a new shard
// while AWS is resharding we'll pick it up max 30 seconds later.
// It might be worth refactoring this flow to allow the consumer to
// to notify the broker when a shard is closed. However, shards don't
// necessarily close at the same time, so we could potentially get a
// thundering heard of notifications from the consumer.
var ticker = time.NewTicker(30 * time.Second)
for {
g.findNewShards(ctx, shardc)
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
}
}
}
// findNewShards pulls the list of shards from the Kinesis API
// and uses a local cache to determine if we are already processing
// a particular shard.
func (g *AllGroup) findNewShards(ctx context.Context, shardc chan types.Shard) {
g.shardMu.Lock()
defer g.shardMu.Unlock()
g.logger.Log("[GROUP]", "fetching shards")
shards, err := listShards(ctx, g.ksis, g.streamName)
if err != nil {
g.logger.Log("[GROUP] error:", err)
return
}
for _, shard := range shards {
if _, ok := g.shards[*shard.ShardId]; ok {
continue
}
g.shards[*shard.ShardId] = shard
shardc <- shard
}
}