From 993fed29cadf178ea0c52d9268c7298e28cb8c52 Mon Sep 17 00:00:00 2001
From: Andrey Dyachkov
Date: Tue, 20 Jun 2017 15:30:05 +0200
Subject: [PATCH 1/2] aruha-876: kafka client 0.10.1

---
 build.gradle                                       |   4 +-
 docker-compose.yml                                 |   2 +-
 gradle/wrapper/gradle-wrapper.properties           |   4 +-
 .../repository/kafka/KafkaTestHelper.java          |   5 +-
 .../kafka/KafkaTopicRepository.java                | 207 +++++++++---------
 5 files changed, 114 insertions(+), 108 deletions(-)

diff --git a/build.gradle b/build.gradle
index 27b6f6d3cf..3a098d707f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -150,8 +150,8 @@ dependencies {
     compile 'org.echocat.jomon:runtime:1.6.3'
 
     // kafka & zookeeper
-    compile 'org.apache.kafka:kafka-clients:0.9.0.1'
-    compile 'org.apache.kafka:kafka_2.11:0.9.0.1'
+    compile 'org.apache.kafka:kafka-clients:0.10.1.0'
+    compile 'org.apache.kafka:kafka_2.11:0.10.1.0'
 
     compile 'org.apache.curator:curator-framework:2.12.0'
     compile 'org.apache.curator:curator-recipes:2.12.0'
diff --git a/docker-compose.yml b/docker-compose.yml
index da8df13393..3232ba6a62 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -38,7 +38,7 @@ services:
       - "2181:2181"
 
   kafka:
-    image: wurstmeister/kafka:0.9.0.0
+    image: wurstmeister/kafka:0.10.1.0
     network_mode: "host"
     ports:
       - "9092:9092"
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 043b6ba15a..f9147d507d 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Fri Feb 24 13:27:52 CET 2017
+#Tue Jun 20 11:33:10 CEST 2017
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-2.14.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.14.1-all.zip
diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java
index a5734e9b06..dfb2a4b408 100644
--- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java
+++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java
@@ -1,6 +1,7 @@
 package org.zalando.nakadi.repository.kafka;
 
 import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
 import kafka.server.ConfigType;
 import kafka.utils.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -87,7 +88,7 @@ public List<Cursor> getNextOffsets(final String topic) {
                 .collect(Collectors.toList());
 
         consumer.assign(partitions);
-        consumer.seekToEnd(partitions.toArray(new TopicPartition[partitions.size()]));
+        consumer.seekToEnd(partitions);
 
         return partitions
                 .stream()
@@ -100,7 +101,7 @@ public void createTopic(final String topic, final String zkUrl) {
         ZkUtils zkUtils = null;
         try {
             zkUtils = ZkUtils.apply(zkUrl, 30000, 10000, false);
-            AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
+            AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
         } finally {
             if (zkUtils != null) {
diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java
index 996b4be84d..a249c27461 100644
--- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java
+++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java
@@ -1,30 +1,8 @@
 package org.zalando.nakadi.repository.kafka;
 
 import com.google.common.base.Preconditions;
-import static com.google.common.collect.Lists.newArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import static java.util.Collections.unmodifiableList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import static java.util.stream.Collectors.toList;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
 import kafka.admin.AdminUtils;
-import kafka.common.TopicExistsException;
+import kafka.admin.RackAwareMode;
 import kafka.server.ConfigType;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -35,16 +13,13 @@
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zalando.nakadi.config.NakadiSettings;
 import org.zalando.nakadi.domain.BatchItem;
-import static org.zalando.nakadi.domain.CursorError.NULL_OFFSET;
-import static org.zalando.nakadi.domain.CursorError.NULL_PARTITION;
-import static org.zalando.nakadi.domain.CursorError.PARTITION_NOT_FOUND;
-import static org.zalando.nakadi.domain.CursorError.UNAVAILABLE;
 import org.zalando.nakadi.domain.EventPublishingStatus;
 import org.zalando.nakadi.domain.EventPublishingStep;
 import org.zalando.nakadi.domain.NakadiCursor;
@@ -66,6 +41,34 @@
 import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings;
 import org.zalando.nakadi.util.UUIDGenerator;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.Collections.unmodifiableList;
+import static java.util.stream.Collectors.toList;
+import static org.zalando.nakadi.domain.CursorError.NULL_OFFSET;
+import static org.zalando.nakadi.domain.CursorError.NULL_PARTITION;
+import static org.zalando.nakadi.domain.CursorError.PARTITION_NOT_FOUND;
+import static org.zalando.nakadi.domain.CursorError.UNAVAILABLE;
+
 public class KafkaTopicRepository implements TopicRepository {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicRepository.class);
@@ -93,6 +96,63 @@ public KafkaTopicRepository(final ZooKeeperHolder zkFactory,
         this.circuitBreakers = new ConcurrentHashMap<>();
     }
 
+    private static CompletableFuture<Exception> publishItem(
+            final Producer<String, String> producer,
+            final String topicId,
+            final BatchItem item,
+            final HystrixKafkaCircuitBreaker circuitBreaker) throws EventPublishingException {
+        try {
+            final CompletableFuture<Exception> result = new CompletableFuture<>();
+            final ProducerRecord<String, String> kafkaRecord = new ProducerRecord<>(
+                    topicId,
+                    KafkaCursor.toKafkaPartition(item.getPartition()),
+                    item.getPartition(),
+                    item.getEvent().toString());
+
+            circuitBreaker.markStart();
+            producer.send(kafkaRecord, ((metadata, exception) -> {
+                if (null != exception) {
+                    LOG.warn("Failed to publish to kafka topic {}", topicId, exception);
+                    item.updateStatusAndDetail(EventPublishingStatus.FAILED, "internal error");
+                    if (hasKafkaConnectionException(exception)) {
+                        circuitBreaker.markFailure();
+                    } else {
+                        circuitBreaker.markSuccessfully();
+                    }
+                    result.complete(exception);
+                } else {
+                    item.updateStatusAndDetail(EventPublishingStatus.SUBMITTED, "");
+                    circuitBreaker.markSuccessfully();
+                    result.complete(null);
+                }
+            }));
+            return result;
+        } catch (final InterruptException e) {
+            Thread.currentThread().interrupt();
+            circuitBreaker.markSuccessfully();
+            item.updateStatusAndDetail(EventPublishingStatus.FAILED, "internal error");
+            throw new EventPublishingException("Error publishing message to kafka", e);
+        } catch (final RuntimeException e) {
+            circuitBreaker.markSuccessfully();
+            item.updateStatusAndDetail(EventPublishingStatus.FAILED, "internal error");
+            throw new EventPublishingException("Error publishing message to kafka", e);
+        }
+    }
+
+    private static boolean isExceptionShouldLeadToReset(@Nullable final Exception exception) {
+        if (null == exception) {
+            return false;
+        }
+        return Stream.of(NotLeaderForPartitionException.class, UnknownTopicOrPartitionException.class)
+                .anyMatch(clazz -> clazz.isAssignableFrom(exception.getClass()));
+    }
+
+    private static boolean hasKafkaConnectionException(final Exception exception) {
+        return exception instanceof org.apache.kafka.common.errors.TimeoutException ||
+                exception instanceof NetworkException ||
+                exception instanceof UnknownServerException;
+    }
+
     public List<String> listTopics() throws TopicRepositoryException {
         try {
             return zkFactory.get()
@@ -126,7 +186,8 @@ private void createTopic(final String topic, final int partitionsNum, final int
                 final Properties topicConfig = new Properties();
                 topicConfig.setProperty("retention.ms", Long.toString(retentionMs));
                 topicConfig.setProperty("segment.ms", Long.toString(rotationMs));
-                AdminUtils.createTopic(zkUtils, topic, partitionsNum, replicaFactor, topicConfig);
+                AdminUtils.createTopic(zkUtils, topic, partitionsNum, replicaFactor, topicConfig,
+                        RackAwareMode.Enforced$.MODULE$);
             });
         } catch (final TopicExistsException e) {
             throw new TopicCreationException("Topic with name " + topic +
@@ -153,63 +214,6 @@ public boolean topicExists(final String topic) throws TopicRepositoryException {
                 .anyMatch(t -> t.equals(topic));
     }
 
-    private static CompletableFuture<Exception> publishItem(
-            final Producer<String, String> producer,
-            final String topicId,
-            final BatchItem item,
-            final HystrixKafkaCircuitBreaker circuitBreaker) throws EventPublishingException {
-        try {
-            final CompletableFuture<Exception> result = new CompletableFuture<>();
-            final ProducerRecord<String, String> kafkaRecord = new ProducerRecord<>(
-                    topicId,
-                    KafkaCursor.toKafkaPartition(item.getPartition()),
-                    item.getPartition(),
-                    item.getEvent().toString());
-
-            circuitBreaker.markStart();
-            producer.send(kafkaRecord, ((metadata, exception) -> {
-                if (null != exception) {
-                    LOG.warn("Failed to publish to kafka topic {}", topicId, exception);
-                    item.updateStatusAndDetail(EventPublishingStatus.FAILED, "internal error");
-                    if (hasKafkaConnectionException(exception)) {
-                        circuitBreaker.markFailure();
-                    } else {
-                        circuitBreaker.markSuccessfully();
-                    }
-                    result.complete(exception);
-                } else {
-                    item.updateStatusAndDetail(EventPublishingStatus.SUBMITTED, "");
-                    circuitBreaker.markSuccessfully();
-                    result.complete(null);
-                }
-            }));
-            return result;
-        } catch (final InterruptException e) {
-            Thread.currentThread().interrupt();
-            circuitBreaker.markSuccessfully();
-            item.updateStatusAndDetail(EventPublishingStatus.FAILED, "internal error");
-            throw new EventPublishingException("Error publishing message to kafka", e);
-        } catch (final RuntimeException e) {
-            circuitBreaker.markSuccessfully();
-            item.updateStatusAndDetail(EventPublishingStatus.FAILED, "internal error");
-            throw new EventPublishingException("Error publishing message to kafka", e);
-        }
-    }
-
-    private static boolean isExceptionShouldLeadToReset(@Nullable final Exception exception) {
-        if (null == exception) {
-            return false;
-        }
-        return Stream.of(NotLeaderForPartitionException.class, UnknownTopicOrPartitionException.class)
-                .anyMatch(clazz -> clazz.isAssignableFrom(exception.getClass()));
-    }
-
-    private static boolean hasKafkaConnectionException(final Exception exception) {
-        return exception instanceof org.apache.kafka.common.errors.TimeoutException ||
-                exception instanceof NetworkException ||
-                exception instanceof UnknownServerException;
-    }
-
     @Override
     public void syncPostBatch(final String topicId, final List<BatchItem> batch) throws EventPublishingException {
         final Producer<String, String> producer = kafkaFactory.takeProducer();
@@ -296,11 +300,12 @@ public Optional<PartitionStatistics> loadPartitionStatistics(final Timeline time
             return Optional.empty();
         }
         final TopicPartition kafkaTP = tp.map(v -> new TopicPartition(v.topic(), v.partition())).get();
+        final Collection<TopicPartition> topicPartitions = Collections.singletonList(kafkaTP);
         consumer.assign(Collections.singletonList(kafkaTP));
-        consumer.seekToBeginning(kafkaTP);
+        consumer.seekToBeginning(topicPartitions);
         final long begin = consumer.position(kafkaTP);
 
-        consumer.seekToEnd(kafkaTP);
+        consumer.seekToEnd(topicPartitions);
         final long end = consumer.position(kafkaTP);
 
         return Optional.of(new KafkaPartitionStatistics(timeline, kafkaTP.partition(), begin, end - 1));
@@ -320,18 +325,18 @@ public List<PartitionStatistics> loadTopicStatistics(final Collection
                     .map(p -> new TopicPartition(p.topic(), p.partition()))
                     .forEach(tp -> backMap.put(tp, timeline));
         }
-        final TopicPartition[] kafkaTPs = backMap.keySet().toArray(new TopicPartition[backMap.size()]);
-        consumer.assign(Arrays.asList(kafkaTPs));
+        final List<TopicPartition> kafkaTPs = new ArrayList<>(backMap.keySet());
+        consumer.assign(kafkaTPs);
         consumer.seekToBeginning(kafkaTPs);
-        final long[] begins = Stream.of(kafkaTPs).mapToLong(consumer::position).toArray();
+        final long[] begins = kafkaTPs.stream().mapToLong(consumer::position).toArray();
 
         consumer.seekToEnd(kafkaTPs);
-        final long[] ends = Stream.of(kafkaTPs).mapToLong(consumer::position).toArray();
+        final long[] ends = kafkaTPs.stream().mapToLong(consumer::position).toArray();
 
-        return IntStream.range(0, kafkaTPs.length)
+        return IntStream.range(0, kafkaTPs.size())
                 .mapToObj(i -> new KafkaPartitionStatistics(
-                        backMap.get(kafkaTPs[i]),
-                        kafkaTPs[i].partition(),
+                        backMap.get(kafkaTPs.get(i)),
+                        kafkaTPs.get(i).partition(),
                         begins[i],
                         ends[i] - 1))
                 .collect(toList());
@@ -353,7 +358,7 @@ public List<PartitionEndStatistics> loadTopicEndStatistics(final Collection
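
Most of the churn in KafkaTopicRepository above comes from one consumer-API change in the 0.9.x -> 0.10.x client: seekToBeginning and seekToEnd now take a Collection<TopicPartition> instead of varargs, which is why the repository switches from TopicPartition[] arrays to List<TopicPartition> throughout. A minimal, self-contained sketch of the new call shape; the class name, topic, and partition count below are illustrative and not taken from the patch:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class SeekApiSketch {
    // 0.9.x:  consumer.seekToEnd(tps.toArray(new TopicPartition[tps.size()]));
    // 0.10.x: the Collection overload lets the List be passed directly.
    static void printEndOffsets(final Consumer<byte[], byte[]> consumer,
                                final String topic, final int partitionCount) {
        final List<TopicPartition> tps = IntStream.range(0, partitionCount)
                .mapToObj(p -> new TopicPartition(topic, p))
                .collect(Collectors.toList());

        consumer.assign(tps);
        consumer.seekToEnd(tps);
        tps.forEach(tp -> System.out.println(tp + " -> " + consumer.position(tp)));
    }
}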
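
The admin side changes the same way in both files touched here: AdminUtils.createTopic gained a RackAwareMode parameter, reached from Java via the Scala singleton RackAwareMode.Enforced$.MODULE$, and TopicExistsException moved from kafka.common to org.apache.kafka.common.errors. A sketch mirroring KafkaTestHelper.createTopic; the zkUrl, timeouts, and single-partition settings are illustrative:

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;

class CreateTopicSketch {
    static void createTopic(final String zkUrl, final String topic) {
        ZkUtils zkUtils = null;
        try {
            // session timeout 30s, connection timeout 10s, ZK security disabled
            zkUtils = ZkUtils.apply(zkUrl, 30000, 10000, false);
            // 0.10.x appends a mandatory RackAwareMode argument
            AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(),
                    RackAwareMode.Enforced$.MODULE$);
        } finally {
            if (zkUtils != null) {
                zkUtils.close();
            }
        }
    }
}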