TaskEventsManager: remove redundant events that are identical to outputs
MetRonnie committed Feb 11, 2025
1 parent 57af3a1 commit b1714c3
Showing 8 changed files with 108 additions and 93 deletions.
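In outline: TaskEventsManager defined a set of EVENT_* class attributes that were plain aliases of the task output constants, and this commit removes the aliases so call sites use the cylc.flow.task_outputs constants directly. A minimal sketch of the pattern being removed, condensed from the diff below rather than copied verbatim from the module:

    from cylc.flow.task_outputs import (
        TASK_OUTPUT_FAILED,
        TASK_OUTPUT_SUCCEEDED,
    )

    class TaskEventsManager:
        # Before: event names that merely duplicated the output constants.
        EVENT_FAILED = TASK_OUTPUT_FAILED        # alias, no added meaning
        EVENT_SUCCEEDED = TASK_OUTPUT_SUCCEEDED  # alias, no added meaning

    # After: the aliases are gone and callers pass the output constant:
    #     task_events_mgr.process_message(
    #         itask, 'CRITICAL', TASK_OUTPUT_FAILED,
    #         flag=task_events_mgr.FLAG_RECEIVED,
    #     )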
29 changes: 18 additions & 11 deletions cylc/flow/run_modes/simulation.py
@@ -18,9 +18,15 @@

from dataclasses import dataclass
from logging import INFO
from typing import (
TYPE_CHECKING, Any, Dict, List, Tuple, Union)
from time import time
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Tuple,
Union,
)

from metomi.isodatetime.parsers import DurationParser

@@ -29,22 +35,23 @@
from cylc.flow.cycling.loader import get_point
from cylc.flow.exceptions import PointParsingError
from cylc.flow.platforms import FORBIDDEN_WITH_PLATFORM
from cylc.flow.task_outputs import TASK_OUTPUT_SUBMITTED
from cylc.flow.task_state import (
TASK_STATUS_RUNNING,
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
from cylc.flow.run_modes import RunMode
from cylc.flow.task_outputs import (
TASK_OUTPUT_FAILED,
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_SUCCEEDED,
)
from cylc.flow.task_state import TASK_STATUS_RUNNING
from cylc.flow.wallclock import get_unix_time_from_time_string
from cylc.flow.run_modes import RunMode


if TYPE_CHECKING:
from typing_extensions import Literal

from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_job_mgr import TaskJobManager
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from typing_extensions import Literal


def submit_task_job(
@@ -357,12 +364,12 @@ def sim_time_check(
if now > itask.mode_settings.timeout:
if itask.mode_settings.sim_task_fails:
task_events_manager.process_message(
itask, 'CRITICAL', TASK_STATUS_FAILED,
itask, 'CRITICAL', TASK_OUTPUT_FAILED,
flag=task_events_manager.FLAG_RECEIVED
)
else:
task_events_manager.process_message(
itask, 'DEBUG', TASK_STATUS_SUCCEEDED,
itask, 'DEBUG', TASK_OUTPUT_SUCCEEDED,
flag=task_events_manager.FLAG_RECEIVED
)
# Simulate message outputs.
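Note on the call-site change above (mirrored in scheduler.py below): the status and output constants for these final states carry identical string values, so swapping TASK_STATUS_FAILED and TASK_STATUS_SUCCEEDED for TASK_OUTPUT_FAILED and TASK_OUTPUT_SUCCEEDED tidies intent without changing behaviour. A hedged illustration, with the constant values assumed from the cylc source:

    # Assumed values, per cylc.flow.task_state and cylc.flow.task_outputs:
    TASK_STATUS_FAILED = "failed"
    TASK_OUTPUT_FAILED = "failed"

    # Interchangeable strings: process_message(itask, 'CRITICAL',
    # TASK_OUTPUT_FAILED, ...) sends the same message as before; only
    # the caller's intent is now explicit.
    assert TASK_STATUS_FAILED == TASK_OUTPUT_FAILED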
4 changes: 2 additions & 2 deletions cylc/flow/scheduler.py
@@ -134,6 +134,7 @@
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_job_mgr import TaskJobManager
from cylc.flow.task_outputs import TASK_OUTPUT_FAILED
from cylc.flow.task_pool import TaskPool
from cylc.flow.task_remote_mgr import (
REMOTE_FILE_INSTALL_255,
@@ -144,7 +145,6 @@
REMOTE_INIT_FAILED,
)
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_PREPARING,
TASK_STATUS_RUNNING,
TASK_STATUS_SUBMITTED,
@@ -1085,7 +1085,7 @@ def kill_tasks(
if jobless:
# Directly set failed in sim mode:
self.task_events_mgr.process_message(
itask, 'CRITICAL', TASK_STATUS_FAILED,
itask, 'CRITICAL', TASK_OUTPUT_FAILED,
flag=self.task_events_mgr.FLAG_RECEIVED
)
else:
89 changes: 49 additions & 40 deletions cylc/flow/task_events_mgr.py
@@ -413,32 +413,27 @@ class TaskEventsManager():
* Set up task (submission) retries on job (submission) failures.
* Generate and manage task event handlers.
"""
EVENT_FAILED = TASK_OUTPUT_FAILED
EVENT_LATE = "late"
EVENT_RETRY = "retry"
EVENT_STARTED = TASK_OUTPUT_STARTED
EVENT_SUBMITTED = TASK_OUTPUT_SUBMITTED
EVENT_EXPIRED = TASK_OUTPUT_EXPIRED
EVENT_SUBMIT_FAILED = "submission failed"
_EVENT_SUBMIT_FAILED = "submission failed"
EVENT_SUBMIT_RETRY = "submission retry"
EVENT_SUCCEEDED = TASK_OUTPUT_SUCCEEDED
HANDLER_CUSTOM = "event-handler"
HANDLER_MAIL = "event-mail"
JOB_FAILED = "job failed"
HANDLER_JOB_LOGS_RETRIEVE = "job-logs-retrieve"
FLAG_INTERNAL = "(internal)"
FLAG_RECEIVED = "(received)"
FLAG_RECEIVED_IGNORED = "(received-ignored)"
_FLAG_RECEIVED_IGNORED = "(received-ignored)"
FLAG_POLLED = "(polled)"
FLAG_POLLED_IGNORED = "(polled-ignored)"
_FLAG_POLLED_IGNORED = "(polled-ignored)"
KEY_EXECUTE_TIME_LIMIT = 'execution_time_limit'
NON_UNIQUE_EVENTS = ('warning', 'critical', 'custom')
JOB_SUBMIT_SUCCESS_FLAG = 0
JOB_SUBMIT_FAIL_FLAG = 1
JOB_LOGS_RETRIEVAL_EVENTS = {
EVENT_FAILED,
TASK_OUTPUT_FAILED,
EVENT_RETRY,
EVENT_SUCCEEDED
TASK_OUTPUT_SUCCEEDED
}

workflow_cfg: Dict[str, Any]
@@ -668,10 +663,16 @@ def process_message(
True: if polling is required to confirm a reversal of status.
"""

# Log messages
if event_time is None:
event_time = get_current_time_string()

event = message
if message == TASK_OUTPUT_SUBMIT_FAILED:
# Previously we were using the event name as the message
event = self._EVENT_SUBMIT_FAILED
elif message == self._EVENT_SUBMIT_FAILED:
message = TASK_OUTPUT_SUBMIT_FAILED

if submit_num is None:
submit_num = itask.submit_num
if isinstance(severity, int):
@@ -705,7 +706,10 @@
msg0 = TASK_OUTPUT_FAILED

completed_output: Optional[bool] = False
if msg0 not in [TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_FAILED]:
if msg0 not in {TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_FAILED}:
# Do not complete (submit-)failed outputs as there might be
# retries. We check this in the respective _process_message_x
# helper functions and complete the outputs there if appropriate.
completed_output = (
itask.state.outputs.set_message_complete(msg0, forced)
)
@@ -723,7 +727,7 @@
self.FLAG_INTERNAL, submit_num, forced
)

if message == self.EVENT_STARTED:
if message == TASK_OUTPUT_STARTED:
if (
flag == self.FLAG_RECEIVED
and itask.state.is_gt(TASK_STATUS_RUNNING)
Expand All @@ -733,15 +737,15 @@ def process_message(
self._process_message_started(itask, event_time, forced)
self.spawn_children(itask, TASK_OUTPUT_STARTED)

elif message == self.EVENT_SUCCEEDED:
elif message == TASK_OUTPUT_SUCCEEDED:
self._process_message_succeeded(itask, event_time, forced)
self.spawn_children(itask, TASK_OUTPUT_SUCCEEDED)

elif message == self.EVENT_EXPIRED:
elif message == TASK_OUTPUT_EXPIRED:
self._process_message_expired(itask, forced)
self.spawn_children(itask, TASK_OUTPUT_EXPIRED)

elif message == self.EVENT_FAILED:
elif message == TASK_OUTPUT_FAILED:
if (
flag == self.FLAG_RECEIVED
and itask.state.is_gt(TASK_STATUS_FAILED)
Expand All @@ -753,7 +757,7 @@ def process_message(
):
self.spawn_children(itask, TASK_OUTPUT_FAILED)

elif message == self.EVENT_SUBMIT_FAILED:
elif message == TASK_OUTPUT_SUBMIT_FAILED:
if (
flag == self.FLAG_RECEIVED
and itask.state.is_gt(TASK_STATUS_SUBMIT_FAILED)
Expand All @@ -763,7 +767,7 @@ def process_message(
if self._process_message_submit_failed(itask, event_time, forced):
self.spawn_children(itask, TASK_OUTPUT_SUBMIT_FAILED)

elif message == self.EVENT_SUBMITTED:
elif message == TASK_OUTPUT_SUBMITTED:
if (
flag == self.FLAG_RECEIVED
and itask.state.is_gte(TASK_STATUS_SUBMITTED)
@@ -811,7 +815,7 @@ def process_message(
# Already failed.
return True
aborted_with = message[len(ABORT_MESSAGE_PREFIX):]
self._db_events_insert(itask, "aborted", message)
self._db_events_insert(itask, "aborted", event)
self.workflow_db_mgr.put_update_task_jobs(
itask, {"run_signal": aborted_with})
if self._process_message_failed(
@@ -821,7 +825,7 @@

elif message.startswith(VACATION_MESSAGE_PREFIX):
# Task job pre-empted into a vacation state
self._db_events_insert(itask, "vacated", message)
self._db_events_insert(itask, "vacated", event)
itask.set_summary_time('started') # unset
if TimerFlags.SUBMISSION_RETRY in itask.try_timers:
itask.try_timers[TimerFlags.SUBMISSION_RETRY].num = 0
@@ -835,15 +839,15 @@
# this feature can only be used on the deprecated loadleveler
# system, we should probably aim to remove support for job vacation
# instead. Otherwise, we should have:
# self.setup_event_handlers(itask, 'vacated', message)
# self.setup_event_handlers(itask, 'vacated', event)

elif completed_output:
# Message of a custom task output.
# No state change.
# Log completion of output (not needed for standard outputs)
trigger = itask.state.outputs.get_trigger(message)
LOG.info(f"[{itask}] completed output {trigger}")
self.setup_event_handlers(itask, trigger, message)
self.setup_event_handlers(itask, trigger, event)
self.spawn_children(itask, msg0)

else:
@@ -854,11 +858,11 @@
# No state change.
LOG.debug(f"[{itask}] unhandled: {message}")
self._db_events_insert(
itask, (f"message {lseverity}"), message)
itask, (f"message {lseverity}"), event)

if lseverity in self.NON_UNIQUE_EVENTS:
itask.non_unique_events.update({lseverity: 1})
self.setup_event_handlers(itask, lseverity, message)
self.setup_event_handlers(itask, lseverity, event)

return None

@@ -889,7 +893,7 @@ def _process_message_check(
# Ignore received messages from old jobs
LOG.warning(
f"[{itask}] "
f"{self.FLAG_RECEIVED_IGNORED}{message}{timestamp} "
f"{self._FLAG_RECEIVED_IGNORED}{message}{timestamp} "
f"for job({submit_num:02d}) != job({itask.submit_num:02d})"
)
return False
@@ -919,22 +923,25 @@ def _process_message_check(
if flag == self.FLAG_RECEIVED:
LOG.warning(
f"[{itask}] "
f"{self.FLAG_RECEIVED_IGNORED}{message}{timestamp}"
f"{self._FLAG_RECEIVED_IGNORED}{message}{timestamp}"
)

else:
LOG.warning(
f"[{itask}] "
f"{self.FLAG_POLLED_IGNORED}{message}{timestamp}"
f"{self._FLAG_POLLED_IGNORED}{message}{timestamp}"
)
return False

severity_lvl: int = LOG_LEVELS.get(severity, INFO)
# Demote log level to DEBUG if this is a message that duplicates what
# gets logged by itask state change anyway (and not manual poll)
if severity_lvl > DEBUG and flag != self.FLAG_POLLED and message in {
self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_SUBMIT_FAILED,
f'{FAIL_MESSAGE_PREFIX}ERR'
}:
severity_lvl = DEBUG
LOG.log(severity_lvl, f"[{itask}] {flag}{message}{timestamp}")
@@ -1334,7 +1341,7 @@ def _process_message_failed(
self.workflow_db_mgr.put_update_task_state(itask)
else:
self.setup_event_handlers(
itask, self.EVENT_FAILED, message
itask, TASK_OUTPUT_FAILED, message
)
itask.state.outputs.set_message_complete(TASK_OUTPUT_FAILED)
self.data_store_mgr.delta_task_output(
@@ -1367,7 +1374,8 @@ def _process_message_started(
)
if itask.state_reset(TASK_STATUS_RUNNING, forced=forced):
self.setup_event_handlers(
itask, self.EVENT_STARTED, f'job {self.EVENT_STARTED}')
itask, TASK_OUTPUT_STARTED, f'job {TASK_OUTPUT_STARTED}'
)
self.data_store_mgr.delta_task_state(itask)
self._reset_job_timers(itask)

@@ -1382,7 +1390,7 @@ def _process_message_expired(self, itask: 'TaskProxy', forced: bool):
self.data_store_mgr.delta_task_state(itask)
self.setup_event_handlers(
itask,
self.EVENT_EXPIRED,
TASK_OUTPUT_EXPIRED,
"Task expired: will not submit job."
)

@@ -1413,7 +1421,8 @@ def _process_message_succeeded(
itask.summary['started_time'])
if itask.state_reset(TASK_STATUS_SUCCEEDED, forced=forced):
self.setup_event_handlers(
itask, self.EVENT_SUCCEEDED, f"job {self.EVENT_SUCCEEDED}")
itask, TASK_OUTPUT_SUCCEEDED, f"job {TASK_OUTPUT_SUCCEEDED}"
)
self.data_store_mgr.delta_task_state(itask)
self._reset_job_timers(itask)

@@ -1428,7 +1437,7 @@ def _process_message_submit_failed(
Return True if no retries (hence go to the submit-failed state).
"""
no_retries = False
LOG.critical(f"[{itask}] {self.EVENT_SUBMIT_FAILED}")
LOG.critical(f"[{itask}] {self._EVENT_SUBMIT_FAILED}")
if event_time is None:
event_time = get_current_time_string()
self.workflow_db_mgr.put_update_task_jobs(itask, {
@@ -1450,8 +1459,8 @@
self.workflow_db_mgr.put_update_task_state(itask)
else:
self.setup_event_handlers(
itask, self.EVENT_SUBMIT_FAILED,
f'job {self.EVENT_SUBMIT_FAILED}'
itask, self._EVENT_SUBMIT_FAILED,
f'job {self._EVENT_SUBMIT_FAILED}'
)
itask.state.outputs.set_message_complete(
TASK_OUTPUT_SUBMIT_FAILED
@@ -1465,7 +1474,7 @@ def _process_message_submitted(
self._retry_task(itask, timer.timeout, submit_retry=True)
delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
LOG.warning(f"[{itask}] {delay_msg}")
msg = f"job {self.EVENT_SUBMIT_FAILED}, {delay_msg}"
msg = f"job {self._EVENT_SUBMIT_FAILED}, {delay_msg}"
self.setup_event_handlers(itask, self.EVENT_SUBMIT_RETRY, msg)

# Register newly submit-failed job with the database and datastore.
@@ -1514,8 +1523,8 @@ def _process_message_submitted(
itask.state_reset(is_queued=False, forced=forced)
self.setup_event_handlers(
itask,
self.EVENT_SUBMITTED,
f'job {self.EVENT_SUBMITTED}',
TASK_OUTPUT_SUBMITTED,
f'job {TASK_OUTPUT_SUBMITTED}',
)
self.data_store_mgr.delta_task_state(itask)
self._reset_job_timers(itask)
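The one event whose name differs from its output message is submission failure: the (now private) event name is "submission failed", while the output message is the submit-failed constant. The rewritten process_message therefore normalises whichever spelling it receives, routing the event name to event handlers and the database, and the output message to output/state logic. A condensed sketch of that normalisation, assuming TASK_OUTPUT_SUBMIT_FAILED holds the string 'submit-failed':

    def _normalise(message):
        """Sketch of the event/message split at the top of process_message."""
        _EVENT_SUBMIT_FAILED = 'submission failed'    # event-handler name
        TASK_OUTPUT_SUBMIT_FAILED = 'submit-failed'   # assumed output value

        event = message
        if message == TASK_OUTPUT_SUBMIT_FAILED:
            # Previously the event name doubled as the message.
            event = _EVENT_SUBMIT_FAILED
        elif message == _EVENT_SUBMIT_FAILED:
            message = TASK_OUTPUT_SUBMIT_FAILED
        return event, message  # event -> handlers/DB; message -> outputs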
(Diffs for the remaining five changed files are not shown.)