Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #730 from zalando/feature/kafka-compression
Browse files Browse the repository at this point in the history
aruha-1032: enabled lz4 for producer
  • Loading branch information
adyach authored Aug 29, 2017
2 parents 08a0e61 + fd60cdf commit 4673839
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [2.2.0] - 2017-08-29

### Added
- Enable lz4 compression type for Kafka producer

## [2.1.2] - 2017-08-24

### Fixed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.zalando.nakadi.repository.kafka;

import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,8 +80,6 @@ private static String buildBootstrapServers(final List<Broker> brokers) {
private Properties buildKafkaProperties(final List<Broker> brokers) {
final Properties props = new Properties();
props.put("bootstrap.servers", buildBootstrapServers(brokers));
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}

Expand All @@ -87,27 +88,33 @@ private void updateBrokers() {
if (kafkaProperties != null) {
final List<Broker> brokers = fetchBrokers();
if (!brokers.isEmpty()) {
kafkaProperties.setProperty("bootstrap.servers", buildBootstrapServers(brokers));
kafkaProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
buildBootstrapServers(brokers));
}
}
}

public Properties getKafkaConsumerProperties() {
final Properties properties = (Properties) kafkaProperties.clone();
properties.put("enable.auto.commit", kafkaSettings.getEnableAutoCommit());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaSettings.getEnableAutoCommit());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
return properties;
}

public Properties getKafkaProducerProperties() {
final Properties producerProps = getKafkaConsumerProperties();
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("acks", "all");
producerProps.put("request.timeout.ms", kafkaSettings.getRequestTimeoutMs());
producerProps.put("batch.size", kafkaSettings.getBatchSize());
producerProps.put("linger.ms", kafkaSettings.getLingerMs());
final Properties producerProps = (Properties) kafkaProperties.clone();
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaSettings.getRequestTimeoutMs());
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaSettings.getBatchSize());
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaSettings.getLingerMs());
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
return producerProps;
}
}

0 comments on commit 4673839

Please sign in to comment.