From e38dc2306d8291d144620c6b929fad6f087e83d0 Mon Sep 17 00:00:00 2001 From: Andrey Polyakov Date: Tue, 23 Jul 2019 12:43:43 -0700 Subject: [PATCH 1/2] Updates for compatability with more recent Kafka Tool was broken when run against Kafka 2.2.1 beacuse "Failed to parse the broker info from zookeeper". This change does the following: - use the latest kafka-clients lib - stop using deprecated ZkUtils - remove unnecessary dependency on Kafka broker code - no more Scala <-> Java collection conversion - use Java 1.7 for compilation (README requirements already specify this requirement) - sorts the JSON output so you can use diff tools more easily --- README.md | 15 +- pom.xml | 11 +- .../kafka/tools/KafkaAssignmentGenerator.java | 198 ++++++++++-------- .../kafka/tools/KafkaTopicAssigner.java | 51 +---- 4 files changed, 119 insertions(+), 156 deletions(-) diff --git a/README.md b/README.md index a387ed2..4b36c25 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,9 @@ Requires Java 1.7+ ``` ./kafka-assignment-generator.sh [options...] arguments... + --bootstrap_server VAL : Kafka Broker(s) to bootstrap + connection (comma-separated host:port + pairs) --broker_hosts VAL : comma-separated list of broker hostnames (instead of broker IDs) --broker_hosts_to_remove VAL : comma-separated list of broker @@ -40,13 +43,11 @@ Requires Java 1.7+ PRINT_CURRENT_BROKERS | ENT, PRINT_CURRENT_BROKERS, PRINT_REASSIGNMENT] PRINT_REASSIGNMENT) --topics VAL : comma-separated list of topics - --zk_string VAL : ZK quorum as comma-separated - host:port pairs ``` ### Example: reassign partitions to all live hosts ``` -./kafka-assignment-generator.sh --zk_string my-zk-host:2181 --mode PRINT_REASSIGNMENT +./kafka-assignment-generator.sh --bootstrap_server my-kafka-host:9092 --mode PRINT_REASSIGNMENT ``` The output JSON can then be fed into Kafka's reassign partitions command. See [here](http://kafka.apache.org/0100/ops.html#basic_ops_partitionassignment) for instructions. @@ -54,7 +55,7 @@ The output JSON can then be fed into Kafka's reassign partitions command. See [h ### Example: reassign partitions to all but a few live hosts This mode is useful for decommissioning or replacing a node. The partitions will be assigned to all live hosts, excluding the hosts that are specified. ``` -./kafka-assignment-generator.sh --zk_string my-zk-host:2181 --mode PRINT_REASSIGNMENT --broker_hosts_to_remove misbehaving-host1,misbehaving-host2 +./kafka-assignment-generator.sh --bootstrap_server my-kafka-host:9092 --mode PRINT_REASSIGNMENT --broker_hosts_to_remove misbehaving-host1,misbehaving-host2 ``` The output JSON can then be fed into Kafka's reassign partitions command. See [here](http://kafka.apache.org/0100/ops.html#basic_ops_partitionassignment) for instructions. @@ -62,19 +63,19 @@ The output JSON can then be fed into Kafka's reassign partitions command. See [h ### Example: reassign partitions to specific hosts Note that in this mode, it is expected that every host that should own partitions should be specified, including existing ones. ``` -./kafka-assignment-generator.sh --zk_string my-zk-host:2181 --mode PRINT_REASSIGNMENT --broker_hosts host1,host2,host3 +./kafka-assignment-generator.sh --bootstrap_server my-kafka-host:9092 --mode PRINT_REASSIGNMENT --broker_hosts host1,host2,host3 ``` The output JSON can then be fed into Kafka's reassign partitions command. See [here](http://kafka.apache.org/0100/ops.html#basic_ops_partitionassignment) for instructions. ### Example: print current brokers ``` -./kafka-assignment-generator.sh --zk_string my-zk-host:2181 --mode PRINT_CURRENT_BROKERS +./kafka-assignment-generator.sh --bootstrap_server my-kafka-host:9092 --mode PRINT_CURRENT_BROKERS ``` ### Example: print current assignment ``` -./kafka-assignment-generator.sh --zk_string my-zk-host:2181 --mode PRINT_CURRENT_ASSIGNMENT +./kafka-assignment-generator.sh --bootstrap_server my-kafka-host:9092 --mode PRINT_CURRENT_ASSIGNMENT ``` # Building diff --git a/pom.xml b/pom.xml index 399cd43..87e93a4 100644 --- a/pom.xml +++ b/pom.xml @@ -46,15 +46,10 @@ commons-lang3 3.1 - - org.apache.kafka - kafka_2.11 - 0.10.0.0 - org.apache.kafka kafka-clients - 0.10.0.0 + 2.3.0 @@ -123,8 +118,8 @@ maven-compiler-plugin 3.0 - 1.5 - 1.5 + 1.7 + 1.7 diff --git a/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java b/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java index 9375154..2530cfb 100644 --- a/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java +++ b/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java @@ -1,35 +1,22 @@ package siftscience.kafka.tools; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import kafka.cluster.Broker; -import kafka.cluster.BrokerEndPoint; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; - -import org.I0Itec.zkclient.ZkClient; +import com.google.common.collect.*; import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option; -import scala.collection.JavaConversions; -import scala.collection.Seq; +import java.util.*; +import java.util.concurrent.ExecutionException; /** * Prints assignments of topic partition replicas to brokers. @@ -39,7 +26,7 @@ * Usage: * * bin/kafka-assignment-generator.sh - * --zk_string zkhost:2181 + * --bootstrap_servers host1:9092 * --mode PRINT_REASSIGNMENT * --broker_hosts host1,host2,host3 * --broker_hosts_to_remove misbehaving_host1 @@ -50,9 +37,9 @@ public class KafkaAssignmentGenerator { private static final Splitter SPLITTER = Splitter.on(','); - @Option(name = "--zk_string", - usage = "ZK quorum as comma-separated host:port pairs") - private String zkConnectString = null; + @Option(name = "--bootstrap_server", + usage = "Kafka Broker(s) to bootstrap connection (comma-separated host:port pairs)") + private String bootstrapServers = null; @Option(name = "--mode", usage = "the mode to run (PRINT_CURRENT_ASSIGNMENT, PRINT_CURRENT_BROKERS, " + @@ -100,48 +87,81 @@ private enum Mode { PRINT_REASSIGNMENT } - private static void printCurrentAssignment(ZkUtils zkUtils, List specifiedTopics) { - Seq topics = specifiedTopics != null ? - JavaConversions.iterableAsScalaIterable(specifiedTopics).toSeq() : - zkUtils.getAllTopics(); + private static void printCurrentAssignment(AdminClient adminClient, List specifiedTopics) throws ExecutionException, InterruptedException { + List topics = specifiedTopics != null ? specifiedTopics : new ArrayList<>(adminClient.listTopics().names().get()); + // Sort the topics for nice, diff-able output + Collections.sort(topics); + Map>> partitionAssignment = getPartitionAssignment(adminClient, topics); + + JSONObject json = new JSONObject(); + json.put("version", KAFKA_FORMAT_VERSION); + JSONArray partitionsJson = new JSONArray(); + for (String topic : topics) { + Map> partitionToReplicas = partitionAssignment.get(topic); + for (Integer partition : partitionToReplicas.keySet()) { + JSONObject partitionJson = new JSONObject(); + partitionJson.put("topic", topic); + partitionJson.put("partition", partition); + partitionJson.put("replicas", new JSONArray(partitionToReplicas.get(partition))); + partitionsJson.put(partitionJson); + } + } + json.put("partitions", partitionsJson); + System.out.println("CURRENT ASSIGNMENT:"); - System.out.println( - zkUtils.formatAsReassignmentJson(zkUtils.getReplicaAssignmentForTopics( - topics))); + System.out.println(json); } - private static void printCurrentBrokers(ZkUtils zkUtils) throws JSONException { - List brokers = JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()); + private static void printCurrentBrokers(AdminClient adminClient) throws JSONException, ExecutionException, InterruptedException { JSONArray json = new JSONArray(); - for (Broker broker : brokers) { - BrokerEndPoint endpoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT); + for (Node node: adminClient.describeCluster().nodes().get()) { JSONObject brokerJson = new JSONObject(); - brokerJson.put("id", broker.id()); - brokerJson.put("host", endpoint.host()); - brokerJson.put("port", endpoint.port()); - if (broker.rack().isDefined()) { - brokerJson.put("rack", broker.rack().get()); - } + brokerJson.put("id", node.id()); + brokerJson.put("host", node.host()); + brokerJson.put("port", node.port()); + brokerJson.put("rack", node.rack()); json.put(brokerJson); } System.out.println("CURRENT BROKERS:"); System.out.println(json.toString()); } + /** + * @return map from topic name to a map from partition ID to broker IDs that partition has been assigned to. + */ + private static Map>> getPartitionAssignment(AdminClient adminClient, List topics) throws ExecutionException, InterruptedException { + Map topicDescriptions = adminClient.describeTopics(topics).all().get(); + Map>> result = new HashMap<>(); + for (TopicDescription topicDescription : topicDescriptions.values()) { + Map> partitionToReplicas = new HashMap<>(); + for (TopicPartitionInfo tp : topicDescription.partitions()) { + List replicas = new ArrayList<>(Lists.transform(tp.replicas(), new Function() { + @Override + public Integer apply(Node node) { + return node.id(); + } + })); + // Sort the replicas for nice, diff-able output + Collections.sort(replicas); + partitionToReplicas.put(tp.partition(), replicas); + } + result.put(topicDescription.name(), partitionToReplicas); + } + return result; + } + private static void printLeastDisruptiveReassignment( - ZkUtils zkUtils, List specifiedTopics, Set specifiedBrokers, + AdminClient adminClient, List specifiedTopics, Set specifiedBrokers, Set excludedBrokers, Map rackAssignment, int desiredReplicationFactor) - throws JSONException { + throws JSONException, ExecutionException, InterruptedException { // We need three inputs for rebalacing: the brokers, the topics, and the current assignment // of topics to brokers. Set brokerSet = specifiedBrokers; if (brokerSet == null || brokerSet.isEmpty()) { - brokerSet = Sets.newHashSet(Lists.transform( - JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()), - new Function() { - @Override - public Integer apply(Broker broker) { - return broker.id(); + brokerSet = Sets.newHashSet(Collections2.transform(adminClient.describeCluster().nodes().get(), + new Function() { + public Integer apply(Node node) { + return node.id(); } })); } @@ -150,18 +170,15 @@ public Integer apply(Broker broker) { Set brokers = Sets.difference(brokerSet, excludedBrokers); rackAssignment.keySet().retainAll(brokers); - // The most common use case is to rebalance all topics, but explicit topic addition is also - // supported. - Seq topics = specifiedTopics != null ? - JavaConversions.collectionAsScalaIterable(specifiedTopics).toSeq() : - zkUtils.getAllTopics(); + // The most common use case is to rebalance all topics, but explicit topic addition is also supported. + List topics = specifiedTopics != null ? specifiedTopics : new ArrayList<>(adminClient.listTopics().names().get()); + // Sort the topics for nice, diff-able output + Collections.sort(topics); // Print the current assignment in case a rollback is needed - printCurrentAssignment(zkUtils, JavaConversions.seqAsJavaList(topics)); + printCurrentAssignment(adminClient, topics); - Map>> initialAssignments = - KafkaTopicAssigner.topicMapToJavaMap(zkUtils.getPartitionAssignmentForTopics( - topics)); + Map>> initialAssignments = getPartitionAssignment(adminClient, topics); // Assign topics one at a time. This is slightly suboptimal from a packing standpoint, but // it's close enough to work in practice. We can also always follow it up with a Kafka @@ -170,7 +187,7 @@ public Integer apply(Broker broker) { json.put("version", KAFKA_FORMAT_VERSION); JSONArray partitionsJson = new JSONArray(); KafkaTopicAssigner assigner = new KafkaTopicAssigner(); - for (String topic : JavaConversions.seqAsJavaList(topics)) { + for (String topic : topics) { Map> partitionAssignment = initialAssignments.get(topic); Map> finalAssignment = assigner.generateAssignment( topic, partitionAssignment, brokers, rackAssignment, desiredReplicationFactor); @@ -178,6 +195,8 @@ public Integer apply(Broker broker) { JSONObject partitionJson = new JSONObject(); partitionJson.put("topic", topic); partitionJson.put("partition", e.getKey()); + // Sort the replicas for nice, diff-able output + Collections.sort(e.getValue()); partitionJson.put("replicas", new JSONArray(e.getValue())); partitionsJson.put(partitionJson); } @@ -187,13 +206,14 @@ public Integer apply(Broker broker) { } private static Set brokerHostnamesToBrokerIds( - ZkUtils zkUtils, Set brokerHostnameSet, boolean checkPresence) { - List brokers = JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()); + AdminClient adminClient, Set brokerHostnameSet, boolean checkPresence)throws ExecutionException, InterruptedException { + + Collection nodes = adminClient.describeCluster().nodes().get(); Set brokerIdSet = Sets.newHashSet(); - for (Broker broker : brokers) { - BrokerEndPoint endpoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT); - if (brokerHostnameSet.contains(endpoint.host())) { - brokerIdSet.add(broker.id()); + for (Node node : nodes) { + if (brokerHostnameSet.contains(node.host())) { + brokerIdSet.add(node.id()); + System.out.println(node.id()); } } Preconditions.checkArgument(!checkPresence || @@ -203,12 +223,11 @@ private static Set brokerHostnamesToBrokerIds( return brokerIdSet; } - private Set getBrokerIds(ZkUtils zkUtils) { + private Set getBrokerIds(AdminClient adminClient) throws ExecutionException, InterruptedException { Set brokerIdSet = Collections.emptySet(); if (StringUtils.isNotEmpty(brokerIds)) { brokerIdSet = ImmutableSet.copyOf(Iterables.transform(SPLITTER .split(brokerIds), new Function() { - @Override public Integer apply(String brokerId) { try { return Integer.parseInt(brokerId); @@ -219,30 +238,28 @@ public Integer apply(String brokerId) { })); } else if (StringUtils.isNotEmpty(brokerHostnames)) { Set brokerHostnameSet = ImmutableSet.copyOf(SPLITTER.split(brokerHostnames)); - brokerIdSet = brokerHostnamesToBrokerIds(zkUtils, brokerHostnameSet, true); + brokerIdSet = brokerHostnamesToBrokerIds(adminClient, brokerHostnameSet, true); } return brokerIdSet; } - private Set getExcludedBrokerIds(ZkUtils zkUtils) { + private Set getExcludedBrokerIds(AdminClient adminClient) throws ExecutionException, InterruptedException { if (StringUtils.isNotEmpty(brokerHostnamesToReplace)) { Set brokerHostnamesToReplaceSet = ImmutableSet.copyOf( SPLITTER.split(brokerHostnamesToReplace)); return ImmutableSet.copyOf( - brokerHostnamesToBrokerIds(zkUtils, brokerHostnamesToReplaceSet, false)); + brokerHostnamesToBrokerIds(adminClient, brokerHostnamesToReplaceSet, false)); } return Collections.emptySet(); } - private Map getRackAssignment(ZkUtils zkUtils) { - List brokers = JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()); + private Map getRackAssignment(AdminClient adminClient) throws ExecutionException, InterruptedException { Map rackAssignment = Maps.newHashMap(); if (!disableRackAwareness) { - for (Broker broker : brokers) { - scala.Option rack = broker.rack(); - if (rack.isDefined()) { - rackAssignment.put(broker.id(), rack.get()); + for (Node node : adminClient.describeCluster().nodes().get()) { + if (node.hasRack()) { + rackAssignment.put(node.id(), node.rack()); } } } @@ -253,12 +270,12 @@ private List getTopics() { return topics != null ? Lists.newLinkedList(SPLITTER.split(topics)) : null; } - private void runTool(String[] args) throws JSONException { + private void runTool(String[] args) throws JSONException, ExecutionException, InterruptedException { // Get and validate all arguments from args4j CmdLineParser parser = new CmdLineParser(this); try { parser.parseArgument(args); - Preconditions.checkNotNull(zkConnectString); + Preconditions.checkNotNull(bootstrapServers); Preconditions.checkNotNull(mode); Preconditions.checkArgument(brokerIds == null || brokerHostnames == null, "--kafka_assigner_integer_broker_ids and " + @@ -269,36 +286,31 @@ private void runTool(String[] args) throws JSONException { return; } List topics = getTopics(); + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServers); - ZkClient zkClient = new ZkClient(zkConnectString, 10000, 10000, - ZKStringSerializer$.MODULE$); - zkClient.waitUntilConnected(); - ZkUtils zkUtils = ZkUtils.apply(zkClient, false); - - try { - Set brokerIdSet = getBrokerIds(zkUtils); - Set excludedBrokerIdSet = getExcludedBrokerIds(zkUtils); - Map rackAssignment = getRackAssignment(zkUtils); + try (AdminClient adminClient = AdminClient.create(props)) { + Set brokerIdSet = getBrokerIds(adminClient); + Set excludedBrokerIdSet = getExcludedBrokerIds(adminClient); + Map rackAssignment = getRackAssignment(adminClient); switch (mode) { case PRINT_CURRENT_ASSIGNMENT: - printCurrentAssignment(zkUtils, topics); + printCurrentAssignment(adminClient, topics); break; case PRINT_CURRENT_BROKERS: - printCurrentBrokers(zkUtils); + printCurrentBrokers(adminClient); break; case PRINT_REASSIGNMENT: - printLeastDisruptiveReassignment(zkUtils, topics, brokerIdSet, + printLeastDisruptiveReassignment(adminClient, topics, brokerIdSet, excludedBrokerIdSet, rackAssignment, desiredReplicationFactor); break; default: throw new UnsupportedOperationException("Invalid mode: " + mode); } - } finally { - zkUtils.close(); } } - public static void main(String[] args) throws JSONException { + public static void main(String[] args) throws JSONException, ExecutionException, InterruptedException { new KafkaAssignmentGenerator().runTool(args); } } diff --git a/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java b/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java index 5b5211e..e13fe76 100644 --- a/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java +++ b/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java @@ -1,16 +1,11 @@ package siftscience.kafka.tools; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import scala.collection.JavaConversions; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * Utilities for assigning topic partitions evenly to brokers. @@ -70,44 +65,4 @@ public Map> generateAssignment( return KafkaAssignmentStrategy.getRackAwareAssignment(topic, currentAssignment, rackAssignment, brokers, partitions, replicationFactor, assignmentContext); } - - /** - * Convert a Scala Kafka partition assignment into a Java one. - * @param topicMap the output from ZkUtils#getPartitionAssignmentForTopics - * @return a Java map representing the same data - */ - static Map>> topicMapToJavaMap( - scala.collection.Map>> topicMap) { - // We can actually use utilities like Maps#transformEntries, but since that doesn't allow - // changing the key type from Object to Integer, this code just goes into each map and makes - // copies all the way down. Copying is also useful for avoiding possible repeated lazy - // evaluations by the rebalancing algorithm. - Map>> resultTopicMap = Maps.newHashMap(); - Map>> convertedTopicMap = - JavaConversions.mapAsJavaMap(topicMap); - for (Map.Entry>> topicMapEntry : convertedTopicMap.entrySet()) { - String topic = topicMapEntry.getKey(); - Map> convertedPartitionMap = - JavaConversions.mapAsJavaMap(topicMapEntry.getValue()); - Map> resultPartitionMap = Maps.newHashMap(); - for (Map.Entry> partitionMapEntry : - convertedPartitionMap.entrySet()) { - Integer partition = (Integer) partitionMapEntry.getKey(); - List replicaList = Lists.newArrayList(Lists.transform( - JavaConversions.seqAsJavaList(partitionMapEntry.getValue()), - new Function() { - @Override - public Integer apply(Object raw) { - return (Integer) raw; - } - })); - resultPartitionMap.put(partition, replicaList); - } - resultTopicMap.put(topic, resultPartitionMap); - } - return resultTopicMap; - } } From 519371e1d66833cd525719a169b2cce24e9f1efe Mon Sep 17 00:00:00 2001 From: Andrey Polyakov Date: Wed, 24 Jul 2019 11:38:03 -0700 Subject: [PATCH 2/2] Don't sort replicas, changes the partition leader --- .../siftscience/kafka/tools/KafkaAssignmentGenerator.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java b/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java index 2530cfb..980534a 100644 --- a/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java +++ b/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java @@ -141,8 +141,6 @@ public Integer apply(Node node) { return node.id(); } })); - // Sort the replicas for nice, diff-able output - Collections.sort(replicas); partitionToReplicas.put(tp.partition(), replicas); } result.put(topicDescription.name(), partitionToReplicas); @@ -195,8 +193,6 @@ public Integer apply(Node node) { JSONObject partitionJson = new JSONObject(); partitionJson.put("topic", topic); partitionJson.put("partition", e.getKey()); - // Sort the replicas for nice, diff-able output - Collections.sort(e.getValue()); partitionJson.put("replicas", new JSONArray(e.getValue())); partitionsJson.put(partitionJson); }