From 1e1dfa7d73ba3acb649d3b5adc0769b302f584c9 Mon Sep 17 00:00:00 2001
From: MichaelLampe
Date: Tue, 4 Aug 2015 15:14:31 -0500
Subject: [PATCH] Completed Condor-enabled GLSeq Wrapper

This Python program now interfaces with pydagman to construct, group, and
run GLSeq2 pipelines. Note that the GLSeq2 update that enables this has not
been pushed yet, as it is still under testing, but preliminary tests
indicate that running this wrapper with the normal arguments (replacing
Rscript with python PyGLSeqWrapper.py) results in a Condor-enabled run.
---
 PythonWrapper/CentralInterface.py  |  25 -------
 PythonWrapper/Command.py           |  45 ++++++++++++-
 PythonWrapper/CommandFile.py       |  33 +++++++--
 PythonWrapper/CommandStack.py      |  91 +++++++++++++++++++++++--
 PythonWrapper/CommandsProcessor.py | 104 +++++++++++------------------
 PythonWrapper/CondorGrapher.py     |  29 ++++++++
 PythonWrapper/PyGLSeqWrapper.py    |  49 ++++++++++++++
 PythonWrapper/Wrapper.py           |   5 +-
 8 files changed, 275 insertions(+), 106 deletions(-)
 delete mode 100644 PythonWrapper/CentralInterface.py
 create mode 100644 PythonWrapper/CondorGrapher.py
 create mode 100644 PythonWrapper/PyGLSeqWrapper.py
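
A usage sketch, assuming the new PyGLSeqWrapper.py reads the same eight
positional arguments that the removed CentralInterface.py pulled from
sys.argv; the run name, protocol ID, and attribute file path below are
placeholders:

    # update_database prepare align count collect run_name protocol_id attribute_file_path
    python PyGLSeqWrapper.py TRUE TRUE TRUE TRUE TRUE example_run 1 /path/to/attribute_file.R
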
+ " This command has parent node of " + str(self.parent_node) + ".") + + # Creates the bash files that will be addedd to the job flow + def create_bash_files(self,run_name): + for command in self.commands: + new_command = CommandFile(run_name,command,self.parent_node) + bash_file = self.command_file = new_command.generate_bash_file() + self.associated_bash_files.append(bash_file) + + # Sets up dag jobs. + # Will need to add a controller in the future to optimize mem and cpu usage, as well as GPU. + # This will really help when we are mass scheduling batch jobs. + def create_dag_jobs(self,run_name): + for subcommand in range(0,len(self.commands)): + current_job = Job((run_name + "/" + run_name +".submit"),"JOB" + str(Command.job_count)) + current_job.add_var("mem","4G") + current_job.add_var("cpus","2") + current_job.add_var("execute","./" + run_name + "/" + self.associated_bash_files[subcommand]) + current_job.retry(2) + current_job.pre_skip("1") + # Still need to add parent interactions, which is done in the comm stack + Command.job_count = Command.job_count + 1 + self.dag_jobs.append(current_job) + for job in range(0,len(self.dag_jobs)): + if (job != 0): + self.dag_jobs[job].add_parent(self.dag_jobs[job-1]) diff --git a/PythonWrapper/CommandFile.py b/PythonWrapper/CommandFile.py index 997fa4e..1406a19 100644 --- a/PythonWrapper/CommandFile.py +++ b/PythonWrapper/CommandFile.py @@ -1,29 +1,48 @@ __author__ = 'mlampe' +from subprocess import Popen class CommandFile: file_count = 0 - def __init__(self,run_name,command_bundle): + def __init__(self,run_name,command_bundle,parent): # Keep track of how many command files we have CommandFile.file_count = CommandFile.file_count + 1 # Just make the files based on the number of their command and the run name because it is easy. - self.file_name = run_name + "_" + str(CommandFile.file_count) + self.run_name = run_name + self.file_name = run_name + "_" + str(CommandFile.file_count) + ".sh" # Should be a list of commands that the class command has prepared. # They will be strings that are just a single command - self.command_bundle = command_bundle + self.command_bundle = command_bundle.split(" && ") + # + self.parent = parent def generate_bash_file(self): - with open(self.file_name,"w") as command: + file = self.run_name + "/" + self.file_name + with open(file,"w") as command: # Bash header command.write("#!/bin/bash\n\n") # Index through the commands writing them as we go w/ error checking for x in range(0,len(self.command_bundle)): - command.write(self.command_bundle[x]) + # lstrip removes the leading white space which could be annoying + self.command_bundle[x] = self.command_bundle[x].replace("\"","") + command.write(self.command_bundle[x].lstrip()) command.write(self.error_checking(x,self.command_bundle[x])) + # Makes the newly created file an executable + Popen(self.create_executable(file)) + return self.file_name + def create_executable(self,sh_file): + command_list = list() + command_list.append("chmod") + # Allows for it to be executable on the file system + command_list.append("+x") + command_list.append(sh_file) + return command_list + + # Adds an error checking script to each command in the shell script def error_checking(self,command_number,command): error_check = "\n" - error_check = error_check + "if [[ $? -ne 0]]; then\n" - error_check = error_check + " echo \"Step " + str(command_number) + " failed. The command was \"" + command + "\" Exiting now.\" \n" + error_check = error_check + "if [[ $? 
+        error_check = error_check + " echo \"Step " + str(command_number) + " failed. The command was " + command + " Exiting now.\" \n"
         error_check = error_check + " exit 1\n"
         error_check = error_check + "fi\n"
         error_check = error_check + "\n"
diff --git a/PythonWrapper/CommandStack.py b/PythonWrapper/CommandStack.py
index 90fc42d..24eea20 100644
--- a/PythonWrapper/CommandStack.py
+++ b/PythonWrapper/CommandStack.py
@@ -1,6 +1,89 @@
 __author__ = 'mlampe'
 
-class CommandStack:
-    # This will take one parallel command stream
-    def __init__(self):
-        x = 6
\ No newline at end of file
+from Command import Command
+# Pydagman can be found at https://github.com/brandentimm/pydagman
+from pydagman.dagfile import Dagfile
+from subprocess import Popen
+
+class Stack:
+    # This will take all the commands together and organize them into a command stack composed of command objects.
+    def __init__(self,command_list):
+        self.command_list = command_list
+        self.command_stack = list()
+
+        self.group_keywords = [
+            # The space is important for keeping them as just the linux commands
+            'mkdir ',
+            'cd ',
+            'mv ',
+            'cp ',
+            'date ',
+            'samtools ',
+            'rm ',
+            'echo '
+        ]
+
+    def create_stack(self,run_name):
+        # Connects steps that have a key word
+        # The last step is always either the end of this parallel command
+        # Or a larger step that is not a keyword in hopes of creating "larger" chunked steps.
+        for steps in self.command_list:
+            for step in steps:
+                for x in range(len(step) - 1,0,-1):
+                    for word in self.group_keywords:
+                        if (word in step[x - 1]):
+                            if (x != 0):
+                                step[x - 1] = step[x-1] + " && " + step[x]
+                                step.pop(x)
+                            # Exit after you find one because that's all good
+                            break
+        for parent in range(0, len(self.command_list)):
+            for step in self.command_list[parent]:
+                command = Command(step,parent)
+                self.command_stack.append(command)
+
+        for command in self.command_stack:
+            command.create_bash_files(run_name)
+            self.create_submit_file(run_name)
+            command.create_dag_jobs(run_name)
+
+    def create_dag_workflow(self,run_name):
+        mydag = Dagfile()
+
+        # Takes care of parallelizing parent child relationships on the graph
+        for x in range(0,len(Command.parallel_track)):
+            if (x != 0):
+                # Add all the previous jobs that are parents
+                for y in range(0,len(Command.parallel_track[x-1])):
+                    # Add all the children
+                    for z in range(0,len(Command.parallel_track[x])):
+                        child_job = Command.parallel_track[x][z].dag_jobs[0]
+                        parent_job = Command.parallel_track[x-1][y].dag_jobs[len(Command.parallel_track[x-1][y].dag_jobs)-1]
+                        child_job.add_parent(parent_job)
+        # Put everything together in the Dagfile workflow
+        for command in self.command_stack:
+            for job in command.dag_jobs:
+                mydag.add_job(job)
+        self.dag_file = run_name + "/my_workflow.dag"
+        mydag.save(self.dag_file)
+
+    def create_submit_file(self,run_name):
+        submit_file_name = str(run_name + "/" + run_name + ".submit")
+        with open(str(submit_file_name),'w') as submit_file:
+            submit_file.write("universe = vanilla\n")
+            submit_file.write("executable = $(execute)\n")
+            #submit_file.write("arguments = $(args)\n")
+            submit_file.write("log = job.log\n")
+            submit_file.write("out = job.out\n")
+            submit_file.write("err = job.err\n")
+            submit_file.write("request_memory = $(mem)\n")
+            submit_file.write("request_cpus = $(cpus)\n")
+            submit_file.write("queue\n")
+        self.submit_file = submit_file_name
+
+    def submit(self):
+        submit_command = list()
+        submit_command.append("condor_submit_dag")
+        submit_command.append(self.dag_file)
+        Popen(submit_command)
+
diff --git a/PythonWrapper/CommandsProcessor.py b/PythonWrapper/CommandsProcessor.py
index 5b1df3f..17088ee 100644
--- a/PythonWrapper/CommandsProcessor.py
+++ b/PythonWrapper/CommandsProcessor.py
@@ -1,82 +1,54 @@
 __author__ = 'mlampe'
 
 import re
-command = "command 1 && command2 & parallel1 && parallel2 && parallel3 &"
 
 class CommandsProcessor:
     def __init__(self,commands):
         # The list of all commands broken into the order they were sent
         self.commands = commands
-        # These are keywords that can be matched so we can better groups jobs
-        self.group_keywords = {
-            'mkdir',
-            'cd',
-            'mv',
-            'cp',
-            'date',
-            'samtools'
-            'rm'
-        }
-        self.parallelize_keywords = {
-            'HTSeq.scripts.count'
-            'GLSeq.FeatureCounts.R'
-            'RSEM'
-            'cufflinks'
-        }
-
     # Just kinda sounds cool when you call it
     # This takes all the commands and divides it into the order they were run and tries
     # to paralleize commands run together and group smaller commands
     def handle(self):
         # All the commands in the order they should appear
-        ordered_commands = list()
+        command_list = list()
+        # Splits the commands by parallel processes (&)
         for command in self.commands:
-            # Parallel commands are divided into another list, so that order is retained, but
-            # the ability to be run in parallel is easily identified.
-            parallel_list = self.parallelize_commands(command)
-            ordered_commands.append(parallel_list)
-        grouped_ordered_commands = list()
-        for command in ordered_commands:
-            grouped_ordered_commands.append(self.group_commands(command))
-
-
-    def parallelize_commands(self,command):
-        command = command.split("&&")
-        parallel_commands = list()
-        single_command = list()
-        for comm in command:
-            if ("&" in comm):
-                # Split on parallelizer
-                parts = comm.split("&")
-                # Add the last command of the first to that command
-                if ("wait" not in parts[0]):
-                    single_command.append(parts[0])
-                # This command is finished, add it to the whole deal & remove empty strings
-                single_command = filter(bool,single_command)
-                parallel_commands.append(single_command)
-                # Clear it
-                single_command = list()
-                # Add the first part to the new list
-                if ("wait" not in parts[1]):
-                    single_command.append(parts[1])
-            else:
-                if ("wait" not in comm):
-                    single_command.append(comm)
-        single_command = filter(bool,single_command)
-        parallel_commands.append(single_command)
-        # Remove empty strings
-        parallel_commands = filter(bool,parallel_commands)
-        return parallel_commands
+            command = self.split_background(command)
+            command = filter(bool,command)
+            command_list.append(command)
+        # Reassign and clear
+        self.commands = command_list
+        command_list = list()
+        for command in self.commands:
+            command = self.split_linked(command)
+            command = filter(bool,command)
+            command_list.append(command)
+        return command_list
 
-    def group_commands(self,command):
-        print command
+    # This breaks parts that have only one "&" into an array that will then be able to be run in parallel
+    def split_background(self,command):
+        re1='(?
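
For orientation, a minimal sketch of how the pieces above are expected to
chain together. PyGLSeqWrapper.py and the updated Wrapper.py are not shown
in this excerpt, so the GlSeqRun call and the argument handling below are
assumptions carried over from the removed CentralInterface.py:

    # Hypothetical driver, mirroring the removed CentralInterface.py
    import sys
    from Wrapper import GlSeqRun
    from CommandsProcessor import CommandsProcessor
    from CommandStack import Stack

    (update_database, prepare, align, count, collect,
     run_name, protocol_id, attribute_file_path) = sys.argv[1:9]

    # GlSeqRun.run() returns the cleaned-up shell commands GLSeq2 would run
    commands = GlSeqRun(update_database, prepare, align, count, collect,
                        run_name, protocol_id, attribute_file_path, "TRUE").run()

    # Split each command line on "&" / "&&" into ordered, parallelizable groups
    grouped = CommandsProcessor(commands).handle()

    # Write per-step bash scripts, the shared .submit file, and the DAG,
    # then hand the workflow to condor_submit_dag
    stack = Stack(grouped)
    stack.create_stack(run_name)
    stack.create_dag_workflow(run_name)
    stack.submit()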