Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1078 from zalando/ARUHA-2414
Browse files Browse the repository at this point in the history
ARUHA-2414 Increase kafka buffer size
  • Loading branch information
antban authored Jul 16, 2019
2 parents eb665e5 + 9b07423 commit f6e1b2e
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class KafkaRepositoryAT extends BaseAT {
private static final int KAFKA_DELIVERY_TIMEOUT = 30000;
private static final int KAFKA_MAX_BLOCK_TIMEOUT = 5000;
private static final int KAFKA_BATCH_SIZE = 1048576;
private static final long KAFKA_BUFFER_MEMORY = KAFKA_BATCH_SIZE * 10L;
private static final int KAFKA_LINGER_MS = 0;
private static final long NAKADI_EVENT_MAX_BYTES = 1000000L;
private static final long TIMELINE_WAIT_TIMEOUT = 40000;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void setup() {
DEFAULT_WARN_ALL_DATA_ACCESS_MESSAGE,
DEFAULT_WARN_LOG_COMPACTION_MESSAGE);

kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE,
kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY,
KAFKA_LINGER_MS, KAFKA_ENABLE_AUTO_COMMIT, KAFKA_MAX_REQUEST_SIZE,
KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT);
zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public Properties getKafkaProducerProperties() {
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaSettings.getRequestTimeoutMs());
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaSettings.getBufferMemory());
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaSettings.getBatchSize());
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaSettings.getLingerMs());
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class KafkaSettings {
// https://github.com/apache/kafka/blob/d9206500bf2f99ce93f6ad64c7a89483100b3b5f/clients/src/main/java/org/apache
// /kafka/clients/producer/ProducerConfig.java#L232
private final int batchSize;
private final long bufferMemory;
private final int lingerMs;
private final boolean enableAutoCommit;
private final int maxRequestSize;
Expand All @@ -24,13 +25,15 @@ public class KafkaSettings {
@Autowired
public KafkaSettings(@Value("${nakadi.kafka.request.timeout.ms}") final int requestTimeoutMs,
@Value("${nakadi.kafka.batch.size}") final int batchSize,
@Value("${nakadi.kafka.buffer.memory}") final long bufferMemory,
@Value("${nakadi.kafka.linger.ms}") final int lingerMs,
@Value("${nakadi.kafka.enable.auto.commit}") final boolean enableAutoCommit,
@Value("${nakadi.kafka.max.request.size}") final int maxRequestSize,
@Value("${nakadi.kafka.delivery.timeout.ms}") final int deliveryTimeoutMs,
@Value("${nakadi.kafka.max.block.ms}") final int maxBlockMs) {
this.requestTimeoutMs = requestTimeoutMs;
this.batchSize = batchSize;
this.bufferMemory = bufferMemory;
this.lingerMs = lingerMs;
this.enableAutoCommit = enableAutoCommit;
this.maxRequestSize = maxRequestSize;
Expand All @@ -46,6 +49,10 @@ public int getBatchSize() {
return batchSize;
}

public long getBufferMemory() {
return bufferMemory;
}

public int getLingerMs() {
return lingerMs;
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ nakadi:
send.timeoutMs: 5000
batch.size: 5242880
max.request.size: 2098152
buffer.memory: 83886080 # May be defined as X * Y * Z, where:
# X - avg incoming traffic per instance per second (here: 4Mb/s)
# Y - spike koeff, to get traffic for loaded instance out of average traffic (here: 2)
# Z - seconds to tolerate networking issues (here: 10)
linger.ms: 0
enable.auto.commit: false
delivery.timeout.ms: 30000 # request.timeout.ms + linger.ms
Expand Down

0 comments on commit f6e1b2e

Please sign in to comment.