From 9b07423d1921b3d9d5fe9397b2168902c75d9f7a Mon Sep 17 00:00:00 2001 From: dsorokin Date: Tue, 16 Jul 2019 11:40:18 +0200 Subject: [PATCH] ARUHA-2414 Increase kafka buffer size --- .../zalando/nakadi/repository/kafka/KafkaRepositoryAT.java | 3 ++- .../nakadi/repository/kafka/KafkaLocationManager.java | 1 + .../org/zalando/nakadi/repository/kafka/KafkaSettings.java | 7 +++++++ src/main/resources/application.yml | 4 ++++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index 7a5e4e1ef2..b910242595 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -56,6 +56,7 @@ public class KafkaRepositoryAT extends BaseAT { private static final int KAFKA_DELIVERY_TIMEOUT = 30000; private static final int KAFKA_MAX_BLOCK_TIMEOUT = 5000; private static final int KAFKA_BATCH_SIZE = 1048576; + private static final long KAFKA_BUFFER_MEMORY = KAFKA_BATCH_SIZE * 10L; private static final int KAFKA_LINGER_MS = 0; private static final long NAKADI_EVENT_MAX_BYTES = 1000000L; private static final long TIMELINE_WAIT_TIMEOUT = 40000; @@ -94,7 +95,7 @@ public void setup() { DEFAULT_WARN_ALL_DATA_ACCESS_MESSAGE, DEFAULT_WARN_LOG_COMPACTION_MESSAGE); - kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, + kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY, KAFKA_LINGER_MS, KAFKA_ENABLE_AUTO_COMMIT, KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT); zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT); diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java index 58be09de67..48d5281d5a 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java @@ -117,6 +117,7 @@ public Properties getKafkaProducerProperties() { "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaSettings.getRequestTimeoutMs()); + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaSettings.getBufferMemory()); producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaSettings.getBatchSize()); producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaSettings.getLingerMs()); producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java index 6b43d5b0fe..dae56b85ec 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java @@ -15,6 +15,7 @@ public class KafkaSettings { // https://github.com/apache/kafka/blob/d9206500bf2f99ce93f6ad64c7a89483100b3b5f/clients/src/main/java/org/apache // /kafka/clients/producer/ProducerConfig.java#L232 private final int batchSize; + private final long bufferMemory; private final int lingerMs; private final boolean enableAutoCommit; private final int maxRequestSize; @@ -24,6 +25,7 @@ public class KafkaSettings { @Autowired public KafkaSettings(@Value("${nakadi.kafka.request.timeout.ms}") final int requestTimeoutMs, @Value("${nakadi.kafka.batch.size}") final int batchSize, + @Value("${nakadi.kafka.buffer.memory}") final long bufferMemory, @Value("${nakadi.kafka.linger.ms}") final int lingerMs, @Value("${nakadi.kafka.enable.auto.commit}") final boolean enableAutoCommit, @Value("${nakadi.kafka.max.request.size}") final int maxRequestSize, @@ -31,6 +33,7 @@ public KafkaSettings(@Value("${nakadi.kafka.request.timeout.ms}") final int requ @Value("${nakadi.kafka.max.block.ms}") final int maxBlockMs) { this.requestTimeoutMs = requestTimeoutMs; this.batchSize = batchSize; + this.bufferMemory = bufferMemory; this.lingerMs = lingerMs; this.enableAutoCommit = enableAutoCommit; this.maxRequestSize = maxRequestSize; @@ -46,6 +49,10 @@ public int getBatchSize() { return batchSize; } + public long getBufferMemory() { + return bufferMemory; + } + public int getLingerMs() { return lingerMs; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0b53b93168..d734fb77de 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -65,6 +65,10 @@ nakadi: send.timeoutMs: 5000 batch.size: 5242880 max.request.size: 2098152 + buffer.memory: 83886080 # May be defined as X * Y * Z, where: + # X - avg incoming traffic per instance per second (here: 4Mb/s) + # Y - spike koeff, to get traffic for loaded instance out of average traffic (here: 2) + # Z - seconds to tolerate networking issues (here: 10) linger.ms: 0 enable.auto.commit: false delivery.timeout.ms: 30000 # request.timeout.ms + linger.ms