From a4b485d6730d966234fc30222295364fd8183763 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Fri, 1 Mar 2024 13:37:58 +0000
Subject: [PATCH 1/3] Tidy events managers
---
cylc/flow/task_events_mgr.py | 113 ++++++-------
cylc/flow/task_pool.py | 24 ++-
cylc/flow/workflow_events.py | 71 ++++----
tests/integration/test_task_events_mgr.py | 1 -
...t_workflow_db_mgr.py => test_db_compat.py} | 45 +++++-
tests/unit/test_subprocpool.py | 3 +-
tests/unit/test_task_events_mgr.py | 151 ++++++++++++++----
tests/unit/test_task_events_mgr_2.py | 101 ------------
8 files changed, 269 insertions(+), 240 deletions(-)
rename tests/unit/{test_workflow_db_mgr.py => test_db_compat.py} (76%)
delete mode 100644 tests/unit/test_task_events_mgr_2.py
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 2ff2d7e44d1..3811283ec19 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -26,7 +26,6 @@
"""
from contextlib import suppress
-from collections import namedtuple
from enum import Enum
from logging import DEBUG, INFO, getLevelName
import os
@@ -34,12 +33,13 @@
import shlex
from time import time
from typing import (
+ TYPE_CHECKING,
Any,
Dict,
List,
NamedTuple,
Optional,
- TYPE_CHECKING,
+ Sequence,
Union,
cast,
)
@@ -88,6 +88,7 @@
)
from cylc.flow.workflow_events import (
EventData as WorkflowEventData,
+ construct_mail_cmd,
get_template_variables as get_workflow_template_variables,
process_mail_footer,
)
@@ -99,19 +100,21 @@
from cylc.flow.scheduler import Scheduler
-CustomTaskEventHandlerContext = namedtuple(
- "CustomTaskEventHandlerContext",
- ["key", "ctx_type", "cmd"])
+class CustomTaskEventHandlerContext(NamedTuple):
+ key: Union[str, Sequence[str]]
+ cmd: str
-TaskEventMailContext = namedtuple(
- "TaskEventMailContext",
- ["key", "ctx_type", "mail_from", "mail_to"])
+class TaskEventMailContext(NamedTuple):
+ key: str
+ mail_from: str
+ mail_to: str
-TaskJobLogsRetrieveContext = namedtuple(
- "TaskJobLogsRetrieveContext",
- ["key", "ctx_type", "platform_name", "max_size"])
+class TaskJobLogsRetrieveContext(NamedTuple):
+ key: Union[str, Sequence[str]]
+ platform_name: Optional[str]
+ max_size: Optional[int]
class EventKey(NamedTuple):
@@ -563,7 +566,7 @@ def process_events(self, schd: 'Scheduler') -> None:
# Avoid flooding user's mail box with mail notification.
# Group together as many notifications as possible within a
# given interval.
- timer.ctx.ctx_type == self.HANDLER_MAIL and
+ isinstance(timer.ctx, TaskEventMailContext) and
not schd.stop_mode and
self.next_mail_time is not None and
self.next_mail_time > now
@@ -571,7 +574,7 @@ def process_events(self, schd: 'Scheduler') -> None:
continue
timer.set_waiting()
- if timer.ctx.ctx_type == self.HANDLER_CUSTOM:
+ if isinstance(timer.ctx, CustomTaskEventHandlerContext):
# Run custom event handlers on their own
self.proc_pool.put_command(
SubProcContext(
@@ -591,11 +594,11 @@ def process_events(self, schd: 'Scheduler') -> None:
next_mail_time = now + self.mail_interval
for ctx, id_keys in ctx_groups.items():
- if ctx.ctx_type == self.HANDLER_MAIL:
+ if isinstance(ctx, TaskEventMailContext):
# Set next_mail_time if any mail sent
self.next_mail_time = next_mail_time
self._process_event_email(schd, ctx, id_keys)
- elif ctx.ctx_type == self.HANDLER_JOB_LOGS_RETRIEVE:
+ elif isinstance(ctx, TaskJobLogsRetrieveContext):
self._process_job_logs_retrieval(schd, ctx, id_keys)
def process_message(
@@ -884,15 +887,15 @@ def _process_message_check(
)
return False
- severity = cast('int', LOG_LEVELS.get(severity, INFO))
+ severity_lvl: int = LOG_LEVELS.get(severity, INFO)
# Demote log level to DEBUG if this is a message that duplicates what
# gets logged by itask state change anyway (and not manual poll)
- if severity > DEBUG and flag != self.FLAG_POLLED and message in {
+ if severity_lvl > DEBUG and flag != self.FLAG_POLLED and message in {
self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
}:
- severity = DEBUG
- LOG.log(severity, f"[{itask}] {flag}{message}{timestamp}")
+ severity_lvl = DEBUG
+ LOG.log(severity_lvl, f"[{itask}] {flag}{message}{timestamp}")
return True
def setup_event_handlers(self, itask, event, message):
@@ -937,7 +940,7 @@ def _db_events_insert(self, itask, event="", message=""):
def _process_event_email(
self,
schd: 'Scheduler',
- ctx,
+ ctx: TaskEventMailContext,
id_keys: List[EventKey],
) -> None:
"""Process event notification, by email."""
@@ -957,12 +960,6 @@ def _process_event_email(
# n events from n tasks
subject = "[%d task events] %s" % (
len(id_keys), schd.workflow)
- cmd = ["mail", "-s", subject]
-
- # From: and To:
- cmd.append("-r")
- cmd.append(ctx.mail_from)
- cmd.append(ctx.mail_to)
# STDIN for mail, tasks
stdin_str = ""
@@ -990,16 +987,19 @@ def _process_event_email(
id_keys[-1].message,
),
)
- self._send_mail(ctx, cmd, stdin_str, id_keys, schd)
+ self._send_mail(ctx, subject, stdin_str, id_keys, schd)
def _send_mail(
self,
- ctx,
- cmd,
- stdin_str,
+ ctx: TaskEventMailContext,
+ subject: str,
+ stdin_str: str,
id_keys: List[EventKey],
schd: 'Scheduler',
) -> None:
+ cmd = construct_mail_cmd(
+ subject, from_address=ctx.mail_from, to_address=ctx.mail_to
+ )
# SMTP server
env = dict(os.environ)
if self.mail_smtp:
@@ -1037,15 +1037,17 @@ def _event_email_callback(self, proc_ctx, schd) -> None:
except KeyError as exc:
LOG.exception(exc)
- def _get_events_conf(self, itask, key, default=None):
+ def _get_events_conf(
+ self, itask: 'TaskProxy', key: str, default: Any = None
+ ) -> Any:
"""Return an events setting from workflow then global configuration."""
- for getter in [
- self.broadcast_mgr.get_broadcast(itask.tokens).get("events"),
- itask.tdef.rtconfig["mail"],
- itask.tdef.rtconfig["events"],
- glbl_cfg().get(["scheduler", "mail"]),
- glbl_cfg().get()["task events"],
- ]:
+ for getter in (
+ self.broadcast_mgr.get_broadcast(itask.tokens).get("events"),
+ itask.tdef.rtconfig["mail"],
+ itask.tdef.rtconfig["events"],
+ glbl_cfg().get(["scheduler", "mail"]),
+ glbl_cfg().get()["task events"],
+ ):
try:
value = getter.get(key)
except (AttributeError, ItemNotFoundError, KeyError):
@@ -1058,7 +1060,7 @@ def _get_events_conf(self, itask, key, default=None):
def _process_job_logs_retrieval(
self,
schd: 'Scheduler',
- ctx,
+ ctx: TaskJobLogsRetrieveContext,
id_keys: List[EventKey],
) -> None:
"""Process retrieval of task job logs from remote user@host."""
@@ -1521,10 +1523,11 @@ def _setup_job_logs_retrieval(self, itask, event) -> None:
id_key,
TaskActionTimer(
TaskJobLogsRetrieveContext(
- self.HANDLER_JOB_LOGS_RETRIEVE, # key
- self.HANDLER_JOB_LOGS_RETRIEVE, # ctx_type
- itask.platform['name'],
- self._get_remote_conf(itask, "retrieve job logs max size"),
+ key=self.HANDLER_JOB_LOGS_RETRIEVE,
+ platform_name=itask.platform['name'],
+ max_size=self._get_remote_conf(
+ itask, "retrieve job logs max size"
+ ),
),
retry_delays
)
@@ -1555,14 +1558,11 @@ def _setup_event_mail(
id_key,
TaskActionTimer(
TaskEventMailContext(
- self.HANDLER_MAIL, # key
- self.HANDLER_MAIL, # ctx_type
- self._get_events_conf( # mail_from
- itask,
- "from",
- "notifications@" + get_host(),
+ key=self.HANDLER_MAIL,
+ mail_from=self._get_events_conf(
+ itask, "from", f"notifications@{get_host()}"
),
- self._get_events_conf(itask, "to", get_user()) # mail_to
+ mail_to=self._get_events_conf(itask, "to", get_user())
)
)
)
@@ -1632,11 +1632,7 @@ def _setup_custom_event_handlers(
self.add_event_timer(
id_key,
TaskActionTimer(
- CustomTaskEventHandlerContext(
- key1,
- self.HANDLER_CUSTOM,
- cmd,
- ),
+ CustomTaskEventHandlerContext(key=key1, cmd=cmd),
retry_delays
)
)
@@ -1865,12 +1861,7 @@ def remove_event_timer(self, id_key: EventKey) -> None:
self.event_timers_updated = True
def unset_waiting_event_timer(self, id_key: EventKey) -> None:
- """Invoke unset_waiting on an event timer.
-
- Args:
- key (str)
-
- """
+ """Invoke unset_waiting on an event timer."""
self._event_timers[id_key].unset_waiting()
self.event_timers_updated = True
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index daa4d8fe00e..db79be4fa5d 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -24,10 +24,12 @@
Dict,
Iterable,
List,
+ NamedTuple,
Optional,
Set,
TYPE_CHECKING,
Tuple,
+ Type,
Union,
)
import logging
@@ -592,7 +594,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):
self.compute_runahead()
self.release_runahead_tasks()
- def load_db_task_action_timers(self, row_idx, row) -> None:
+ def load_db_task_action_timers(self, row_idx: int, row: Iterable) -> None:
"""Load a task action timer, e.g. event handlers, retry states."""
if row_idx == 0:
LOG.info("LOADING task action timers")
@@ -607,14 +609,22 @@ def load_db_task_action_timers(self, row_idx, row) -> None:
# Extract type namedtuple variables from JSON strings
ctx_key = json.loads(str(ctx_key_raw))
ctx_data = json.loads(str(ctx_raw))
- for known_cls in [
- CustomTaskEventHandlerContext,
- TaskEventMailContext,
- TaskJobLogsRetrieveContext]:
+ known_cls: Type[NamedTuple]
+ for known_cls in (
+ CustomTaskEventHandlerContext,
+ TaskEventMailContext,
+ TaskJobLogsRetrieveContext
+ ):
if ctx_data and ctx_data[0] == known_cls.__name__:
- ctx = known_cls(*ctx_data[1])
+ ctx_args: list = ctx_data[1]
+ if len(ctx_args) > len(known_cls._fields):
+ # BACK COMPAT: no-longer used ctx_type arg
+ # from: Cylc 7
+ # to: 8.3.0
+ ctx_args.pop(1)
+ ctx: tuple = known_cls(*ctx_args)
break
- else:
+ else: # no break
ctx = ctx_data
if ctx is not None:
ctx = tuple(ctx)
diff --git a/cylc/flow/workflow_events.py b/cylc/flow/workflow_events.py
index 63fe8464eca..2b99ef3d2cf 100644
--- a/cylc/flow/workflow_events.py
+++ b/cylc/flow/workflow_events.py
@@ -18,7 +18,7 @@
from enum import Enum
import os
from shlex import quote
-from typing import Dict, Union, TYPE_CHECKING
+from typing import Any, Dict, List, Union, TYPE_CHECKING
from cylc.flow import LOG
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
@@ -27,6 +27,7 @@
from cylc.flow.subprocctx import SubProcContext
if TYPE_CHECKING:
+ from cylc.flow.config import WorkflowConfig
from cylc.flow.scheduler import Scheduler
@@ -126,6 +127,18 @@ class EventData(Enum):
"""
+def construct_mail_cmd(
+ subject: str, from_address: str, to_address: str
+) -> List[str]:
+ """Construct a mail command."""
+ return [
+ 'mail',
+ '-s', subject,
+ '-r', from_address,
+ to_address
+ ]
+
+
def get_template_variables(
schd: 'Scheduler',
event: str,
@@ -211,26 +224,17 @@ def __init__(self, proc_pool):
self.proc_pool = proc_pool
@staticmethod
- def get_events_conf(config, key, default=None):
+ def get_events_conf(
+ config: 'WorkflowConfig', key: str, default: Any = None
+ ) -> Any:
"""Return a named [scheduler][[events]] configuration."""
- # Mail doesn't have any defaults in workflow.py
- if 'mail' in config.cfg['scheduler']:
- getters = [
- config.cfg['scheduler']['events'],
- config.cfg['scheduler']['mail'],
- glbl_cfg().get(['scheduler', 'events']),
- glbl_cfg().get(['scheduler', 'mail'])
- ]
- else:
- getters = [
- config.cfg['scheduler']['events'],
- glbl_cfg().get(['scheduler', 'events']),
- glbl_cfg().get(['scheduler', 'mail'])
- ]
- value = None
- for getter in getters:
- if key in getter:
- value = getter.get(key)
+ for getter in (
+ config.cfg['scheduler']['events'],
+ config.cfg['scheduler'].get('mail', {}),
+ glbl_cfg().get(['scheduler', 'events']),
+ glbl_cfg().get(['scheduler', 'mail'])
+ ):
+ value = getter.get(key)
if value is not None and value != []:
return value
return default
@@ -277,19 +281,26 @@ def _run_event_mail(self, schd, template_variables, event):
)
self._send_mail(event, subject, stdin_str, schd, env)
- def _send_mail(self, event, subject, message, schd, env):
+ def _send_mail(
+ self,
+ event: str,
+ subject: str,
+ message: str,
+ schd: 'Scheduler',
+ env: Dict[str, str]
+ ) -> None:
proc_ctx = SubProcContext(
(self.WORKFLOW_EVENT_HANDLER, event),
- [
- 'mail',
- '-s', subject,
- '-r', self.get_events_conf(
- schd.config,
- 'from', 'notifications@' + get_host()),
- self.get_events_conf(schd.config, 'to', get_user()),
- ],
+ construct_mail_cmd(
+ subject,
+ from_address=self.get_events_conf(
+ schd.config, 'from', f'notifications@{get_host()}'
+ ),
+ to_address=self.get_events_conf(schd.config, 'to', get_user())
+ ),
env=env,
- stdin_str=message)
+ stdin_str=message
+ )
if self.proc_pool.closed:
# Run command in foreground if process pool is closed
self.proc_pool.run_command(proc_ctx)
diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py
index 2f1494ee26b..5798b0c3d2a 100644
--- a/tests/integration/test_task_events_mgr.py
+++ b/tests/integration/test_task_events_mgr.py
@@ -27,7 +27,6 @@ async def test_process_job_logs_retrieval_warns_no_platform(
"""Job log retrieval handles `NoHostsError`"""
ctx = TaskJobLogsRetrieveContext(
- ctx_type='raa',
platform_name='skarloey',
max_size=256,
key='skarloey'
diff --git a/tests/unit/test_workflow_db_mgr.py b/tests/unit/test_db_compat.py
similarity index 76%
rename from tests/unit/test_workflow_db_mgr.py
rename to tests/unit/test_db_compat.py
index 48de335ac4c..1cb31173371 100644
--- a/tests/unit/test_workflow_db_mgr.py
+++ b/tests/unit/test_db_compat.py
@@ -14,14 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""
-Tests for worklfow_db_manager
-"""
+"""Compatibility tests for handling old workflow databases."""
+from functools import partial
+from unittest.mock import Mock
import pytest
import sqlite3
from cylc.flow.exceptions import CylcError, ServiceFileError
+from cylc.flow.task_pool import TaskPool
from cylc.flow.workflow_db_mgr import (
CylcWorkflowDAO,
WorkflowDatabaseManager,
@@ -136,3 +137,41 @@ def test_cylc_7_db_wflow_params_table(_setup_db):
checker.get_remote_point_format()
assert checker.get_remote_point_format_compat() == ptformat
+
+
+def test_pre_830_task_action_timers(_setup_db):
+ """Test back compat for task_action_timers table.
+
+ Before 8.3.0, TaskEventMailContext had an extra field "ctx_type" at
+ index 1. TaskPool.load_db_task_action_timers() should be able to
+ discard this field.
+ """
+ values = [
+ r'''
+ CREATE TABLE task_action_timers(
+ cycle TEXT, name TEXT, ctx_key TEXT, ctx TEXT, delays TEXT,
+ num INTEGER, delay TEXT, timeout TEXT,
+ PRIMARY KEY(cycle, name, ctx_key)
+ );
+ ''',
+ r'''
+ INSERT INTO task_action_timers VALUES(
+ '1','foo','[["event-mail", "failed"], 9]',
+ '["TaskEventMailContext", ["event-mail", "event-mail", "notifications@fbc.gov", "jfaden"]]',
+ '[0.0]',1,'0.0','1709229449.61275'
+ );
+ ''',
+ r'''
+ INSERT INTO task_action_timers VALUES(
+ '1','foo','["try_timers", "execution-retry"]', null,
+ '[94608000.0]',1,NULL,NULL
+ );
+ ''',
+ ]
+ db_file = _setup_db(values)
+ mock_pool = Mock()
+ load_db_task_action_timers = partial(
+ TaskPool.load_db_task_action_timers, mock_pool
+ )
+ with CylcWorkflowDAO(db_file, create_tables=True) as dao:
+ dao.select_task_action_timers(load_db_task_action_timers)
diff --git a/tests/unit/test_subprocpool.py b/tests/unit/test_subprocpool.py
index 9e2d4af212b..feffdab19d0 100644
--- a/tests/unit/test_subprocpool.py
+++ b/tests/unit/test_subprocpool.py
@@ -253,7 +253,7 @@ def _test_callback_255(ctx, foo=''):
pytest.param(
'platform: localhost - Could not connect to mouse.',
255,
- TaskJobLogsRetrieveContext(['ssh', 'something'], None, None, None),
+ TaskJobLogsRetrieveContext(['ssh', 'something'], None, None),
id="return 255 (log-ret)"
)
]
@@ -280,7 +280,6 @@ def test__run_command_exit_no_255_callback(caplog, mock_ctx):
def test__run_command_exit_no_gettable_platform(caplog, mock_ctx):
"""It logs being unable to select a platform"""
ret_ctx = TaskJobLogsRetrieveContext(
- ctx_type='raa',
platform_name='rhenas',
max_size=256,
key='rhenas'
diff --git a/tests/unit/test_task_events_mgr.py b/tests/unit/test_task_events_mgr.py
index 6cc34da7faa..bd9a16b2c8b 100644
--- a/tests/unit/test_task_events_mgr.py
+++ b/tests/unit/test_task_events_mgr.py
@@ -14,42 +14,123 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-import unittest
-from unittest import mock
+from unittest.mock import Mock, patch
+
+import pytest
+
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.subprocctx import SubProcContext
-class TestTaskEventsManager(unittest.TestCase):
-
- @mock.patch("cylc.flow.task_events_mgr.LOG")
- def test_log_error_on_error_exit_code(self, cylc_log):
- """Test that an error log is emitted when the log retrieval command
- exited with a code different than zero.
-
- :param cylc_log: mocked cylc logger
- :type cylc_log: mock.MagicMock
- """
- task_events_manager = TaskEventsManager(
- None, None, None, None, None, None, None, None, None)
- proc_ctx = SubProcContext(cmd_key=None, cmd="error", ret_code=1,
- err="Error!", id_keys=[])
- task_events_manager._job_logs_retrieval_callback(proc_ctx, None)
- self.assertEqual(1, cylc_log.error.call_count)
- self.assertTrue(cylc_log.error.call_args.contains("Error!"))
-
- @mock.patch("cylc.flow.task_events_mgr.LOG")
- def test_log_debug_on_noerror_exit_code(self, cylc_log):
- """Test that a debug log is emitted when the log retrieval command
- exited with an non-error code (i.e. 0).
-
- :param cylc_log: mocked cylc logger
- :type cylc_log: mock.MagicMock
- """
- task_events_manager = TaskEventsManager(
- None, None, None, None, None, None, None, None, None)
- proc_ctx = SubProcContext(cmd_key=None, cmd="ls /tmp/123", ret_code=0,
- err="", id_keys=[])
- task_events_manager._job_logs_retrieval_callback(proc_ctx, None)
- self.assertEqual(1, cylc_log.debug.call_count)
- self.assertTrue(cylc_log.debug.call_args.contains("ls /tmp/123"))
+@patch("cylc.flow.task_events_mgr.LOG")
+def test_log_error_on_error_exit_code(cylc_log):
+ """Test that an error log is emitted when the log retrieval command
+ exited with a code different than zero.
+
+ :param cylc_log: mocked cylc logger
+ :type cylc_log: mock.MagicMock
+ """
+ task_events_manager = TaskEventsManager(
+ None, None, None, None, None, None, None, None, None)
+ proc_ctx = SubProcContext(
+ cmd_key=None, cmd="error", ret_code=1, err="Error!", id_keys=[])
+ task_events_manager._job_logs_retrieval_callback(proc_ctx, None)
+ assert cylc_log.error.call_count == 1
+ assert cylc_log.error.call_args.contains("Error!")
+
+
+@patch("cylc.flow.task_events_mgr.LOG")
+def test_log_debug_on_noerror_exit_code(cylc_log):
+ """Test that a debug log is emitted when the log retrieval command
+ exited with an non-error code (i.e. 0).
+
+ :param cylc_log: mocked cylc logger
+ :type cylc_log: mock.MagicMock
+ """
+ task_events_manager = TaskEventsManager(
+ None, None, None, None, None, None, None, None, None)
+ proc_ctx = SubProcContext(
+ cmd_key=None, cmd="ls /tmp/123", ret_code=0, err="", id_keys=[])
+ task_events_manager._job_logs_retrieval_callback(proc_ctx, None)
+ assert cylc_log.debug.call_count == 1
+ assert cylc_log.debug.call_args.contains("ls /tmp/123")
+
+
+@pytest.mark.parametrize(
+ "broadcast, remote, platforms, expected",
+ [
+ ("hpc1", "a", "b", "hpc1"),
+ (None, "hpc1", "b", "hpc1"),
+ (None, None, "hpc1", "hpc1"),
+ (None, None, None, None),
+ ]
+)
+def test_get_remote_conf(broadcast, remote, platforms, expected):
+ """Test TaskEventsManager._get_remote_conf()."""
+
+ task_events_mgr = TaskEventsManager(
+ None, None, None, None, None, None, None, None, None)
+
+ task_events_mgr.broadcast_mgr = Mock(
+ get_broadcast=lambda x: {
+ "remote": {
+ "host": broadcast
+ }
+ }
+ )
+
+ itask = Mock(
+ identity='foo.1',
+ tdef=Mock(
+ rtconfig={
+ 'remote': {
+ 'host': remote
+ }
+ }
+ ),
+ platform={
+ 'host': platforms
+ }
+ )
+
+ assert task_events_mgr._get_remote_conf(itask, 'host') == expected
+
+
+@pytest.mark.parametrize(
+ "broadcast, workflow, platforms, expected",
+ [
+ ([800], [700], [600], [800]),
+ (None, [700], [600], [700]),
+ (None, None, [600], [600]),
+ ]
+)
+def test_get_workflow_platforms_conf(broadcast, workflow, platforms, expected):
+ """Test TaskEventsManager._get_polling_interval_conf()."""
+
+ task_events_mgr = TaskEventsManager(
+ None, None, None, None, None, None, None, None, None)
+
+ KEY = "execution polling intervals"
+
+ task_events_mgr.broadcast_mgr = Mock(
+ get_broadcast=lambda x: {
+ KEY: broadcast
+ }
+ )
+
+ itask = Mock(
+ identity='foo.1',
+ tdef=Mock(
+ rtconfig={
+ KEY: workflow
+ }
+ ),
+ platform={
+ KEY: platforms
+ }
+ )
+
+ assert (
+ task_events_mgr._get_workflow_platforms_conf(itask, KEY) ==
+ expected
+ )
diff --git a/tests/unit/test_task_events_mgr_2.py b/tests/unit/test_task_events_mgr_2.py
deleted file mode 100644
index e129e3397a3..00000000000
--- a/tests/unit/test_task_events_mgr_2.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
-# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-
-from unittest.mock import Mock
-import pytest
-
-
-from cylc.flow.task_events_mgr import TaskEventsManager
-
-
-@pytest.mark.parametrize(
- "broadcast, remote, platforms, expected",
- [
- ("hpc1", "a", "b", "hpc1"),
- (None, "hpc1", "b", "hpc1"),
- (None, None, "hpc1", "hpc1"),
- (None, None, None, None),
- ]
-)
-def test_get_remote_conf(broadcast, remote, platforms, expected):
- """Test TaskEventsManager._get_remote_conf()."""
-
- task_events_mgr = TaskEventsManager(
- None, None, None, None, None, None, None, None, None)
-
- task_events_mgr.broadcast_mgr = Mock(
- get_broadcast=lambda x: {
- "remote": {
- "host": broadcast
- }
- }
- )
-
- itask = Mock(
- identity='foo.1',
- tdef=Mock(
- rtconfig={
- 'remote': {
- 'host': remote
- }
- }
- ),
- platform={
- 'host': platforms
- }
- )
-
- assert task_events_mgr._get_remote_conf(itask, 'host') == expected
-
-
-@pytest.mark.parametrize(
- "broadcast, workflow, platforms, expected",
- [
- ([800], [700], [600], [800]),
- (None, [700], [600], [700]),
- (None, None, [600], [600]),
- ]
-)
-def test_get_workflow_platforms_conf(broadcast, workflow, platforms, expected):
- """Test TaskEventsManager._get_polling_interval_conf()."""
-
- task_events_mgr = TaskEventsManager(
- None, None, None, None, None, None, None, None, None)
-
- KEY = "execution polling intervals"
-
- task_events_mgr.broadcast_mgr = Mock(
- get_broadcast=lambda x: {
- KEY: broadcast
- }
- )
-
- itask = Mock(
- identity='foo.1',
- tdef=Mock(
- rtconfig={
- KEY: workflow
- }
- ),
- platform={
- KEY: platforms
- }
- )
-
- assert (
- task_events_mgr._get_workflow_platforms_conf(itask, KEY) ==
- expected
- )
From f703031a72eee51b12040fd3b3a5137390700406 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Tue, 5 Mar 2024 16:49:56 +0000
Subject: [PATCH 2/3] Ensure task mail respects `[scheduler][mail]to/from`
---
cylc/flow/scheduler.py | 1 -
cylc/flow/task_events_mgr.py | 13 +++---
tests/unit/test_task_events_mgr.py | 63 +++++++++++++++++++++++++++++-
tests/unit/test_workflow_events.py | 40 ++++++++-----------
4 files changed, 85 insertions(+), 32 deletions(-)
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index fb1609ca00a..9b098d2bbcd 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -467,7 +467,6 @@ async def configure(self, params):
"task event batch interval"]
self.task_events_mgr.mail_smtp = self._get_events_conf("smtp")
self.task_events_mgr.mail_footer = self._get_events_conf("footer")
- self.task_events_mgr.workflow_url = self.config.cfg['meta']['URL']
self.task_events_mgr.workflow_cfg = self.config.cfg
if self.options.genref:
LOG.addHandler(ReferenceLogFileHandler(
diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index 3811283ec19..04cbac361af 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -429,23 +429,23 @@ class TaskEventsManager():
EVENT_SUCCEEDED
}
+ workflow_cfg: Dict[str, Any]
+ uuid_str: str
+ mail_interval: float = 0
+ mail_smtp: Optional[str] = None
+ mail_footer: Optional[str] = None
+
def __init__(
self, workflow, proc_pool, workflow_db_mgr, broadcast_mgr,
xtrigger_mgr, data_store_mgr, timestamp, bad_hosts,
reset_inactivity_timer_func
):
self.workflow = workflow
- self.workflow_url = None
- self.workflow_cfg = {}
- self.uuid_str = None
self.proc_pool = proc_pool
self.workflow_db_mgr = workflow_db_mgr
self.broadcast_mgr = broadcast_mgr
self.xtrigger_mgr = xtrigger_mgr
self.data_store_mgr = data_store_mgr
- self.mail_interval = 0.0
- self.mail_smtp = None
- self.mail_footer = None
self.next_mail_time = None
self.reset_inactivity_timer_func = reset_inactivity_timer_func
# NOTE: do not mutate directly
@@ -1045,6 +1045,7 @@ def _get_events_conf(
self.broadcast_mgr.get_broadcast(itask.tokens).get("events"),
itask.tdef.rtconfig["mail"],
itask.tdef.rtconfig["events"],
+ self.workflow_cfg.get("scheduler", {}).get("mail", {}),
glbl_cfg().get(["scheduler", "mail"]),
glbl_cfg().get()["task events"],
):
diff --git a/tests/unit/test_task_events_mgr.py b/tests/unit/test_task_events_mgr.py
index bd9a16b2c8b..b0c989e5af0 100644
--- a/tests/unit/test_task_events_mgr.py
+++ b/tests/unit/test_task_events_mgr.py
@@ -14,12 +14,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+from typing import Optional
from unittest.mock import Mock, patch
import pytest
-from cylc.flow.task_events_mgr import TaskEventsManager
+from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.subprocctx import SubProcContext
+from cylc.flow.task_events_mgr import TaskEventsManager
+from cylc.flow.task_proxy import TaskProxy
+from cylc.flow.taskdef import TaskDef
@patch("cylc.flow.task_events_mgr.LOG")
@@ -134,3 +138,60 @@ def test_get_workflow_platforms_conf(broadcast, workflow, platforms, expected):
task_events_mgr._get_workflow_platforms_conf(itask, KEY) ==
expected
)
+
+
+@pytest.mark.parametrize(
+ 'rt_val, schd_val, glbl_val, expected',
+ [
+ ('rt', 'schd', 'glbl', 'rt'),
+ (None, 'schd', 'glbl', 'schd'),
+ (None, None, 'glbl', 'glbl'),
+ (None, None, None, 'default'),
+ ]
+)
+def test_get_events_conf__mail_to_from(
+ mock_glbl_cfg,
+ rt_val: Optional[str],
+ schd_val: Optional[str],
+ glbl_val: Optional[str],
+ expected: str
+):
+ """Test order of precedence for [mail]to/from."""
+ if glbl_val:
+ mock_glbl_cfg(
+ 'cylc.flow.task_events_mgr.glbl_cfg',
+ f'''
+ [scheduler]
+ [[mail]]
+ from = {glbl_val}
+ to = {glbl_val}
+ '''
+ )
+
+ mock_task = Mock(
+ spec=TaskProxy,
+ tdef=Mock(
+ spec=TaskDef,
+ rtconfig={
+ 'events': {},
+ 'mail': {'to': rt_val, 'from': rt_val} if rt_val else {},
+ },
+ ),
+ )
+ mock_task_events_mgr = Mock(
+ spec=TaskEventsManager,
+ workflow_cfg={
+ 'scheduler': {
+ 'mail': {'to': schd_val, 'from': schd_val},
+ },
+ } if schd_val else {},
+ broadcast_mgr=Mock(
+ spec_set=BroadcastMgr,
+ get_broadcast=lambda *a, **k: {},
+ ),
+ )
+
+ for key in ('to', 'from'):
+ assert TaskEventsManager._get_events_conf(
+ mock_task_events_mgr, itask=mock_task, key=key, default='default'
+ ) == expected
diff --git a/tests/unit/test_workflow_events.py b/tests/unit/test_workflow_events.py
index 3e6493c393d..89449953f20 100644
--- a/tests/unit/test_workflow_events.py
+++ b/tests/unit/test_workflow_events.py
@@ -29,22 +29,24 @@
@pytest.mark.parametrize(
- 'key, expected, scheduler_mail_defined',
+ 'key, workflow_cfg, glbl_cfg, expected',
[
- ('handlers', ['stall'], True),
- ('hotel', None, True),
- ('from', 'highway@mixture', True),
- ('abort on workflow timeout', True, True),
- ('handlers', ['stall'], False),
- ('hotel', None, False),
- ('abort on workflow timeout', True, False),
+ ('handlers', True, True, ['stall']),
+ ('handlers', False, True, None),
+ ('handlers', False, False, None),
+ ('from', True, True, 'docklands@railway'),
+ ('from', False, True, 'highway@mixture'),
+ ('from', False, False, None),
+ ('abort on workflow timeout', True, True, True),
+ ('abort on workflow timeout', False, True, True),
+ ('abort on workflow timeout', False, False, False),
]
)
def test_get_events_handler(
- mock_glbl_cfg, key, expected, scheduler_mail_defined
+ mock_glbl_cfg, key, workflow_cfg, glbl_cfg, expected
):
- # It checks that method returns sensible answers.
- if scheduler_mail_defined is True:
+ """Test order of precedence for getting event handler configuration."""
+ if glbl_cfg:
mock_glbl_cfg(
'cylc.flow.workflow_events.glbl_cfg',
'''
@@ -55,23 +57,13 @@ def test_get_events_handler(
abort on workflow timeout = True
'''
)
- else:
- mock_glbl_cfg(
- 'cylc.flow.workflow_events.glbl_cfg',
- '''
- [scheduler]
- [[events]]
- abort on workflow timeout = True
- '''
- )
config = SimpleNamespace()
config.cfg = {
'scheduler': {
- 'events': {
- 'handlers': ['stall']
- },
- }
+ 'events': {'handlers': ['stall']},
+ 'mail': {'from': 'docklands@railway'},
+ } if workflow_cfg else {'events': {}}
}
assert WorkflowEventHandler.get_events_conf(config, key) == expected
From 4278550675d242cabf2decf545b6123f4ec7cdf3 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Mon, 11 Mar 2024 10:31:17 +0000
Subject: [PATCH 3/3] Update changelog [skip ci]
---
changes.d/6008.fix.md | 1 +
1 file changed, 1 insertion(+)
create mode 100644 changes.d/6008.fix.md
diff --git a/changes.d/6008.fix.md b/changes.d/6008.fix.md
new file mode 100644
index 00000000000..7741792de27
--- /dev/null
+++ b/changes.d/6008.fix.md
@@ -0,0 +1 @@
+Fixed bug where the `[scheduler][mail]to/from` settings did not apply as defaults for task event mail.