-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathsjm.py
125 lines (107 loc) · 3.64 KB
/
sjm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""Wrapper for simple job manager (sjm)"""
import subprocess
import os.path
import sys
import random
import time
SJM_COMMAND = 'sjm'
class Job:
def __init__(self, name, commands, time=None, memory=None, queue=None, host=None, dependencies=None,modules = [], project=None, sched_options=None):
self.name = name
if not type(commands) is list:
self.commands = [commands,]
else:
self.commands = commands
self.time = time
self.memory = memory
self.modules = modules
self.queue = queue
self.host = host
if not dependencies:
self.dependencies = []
else:
self.dependencies = dependencies
self.project = project
self.sched_options = sched_options
def job_definition(self):
'''Returns a string with the job definition in sjm format'''
output = 'job_begin\n'
output += ' name %s\n' % self.name
if self.modules:
for mod in self.modules:
output += " module {mod}\n".format(mod=mod)
if self.time:
output += ' time %s\n' % str(self.time)
if self.memory:
output += ' memory %s\n' % str(self.memory)
if self.queue:
output += ' queue %s\n' % str(self.queue)
if self.host:
output += ' host %s\n' % str(self.host)
if self.project:
output += ' project %s\n' % str(self.project)
if len(self.commands) == 1:
output += ' cmd %s\n' % str(self.commands[0])
else:
output += ' cmd_begin\n'
output += ' %s\n' % ' &&'.join(self.commands)
output += ' cmd_end\n'
output += ' sched_options %s\n' % str("-A chipseq_scoring")
output += 'job_end\n'
return output
def add_dependency(self, job):
self.dependencies.append(job)
def add_module(module):
self.modules.append(module)
def dependency_definition(self):
'''Returns a string with the dependency definition in sjm format'''
output = ''
for job in self.dependencies:
output += 'order %s after %s\n' % (self.name, job.name)
return output
def __str__(self):
return self.name
class Submission:
def __init__(self, jobs, log_directory=None, notify=[]):
"""
Args : notify - list of one more more email addresses.
"""
self.jobs = jobs
self.log_directory = log_directory
if not notify:
raise Exception("the 'notify' argument is required when instantiating sjm.Submission!")
self.notify = notify
def build(self, job_description_file):
if not type(job_description_file) is file:
job_description_file = open(job_description_file, 'w')
for j in self.jobs:
job_description_file.write(j.job_definition())
for j in self.jobs:
job_description_file.write(j.dependency_definition())
if self.log_directory:
job_description_file.write('log_dir %s\n' % self.log_directory)
job_description_file.close()
return job_description_file
def run(self, job_description_file,foreground=False):
"""Run sjm jobs. Wait for sjm to complete if 'foreground' argument is set to True."""
job_description_file = self.build(job_description_file)
cmd = SJM_COMMAND
if foreground:
cmd += " -i"
if self.log_directory:
cmd += " -l %s " % os.path.join(self.log_directory, job_description_file.name + '.status.log')
for n in self.notify:
cmd += " --mail %s" % n
print("Running sjm command '%s %s\n'" % (cmd,job_description_file.name))
rand = str(random.random())
curTime = time.time()
ferr = open(str(curTime) + "_" + rand + "_sjm-run_stderr.txt","w")
try:
subprocess.check_call("%s %s" % (cmd, job_description_file.name), shell=True,stderr=ferr)
except subprocess.CalledProcessError as e:
msg = "Error with return code {retcode} while running command {cmd}:".format(retcode=e.returncode,cmd=e.cmd)
msg += open(ferr.name,'r').read()
sys.stderr.write(msg)
raise
finally:
os.remove(ferr.name)