Skip to content

Commit

Permalink
rm kafka input option max_concurrent_fetches
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Aug 19, 2024
1 parent 981249b commit 216d4cb
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 16 deletions.
8 changes: 0 additions & 8 deletions plugin/input/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,6 @@ The number of unprocessed messages in the buffer that are loaded in the backgrou

<br>

**`max_concurrent_fetches`** *`int`* *`default=0`*

MaxConcurrentFetches sets the maximum number of fetch requests to allow in
flight or buffered at once, overriding the unbounded (i.e. number of
brokers) default.

<br>

**`fetch_max_bytes`** *`cfg.Expression`* *`default=52428800`*

FetchMaxBytes (fetch.max.bytes) sets the maximum amount of bytes a broker will try to send during a fetch
Expand Down
3 changes: 2 additions & 1 deletion plugin/input/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"context"
"runtime"
"time"

"github.com/ozontech/file.d/cfg"
Expand All @@ -16,7 +17,7 @@ func NewClient(c *Config, l *zap.Logger, s Consumer) *kgo.Client {
kgo.ConsumeTopics(c.Topics...),
kgo.FetchMaxWait(c.ConsumerMaxWaitTime_),
kgo.AutoCommitMarks(),
kgo.MaxConcurrentFetches(c.MaxConcurrentFetches),
kgo.MaxConcurrentFetches(runtime.GOMAXPROCS(0)),
kgo.FetchMaxBytes(c.FetchMaxBytes_),
kgo.FetchMinBytes(c.FetchMinBytes_),
kgo.AutoCommitInterval(c.AutoCommitInterval_),
Expand Down
7 changes: 0 additions & 7 deletions plugin/input/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,6 @@ type Config struct {
// > The number of unprocessed messages in the buffer that are loaded in the background from kafka. (max.poll.records)
ChannelBufferSize int `json:"channel_buffer_size" default:"256"` // *

// > @3@4@5@6
// >
// > MaxConcurrentFetches sets the maximum number of fetch requests to allow in
// > flight or buffered at once, overriding the unbounded (i.e. number of
// > brokers) default.
MaxConcurrentFetches int `json:"max_concurrent_fetches" default:"0"` // *

// > @3@4@5@6
// >
// > FetchMaxBytes (fetch.max.bytes) sets the maximum amount of bytes a broker will try to send during a fetch
Expand Down

0 comments on commit 216d4cb

Please sign in to comment.