diff --git a/PythonWrapper/CentralInterface.py b/PythonWrapper/CentralInterface.py
deleted file mode 100644
index 866e0ec..0000000
--- a/PythonWrapper/CentralInterface.py
+++ /dev/null
@@ -1,25 +0,0 @@
-__author__ = 'mlampe'
-
-import sys
-from Wrapper import GlSeqRun
-from CommandsProcessor import CommandsProcessor
-
-# Command line args incoming~
-update_database = sys.argv[1]
-prepare = sys.argv[2]
-align = sys.argv[3]
-count = sys.argv[4]
-collect = sys.argv[5]
-run_name = sys.argv[6]
-protocol_id = sys.argv[7]
-attribute_file_path = sys.argv[8]
-
-current_run = GlSeqRun(update_database, prepare, align, count, collect, run_name, protocol_id, attribute_file_path,"TRUE")
-
-# Initiates a run through the wrapper, and takes in all the output which is the various commands that would be run
-commands = current_run.run()
-# We get back a list of commands that are only cleaned up (Aka they could run if we just threw them at the shell)
-# Let's parallize and break apart the commands in the CommandsProcessor
-processor = CommandsProcessor(commands)
-processor.handle()
-
diff --git a/PythonWrapper/Command.py b/PythonWrapper/Command.py
index 81ccdac..dd53ba5 100644
--- a/PythonWrapper/Command.py
+++ b/PythonWrapper/Command.py
@@ -1,5 +1,46 @@
 __author__ = 'mlampe'
 
+from CommandFile import CommandFile
+# Pydagman can be found at https://github.com/brandentimm/pydagman
+from pydagman.job import Job
+
 class Command():
-    def __init__(self):
-        self.parent = parent
+    parallel_track = list()
+    job_count = 1
+    def __init__(self,commands,parent_node):
+        self.parent_node = parent_node
+        Command.parallel_track.append(list())
+        Command.parallel_track[self.parent_node].append(self)
+        # The commands this node runs sequentially; sibling nodes that share a
+        # parent index can run in parallel
+        self.commands = commands
+        self.associated_bash_files = list()
+        self.dag_jobs = list()
+
+    # String representation used when debugging individual command nodes
+    def toString(self):
+        return("The command is " + str(self.commands) + "." + " This command has a parent node of " + str(self.parent_node) + ".")
+
+    # Creates the bash files that will be added to the job flow
+    def create_bash_files(self,run_name):
+        for command in self.commands:
+            new_command = CommandFile(run_name,command,self.parent_node)
+            bash_file = new_command.generate_bash_file()
+            self.associated_bash_files.append(bash_file)
+
+    # Sets up the DAG jobs.
+    # Will need to add a controller in the future to optimize memory, CPU, and GPU usage.
+    # That will really help when we are mass-scheduling batch jobs.
+    def create_dag_jobs(self,run_name):
+        for subcommand in range(0,len(self.commands)):
+            current_job = Job((run_name + "/" + run_name + ".submit"),"JOB" + str(Command.job_count))
+            current_job.add_var("mem","4G")
+            current_job.add_var("cpus","2")
+            current_job.add_var("execute","./" + run_name + "/" + self.associated_bash_files[subcommand])
+            current_job.retry(2)
+            current_job.pre_skip("1")
+            # Parent interactions between nodes are added later, in the command stack
+            Command.job_count = Command.job_count + 1
+            self.dag_jobs.append(current_job)
+        # Jobs within a node must run in order, so chain each job to the one before it
+        for job in range(0,len(self.dag_jobs)):
+            if (job != 0):
+                self.dag_jobs[job].add_parent(self.dag_jobs[job-1])
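The chaining at the end of create_dag_jobs is the only intra-node ordering mechanism. A minimal sketch of the same pattern in isolation, assuming the pydagman API used above (the submit-file path and job names here are invented for illustration):

    from pydagman.job import Job

    # Two sequential steps inside one node: JOB2 may only start once JOB1 succeeds.
    step1 = Job("demo/demo.submit", "JOB1")
    step2 = Job("demo/demo.submit", "JOB2")
    step2.add_parent(step1)  # the same call create_dag_jobs makes for job i and job i-1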
diff --git a/PythonWrapper/CommandFile.py b/PythonWrapper/CommandFile.py
index 997fa4e..1406a19 100644
--- a/PythonWrapper/CommandFile.py
+++ b/PythonWrapper/CommandFile.py
@@ -1,29 +1,48 @@
 __author__ = 'mlampe'
 
+from subprocess import Popen
+
 class CommandFile:
     file_count = 0
-    def __init__(self,run_name,command_bundle):
+    def __init__(self,run_name,command_bundle,parent):
         # Keep track of how many command files we have
         CommandFile.file_count = CommandFile.file_count + 1
         # Name the files after the run and the command number because it is easy.
-        self.file_name = run_name + "_" + str(CommandFile.file_count)
+        self.run_name = run_name
+        self.file_name = run_name + "_" + str(CommandFile.file_count) + ".sh"
         # Should be a list of commands that the Command class has prepared.
         # They will be strings that are just a single command
-        self.command_bundle = command_bundle
+        self.command_bundle = command_bundle.split(" && ")
+        # Index of the parent node this command file belongs to
+        self.parent = parent
 
     def generate_bash_file(self):
-        with open(self.file_name,"w") as command:
+        file_path = self.run_name + "/" + self.file_name
+        with open(file_path,"w") as command:
             # Bash header
             command.write("#!/bin/bash\n\n")
             # Index through the commands, writing them as we go with error checking
             for x in range(0,len(self.command_bundle)):
-                command.write(self.command_bundle[x])
+                # Drop embedded quotes and the leading whitespace left over from splitting
+                self.command_bundle[x] = self.command_bundle[x].replace("\"","")
+                command.write(self.command_bundle[x].lstrip())
                 command.write(self.error_checking(x,self.command_bundle[x]))
+        # Makes the newly created file executable
+        Popen(self.create_executable(file_path))
+        return self.file_name
 
+    # Builds the chmod argv that marks the script executable on the file system
+    def create_executable(self,sh_file):
+        command_list = list()
+        command_list.append("chmod")
+        command_list.append("+x")
+        command_list.append(sh_file)
+        return command_list
+
+    # Appends an error check to each command in the shell script
     def error_checking(self,command_number,command):
         error_check = "\n"
-        error_check = error_check + "if [[ $? -ne 0]]; then\n"
-        error_check = error_check + " echo \"Step " + str(command_number) + " failed. The command was \"" + command + "\" Exiting now.\" \n"
+        error_check = error_check + "if [[ $? -ne 0 ]]; then\n"
+        error_check = error_check + " echo \"Step " + str(command_number) + " failed. The command was " + command + " Exiting now.\" \n"
         error_check = error_check + " exit 1\n"
         error_check = error_check + "fi\n"
         error_check = error_check + "\n"
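To make the generated artifact concrete: assuming a hypothetical bundle "cd run1 && rm temp.txt" for a run named run1, generate_bash_file would write run1/run1_1.sh roughly as follows, one error check per command:

    #!/bin/bash

    cd run1
    if [[ $? -ne 0 ]]; then
     echo "Step 0 failed. The command was cd run1 Exiting now."
     exit 1
    fi

    rm temp.txt
    if [[ $? -ne 0 ]]; then
     echo "Step 1 failed. The command was rm temp.txt Exiting now."
     exit 1
    fi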
The command was " + command + " Exiting now.\" \n" error_check = error_check + " exit 1\n" error_check = error_check + "fi\n" error_check = error_check + "\n" diff --git a/PythonWrapper/CommandStack.py b/PythonWrapper/CommandStack.py index 90fc42d..24eea20 100644 --- a/PythonWrapper/CommandStack.py +++ b/PythonWrapper/CommandStack.py @@ -1,6 +1,89 @@ __author__ = 'mlampe' -class CommandStack: - # This will take one parallel command stream - def __init__(self): - x = 6 \ No newline at end of file +from Command import Command +# Pydagman can be found at https://github.com/brandentimm/pydagman +from pydagman.dagfile import Dagfile +from subprocess import Popen + +class Stack: + # This will take all the commands together and organize them into a command stack composed of command objects. + def __init__(self,command_list): + self.command_list = command_list + self.command_stack = list() + + self.group_keywords = [ + # The space is important for keeping them as just the linux commands + 'mkdir ', + 'cd ', + 'mv ', + 'cp ', + 'date ', + 'samtools ', + 'rm ', + 'echo ' + ] + + def create_stack(self,run_name): + # Connects steps that have a key word + # The last step is always either the end of this parallel command + # Or a larger step that is not a keyword in hopes of creating "larger" chunked steps. + for steps in self.command_list: + for step in steps: + for x in range (len(step) - 1 ,0,-1): + for word in self.group_keywords: + if (word in step[x - 1]): + if (x != 0): + step[x - 1] = step[x-1] + " && " + step[x] + step.pop(x) + # Exit after you find one because that's all good + break + for parent in range (0, len(self.command_list)): + for step in self.command_list[parent]: + command = Command(step,parent) + self.command_stack.append(command) + + for command in self.command_stack: + command.create_bash_files(run_name) + self.create_submit_file(run_name) + command.create_dag_jobs(run_name) + + def create_dag_workflow(self,run_name): + mydag = Dagfile() + + # Takes care of parallelizing parent child relationships on the graph + for x in range(0,len(Command.parallel_track)): + if (x != 0): + # Add all the previous jobs that are parents + for y in range(0,len(Command.parallel_track[x-1])): + # Add all the children + for z in range(0,len(Command.parallel_track[x])): + child_job = Command.parallel_track[x][z].dag_jobs[0] + parent_job = Command.parallel_track[x-1][y].dag_jobs[len(Command.parallel_track[x-1][y].dag_jobs)-1] + child_job.add_parent(parent_job) + # Put everything together in the Dagfile workflow + for command in self.command_stack: + for job in command.dag_jobs: + mydag.add_job(job) + self.dag_file = run_name + "/my_workflow.dag" + mydag.save(self.dag_file) + + def create_submit_file(self,run_name): + submit_file_name = str(run_name + "/" + run_name + ".submit") + with open(str(submit_file_name),'w') as submit_file: + submit_file.write("universe = vanilla\n") + submit_file.write("executable = $(execute)\n") + #submit_file.write("arguments = $(args)\n") + submit_file.write("log = job.log\n") + submit_file.write("out = job.out\n") + submit_file.write("err = job.err\n") + submit_file.write("request_memory = $(mem)\n") + submit_file.write("request_cpus = $(cpus)\n") + submit_file.write("queue\n") + self.submit_file = submit_file_name + + def submit(self): + submit_command = list() + submit_command.append("condor_submit_dag") + submit_command.append(self.dag_file) + Popen(submit_command) + diff --git a/PythonWrapper/CommandsProcessor.py b/PythonWrapper/CommandsProcessor.py index 
diff --git a/PythonWrapper/CommandsProcessor.py b/PythonWrapper/CommandsProcessor.py
index 5b1df3f..17088ee 100644
--- a/PythonWrapper/CommandsProcessor.py
+++ b/PythonWrapper/CommandsProcessor.py
@@ -1,82 +1,54 @@
 __author__ = 'mlampe'
 
 import re
-command = "command 1 && command2 & parallel1 && parallel2 && parallel3 &"
 
 class CommandsProcessor:
     def __init__(self,commands):
         # The list of all commands broken into the order they were sent
         self.commands = commands
-        # These are keywords that can be matched so we can better groups jobs
-        self.group_keywords = {
-            'mkdir',
-            'cd',
-            'mv',
-            'cp',
-            'date',
-            'samtools'
-            'rm'
-        }
-        self.parallelize_keywords = {
-            'HTSeq.scripts.count'
-            'GLSeq.FeatureCounts.R'
-            'RSEM'
-            'cufflinks'
-        }
-    # Just kinda sounds cool when you call it
     # This takes all the commands, divides them into the order they were run, and tries
     # to parallelize commands run together while grouping smaller commands
     def handle(self):
         # All the commands in the order they should appear
-        ordered_commands = list()
+        command_list = list()
+        # Splits the commands by parallel processes (&)
         for command in self.commands:
-            # Parallel commands are divided into another list, so that order is retained, but
-            # the ability to be run in parallel is easily identified.
-            parallel_list = self.parallelize_commands(command)
-            ordered_commands.append(parallel_list)
-        grouped_ordered_commands = list()
-        for command in ordered_commands:
-            grouped_ordered_commands.append(self.group_commands(command))
-
-
-    def parallelize_commands(self,command):
-        command = command.split("&&")
-        parallel_commands = list()
-        single_command = list()
-        for comm in command:
-            if ("&" in comm):
-                # Split on parallelizer
-                parts = comm.split("&")
-                # Add the last command of the first to that command
-                if ("wait" not in parts[0]):
-                    single_command.append(parts[0])
-                # This command is finished, add it to the whole deal & remove empty strings
-                single_command = filter(bool,single_command)
-                parallel_commands.append(single_command)
-                # Clear it
-                single_command = list()
-                # Add the first part to the new list
-                if ("wait" not in parts[1]):
-                    single_command.append(parts[1])
-            else:
-                if ("wait" not in comm):
-                    single_command.append(comm)
-        single_command = filter(bool,single_command)
-        parallel_commands.append(single_command)
-        # Remove empty strings
-        parallel_commands = filter(bool,parallel_commands)
-        return parallel_commands
+            command = self.split_background(command)
+            command = filter(bool,command)
+            command_list.append(command)
+        # Reassign so the second pass works on the background-split commands, then reset the accumulator
+        self.commands = command_list
+        command_list = list()
+        for command in self.commands:
+            command = self.split_linked(command)
+            command = filter(bool,command)
+            command_list.append(command)
+        return command_list
 
-    def group_commands(self,command):
-        print command
+    # Breaks a command on every single "&" so each backgrounded part can run in parallel
+    def split_background(self,command):
+        re1='(?<!&)&(?!&)' # Matches a lone & that is neither preceded nor followed by another &
+        rg = re.compile(re1,re.IGNORECASE)
+        command_split = rg.split(command)
+        return command_split
 
-# Command 1 needs to be run first
-command1 = "command 1 && command2 & parallel1 && parallel2 && parallel3 &"
-# Command 2 is run after
-command2 = "comm2 && comm3 & wait"
-together = list()
-together.append(command1)
-together.append(command2)
-proc = CommandsProcessor(together)
-proc.handle()
\ No newline at end of file
+    # Breaks linked commands (&& or ;) into sequential parts within the same array
+    def split_linked(self,command):
+        """
+        Checking the type here somewhat violates duck typing, but it is an easy (and very
+        readable) way to tell a single command from a list. Without it, iterating a bare
+        string would yield individual characters, and running the regex on a list would
+        raise a TypeError.
+        """
+        if type(command) is list:
+            split_list = list()
+            for c in command:
+                re1 = '&&|;'
+                rg = re.compile(re1,re.IGNORECASE)
+                split_list.append(rg.split(c))
+            return split_list
+        else:
+            re1 = '&&|;'
+            rg = re.compile(re1,re.IGNORECASE)
+            command_split = rg.split(command)
+            return command_split
\ No newline at end of file
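A quick sketch of the two splits in sequence (the command strings are invented; comments show the intermediate values):

    processor = CommandsProcessor(["prep && align & count1 && count2 &"])
    ordered = processor.handle()
    # split_background: ["prep && align ", " count1 && count2 "]
    # split_linked:     [["prep ", " align "], [" count1 ", " count2 "]]
    # handle() returns one such entry per input command, i.e. parallel groups
    # of sequential steps:
    # [[["prep ", " align "], [" count1 ", " count2 "]]]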
diff --git a/PythonWrapper/CondorGrapher.py b/PythonWrapper/CondorGrapher.py
new file mode 100644
index 0000000..872f880
--- /dev/null
+++ b/PythonWrapper/CondorGrapher.py
@@ -0,0 +1,29 @@
+__author__ = 'mlampe'
+
+# Pygraph library
+# Modified from https://github.com/pmatiello/python-graph
+from pygraph.classes.digraph import digraph as graph
+
+
+class GraphCondor:
+    def __init__(self,dag_file_name):
+        self.dag_file_name = dag_file_name
+        self.condor_graph = graph()
+
+    def read_connections(self):
+        with open(self.dag_file_name,'r') as dag_file:
+            for line in dag_file:
+                if "PARENT" in line:
+                    # A dependency line has the form "PARENT <job> CHILD <job>",
+                    # so the job names sit at indices 1 and 3 after splitting
+                    split = line.split(" ")
+                    parent = split[1]
+                    child = split[3]
+                    self.create_connection(parent,child)
+
+    def create_connection(self,parent,child):
+        # Adds the parent and child nodes if they don't exist already
+        [self.condor_graph.add_node(n) for n in (parent,child) if not self.condor_graph.has_node(n)]
+        # Adds the edge from parent to child
+        self.condor_graph.add_edge(parent,child)
+
+    # Still need to add visualization
\ No newline at end of file
diff --git a/PythonWrapper/PyGLSeqWrapper.py b/PythonWrapper/PyGLSeqWrapper.py
new file mode 100644
index 0000000..9e64331
--- /dev/null
+++ b/PythonWrapper/PyGLSeqWrapper.py
@@ -0,0 +1,49 @@
+__author__ = 'mlampe'
+
+import sys
+import os
+from Wrapper import GlSeqRun
+from CommandsProcessor import CommandsProcessor
+from CommandStack import Stack
+
+# Command line args incoming~
+glseq_path = sys.argv[1]
+update_database = sys.argv[2]
+prepare = sys.argv[3]
+align = sys.argv[4]
+count = sys.argv[5]
+collect = sys.argv[6]
+run_name = sys.argv[7]
+protocol_id = sys.argv[8]
+attribute_file_path = sys.argv[9]
+
+current_run = GlSeqRun(glseq_path,update_database, prepare, align, count, collect, run_name, protocol_id, attribute_file_path,"TRUE")
+
+# Initiates a run through the wrapper and takes in all the output, which is the list of commands that would be run
+commands = current_run.run()
+# We get back a list of commands that are already cleaned up (i.e. they could run if we just threw them at the shell).
+# Let's parallelize and break apart the commands in the CommandsProcessor.
+
+# # Test Stuff
+# # Command 1 needs to be run first
+# command0 = "first"
+# command1 = "\"mkdir \"I love cookies\" && mkdir 2 && mkdir 3\""
+# # Command 2 is run after
+# command2 = "above && above2 ; above3"
+# commands = list()
+# commands.append(command0)
+# commands.append(command1)
+# commands.append(command2)
+
+if not os.path.exists(os.path.dirname(run_name + "/")):
+    os.makedirs(os.path.dirname(run_name + "/"))
+
+processor = CommandsProcessor(commands)
+# This breaks the commands apart into groups that can be parallelized while still being ordered.
+ordered_commands = processor.handle()
+command_stack = Stack(ordered_commands)
+# This is temp
+#run_name = "temp"
+command_stack.create_stack(run_name)
+command_stack.create_dag_workflow(run_name)
+command_stack.submit()
+
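A hypothetical invocation, matching the positional arguments read above (the paths, flags, and protocol id are illustrative):

    python PyGLSeqWrapper.py /path/to/GLSeq.top.R TRUE TRUE TRUE TRUE TRUE my_run 42 /path/to/attributes.txt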
diff --git a/PythonWrapper/Wrapper.py b/PythonWrapper/Wrapper.py
index a0a0ac9..ab0c1fa 100644
--- a/PythonWrapper/Wrapper.py
+++ b/PythonWrapper/Wrapper.py
@@ -5,7 +5,8 @@
 # GLSeq and receive the output command
 
 class GlSeqRun:
-    def __init__(self,update_database,prepare_data,align,count,collect,run_name,protocol_id,attribute_file_path,condor):
+    def __init__(self,glseq_path,update_database,prepare_data,align,count,collect,run_name,protocol_id,attribute_file_path,condor):
+        self.glseq_path = glseq_path
         self.update = update_database
         self.prepare = prepare_data
         self.align = align
@@ -27,7 +28,7 @@ def run(self):
     def defineRunArgs(self):
         run = list()
         run.append("Rscript")
-        run.append("GLSeq.top.R")
+        run.append(self.glseq_path)
         run.append(self.update)
         run.append(self.prepare)
         run.append(self.align)
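With that change, defineRunArgs builds an argv list that begins like the following; the remaining arguments continue in constructor order, and the hunk above only shows the first few appends (values illustrative):

    # ["Rscript", "/path/to/GLSeq.top.R", "TRUE", "TRUE", "TRUE", ...]

This keeps the wrapper free of a hard-coded GLSeq.top.R location, so callers can point it at any GLSeq checkout.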