diff --git a/py/desispec/scripts/proc_night.py b/py/desispec/scripts/proc_night.py index 5e571005a..768b2f832 100644 --- a/py/desispec/scripts/proc_night.py +++ b/py/desispec/scripts/proc_night.py @@ -36,7 +36,7 @@ set_calibrator_flag, make_exposure_prow, \ all_calibs_submitted, \ update_and_recursively_submit, update_accounted_for_with_linking -from desispec.workflow.queue import update_from_queue, any_jobs_failed, \ +from desispec.workflow.queue import update_from_queue, any_jobs_need_resubmission, \ get_resubmission_states from desispec.io.util import decode_camword, difference_camwords, \ create_camword, replace_prefix, erow_to_goodcamword, camword_union @@ -364,35 +364,37 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, ptable = update_from_queue(ptable, dry_run_level=dry_run_level) if dry_run_level < 3: write_table(ptable, tablename=proc_table_pathname, tabletype='proctable') - if any_jobs_failed(ptable['STATUS']): + if any_jobs_need_resubmission(ptable['STATUS']): ## Try up to two times to resubmit failures, afterwards give up ## unless explicitly told to proceed with the failures ## Note after 2 resubmissions, the code won't resubmit anymore even ## if given ignore_proc_table_failures - if np.max([len(qids) for qids in ptable['ALL_QIDS']]) < 3: - log.info("Job failures were detected. Resubmitting those jobs " - + "before continuing with new submissions.") - - ptable, nsubmits = update_and_recursively_submit(ptable, - no_resub_failed=no_resub_failed, - ptab_name=proc_table_pathname, - dry_run_level=dry_run_level, - reservation=reservation) - elif not ignore_proc_table_failures: - err = "Some jobs have an incomplete job status. This script " \ - + "will not fix them. You should remedy those first. " - log.error(err) - ## if the failures are in calibrations, then crash since - ## we need them for any new jobs - if any_jobs_failed(ptable['STATUS'][ptable['CALIBRATOR'] > 0]): - err += "To proceed anyway use " - err += "'--ignore-proc-table-failures'. Exiting." - raise AssertionError(err) - else: - log.warning("Some jobs have an incomplete job status, but " - + "you entered '--ignore-proc-table-failures'. This " - + "script will not fix them. " - + "You should have fixed those first. Proceeding...") + log.info("Job failures were detected. Resubmitting those jobs " + + "before continuing with new submissions.") + + ptable, nsubmits = update_and_recursively_submit(ptable, + no_resub_failed=no_resub_failed, + max_resubs=2, + ptab_name=proc_table_pathname, + dry_run_level=dry_run_level, + reservation=reservation) + + if any_jobs_need_resubmission(ptable['STATUS']): + if not ignore_proc_table_failures: + err = "Some jobs have an incomplete job status. This script " \ + + "will not fix them. You should remedy those first. " + log.error(err) + ## if the failures are in calibrations, then crash since + ## we need them for any new jobs + if any_jobs_need_resubmission(ptable['STATUS'][ptable['CALIBRATOR'] > 0]): + err += "To proceed anyway use " + err += "'--ignore-proc-table-failures'. Exiting." + raise AssertionError(err) + else: + log.warning("Some jobs have an incomplete job status, but " + + "you entered '--ignore-proc-table-failures'. This " + + "script will not fix them. " + + "You should have fixed those first. Proceeding...") ## Short cut to exit faster if all science exposures have been processed ## but only if we have successfully processed the calibrations good_etab = etable[etable['LASTSTEP']=='all'] diff --git a/py/desispec/workflow/processing.py b/py/desispec/workflow/processing.py index 2759c9d40..6fba3eb7d 100644 --- a/py/desispec/workflow/processing.py +++ b/py/desispec/workflow/processing.py @@ -1166,7 +1166,8 @@ def all_calibs_submitted(accounted_for, do_cte_flats): return np.all(list(test_dict.values())) -def update_and_recursively_submit(proc_table, submits=0, resubmission_states=None, +def update_and_recursively_submit(proc_table, submits=0, max_resubs=100, + resubmission_states=None, no_resub_failed=False, ptab_name=None, dry_run_level=0, reservation=None): """ @@ -1178,6 +1179,7 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non Args: proc_table, Table, the processing table with a row per job. submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler. + max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100. resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a possible Slurm scheduler state, where you wish for jobs with that outcome to be resubmitted @@ -1221,16 +1223,19 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)} for rown in range(len(proc_table)): if proc_table['STATUS'][rown] in resubmission_states: - proc_table, submits = recursive_submit_failed(rown, proc_table, submits, - id_to_row_map, ptab_name, - resubmission_states, - reservation, dry_run_level) + proc_table, submits = recursive_submit_failed(rown=rown, proc_table=proc_table, + submits=submits, max_resubs=max_resubs, + id_to_row_map=id_to_row_map, + ptab_name=ptab_name, + resubmission_states=resubmission_states, + reservation=reservation, + dry_run_level=dry_run_level) proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level) return proc_table, submits -def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None, +def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, max_resubs=100, ptab_name=None, resubmission_states=None, reservation=None, dry_run_level=0): """ Given a row of a processing table and the full processing table, this resubmits the given job. @@ -1244,6 +1249,7 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= submits, int, the number of submissions made to the queue. Used for saving files and in not overloading the scheduler. id_to_row_map, dict, lookup dictionary where the keys are internal ids (INTID's) and the values are the row position in the processing table. + max_resubs, int, the number of times a job should be resubmitted before giving up. Default is very high at 100. ptab_name, str, the full pathname where the processing table should be saved. resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a possible Slurm scheduler state, where you wish for jobs with that @@ -1271,7 +1277,13 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= log = get_logger() row = proc_table[rown] log.info(f"Identified row {row['INTID']} as needing resubmission.") - log.info(f"{row['INTID']}: Expid(s): {row['EXPID']} Job: {row['JOBDESC']}") + log.info(f"{row['INTID']}: Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, Jobdesc={row['JOBDESC']}") + if len(proc_table['ALL_QIDS'][rown]) > max_resubs: + log.warning(f"Tileid={row['TILEID']}, Expid(s)={row['EXPID']}, " + + f"Jobdesc={row['JOBDESC']} has already been submitted " + + f"{max_resubs+1} times. Not resubmitting.") + proc_table['STATUS'][rown] = "MAX_RESUB" + return proc_table, submits if resubmission_states is None: resubmission_states = get_resubmission_states() ideps = proc_table['INT_DEP_IDS'][rown] diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py index 8544c2d9e..0b0e6b57e 100644 --- a/py/desispec/workflow/queue.py +++ b/py/desispec/workflow/queue.py @@ -49,13 +49,13 @@ def get_resubmission_states(no_resub_failed=False): ## 'UNSUBMITTED' is default pipeline state for things not yet submitted ## 'DEP_NOT_SUBD' is set when resubmission can't proceed because a ## dependency has failed - resub_states = ['UNSUBMITTED', 'DEP_NOT_SUBD', 'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL', - 'OUT_OF_MEMORY', 'PREEMPTED', 'TIMEOUT', 'CANCELLED'] + resub_states = ['UNSUBMITTED', 'DEP_NOT_SUBD', 'MAX_RESUB', 'BOOT_FAIL', + 'DEADLINE', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED', + 'REVOKED', 'SUSPENDED', 'TIMEOUT', 'CANCELLED'] if not no_resub_failed: resub_states.append('FAILED') return resub_states - def get_termination_states(): """ Defines what Slurm job states that are final and aren't in question about needing resubmission. @@ -560,20 +560,17 @@ def any_jobs_not_complete(statuses, termination_states=None): termination_states = get_termination_states() return np.any([status not in termination_states for status in statuses]) -def any_jobs_failed(statuses, failed_states=None): +def any_jobs_need_resubmission(statuses, resub_states=None): """ Returns True if any of the job statuses in the input column of the - processing table, statuses, are not complete (as based on the list of - acceptable final states, termination_states, given as an argument. These - should be states that are viewed as final, as opposed to job states - that require resubmission. + processing table, statuses, are not in the resubmission states. Parameters ---------- statuses : Table.Column or list or np.array The statuses in the processing table "STATUS". Each element should be a string. - failed_states : list or np.array + resub_states : list or np.array Each element should be a string signifying a state that is returned by the Slurm scheduler that should be consider failing or problematic. @@ -584,9 +581,9 @@ def any_jobs_failed(statuses, failed_states=None): True if any of the statuses of the jobs given in statuses are a member of the failed_states. """ - if failed_states is None: - failed_states = get_failed_states() - return np.any([status in failed_states for status in statuses]) + if resub_states is None: + resub_states = get_resubmission_states(no_resub_failed=False) + return np.any(np.isin(statuses, resub_states)) def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0): """