diff --git a/build.gradle b/build.gradle
index 2467165190..44c7a1daa4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -150,8 +150,8 @@ dependencies {
     compile 'org.echocat.jomon:runtime:1.6.3'
 
     // kafka & zookeeper
-    compile 'org.apache.kafka:kafka-clients:0.9.0.1'
-    compile 'org.apache.kafka:kafka_2.11:0.9.0.1'
+    compile 'org.apache.kafka:kafka-clients:0.10.1.0'
+    compile 'org.apache.kafka:kafka_2.11:0.10.1.0'
     compile 'org.apache.curator:curator-framework:2.12.0'
     compile 'org.apache.curator:curator-recipes:2.12.0'
diff --git a/docker-compose.yml b/docker-compose.yml
index 636ca6e497..7d0e08481d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -38,7 +38,7 @@ services:
       - "2181:2181"
 
   kafka:
-    image: wurstmeister/kafka:0.9.0.0
+    image: wurstmeister/kafka:0.10.1.0
     network_mode: "host"
     ports:
       - "9092:9092"
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 043b6ba15a..f9147d507d 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Fri Feb 24 13:27:52 CET 2017
+#Tue Jun 20 11:33:10 CEST 2017
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-2.14.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.14.1-all.zip
diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java
index a5734e9b06..dfb2a4b408 100644
--- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java
+++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java
@@ -1,6 +1,7 @@
 package org.zalando.nakadi.repository.kafka;
 
 import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
 import kafka.server.ConfigType;
 import kafka.utils.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -87,7 +88,7 @@ public List getNextOffsets(final String topic) {
                 .collect(Collectors.toList());
 
         consumer.assign(partitions);
-        consumer.seekToEnd(partitions.toArray(new TopicPartition[partitions.size()]));
+        consumer.seekToEnd(partitions);
 
         return partitions
                 .stream()
@@ -100,7 +101,7 @@ public void createTopic(final String topic, final String zkUrl) {
         ZkUtils zkUtils = null;
         try {
             zkUtils = ZkUtils.apply(zkUrl, 30000, 10000, false);
-            AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
+            AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
 
         } finally {
             if (zkUtils != null) {
diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java
index 1f3be50d1c..06b473063b 100644
--- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java
+++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java
@@ -2,7 +2,7 @@
 
 import com.google.common.base.Preconditions;
 import kafka.admin.AdminUtils;
-import kafka.common.TopicExistsException;
+import kafka.admin.RackAwareMode;
 import kafka.server.ConfigType;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -13,6 +13,7 @@
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
@@ -43,7 +44,7 @@
 import org.zalando.nakadi.util.UUIDGenerator;
 
 import javax.annotation.Nullable;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -130,7 +131,8 @@ private void createTopic(final String topic, final int partitionsNum, final int
             final Properties topicConfig = new Properties();
             topicConfig.setProperty("retention.ms", Long.toString(retentionMs));
             topicConfig.setProperty("segment.ms", Long.toString(rotationMs));
-            AdminUtils.createTopic(zkUtils, topic, partitionsNum, replicaFactor, topicConfig);
+            AdminUtils.createTopic(zkUtils, topic, partitionsNum, replicaFactor, topicConfig,
+                    RackAwareMode.Enforced$.MODULE$);
            });
        } catch (final TopicExistsException e) {
            throw new TopicCreationException("Topic with name " + topic +
@@ -308,20 +310,19 @@ private void failUnpublished(final List batch, final String reason) {
     public Optional<PartitionStatistics> loadPartitionStatistics(final Timeline timeline, final String partition)
             throws ServiceUnavailableException {
         try (Consumer consumer = kafkaFactory.getConsumer()) {
-            final List<PartitionInfo> topicPartitions = consumer.partitionsFor(timeline.getTopic());
-
-            final Optional<PartitionInfo> tp = topicPartitions.stream()
+            final Optional<PartitionInfo> tp = consumer.partitionsFor(timeline.getTopic()).stream()
                     .filter(p -> KafkaCursor.toNakadiPartition(p.partition()).equals(partition))
                     .findAny();
             if (!tp.isPresent()) {
                 return Optional.empty();
             }
             final TopicPartition kafkaTP = tp.map(v -> new TopicPartition(v.topic(), v.partition())).get();
+            final Collection<TopicPartition> topicPartitions = Collections.singletonList(kafkaTP);
             consumer.assign(Collections.singletonList(kafkaTP));
-            consumer.seekToBeginning(kafkaTP);
+            consumer.seekToBeginning(topicPartitions);
             final long begin = consumer.position(kafkaTP);
-            consumer.seekToEnd(kafkaTP);
+            consumer.seekToEnd(topicPartitions);
             final long end = consumer.position(kafkaTP);
 
             return Optional.of(new KafkaPartitionStatistics(timeline, kafkaTP.partition(), begin, end - 1));
@@ -342,18 +343,18 @@ public List<PartitionStatistics> loadTopicStatistics(final Collection<Timeline> timelines)
                         .map(p -> new TopicPartition(p.topic(), p.partition()))
                         .forEach(tp -> backMap.put(tp, timeline));
             }
-            final TopicPartition[] kafkaTPs = backMap.keySet().toArray(new TopicPartition[backMap.size()]);
-            consumer.assign(Arrays.asList(kafkaTPs));
+            final List<TopicPartition> kafkaTPs = new ArrayList<>(backMap.keySet());
+            consumer.assign(kafkaTPs);
             consumer.seekToBeginning(kafkaTPs);
-            final long[] begins = Stream.of(kafkaTPs).mapToLong(consumer::position).toArray();
+            final long[] begins = kafkaTPs.stream().mapToLong(consumer::position).toArray();
             consumer.seekToEnd(kafkaTPs);
-            final long[] ends = Stream.of(kafkaTPs).mapToLong(consumer::position).toArray();
+            final long[] ends = kafkaTPs.stream().mapToLong(consumer::position).toArray();
 
-            return IntStream.range(0, kafkaTPs.length)
+            return IntStream.range(0, kafkaTPs.size())
                     .mapToObj(i -> new KafkaPartitionStatistics(
-                            backMap.get(kafkaTPs[i]),
-                            kafkaTPs[i].partition(),
+                            backMap.get(kafkaTPs.get(i)),
+                            kafkaTPs.get(i).partition(),
                             begins[i],
                             ends[i] - 1))
                     .collect(toList());
@@ -375,7 +376,7 @@ public List loadTopicEndStatistics(final Collection
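
The recurring change across these hunks is the Kafka 0.9 → 0.10 consumer API: `seekToBeginning` and `seekToEnd` switched from varargs `TopicPartition...` to `Collection<TopicPartition>` (and `assign` from `List` to `Collection`), which is why the `toArray(new TopicPartition[...])` conversions disappear. A minimal sketch of the new calling convention against a 0.10.1 client, assuming a locally reachable broker; the topic name and deserializer choice are illustrative, not taken from Nakadi's configuration:

```java
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekToEndExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // hypothetical topic/partition; Nakadi derives these from its timelines
            final TopicPartition tp = new TopicPartition("example-topic", 0);
            final List<TopicPartition> partitions = Collections.singletonList(tp);

            consumer.assign(partitions);
            // 0.9 signature:  seekToBeginning(TopicPartition... partitions)
            // 0.10 signature: seekToBeginning(Collection<TopicPartition> partitions)
            consumer.seekToBeginning(partitions);
            final long begin = consumer.position(tp);
            consumer.seekToEnd(partitions);
            final long end = consumer.position(tp);
            System.out.println("partition spans offsets " + begin + " to " + end);
        }
    }
}
```

The other two API changes in the diff are mechanical: `AdminUtils.createTopic` gained a `RackAwareMode` parameter (from Java the Scala singleton is reached as `RackAwareMode.Enforced$.MODULE$`), and `TopicExistsException` moved from `kafka.common` to `org.apache.kafka.common.errors`, so only the import changes while the catch block stays the same.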