Simplify poll handling of prematurely deleted job log dir #72

Merged: 3 commits, Feb 17, 2025
4 changes: 4 additions & 0 deletions cylc/flow/job_runner_mgr.py
@@ -445,7 +445,11 @@ def _jobs_poll_status_files(self, job_log_root, job_log_dir):
# If the log directory has been deleted prematurely, return a task
# failure and an explanation:
if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)):
# The job may still be in the job runner and may yet succeed,
# but we assume it failed & exited because it's the best we
# can do as it is no longer possible to poll it.
ctx.run_status = 1
ctx.job_runner_exit_polled = 1
ctx.run_signal = JOB_FILES_REMOVED_MESSAGE
return ctx
try:
35 changes: 19 additions & 16 deletions cylc/flow/task_job_mgr.py
@@ -44,6 +44,7 @@
Optional,
Tuple,
Union,
cast,
)

from cylc.flow import LOG
@@ -159,7 +160,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
self.workflow = workflow
self.proc_pool = proc_pool
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr
self.task_events_mgr = task_events_mgr
self.task_events_mgr: TaskEventsManager = task_events_mgr
self.data_store_mgr = data_store_mgr
self.job_file_writer = JobFileWriter()
self.job_runner_mgr = self.job_file_writer.job_runner_mgr
@@ -753,16 +754,6 @@ def _manip_task_jobs_callback(
or (ctx.ret_code and ctx.ret_code != 255)
):
LOG.error(ctx)
# A polling task lets us know that a task has failed because it's
# log folder has been deleted whilst the task was active:
if (
getattr(ctx, 'out', None)
and JOB_FILES_REMOVED_MESSAGE in ctx.out
):
LOG.error(
f'Task {ctx.cmd[-1]} failed because task log directory'
f'\n{"/".join(ctx.cmd[-2:])}\nhas been removed.'
)
Comment on lines -756 to -765
I have moved this error logging into the _poll_task_job_callback method, seeing as it only applies to poll, not to kill or submit, which also use this _manip_task_jobs_callback method.
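For readers skimming the diff, here is a minimal, self-contained sketch of the idea (illustrative Python only, not cylc-flow code; the real check and names appear in the _poll_task_job_callback diff below): only the poll callback inspects the per-job poll context for the removed-log-dir signal, so the shared kill/submit path no longer needs to scan raw command output.

import logging

logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger("poll-example")

# Placeholder standing in for cylc.flow.job_runner_mgr.JOB_FILES_REMOVED_MESSAGE
JOB_FILES_REMOVED_MESSAGE = "job log directory removed"


def poll_callback(jp_ctx: dict, platform_name: str, job_id: str) -> None:
    """Per-job poll handling, shaped like the check added further down."""
    if jp_ctx.get("run_signal") == JOB_FILES_REMOVED_MESSAGE:
        LOG.error(
            "platform: %s - job log directory %s no longer exists",
            platform_name, job_id,
        )


poll_callback(
    {"run_signal": JOB_FILES_REMOVED_MESSAGE, "run_status": 1},
    "localhost",
    "1/foo/01",
)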

# A dict for easy reference of (CYCLE, NAME, SUBMIT_NUM) -> TaskProxy
#
# Note for "reload": A TaskProxy instance may be replaced on reload, so
@@ -845,7 +836,13 @@ def _poll_task_job_callback_255(self, workflow, itask, cmd_ctx, line):
)
self.poll_task_jobs(workflow, [itask])

def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
def _poll_task_job_callback(
self,
workflow: str,
itask: 'TaskProxy',
cmd_ctx: SubProcContext,
line: str,
):
"""Helper for _poll_task_jobs_callback, on one task job."""
ctx = SubProcContext(self.JOBS_POLL, None)
ctx.out = line
@@ -872,16 +869,21 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
log_lvl = DEBUG if (
itask.platform.get('communication method') == 'poll'
) else INFO

if jp_ctx.run_signal == JOB_FILES_REMOVED_MESSAGE:
LOG.error(
f"platform: {itask.platform['name']} - job log directory "
f"{job_tokens.relative_id} no longer exists"
)

if jp_ctx.run_status == 1 and jp_ctx.run_signal in ["ERR", "EXIT"]:
# Failed normally
self.task_events_mgr.process_message(
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
elif jp_ctx.run_status == 1 and jp_ctx.job_runner_exit_polled == 1:
# Failed by a signal, and no longer in job runner
self.task_events_mgr.process_message(
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
Comment on lines 885 to -882
This was a duplication of what the next line does anyway (the FAIL_MESSAGE_PREFIX message is handled by the signal branch of task_events_mgr.process_message, quoted below):

elif message.startswith(FAIL_MESSAGE_PREFIX):
# Task received signal.
if (
flag == self.FLAG_RECEIVED
and itask.state.is_gt(TASK_STATUS_FAILED)
):
# Already failed.
return True
signal = message[len(FAIL_MESSAGE_PREFIX):]
self._db_events_insert(itask, "signaled", signal)
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": signal})
if self._process_message_failed(
itask, event_time, self.JOB_FAILED, forced
):

self.task_events_mgr.process_message(
itask, log_lvl, FAIL_MESSAGE_PREFIX + jp_ctx.run_signal,
itask, log_lvl, f"{FAIL_MESSAGE_PREFIX}{jp_ctx.run_signal}",
jp_ctx.time_run_exit,
flag)
elif jp_ctx.run_status == 1: # noqa: SIM114
@@ -1269,7 +1271,8 @@ def _prep_submit_task_job(
workflow, itask, '(platform not defined)', exc)
return False
else:
itask.platform = platform
# (platform is not None here as subshell eval has finished)
itask.platform = cast('dict', platform)
# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)

55 changes: 10 additions & 45 deletions tests/flakyfunctional/cylc-poll/16-execution-time-limit.t
@@ -14,12 +14,12 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test execution time limit polling.
export REQUIRE_PLATFORM='loc:* comms:poll runner:background'
. "$(dirname "$0")/test_header"
#-------------------------------------------------------------------------------
set_test_number 4

set_test_number 5
create_test_global_config '' "
[platforms]
[[$CYLC_TEST_PLATFORM]]
@@ -28,51 +28,16 @@ create_test_global_config '' "
execution time limit polling intervals = PT5S
"
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
#-------------------------------------------------------------------------------

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play --reference-test -v --no-detach "${WORKFLOW_NAME}" --timestamp
#-------------------------------------------------------------------------------
# shellcheck disable=SC2317
cmp_times () {
# Test if the times $1 and $2 are within $3 seconds of each other.
python3 -u - "$@" <<'__PYTHON__'
import sys
from metomi.isodatetime.parsers import TimePointParser
parser = TimePointParser()
time_1 = parser.parse(sys.argv[1])
time_2 = parser.parse(sys.argv[2])
if abs((time_1 - time_2).get_seconds()) > int(sys.argv[3]):
sys.exit("abs(predicted - actual) > tolerance: %s" % sys.argv[1:])
__PYTHON__
}
time_offset () {
# Add an ISO8601 duration to an ISO8601 date-time.
python3 -u - "$@" <<'__PYTHON__'
import sys
from metomi.isodatetime.parsers import TimePointParser, DurationParser
print(
TimePointParser().parse(sys.argv[1]) + DurationParser().parse(sys.argv[2]))
__PYTHON__
}
#-------------------------------------------------------------------------------

LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log"
# Test logging of the "next job poll" message when task starts.
TEST_NAME="${TEST_NAME_BASE}-log-entry"
LINE="$(grep '\[1/foo.* execution timeout=None, polling intervals=' "${LOG}")"
run_ok "${TEST_NAME}" grep -q 'health: execution timeout=None, polling intervals=' <<< "${LINE}"
# Determine poll times.
PREDICTED_POLL_TIME=$(time_offset \
"$(cut -d ' ' -f 1 <<< "${LINE}")" \
"PT10S") # PT5S time limit + PT5S polling interval
ACTUAL_POLL_TIME=$(sed -n \
's|\(.*\) DEBUG - \[1/foo.* (polled)failed .*|\1|p' "${LOG}")

# Test execution timeout polling.
# Main loop is roughly 1 second, but integer rounding may give an apparent 2
# seconds delay, so set threshold as 2 seconds.
run_ok "${TEST_NAME_BASE}-poll-time" \
cmp_times "${PREDICTED_POLL_TIME}" "${ACTUAL_POLL_TIME}" '10'
@MetRonnie (Author), Feb 14, 2025:
This test was horribly flaky for me locally on 8.4.x. Also, the timings test was so loose as to be pointless (the threshold was at some point set to 10s instead of the 2s mentioned in the comment). I don't think it's feasible to test the timings.

#-------------------------------------------------------------------------------
log_scan "${TEST_NAME_BASE}-log" "${LOG}" 1 0 \
"\[1/foo/01:submitted\] => running" \
"\[1/foo/01:running\] poll now, (next in PT5S" \
"\[1/foo/01:running\] (polled)failed/XCPU"

purge
exit
12 changes: 2 additions & 10 deletions tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc
@@ -13,14 +13,6 @@
[runtime]
[[foo]]
platform = {{ environ['CYLC_TEST_PLATFORM'] }}
init-script = cylc__job__disable_fail_signals ERR EXIT
script = """
cylc__job__wait_cylc_message_started
# give it a while for the started message to get picked up by
# the scheduler
sleep 10
exit 1
"""
[[[job]]]
execution time limit = PT5S
script = sleep 20
execution time limit = PT10S
[[bar]]
39 changes: 25 additions & 14 deletions tests/integration/test_task_job_mgr.py
@@ -15,14 +15,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from contextlib import suppress
import json
import logging
from types import SimpleNamespace
from typing import Any as Fixture
from unittest.mock import Mock

from cylc.flow import CYLC_LOG
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_RUNNING,
)


async def test_run_job_cmd_no_hosts_error(
@@ -238,23 +242,30 @@ async def test_broadcast_platform_change(


async def test_poll_job_deleted_log_folder(
one_conf, flow, scheduler, start, caplog
one_conf, flow, scheduler, start, log_filter
):
"""Capture a task error caused by polling finding the job log dir deleted.

https://github.com/cylc/cylc-flow/issues/6425
"""
ctx = SimpleNamespace()
ctx.out = JOB_FILES_REMOVED_MESSAGE
ctx.ret_code = None
ctx.cmd = ['foo', 'bar']

schd = scheduler(flow(one_conf), run_mode='live', paused_start=False)
response = {
'run_signal': JOB_FILES_REMOVED_MESSAGE,
'run_status': 1,
'job_runner_exit_polled': 1,
}
schd: Scheduler = scheduler(flow(one_conf))
async with start(schd):
schd.task_job_mgr._manip_task_jobs_callback(ctx, '', [], '')
itask = schd.pool.get_tasks()[0]
itask.submit_num = 1
job_id = itask.tokens.duplicate(job='01').relative_id
schd.task_job_mgr._poll_task_job_callback(
schd.workflow,
itask,
cmd_ctx=Mock(),
line=f'2025-02-13T12:08:30Z|{job_id}|{json.dumps(response)}',
)
assert itask.state(TASK_STATUS_FAILED)

assert (
'Task bar failed because task log directory'
'\nfoo/bar\nhas been removed.'
in caplog.messages
assert log_filter(
logging.ERROR, f"job log directory {job_id} no longer exists"
)
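A note on the synthetic input above: the line handed to _poll_task_job_callback follows a timestamp|relative job ID|JSON layout, and the JSON keys mirror the fields set on the poll context in the job_runner_mgr.py change at the top of this PR. A rough sketch of constructing such a line (illustrative only; the timestamp and job ID are arbitrary, and this is the test's input format rather than a documented API):

import json

from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE

# Same keys as the test's response dict above.
response = {
    "run_signal": JOB_FILES_REMOVED_MESSAGE,
    "run_status": 1,
    "job_runner_exit_polled": 1,
}
# ISO 8601 timestamp | relative job ID | JSON payload
line = f"2025-02-13T12:08:30Z|1/foo/01|{json.dumps(response)}"
print(line)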
2 changes: 1 addition & 1 deletion tests/unit/test_job_runner_mgr.py
@@ -66,6 +66,7 @@ def test__job_poll_status_files_deleted_logdir():
ctx = jrm._jobs_poll_status_files('foo', 'bar')
assert ctx.run_signal == JOB_FILES_REMOVED_MESSAGE
assert ctx.run_status == 1
assert ctx.job_runner_exit_polled == 1


def test__job_poll_status_files_ioerror(tmp_path, capsys):
@@ -75,4 +76,3 @@ def test__job_poll_status_files_ioerror(tmp_path, capsys):
jrm._jobs_poll_status_files(str(tmp_path), 'sub')
cap = capsys.readouterr()
assert '[Errno 2] No such file or directory' in cap.err
