Skip to content

Commit

Permalink
Get poll to return task failure if job/log has been removed. (#6577)
Browse files Browse the repository at this point in the history
Get poll to return task failure if job/log has been removed.

---------

Co-authored-by: Oliver Sanders <[email protected]>
Co-authored-by: Ronnie Dutta <[email protected]>
  • Loading branch information
3 people authored Feb 18, 2025
1 parent 8c7b7be commit 21d18ba
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 62 deletions.
1 change: 1 addition & 0 deletions changes.d/6577.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug where prematurely deleting a job log directory left the affected tasks permanently in the submitted or running state.
13 changes: 13 additions & 0 deletions cylc/flow/job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
from cylc.flow.parsec.OrderedDict import OrderedDict


JOB_FILES_REMOVED_MESSAGE = 'ERR_JOB_FILES_REMOVED'


class JobPollContext():
"""Context object for a job poll."""
CONTEXT_ATTRIBUTES = (
Expand Down Expand Up @@ -439,6 +442,16 @@ def _filter_submit_output(cls, st_file_path, job_runner, out, err):
def _jobs_poll_status_files(self, job_log_root, job_log_dir):
"""Helper 1 for self.jobs_poll(job_log_root, job_log_dirs)."""
ctx = JobPollContext(job_log_dir)
# If the log directory has been deleted prematurely, return a task
# failure and an explanation:
if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)):
# The job may still be in the job runner and may yet succeed,
# but we assume it failed & exited because it's the best we
# can do as it is no longer possible to poll it.
ctx.run_status = 1
ctx.job_runner_exit_polled = 1
ctx.run_signal = JOB_FILES_REMOVED_MESSAGE
return ctx

Check warning on line 454 in cylc/flow/job_runner_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/job_runner_mgr.py#L451-L454

Added lines #L451 - L454 were not covered by tests
try:
with open(
os.path.join(job_log_root, ctx.job_log_dir, JOB_LOG_STATUS)
Expand Down
25 changes: 19 additions & 6 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
Optional,
Tuple,
Union,
cast,
)

from cylc.flow import LOG
Expand All @@ -60,7 +61,7 @@
is_remote_platform,
)
from cylc.flow.job_file import JobFileWriter
from cylc.flow.job_runner_mgr import JobPollContext
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE, JobPollContext
from cylc.flow.pathutil import get_remote_workflow_run_job_dir
from cylc.flow.platforms import (
get_host_from_platform,
Expand Down Expand Up @@ -864,7 +865,13 @@ def _poll_task_job_callback_255(self, workflow, itask, cmd_ctx, line):
)
self.poll_task_jobs(workflow, [itask])

def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
def _poll_task_job_callback(
self,
workflow: str,
itask: 'TaskProxy',
cmd_ctx: SubProcContext,
line: str,
):
"""Helper for _poll_task_jobs_callback, on one task job."""
ctx = SubProcContext(self.JOBS_POLL, None)
ctx.out = line
Expand All @@ -891,16 +898,21 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
log_lvl = DEBUG if (
itask.platform.get('communication method') == 'poll'
) else INFO

if jp_ctx.run_signal == JOB_FILES_REMOVED_MESSAGE:
LOG.error(

Check warning on line 903 in cylc/flow/task_job_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_job_mgr.py#L903

Added line #L903 was not covered by tests
f"platform: {itask.platform['name']} - job log directory "
f"{job_tokens.relative_id} no longer exists"
)

if jp_ctx.run_status == 1 and jp_ctx.run_signal in ["ERR", "EXIT"]:
# Failed normally
self.task_events_mgr.process_message(
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
elif jp_ctx.run_status == 1 and jp_ctx.job_runner_exit_polled == 1:
# Failed by a signal, and no longer in job runner
self.task_events_mgr.process_message(
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
self.task_events_mgr.process_message(
itask, log_lvl, FAIL_MESSAGE_PREFIX + jp_ctx.run_signal,
itask, log_lvl, f"{FAIL_MESSAGE_PREFIX}{jp_ctx.run_signal}",
jp_ctx.time_run_exit,
flag)
elif jp_ctx.run_status == 1: # noqa: SIM114
Expand Down Expand Up @@ -1288,7 +1300,8 @@ def _prep_submit_task_job(
workflow, itask, '(platform not defined)', exc)
return False
else:
itask.platform = platform # type: ignore[assignment]
# (platform is not None here as subshell eval has finished)
itask.platform = cast('dict', platform)
# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)

Expand Down
55 changes: 10 additions & 45 deletions tests/flakyfunctional/cylc-poll/16-execution-time-limit.t
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test execution time limit polling.
export REQUIRE_PLATFORM='loc:* comms:poll runner:background'
. "$(dirname "$0")/test_header"
#-------------------------------------------------------------------------------
set_test_number 4

set_test_number 5
create_test_global_config '' "
[platforms]
[[$CYLC_TEST_PLATFORM]]
Expand All @@ -28,51 +28,16 @@ create_test_global_config '' "
execution time limit polling intervals = PT5S
"
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
#-------------------------------------------------------------------------------

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play --reference-test -v --no-detach "${WORKFLOW_NAME}" --timestamp
#-------------------------------------------------------------------------------
# shellcheck disable=SC2317
cmp_times () {
# Test if the times $1 and $2 are within $3 seconds of each other.
python3 -u - "$@" <<'__PYTHON__'
import sys
from metomi.isodatetime.parsers import TimePointParser
parser = TimePointParser()
time_1 = parser.parse(sys.argv[1])
time_2 = parser.parse(sys.argv[2])
if abs((time_1 - time_2).get_seconds()) > int(sys.argv[3]):
sys.exit("abs(predicted - actual) > tolerance: %s" % sys.argv[1:])
__PYTHON__
}
time_offset () {
# Add an ISO8601 duration to an ISO8601 date-time.
python3 -u - "$@" <<'__PYTHON__'
import sys
from metomi.isodatetime.parsers import TimePointParser, DurationParser
print(
TimePointParser().parse(sys.argv[1]) + DurationParser().parse(sys.argv[2]))
__PYTHON__
}
#-------------------------------------------------------------------------------

LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log"
# Test logging of the "next job poll" message when task starts.
TEST_NAME="${TEST_NAME_BASE}-log-entry"
LINE="$(grep '\[1/foo.* execution timeout=None, polling intervals=' "${LOG}")"
run_ok "${TEST_NAME}" grep -q 'health: execution timeout=None, polling intervals=' <<< "${LINE}"
# Determine poll times.
PREDICTED_POLL_TIME=$(time_offset \
"$(cut -d ' ' -f 1 <<< "${LINE}")" \
"PT10S") # PT5S time limit + PT5S polling interval
ACTUAL_POLL_TIME=$(sed -n \
's|\(.*\) DEBUG - \[1/foo.* (polled)failed .*|\1|p' "${LOG}")

# Test execution timeout polling.
# Main loop is roughly 1 second, but integer rounding may give an apparent 2
# seconds delay, so set threshold as 2 seconds.
run_ok "${TEST_NAME_BASE}-poll-time" \
cmp_times "${PREDICTED_POLL_TIME}" "${ACTUAL_POLL_TIME}" '10'
#-------------------------------------------------------------------------------
log_scan "${TEST_NAME_BASE}-log" "${LOG}" 1 0 \
"\[1/foo/01:submitted\] => running" \
"\[1/foo/01:running\] poll now, (next in PT5S" \
"\[1/foo/01:running\] (polled)failed/XCPU"

purge
exit
12 changes: 2 additions & 10 deletions tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@
[runtime]
[[foo]]
platform = {{ environ['CYLC_TEST_PLATFORM'] }}
init-script = cylc__job__disable_fail_signals ERR EXIT
script = """
cylc__job__wait_cylc_message_started
# give it a while for the started message to get picked up by
# the scheduler
sleep 10
exit 1
"""
[[[job]]]
execution time limit = PT5S
script = sleep 20
execution time limit = PT10S
[[bar]]
38 changes: 37 additions & 1 deletion tests/integration/test_task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from contextlib import suppress
import json
import logging
from typing import Any as Fixture
from unittest.mock import Mock

from cylc.flow import CYLC_LOG
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
)


async def test_run_job_cmd_no_hosts_error(
Expand Down Expand Up @@ -229,3 +235,33 @@ async def test_broadcast_platform_change(
assert schd.pool.get_tasks()[0].platform['name'] == 'foo'
# ... and that remote init failed because all hosts bad:
assert log_filter(regex=r"platform: foo .*\(no hosts were reachable\)")


async def test_poll_job_deleted_log_folder(
    one_conf, flow, scheduler, start, log_filter
):
    """Capture a task error caused by polling finding the job log dir deleted.
    https://github.com/cylc/cylc-flow/issues/6425
    """
    # Poll payload matching what the job runner manager reports when the job
    # log directory is missing: failed run status, flagged as exit-polled,
    # with the "files removed" marker standing in for a real signal.
    response = {
        'run_signal': JOB_FILES_REMOVED_MESSAGE,
        'run_status': 1,
        'job_runner_exit_polled': 1,
    }
    schd: Scheduler = scheduler(flow(one_conf))
    async with start(schd):
        itask = schd.pool.get_tasks()[0]
        itask.submit_num = 1
        job_id = itask.tokens.duplicate(job='01').relative_id
        # Call the per-job poll callback directly with one hand-built line of
        # the form "<timestamp>|<job id>|<JSON payload>"; no real poll
        # command is run, so the command context is just a Mock.
        schd.task_job_mgr._poll_task_job_callback(
            schd.workflow,
            itask,
            cmd_ctx=Mock(),
            line=f'2025-02-13T12:08:30Z|{job_id}|{json.dumps(response)}',
        )
        # The task must end up failed rather than stuck in its prior state.
        assert itask.state(TASK_STATUS_FAILED)

    # An ERROR-level log entry must explain why the task was failed.
    assert log_filter(
        logging.ERROR, f"job log directory {job_id} no longer exists"
    )
78 changes: 78 additions & 0 deletions tests/unit/test_job_runner_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.job_runner_mgr import (
JobRunnerManager, JOB_FILES_REMOVED_MESSAGE)

# Single manager instance shared by every test in this module.
jrm = JobRunnerManager()


# Example job.status content used by the good-path test: KEY=VALUE lines,
# preceded by one line that deliberately contains no "=" sign.
SAMPLE_STATUS = """
ignore me, I have no = sign
CYLC_JOB_RUNNER_NAME=pbs
CYLC_JOB_ID=2361713
CYLC_JOB_RUNNER_SUBMIT_TIME=2025-01-28T14:46:04Z
CYLC_JOB_PID=2361713
CYLC_JOB_INIT_TIME=2025-01-28T14:46:05Z
CYLC_MESSAGE=2025-01-28T14:46:05Z|INFO|sleep 31
CYLC_JOB_RUNNER_EXIT_POLLED=2025-01-28T14:46:08Z
CYLC_JOB_EXIT=SUCCEEDED
CYLC_JOB_EXIT_TIME=2025-01-28T14:46:38Z
"""


def test__job_poll_status_files(tmp_path):
    """Good path: a valid job.status file exists and is read.

    Writes SAMPLE_STATUS to <tmp_path>/sub/job.status, polls that directory
    and checks every attribute of the returned poll context.
    """
    job_dir = tmp_path / 'sub'
    job_dir.mkdir()
    (job_dir / 'job.status').write_text(SAMPLE_STATUS)

    polled = jrm._jobs_poll_status_files(str(tmp_path), 'sub')

    # Attribute name -> value expected from the sample status file.
    expected = {
        'job_runner_name': 'pbs',
        'job_id': '2361713',
        'job_runner_exit_polled': 1,
        'pid': '2361713',
        'time_submit_exit': '2025-01-28T14:46:04Z',
        'time_run': '2025-01-28T14:46:05Z',
        'time_run_exit': '2025-01-28T14:46:38Z',
        'run_status': 0,
        'messages': ['2025-01-28T14:46:05Z|INFO|sleep 31'],
    }
    for attr, value in expected.items():
        assert getattr(polled, attr) == value


def test__job_poll_status_files_task_failed(tmp_path):
    """A job.status file recording a non-success exit gives a failed poll.

    With CYLC_JOB_EXIT=FOO as the only content, the returned context should
    have run_status 1 and carry 'FOO' as its run signal.
    """
    status_file = tmp_path / 'sub' / 'job.status'
    status_file.parent.mkdir()
    status_file.write_text("CYLC_JOB_EXIT=FOO")

    result = jrm._jobs_poll_status_files(str(tmp_path), 'sub')

    assert (result.run_status, result.run_signal) == (1, 'FOO')


def test__job_poll_status_files_deleted_logdir(tmp_path):
    """The log dir has been deleted whilst the task is still active.

    Polling a job log directory that does not exist should return a context
    marked as failed and exit-polled, with JOB_FILES_REMOVED_MESSAGE as the
    run signal.
    """
    # Poll a sub-directory of tmp_path that is never created, so the result
    # cannot be affected by whatever happens to exist in the current working
    # directory (the relative path 'foo/bar' could, in principle, exist).
    ctx = jrm._jobs_poll_status_files(str(tmp_path), 'bar')
    assert ctx.run_signal == JOB_FILES_REMOVED_MESSAGE
    assert ctx.run_status == 1
    assert ctx.job_runner_exit_polled == 1


def test__job_poll_status_files_ioerror(tmp_path, capsys):
    """The job log directory exists but holds no readable status file.

    The poll should report the missing-file error on stderr.
    """
    (tmp_path / 'sub').mkdir()

    jrm._jobs_poll_status_files(str(tmp_path), 'sub')

    _, stderr = capsys.readouterr()
    assert '[Errno 2] No such file or directory' in stderr

0 comments on commit 21d18ba

Please sign in to comment.