Skip to content

Commit

Permalink
change daily to resubmit all jobs twice before giving up unless a cal
Browse files Browse the repository at this point in the history
  • Loading branch information
akremin committed Nov 12, 2024
1 parent 02b28e9 commit b8b6881
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 45 deletions.
54 changes: 28 additions & 26 deletions py/desispec/scripts/proc_night.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
set_calibrator_flag, make_exposure_prow, \
all_calibs_submitted, \
update_and_recursively_submit, update_accounted_for_with_linking
from desispec.workflow.queue import update_from_queue, any_jobs_failed, \
from desispec.workflow.queue import update_from_queue, any_jobs_need_resubmission, \
get_resubmission_states
from desispec.io.util import decode_camword, difference_camwords, \
create_camword, replace_prefix, erow_to_goodcamword, camword_union
Expand Down Expand Up @@ -364,35 +364,37 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
if any_jobs_failed(ptable['STATUS']):
if any_jobs_need_resubmission(ptable['STATUS']):
## Try up to two times to resubmit failures, afterwards give up
## unless explicitly told to proceed with the failures
## Note after 2 resubmissions, the code won't resubmit anymore even
## if given ignore_proc_table_failures
if np.max([len(qids) for qids in ptable['ALL_QIDS']]) < 3:
log.info("Job failures were detected. Resubmitting those jobs "
+ "before continuing with new submissions.")

ptable, nsubmits = update_and_recursively_submit(ptable,
no_resub_failed=no_resub_failed,
ptab_name=proc_table_pathname,
dry_run_level=dry_run_level,
reservation=reservation)
elif not ignore_proc_table_failures:
err = "Some jobs have an incomplete job status. This script " \
+ "will not fix them. You should remedy those first. "
log.error(err)
## if the failures are in calibrations, then crash since
## we need them for any new jobs
if any_jobs_failed(ptable['STATUS'][ptable['CALIBRATOR'] > 0]):
err += "To proceed anyway use "
err += "'--ignore-proc-table-failures'. Exiting."
raise AssertionError(err)
else:
log.warning("Some jobs have an incomplete job status, but "
+ "you entered '--ignore-proc-table-failures'. This "
+ "script will not fix them. "
+ "You should have fixed those first. Proceeding...")
log.info("Job failures were detected. Resubmitting those jobs "
+ "before continuing with new submissions.")

ptable, nsubmits = update_and_recursively_submit(ptable,
no_resub_failed=no_resub_failed,
max_resubs=2,
ptab_name=proc_table_pathname,
dry_run_level=dry_run_level,
reservation=reservation)

if any_jobs_need_resubmission(ptable['STATUS']):
if not ignore_proc_table_failures:
err = "Some jobs have an incomplete job status. This script " \
+ "will not fix them. You should remedy those first. "
log.error(err)
## if the failures are in calibrations, then crash since
## we need them for any new jobs
if any_jobs_need_resubmission(ptable['STATUS'][ptable['CALIBRATOR'] > 0]):
err += "To proceed anyway use "
err += "'--ignore-proc-table-failures'. Exiting."
raise AssertionError(err)
else:
log.warning("Some jobs have an incomplete job status, but "
+ "you entered '--ignore-proc-table-failures'. This "
+ "script will not fix them. "
+ "You should have fixed those first. Proceeding...")
## Short cut to exit faster if all science exposures have been processed
## but only if we have successfully processed the calibrations
good_etab = etable[etable['LASTSTEP']=='all']
Expand Down
26 changes: 19 additions & 7 deletions py/desispec/workflow/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,8 @@ def all_calibs_submitted(accounted_for, do_cte_flats):

return np.all(list(test_dict.values()))

def update_and_recursively_submit(proc_table, submits=0, resubmission_states=None,
def update_and_recursively_submit(proc_table, submits=0, max_resubs=100,
resubmission_states=None,
no_resub_failed=False, ptab_name=None,
dry_run_level=0, reservation=None):
"""
Expand All @@ -1178,6 +1179,7 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non
Args:
proc_table, Table, the processing table with a row per job.
submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100.
resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
possible Slurm scheduler state, where you wish for jobs with that
outcome to be resubmitted
Expand Down Expand Up @@ -1221,16 +1223,19 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non
id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
for rown in range(len(proc_table)):
if proc_table['STATUS'][rown] in resubmission_states:
proc_table, submits = recursive_submit_failed(rown, proc_table, submits,
id_to_row_map, ptab_name,
resubmission_states,
reservation, dry_run_level)
proc_table, submits = recursive_submit_failed(rown=rown, proc_table=proc_table,
submits=submits, max_resubs=max_resubs,
id_to_row_map=id_to_row_map,
ptab_name=ptab_name,
resubmission_states=resubmission_states,
reservation=reservation,
dry_run_level=dry_run_level)

proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)

return proc_table, submits

def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, max_resubs=100, ptab_name=None,
resubmission_states=None, reservation=None, dry_run_level=0):
"""
Given a row of a processing table and the full processing table, this resubmits the given job.
Expand All @@ -1244,6 +1249,7 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=
submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler.
id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position
in the processing table.
max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100.
ptab_name, str, the full pathname where the processing table should be saved.
resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
possible Slurm scheduler state, where you wish for jobs with that
Expand Down Expand Up @@ -1271,7 +1277,13 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=
log = get_logger()
row = proc_table[rown]
log.info(f"Identified row {row['INTID']} as needing resubmission.")
log.info(f"{row['INTID']}: Expid(s): {row['EXPID']} Job: {row['JOBDESC']}")
log.info(f"{row['INTID']}: Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, Jobdesc={row['JOBDESC']}")
if len(proc_table['ALL_QIDS'][rown]) > max_resubs:
log.warning(f"Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, "
+ f"Jobdesc={row['JOBDESC']} has already been submitted "
+ f"{max_resubs+1} times. Not resubmitting.")
proc_table['STATUS'][rown] = "MAX_RESUB"
return proc_table, submits
if resubmission_states is None:
resubmission_states = get_resubmission_states()
ideps = proc_table['INT_DEP_IDS'][rown]
Expand Down
21 changes: 9 additions & 12 deletions py/desispec/workflow/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ def get_resubmission_states(no_resub_failed=False):
## 'UNSUBMITTED' is default pipeline state for things not yet submitted
## 'DEP_NOT_SUBD' is set when resubmission can't proceed because a
## dependency has failed
resub_states = ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL',
'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED']
resub_states = ['UNSUBMITTED', 'DEP_NOT_SUBD', 'MAX_RESUB', 'BOOT_FAIL',
'DEADLINE', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED',
'REVOKED', 'SUSPENDED', 'TIMEOUT', 'CANCELLED']
if not no_resub_failed:
resub_states.append('FAILED')
return resub_states


def get_termination_states():
"""
Defines which Slurm job states are final and not in question about needing resubmission.
Expand Down Expand Up @@ -560,20 +560,17 @@ def any_jobs_not_complete(statuses, termination_states=None):
termination_states = get_termination_states()
return np.any([status not in termination_states for status in statuses])

def any_jobs_failed(statuses, failed_states=None):
def any_jobs_need_resubmission(statuses, resub_states=None):
"""
Returns True if any of the job statuses in the input column of the
processing table, statuses, are not complete (as based on the list of
acceptable final states, termination_states, given as an argument. These
should be states that are viewed as final, as opposed to job states
that require resubmission.
processing table, statuses, are in the resubmission states.
Parameters
----------
statuses : Table.Column or list or np.array
The statuses in the
processing table "STATUS". Each element should be a string.
failed_states : list or np.array
resub_states : list or np.array
Each element should be a string
signifying a state that is returned by the Slurm scheduler that
should be considered failing or problematic.
Expand All @@ -584,9 +581,9 @@ def any_jobs_failed(statuses, failed_states=None):
True if any of the statuses of the jobs given in statuses are
a member of resub_states.
"""
if failed_states is None:
failed_states = get_failed_states()
return np.any([status in failed_states for status in statuses])
if resub_states is None:
resub_states = get_resubmission_states(no_resub_failed=False)
return np.any(np.isin(statuses, resub_states))

def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
"""
Expand Down

0 comments on commit b8b6881

Please sign in to comment.