Skip to content

Commit

Permalink
Revert "rm kafka input option max_concurrent_fetches"
Browse files Browse the repository at this point in the history
This reverts commit 216d4cb.
  • Loading branch information
DmitryRomanov committed Aug 19, 2024
1 parent 216d4cb commit 2f880de
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 2 deletions.
8 changes: 8 additions & 0 deletions plugin/input/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ The number of unprocessed messages in the buffer that are loaded in the backgrou

<br>

**`max_concurrent_fetches`** *`int`* *`default=0`*

MaxConcurrentFetches sets the maximum number of fetch requests to allow in
flight or buffered at once, overriding the unbounded (i.e. number of
brokers) default.

<br>

**`fetch_max_bytes`** *`cfg.Expression`* *`default=52428800`*

FetchMaxBytes (fetch.max.bytes) sets the maximum amount of bytes a broker will try to send during a fetch
Expand Down
3 changes: 1 addition & 2 deletions plugin/input/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kafka

import (
"context"
"runtime"
"time"

"github.com/ozontech/file.d/cfg"
Expand All @@ -17,7 +16,7 @@ func NewClient(c *Config, l *zap.Logger, s Consumer) *kgo.Client {
kgo.ConsumeTopics(c.Topics...),
kgo.FetchMaxWait(c.ConsumerMaxWaitTime_),
kgo.AutoCommitMarks(),
kgo.MaxConcurrentFetches(runtime.GOMAXPROCS(0)),
kgo.MaxConcurrentFetches(c.MaxConcurrentFetches),
kgo.FetchMaxBytes(c.FetchMaxBytes_),
kgo.FetchMinBytes(c.FetchMinBytes_),
kgo.AutoCommitInterval(c.AutoCommitInterval_),
Expand Down
7 changes: 7 additions & 0 deletions plugin/input/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ type Config struct {
// > The number of unprocessed messages in the buffer that are loaded in the background from kafka. (max.poll.records)
ChannelBufferSize int `json:"channel_buffer_size" default:"256"` // *

// > @3@4@5@6
// >
// > MaxConcurrentFetches sets the maximum number of fetch requests to allow in
// > flight or buffered at once, overriding the unbounded (i.e. number of
// > brokers) default.
MaxConcurrentFetches int `json:"max_concurrent_fetches" default:"0"` // *

// > @3@4@5@6
// >
// > FetchMaxBytes (fetch.max.bytes) sets the maximum amount of bytes a broker will try to send during a fetch
Expand Down

0 comments on commit 2f880de

Please sign in to comment.