From 2b407aa14e68bf1c11c2dbd22157d00b95bd2556 Mon Sep 17 00:00:00 2001 From: simplesteph Date: Wed, 14 Dec 2016 16:58:26 +1100 Subject: [PATCH] added option to look for non .sh executables --- kafka/tools/assigner/__main__.py | 12 ++++++------ kafka/tools/assigner/models/reassignment.py | 14 +++++++------- kafka/tools/assigner/tools.py | 16 +++++++++++++--- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/kafka/tools/assigner/__main__.py b/kafka/tools/assigner/__main__.py index 1f421b5..84ca350 100644 --- a/kafka/tools/assigner/__main__.py +++ b/kafka/tools/assigner/__main__.py @@ -27,7 +27,7 @@ from kafka.tools.assigner.batcher import split_partitions_into_batches from kafka.tools.assigner.exceptions import ProgrammingException from kafka.tools.assigner.modules import get_modules -from kafka.tools.assigner.tools import get_tools_path, check_java_home +from kafka.tools.assigner.tools import get_tools_path_with_ext, check_java_home from kafka.tools.assigner.models.cluster import Cluster from kafka.tools.assigner.models.reassignment import Reassignment from kafka.tools.assigner.models.replica_election import ReplicaElection @@ -53,7 +53,7 @@ def check_and_get_sizes(action_cls, args, cluster, sizer_map): log.info("{0} {1}:{2}".format(partition.size, topic, partition.num)) -def run_preferred_replica_elections(batches, args, tools_path, plugins, dry_run): +def run_preferred_replica_elections(batches, args, tools_path, extension, plugins, dry_run): for i, batch in enumerate(batches): # Sleep between PLEs if i > 0 and not dry_run: @@ -61,7 +61,7 @@ def run_preferred_replica_elections(batches, args, tools_path, plugins, dry_run) time.sleep(args.ple_wait) log.info("Executing preferred replica election {0}/{1}".format(i + 1, len(batches))) - batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run) + batch.execute(i + 1, len(batches), args.zookeeper, tools_path, extension, plugins, dry_run) def get_all_plugins(): @@ -103,7 +103,7 @@ def main(): args = set_up_arguments(action_map, sizer_map, plugins) run_plugins_at_step(plugins, 'set_arguments', args) - tools_path = get_tools_path(args.tools_path) + tools_path, extension = get_tools_path_with_ext(args.tools_path) check_java_home() cluster = Cluster.create_from_zookeeper(args.zookeeper) @@ -131,7 +131,7 @@ def main(): for i, batch in enumerate(batches): log.info("Executing partition reassignment {0}/{1}: {2}".format(i + 1, len(batches), repr(batch))) - batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run) + batch.execute(i + 1, len(batches), args.zookeeper, tools_path, extension, plugins, dry_run) run_plugins_at_step(plugins, 'before_ple') @@ -139,7 +139,7 @@ def main(): all_cluster_partitions = [p for p in action_to_run.cluster.partitions(args.exclude_topics)] batches = split_partitions_into_batches(all_cluster_partitions, batch_size=args.ple_size, use_class=ReplicaElection) log.info("Number of replica elections: {0}".format(len(batches))) - run_preferred_replica_elections(batches, args, tools_path, plugins, dry_run) + run_preferred_replica_elections(batches, args, tools_path, extension, plugins, dry_run) run_plugins_at_step(plugins, 'finished') diff --git a/kafka/tools/assigner/models/reassignment.py b/kafka/tools/assigner/models/reassignment.py index db91934..3bb87eb 100644 --- a/kafka/tools/assigner/models/reassignment.py +++ b/kafka/tools/assigner/models/reassignment.py @@ -44,20 +44,20 @@ def dict_for_reassignment(self): reassignment['partitions'].append(partition.dict_for_reassignment()) return reassignment - def execute(self, num, total, zookeeper, tools_path, plugins=[], dry_run=True): + def execute(self, num, total, zookeeper, tools_path, extension, plugins=[], dry_run=True): for plugin in plugins: plugin.before_execute_batch(num) if not dry_run: - self._execute(num, total, zookeeper, tools_path) + self._execute(num, total, zookeeper, tools_path, extension) for plugin in plugins: plugin.after_execute_batch(num) - def _execute(self, num, total, zookeeper, tools_path): + def _execute(self, num, total, zookeeper, tools_path, extension): with NamedTemporaryFile(mode='w') as assignfile: json.dump(self.dict_for_reassignment(), assignfile) assignfile.flush() FNULL = open(os.devnull, 'w') - proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--execute', + proc = subprocess.Popen(['{0}/kafka-reassign-partitions{1}'.format(tools_path, extension), '--execute', '--zookeeper', zookeeper, '--reassignment-json-file', assignfile.name], stdout=FNULL, stderr=FNULL) @@ -65,7 +65,7 @@ def _execute(self, num, total, zookeeper, tools_path): # Wait until finished while True: - remaining_partitions = self.check_completion(zookeeper, tools_path, assignfile.name) + remaining_partitions = self.check_completion(zookeeper, tools_path, extension, assignfile.name) if remaining_partitions == 0: break @@ -85,9 +85,9 @@ def process_verify_match(self, line): return 1 return 0 - def check_completion(self, zookeeper, tools_path, assign_filename): + def check_completion(self, zookeeper, tools_path, extension, assign_filename): FNULL = open(os.devnull, 'w') - proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--verify', + proc = subprocess.Popen(['{0}/kafka-reassign-partitions{1}'.format(tools_path, extension), '--verify', '--zookeeper', zookeeper, '--reassignment-json-file', assign_filename], stdout=subprocess.PIPE, stderr=FNULL) diff --git a/kafka/tools/assigner/tools.py b/kafka/tools/assigner/tools.py index 43059af..f2cbb16 100644 --- a/kafka/tools/assigner/tools.py +++ b/kafka/tools/assigner/tools.py @@ -49,7 +49,17 @@ def find_path_containing(fname): raise ConfigurationException("Cannot find the Kafka admin utilities using PATH. Try using the --tools-path option") -def get_tools_path(tools_path=None): +def get_tools_path_with_ext(tools_path=None, executable_name='kafka-reassign-partitions'): + extension = ".sh" + try: + tools_path = get_tools_path(tools_path=tools_path, executable_name=executable_name) + return tools_path, "" + except ConfigurationException: + tools_path = get_tools_path(tools_path=tools_path, executable_name=(executable_name+extension)) + return tools_path, extension + + +def get_tools_path(tools_path=None, executable_name='kafka-reassign-partitions.sh'): """ Find the Kafka admin utilities, either from the provided arg or the PATH. @@ -58,12 +68,12 @@ def get_tools_path(tools_path=None): :raises: ConfigurationException if the path cannot be determined """ if tools_path is not None: - script_file = os.path.join(tools_path, 'kafka-reassign-partitions.sh') + script_file = os.path.join(tools_path, executable_name) if not is_exec_file(script_file): raise ConfigurationException("--tools-path does not lead to the Kafka admin utilities ({0} is not an executable)".format(script_file)) return tools_path - return find_path_containing('kafka-reassign-partitions.sh') + return find_path_containing(executable_name) def check_java_home():