diff --git a/cylc/flow/run_modes/simulation.py b/cylc/flow/run_modes/simulation.py
index a9090bf0a8..53ee381925 100644
--- a/cylc/flow/run_modes/simulation.py
+++ b/cylc/flow/run_modes/simulation.py
@@ -18,9 +18,15 @@
 
 from dataclasses import dataclass
 from logging import INFO
-from typing import (
-    TYPE_CHECKING, Any, Dict, List, Tuple, Union)
 from time import time
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    List,
+    Tuple,
+    Union,
+)
 
 from metomi.isodatetime.parsers import DurationParser
 
@@ -29,22 +35,23 @@
 from cylc.flow.cycling.loader import get_point
 from cylc.flow.exceptions import PointParsingError
 from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
-from cylc.flow.task_outputs import TASK_OUTPUT_SUBMITTED
-from cylc.flow.task_state import (
-    TASK_STATUS_RUNNING,
-    TASK_STATUS_FAILED,
-    TASK_STATUS_SUCCEEDED,
+from cylc.flow.run_modes import RunMode
+from cylc.flow.task_outputs import (
+    TASK_OUTPUT_FAILED,
+    TASK_OUTPUT_SUBMITTED,
+    TASK_OUTPUT_SUCCEEDED,
 )
+from cylc.flow.task_state import TASK_STATUS_RUNNING
 from cylc.flow.wallclock import get_unix_time_from_time_string
-from cylc.flow.run_modes import RunMode
 
 if TYPE_CHECKING:
+    from typing_extensions import Literal
+
     from cylc.flow.task_events_mgr import TaskEventsManager
     from cylc.flow.task_job_mgr import TaskJobManager
     from cylc.flow.task_proxy import TaskProxy
     from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
-    from typing_extensions import Literal
 
 
 def submit_task_job(
@@ -357,12 +364,12 @@ def sim_time_check(
         if now > itask.mode_settings.timeout:
             if itask.mode_settings.sim_task_fails:
                 task_events_manager.process_message(
-                    itask, 'CRITICAL', TASK_STATUS_FAILED,
+                    itask, 'CRITICAL', TASK_OUTPUT_FAILED,
                     flag=task_events_manager.FLAG_RECEIVED
                 )
             else:
                 task_events_manager.process_message(
-                    itask, 'DEBUG', TASK_STATUS_SUCCEEDED,
+                    itask, 'DEBUG', TASK_OUTPUT_SUCCEEDED,
                     flag=task_events_manager.FLAG_RECEIVED
                 )
             # Simulate message outputs.
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index c9003c6c70..cbd0efebbd 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -134,6 +134,7 @@
 from cylc.flow.subprocpool import SubProcPool
 from cylc.flow.task_events_mgr import TaskEventsManager
 from cylc.flow.task_job_mgr import TaskJobManager
+from cylc.flow.task_outputs import TASK_OUTPUT_FAILED
 from cylc.flow.task_pool import TaskPool
 from cylc.flow.task_remote_mgr import (
     REMOTE_FILE_INSTALL_255,
@@ -144,7 +145,6 @@
     REMOTE_INIT_FAILED,
 )
 from cylc.flow.task_state import (
-    TASK_STATUS_FAILED,
     TASK_STATUS_PREPARING,
     TASK_STATUS_RUNNING,
     TASK_STATUS_SUBMITTED,
@@ -1085,7 +1085,7 @@ def kill_tasks(
            if jobless:
                # Directly set failed in sim mode:
                self.task_events_mgr.process_message(
-                    itask, 'CRITICAL', TASK_STATUS_FAILED,
+                    itask, 'CRITICAL', TASK_OUTPUT_FAILED,
                    flag=self.task_events_mgr.FLAG_RECEIVED
                )
            else:
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 679c8a0aff..bacdd40faa 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -413,32 +413,27 @@ class TaskEventsManager():
     * Set up task (submission) retries on job (submission) failures.
     * Generate and manage task event handlers.
     """
-    EVENT_FAILED = TASK_OUTPUT_FAILED
     EVENT_LATE = "late"
     EVENT_RETRY = "retry"
-    EVENT_STARTED = TASK_OUTPUT_STARTED
-    EVENT_SUBMITTED = TASK_OUTPUT_SUBMITTED
-    EVENT_EXPIRED = TASK_OUTPUT_EXPIRED
-    EVENT_SUBMIT_FAILED = "submission failed"
+    _EVENT_SUBMIT_FAILED = "submission failed"
     EVENT_SUBMIT_RETRY = "submission retry"
-    EVENT_SUCCEEDED = TASK_OUTPUT_SUCCEEDED
     HANDLER_CUSTOM = "event-handler"
     HANDLER_MAIL = "event-mail"
     JOB_FAILED = "job failed"
     HANDLER_JOB_LOGS_RETRIEVE = "job-logs-retrieve"
     FLAG_INTERNAL = "(internal)"
     FLAG_RECEIVED = "(received)"
-    FLAG_RECEIVED_IGNORED = "(received-ignored)"
+    _FLAG_RECEIVED_IGNORED = "(received-ignored)"
     FLAG_POLLED = "(polled)"
-    FLAG_POLLED_IGNORED = "(polled-ignored)"
+    _FLAG_POLLED_IGNORED = "(polled-ignored)"
     KEY_EXECUTE_TIME_LIMIT = 'execution_time_limit'
     NON_UNIQUE_EVENTS = ('warning', 'critical', 'custom')
     JOB_SUBMIT_SUCCESS_FLAG = 0
     JOB_SUBMIT_FAIL_FLAG = 1
     JOB_LOGS_RETRIEVAL_EVENTS = {
-        EVENT_FAILED,
+        TASK_OUTPUT_FAILED,
         EVENT_RETRY,
-        EVENT_SUCCEEDED
+        TASK_OUTPUT_SUCCEEDED
     }
 
     workflow_cfg: Dict[str, Any]
@@ -668,10 +663,16 @@ def process_message(
             True: if polling is required to confirm a reversal of status.
 
         """
-
-        # Log messages
         if event_time is None:
             event_time = get_current_time_string()
+
+        event = message
+        if message == TASK_OUTPUT_SUBMIT_FAILED:
+            # Previously we were using the event name as the message
+            event = self._EVENT_SUBMIT_FAILED
+        elif message == self._EVENT_SUBMIT_FAILED:
+            message = TASK_OUTPUT_SUBMIT_FAILED
+
         if submit_num is None:
             submit_num = itask.submit_num
         if isinstance(severity, int):
@@ -705,7 +706,7 @@ def process_message(
             msg0 = TASK_OUTPUT_FAILED
 
         completed_output: Optional[bool] = False
-        if msg0 not in [TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_FAILED]:
+        if msg0 not in {TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_FAILED}:
             completed_output = (
                 itask.state.outputs.set_message_complete(msg0, forced)
             )
@@ -723,7 +724,7 @@ def process_message(
                 self.FLAG_INTERNAL, submit_num, forced
             )
 
-        if message == self.EVENT_STARTED:
+        if message == TASK_OUTPUT_STARTED:
            if (
                flag == self.FLAG_RECEIVED
                and itask.state.is_gt(TASK_STATUS_RUNNING)
@@ -733,15 +734,15 @@ def process_message(
             self._process_message_started(itask, event_time, forced)
             self.spawn_children(itask, TASK_OUTPUT_STARTED)
 
-        elif message == self.EVENT_SUCCEEDED:
+        elif message == TASK_OUTPUT_SUCCEEDED:
             self._process_message_succeeded(itask, event_time, forced)
             self.spawn_children(itask, TASK_OUTPUT_SUCCEEDED)
 
-        elif message == self.EVENT_EXPIRED:
+        elif message == TASK_OUTPUT_EXPIRED:
             self._process_message_expired(itask, forced)
             self.spawn_children(itask, TASK_OUTPUT_EXPIRED)
 
-        elif message == self.EVENT_FAILED:
+        elif message == TASK_OUTPUT_FAILED:
            if (
                flag == self.FLAG_RECEIVED
                and itask.state.is_gt(TASK_STATUS_FAILED)
@@ -753,7 +754,7 @@ def process_message(
             ):
                 self.spawn_children(itask, TASK_OUTPUT_FAILED)
 
-        elif message == self.EVENT_SUBMIT_FAILED:
+        elif message == TASK_OUTPUT_SUBMIT_FAILED:
            if (
                flag == self.FLAG_RECEIVED
                and itask.state.is_gt(TASK_STATUS_SUBMIT_FAILED)
@@ -763,7 +764,7 @@ def process_message(
             if self._process_message_submit_failed(itask, event_time, forced):
                 self.spawn_children(itask, TASK_OUTPUT_SUBMIT_FAILED)
 
-        elif message == self.EVENT_SUBMITTED:
+        elif message == TASK_OUTPUT_SUBMITTED:
            if (
                flag == self.FLAG_RECEIVED
                and itask.state.is_gte(TASK_STATUS_SUBMITTED)
@@ -811,7 +812,7 @@ def process_message(
                 # Already failed.
                 return True
             aborted_with = message[len(ABORT_MESSAGE_PREFIX):]
-            self._db_events_insert(itask, "aborted", message)
+            self._db_events_insert(itask, "aborted", event)
             self.workflow_db_mgr.put_update_task_jobs(
                 itask, {"run_signal": aborted_with})
             if self._process_message_failed(
@@ -821,7 +822,7 @@ def process_message(
 
         elif message.startswith(VACATION_MESSAGE_PREFIX):
             # Task job pre-empted into a vacation state
-            self._db_events_insert(itask, "vacated", message)
+            self._db_events_insert(itask, "vacated", event)
             itask.set_summary_time('started')  # unset
             if TimerFlags.SUBMISSION_RETRY in itask.try_timers:
                 itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0
@@ -835,7 +836,7 @@ def process_message(
             # this feature can only be used on the deprecated loadleveler
             # system, we should probably aim to remove support for job vacation
             # instead. Otherwise, we should have:
-            # self.setup_event_handlers(itask, 'vacated', message)
+            # self.setup_event_handlers(itask, 'vacated', event)
 
         elif completed_output:
             # Message of a custom task output.
@@ -843,7 +844,7 @@ def process_message(
             # Log completion of o (not needed for standard outputs)
             trigger = itask.state.outputs.get_trigger(message)
             LOG.info(f"[{itask}] completed output {trigger}")
-            self.setup_event_handlers(itask, trigger, message)
+            self.setup_event_handlers(itask, trigger, event)
             self.spawn_children(itask, msg0)
 
         else:
@@ -854,11 +855,11 @@ def process_message(
             # No state change.
             LOG.debug(f"[{itask}] unhandled: {message}")
             self._db_events_insert(
-                itask, (f"message {lseverity}"), message)
+                itask, (f"message {lseverity}"), event)
 
             if lseverity in self.NON_UNIQUE_EVENTS:
                 itask.non_unique_events.update({lseverity: 1})
 
-            self.setup_event_handlers(itask, lseverity, message)
+            self.setup_event_handlers(itask, lseverity, event)
 
         return None
@@ -889,7 +890,7 @@ def _process_message_check(
             # Ignore received messages from old jobs
             LOG.warning(
                 f"[{itask}] "
-                f"{self.FLAG_RECEIVED_IGNORED}{message}{timestamp} "
+                f"{self._FLAG_RECEIVED_IGNORED}{message}{timestamp} "
                 f"for job({submit_num:02d}) != job({itask.submit_num:02d})"
             )
             return False
@@ -919,13 +920,13 @@ def _process_message_check(
 
             if flag == self.FLAG_RECEIVED:
                 LOG.warning(
                     f"[{itask}] "
-                    f"{self.FLAG_RECEIVED_IGNORED}{message}{timestamp}"
+                    f"{self._FLAG_RECEIVED_IGNORED}{message}{timestamp}"
                 )
             else:
                 LOG.warning(
                     f"[{itask}] "
-                    f"{self.FLAG_POLLED_IGNORED}{message}{timestamp}"
+                    f"{self._FLAG_POLLED_IGNORED}{message}{timestamp}"
                 )
             return False
 
@@ -933,8 +934,11 @@ def _process_message_check(
         # Demote log level to DEBUG if this is a message that duplicates what
         # gets logged by itask state change anyway (and not manual poll)
         if severity_lvl > DEBUG and flag != self.FLAG_POLLED and message in {
-            self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
-            self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
+            TASK_OUTPUT_SUBMITTED,
+            TASK_OUTPUT_STARTED,
+            TASK_OUTPUT_SUCCEEDED,
+            TASK_OUTPUT_SUBMIT_FAILED,
+            f'{FAIL_MESSAGE_PREFIX}ERR'
         }:
             severity_lvl = DEBUG
         LOG.log(severity_lvl, f"[{itask}] {flag}{message}{timestamp}")
@@ -1334,7 +1338,7 @@ def _process_message_failed(
                 self.workflow_db_mgr.put_update_task_state(itask)
             else:
                 self.setup_event_handlers(
-                    itask, self.EVENT_FAILED, message
+                    itask, TASK_OUTPUT_FAILED, message
                 )
                 itask.state.outputs.set_message_complete(TASK_OUTPUT_FAILED)
                 self.data_store_mgr.delta_task_output(
@@ -1367,7 +1371,8 @@ def _process_message_started(
             )
         if itask.state_reset(TASK_STATUS_RUNNING, forced=forced):
             self.setup_event_handlers(
-                itask, self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
+                itask, TASK_OUTPUT_STARTED, f'job {TASK_OUTPUT_STARTED}'
+            )
             self.data_store_mgr.delta_task_state(itask)
         self._reset_job_timers(itask)
 
@@ -1382,7 +1387,7 @@ def _process_message_expired(self, itask: 'TaskProxy', forced: bool):
         self.data_store_mgr.delta_task_state(itask)
         self.setup_event_handlers(
             itask,
-            self.EVENT_EXPIRED,
+            TASK_OUTPUT_EXPIRED,
             "Task expired: will not submit job."
         )
 
@@ -1413,7 +1418,8 @@ def _process_message_succeeded(
                     itask.summary['started_time'])
         if itask.state_reset(TASK_STATUS_SUCCEEDED, forced=forced):
             self.setup_event_handlers(
-                itask, self.EVENT_SUCCEEDED, f"job {self.EVENT_SUCCEEDED}")
+                itask, TASK_OUTPUT_SUCCEEDED, f"job {TASK_OUTPUT_SUCCEEDED}"
+            )
             self.data_store_mgr.delta_task_state(itask)
         self._reset_job_timers(itask)
 
@@ -1428,7 +1434,7 @@ def _process_message_submit_failed(
         Return True if no retries (hence go to the submit-failed state).
         """
         no_retries = False
-        LOG.critical(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
+        LOG.critical(f"[{itask}] {self._EVENT_SUBMIT_FAILED}")
         if event_time is None:
             event_time = get_current_time_string()
         self.workflow_db_mgr.put_update_task_jobs(itask, {
@@ -1450,8 +1456,8 @@ def _process_message_submit_failed(
                 self.workflow_db_mgr.put_update_task_state(itask)
             else:
                 self.setup_event_handlers(
-                    itask, self.EVENT_SUBMIT_FAILED,
-                    f'job {self.EVENT_SUBMIT_FAILED}'
+                    itask, self._EVENT_SUBMIT_FAILED,
+                    f'job {self._EVENT_SUBMIT_FAILED}'
                 )
                 itask.state.outputs.set_message_complete(
                     TASK_OUTPUT_SUBMIT_FAILED
@@ -1465,7 +1471,7 @@ def _process_message_submit_failed(
             self._retry_task(itask, timer.timeout, submit_retry=True)
             delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
             LOG.warning(f"[{itask}] {delay_msg}")
-            msg = f"job {self.EVENT_SUBMIT_FAILED}, {delay_msg}"
+            msg = f"job {self._EVENT_SUBMIT_FAILED}, {delay_msg}"
             self.setup_event_handlers(itask, self.EVENT_SUBMIT_RETRY, msg)
 
         # Register newly submit-failed job with the database and datastore.
@@ -1514,8 +1520,8 @@ def _process_message_submitted(
                 itask.state_reset(is_queued=False, forced=forced)
                 self.setup_event_handlers(
                     itask,
-                    self.EVENT_SUBMITTED,
-                    f'job {self.EVENT_SUBMITTED}',
+                    TASK_OUTPUT_SUBMITTED,
+                    f'job {TASK_OUTPUT_SUBMITTED}',
                 )
             self.data_store_mgr.delta_task_state(itask)
         self._reset_job_timers(itask)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 6fd8ba534c..a9d5654c9a 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -95,6 +95,7 @@
 from cylc.flow.task_outputs import (
     TASK_OUTPUT_FAILED,
     TASK_OUTPUT_STARTED,
+    TASK_OUTPUT_SUBMIT_FAILED,
     TASK_OUTPUT_SUBMITTED,
     TASK_OUTPUT_SUCCEEDED,
 )
@@ -723,8 +724,8 @@ def _kill_task_job_callback(self, itask, cmd_ctx, line):
             itask.state.kill_failed = True
         elif itask.state(TASK_STATUS_SUBMITTED):
             self.task_events_mgr.process_message(
-                itask, CRITICAL, self.task_events_mgr.EVENT_SUBMIT_FAILED,
-                ctx.timestamp)
+                itask, CRITICAL, TASK_OUTPUT_SUBMIT_FAILED, ctx.timestamp
+            )
         elif itask.state(TASK_STATUS_RUNNING):
             self.task_events_mgr.process_message(
                 itask, CRITICAL, TASK_OUTPUT_FAILED)
@@ -861,7 +862,7 @@ def _poll_task_job_callback(
             TASK_STATUS_SUBMITTED: jp_ctx.time_submit_exit,
         }
         submit_failed_msgs = {
-            TaskEventsManager.EVENT_SUBMIT_FAILED: jp_ctx.time_submit_exit,
+            TASK_OUTPUT_SUBMIT_FAILED: jp_ctx.time_submit_exit,
         }
         started_msgs = {
             **submitted_msgs,
@@ -1162,8 +1163,8 @@ def _submit_task_job_callback(self, itask, cmd_ctx, line):
                 itask, DEBUG, TASK_OUTPUT_SUBMITTED, ctx.timestamp)
         else:
             self.task_events_mgr.process_message(
-                itask, CRITICAL, self.task_events_mgr.EVENT_SUBMIT_FAILED,
-                ctx.timestamp)
+                itask, CRITICAL, TASK_OUTPUT_SUBMIT_FAILED, ctx.timestamp
+            )
 
     def _prep_submit_task_job(
         self,
@@ -1350,7 +1351,8 @@ def _prep_submit_task_job_error(
             }
         )
         self.task_events_mgr.process_message(
-            itask, CRITICAL, self.task_events_mgr.EVENT_SUBMIT_FAILED)
+            itask, CRITICAL, TASK_OUTPUT_SUBMIT_FAILED
+        )
 
     def _prep_submit_task_job_impl(self, itask, rtconfig):
         """Helper for self._prep_submit_task_job."""
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index c5e84158ab..39619907b6 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -47,10 +47,11 @@
     prereqs_and_outputs_query,
 )
 from cylc.flow.scripts.validate import ValidateOptions
-from cylc.flow.task_state import (
-    TASK_STATUS_SUBMITTED,
-    TASK_STATUS_SUCCEEDED,
+from cylc.flow.task_outputs import (
+    TASK_OUTPUT_SUBMITTED,
+    TASK_OUTPUT_SUCCEEDED,
 )
+from cylc.flow.task_state import TASK_STATUS_SUBMITTED
 from cylc.flow.util import serialise_set
 from cylc.flow.wallclock import get_current_time_string
 from cylc.flow.workflow_files import infer_latest_run_from_id
@@ -742,11 +743,11 @@ def capture_live_submissions(capcall, monkeypatch):
     def fake_submit(self, itasks, *_):
         self.submit_nonlive_task_jobs(itasks, RunMode.SIMULATION)
         for itask in itasks:
-            for status in (TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED):
+            for output in (TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_SUCCEEDED):
                 self.task_events_mgr.process_message(
                     itask,
                     'INFO',
-                    status,
+                    output,
                     '2000-01-01T00:00:00Z',
                     '(received)',
                 )
diff --git a/tests/integration/test_data_store_mgr.py b/tests/integration/test_data_store_mgr.py
index 5c42f9d352..e0cec07eef 100644
--- a/tests/integration/test_data_store_mgr.py
+++ b/tests/integration/test_data_store_mgr.py
@@ -14,11 +14,18 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from typing import Iterable, List, TYPE_CHECKING, cast
+from typing import (
+    Iterable,
+    List,
+    cast,
+)
 
 import pytest
 
-from cylc.flow.data_messages_pb2 import PbPrerequisite, PbTaskProxy
+from cylc.flow.data_messages_pb2 import (
+    PbPrerequisite,
+    PbTaskProxy,
+)
 from cylc.flow.data_store_mgr import (
     EDGES,
     FAMILY_PROXIES,
@@ -29,10 +36,9 @@
 )
 from cylc.flow.id import Tokens
 from cylc.flow.scheduler import Scheduler
-from cylc.flow.task_events_mgr import TaskEventsManager
 from cylc.flow.task_outputs import (
-    TASK_OUTPUT_SUBMITTED,
     TASK_OUTPUT_STARTED,
+    TASK_OUTPUT_SUBMITTED,
     TASK_OUTPUT_SUCCEEDED,
 )
 from cylc.flow.task_state import (
@@ -43,10 +49,6 @@
 from cylc.flow.wallclock import get_current_time_string
 
 
-if TYPE_CHECKING:
-    from cylc.flow.scheduler import Scheduler
-
-
 # NOTE: Some of these tests mutate the data store, so running them in
 # isolation may see failures when they actually pass if you run the
 # whole file
@@ -460,7 +462,7 @@ def _patch_remove(*args, **kwargs):
     # satisfy the submitted & started outputs
     # (note started implies submitted)
     one.task_events_mgr.process_message(
-        itask, 'INFO', TaskEventsManager.EVENT_STARTED
+        itask, 'INFO', TASK_OUTPUT_STARTED
     )
 
     # the delta should be populated with the newly satisfied outputs
@@ -480,7 +482,7 @@ def _patch_remove(*args, **kwargs):
 
     # satisfy the succeeded output
     one.task_events_mgr.process_message(
-        itask, 'INFO', TaskEventsManager.EVENT_SUCCEEDED
+        itask, 'INFO', TASK_OUTPUT_SUCCEEDED
     )
 
     # the delta should be populated with ALL satisfied outputs
diff --git a/tests/integration/test_optional_outputs.py b/tests/integration/test_optional_outputs.py
index da4ff989b3..fb2e7bf957 100644
--- a/tests/integration/test_optional_outputs.py
+++ b/tests/integration/test_optional_outputs.py
@@ -22,24 +22,23 @@
 """
 
 from itertools import combinations
-from typing import TYPE_CHECKING
 
 import pytest
 
 from cylc.flow.cycling.integer import IntegerPoint
 from cylc.flow.cycling.iso8601 import ISO8601Point
 from cylc.flow.network.resolvers import TaskMsg
-from cylc.flow.task_events_mgr import (
-    TaskEventsManager,
-)
+from cylc.flow.scheduler import Scheduler
 from cylc.flow.task_outputs import (
-    TASK_OUTPUTS,
     TASK_OUTPUT_EXPIRED,
     TASK_OUTPUT_FAILED,
     TASK_OUTPUT_FINISHED,
+    TASK_OUTPUT_SUBMIT_FAILED,
     TASK_OUTPUT_SUCCEEDED,
+    TASK_OUTPUTS,
     get_completion_expression,
 )
+from cylc.flow.task_proxy import TaskProxy
 from cylc.flow.task_state import (
     TASK_STATUS_EXPIRED,
     TASK_STATUS_PREPARING,
@@ -47,10 +46,6 @@
     TASK_STATUS_WAITING,
 )
 
-if TYPE_CHECKING:
-    from cylc.flow.task_proxy import TaskProxy
-    from cylc.flow.scheduler import Scheduler
-
 
 def reset_outputs(itask: 'TaskProxy'):
     """Undo the consequences of setting task outputs.
@@ -206,7 +201,7 @@ async def test_expire_orthogonality(flow, scheduler, start):
             '1/a/01',
             '2000-01-01T00:00:00+00',
             'INFO',
-            TaskEventsManager.EVENT_SUBMIT_FAILED
+            TASK_OUTPUT_SUBMIT_FAILED,
         ),
     )
     schd.process_queued_task_messages()
@@ -220,7 +215,7 @@ async def test_expire_orthogonality(flow, scheduler, start):
             '1/a/01',
             '2000-01-01T00:00:00+00',
             'INFO',
-            TaskEventsManager.EVENT_FAILED,
+            TASK_OUTPUT_FAILED,
         ),
     )
     schd.process_queued_task_messages()
@@ -234,7 +229,7 @@ async def test_expire_orthogonality(flow, scheduler, start):
             '1/a/01',
             '2000-01-01T00:00:00+00',
             'INFO',
-            TaskEventsManager.EVENT_EXPIRED,
+            TASK_OUTPUT_EXPIRED,
         ),
     )
     schd.process_queued_task_messages()
diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py
index ffb255c0cf..be0b8c73d2 100644
--- a/tests/integration/test_task_pool.py
+++ b/tests/integration/test_task_pool.py
@@ -1174,27 +1174,26 @@ async def test_detect_incomplete_tasks(
     incomplete_final_task_states = {
         # final task states that would leave a task with
         # completion=succeeded incomplete
-        TASK_STATUS_FAILED: TaskEventsManager.EVENT_FAILED,
-        TASK_STATUS_EXPIRED: TaskEventsManager.EVENT_EXPIRED,
-        TASK_STATUS_SUBMIT_FAILED: TaskEventsManager.EVENT_SUBMIT_FAILED
+        TASK_STATUS_FAILED,
+        TASK_STATUS_EXPIRED,
+        TASK_STATUS_SUBMIT_FAILED,
     }
     id_ = flow({
         'scheduling': {
             'graph': {
                 # a workflow with one task for each of the final task states
-                'R1': '\n'.join(incomplete_final_task_states.keys())
+                'R1': '\n'.join(incomplete_final_task_states)
             }
         }
     })
-    schd = scheduler(id_)
+    schd: Scheduler = scheduler(id_)
     async with start(schd, level=logging.DEBUG) as log:
         itasks = schd.pool.get_tasks()
         for itask in itasks:
             itask.state_reset(is_queued=False)
             # spawn the output corresponding to the task
             schd.pool.task_events_mgr.process_message(
-                itask, 1,
-                incomplete_final_task_states[itask.tdef.name]
+                itask, 1, message=itask.tdef.name
             )
             # ensure that it is correctly identified as incomplete
             assert not itask.state.outputs.is_complete()