diff --git a/cylc/flow/job_runner_mgr.py b/cylc/flow/job_runner_mgr.py index f9646e27f1..247a699f80 100644 --- a/cylc/flow/job_runner_mgr.py +++ b/cylc/flow/job_runner_mgr.py @@ -445,7 +445,11 @@ def _jobs_poll_status_files(self, job_log_root, job_log_dir): # If the log directory has been deleted prematurely, return a task # failure and an explanation: if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)): + # The job may still be in the job runner and may yet succeed, + # but we assume it failed & exited because it's the best we + # can do as it is no longer possible to poll it. ctx.run_status = 1 + ctx.job_runner_exit_polled = 1 ctx.run_signal = JOB_FILES_REMOVED_MESSAGE return ctx try: diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 96af05b487..adcb875b7c 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -159,7 +159,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr, self.workflow = workflow self.proc_pool = proc_pool self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr - self.task_events_mgr = task_events_mgr + self.task_events_mgr: TaskEventsManager = task_events_mgr self.data_store_mgr = data_store_mgr self.job_file_writer = JobFileWriter() self.job_runner_mgr = self.job_file_writer.job_runner_mgr @@ -753,16 +753,6 @@ def _manip_task_jobs_callback( or (ctx.ret_code and ctx.ret_code != 255) ): LOG.error(ctx) - # A polling task lets us know that a task has failed because it's - # log folder has been deleted whilst the task was active: - if ( - getattr(ctx, 'out', None) - and JOB_FILES_REMOVED_MESSAGE in ctx.out - ): - LOG.error( - f'Task {ctx.cmd[-1]} failed because task log directory' - f'\n{"/".join(ctx.cmd[-2:])}\nhas been removed.' - ) # A dict for easy reference of (CYCLE, NAME, SUBMIT_NUM) -> TaskProxy # # Note for "reload": A TaskProxy instance may be replaced on reload, so @@ -845,7 +835,13 @@ def _poll_task_job_callback_255(self, workflow, itask, cmd_ctx, line): ) self.poll_task_jobs(workflow, [itask]) - def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): + def _poll_task_job_callback( + self, + workflow: str, + itask: 'TaskProxy', + cmd_ctx: SubProcContext, + line: str, + ): """Helper for _poll_task_jobs_callback, on one task job.""" ctx = SubProcContext(self.JOBS_POLL, None) ctx.out = line @@ -872,6 +868,13 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): log_lvl = DEBUG if ( itask.platform.get('communication method') == 'poll' ) else INFO + + if jp_ctx.run_signal == JOB_FILES_REMOVED_MESSAGE: + LOG.error( + f"platform: {itask.platform['name']} - job log directory " + f"{job_tokens.relative_id} no longer exists" + ) + if jp_ctx.run_status == 1 and jp_ctx.run_signal in ["ERR", "EXIT"]: # Failed normally self.task_events_mgr.process_message( @@ -879,9 +882,7 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): elif jp_ctx.run_status == 1 and jp_ctx.job_runner_exit_polled == 1: # Failed by a signal, and no longer in job runner self.task_events_mgr.process_message( - itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag) - self.task_events_mgr.process_message( - itask, log_lvl, FAIL_MESSAGE_PREFIX + jp_ctx.run_signal, + itask, log_lvl, f"{FAIL_MESSAGE_PREFIX}{jp_ctx.run_signal}", jp_ctx.time_run_exit, flag) elif jp_ctx.run_status == 1: # noqa: SIM114 diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py index 26b9355c0f..090a3540b4 100644 --- a/tests/integration/test_task_job_mgr.py +++ b/tests/integration/test_task_job_mgr.py @@ -15,14 +15,18 @@ # along with this program. If not, see . from contextlib import suppress +import json import logging -from types import SimpleNamespace from typing import Any as Fixture +from unittest.mock import Mock from cylc.flow import CYLC_LOG from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE from cylc.flow.scheduler import Scheduler -from cylc.flow.task_state import TASK_STATUS_RUNNING +from cylc.flow.task_state import ( + TASK_STATUS_FAILED, + TASK_STATUS_RUNNING, +) async def test_run_job_cmd_no_hosts_error( @@ -238,23 +242,30 @@ async def test_broadcast_platform_change( async def test_poll_job_deleted_log_folder( - one_conf, flow, scheduler, start, caplog + one_conf, flow, scheduler, start, log_filter ): """Capture a task error caused by polling finding the job log dir deleted. https://github.com/cylc/cylc-flow/issues/6425 """ - ctx = SimpleNamespace() - ctx.out = JOB_FILES_REMOVED_MESSAGE - ctx.ret_code = None - ctx.cmd = ['foo', 'bar'] - - schd = scheduler(flow(one_conf), run_mode='live', paused_start=False) + response = { + 'run_signal': JOB_FILES_REMOVED_MESSAGE, + 'run_status': 1, + 'job_runner_exit_polled': 1, + } + schd: Scheduler = scheduler(flow(one_conf)) async with start(schd): - schd.task_job_mgr._manip_task_jobs_callback(ctx, '', [], '') + itask = schd.pool.get_tasks()[0] + itask.submit_num = 1 + job_id = itask.tokens.duplicate(job='01').relative_id + schd.task_job_mgr._poll_task_job_callback( + schd.workflow, + itask, + cmd_ctx=Mock(), + line=f'2025-02-13T12:08:30Z|{job_id}|{json.dumps(response)}', + ) + assert itask.state(TASK_STATUS_FAILED) - assert ( - 'Task bar failed because task log directory' - '\nfoo/bar\nhas been removed.' - in caplog.messages + assert log_filter( + logging.ERROR, f"job log directory {job_id} no longer exists" ) diff --git a/tests/unit/test_job_runner_mgr.py b/tests/unit/test_job_runner_mgr.py index 2d6fc6e606..40c69f66dd 100644 --- a/tests/unit/test_job_runner_mgr.py +++ b/tests/unit/test_job_runner_mgr.py @@ -66,6 +66,7 @@ def test__job_poll_status_files_deleted_logdir(): ctx = jrm._jobs_poll_status_files('foo', 'bar') assert ctx.run_signal == JOB_FILES_REMOVED_MESSAGE assert ctx.run_status == 1 + assert ctx.job_runner_exit_polled == 1 def test__job_poll_status_files_ioerror(tmp_path, capsys): @@ -75,4 +76,3 @@ def test__job_poll_status_files_ioerror(tmp_path, capsys): jrm._jobs_poll_status_files(str(tmp_path), 'sub') cap = capsys.readouterr() assert '[Errno 2] No such file or directory' in cap.err -