diff --git a/.gitignore b/.gitignore index 72364f9..e0d9c22 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,8 @@ ENV/ # Rope project settings .ropeproject + +# dSQ files +job_*_status.tsv +dsq-*.sh +dsq-*.out \ No newline at end of file diff --git a/README.md b/README.md index da31248..f316dbf 100644 --- a/README.md +++ b/README.md @@ -50,14 +50,16 @@ Required Arguments: Optional Arguments: -h, --help Show this help message and exit. --version show program's version number and exit - --submit Submit the job array on the fly instead of creating a submission script. - --max-jobs number Maximum number of simultaneously running jobs from the job array. + --batch-file sub_script.sh + Name for batch script file. Defaults to dsq-jobfile-YYYY-MM-DD.sh -J jobname, --job-name jobname Name of your job array. Defaults to dsq-jobfile + --max-jobs number Maximum number of simultaneously running jobs from the job array. -o fmt_string, --output fmt_string Slurm output file pattern. There will be one file per line in your job file. To suppress slurm out files, set this to /dev/null. Defaults to dsq-jobfile-%A_%a-%N.out - --batch-file sub_script.sh - Name for batch script file. Defaults to dsq-jobfile-YYYY-MM-DD.sh + --status-dir dir Directory to save the job_jobid_status.tsv file to. Defaults to working directory. + --supress-stats-file Don't save job stats to job_jobid_status.tsv + --submit Submit the job array on the fly instead of creating a submission script. ``` In the example above, we want walltime of 10 minutes and memory=4GB per job. Our invocation would be: @@ -75,7 +77,8 @@ Which will create a file called `dsq-joblist-yyyy-mm-dd.sh`, where the y, m, and #SBATCH --job-name dsq-joblist #SBATCH --mem-per-cpu 4g -t 10:00 --mail-type ALL -/ysm-gpfs/apps/software/dSQ/version/dSQBatch.py /path/to/my/joblist.txt +# DO NOT EDIT LINE BELOW +/path/to/dSQBatch.py --job-file /path/to/joblist.txt --status-dir /path/to/here ``` ## Step 3: Submit Batch Script @@ -86,7 +89,7 @@ sbatch dsq-joblist-yyyy-mm-dd.sh ## Manage Your dSQ Job -You can refer to any portion of your job with `jobid_index` syntax, or the entire array with its jobid. The index Dead Simple Queue uses **starts at zero**, so the 3rd line in your job file will have an index of 2\. You can also specify ranges. +You can refer to any portion of your job with `jobid_index` syntax, or the entire array with its jobid. The index Dead Simple Queue uses **starts at zero**, so the 3rd line in your job file will have an index of 2. You can also specify ranges. ``` bash # to cancel job 4 for array job 14567 @@ -100,7 +103,7 @@ scancel 14567_[10-20] You can monitor the status of your jobs in Slurm by using `squeue -u `. -dSQ creates a file named `job_jobid_status.tsv`, which will report the success or failure of each job as it finishes. Note this file will not contain information for any jobs that were canceled (e.g. by the user with scancel) before they began. This file contains details about the completed jobs in the following tab-separated columns: +dSQ creates a file named `job_jobid_status.tsv`, unless you suppress this output with `--supress-stats-file`. This file will report the success or failure of each job as it finishes. Note this file will not contain information for any jobs that were canceled (e.g. by the user with scancel) before they began. This file contains details about the completed jobs in the following tab-separated columns: * Job_ID: the zero-based line number from your job file. * Exit_Code: exit code returned from your job (non-zero number generally indicates a failed job). @@ -112,14 +115,36 @@ dSQ creates a file named `job_jobid_status.tsv`, which will report the success o ## dSQAutopsy -Once the dSQ job is finished, you can use dSQAutopsy to create both a report of the run, as well as a new jobsfile that contains just the jobss that failed. +You can use dSQAutopsy or `dsqa` to create a simple report of the array of jobs, and a new jobsfile that contains just the jobs you want to re-run if you specify the original jobsfile. Options listed below +``` text + -j JOB_ID, --job-id JOB_ID + The Job ID of a running or completed dSQ Array + -f JOB_FILE, --job-file JOB_FILE + Job file, one job per line (not your job submission script). + -s STATES, --states STATES + Comma separated list of states to use for re-writing job file. Default: CANCELLED,NODE_FAIL,PREEMPTED ``` -dsqa jobsfile.txt job_2629186_status.tsv + +Asking for a simple report: + +``` bash +dsqa -j 13233846 ``` -You can conveniently redirect the report and the failed jobss to separate files: +Produces one +``` text +State Summary for Array 13233846 +State Num_Jobs Indices +----- -------- ------- +COMPLETED 12 4,7-17 +RUNNING 5 1-3,5-6 +PREEMPTED 1 0 ``` -dsqa jobsfile.txt job_2629186_status.tsv > failedjobs.txt 2> report.txt + +You can redirect the report and the failed jobs to separate files: + +``` bash +dsqa jobsfile.txt -j 2629186 -f jobsfile.txt > re-run_jobs.txt 2> 2629186_report.txt ``` diff --git a/dSQ.py b/dSQ.py index 6c473be..d478884 100755 --- a/dSQ.py +++ b/dSQ.py @@ -10,7 +10,7 @@ import sys import re -__version__ = 0.96 +__version__ = 1.0 def safe_fill(text, wrap_width): if sys.__stdin__.isatty(): @@ -127,6 +127,9 @@ def format_range(jobnums): metavar="dir", nargs=1, help="Directory to save the job_jobid_status.tsv file to. Defaults to working directory.") +optional_dsq.add_argument("--supress-stats-file", + action="store_true", + help="Don't save job stats to job_jobid_status.tsv") optional_dsq.add_argument("--stdout", action="store_true", help=argparse.SUPPRESS) @@ -148,6 +151,7 @@ def format_range(jobnums): job_info["job_id_list"] = [] job_info["run_script"] = path.join(path.dirname(path.abspath(sys.argv[0])), "dSQBatch.py") job_info["job_file_name"] = path.abspath(args.job_file[0].name) +job_info["job_file_arg"] = "--job-file {}".format(job_info["job_file_name"]) job_info["slurm_args"] = {} job_info["user_slurm_args"] = " ".join(user_slurm_args) job_info["job_file_no_ext"] = path.splitext(path.basename(job_info["job_file_name"]))[0] @@ -175,7 +179,7 @@ def format_range(jobnums): # make sure there are jobs to submit if job_info["num_jobs"] == 0: - sys.stderr.write("No jobs found in {job_file_name}\n".format(**job_info)) + print("No jobs found in {job_file_name}".format(**job_info), file=sys.stderr) sys.exit(1) # set output file format @@ -185,13 +189,17 @@ def format_range(jobnums): job_info["slurm_args"]["--output"] = "dsq-{job_file_no_ext}-%A_%{array_fmt_width}a-%N.out".format(**job_info) # set ouput directory -if args.status_dir is not None: - job_info["status_dir"] = path.abspath(args.status_dir[0]) +if args.supress_stats_file: + job_info["status_dir_arg"] = "--supress-stats-file" else: - job_info["status_dir"] = path.abspath('./') -if not os.access(job_info["status_dir"], os.W_OK | os.X_OK): - sys.stderr.write("{status_dir} doesn't appear to be a writeable directory.\n".format(**job_info)) - sys.exit(1) + if args.status_dir is not None: + job_info["status_dir"] = path.abspath(args.status_dir[0]) + else: + job_info["status_dir"] = path.abspath("./") + if not os.access(job_info["status_dir"], os.W_OK | os.X_OK): + print("{status_dir} does not appear to be a writeable directory.".format(**job_info), file=sys.stderr) + sys.exit(1) + job_info["status_dir_arg"] = "--status-dir {}".format(job_info["status_dir"]) # set array range string if job_info["max_jobs"] == None: @@ -206,18 +214,6 @@ def format_range(jobnums): else: job_info["slurm_args"]["--job-name"] = "dsq-{job_file_no_ext}".format(**job_info) -# set batch script name -if args.stdout: - job_info["batch_script_out"] = sys.stdout -else: - try: - if args.batch_file is not None: - job_info["batch_script_out"] = open(args.batch_file[0], 'w') - else: - job_info["batch_script_out"] = open("dsq-{job_file_no_ext}-{today}.sh".format(**job_info), 'w') - except Exception as e: - print("Error: Couldn't open {batch_script_out} for writing. ".format(**job_info), e) - # submit or print the job script if args.submit: @@ -226,19 +222,31 @@ def format_range(jobnums): for option, value in job_info["slurm_args"].items(): job_info["cli_args"] += " %s=%s" % (option, value) - cmd = "sbatch {cli_args} {user_slurm_args} {run_script} {job_file_name} {status_dir}".format(**job_info) - # print("submitting:\n {}".format(cmd)) + cmd = "sbatch {cli_args} {user_slurm_args} {run_script} {job_file_arg} {status_dir_arg}".format(**job_info) + print("submitting:\n {}".format(cmd)) ret = call(cmd, shell=True) sys.exit(ret) else: + # set batch script name + if args.stdout: + job_info["batch_script_out"] = sys.stdout + else: + try: + if args.batch_file is not None: + job_info["batch_script_out"] = open(args.batch_file[0], "w") + else: + job_info["batch_script_out"] = open("dsq-{job_file_no_ext}-{today}.sh".format(**job_info), "w") + except Exception as e: + print("Error: Couldn't open {batch_script_out} for writing. ".format(**job_info), e) + print("#!/bin/bash", file=job_info["batch_script_out"]) for option, value in job_info["slurm_args"].items(): print("#SBATCH {} {}".format(option, value), file=job_info["batch_script_out"]) if len(job_info["user_slurm_args"]) > 0: print("#SBATCH {user_slurm_args}".format(**job_info), file=job_info["batch_script_out"]) print("\n# DO NOT EDIT LINE BELOW".format(**job_info), file=job_info["batch_script_out"]) - print("{run_script} {job_file_name} {status_dir}\n".format(**job_info), file=job_info["batch_script_out"]) + print("{run_script} {job_file_arg} {status_dir_arg}\n".format(**job_info), file=job_info["batch_script_out"]) if not args.stdout: print("Batch script generated. To submit your jobs, run:\n sbatch {}".format(job_info["batch_script_out"].name)) diff --git a/dSQAutopsy.py b/dSQAutopsy.py index 6078dc2..fcc5b83 100755 --- a/dSQAutopsy.py +++ b/dSQAutopsy.py @@ -1,12 +1,27 @@ #!/usr/bin/env python -from __future__ import print_function #to make printing stderr work cleanly +from __future__ import print_function import sys import argparse from textwrap import fill -from subprocess import check_output +from subprocess import call, check_output +from collections import defaultdict +from itertools import groupby -__version__ = 0.96 +__version__ = 1.0 +array_state_header = ['JobID', 'State'] +sacct_cmd = ['sacct', + '-o' + ','.join(array_state_header), + '-nXPj'] +possible_states = ['BOOT_FAIL', 'CANCELLED', 'COMPLETED', 'DEADLINE', 'FAILED', 'NODE_FAIL', 'OUT_OF_MEMORY', + 'PENDING', 'PREEMPTED', 'RUNNING', 'REQUEUED', 'RESIZING', 'REVOKED', 'SUSPENDED', 'TIMEOUT'] +def collapse_ranges(i): + for a, b in groupby(enumerate(i), lambda pair: pair[1] - pair[0]): + b = list(b) + if b[0][1] == b[-1][1]: + yield '{}'.format(b[0][1]) + else: + yield '{}-{}'.format(b[0][1], b[-1][1]) def safe_fill(text, wrap_width): if sys.__stdin__.isatty(): @@ -14,6 +29,52 @@ def safe_fill(text, wrap_width): else: return text +def get_state_status(jid, rerun_states): + state_summary = defaultdict(lambda: 0) + array_states = defaultdict(lambda: []) + state_summary_header = ['State', 'Num_Jobs', 'Indices'] + reruns = [] + sacct_cmd.append(job_id) + try: + sacct_output = check_output(sacct_cmd).decode().split('\n') + except Exception as e: + # give up if we hit an error + print("Error looking up job {}.".format(job_id), file=sys.stderr) + sys.exit(1) + column_lengths = dict(zip(state_summary_header, [len(x)+2 for x in state_summary_header])) + # if there's job info + if len(sacct_output)>=1: + for l in sacct_output: + split_line = l.split('|') + if len(split_line) == len(array_state_header): + line_dict = dict(zip(array_state_header,split_line)) + state_summary[line_dict['State']]+=1 + # track column widths for pretty printing + if len(line_dict['State'])+2 > column_lengths['State']: + column_lengths['State']=len(line_dict['State'])+2 + if '_' in line_dict['JobID']: + # track array idx + array_id = int(line_dict['JobID'].split('_')[1]) + array_states[line_dict['State']].append(array_id) + if line_dict['State'] in rerun_states: + # add them to the reruns list if desired + reruns.append(array_id) + + for state in array_states: + array_states[state] = ','.join(collapse_ranges(sorted(array_states[state]))) + if len(array_states[state])+2 > column_lengths['State']: + # track column widths for pretty printing + column_lengths['State'] = len(array_states[state])+2 + + print('State Summary for Array {}'.format(job_id), file=sys.stderr) + summary_template = '{{:<{}}}{{:^{}}}{{:<{}}}'.format(*[column_lengths[x] for x in state_summary_header]) + + print(summary_template.format(*state_summary_header), file=sys.stderr) + print(summary_template.format(*['-' * len(x) for x in state_summary_header]), file=sys.stderr) + for state in sorted(state_summary, key=state_summary.get, reverse=True): + print(summary_template.format(state, state_summary[state], array_states[state]), file=sys.stderr) + return reruns + # get terminal columns for wrapping # Check if dSQ is being run interactively if sys.__stdin__.isatty(): @@ -26,47 +87,60 @@ def safe_fill(text, wrap_width): desc = """Dead Simple Queue Autopsy v{} https://github.com/ycrc/dSQ -A helper script for analyzing the success state of your jobs after a dSQ run has completed. Specify the job file and the job_jobid_status.tsv file generated by the dSQ job and dSQAutopsy will print the jobs that didn't run or completed with non-zero exit codes. It will also report count of each to stderr. +A helper script for analyzing the state of your dSQ jobs and identifying which you want to re-run. Specify the Slurm Job ID to see the state status. If you would like to generate a new job file with the jobs that need re-running, also specify your original job file and optionally the statuses you want to re-run. You can then redirect the output to a new file. + +For more in-depth job stats use sacct, seff, or seff-array. + +Example usage: + +dsqa -j 1111 +dsqa -j 1243 -f jobs.txt -s NODE_FAIL,PREEMPTED > rerun_jobs.txt """.format(__version__) -"\n".join([safe_fill(x, term_columns-1) for x in str.splitlines(desc)]) +desc = "\n".join([safe_fill(x, term_columns-1) for x in str.splitlines(desc)]) # argument parsing parser = argparse.ArgumentParser(description=desc, - usage='%(prog)s jobfile status.tsv', + usage='%(prog)s --job-id jobid [--job-file jobfile.txt [--states STATES] > new_jobs.txt]', formatter_class=argparse.RawTextHelpFormatter, prog=sys.argv[0]) parser.add_argument('-v','--version', action='version', version='%(prog)s {}'.format(__version__)) -parser.add_argument('jobfile', +parser.add_argument('-j', '--job-id', + nargs=1, + required=True, + help='The Job ID of a running or completed dSQ Array') +parser.add_argument('-f', '--job-file', nargs=1, - type=argparse.FileType('r'), help='Job file, one job per line (not your job submission script).') -parser.add_argument('statusfile', +parser.add_argument('-s', '--states', nargs=1, - type=argparse.FileType('r'), - help='The job_jobid_status.tsv file generated from your dSQ run.') + default='CANCELLED,NODE_FAIL,PREEMPTED', + help='Comma separated list of states to use for re-writing job file. Default: CANCELLED,NODE_FAIL,PREEMPTED') args = parser.parse_args() +job_id = args.job_id[0] +rerun_states = [] +for state in args.states.split(','): + if state in possible_states: + rerun_states.append(state) + else: + print('Unknown state: {}.'.format(state), file=sys.stderr) + print('Choose from {}.'.format(','.join(possible_states)), file=sys.stderr) + sys.exit(1) -try: - succeeded = set() - failed = set() - norun = set() - for l in args.statusfile[0]: - tid, exit_code, rest = l.split('\t',2) - if exit_code == "0": - succeeded.add(int(tid)) - else: - failed.add(int(tid)) - - for i,l in enumerate(args.jobfile[0]): - if i not in succeeded: - if i not in failed: - norun.add(i) - print(l, end='') - print("Autopsy Job Report:\n{} succeeded\n{} failed\n{} didn't run.".format(len(succeeded), len(failed), len(norun)), file=sys.stderr) -except Exception as e: - print ("Something went wrong. Did you specify the right files?") - sys.exit(1) +print_reruns = False +if (args.job_file): + try: + job_file = open(args.job_file[0], 'r') + print_reruns = True + except Exception as e: + print('Could not open {}.'.format(args.job_file[0]), file=sys.stderr) + sys.exit(1) + +reruns = get_state_status(job_id, rerun_states) +if print_reruns: + for i, line in enumerate(job_file): + if i in reruns: + print(line.rstrip()) \ No newline at end of file diff --git a/dSQBatch.py b/dSQBatch.py index dc9e208..d6bc739 100755 --- a/dSQBatch.py +++ b/dSQBatch.py @@ -4,14 +4,12 @@ import time import signal import platform +import argparse from functools import partial from subprocess import Popen from datetime import datetime -""" -This script accepts the name of a job file and a directory to save status info to. Meant to be called with submission scripts generated with dSQ.py. -""" - +__version__ = 1.0 def forward_signal_to_child(pid, signum, frame): print("[dSQ]: ", pid, signum, frame) @@ -24,10 +22,39 @@ def exec_job(job_str): return_code = process.wait() return(return_code) -# jobfile is a path to text file with jobs, 1 per line -jobfile = sys.argv[1] -# statusfile output directory -status_outdir = sys.argv[2] +desc = """Dead Simple Queue Batch v{} +https://github.com/ycrc/dSQ +A wrapper script to run job arrays from job files, where each line in the plain-text file is a self-contained job. This script is usually called from a batch script generated by dsq. + +""".format(__version__) + +parser = argparse.ArgumentParser(description=desc, + usage='%(prog)s --job-file jobfile.txt [--supress-stats-file | --status-dir dir/ ]', + formatter_class=argparse.RawTextHelpFormatter, + prog=sys.argv[0]) +parser.add_argument('-v','--version', + action='version', + version='%(prog)s {}'.format(__version__)) +parser.add_argument('--job-file', + nargs=1, + help='Job file, one job per line (not your job submission script).') +parser.add_argument("--supress-stats-file", + action="store_true", + help="Don't save job stats to job_jobid_status.tsv") +parser.add_argument("--status-dir", + metavar="dir", + nargs=1, + default=".", + help="Directory to save the job_jobid_status.tsv file to. Defaults to working directory.") +args = parser.parse_args() + +job_file = args.job_file[0] + +print_stats_file = True +if args.supress_stats_file: + print_stats_file = False +else: + status_outdir = args.status_dir[0] jid = int(os.environ.get("SLURM_ARRAY_JOB_ID")) @@ -36,8 +63,8 @@ def exec_job(job_str): hostname = platform.node() -# use task_id to get my job out of jobfile -with open(jobfile, 'r') as tf: +# use task_id to get my job out of job_file +with open(job_file, 'r') as tf: for i, l in enumerate(tf): if i == tid: mycmd=l.strip() @@ -48,17 +75,18 @@ def exec_job(job_str): ret = exec_job(mycmd) et = datetime.now() -# set up job stats -out_cols = ["Array_Task_ID", "Exit_Code", "Hostname", "T_Start", "T_End", "T_Elapsed", "Task"] -time_fmt = "%Y-%m-%d %H:%M:%S" -time_start = st.strftime(time_fmt) -time_end = et.strftime(time_fmt) -time_elapsed = (et-st).total_seconds() -out_dict = dict(zip(out_cols, - [tid, ret, hostname, time_start, time_end, time_elapsed, mycmd])) - -# append status file with job stats -with open(os.path.join(status_outdir, "job_{}_status.tsv".format(jid)), "a") as out_status: - out_status.write("{Array_Task_ID}\t{Exit_Code}\t{Hostname}\t{T_Start}\t{T_End}\t{T_Elapsed:.02f}\t{Task}\n".format(**out_dict)) +if print_stats_file: + # set up job stats + out_cols = ["Array_Task_ID", "Exit_Code", "Hostname", "T_Start", "T_End", "T_Elapsed", "Task"] + time_fmt = "%Y-%m-%d %H:%M:%S" + time_start = st.strftime(time_fmt) + time_end = et.strftime(time_fmt) + time_elapsed = (et-st).total_seconds() + out_dict = dict(zip(out_cols, + [tid, ret, hostname, time_start, time_end, time_elapsed, mycmd])) + + # append status file with job stats + with open(os.path.join(status_outdir, "job_{}_status.tsv".format(jid)), "a") as out_status: + out_status.write("{Array_Task_ID}\t{Exit_Code}\t{Hostname}\t{T_Start}\t{T_End}\t{T_Elapsed:.02f}\t{Task}\n".format(**out_dict)) sys.exit(ret) diff --git a/examples/simple_test.sh b/examples/simple_test.sh new file mode 100755 index 0000000..5331b36 --- /dev/null +++ b/examples/simple_test.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +#SBATCH --partition=admintest +#SBATCH --job-name=test_dsq +#SBATCH --ntasks=1 --nodes=1 +#SBATCH -t 05:00 + +python2 ./dSQ.py +python2 ./dSQ.py --job-file sleepyjobs.txt +python2 ./dSQ.py --stdout --job-file sleepyjobs.txt +python2 ./dSQ.py --help + +module load miniconda +source activate py3 +python3 ./dSQ.py +python3 ./dSQ.py --job-file sleepyjobs.txt +python3 ./dSQ.py --stdout --job-file sleepyjobs.txt +python3 ./dSQ.py --help