-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simplify poll handling of prematurely deleted job log dir #72
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -44,6 +44,7 @@ | |||||||||||||||||||||||||||||||
Optional, | ||||||||||||||||||||||||||||||||
Tuple, | ||||||||||||||||||||||||||||||||
Union, | ||||||||||||||||||||||||||||||||
cast, | ||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
from cylc.flow import LOG | ||||||||||||||||||||||||||||||||
|
@@ -159,7 +160,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr, | |||||||||||||||||||||||||||||||
self.workflow = workflow | ||||||||||||||||||||||||||||||||
self.proc_pool = proc_pool | ||||||||||||||||||||||||||||||||
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr | ||||||||||||||||||||||||||||||||
self.task_events_mgr = task_events_mgr | ||||||||||||||||||||||||||||||||
self.task_events_mgr: TaskEventsManager = task_events_mgr | ||||||||||||||||||||||||||||||||
self.data_store_mgr = data_store_mgr | ||||||||||||||||||||||||||||||||
self.job_file_writer = JobFileWriter() | ||||||||||||||||||||||||||||||||
self.job_runner_mgr = self.job_file_writer.job_runner_mgr | ||||||||||||||||||||||||||||||||
|
@@ -753,16 +754,6 @@ def _manip_task_jobs_callback( | |||||||||||||||||||||||||||||||
or (ctx.ret_code and ctx.ret_code != 255) | ||||||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||||||
LOG.error(ctx) | ||||||||||||||||||||||||||||||||
# A polling task lets us know that a task has failed because it's | ||||||||||||||||||||||||||||||||
# log folder has been deleted whilst the task was active: | ||||||||||||||||||||||||||||||||
if ( | ||||||||||||||||||||||||||||||||
getattr(ctx, 'out', None) | ||||||||||||||||||||||||||||||||
and JOB_FILES_REMOVED_MESSAGE in ctx.out | ||||||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||||||
LOG.error( | ||||||||||||||||||||||||||||||||
f'Task {ctx.cmd[-1]} failed because task log directory' | ||||||||||||||||||||||||||||||||
f'\n{"/".join(ctx.cmd[-2:])}\nhas been removed.' | ||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||
# A dict for easy reference of (CYCLE, NAME, SUBMIT_NUM) -> TaskProxy | ||||||||||||||||||||||||||||||||
# | ||||||||||||||||||||||||||||||||
# Note for "reload": A TaskProxy instance may be replaced on reload, so | ||||||||||||||||||||||||||||||||
|
@@ -845,7 +836,13 @@ def _poll_task_job_callback_255(self, workflow, itask, cmd_ctx, line): | |||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||
self.poll_task_jobs(workflow, [itask]) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): | ||||||||||||||||||||||||||||||||
def _poll_task_job_callback( | ||||||||||||||||||||||||||||||||
self, | ||||||||||||||||||||||||||||||||
workflow: str, | ||||||||||||||||||||||||||||||||
itask: 'TaskProxy', | ||||||||||||||||||||||||||||||||
cmd_ctx: SubProcContext, | ||||||||||||||||||||||||||||||||
line: str, | ||||||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||||||
"""Helper for _poll_task_jobs_callback, on one task job.""" | ||||||||||||||||||||||||||||||||
ctx = SubProcContext(self.JOBS_POLL, None) | ||||||||||||||||||||||||||||||||
ctx.out = line | ||||||||||||||||||||||||||||||||
|
@@ -872,16 +869,21 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): | |||||||||||||||||||||||||||||||
log_lvl = DEBUG if ( | ||||||||||||||||||||||||||||||||
itask.platform.get('communication method') == 'poll' | ||||||||||||||||||||||||||||||||
) else INFO | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if jp_ctx.run_signal == JOB_FILES_REMOVED_MESSAGE: | ||||||||||||||||||||||||||||||||
LOG.error( | ||||||||||||||||||||||||||||||||
f"platform: {itask.platform['name']} - job log directory " | ||||||||||||||||||||||||||||||||
f"{job_tokens.relative_id} no longer exists" | ||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
if jp_ctx.run_status == 1 and jp_ctx.run_signal in ["ERR", "EXIT"]: | ||||||||||||||||||||||||||||||||
# Failed normally | ||||||||||||||||||||||||||||||||
self.task_events_mgr.process_message( | ||||||||||||||||||||||||||||||||
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag) | ||||||||||||||||||||||||||||||||
elif jp_ctx.run_status == 1 and jp_ctx.job_runner_exit_polled == 1: | ||||||||||||||||||||||||||||||||
# Failed by a signal, and no longer in job runner | ||||||||||||||||||||||||||||||||
self.task_events_mgr.process_message( | ||||||||||||||||||||||||||||||||
itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag) | ||||||||||||||||||||||||||||||||
Comment on lines
885
to
-882
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was a duplication of what the next line does anyway cylc/cylc/flow/task_events_mgr.py Lines 788 to 802 in 5b2b180
|
||||||||||||||||||||||||||||||||
self.task_events_mgr.process_message( | ||||||||||||||||||||||||||||||||
itask, log_lvl, FAIL_MESSAGE_PREFIX + jp_ctx.run_signal, | ||||||||||||||||||||||||||||||||
itask, log_lvl, f"{FAIL_MESSAGE_PREFIX}{jp_ctx.run_signal}", | ||||||||||||||||||||||||||||||||
jp_ctx.time_run_exit, | ||||||||||||||||||||||||||||||||
flag) | ||||||||||||||||||||||||||||||||
elif jp_ctx.run_status == 1: # noqa: SIM114 | ||||||||||||||||||||||||||||||||
|
@@ -1269,7 +1271,8 @@ def _prep_submit_task_job( | |||||||||||||||||||||||||||||||
workflow, itask, '(platform not defined)', exc) | ||||||||||||||||||||||||||||||||
return False | ||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||
itask.platform = platform | ||||||||||||||||||||||||||||||||
# (platform is not None here as subshell eval has finished) | ||||||||||||||||||||||||||||||||
itask.platform = cast('dict', platform) | ||||||||||||||||||||||||||||||||
# Retry delays, needed for the try_num | ||||||||||||||||||||||||||||||||
self._set_retry_timers(itask, rtconfig) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,12 +14,12 @@ | |
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
#------------------------------------------------------------------------------- | ||
|
||
# Test execution time limit polling. | ||
export REQUIRE_PLATFORM='loc:* comms:poll runner:background' | ||
. "$(dirname "$0")/test_header" | ||
#------------------------------------------------------------------------------- | ||
set_test_number 4 | ||
|
||
set_test_number 5 | ||
create_test_global_config '' " | ||
[platforms] | ||
[[$CYLC_TEST_PLATFORM]] | ||
|
@@ -28,51 +28,16 @@ create_test_global_config '' " | |
execution time limit polling intervals = PT5S | ||
" | ||
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" | ||
#------------------------------------------------------------------------------- | ||
|
||
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" | ||
workflow_run_ok "${TEST_NAME_BASE}-run" \ | ||
cylc play --reference-test -v --no-detach "${WORKFLOW_NAME}" --timestamp | ||
#------------------------------------------------------------------------------- | ||
# shellcheck disable=SC2317 | ||
cmp_times () { | ||
# Test if the times $1 and $2 are within $3 seconds of each other. | ||
python3 -u - "$@" <<'__PYTHON__' | ||
import sys | ||
from metomi.isodatetime.parsers import TimePointParser | ||
parser = TimePointParser() | ||
time_1 = parser.parse(sys.argv[1]) | ||
time_2 = parser.parse(sys.argv[2]) | ||
if abs((time_1 - time_2).get_seconds()) > int(sys.argv[3]): | ||
sys.exit("abs(predicted - actual) > tolerance: %s" % sys.argv[1:]) | ||
__PYTHON__ | ||
} | ||
time_offset () { | ||
# Add an ISO8601 duration to an ISO8601 date-time. | ||
python3 -u - "$@" <<'__PYTHON__' | ||
import sys | ||
from metomi.isodatetime.parsers import TimePointParser, DurationParser | ||
print( | ||
TimePointParser().parse(sys.argv[1]) + DurationParser().parse(sys.argv[2])) | ||
__PYTHON__ | ||
} | ||
#------------------------------------------------------------------------------- | ||
|
||
LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log" | ||
# Test logging of the "next job poll" message when task starts. | ||
TEST_NAME="${TEST_NAME_BASE}-log-entry" | ||
LINE="$(grep '\[1/foo.* execution timeout=None, polling intervals=' "${LOG}")" | ||
run_ok "${TEST_NAME}" grep -q 'health: execution timeout=None, polling intervals=' <<< "${LINE}" | ||
# Determine poll times. | ||
PREDICTED_POLL_TIME=$(time_offset \ | ||
"$(cut -d ' ' -f 1 <<< "${LINE}")" \ | ||
"PT10S") # PT5S time limit + PT5S polling interval | ||
ACTUAL_POLL_TIME=$(sed -n \ | ||
's|\(.*\) DEBUG - \[1/foo.* (polled)failed .*|\1|p' "${LOG}") | ||
|
||
# Test execution timeout polling. | ||
# Main loop is roughly 1 second, but integer rounding may give an apparent 2 | ||
# seconds delay, so set threshold as 2 seconds. | ||
run_ok "${TEST_NAME_BASE}-poll-time" \ | ||
cmp_times "${PREDICTED_POLL_TIME}" "${ACTUAL_POLL_TIME}" '10' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test was horribly flaky for me locally on 8.4.x. Also the timings test was so loose so as to be pointless (threshold was at some point set to 10s instead of 2s as mentioned in the comment). I don't think it's feasible to test the timings |
||
#------------------------------------------------------------------------------- | ||
log_scan "${TEST_NAME_BASE}-log" "${LOG}" 1 0 \ | ||
"\[1/foo/01:submitted\] => running" \ | ||
"\[1/foo/01:running\] poll now, (next in PT5S" \ | ||
"\[1/foo/01:running\] (polled)failed/XCPU" | ||
|
||
purge | ||
exit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have moved this error logging into the
_poll_task_job_callback
method seeing as it only applies for poll, not kill or submit which also use this_manip_task_jobs_callback
method