diff --git a/plugin/input/kafka/README.md b/plugin/input/kafka/README.md index 63e1fefd..d0808f07 100755 --- a/plugin/input/kafka/README.md +++ b/plugin/input/kafka/README.md @@ -59,6 +59,14 @@ The number of unprocessed messages in the buffer that are loaded in the backgrou
+**`max_concurrent_fetches`** *`int`* *`default=0`* + +MaxConcurrentFetches sets the maximum number of fetch requests to allow in +flight or buffered at once, overriding the unbounded (i.e. number of +brokers) default. + +
+ **`fetch_max_bytes`** *`cfg.Expression`* *`default=52428800`* FetchMaxBytes (fetch.max.bytes) sets the maximum amount of bytes a broker will try to send during a fetch diff --git a/plugin/input/kafka/client.go b/plugin/input/kafka/client.go index a5a3d189..f316ad2c 100644 --- a/plugin/input/kafka/client.go +++ b/plugin/input/kafka/client.go @@ -2,7 +2,6 @@ package kafka import ( "context" - "runtime" "time" "github.com/ozontech/file.d/cfg" @@ -17,7 +16,7 @@ func NewClient(c *Config, l *zap.Logger, s Consumer) *kgo.Client { kgo.ConsumeTopics(c.Topics...), kgo.FetchMaxWait(c.ConsumerMaxWaitTime_), kgo.AutoCommitMarks(), - kgo.MaxConcurrentFetches(runtime.GOMAXPROCS(0)), + kgo.MaxConcurrentFetches(c.MaxConcurrentFetches), kgo.FetchMaxBytes(c.FetchMaxBytes_), kgo.FetchMinBytes(c.FetchMinBytes_), kgo.AutoCommitInterval(c.AutoCommitInterval_), diff --git a/plugin/input/kafka/kafka.go b/plugin/input/kafka/kafka.go index 3475473b..e51c319e 100644 --- a/plugin/input/kafka/kafka.go +++ b/plugin/input/kafka/kafka.go @@ -97,6 +97,13 @@ type Config struct { // > The number of unprocessed messages in the buffer that are loaded in the background from kafka. (max.poll.records) ChannelBufferSize int `json:"channel_buffer_size" default:"256"` // * + // > @3@4@5@6 + // > + // > MaxConcurrentFetches sets the maximum number of fetch requests to allow in + // > flight or buffered at once, overriding the unbounded (i.e. number of + // > brokers) default. + MaxConcurrentFetches int `json:"max_concurrent_fetches" default:"0"` // * + // > @3@4@5@6 // > // > FetchMaxBytes (fetch.max.bytes) sets the maximum amount of bytes a broker will try to send during a fetch