Skip to content

Commit

Permalink
Add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Dec 19, 2024
1 parent 92b5d00 commit 3027f70
Showing 1 changed file with 3 additions and 0 deletions.
3 changes: 3 additions & 0 deletions plugins/inputs/kinesis_consumer/kinesis_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,15 @@ func (k *KinesisConsumer) Start(acc telegraf.Accumulator) error {
shardUpdateInterval: time.Duration(k.ShardUpdateInterval),
log: k.Log,
onMessage: func(ctx context.Context, shard string, r *types.Record) {
// Checking for number of messages in flight and wait for a free
// slot in case there are too many
select {
case <-ctx.Done():
return
case k.sem <- struct{}{}:
break
}

if err := k.onMessage(k.acc, shard, r); err != nil {
seqnr := *r.SequenceNumber
k.Log.Errorf("Processing message with sequence number %q in shard %s failed: %v", seqnr, shard, err)
Expand Down

0 comments on commit 3027f70

Please sign in to comment.