several 1.0 changes
- minor reformatting, version bump
- argparse-ified dSQBatch.py
- dSQAutopsy queries slurm/sacct now, not job status
- dSQAutopsy better summarizes array status now
- added dSQ --supress-stats-file option
- fixed dSQ --submit option
- updated readme
brevans committed May 7, 2020
1 parent 2d5d136 commit de7b003
Showing 6 changed files with 245 additions and 87 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -87,3 +87,8 @@ ENV/

# Rope project settings
.ropeproject

# dSQ files
job_*_status.tsv
dsq-*.sh
dsq-*.out
47 changes: 36 additions & 11 deletions README.md
@@ -50,14 +50,16 @@ Required Arguments:
Optional Arguments:
-h, --help Show this help message and exit.
--version show program's version number and exit
--submit Submit the job array on the fly instead of creating a submission script.
--max-jobs number Maximum number of simultaneously running jobs from the job array.
--batch-file sub_script.sh
Name for batch script file. Defaults to dsq-jobfile-YYYY-MM-DD.sh
-J jobname, --job-name jobname
Name of your job array. Defaults to dsq-jobfile
--max-jobs number Maximum number of simultaneously running jobs from the job array.
-o fmt_string, --output fmt_string
Slurm output file pattern. There will be one file per line in your job file. To suppress slurm out files, set this to /dev/null. Defaults to dsq-jobfile-%A_%a-%N.out
--batch-file sub_script.sh
Name for batch script file. Defaults to dsq-jobfile-YYYY-MM-DD.sh
--status-dir dir Directory to save the job_jobid_status.tsv file to. Defaults to working directory.
--supress-stats-file Don't save job stats to job_jobid_status.tsv
--submit Submit the job array on the fly instead of creating a submission script.
```

In the example above, we want walltime of 10 minutes and memory=4GB per job. Our invocation would be:
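
The exact command is collapsed in this diff view; a minimal sketch, assuming the `dsq` command and a `--job-file` option, together with the Slurm flags that appear in the generated script below, might be:

``` bash
# hypothetical invocation; adjust the options to your own resource needs
dsq --job-file joblist.txt --mem-per-cpu 4g -t 10:00 --mail-type ALL
```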
@@ -75,7 +77,8 @@ Which will create a file called `dsq-joblist-yyyy-mm-dd.sh`, where the y, m, and
#SBATCH --job-name dsq-joblist
#SBATCH --mem-per-cpu 4g -t 10:00 --mail-type ALL

/ysm-gpfs/apps/software/dSQ/version/dSQBatch.py /path/to/my/joblist.txt
# DO NOT EDIT LINE BELOW
/path/to/dSQBatch.py --job-file /path/to/joblist.txt --status-dir /path/to/here
```

## Step 3: Submit Batch Script
@@ -86,7 +89,7 @@ sbatch dsq-joblist-yyyy-mm-dd.sh

## Manage Your dSQ Job

You can refer to any portion of your job with `jobid_index` syntax, or the entire array with its jobid. The index Dead Simple Queue uses **starts at zero**, so the 3rd line in your job file will have an index of 2\. You can also specify ranges.
You can refer to any portion of your job with `jobid_index` syntax, or the entire array with its jobid. The index Dead Simple Queue uses **starts at zero**, so the 3rd line in your job file will have an index of 2. You can also specify ranges.

``` bash
# to cancel job 4 for array job 14567
@@ -100,7 +103,7 @@ scancel 14567_[10-20]

You can monitor the status of your jobs in Slurm by using `squeue -u <netid>`.
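
For example (a generic sketch; substitute your own netid and job id):

``` bash
# all of your queued and running jobs
squeue -u <netid>
# or just this array, by its job id
squeue -j 14567
```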

dSQ creates a file named `job_jobid_status.tsv`, which will report the success or failure of each job as it finishes. Note this file will not contain information for any jobs that were canceled (e.g. by the user with scancel) before they began. This file contains details about the completed jobs in the following tab-separated columns:
dSQ creates a file named `job_jobid_status.tsv`, unless you suppress this output with `--supress-stats-file`. This file will report the success or failure of each job as it finishes. Note this file will not contain information for any jobs that were canceled (e.g. by the user with scancel) before they began. This file contains details about the completed jobs in the following tab-separated columns:

* Job_ID: the zero-based line number from your job file.
* Exit_Code: exit code returned from your job (non-zero number generally indicates a failed job).
@@ -112,14 +115,36 @@ dSQ creates a file named `job_jobid_status.tsv`, which will report the success o

## dSQAutopsy

Once the dSQ job is finished, you can use dSQAutopsy to create both a report of the run and a new jobsfile that contains just the jobs that failed.
You can use dSQAutopsy or `dsqa` to create a simple report of the job array and, if you specify the original jobsfile, a new jobsfile that contains just the jobs you want to re-run. The options are listed below:

``` text
-j JOB_ID, --job-id JOB_ID
The Job ID of a running or completed dSQ Array
-f JOB_FILE, --job-file JOB_FILE
Job file, one job per line (not your job submission script).
-s STATES, --states STATES
Comma separated list of states to use for re-writing job file. Default: CANCELLED,NODE_FAIL,PREEMPTED
```
dsqa jobsfile.txt job_2629186_status.tsv

Asking for a simple report:

``` bash
dsqa -j 13233846
```

You can conveniently redirect the report and the failed jobs to separate files:
Produces a report like the following:

``` text
State Summary for Array 13233846
State Num_Jobs Indices
----- -------- -------
COMPLETED 12 4,7-17
RUNNING 5 1-3,5-6
PREEMPTED 1 0
```
dsqa jobsfile.txt job_2629186_status.tsv > failedjobs.txt 2> report.txt

You can redirect the report and the failed jobs to separate files:

``` bash
dsqa jobsfile.txt -j 2629186 -f jobsfile.txt > re-run_jobs.txt 2> 2629186_report.txt
```
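
The resulting job file can then be handed back to dSQ for another pass; a sketch, assuming the same `dsq --job-file` form as above:

``` bash
# hypothetical re-submission of only the jobs dsqa selected for re-running
dsq --job-file re-run_jobs.txt --mem-per-cpu 4g -t 10:00
```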
54 changes: 31 additions & 23 deletions dSQ.py
@@ -10,7 +10,7 @@
import sys
import re

__version__ = 0.96
__version__ = 1.0

def safe_fill(text, wrap_width):
if sys.__stdin__.isatty():
@@ -127,6 +127,9 @@ def format_range(jobnums):
metavar="dir",
nargs=1,
help="Directory to save the job_jobid_status.tsv file to. Defaults to working directory.")
optional_dsq.add_argument("--supress-stats-file",
action="store_true",
help="Don't save job stats to job_jobid_status.tsv")
optional_dsq.add_argument("--stdout",
action="store_true",
help=argparse.SUPPRESS)
@@ -148,6 +151,7 @@ def format_range(jobnums):
job_info["job_id_list"] = []
job_info["run_script"] = path.join(path.dirname(path.abspath(sys.argv[0])), "dSQBatch.py")
job_info["job_file_name"] = path.abspath(args.job_file[0].name)
job_info["job_file_arg"] = "--job-file {}".format(job_info["job_file_name"])
job_info["slurm_args"] = {}
job_info["user_slurm_args"] = " ".join(user_slurm_args)
job_info["job_file_no_ext"] = path.splitext(path.basename(job_info["job_file_name"]))[0]
@@ -175,7 +179,7 @@ def format_range(jobnums):

# make sure there are jobs to submit
if job_info["num_jobs"] == 0:
sys.stderr.write("No jobs found in {job_file_name}\n".format(**job_info))
print("No jobs found in {job_file_name}".format(**job_info), file=sys.stderr)
sys.exit(1)

# set output file format
@@ -185,13 +189,17 @@ def format_range(jobnums):
job_info["slurm_args"]["--output"] = "dsq-{job_file_no_ext}-%A_%{array_fmt_width}a-%N.out".format(**job_info)

# set output directory
if args.status_dir is not None:
job_info["status_dir"] = path.abspath(args.status_dir[0])
if args.supress_stats_file:
job_info["status_dir_arg"] = "--supress-stats-file"
else:
job_info["status_dir"] = path.abspath('./')
if not os.access(job_info["status_dir"], os.W_OK | os.X_OK):
sys.stderr.write("{status_dir} doesn't appear to be a writeable directory.\n".format(**job_info))
sys.exit(1)
if args.status_dir is not None:
job_info["status_dir"] = path.abspath(args.status_dir[0])
else:
job_info["status_dir"] = path.abspath("./")
if not os.access(job_info["status_dir"], os.W_OK | os.X_OK):
print("{status_dir} does not appear to be a writeable directory.".format(**job_info), file=sys.stderr)
sys.exit(1)
job_info["status_dir_arg"] = "--status-dir {}".format(job_info["status_dir"])

# set array range string
if job_info["max_jobs"] == None:
@@ -206,18 +214,6 @@
else:
job_info["slurm_args"]["--job-name"] = "dsq-{job_file_no_ext}".format(**job_info)

# set batch script name
if args.stdout:
job_info["batch_script_out"] = sys.stdout
else:
try:
if args.batch_file is not None:
job_info["batch_script_out"] = open(args.batch_file[0], 'w')
else:
job_info["batch_script_out"] = open("dsq-{job_file_no_ext}-{today}.sh".format(**job_info), 'w')
except Exception as e:
print("Error: Couldn't open {batch_script_out} for writing. ".format(**job_info), e)

# submit or print the job script
if args.submit:

@@ -226,19 +222,31 @@
for option, value in job_info["slurm_args"].items():
job_info["cli_args"] += " %s=%s" % (option, value)

cmd = "sbatch {cli_args} {user_slurm_args} {run_script} {job_file_name} {status_dir}".format(**job_info)
# print("submitting:\n {}".format(cmd))
cmd = "sbatch {cli_args} {user_slurm_args} {run_script} {job_file_arg} {status_dir_arg}".format(**job_info)
print("submitting:\n {}".format(cmd))
ret = call(cmd, shell=True)
sys.exit(ret)

else:
# set batch script name
if args.stdout:
job_info["batch_script_out"] = sys.stdout
else:
try:
if args.batch_file is not None:
job_info["batch_script_out"] = open(args.batch_file[0], "w")
else:
job_info["batch_script_out"] = open("dsq-{job_file_no_ext}-{today}.sh".format(**job_info), "w")
except Exception as e:
print("Error: Couldn't open {batch_script_out} for writing. ".format(**job_info), e)

print("#!/bin/bash", file=job_info["batch_script_out"])
for option, value in job_info["slurm_args"].items():
print("#SBATCH {} {}".format(option, value), file=job_info["batch_script_out"])
if len(job_info["user_slurm_args"]) > 0:
print("#SBATCH {user_slurm_args}".format(**job_info), file=job_info["batch_script_out"])
print("\n# DO NOT EDIT LINE BELOW".format(**job_info), file=job_info["batch_script_out"])
print("{run_script} {job_file_name} {status_dir}\n".format(**job_info), file=job_info["batch_script_out"])
print("{run_script} {job_file_arg} {status_dir_arg}\n".format(**job_info), file=job_info["batch_script_out"])
if not args.stdout:
print("Batch script generated. To submit your jobs, run:\n sbatch {}".format(job_info["batch_script_out"].name))

136 changes: 105 additions & 31 deletions dSQAutopsy.py
@@ -1,19 +1,80 @@
#!/usr/bin/env python
from __future__ import print_function #to make printing stderr work cleanly
from __future__ import print_function
import sys
import argparse
from textwrap import fill
from subprocess import check_output
from subprocess import call, check_output
from collections import defaultdict
from itertools import groupby

__version__ = 0.96
__version__ = 1.0
array_state_header = ['JobID', 'State']
sacct_cmd = ['sacct',
'-o' + ','.join(array_state_header),
'-nXPj']
possible_states = ['BOOT_FAIL', 'CANCELLED', 'COMPLETED', 'DEADLINE', 'FAILED', 'NODE_FAIL', 'OUT_OF_MEMORY',
'PENDING', 'PREEMPTED', 'RUNNING', 'REQUEUED', 'RESIZING', 'REVOKED', 'SUSPENDED', 'TIMEOUT']

def collapse_ranges(i):
for a, b in groupby(enumerate(i), lambda pair: pair[1] - pair[0]):
b = list(b)
if b[0][1] == b[-1][1]:
yield '{}'.format(b[0][1])
else:
yield '{}-{}'.format(b[0][1], b[-1][1])

def safe_fill(text, wrap_width):
if sys.__stdin__.isatty():
return fill(text, wrap_width)
else:
return text

def get_state_status(jid, rerun_states):
state_summary = defaultdict(lambda: 0)
array_states = defaultdict(lambda: [])
state_summary_header = ['State', 'Num_Jobs', 'Indices']
reruns = []
sacct_cmd.append(job_id)
try:
sacct_output = check_output(sacct_cmd).decode().split('\n')
except Exception as e:
# give up if we hit an error
print("Error looking up job {}.".format(job_id), file=sys.stderr)
sys.exit(1)
column_lengths = dict(zip(state_summary_header, [len(x)+2 for x in state_summary_header]))
# if there's job info
if len(sacct_output)>=1:
for l in sacct_output:
split_line = l.split('|')
if len(split_line) == len(array_state_header):
line_dict = dict(zip(array_state_header,split_line))
state_summary[line_dict['State']]+=1
# track column widths for pretty printing
if len(line_dict['State'])+2 > column_lengths['State']:
column_lengths['State']=len(line_dict['State'])+2
if '_' in line_dict['JobID']:
# track array idx
array_id = int(line_dict['JobID'].split('_')[1])
array_states[line_dict['State']].append(array_id)
if line_dict['State'] in rerun_states:
# add them to the reruns list if desired
reruns.append(array_id)

for state in array_states:
array_states[state] = ','.join(collapse_ranges(sorted(array_states[state])))
if len(array_states[state])+2 > column_lengths['State']:
# track column widths for pretty printing
column_lengths['State'] = len(array_states[state])+2

print('State Summary for Array {}'.format(job_id), file=sys.stderr)
summary_template = '{{:<{}}}{{:^{}}}{{:<{}}}'.format(*[column_lengths[x] for x in state_summary_header])

print(summary_template.format(*state_summary_header), file=sys.stderr)
print(summary_template.format(*['-' * len(x) for x in state_summary_header]), file=sys.stderr)
for state in sorted(state_summary, key=state_summary.get, reverse=True):
print(summary_template.format(state, state_summary[state], array_states[state]), file=sys.stderr)
return reruns

# get terminal columns for wrapping
# Check if dSQ is being run interactively
if sys.__stdin__.isatty():
@@ -26,47 +87,60 @@ def safe_fill(text, wrap_width):

desc = """Dead Simple Queue Autopsy v{}
https://github.com/ycrc/dSQ
A helper script for analyzing the success state of your jobs after a dSQ run has completed. Specify the job file and the job_jobid_status.tsv file generated by the dSQ job and dSQAutopsy will print the jobs that didn't run or completed with non-zero exit codes. It will also report count of each to stderr.
A helper script for analyzing the state of your dSQ jobs and identifying which you want to re-run. Specify the Slurm Job ID to see the state status. If you would like to generate a new job file with the jobs that need re-running, also specify your original job file and optionally the statuses you want to re-run. You can then redirect the output to a new file.
For more in-depth job stats use sacct, seff, or seff-array.
Example usage:
dsqa -j 1111
dsqa -j 1243 -f jobs.txt -s NODE_FAIL,PREEMPTED > rerun_jobs.txt
""".format(__version__)
"\n".join([safe_fill(x, term_columns-1) for x in str.splitlines(desc)])
desc = "\n".join([safe_fill(x, term_columns-1) for x in str.splitlines(desc)])

# argument parsing
parser = argparse.ArgumentParser(description=desc,
usage='%(prog)s jobfile status.tsv',
usage='%(prog)s --job-id jobid [--job-file jobfile.txt [--states STATES] > new_jobs.txt]',
formatter_class=argparse.RawTextHelpFormatter,
prog=sys.argv[0])
parser.add_argument('-v','--version',
action='version',
version='%(prog)s {}'.format(__version__))
parser.add_argument('jobfile',
parser.add_argument('-j', '--job-id',
nargs=1,
required=True,
help='The Job ID of a running or completed dSQ Array')
parser.add_argument('-f', '--job-file',
nargs=1,
type=argparse.FileType('r'),
help='Job file, one job per line (not your job submission script).')
parser.add_argument('statusfile',
parser.add_argument('-s', '--states',
nargs=1,
type=argparse.FileType('r'),
help='The job_jobid_status.tsv file generated from your dSQ run.')
default='CANCELLED,NODE_FAIL,PREEMPTED',
help='Comma separated list of states to use for re-writing job file. Default: CANCELLED,NODE_FAIL,PREEMPTED')

args = parser.parse_args()
job_id = args.job_id[0]
rerun_states = []
for state in args.states.split(','):
if state in possible_states:
rerun_states.append(state)
else:
print('Unknown state: {}.'.format(state), file=sys.stderr)
print('Choose from {}.'.format(','.join(possible_states)), file=sys.stderr)
sys.exit(1)

try:
succeeded = set()
failed = set()
norun = set()
for l in args.statusfile[0]:
tid, exit_code, rest = l.split('\t',2)
if exit_code == "0":
succeeded.add(int(tid))
else:
failed.add(int(tid))

for i,l in enumerate(args.jobfile[0]):
if i not in succeeded:
if i not in failed:
norun.add(i)
print(l, end='')
print("Autopsy Job Report:\n{} succeeded\n{} failed\n{} didn't run.".format(len(succeeded), len(failed), len(norun)), file=sys.stderr)
except Exception as e:
print ("Something went wrong. Did you specify the right files?")
sys.exit(1)
print_reruns = False
if (args.job_file):
try:
job_file = open(args.job_file[0], 'r')
print_reruns = True
except Exception as e:
print('Could not open {}.'.format(args.job_file[0]), file=sys.stderr)
sys.exit(1)

reruns = get_state_status(job_id, rerun_states)
if print_reruns:
for i, line in enumerate(job_file):
if i in reruns:
print(line.rstrip())