Skip to content

Commit

Permalink
Simplify poll handling of prematurely deleted job log dir
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Feb 14, 2025
1 parent 7385b58 commit 17156aa
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 30 deletions.
4 changes: 4 additions & 0 deletions cylc/flow/job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,11 @@ def _jobs_poll_status_files(self, job_log_root, job_log_dir):
# If the log directory has been deleted prematurely, return a task
# failure and an explanation:
if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)):
# The job may still be in the job runner and may yet succeed,
# but we assume it failed & exited because it's the best we
# can do as it is no longer possible to poll it.
ctx.run_status = 1
ctx.job_runner_exit_polled = 1
ctx.run_signal = JOB_FILES_REMOVED_MESSAGE
return ctx
try:
Expand Down
31 changes: 16 additions & 15 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
self.workflow = workflow
self.proc_pool = proc_pool
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr
self.task_events_mgr = task_events_mgr
self.task_events_mgr: TaskEventsManager = task_events_mgr
self.data_store_mgr = data_store_mgr
self.job_file_writer = JobFileWriter()
self.job_runner_mgr = self.job_file_writer.job_runner_mgr
Expand Down Expand Up @@ -753,16 +753,6 @@ def _manip_task_jobs_callback(
or (ctx.ret_code and ctx.ret_code != 255)
):
LOG.error(ctx)
# A polling task lets us know that a task has failed because it's
# log folder has been deleted whilst the task was active:
if (
getattr(ctx, 'out', None)
and JOB_FILES_REMOVED_MESSAGE in ctx.out
):
LOG.error(
f'Task {ctx.cmd[-1]} failed because task log directory'
f'\n{"/".join(ctx.cmd[-2:])}\nhas been removed.'
)
# A dict for easy reference of (CYCLE, NAME, SUBMIT_NUM) -> TaskProxy
#
# Note for "reload": A TaskProxy instance may be replaced on reload, so
Expand Down Expand Up @@ -845,7 +835,13 @@ def _poll_task_job_callback_255(self, workflow, itask, cmd_ctx, line):
)
self.poll_task_jobs(workflow, [itask])

def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
def _poll_task_job_callback(
self,
workflow: str,
itask: 'TaskProxy',
cmd_ctx: SubProcContext,
line: str,
):
"""Helper for _poll_task_jobs_callback, on one task job."""
ctx = SubProcContext(self.JOBS_POLL, None)
ctx.out = line
Expand All @@ -872,16 +868,21 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
log_lvl = DEBUG if (
itask.platform.get('communication method') == 'poll'
) else INFO

if jp_ctx.run_signal == JOB_FILES_REMOVED_MESSAGE:
LOG.error(
f"platform: {itask.platform['name']} - job log directory "
f"{job_tokens.relative_id} no longer exists"
)

if jp_ctx.run_status == 1 and jp_ctx.run_signal in ["ERR", "EXIT"]:
# Failed normally
self.task_events_mgr.process_message(
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
elif jp_ctx.run_status == 1 and jp_ctx.job_runner_exit_polled == 1:
# Failed by a signal, and no longer in job runner
self.task_events_mgr.process_message(
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
self.task_events_mgr.process_message(
itask, log_lvl, FAIL_MESSAGE_PREFIX + jp_ctx.run_signal,
itask, log_lvl, f"{FAIL_MESSAGE_PREFIX}{jp_ctx.run_signal}",
jp_ctx.time_run_exit,
flag)
elif jp_ctx.run_status == 1: # noqa: SIM114
Expand Down
39 changes: 25 additions & 14 deletions tests/integration/test_task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from contextlib import suppress
import json
import logging
from types import SimpleNamespace
from typing import Any as Fixture
from unittest.mock import Mock

from cylc.flow import CYLC_LOG
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
)


async def test_run_job_cmd_no_hosts_error(
Expand Down Expand Up @@ -238,23 +242,30 @@ async def test_broadcast_platform_change(


async def test_poll_job_deleted_log_folder(
one_conf, flow, scheduler, start, caplog
one_conf, flow, scheduler, start, log_filter
):
"""Capture a task error caused by polling finding the job log dir deleted.
https://github.com/cylc/cylc-flow/issues/6425
"""
ctx = SimpleNamespace()
ctx.out = JOB_FILES_REMOVED_MESSAGE
ctx.ret_code = None
ctx.cmd = ['foo', 'bar']

schd = scheduler(flow(one_conf), run_mode='live', paused_start=False)
response = {
'run_signal': JOB_FILES_REMOVED_MESSAGE,
'run_status': 1,
'job_runner_exit_polled': 1,
}
schd: Scheduler = scheduler(flow(one_conf))
async with start(schd):
schd.task_job_mgr._manip_task_jobs_callback(ctx, '', [], '')
itask = schd.pool.get_tasks()[0]
itask.submit_num = 1
job_id = itask.tokens.duplicate(job='01').relative_id
schd.task_job_mgr._poll_task_job_callback(
schd.workflow,
itask,
cmd_ctx=Mock(),
line=f'2025-02-13T12:08:30Z|{job_id}|{json.dumps(response)}',
)
assert itask.state(TASK_STATUS_FAILED)

assert (
'Task bar failed because task log directory'
'\nfoo/bar\nhas been removed.'
in caplog.messages
assert log_filter(
logging.ERROR, f"job log directory {job_id} no longer exists"
)
2 changes: 1 addition & 1 deletion tests/unit/test_job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def test__job_poll_status_files_deleted_logdir():
ctx = jrm._jobs_poll_status_files('foo', 'bar')
assert ctx.run_signal == JOB_FILES_REMOVED_MESSAGE
assert ctx.run_status == 1
assert ctx.job_runner_exit_polled == 1


def test__job_poll_status_files_ioerror(tmp_path, capsys):
Expand All @@ -75,4 +76,3 @@ def test__job_poll_status_files_ioerror(tmp_path, capsys):
jrm._jobs_poll_status_files(str(tmp_path), 'sub')
cap = capsys.readouterr()
assert '[Errno 2] No such file or directory' in cap.err

0 comments on commit 17156aa

Please sign in to comment.