Skip to content

Commit

Permalink
gomaxprocs*1 default for kafka consumer max_concurrent_fetches
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Aug 19, 2024
1 parent 2f880de commit b551951
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
6 changes: 3 additions & 3 deletions plugin/input/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ The number of unprocessed messages in the buffer that are loaded in the backgrou

<br>

**`max_concurrent_fetches`** *`int`* *`default=0`*
**`max_concurrent_fetches`** *`cfg.Expression`* *`default=gomaxprocs*1`*

MaxConcurrentFetches sets the maximum number of fetch requests to allow in
flight or buffered at once, overriding the unbounded (i.e. number of
brokers) default.
flight or buffered at once
(use 0 to make max_concurrent_fetches equal to the number of brokers)

<br>

Expand Down
2 changes: 1 addition & 1 deletion plugin/input/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func NewClient(c *Config, l *zap.Logger, s Consumer) *kgo.Client {
kgo.ConsumeTopics(c.Topics...),
kgo.FetchMaxWait(c.ConsumerMaxWaitTime_),
kgo.AutoCommitMarks(),
kgo.MaxConcurrentFetches(c.MaxConcurrentFetches),
kgo.MaxConcurrentFetches(c.MaxConcurrentFetches_),
kgo.FetchMaxBytes(c.FetchMaxBytes_),
kgo.FetchMinBytes(c.FetchMinBytes_),
kgo.AutoCommitInterval(c.AutoCommitInterval_),
Expand Down
7 changes: 4 additions & 3 deletions plugin/input/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ type Config struct {
// > @3@4@5@6
// >
// > MaxConcurrentFetches sets the maximum number of fetch requests to allow in
// > flight or buffered at once, overriding the unbounded (i.e. number of
// > brokers) default.
MaxConcurrentFetches int `json:"max_concurrent_fetches" default:"0"` // *
// > flight or buffered at once
// > (use 0 to make max_concurrent_fetches equal to the number of brokers)
MaxConcurrentFetches cfg.Expression `json:"max_concurrent_fetches" default:"gomaxprocs*1" parse:"expression"` // *
MaxConcurrentFetches_ int

// > @3@4@5@6
// >
Expand Down

0 comments on commit b551951

Please sign in to comment.