dSQBatch.py
#!/usr/bin/env python
from datetime import datetime
from functools import partial
from os import path
from subprocess import Popen
import argparse
import os
import platform
import signal
import sys
import time

__version__ = 1.05

# Relay a signal received by this wrapper to the child process, so that
# e.g. Slurm's SIGTERM on cancellation reaches the actual job.
def forward_signal_to_child(pid, signum, frame):
    print("[dSQ]: ", pid, signum, frame)
    os.kill(pid, signum)

# Run one job line in a shell, forwarding SIGCONT and SIGTERM to it so the
# job sees suspend/cancel signals instead of the wrapper absorbing them.
def exec_job(job_str):
    process = Popen(job_str, shell=True)
    signal.signal(signal.SIGCONT, partial(forward_signal_to_child, process.pid))
    signal.signal(signal.SIGTERM, partial(forward_signal_to_child, process.pid))
    return_code = process.wait()
    return return_code

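# A minimal sketch of exec_job on its own (illustrative only; the command
# string here is hypothetical):
#
#   rc = exec_job("echo hello && exit 3")
#   # rc == 3: the shell command's exit status is returned unchanged
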
desc = """Dead Simple Queue Batch v{}
https://github.com/ycrc/dSQ
A wrapper script to run job arrays from job files, where each line in the plain-text file is a self-contained job. This script is usually called from a batch script generated by dsq.
""".format(
    __version__
)
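# For reference, a job file is plain text with one complete shell command per
# line. A hypothetical joblist.txt might look like:
#
#   module load Python; python analyze.py --input sample_0.fastq
#   module load Python; python analyze.py --input sample_1.fastq
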
parser = argparse.ArgumentParser(
    description=desc,
    usage="%(prog)s --job-file jobfile.txt [--suppress-stats-file | --status-dir dir/ ]",
    formatter_class=argparse.RawTextHelpFormatter,
    prog=path.basename(sys.argv[0]),
)
parser.add_argument(
    "-v", "--version", action="version", version="%(prog)s {}".format(__version__)
)
parser.add_argument(
    "--job-file",
    nargs=1,
    required=True,  # fail with a clean usage error instead of a traceback below
    help="Job file, one job per line (not your job submission script).",
)
parser.add_argument(
    "--suppress-stats-file",
    action="store_true",
    help="Don't save job stats to job_jobid_status.tsv",
)
parser.add_argument(
    "--status-dir",
    metavar="dir",
    nargs=1,
    default=["."],  # a list, to match nargs=1, so args.status_dir[0] is always valid
    help="Directory to save the job_jobid_status.tsv file to. Defaults to the working directory.",
)
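# Illustrative invocation (normally written into the batch script that dsq
# generates; the file names here are hypothetical):
#
#   dSQBatch.py --job-file joblist.txt --status-dir logs/
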
args = parser.parse_args()
job_file = args.job_file[0]

print_stats_file = True
if args.suppress_stats_file:
    print_stats_file = False
else:
    status_outdir = args.status_dir[0]

# Slurm calls individual job array indices "tasks" and exposes them via
# environment variables; both are set only inside a running job array.
jid = int(os.environ.get("SLURM_ARRAY_JOB_ID"))
tid = int(os.environ.get("SLURM_ARRAY_TASK_ID"))
hostname = platform.node()
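# For example, array task 7 of job 123456 sees SLURM_ARRAY_JOB_ID=123456 and
# SLURM_ARRAY_TASK_ID=7, and will run the line of the job file whose
# zero-based index is 7 (values made up for illustration).
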
# use the task id to pick this task's job out of job_file
mycmd = None
with open(job_file, "r") as tf:
    for i, line in enumerate(tf):
        if i == tid:
            mycmd = line.strip()
            break
if mycmd is None:
    sys.exit("[dSQ]: task id {} is beyond the end of {}".format(tid, job_file))

# run the job and track its execution time
st = datetime.now()
ret = exec_job(mycmd)
et = datetime.now()

if print_stats_file:
    # set up job stats
    out_cols = [
        "Array_Task_ID",
        "Exit_Code",
        "Hostname",
        "T_Start",
        "T_End",
        "T_Elapsed",
        "Task",
    ]
    time_fmt = "%Y-%m-%d %H:%M:%S"
    time_start = st.strftime(time_fmt)
    time_end = et.strftime(time_fmt)
    time_elapsed = (et - st).total_seconds()
    out_dict = dict(
        zip(out_cols, [tid, ret, hostname, time_start, time_end, time_elapsed, mycmd])
    )
    # append this task's stats to the shared per-array status file
    with open(
        path.join(status_outdir, "job_{}_status.tsv".format(jid)), "a"
    ) as out_status:
        out_status.write(
            "{Array_Task_ID}\t{Exit_Code}\t{Hostname}\t{T_Start}\t{T_End}\t{T_Elapsed:.02f}\t{Task}\n".format(
                **out_dict
            )
        )
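# One resulting line of job_<jobid>_status.tsv, tab-separated (values are
# invented for illustration):
#
#   7  0  c01n01  2024-01-01 10:00:00  2024-01-01 10:05:00  300.00  python analyze.py --input sample_7.fastq
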
# exit with the job's return code so Slurm records this task's success or failure
sys.exit(ret)