Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added option to look for non .sh executables #51

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions kafka/tools/assigner/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from kafka.tools.assigner.batcher import split_partitions_into_batches
from kafka.tools.assigner.exceptions import ProgrammingException
from kafka.tools.assigner.modules import get_modules
from kafka.tools.assigner.tools import get_tools_path, check_java_home
from kafka.tools.assigner.tools import get_tools_path_with_ext, check_java_home
from kafka.tools.assigner.models.cluster import Cluster
from kafka.tools.assigner.models.reassignment import Reassignment
from kafka.tools.assigner.models.replica_election import ReplicaElection
Expand All @@ -53,15 +53,15 @@ def check_and_get_sizes(action_cls, args, cluster, sizer_map):
log.info("{0} {1}:{2}".format(partition.size, topic, partition.num))


def run_preferred_replica_elections(batches, args, tools_path, plugins, dry_run):
def run_preferred_replica_elections(batches, args, tools_path, extension, plugins, dry_run):
for i, batch in enumerate(batches):
# Sleep between PLEs
if i > 0 and not dry_run:
log.info("Waiting {0} seconds for replica election to complete".format(args.ple_wait))
time.sleep(args.ple_wait)

log.info("Executing preferred replica election {0}/{1}".format(i + 1, len(batches)))
batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run)
batch.execute(i + 1, len(batches), args.zookeeper, tools_path, extension, plugins, dry_run)


def get_all_plugins():
Expand Down Expand Up @@ -103,7 +103,7 @@ def main():
args = set_up_arguments(action_map, sizer_map, plugins)
run_plugins_at_step(plugins, 'set_arguments', args)

tools_path = get_tools_path(args.tools_path)
tools_path, extension = get_tools_path_with_ext(args.tools_path)
check_java_home()

cluster = Cluster.create_from_zookeeper(args.zookeeper)
Expand Down Expand Up @@ -131,15 +131,15 @@ def main():

for i, batch in enumerate(batches):
log.info("Executing partition reassignment {0}/{1}: {2}".format(i + 1, len(batches), repr(batch)))
batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run)
batch.execute(i + 1, len(batches), args.zookeeper, tools_path, extension, plugins, dry_run)

run_plugins_at_step(plugins, 'before_ple')

if not args.skip_ple:
all_cluster_partitions = [p for p in action_to_run.cluster.partitions(args.exclude_topics)]
batches = split_partitions_into_batches(all_cluster_partitions, batch_size=args.ple_size, use_class=ReplicaElection)
log.info("Number of replica elections: {0}".format(len(batches)))
run_preferred_replica_elections(batches, args, tools_path, plugins, dry_run)
run_preferred_replica_elections(batches, args, tools_path, extension, plugins, dry_run)

run_plugins_at_step(plugins, 'finished')

Expand Down
14 changes: 7 additions & 7 deletions kafka/tools/assigner/models/reassignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,28 @@ def dict_for_reassignment(self):
reassignment['partitions'].append(partition.dict_for_reassignment())
return reassignment

def execute(self, num, total, zookeeper, tools_path, plugins=[], dry_run=True):
def execute(self, num, total, zookeeper, tools_path, extension, plugins=[], dry_run=True):
for plugin in plugins:
plugin.before_execute_batch(num)
if not dry_run:
self._execute(num, total, zookeeper, tools_path)
self._execute(num, total, zookeeper, tools_path, extension)
for plugin in plugins:
plugin.after_execute_batch(num)

def _execute(self, num, total, zookeeper, tools_path):
def _execute(self, num, total, zookeeper, tools_path, extension):
with NamedTemporaryFile(mode='w') as assignfile:
json.dump(self.dict_for_reassignment(), assignfile)
assignfile.flush()
FNULL = open(os.devnull, 'w')
proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--execute',
proc = subprocess.Popen(['{0}/kafka-reassign-partitions{1}'.format(tools_path, extension), '--execute',
'--zookeeper', zookeeper,
'--reassignment-json-file', assignfile.name],
stdout=FNULL, stderr=FNULL)
proc.wait()

# Wait until finished
while True:
remaining_partitions = self.check_completion(zookeeper, tools_path, assignfile.name)
remaining_partitions = self.check_completion(zookeeper, tools_path, extension, assignfile.name)
if remaining_partitions == 0:
break

Expand All @@ -85,9 +85,9 @@ def process_verify_match(self, line):
return 1
return 0

def check_completion(self, zookeeper, tools_path, assign_filename):
def check_completion(self, zookeeper, tools_path, extension, assign_filename):
FNULL = open(os.devnull, 'w')
proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--verify',
proc = subprocess.Popen(['{0}/kafka-reassign-partitions{1}'.format(tools_path, extension), '--verify',
'--zookeeper', zookeeper,
'--reassignment-json-file', assign_filename],
stdout=subprocess.PIPE, stderr=FNULL)
Expand Down
16 changes: 13 additions & 3 deletions kafka/tools/assigner/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,17 @@ def find_path_containing(fname):
raise ConfigurationException("Cannot find the Kafka admin utilities using PATH. Try using the --tools-path option")


def get_tools_path(tools_path=None):
def get_tools_path_with_ext(tools_path=None, executable_name='kafka-reassign-partitions'):
extension = ".sh"
try:
tools_path = get_tools_path(tools_path=tools_path, executable_name=executable_name)
return tools_path, ""
except ConfigurationException:
tools_path = get_tools_path(tools_path=tools_path, executable_name=(executable_name+extension))
return tools_path, extension


def get_tools_path(tools_path=None, executable_name='kafka-reassign-partitions.sh'):
"""
Find the Kafka admin utilities, either from the provided arg or the PATH.

Expand All @@ -58,12 +68,12 @@ def get_tools_path(tools_path=None):
:raises: ConfigurationException if the path cannot be determined
"""
if tools_path is not None:
script_file = os.path.join(tools_path, 'kafka-reassign-partitions.sh')
script_file = os.path.join(tools_path, executable_name)
if not is_exec_file(script_file):
raise ConfigurationException("--tools-path does not lead to the Kafka admin utilities ({0} is not an executable)".format(script_file))
return tools_path

return find_path_containing('kafka-reassign-partitions.sh')
return find_path_containing(executable_name)


def check_java_home():
Expand Down