From d3be5eca58c17ed315836e42d222151fedfb489b Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Mon, 13 Jan 2025 15:17:40 +0000
Subject: [PATCH 1/9] Tidy
---
cylc/flow/scheduler.py | 7 ++++---
cylc/flow/subprocctx.py | 4 ++++
cylc/flow/task_job_mgr.py | 43 +++++++++++++++++++++++++++------------
cylc/flow/task_proxy.py | 4 ++--
4 files changed, 40 insertions(+), 18 deletions(-)
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index dec3a279ce9..dbfbcdd5069 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -1250,6 +1250,7 @@ def get_contact_data(self) -> Dict[str, str]:
"""
fields = workflow_files.ContactFileFields
proc = psutil.Process()
+ platform = get_platform()
# fmt: off
return {
fields.API:
@@ -1275,11 +1276,11 @@ def get_contact_data(self) -> Dict[str, str]:
fields.VERSION:
CYLC_VERSION,
fields.SCHEDULER_SSH_COMMAND:
- str(get_platform()['ssh command']),
+ str(platform['ssh command']),
fields.SCHEDULER_CYLC_PATH:
- str(get_platform()['cylc path']),
+ str(platform['cylc path']),
fields.SCHEDULER_USE_LOGIN_SHELL:
- str(get_platform()['use login shell'])
+ str(platform['use login shell'])
}
# fmt: on
diff --git a/cylc/flow/subprocctx.py b/cylc/flow/subprocctx.py
index 41b735a71c5..e54b2815f3b 100644
--- a/cylc/flow/subprocctx.py
+++ b/cylc/flow/subprocctx.py
@@ -23,6 +23,7 @@
from shlex import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
+
from cylc.flow.wallclock import get_current_time_string
if TYPE_CHECKING:
@@ -137,6 +138,9 @@ def __str__(self):
'mesg': mesg}
return ret.rstrip()
+ def __repr__(self) -> str:
+ return f"<{type(self).__name__} {self.cmd_key}>"
+
class SubFuncContext(SubProcContext):
"""Represent the context of a Python function to run as a subprocess.
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 7e1cfa02cc4..5595fd474f5 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -123,6 +123,11 @@
if TYPE_CHECKING:
+ # BACK COMPAT: typing_extensions.Literal
+ # FROM: Python 3.7
+ # TO: Python 3.8
+ from typing_extensions import Literal
+
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
@@ -159,7 +164,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
self.workflow = workflow
self.proc_pool = proc_pool
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr
- self.task_events_mgr = task_events_mgr
+ self.task_events_mgr: TaskEventsManager = task_events_mgr
self.data_store_mgr = data_store_mgr
self.job_file_writer = JobFileWriter()
self.job_runner_mgr = self.job_file_writer.job_runner_mgr
@@ -220,14 +225,19 @@ def poll_task_jobs(self, workflow, itasks, msg=None):
self._poll_task_jobs_callback_255
)
- def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
+ def prep_submit_task_jobs(
+ self,
+ workflow: str,
+ itasks: 'Iterable[TaskProxy]',
+ check_syntax: bool = True,
+ ) -> 'Tuple[List[TaskProxy], List[TaskProxy]]':
"""Prepare task jobs for submit.
Prepare tasks where possible. Ignore tasks that are waiting for host
select command to complete. Bad host select command or error writing to
a job file will cause a bad task - leading to submission failure.
- Return [list, list]: list of good tasks, list of bad tasks
+ Return (good_tasks, bad_tasks)
"""
prepared_tasks = []
bad_tasks = []
@@ -244,16 +254,16 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
prepared_tasks.append(itask)
elif prep_task is False:
bad_tasks.append(itask)
- return [prepared_tasks, bad_tasks]
+ return (prepared_tasks, bad_tasks)
def submit_task_jobs(
self,
workflow,
- itasks,
+ itasks: 'Iterable[TaskProxy]',
curve_auth,
client_pub_key_dir,
run_mode: RunMode = RunMode.LIVE,
- ):
+ ) -> 'List[TaskProxy]':
"""Prepare for job submission and submit task jobs.
Preparation (host selection, remote host init, and remote install)
@@ -264,7 +274,7 @@ def submit_task_jobs(
Once preparation has completed or failed, reset .waiting_on_job_prep in
task instances so the scheduler knows to stop sending them back here.
- This method uses prep_submit_task_job() as helper.
+ This method uses prep_submit_task_jobs() as helper.
Return (list): list of tasks that attempted submission.
"""
@@ -1029,7 +1039,7 @@ def _set_retry_timers(
def submit_nonlive_task_jobs(
self: 'TaskJobManager',
workflow: str,
- itasks: 'List[TaskProxy]',
+ itasks: 'Iterable[TaskProxy]',
workflow_run_mode: RunMode,
) -> 'Tuple[List[TaskProxy], List[TaskProxy]]':
"""Identify task mode and carry out alternative submission
@@ -1152,7 +1162,7 @@ def _prep_submit_task_job(
workflow: str,
itask: 'TaskProxy',
check_syntax: bool = True
- ):
+ ) -> 'Union[TaskProxy, None, Literal[False]]':
"""Prepare a task job submission.
Returns:
@@ -1217,7 +1227,7 @@ def _prep_submit_task_job(
else:
# host/platform select not ready
if host_n is None and platform_name is None:
- return
+ return None
elif (
host_n is None
and rtconfig['platform']
@@ -1292,7 +1302,13 @@ def _prep_submit_task_job(
itask.local_job_file_path = local_job_file_path
return itask
- def _prep_submit_task_job_error(self, workflow, itask, action, exc):
+ def _prep_submit_task_job_error(
+ self,
+ workflow: str,
+ itask: 'TaskProxy',
+ action: str,
+ exc: Union[Exception, str],
+ ) -> None:
"""Helper for self._prep_submit_task_job. On error."""
log_task_job_activity(
SubProcContext(self.JOBS_SUBMIT, action, err=exc, ret_code=1),
@@ -1306,11 +1322,12 @@ def _prep_submit_task_job_error(self, workflow, itask, action, exc):
# than submit-failed
# provide a dummy job config - this info will be added to the data
# store
+ try_num = itask.get_try_num()
itask.jobs.append({
'task_id': itask.identity,
'platform': itask.platform,
'submit_num': itask.submit_num,
- 'try_num': itask.get_try_num(),
+ 'try_num': try_num,
})
# create a DB entry for the submit-failed job
self.workflow_db_mgr.put_insert_task_jobs(
@@ -1319,7 +1336,7 @@ def _prep_submit_task_job_error(self, workflow, itask, action, exc):
'flow_nums': serialise_set(itask.flow_nums),
'job_id': itask.summary.get('submit_method_id'),
'is_manual_submit': itask.is_manual_submit,
- 'try_num': itask.get_try_num(),
+ 'try_num': try_num,
'time_submit': get_current_time_string(),
'platform_name': itask.platform['name'],
'job_runner_name': itask.summary['job_runner_name'],
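For reference, the three possible returns of _prep_submit_task_job() under the
new 'Union[TaskProxy, None, Literal[False]]' annotation are consumed by
prep_submit_task_jobs() as in this condensed sketch (names as in the hunks
above):

    prepared_tasks = []
    bad_tasks = []
    for itask in itasks:
        prep_task = self._prep_submit_task_job(workflow, itask)
        if prep_task:
            # a TaskProxy: job file written, ready to submit
            prepared_tasks.append(itask)
        elif prep_task is False:
            # bad host select command or job-file write error
            bad_tasks.append(itask)
        # None: host/platform selection still running; the task will be
        # sent back here once the subprocess returns
    return (prepared_tasks, bad_tasks)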
diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py
index 8d9134a6ac4..620b647f909 100644
--- a/cylc/flow/task_proxy.py
+++ b/cylc/flow/task_proxy.py
@@ -152,7 +152,7 @@ class TaskProxy:
graph children: {msg: [(name, point), ...]}
.flow_nums:
flows I belong to (if empty, belongs to 'none' flow)
- flow_wait:
+ .flow_wait:
wait for flow merge before spawning children
.waiting_on_job_prep:
True whilst task is awaiting job prep, reset to False once the
@@ -316,7 +316,7 @@ def __init__(
)
def __repr__(self) -> str:
- return f"<{self.__class__.__name__} {self.identity}>"
+ return f"<{type(self).__name__} {self.identity} {self.state}>"
def __str__(self) -> str:
"""Stringify with tokens, state, submit_num, and flow_nums.
From 21be5b01c44be1773627235fc2c04f7d6521da84 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Mon, 13 Jan 2025 16:34:04 +0000
Subject: [PATCH 2/9] Kill tasks during job prep
---
changes.d/6535.fix.md | 1 +
cylc/flow/scheduler.py | 13 +++++----
cylc/flow/task_job_mgr.py | 11 +++++++-
tests/integration/test_kill.py | 48 ++++++++++++++++++++++++++++++++++
4 files changed, 67 insertions(+), 6 deletions(-)
create mode 100644 changes.d/6535.fix.md
create mode 100644 tests/integration/test_kill.py
diff --git a/changes.d/6535.fix.md b/changes.d/6535.fix.md
new file mode 100644
index 00000000000..ab5fc6bb3ca
--- /dev/null
+++ b/changes.d/6535.fix.md
@@ -0,0 +1 @@
+Ensure tasks can be killed while in the preparing state.
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index dbfbcdd5069..eec8dd43b67 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -1078,9 +1078,14 @@ def kill_tasks(
to_kill: List[TaskProxy] = []
unkillable: List[TaskProxy] = []
for itask in itasks:
- if itask.state(*TASK_STATUSES_ACTIVE):
- if itask.state_reset(is_held=True):
- self.data_store_mgr.delta_task_state(itask)
+ if not itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
+ unkillable.append(itask)
+ continue
+ if itask.state_reset(is_held=True):
+ self.data_store_mgr.delta_task_state(itask)
+ if itask.state(TASK_STATUS_PREPARING):
+ self.task_job_mgr.kill_prep_task(itask)
+ else:
to_kill.append(itask)
if jobless:
# Directly set failed in sim mode:
@@ -1088,8 +1093,6 @@ def kill_tasks(
itask, 'CRITICAL', TASK_STATUS_FAILED,
flag=self.task_events_mgr.FLAG_RECEIVED
)
- else:
- unkillable.append(itask)
if warn and unkillable:
LOG.warning(
"Tasks not killable: "
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 5595fd474f5..69da98eb8ff 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -161,7 +161,7 @@ class TaskJobManager:
def __init__(self, workflow, proc_pool, workflow_db_mgr,
task_events_mgr, data_store_mgr, bad_hosts):
- self.workflow = workflow
+ self.workflow: str = workflow
self.proc_pool = proc_pool
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr
self.task_events_mgr: TaskEventsManager = task_events_mgr
@@ -201,6 +201,15 @@ def kill_task_jobs(
self._kill_task_jobs_callback_255
)
+ def kill_prep_task(self, itask: 'TaskProxy') -> None:
+ """Kill a preparing task."""
+ itask.waiting_on_job_prep = False
+ itask.local_job_file_path = None # reset for retry
+ self._set_retry_timers(itask)
+ self._prep_submit_task_job_error(
+ self.workflow, itask, '(killed in job prep)', ''
+ )
+
def poll_task_jobs(self, workflow, itasks, msg=None):
"""Poll jobs of specified tasks.
diff --git a/tests/integration/test_kill.py b/tests/integration/test_kill.py
new file mode 100644
index 00000000000..be88adf123a
--- /dev/null
+++ b/tests/integration/test_kill.py
@@ -0,0 +1,48 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+import pytest
+
+from cylc.flow.commands import kill_tasks, run_cmd
+from cylc.flow.scheduler import Scheduler
+from cylc.flow.task_state import TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_WAITING
+
+
+async def test_kill_preparing(
+ one_conf,
+ flow,
+ scheduler,
+ start,
+ monkeypatch: pytest.MonkeyPatch,
+ log_filter,
+):
+ """Test killing a preparing task."""
+ schd: Scheduler = scheduler(
+ flow(one_conf), run_mode='live', paused_start=False
+ )
+ async with start(schd):
+ # Make the task indefinitely preparing:
+ monkeypatch.setattr(
+ schd.task_job_mgr, '_prep_submit_task_job', lambda *a, **k: None
+ )
+ itask = schd.pool.get_tasks()[0]
+ assert itask.state(TASK_STATUS_WAITING, is_held=False)
+ schd.start_job_submission([itask])
+
+ await run_cmd(kill_tasks(schd, [itask.tokens.relative_id]))
+ assert itask.state(TASK_STATUS_SUBMIT_FAILED, is_held=True)
+ assert log_filter(logging.ERROR, 'killed in job prep')
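Putting the two hunks together, the kill dispatch becomes (condensed from the
kill_tasks() change above; preparing tasks are now routed to the new
kill_prep_task() rather than reported as unkillable):

    for itask in itasks:
        if not itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
            unkillable.append(itask)
            continue
        if itask.state_reset(is_held=True):
            self.data_store_mgr.delta_task_state(itask)  # publish the hold
        if itask.state(TASK_STATUS_PREPARING):
            # no job exists yet: fail the task at the preparation stage
            self.task_job_mgr.kill_prep_task(itask)
        else:
            to_kill.append(itask)  # a real job to kill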
From da76cf48809ecfbe777458a70d511e3f380570e5 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 16 Jan 2025 14:57:03 +0000
Subject: [PATCH 3/9] Add tests for `cylc kill` handlers & retries behaviour
---
tests/functional/cylc-kill/04-handlers.t | 47 +++++++++++++++++++
.../cylc-kill/04-handlers/flow.cylc | 39 +++++++++++++++
.../cylc-kill/04-handlers/reference.log | 6 +++
tests/functional/cylc-kill/05-retries.t | 38 +++++++++++++++
.../functional/cylc-kill/05-retries/flow.cylc | 38 +++++++++++++++
.../cylc-kill/05-retries/reference.log | 6 +++
tests/functional/lib/bash/test_header | 4 +-
7 files changed, 177 insertions(+), 1 deletion(-)
create mode 100644 tests/functional/cylc-kill/04-handlers.t
create mode 100644 tests/functional/cylc-kill/04-handlers/flow.cylc
create mode 100644 tests/functional/cylc-kill/04-handlers/reference.log
create mode 100644 tests/functional/cylc-kill/05-retries.t
create mode 100644 tests/functional/cylc-kill/05-retries/flow.cylc
create mode 100644 tests/functional/cylc-kill/05-retries/reference.log
diff --git a/tests/functional/cylc-kill/04-handlers.t b/tests/functional/cylc-kill/04-handlers.t
new file mode 100644
index 00000000000..f90e0c7a09a
--- /dev/null
+++ b/tests/functional/cylc-kill/04-handlers.t
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+# Test event handlers when killing running/submitted/preparing tasks.
+# Any downstream tasks that depend on the `:submit-fail`/`:fail` outputs
+# SHOULD run.
+# Handlers for the `submission failed`/`failed` events SHOULD run.
+
+export REQUIRE_PLATFORM='runner:at'
+. "$(dirname "$0")/test_header"
+set_test_number 5
+
+# Create platform that ensures job will be in submitted state for long enough
+create_test_global_config '' "
+[platforms]
+ [[old_street]]
+ job runner = at
+ job runner command template = at now + 5 minutes
+ hosts = localhost
+ install target = localhost
+"
+
+install_and_validate
+reftest_run
+
+grep_workflow_log_ok "grep-a" "[(('event-handler-00', 'failed'), 1) out] 1/a" -F
+
+for task in b c; do
+ grep_workflow_log_ok "grep-${task}" \
+ "[(('event-handler-00', 'submission failed'), 1) out] 1/${task}" -F
+done
+
+purge
diff --git a/tests/functional/cylc-kill/04-handlers/flow.cylc b/tests/functional/cylc-kill/04-handlers/flow.cylc
new file mode 100644
index 00000000000..28adbfa35cb
--- /dev/null
+++ b/tests/functional/cylc-kill/04-handlers/flow.cylc
@@ -0,0 +1,39 @@
+[scheduler]
+ allow implicit tasks = True
+ [[events]]
+ expected task failures = 1/a, 1/b, 1/c
+ stall timeout = PT0S
+ abort on stall timeout = True
+[scheduling]
+ [[graph]]
+ R1 = """
+ a:started => killer
+ a:failed => end
+
+ b:submitted? => killer
+ b:submit-failed? => end
+
+ c:submit-failed? => end
+
+ c:submitted? => nope
+ """
+[runtime]
+ [[a, b, c]]
+ [[[events]]]
+ failed handlers = echo %(id)s
+ submission failed handlers = echo %(id)s
+ [[a]]
+ script = sleep 40
+ [[b]]
+ platform = old_street
+ [[c]]
+ platform = $(sleep 40; echo localhost)
+ [[killer]]
+ script = """
+ cylc kill "$CYLC_WORKFLOW_ID//1/a" "$CYLC_WORKFLOW_ID//1/b"
+
+ cylc__job__poll_grep_workflow_log -E '1\/c.* => preparing'
+ cylc kill "$CYLC_WORKFLOW_ID//1/c"
+ """
+ [[end]]
+ script = cylc stop "$CYLC_WORKFLOW_ID" --now --now
diff --git a/tests/functional/cylc-kill/04-handlers/reference.log b/tests/functional/cylc-kill/04-handlers/reference.log
new file mode 100644
index 00000000000..fa8414bf650
--- /dev/null
+++ b/tests/functional/cylc-kill/04-handlers/reference.log
@@ -0,0 +1,6 @@
+Initial point: 1
+Final point: 1
+1/a -triggered off []
+1/b -triggered off []
+1/killer -triggered off ['1/a', '1/b']
+1/end -triggered off ['1/a', '1/b', '1/c']
diff --git a/tests/functional/cylc-kill/05-retries.t b/tests/functional/cylc-kill/05-retries.t
new file mode 100644
index 00000000000..03a90a6001f
--- /dev/null
+++ b/tests/functional/cylc-kill/05-retries.t
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+# Test retries when killing running/submitted/preparing tasks.
+# As killing tasks puts them in the held state, retries should NOT go ahead.
+
+export REQUIRE_PLATFORM='runner:at'
+. "$(dirname "$0")/test_header"
+set_test_number 2
+
+# Create platform that ensures job will be in submitted state for long enough
+create_test_global_config '' "
+[platforms]
+ [[old_street]]
+ job runner = at
+ job runner command template = at now + 5 minutes
+ hosts = localhost
+ install target = localhost
+"
+
+install_and_validate
+reftest_run
+
+purge
diff --git a/tests/functional/cylc-kill/05-retries/flow.cylc b/tests/functional/cylc-kill/05-retries/flow.cylc
new file mode 100644
index 00000000000..0b6c276cd1d
--- /dev/null
+++ b/tests/functional/cylc-kill/05-retries/flow.cylc
@@ -0,0 +1,38 @@
+[scheduler]
+ allow implicit tasks = True
+ [[events]]
+ expected task failures = 1/a, 1/b, 1/c
+ stall timeout = PT0S
+ abort on stall timeout = True
+[scheduling]
+ [[graph]]
+ R1 = """
+ a:started & b:submitted? => killer => end
+
+ # Should not get these outputs:
+ a:failed? | b:submit-failed? | c:submit-failed? => nope
+ """
+[runtime]
+ [[a]]
+ script = sleep 40
+ execution retry delays = PT0S
+ [[b]]
+ platform = old_street
+ submission retry delays = PT0S
+ [[c]]
+ platform = $(sleep 40; echo localhost)
+ submission retry delays = PT0S
+ [[killer]]
+ script = """
+ cylc kill "$CYLC_WORKFLOW_ID//1/a" "$CYLC_WORKFLOW_ID//1/b"
+
+ cylc__job__poll_grep_workflow_log -E '1\/c.* => preparing'
+ cylc kill "$CYLC_WORKFLOW_ID//1/c"
+ """
+ [[end]]
+ script = """
+ for task in a b c; do
+ cylc__job__poll_grep_workflow_log -E "1\/${task}.* retrying"
+ done
+ cylc stop "$CYLC_WORKFLOW_ID" --now --now
+ """
diff --git a/tests/functional/cylc-kill/05-retries/reference.log b/tests/functional/cylc-kill/05-retries/reference.log
new file mode 100644
index 00000000000..121a4a40def
--- /dev/null
+++ b/tests/functional/cylc-kill/05-retries/reference.log
@@ -0,0 +1,6 @@
+Initial point: 1
+Final point: 1
+1/a -triggered off []
+1/b -triggered off []
+1/killer -triggered off ['1/a', '1/b']
+1/end -triggered off ['1/killer']
diff --git a/tests/functional/lib/bash/test_header b/tests/functional/lib/bash/test_header
index 76f52524eb0..07d75a7c4df 100644
--- a/tests/functional/lib/bash/test_header
+++ b/tests/functional/lib/bash/test_header
@@ -433,8 +433,10 @@ grep_workflow_log_ok() {
local TEST_NAME="$1"
local PATTERN="$2"
shift 2
+ OPTS="$*"
local LOG_FILE="${WORKFLOW_RUN_DIR}/log/scheduler/log"
- if grep "$@" -s -e "$PATTERN" "$LOG_FILE"; then
+ # shellcheck disable=SC2086
+ if grep ${OPTS} -s -e "$PATTERN" "$LOG_FILE"; then
ok "${TEST_NAME}"
return
fi
From 02a025e1c563559273e8699411e288f57eff4c2f Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 16 Jan 2025 15:02:57 +0000
Subject: [PATCH 4/9] Replace functional test with integration test
---
tests/functional/cylc-kill/03-simulation.t | 42 ----------------
.../cylc-kill/03-simulation/flow.cylc | 12 -----
tests/integration/test_kill.py | 50 ++++++++++++++++++-
3 files changed, 48 insertions(+), 56 deletions(-)
delete mode 100755 tests/functional/cylc-kill/03-simulation.t
delete mode 100644 tests/functional/cylc-kill/03-simulation/flow.cylc
diff --git a/tests/functional/cylc-kill/03-simulation.t b/tests/functional/cylc-kill/03-simulation.t
deleted file mode 100755
index dadfeede123..00000000000
--- a/tests/functional/cylc-kill/03-simulation.t
+++ /dev/null
@@ -1,42 +0,0 @@
-#!/usr/bin/env bash
-# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
-# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-# Test kill a running simulation job
-
-. "$(dirname "$0")/test_header"
-
-set_test_number 3
-install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
-
-run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
-
-# run workflow in background
-cylc play --debug -m simulation "${WORKFLOW_NAME}" >/dev/null 2>&1
-
-# wait for simulated job start
-poll_grep_workflow_log "1/foo.* running" -E
-
-# kill it
-run_ok killer cylc kill "${WORKFLOW_NAME}//1/foo"
-
-# wait for shut down
-poll_grep_workflow_log "INFO - DONE"
-
-# check the sim job was kiled
-grep_workflow_log_ok killed "1/foo.* failed" -E
-
-purge
diff --git a/tests/functional/cylc-kill/03-simulation/flow.cylc b/tests/functional/cylc-kill/03-simulation/flow.cylc
deleted file mode 100644
index 03b6249e962..00000000000
--- a/tests/functional/cylc-kill/03-simulation/flow.cylc
+++ /dev/null
@@ -1,12 +0,0 @@
-[scheduler]
- [[events]]
- inactivity timeout = PT20S
- abort on inactivity timeout = True
-[scheduling]
- [[graph]]
- R1 = "foo?"
-[runtime]
- [[root]]
- [[[simulation]]]
- default run length = PT30S
- [[foo]]
diff --git a/tests/integration/test_kill.py b/tests/integration/test_kill.py
index be88adf123a..fd38b7e42ce 100644
--- a/tests/integration/test_kill.py
+++ b/tests/integration/test_kill.py
@@ -14,12 +14,58 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import asyncio
import logging
+
+from async_timeout import timeout as async_timeout
import pytest
-from cylc.flow.commands import kill_tasks, run_cmd
+from cylc.flow.commands import (
+ kill_tasks,
+ run_cmd,
+)
from cylc.flow.scheduler import Scheduler
-from cylc.flow.task_state import TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_WAITING
+from cylc.flow.task_proxy import TaskProxy
+from cylc.flow.task_state import (
+ TASK_STATUS_FAILED,
+ TASK_STATUS_PREPARING,
+ TASK_STATUS_RUNNING,
+ TASK_STATUS_SUBMIT_FAILED,
+)
+
+
+async def task_state(itask: TaskProxy, state: str, timeout=4, **kwargs):
+ """Await task state."""
+ async with async_timeout(timeout):
+ while not itask.state(state, **kwargs):
+ await asyncio.sleep(0.1)
+
+
+async def test_simulation(flow, scheduler, run):
+ """Test killing a running task in simulation mode."""
+ conf = {
+ 'scheduling': {
+ 'graph': {
+ 'R1': 'foo',
+ },
+ },
+ 'runtime': {
+ 'root': {
+ 'simulation': {
+ 'default run length': 'PT30S',
+ },
+ },
+ },
+ }
+ schd: Scheduler = scheduler(flow(conf), paused_start=False)
+ async with run(schd):
+ itask = schd.pool.get_tasks()[0]
+ await task_state(itask, TASK_STATUS_RUNNING)
+
+ await run_cmd(kill_tasks(schd, [itask.identity]))
+ await task_state(itask, TASK_STATUS_FAILED, is_held=True)
+ assert schd.check_workflow_stalled()
async def test_kill_preparing(
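For the record, the task_state() helper added here is awaited like this
(keyword arguments pass straight through to itask.state(), e.g. is_held; the
timeout defaults to 4 seconds):

    # wait for the simulated job to start running
    await task_state(itask, TASK_STATUS_RUNNING)
    # wait up to 10 s for a held, failed task
    await task_state(itask, TASK_STATUS_FAILED, is_held=True, timeout=10)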
From 9146357590ba9ed10ed3360f2711d16b82f40242 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Fri, 17 Jan 2025 15:51:14 +0000
Subject: [PATCH 5/9] Fix return value of `Scheduler.start_job_submission()`
Was returning True even if no tasks were submitted
---
cylc/flow/scheduler.py | 26 +++++++++++++++-----------
1 file changed, 15 insertions(+), 11 deletions(-)
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index eec8dd43b67..324f701486e 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -19,6 +19,7 @@
from collections import deque
from contextlib import suppress
import itertools
+import logging
import os
from pathlib import Path
from queue import (
@@ -82,6 +83,7 @@
FLOW_NONE,
FlowMgr,
repr_flow_nums,
+ stringify_flow_nums,
)
from cylc.flow.host_select import (
HostSelectException,
@@ -1535,22 +1537,24 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
- log = LOG.debug
- if self.options.reftest or self.options.genref:
- log = LOG.info
-
- for itask in self.task_job_mgr.submit_task_jobs(
+ submitted = self.task_job_mgr.submit_task_jobs(
self.workflow,
itasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
run_mode=self.get_run_mode()
- ):
- if itask.flow_nums:
- flow = ','.join(str(i) for i in itask.flow_nums)
- else:
- flow = FLOW_NONE
- log(
+ )
+ if not submitted:
+ return False
+
+ log_lvl = logging.DEBUG
+ if self.options.reftest or self.options.genref:
+ log_lvl = logging.INFO
+
+ for itask in submitted:
+ flow = stringify_flow_nums(itask.flow_nums) or FLOW_NONE
+ LOG.log(
+ log_lvl,
f"{itask.identity} -triggered off "
f"{itask.state.get_resolved_dependencies()} in flow {flow}"
)
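In brief: the old loop fell through to "return True" even when
submit_task_jobs() returned an empty list. The control flow is now
(condensed; the integration tests added later assert this via
release_tasks_to_run()):

    submitted = self.task_job_mgr.submit_task_jobs(...)
    if not submitted:
        return False  # nothing entered the submission pipeline
    # ... log the "-triggered off" line for each submitted task ...
    return True  # one or more tasks were passed through the pipeline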
From 475962bdda46efa67fc62bc73ba63b09bf986026 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Fri, 17 Jan 2025 15:51:29 +0000
Subject: [PATCH 6/9] Add test
---
tests/integration/test_kill.py | 99 +++++++++++++++++++++++++++++-----
1 file changed, 86 insertions(+), 13 deletions(-)
diff --git a/tests/integration/test_kill.py b/tests/integration/test_kill.py
index fd38b7e42ce..3588ccf812e 100644
--- a/tests/integration/test_kill.py
+++ b/tests/integration/test_kill.py
@@ -17,6 +17,8 @@
import asyncio
import logging
+from secrets import token_hex
+from unittest.mock import Mock
from async_timeout import timeout as async_timeout
import pytest
@@ -27,6 +29,12 @@
)
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_proxy import TaskProxy
+from cylc.flow.task_remote_mgr import (
+ REMOTE_FILE_INSTALL_DONE,
+ REMOTE_FILE_INSTALL_IN_PROGRESS,
+ REMOTE_INIT_DONE,
+ REMOTE_INIT_IN_PROGRESS,
+)
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_PREPARING,
@@ -35,6 +43,9 @@
)
+LOCALHOST = 'localhost'
+
+
async def task_state(itask: TaskProxy, state: str, timeout=4, **kwargs):
"""Await task state."""
async with async_timeout(timeout):
@@ -42,6 +53,11 @@ async def task_state(itask: TaskProxy, state: str, timeout=4, **kwargs):
await asyncio.sleep(0.1)
+def patch_remote_init(schd: Scheduler, value: str):
+ """Set remote init state."""
+ schd.task_job_mgr.task_remote_mgr.remote_init_map[LOCALHOST] = value
+
+
async def test_simulation(flow, scheduler, run):
"""Test killing a running task in simulation mode."""
conf = {
@@ -69,26 +85,83 @@ async def test_simulation(flow, scheduler, run):
async def test_kill_preparing(
- one_conf,
- flow,
- scheduler,
- start,
- monkeypatch: pytest.MonkeyPatch,
- log_filter,
+ flow, scheduler, run, monkeypatch: pytest.MonkeyPatch, log_filter
):
"""Test killing a preparing task."""
schd: Scheduler = scheduler(
- flow(one_conf), run_mode='live', paused_start=False
+ flow('foo'), run_mode='live', paused_start=False
)
- async with start(schd):
+ async with run(schd):
# Make the task indefinitely preparing:
monkeypatch.setattr(
- schd.task_job_mgr, '_prep_submit_task_job', lambda *a, **k: None
+ schd.task_job_mgr, '_prep_submit_task_job', Mock(return_value=None)
)
itask = schd.pool.get_tasks()[0]
- assert itask.state(TASK_STATUS_WAITING, is_held=False)
- schd.start_job_submission([itask])
+ await task_state(itask, TASK_STATUS_PREPARING, is_held=False)
- await run_cmd(kill_tasks(schd, [itask.tokens.relative_id]))
- assert itask.state(TASK_STATUS_SUBMIT_FAILED, is_held=True)
+ await run_cmd(kill_tasks(schd, [itask.identity]))
+ await task_state(itask, TASK_STATUS_SUBMIT_FAILED, is_held=True)
assert log_filter(logging.ERROR, 'killed in job prep')
+
+
+async def test_kill_preparing_pipeline(
+ flow, scheduler, run, monkeypatch: pytest.MonkeyPatch
+):
+ """Test killing a preparing task through various stages of the preparing
+ pipeline that involve submitting subprocesses and waiting for them to
+ complete."""
+ # Make localhost look like a remote target so we can test
+ # remote init/file install stages:
+ monkeypatch.setattr(
+ 'cylc.flow.task_job_mgr.get_localhost_install_target',
+ Mock(return_value=token_hex()),
+ )
+
+ schd: Scheduler = scheduler(
+ flow('one'), run_mode='live', paused_start=False
+ )
+ async with run(schd):
+ remote_mgr = schd.task_job_mgr.task_remote_mgr
+ mock_eval_platform = Mock(return_value=None)
+ monkeypatch.setattr(remote_mgr, 'eval_platform', mock_eval_platform)
+ mock_remote_init = Mock()
+ monkeypatch.setattr(remote_mgr, 'remote_init', mock_remote_init)
+ mock_file_install = Mock()
+ monkeypatch.setattr(remote_mgr, 'file_install', mock_file_install)
+ itask = schd.pool.get_tasks()[0]
+
+ # Platform eval:
+ await task_state(itask, TASK_STATUS_PREPARING)
+ assert schd.release_tasks_to_run() is False
+ await run_cmd(kill_tasks(schd, [itask.identity]))
+ await task_state(itask, TASK_STATUS_SUBMIT_FAILED)
+ # Set to finished:
+ mock_eval_platform.return_value = LOCALHOST
+ # Should not submit after finish because it was killed:
+ assert schd.release_tasks_to_run() is False
+
+ # Remote init:
+ patch_remote_init(schd, REMOTE_INIT_IN_PROGRESS)
+ schd.pool._force_trigger(itask)
+ await task_state(itask, TASK_STATUS_PREPARING)
+ assert schd.release_tasks_to_run() is False
+ await run_cmd(kill_tasks(schd, [itask.identity]))
+ await task_state(itask, TASK_STATUS_SUBMIT_FAILED)
+ # Set to finished:
+ patch_remote_init(schd, REMOTE_INIT_DONE)
+ # Should not submit after finish because it was killed:
+ assert schd.release_tasks_to_run() is False
+ assert not mock_remote_init.called
+
+ # Remote file install:
+ patch_remote_init(schd, REMOTE_FILE_INSTALL_IN_PROGRESS)
+ schd.pool._force_trigger(itask)
+ await task_state(itask, TASK_STATUS_PREPARING)
+ assert schd.release_tasks_to_run() is False
+ await run_cmd(kill_tasks(schd, [itask.identity]))
+ await task_state(itask, TASK_STATUS_SUBMIT_FAILED)
+ # Set to finished:
+ patch_remote_init(schd, REMOTE_FILE_INSTALL_DONE)
+ # Should not submit after finish because it was killed:
+ assert schd.release_tasks_to_run() is False
+ assert not mock_file_install.called
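Each of the three prep stages exercised above follows the same pattern: a
kill while the stage is pending must leave the task submit-failed, and late
completion of that stage must not resurrect the submission. Schematically
(names from the test above):

    patch_remote_init(schd, REMOTE_INIT_IN_PROGRESS)   # stage pending
    schd.pool._force_trigger(itask)                    # back into preparing
    await run_cmd(kill_tasks(schd, [itask.identity]))  # kill mid-stage
    patch_remote_init(schd, REMOTE_INIT_DONE)          # stage completes late
    assert schd.release_tasks_to_run() is False        # must not submit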
From adf59f3507c4233a181efda15c0ed068030e325f Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Fri, 17 Jan 2025 18:15:34 +0000
Subject: [PATCH 7/9] Mypy
---
cylc/flow/task_job_mgr.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 69da98eb8ff..23d644f812a 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -346,7 +346,7 @@ def submit_livelike_task_jobs(
bc_mgr = self.task_events_mgr.broadcast_mgr
rtconf = bc_mgr.get_updated_rtconfig(itask)
try:
- platform = get_platform(
+ platform = get_platform( # type: ignore[assignment]
rtconf,
bad_hosts=self.bad_hosts
)
@@ -1278,7 +1278,7 @@ def _prep_submit_task_job(
workflow, itask, '(platform not defined)', exc)
return False
else:
- itask.platform = platform
+ itask.platform = platform # type: ignore[assignment]
# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)
From eae06d76d5311688d0e4d2fe126e4f2d5f83853e Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 13 Feb 2025 17:16:38 +0000
Subject: [PATCH 8/9] Tidy API for submitting task jobs
---
cylc/flow/network/server.py | 16 +++--
cylc/flow/scheduler.py | 22 +++----
cylc/flow/task_job_mgr.py | 63 +++++++++++--------
cylc/flow/task_remote_mgr.py | 30 +++------
tests/integration/conftest.py | 8 +--
.../run_modes/test_mode_overrides.py | 12 +---
tests/integration/run_modes/test_nonlive.py | 52 +++++++--------
tests/integration/run_modes/test_skip.py | 29 ++-------
tests/integration/test_kill.py | 24 ++++---
tests/integration/test_task_events_mgr.py | 8 +--
tests/integration/test_task_job_mgr.py | 6 +-
tests/unit/test_scheduler.py | 4 +-
tests/unit/test_task_remote_mgr.py | 7 ++-
13 files changed, 122 insertions(+), 159 deletions(-)
diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py
index 2c170e61198..b2b7753ee55 100644
--- a/cylc/flow/network/server.py
+++ b/cylc/flow/network/server.py
@@ -123,6 +123,10 @@ class WorkflowRuntimeServer:
"""
endpoints: Dict[str, object]
+ curve_auth: ThreadAuthenticator
+ """The ZMQ authenticator."""
+ client_pub_key_dir: str
+ """Client public key directory, used by the ZMQ authenticator."""
OPERATE_SLEEP_INTERVAL = 0.2
STOP_SLEEP_INTERVAL = 0.2
@@ -136,8 +140,6 @@ def __init__(self, schd):
self.publisher = None
self.loop = None
self.thread = None
- self.curve_auth = None
- self.client_pub_key_dir = None
self.schd: 'Scheduler' = schd
self.resolvers = Resolvers(
@@ -184,10 +186,7 @@ def start(self, barrier):
self.client_pub_key_dir = client_pub_keyinfo.key_path
# Initial load for the localhost key.
- self.curve_auth.configure_curve(
- domain='*',
- location=(self.client_pub_key_dir)
- )
+ self.configure_curve()
min_, max_ = glbl_cfg().get(['scheduler', 'run hosts', 'ports'])
self.replier = WorkflowReplier(self, context=self.zmq_context)
@@ -207,6 +206,11 @@ def start(self, barrier):
self.operate()
+ def configure_curve(self) -> None:
+ self.curve_auth.configure_curve(
+ domain='*', location=self.client_pub_key_dir
+ )
+
async def stop(self, reason: Union[BaseException, str]) -> None:
"""Stop the TCP servers, and clean up authentication.
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index 324f701486e..8fe3f9a4809 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -442,7 +442,8 @@ async def initialise(self):
self.workflow_db_mgr,
self.task_events_mgr,
self.data_store_mgr,
- self.bad_hosts
+ self.bad_hosts,
+ self.server,
)
self.profiler = Profiler(self, self.options.profile_mode)
@@ -912,9 +913,7 @@ def restart_remote_init(self):
if install_target == get_localhost_install_target():
continue
# set off remote init
- self.task_job_mgr.task_remote_mgr.remote_init(
- platform, self.server.curve_auth,
- self.server.client_pub_key_dir)
+ self.task_job_mgr.task_remote_mgr.remote_init(platform)
# Remote init/file-install is done via process pool
self.proc_pool.process()
# add platform to map (to be picked up on main loop)
@@ -1537,13 +1536,7 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
- submitted = self.task_job_mgr.submit_task_jobs(
- self.workflow,
- itasks,
- self.server.curve_auth,
- self.server.client_pub_key_dir,
- run_mode=self.get_run_mode()
- )
+ submitted = self.submit_task_jobs(itasks)
if not submitted:
return False
@@ -1562,6 +1555,13 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
# one or more tasks were passed through the submission pipeline
return True
+ def submit_task_jobs(
+ self, itasks: 'Iterable[TaskProxy]'
+ ) -> 'List[TaskProxy]':
+ """Submit task jobs, return tasks that attempted submission."""
+ # Note: keep this as a simple wrapper for the task job mgr's method
+ return self.task_job_mgr.submit_task_jobs(itasks, self.get_run_mode())
+
def process_workflow_db_queue(self):
"""Update workflow DB."""
self.workflow_db_mgr.process_queued_ops()
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 23d644f812a..e9c0bfc6343 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -159,8 +159,16 @@ class TaskJobManager:
REMOTE_INIT_IN_PROGRESS: REMOTE_INIT_MSG
}
- def __init__(self, workflow, proc_pool, workflow_db_mgr,
- task_events_mgr, data_store_mgr, bad_hosts):
+ def __init__(
+ self,
+ workflow,
+ proc_pool,
+ workflow_db_mgr,
+ task_events_mgr,
+ data_store_mgr,
+ bad_hosts,
+ server,
+ ):
self.workflow: str = workflow
self.proc_pool = proc_pool
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr
@@ -171,7 +179,8 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
self.bad_hosts = bad_hosts
self.bad_hosts_to_clear = set()
self.task_remote_mgr = TaskRemoteMgr(
- workflow, proc_pool, self.bad_hosts, self.workflow_db_mgr)
+ workflow, proc_pool, self.bad_hosts, self.workflow_db_mgr, server
+ )
def check_task_jobs(self, workflow, task_pool):
"""Check submission and execution timeout and polling timers.
@@ -267,40 +276,43 @@ def prep_submit_task_jobs(
def submit_task_jobs(
self,
- workflow,
itasks: 'Iterable[TaskProxy]',
- curve_auth,
- client_pub_key_dir,
- run_mode: RunMode = RunMode.LIVE,
+ run_mode: RunMode,
) -> 'List[TaskProxy]':
"""Prepare for job submission and submit task jobs.
- Preparation (host selection, remote host init, and remote install)
- is done asynchronously. Newly released tasks may be sent here several
- times until these init subprocesses have returned. Failure during
- preparation is considered to be job submission failure.
-
- Once preparation has completed or failed, reset .waiting_on_job_prep in
- task instances so the scheduler knows to stop sending them back here.
-
- This method uses prep_submit_task_jobs() as helper.
-
- Return (list): list of tasks that attempted submission.
+ Return: tasks that attempted submission.
"""
# submit "simulation/skip" mode tasks, modify "dummy" task configs:
itasks, submitted_nonlive_tasks = self.submit_nonlive_task_jobs(
- workflow, itasks, run_mode)
+ self.workflow, itasks, run_mode
+ )
# submit "live" mode tasks (and "dummy" mode tasks)
submitted_live_tasks = self.submit_livelike_task_jobs(
- workflow, itasks, curve_auth, client_pub_key_dir)
+ self.workflow, itasks
+ )
return submitted_nonlive_tasks + submitted_live_tasks
def submit_livelike_task_jobs(
- self, workflow, itasks, curve_auth, client_pub_key_dir
+ self,
+ workflow: str,
+ itasks: 'Iterable[TaskProxy]',
) -> 'List[TaskProxy]':
"""Submission for live tasks and dummy tasks.
+
+ Preparation (host selection, remote host init, and remote install)
+ is done asynchronously. Newly released tasks may be sent here several
+ times until these init subprocesses have returned. Failure during
+ preparation is considered to be job submission failure.
+
+ Once preparation has completed or failed, reset .waiting_on_job_prep in
+ task instances so the scheduler knows to stop sending them back here.
+
+ This method uses prep_submit_task_jobs() as helper.
+
+ Return: tasks that attempted submission.
"""
done_tasks: 'List[TaskProxy]' = []
# Mapping of platforms to task proxies:
@@ -404,8 +416,7 @@ def submit_livelike_task_jobs(
elif install_target not in ri_map:
# Remote init not in progress for target, so start it.
- self.task_remote_mgr.remote_init(
- platform, curve_auth, client_pub_key_dir)
+ self.task_remote_mgr.remote_init(platform)
for itask in itasks:
self.data_store_mgr.delta_job_msg(
itask.tokens.duplicate(
@@ -433,8 +444,7 @@ def submit_livelike_task_jobs(
# Remote init previously failed because a host was
# unreachable, so start it again.
del ri_map[install_target]
- self.task_remote_mgr.remote_init(
- platform, curve_auth, client_pub_key_dir)
+ self.task_remote_mgr.remote_init(platform)
for itask in itasks:
self.data_store_mgr.delta_job_msg(
itask.tokens.duplicate(
@@ -457,8 +467,7 @@ def submit_livelike_task_jobs(
)
except NoHostsError:
del ri_map[install_target]
- self.task_remote_mgr.remote_init(
- platform, curve_auth, client_pub_key_dir)
+ self.task_remote_mgr.remote_init(platform)
for itask in itasks:
self.data_store_mgr.delta_job_msg(
itask.tokens.duplicate(
diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py
index 50b523fb50c..9132a8e44c5 100644
--- a/cylc/flow/task_remote_mgr.py
+++ b/cylc/flow/task_remote_mgr.py
@@ -81,7 +81,7 @@
)
if TYPE_CHECKING:
- from zmq.auth.thread import ThreadAuthenticator
+ from cylc.flow.network.server import WorkflowRuntimeServer
# Remote installation literals
REMOTE_INIT_DONE = 'REMOTE INIT DONE'
@@ -103,7 +103,7 @@ class RemoteTidyQueueTuple(NamedTuple):
class TaskRemoteMgr:
"""Manage task remote initialisation, tidy, selection."""
- def __init__(self, workflow, proc_pool, bad_hosts, db_mgr):
+ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr, server):
self.workflow = workflow
self.proc_pool = proc_pool
# self.remote_command_map = {command: host|PlatformError|None}
@@ -117,6 +117,7 @@ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr):
self.is_reload = False
self.is_restart = False
self.db_mgr = db_mgr
+ self.server: WorkflowRuntimeServer = server
def _subshell_eval(
self, eval_str: str, command_pattern: re.Pattern
@@ -207,9 +208,7 @@ def subshell_eval_reset(self):
if value is not None:
del self.remote_command_map[key]
- def remote_init(
- self, platform: Dict[str, Any], curve_auth: 'ThreadAuthenticator',
- client_pub_key_dir: str) -> None:
+ def remote_init(self, platform: Dict[str, Any]) -> None:
"""Initialise a remote host if necessary.
Call "cylc remote-init" to install workflow items to remote:
@@ -219,9 +218,6 @@ def remote_init(
Args:
platform: A dict containing settings relating to platform used in
this remote installation.
- curve_auth: The ZMQ authenticator.
- client_pub_key_dir: Client public key directory, used by the
- ZMQ authenticator.
"""
install_target = platform['install target']
@@ -277,18 +273,13 @@ def remote_init(
cmd = construct_ssh_cmd(cmd, platform, host)
self.proc_pool.put_command(
SubProcContext(
- 'remote-init',
- cmd,
- stdin_files=[tmphandle],
- host=host
+ 'remote-init', cmd, stdin_files=[tmphandle], host=host
),
bad_hosts=self.bad_hosts,
callback=self._remote_init_callback,
- callback_args=[
- platform, tmphandle, curve_auth, client_pub_key_dir
- ],
+ callback_args=[platform, tmphandle],
callback_255=self._remote_init_callback_255,
- callback_255_args=[platform]
+ callback_255_args=[platform],
)
def construct_remote_tidy_ssh_cmd(
@@ -485,9 +476,7 @@ def _remote_init_callback_255(self, proc_ctx, platform):
self.ready = True
return
- def _remote_init_callback(
- self, proc_ctx, platform, tmphandle,
- curve_auth, client_pub_key_dir):
+ def _remote_init_callback(self, proc_ctx, platform, tmphandle):
"""Callback when "cylc remote-init" exits.
Write public key for install target into client public key
@@ -518,8 +507,7 @@ def _remote_init_callback(
# configure_curve must be called every time certificates are
# added or removed, in order to update the Authenticator's
# state.
- curve_auth.configure_curve(
- domain='*', location=(client_pub_key_dir))
+ self.server.configure_curve()
self.remote_init_map[install_target] = REMOTE_INIT_DONE
self.ready = True
return
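Net effect of the plumbing change: TaskRemoteMgr reaches the ZMQ
authenticator through the server object, instead of having curve_auth and
client_pub_key_dir threaded through remote_init() and its callbacks.
Schematically:

    # before:
    #   remote_init(platform, curve_auth, client_pub_key_dir)
    # after:
    self.task_remote_mgr.remote_init(platform)
    # ...and when "cylc remote-init" completes:
    self.server.configure_curve()  # re-read client public keys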
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index cfcd137b453..5af6a2b47f5 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -424,14 +424,14 @@ def capture_submission():
def _disable_submission(schd: 'Scheduler') -> 'Set[TaskProxy]':
submitted_tasks: 'Set[TaskProxy]' = set()
- def _submit_task_jobs(_, itasks, *args, **kwargs):
+ def _submit_task_jobs(itasks):
nonlocal submitted_tasks
for itask in itasks:
itask.state_reset(TASK_STATUS_SUBMITTED)
submitted_tasks.update(itasks)
return itasks
- schd.task_job_mgr.submit_task_jobs = _submit_task_jobs # type: ignore
+ schd.submit_task_jobs = _submit_task_jobs # type: ignore
return submitted_tasks
return _disable_submission
@@ -564,7 +564,7 @@ def reflog():
"""
def _reflog(schd: 'Scheduler', flow_nums: bool = False) -> Set[tuple]:
- submit_task_jobs = schd.task_job_mgr.submit_task_jobs
+ submit_task_jobs = schd.submit_task_jobs
triggers = set()
def _submit_task_jobs(*args, **kwargs):
@@ -580,7 +580,7 @@ def _submit_task_jobs(*args, **kwargs):
triggers.add((itask.identity, deps or None))
return itasks
- schd.task_job_mgr.submit_task_jobs = _submit_task_jobs
+ schd.submit_task_jobs = _submit_task_jobs
return triggers
diff --git a/tests/integration/run_modes/test_mode_overrides.py b/tests/integration/run_modes/test_mode_overrides.py
index 25cdf6cf68a..5aa15394d18 100644
--- a/tests/integration/run_modes/test_mode_overrides.py
+++ b/tests/integration/run_modes/test_mode_overrides.py
@@ -95,11 +95,7 @@ async def test_force_trigger_does_not_override_run_mode(
schd.pool.force_trigger_tasks('1/foo', [1])
# ... but job submission will always change this to the correct mode:
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- [foo],
- schd.server.curve_auth,
- schd.server.client_pub_key_dir)
+ schd.submit_task_jobs([foo])
assert foo.run_mode.value == 'skip'
@@ -157,10 +153,6 @@ async def test_run_mode_override_from_broadcast(
foo_1000 = schd.pool.get_task(ISO8601Point('1000'), 'foo')
foo_1001 = schd.pool.get_task(ISO8601Point('1001'), 'foo')
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- [foo_1000, foo_1001],
- schd.server.curve_auth,
- schd.server.client_pub_key_dir)
+ schd.submit_task_jobs([foo_1000, foo_1001])
assert foo_1000.run_mode.value == 'skip'
assert capture_live_submissions() == {'1001/foo'}
diff --git a/tests/integration/run_modes/test_nonlive.py b/tests/integration/run_modes/test_nonlive.py
index 90cefbf7701..d975c34922c 100644
--- a/tests/integration/run_modes/test_nonlive.py
+++ b/tests/integration/run_modes/test_nonlive.py
@@ -19,6 +19,7 @@
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.cycling.iso8601 import ISO8601Point
+from cylc.flow.scheduler import Scheduler
# Define here to ensure test doesn't just mirror code:
@@ -30,9 +31,11 @@
'submit_status': 0,
'run_signal': None,
'run_status': 0,
+ # capture_live_submissions fixture submits jobs in sim mode
'platform_name': 'simulation',
'job_runner_name': 'simulation',
- 'job_id': None},
+ 'job_id': None,
+ },
'skip': {
'flow_nums': '[1]',
'is_manual_submit': 0,
@@ -42,7 +45,8 @@
'run_status': 0,
'platform_name': 'skip',
'job_runner_name': 'skip',
- 'job_id': None},
+ 'job_id': None,
+ },
}
@@ -59,12 +63,7 @@ def submit_and_check_db():
"""
def _inner(schd):
# Submit task jobs:
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- schd.pool.get_tasks(),
- schd.server.curve_auth,
- schd.server.client_pub_key_dir
- )
+ schd.submit_task_jobs(schd.pool.get_tasks())
# Make sure that db changes are enacted:
schd.workflow_db_mgr.process_queued_ops()
@@ -77,7 +76,7 @@ def _inner(schd):
# Check that timestamps have been created:
for timestamp in [
- 'time_submit', 'time_submit_exit', 'time_run', 'time_run_exit'
+ 'time_submit', 'time_submit_exit', 'time_run', 'time_run_exit'
]:
assert task_jobs[timestamp] is not None
return _inner
@@ -90,19 +89,25 @@ async def test_db_task_jobs(
"""Ensure that task job data is added to the database correctly
for each run mode.
"""
- schd = scheduler(flow({
- 'scheduling': {'graph': {
- 'R1': '&'.join(KGO)}},
- 'runtime': {
- mode: {'run mode': mode} for mode in KGO}
- }))
+ schd: Scheduler = scheduler(
+ flow({
+ 'scheduling': {
+ 'graph': {
+ 'R1': ' & '.join(KGO)
+ }
+ },
+ 'runtime': {
+ mode: {'run mode': mode} for mode in KGO
+ },
+ }),
+ run_mode='live'
+ )
async with start(schd):
# Reference all task proxies so we can examine them
# at the end of the test:
itask_skip = schd.pool.get_task(IntegerPoint('1'), 'skip')
itask_live = schd.pool.get_task(IntegerPoint('1'), 'live')
-
submit_and_check_db(schd)
# Set outputs to failed:
@@ -110,6 +115,7 @@ async def test_db_task_jobs(
submit_and_check_db(schd)
+ # capture_live_submissions fixture submits jobs in sim mode
assert itask_live.run_mode.value == 'simulation'
assert itask_skip.run_mode.value == 'skip'
@@ -124,12 +130,7 @@ async def test_db_task_states(
conf['runtime'] = {'one': {'run mode': 'skip'}}
schd = scheduler(flow(conf))
async with start(schd):
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- schd.pool.get_tasks(),
- schd.server.curve_auth,
- schd.server.client_pub_key_dir
- )
+ schd.submit_task_jobs(schd.pool.get_tasks())
schd.workflow_db_mgr.process_queued_ops()
result = schd.workflow_db_mgr.pri_dao.connect().execute(
'SELECT * FROM task_states').fetchone()
@@ -165,12 +166,7 @@ async def test_mean_task_time(
itask.tdef.elapsed_times.extend([133.0, 132.4])
# Submit two tasks:
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- [itask],
- schd.server.curve_auth,
- schd.server.client_pub_key_dir
- )
+ schd.submit_task_jobs([itask])
# Ensure that the skipped task has succeeded, and that the
# number of items in the elapsed_times has not changed.
diff --git a/tests/integration/run_modes/test_skip.py b/tests/integration/run_modes/test_skip.py
index 5c9c31a6d56..1bcdf3891af 100644
--- a/tests/integration/run_modes/test_skip.py
+++ b/tests/integration/run_modes/test_skip.py
@@ -51,12 +51,7 @@ async def test_settings_override_from_broadcast(
foo, = schd.pool.get_tasks()
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- schd.pool.get_tasks(),
- schd.server.curve_auth,
- schd.server.client_pub_key_dir
- )
+ schd.submit_task_jobs(schd.pool.get_tasks())
# Run mode has changed:
assert foo.platform['name'] == 'skip'
# Output failed emitted:
@@ -215,13 +210,7 @@ async def test_prereqs_marked_satisfied_by_skip_mode(
async with start(schd):
foo = schd.pool.get_task(IntegerPoint(1), 'foo')
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- [foo],
- schd.server.curve_auth,
- schd.server.client_pub_key_dir,
- run_mode=schd.get_run_mode()
- )
+ schd.submit_task_jobs([foo])
bar = schd.pool.get_task(IntegerPoint(1), 'bar')
satisfied_message, = bar.state.prerequisites[0]._satisfied.values()
assert satisfied_message == 'satisfied by skip mode'
@@ -240,20 +229,10 @@ async def test_outputs_can_be_changed(one_conf, flow, start, scheduler, validate
{"skip": {"outputs": "failed"}},
],
)
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- schd.pool.get_tasks(),
- None,
- None
- )
+ schd.submit_task_jobs(schd.pool.get_tasks())
# Broadcast the task into skip mode, output succeeded and submit it:
schd.broadcast_mgr.put_broadcast(
['1'], ['one'], [{'skip': {'outputs': 'succeeded'}}]
)
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- schd.pool.get_tasks(),
- None,
- None
- )
+ schd.submit_task_jobs(schd.pool.get_tasks())
diff --git a/tests/integration/test_kill.py b/tests/integration/test_kill.py
index 3588ccf812e..ab13fb43b10 100644
--- a/tests/integration/test_kill.py
+++ b/tests/integration/test_kill.py
@@ -105,7 +105,7 @@ async def test_kill_preparing(
async def test_kill_preparing_pipeline(
- flow, scheduler, run, monkeypatch: pytest.MonkeyPatch
+ flow, scheduler, start, monkeypatch: pytest.MonkeyPatch
):
"""Test killing a preparing task through various stages of the preparing
pipeline that involve submitting subprocesses and waiting for them to
@@ -120,7 +120,7 @@ async def test_kill_preparing_pipeline(
schd: Scheduler = scheduler(
flow('one'), run_mode='live', paused_start=False
)
- async with run(schd):
+ async with start(schd):
remote_mgr = schd.task_job_mgr.task_remote_mgr
mock_eval_platform = Mock(return_value=None)
monkeypatch.setattr(remote_mgr, 'eval_platform', mock_eval_platform)
@@ -131,10 +131,12 @@ async def test_kill_preparing_pipeline(
itask = schd.pool.get_tasks()[0]
# Platform eval:
- await task_state(itask, TASK_STATUS_PREPARING)
+ schd.submit_task_jobs([itask])
+ assert itask.state(TASK_STATUS_PREPARING)
assert schd.release_tasks_to_run() is False
await run_cmd(kill_tasks(schd, [itask.identity]))
- await task_state(itask, TASK_STATUS_SUBMIT_FAILED)
+ assert itask.state(TASK_STATUS_SUBMIT_FAILED)
+ assert schd.release_tasks_to_run() is False
# Set to finished:
mock_eval_platform.return_value = LOCALHOST
# Should not submit after finish because it was killed:
@@ -142,11 +144,12 @@ async def test_kill_preparing_pipeline(
# Remote init:
patch_remote_init(schd, REMOTE_INIT_IN_PROGRESS)
- schd.pool._force_trigger(itask)
- await task_state(itask, TASK_STATUS_PREPARING)
+ schd.submit_task_jobs([itask])
+ assert itask.state(TASK_STATUS_PREPARING)
assert schd.release_tasks_to_run() is False
await run_cmd(kill_tasks(schd, [itask.identity]))
- await task_state(itask, TASK_STATUS_SUBMIT_FAILED)
+ assert itask.state(TASK_STATUS_SUBMIT_FAILED)
+ assert schd.release_tasks_to_run() is False
# Set to finished:
patch_remote_init(schd, REMOTE_INIT_DONE)
# Should not submit after finish because it was killed:
@@ -155,11 +158,12 @@ async def test_kill_preparing_pipeline(
# Remote file install:
patch_remote_init(schd, REMOTE_FILE_INSTALL_IN_PROGRESS)
- schd.pool._force_trigger(itask)
- await task_state(itask, TASK_STATUS_PREPARING)
+ schd.submit_task_jobs([itask])
+ assert itask.state(TASK_STATUS_PREPARING)
assert schd.release_tasks_to_run() is False
await run_cmd(kill_tasks(schd, [itask.identity]))
- await task_state(itask, TASK_STATUS_SUBMIT_FAILED)
+ assert itask.state(TASK_STATUS_SUBMIT_FAILED)
+ assert schd.release_tasks_to_run() is False
# Set to finished:
patch_remote_init(schd, REMOTE_FILE_INSTALL_DONE)
# Should not submit after finish because it was killed:
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index ac1fb2f9344..4569009196c 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -151,13 +151,7 @@ async def test__always_insert_task_job(
schd = scheduler(id_, run_mode='live')
schd.bad_hosts = {'no-such-host-1', 'no-such-host-2'}
async with start(schd):
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- schd.pool.get_tasks(),
- schd.server.curve_auth,
- schd.server.client_pub_key_dir,
- run_mode=RunMode('live')
- )
+ schd.submit_task_jobs(schd.pool.get_tasks())
# Both tasks are in a waiting state:
assert all(
diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py
index b1cf1347071..eda704a2d39 100644
--- a/tests/integration/test_task_job_mgr.py
+++ b/tests/integration/test_task_job_mgr.py
@@ -223,11 +223,7 @@ async def test_broadcast_platform_change(
schd.task_job_mgr.task_remote_mgr.bad_hosts = {'food'}
# Attempt job submission:
- schd.task_job_mgr.submit_task_jobs(
- schd.workflow,
- schd.pool.get_tasks(),
- schd.server.curve_auth,
- schd.server.client_pub_key_dir)
+ schd.submit_task_jobs(schd.pool.get_tasks())
# Check that task platform hasn't become "localhost":
assert schd.pool.get_tasks()[0].platform['name'] == 'foo'
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 9163b9e3567..4ee63695efa 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -94,7 +94,7 @@ def test_should_auto_restart_now(
def test_release_tasks_to_run__auto_restart():
"""Test that Scheduler.release_tasks_to_run() works as expected
during auto restart."""
- mock_schd = Mock(
+ mock_schd = MagicMock(
auto_restart_time=(time() - 100),
auto_restart_mode=AutoRestartMode.RESTART_NORMAL,
is_paused=False,
@@ -113,7 +113,7 @@ def test_release_tasks_to_run__auto_restart():
mock_schd.pool.release_queued_tasks.assert_not_called()
Scheduler.start_job_submission(mock_schd, mock_schd.pool.get_tasks())
- mock_schd.task_job_mgr.submit_task_jobs.assert_called()
+ mock_schd.submit_task_jobs.assert_called()
def test_auto_restart_DNS_error(monkeypatch, caplog, log_filter):
diff --git a/tests/unit/test_task_remote_mgr.py b/tests/unit/test_task_remote_mgr.py
index 61cdcce2bc5..115d3a1c729 100644
--- a/tests/unit/test_task_remote_mgr.py
+++ b/tests/unit/test_task_remote_mgr.py
@@ -90,7 +90,7 @@ def test_remote_init_skip(
'get_dirs_to_symlink'):
monkeypatch.setattr(f'cylc.flow.task_remote_mgr.{item}', MagicMock())
- TaskRemoteMgr.remote_init(mock_task_remote_mgr, platform, None, '')
+ TaskRemoteMgr.remote_init(mock_task_remote_mgr, platform)
call_expected = not skip_expected
assert mock_task_remote_mgr._remote_init_items.called is call_expected
assert mock_construct_ssh_cmd.called is call_expected
@@ -115,7 +115,7 @@ def test_get_log_file_name(tmp_path: Path,
install_target: str,
load_type: Optional[str],
expected: str):
- task_remote_mgr = TaskRemoteMgr('some_workflow', None, None, None)
+ task_remote_mgr = TaskRemoteMgr('some_workflow', None, None, None, None)
if load_type == 'restart':
task_remote_mgr.is_restart = True
elif load_type == 'reload':
@@ -285,7 +285,8 @@ def _task_remote_mgr_eval(remote_cmd_map: dict) -> TaskRemoteMgr:
workflow='usg_ishimura',
proc_pool=Mock(),
bad_hosts=[],
- db_mgr=None
+ db_mgr=None,
+ server=None
)
task_remote_mgr.remote_command_map = remote_cmd_map
return task_remote_mgr
From c28a565b2208b8f2dbb9c0cb519ac9340625fd39 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Thu, 13 Feb 2025 17:18:18 +0000
Subject: [PATCH 9/9] Wipe platform info for task killed during preparation
---
cylc/flow/task_job_mgr.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index e9c0bfc6343..9ce496c8fd2 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -212,6 +212,7 @@ def kill_task_jobs(
def kill_prep_task(self, itask: 'TaskProxy') -> None:
"""Kill a preparing task."""
+ itask.summary['platforms_used'][itask.submit_num] = ''
itask.waiting_on_job_prep = False
itask.local_job_file_path = None # reset for retry
self._set_retry_timers(itask)