
Commit

Merge pull request #62 from MichaelLampe/development
Completed Condor-enabled GLSeq Wrapper
MichaelLampe committed Aug 4, 2015
2 parents 2e2a824 + 1e1dfa7 commit 7a16f18
Showing 8 changed files with 275 additions and 106 deletions.
25 changes: 0 additions & 25 deletions PythonWrapper/CentralInterface.py

This file was deleted.

45 changes: 43 additions & 2 deletions PythonWrapper/Command.py
@@ -1,5 +1,46 @@
__author__ = 'mlampe'

from CommandFile import CommandFile
# Pydagman can be found at https://github.com/brandentimm/pydagman
from pydagman.job import Job

class Command():
    # Tracks every Command node by parallel group so parent/child DAG edges can be added later
    parallel_track = list()
    # Global counter used to give each DAG job a unique name
    job_count = 1

    def __init__(self, commands, parent_node):
        self.parent_node = parent_node
        Command.parallel_track.append(list())
        Command.parallel_track[self.parent_node].append(self)
        # A list of commands that can be run in parallel and sequentially
        self.commands = commands
        self.associated_bash_files = list()
        self.dag_jobs = list()

    # To-string method used for debugging individual command nodes
    def toString(self):
        return "The command is " + str(self.commands) + ". This command has a parent node of " + str(self.parent_node) + "."

    # Creates the bash files that will be added to the job flow
    def create_bash_files(self, run_name):
        for command in self.commands:
            new_command = CommandFile(run_name, command, self.parent_node)
            bash_file = self.command_file = new_command.generate_bash_file()
            self.associated_bash_files.append(bash_file)

    # Sets up DAG jobs.
    # A controller will be needed in the future to optimize memory, CPU, and GPU usage;
    # that will really help when we are mass-scheduling batch jobs.
    def create_dag_jobs(self, run_name):
        for subcommand in range(0, len(self.commands)):
            current_job = Job((run_name + "/" + run_name + ".submit"), "JOB" + str(Command.job_count))
            current_job.add_var("mem", "4G")
            current_job.add_var("cpus", "2")
            current_job.add_var("execute", "./" + run_name + "/" + self.associated_bash_files[subcommand])
            current_job.retry(2)
            current_job.pre_skip("1")
            # Parent interactions still need to be added; that is done in the command stack
            Command.job_count = Command.job_count + 1
            self.dag_jobs.append(current_job)
        # Chain this node's jobs so they run one after another
        for job in range(0, len(self.dag_jobs)):
            if job != 0:
                self.dag_jobs[job].add_parent(self.dag_jobs[job - 1])
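
For context, a minimal sketch of how this class might be driven (the run name "demo_run", the commands, and the resulting file names are hypothetical, and the demo_run/ directory must already exist because create_bash_files writes into it):

from Command import Command

# One node holding two sequential shell commands in parallel group 0
cmd = Command(["mkdir tmp && cd tmp", "samtools sort reads.bam sorted"], 0)
cmd.create_bash_files("demo_run")  # writes e.g. demo_run/demo_run_1.sh and demo_run/demo_run_2.sh
cmd.create_dag_jobs("demo_run")    # one pydagman Job per script, each chained to its predecessor

Each job points at the shared demo_run/demo_run.submit file and passes its script through the execute variable, so one generic submit file serves every job in the run.
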
33 changes: 26 additions & 7 deletions PythonWrapper/CommandFile.py
@@ -1,29 +1,48 @@
__author__ = 'mlampe'

from subprocess import Popen

class CommandFile:
    file_count = 0

    def __init__(self, run_name, command_bundle, parent):
        # Keep track of how many command files we have
        CommandFile.file_count = CommandFile.file_count + 1
        # Name the files after the run name and the command count because it is easy.
        self.run_name = run_name
        self.file_name = run_name + "_" + str(CommandFile.file_count) + ".sh"
        # A bundle of commands prepared by the Command class;
        # each entry is a string holding a single command.
        self.command_bundle = command_bundle.split(" && ")
        self.parent = parent

    def generate_bash_file(self):
        file = self.run_name + "/" + self.file_name
        with open(file, "w") as command:
            # Bash header
            command.write("#!/bin/bash\n\n")
            # Index through the commands, writing them as we go with error checking
            for x in range(0, len(self.command_bundle)):
                self.command_bundle[x] = self.command_bundle[x].replace("\"", "")
                # lstrip removes the leading whitespace, which could be annoying
                command.write(self.command_bundle[x].lstrip())
                command.write(self.error_checking(x, self.command_bundle[x]))
        # Makes the newly created file an executable
        Popen(self.create_executable(file))
        return self.file_name

    def create_executable(self, sh_file):
        command_list = list()
        command_list.append("chmod")
        # Allows it to be executed on the file system
        command_list.append("+x")
        command_list.append(sh_file)
        return command_list

    # Adds an error-checking snippet after each command in the shell script
    def error_checking(self, command_number, command):
        error_check = "\n"
        error_check = error_check + "if [[ $? -ne 0 ]]; then\n"
        error_check = error_check + "  echo \"Step " + str(command_number) + " failed. The command was " + command + " Exiting now.\"\n"
        error_check = error_check + "  exit 1\n"
        error_check = error_check + "fi\n"
        error_check = error_check + "\n"
        # The rest of this hunk is collapsed in the diff view; the snippet is returned to the caller
        return error_check
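
To make the generated guard concrete, here is roughly what error_checking() emits for a hypothetical step (the output is shown as comments):

from CommandFile import CommandFile

cf = CommandFile("demo_run", "mkdir out && samtools index sorted.bam", 0)
print cf.error_checking(0, "mkdir out")
# if [[ $? -ne 0 ]]; then
#   echo "Step 0 failed. The command was mkdir out Exiting now."
#   exit 1
# fi

So each command in the script is followed by a check of $?, and the script exits with status 1 at the first failing step.
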
91 changes: 87 additions & 4 deletions PythonWrapper/CommandStack.py
@@ -1,6 +1,89 @@
__author__ = 'mlampe'

from Command import Command
# Pydagman can be found at https://github.com/brandentimm/pydagman
from pydagman.dagfile import Dagfile
from subprocess import Popen

class Stack:
    # Takes all the commands together and organizes them into a command stack composed of Command objects.
    def __init__(self, command_list):
        self.command_list = command_list
        self.command_stack = list()

        self.group_keywords = [
            # The trailing space is important: it keeps matches to the bare Linux commands
            'mkdir ',
            'cd ',
            'mv ',
            'cp ',
            'date ',
            'samtools ',
            'rm ',
            'echo '
        ]

    def create_stack(self, run_name):
        # Connects steps that contain a keyword.
        # The last step is always either the end of this parallel command
        # or a larger step that is not a keyword, in the hope of creating "larger" chunked steps.
        for steps in self.command_list:
            for step in steps:
                for x in range(len(step) - 1, 0, -1):
                    for word in self.group_keywords:
                        if word in step[x - 1]:
                            if x != 0:
                                step[x - 1] = step[x - 1] + " && " + step[x]
                                step.pop(x)
                            # Exit after the first match; one grouping per pass is enough
                            break
        for parent in range(0, len(self.command_list)):
            for step in self.command_list[parent]:
                command = Command(step, parent)
                self.command_stack.append(command)

        for command in self.command_stack:
            command.create_bash_files(run_name)
            self.create_submit_file(run_name)
            command.create_dag_jobs(run_name)

    def create_dag_workflow(self, run_name):
        mydag = Dagfile()

        # Takes care of parallelizing parent-child relationships on the graph
        for x in range(0, len(Command.parallel_track)):
            if x != 0:
                # Add all the previous jobs that are parents
                for y in range(0, len(Command.parallel_track[x - 1])):
                    # Add all the children
                    for z in range(0, len(Command.parallel_track[x])):
                        child_job = Command.parallel_track[x][z].dag_jobs[0]
                        parent_job = Command.parallel_track[x - 1][y].dag_jobs[len(Command.parallel_track[x - 1][y].dag_jobs) - 1]
                        child_job.add_parent(parent_job)
        # Put everything together in the Dagfile workflow
        for command in self.command_stack:
            for job in command.dag_jobs:
                mydag.add_job(job)
        self.dag_file = run_name + "/my_workflow.dag"
        mydag.save(self.dag_file)

    def create_submit_file(self, run_name):
        submit_file_name = str(run_name + "/" + run_name + ".submit")
        with open(str(submit_file_name), 'w') as submit_file:
            submit_file.write("universe = vanilla\n")
            submit_file.write("executable = $(execute)\n")
            #submit_file.write("arguments = $(args)\n")
            submit_file.write("log = job.log\n")
            submit_file.write("out = job.out\n")
            submit_file.write("err = job.err\n")
            submit_file.write("request_memory = $(mem)\n")
            submit_file.write("request_cpus = $(cpus)\n")
            submit_file.write("queue\n")
        self.submit_file = submit_file_name

    def submit(self):
        submit_command = list()
        submit_command.append("condor_submit_dag")
        submit_command.append(self.dag_file)
        Popen(submit_command)
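
Putting the classes together, a minimal end-to-end sketch under assumed inputs (the run name, index, and read files are invented; demo_run/ must exist before create_stack runs, and submit() expects condor_submit_dag on the PATH):

from CommandsProcessor import CommandsProcessor
from CommandStack import Stack

raw = ["mkdir logs && bowtie2 -x idx -U reads.fq -S out.sam & samtools view -bS out.sam &"]
stack = Stack(CommandsProcessor(raw).handle())
stack.create_stack("demo_run")         # bash scripts plus demo_run/demo_run.submit
stack.create_dag_workflow("demo_run")  # demo_run/my_workflow.dag with PARENT/CHILD edges
stack.submit()                         # runs condor_submit_dag demo_run/my_workflow.dag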

104 changes: 38 additions & 66 deletions PythonWrapper/CommandsProcessor.py
@@ -1,82 +1,54 @@
__author__ = 'mlampe'

import re

class CommandsProcessor:
    def __init__(self, commands):
        # The list of all commands, kept in the order they were sent
        self.commands = commands

    # Just kinda sounds cool when you call it.
    # This takes all the commands, divides them into the order they were run, and tries
    # to parallelize commands that run together and to group smaller commands.
    def handle(self):
        command_list = list()
        # First split each command on background processes (&)
        for command in self.commands:
            command = self.split_background(command)
            command = filter(bool, command)
            command_list.append(command)
        # Reassign and clear
        self.commands = command_list
        command_list = list()
        # Then split each part on sequential separators (&& or ;)
        for command in self.commands:
            command = self.split_linked(command)
            command = filter(bool, command)
            command_list.append(command)
        return command_list

    # Breaks parts joined by a single "&" into an array whose entries can run in parallel
    def split_background(self, command):
        re1 = '(?<!&)&(?!&)'  # Matches only a single & that is neither preceded nor followed by another &
        rg = re.compile(re1, re.IGNORECASE)
        command_split = rg.split(command)
        return command_split

    # Breaks linked commands (&& or ;) into parts that run in sequence (within the same array)
    def split_linked(self, command):
        """
        I know this somewhat violates duck typing, but it is an easy (and really readable) way to check
        whether this is a single command. Without it, iteration would walk over the characters of an
        individual string, and skipping the iteration would raise a regex error because you can't regex a list.
        """
        if type(command) is list:
            split_list = list()
            for c in command:
                re1 = '&&|;'
                rg = re.compile(re1, re.IGNORECASE)
                split_list.append(rg.split(c))
            return split_list
        else:
            re1 = '&&|;'
            rg = re.compile(re1, re.IGNORECASE)
            command_split = rg.split(command)
            return command_split
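
A quick worked example of the two split passes; the sample string mirrors the test command that this commit removes from the module, and the outputs shown as comments are what these exact regexes produce:

import re

cmd = "command1 && command2 & parallel1 && parallel2 &"

print re.split('(?<!&)&(?!&)', cmd)
# ['command1 && command2 ', ' parallel1 && parallel2 ', '']   (empty strings are filtered out by handle)

print re.split('&&|;', 'command1 && command2 ')
# ['command1 ', ' command2 ']
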
29 changes: 29 additions & 0 deletions PythonWrapper/CondorGrapher.py
@@ -0,0 +1,29 @@
__author__ = 'mlampe'

# Pygraph library
# Modified from https://github.com/pmatiello/python-graph
from pygraph.classes.digraph import digraph as graph


class GraphCondor:
    def __init__(self, dag_file_name):
        self.dag_file_name = dag_file_name
        self.condor_graph = graph()

    def read_connections(self):
        with open(self.dag_file_name, 'r') as dag_file:
            for line in dag_file:
                if "PARENT" in line:
                    # The line reads "PARENT <parent> CHILD <child>", so the job
                    # names sit at indices 1 and 3; split() without an argument
                    # also drops the trailing newline
                    split = line.split()
                    parent = split[1]
                    child = split[3]
                    self.create_connection(parent, child)

    def create_connection(self, parent, child):
        # Adds the parent and child nodes if they don't exist already
        [self.condor_graph.add_node(n) for n in (parent, child) if not self.condor_graph.has_node(n)]
        # Adds the edge from parent to child
        self.condor_graph.add_edge(parent, child)

    # Still need to add visualization
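
A small usage sketch (the DAG path is hypothetical; read_connections only needs the PARENT/CHILD lines of a standard DAGMan file, like the one create_dag_workflow saves):

from CondorGrapher import GraphCondor

grapher = GraphCondor("demo_run/my_workflow.dag")
grapher.read_connections()
# grapher.condor_graph now holds one node per DAG job and one directed edge per PARENT line,
# e.g. the line "PARENT JOB1 CHILD JOB2" becomes an edge from JOB1 to JOB2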