Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distinguish never-spawned from never-submitted. #6067

Merged
merged 13 commits into from
Apr 29, 2024
1 change: 1 addition & 0 deletions changes.d/6067.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug that sometimes allowed suicide-triggered or manually removed tasks to be added back later.
3 changes: 2 additions & 1 deletion cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1788,6 +1788,7 @@ def _reset_job_timers(self, itask):
itask.timeout = None
itask.poll_timer = None
return

ctx = (itask.submit_num, itask.state.status)
if itask.poll_timer and itask.poll_timer.ctx == ctx:
return
Expand Down Expand Up @@ -1844,7 +1845,7 @@ def _reset_job_timers(self, itask):
message += '%d*' % (num + 1)
message += '%s,' % intvl_as_str(item)
message += '...'
LOG.info(f"[{itask}] {message}")
LOG.debug(f"[{itask}] {message}")
# Set next poll time
self.check_poll_time(itask)

Expand Down
77 changes: 55 additions & 22 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class TaskPool:

ERR_TMPL_NO_TASKID_MATCH = "No matching tasks found: {0}"
ERR_PREFIX_TASK_NOT_ON_SEQUENCE = "Invalid cycle point for task: {0}, {1}"
SUICIDE_MSG = "suicide"
SUICIDE_MSG = "suicide trigger"

def __init__(
self,
Expand Down Expand Up @@ -221,7 +221,7 @@ def add_to_pool(self, itask) -> None:
self.active_tasks.setdefault(itask.point, {})
self.active_tasks[itask.point][itask.identity] = itask
self.active_tasks_changed = True
LOG.info(f"[{itask}] added to active task pool")
LOG.debug(f"[{itask}] added to active task pool")

self.create_data_store_elements(itask)

Expand Down Expand Up @@ -807,10 +807,11 @@ def remove(self, itask, reason=None):
itask.flow_nums
)

msg = "removed from active task pool"
if reason is None:
msg = "task completed"
msg += ": completed"
else:
msg = f"removed ({reason})"
msg += f": {reason}"

if itask.is_xtrigger_sequential:
self.xtrigger_mgr.sequential_spawn_next.discard(itask.identity)
Expand All @@ -837,7 +838,17 @@ def remove(self, itask, reason=None):
# Event-driven final update of task_states table.
# TODO: same for datastore (still updated by scheduler loop)
self.workflow_db_mgr.put_update_task_state(itask)
LOG.info(f"[{itask}] {msg}")

level = logging.DEBUG
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
):
level = logging.WARNING
msg += " - active job orphaned"

LOG.log(level, f"[{itask}] {msg}")
del itask

def get_tasks(self) -> List[TaskProxy]:
Expand Down Expand Up @@ -1392,14 +1403,12 @@ def spawn_on_output(self, itask, output, forced=False):
suicide.append(t)

for c_task in suicide:
msg = self.__class__.SUICIDE_MSG
if c_task.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
is_held=False):
msg += " suiciding while active"
self.remove(c_task, msg)
self.remove(c_task, self.__class__.SUICIDE_MSG)

if suicide:
# Update DB now in case of very quick respawn attempt.
# See https://github.com/cylc/cylc-flow/issues/6066
self.workflow_db_mgr.process_queued_ops()

self.remove_if_complete(itask, output)

Expand Down Expand Up @@ -1555,17 +1564,33 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:

def _get_task_history(
self, name: str, point: 'PointBase', flow_nums: Set[int]
) -> Tuple[int, str, bool]:
"""Get history of previous submits for this task."""
) -> Tuple[bool, int, str, bool]:
"""Get history of previous submits for this task.

Args:
name: task name
point: task cycle point
flow_nums: task flow numbers

Returns:
never_spawned: if task never spawned before
submit_num: submit number of previous submit
prev_status: task status of previous submit
prev_flow_wait: if previous submit was a flow-wait task

"""
info = self.workflow_db_mgr.pri_dao.select_prev_instances(
name, str(point)
)
try:
submit_num: int = max(s[0] for s in info)
except ValueError:
# never spawned before in any flow
# never spawned in any flow
submit_num = 0
never_spawned = True
else:
never_spawned = False
# (submit_num could still be zero, if removed before submit)

prev_status: str = TASK_STATUS_WAITING
prev_flow_wait = False
Expand All @@ -1582,7 +1607,7 @@ def _get_task_history(
# overlap due to merges (they'll have the same snum and
# f_wait); keep going to find the finished one, if any.

return submit_num, prev_status, prev_flow_wait
return never_spawned, submit_num, prev_status, prev_flow_wait

def _load_historical_outputs(self, itask):
"""Load a task's historical outputs from the DB."""
Expand Down Expand Up @@ -1619,10 +1644,19 @@ def spawn_task(
if not self.can_be_spawned(name, point):
return None

submit_num, prev_status, prev_flow_wait = (
never_spawned, submit_num, prev_status, prev_flow_wait = (
self._get_task_history(name, point, flow_nums)
)

if (
not never_spawned and
not prev_flow_wait and
submit_num == 0
):
# Previous instance removed before completing any outputs.
LOG.debug(f"Not spawning {point}/{name} - task removed")
return None

itask = self._get_task_proxy_db_outputs(
point,
self.config.get_taskdef(name),
Expand Down Expand Up @@ -1653,8 +1687,6 @@ def spawn_task(
if itask.transient and not force:
return None

# (else not previously finishedr, so run it)

if not itask.transient:
if (name, point) in self.tasks_to_hold:
LOG.info(f"[{itask}] holding (as requested earlier)")
Expand Down Expand Up @@ -2117,8 +2149,9 @@ def force_trigger_tasks(
if not self.can_be_spawned(name, point):
continue

submit_num, _prev_status, prev_fwait = self._get_task_history(
name, point, flow_nums)
_, submit_num, _prev_status, prev_fwait = (
self._get_task_history(name, point, flow_nums)
)

itask = TaskProxy(
self.tokens,
Expand Down
6 changes: 3 additions & 3 deletions tests/functional/cylc-set/02-off-flow-out.t
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ reftest_run

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/a_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/a_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" '1/a_cold.* task completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" '1/a_cold.* completed'

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/b_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/b_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" '1/b_cold.* task completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" '1/b_cold.* completed'

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/c_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/c_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" '1/c_cold.* task completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" '1/c_cold.* completed'

purge
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ __FLOW__

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"

cylc play "${WORKFLOW_NAME}"
cylc play --debug "${WORKFLOW_NAME}"

poll_grep_workflow_log "INFO - DONE"

Expand Down
2 changes: 1 addition & 1 deletion tests/functional/hold-release/05-release.t
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
inherit = STOP
script = """
cylc__job__poll_grep_workflow_log -E \
'1/dog1/01:succeeded.* task completed'
'1/dog1/01:succeeded.* completed'
cylc stop "${CYLC_WORKFLOW_ID}"
"""
__FLOW_CONFIG__
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/spawn-on-demand/09-set-outputs/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
cylc set --flow=2 --output=out1 --output=out2 "${CYLC_WORKFLOW_ID}//1/foo"

# Set bar outputs after it is gone from the pool.
cylc__job__poll_grep_workflow_log -E "1/bar.* task completed"
cylc__job__poll_grep_workflow_log -E "1/bar.* completed"
cylc set --flow=2 --output=out1 --output=out2 "${CYLC_WORKFLOW_ID}//1/bar"
"""
[[qux, quw, fux, fuw]]
Expand Down
24 changes: 0 additions & 24 deletions tests/functional/triggering/15-suicide.t

This file was deleted.

23 changes: 0 additions & 23 deletions tests/functional/triggering/15-suicide/flow.cylc

This file was deleted.

5 changes: 0 additions & 5 deletions tests/functional/triggering/15-suicide/reference.log

This file was deleted.

34 changes: 0 additions & 34 deletions tests/functional/triggering/18-suicide-active.t

This file was deleted.

11 changes: 0 additions & 11 deletions tests/functional/triggering/18-suicide-active/flow.cylc

This file was deleted.

3 changes: 2 additions & 1 deletion tests/integration/test_task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
from cylc.flow.scheduler import Scheduler

import logging
from typing import Any as Fixture


Expand Down Expand Up @@ -51,7 +52,7 @@ async def test__reset_job_timers(
process_execution_polling_intervals.
"""
schd = scheduler(flow(one_conf))
async with start(schd):
async with start(schd, level=logging.DEBUG):
itask = schd.pool.get_tasks()[0]
itask.state.status = 'running'
itask.platform['execution polling intervals'] = [25]
Expand Down
Loading