diff --git a/changes.d/6008.fix.md b/changes.d/6008.fix.md new file mode 100644 index 00000000000..7741792de27 --- /dev/null +++ b/changes.d/6008.fix.md @@ -0,0 +1 @@ +Fixed bug where the `[scheduler][mail]to/from` settings did not apply as defaults for task event mail. diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index fb1609ca00a..9b098d2bbcd 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -467,7 +467,6 @@ async def configure(self, params): "task event batch interval"] self.task_events_mgr.mail_smtp = self._get_events_conf("smtp") self.task_events_mgr.mail_footer = self._get_events_conf("footer") - self.task_events_mgr.workflow_url = self.config.cfg['meta']['URL'] self.task_events_mgr.workflow_cfg = self.config.cfg if self.options.genref: LOG.addHandler(ReferenceLogFileHandler( diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 2ff2d7e44d1..04cbac361af 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -26,7 +26,6 @@ """ from contextlib import suppress -from collections import namedtuple from enum import Enum from logging import DEBUG, INFO, getLevelName import os @@ -34,12 +33,13 @@ import shlex from time import time from typing import ( + TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, - TYPE_CHECKING, + Sequence, Union, cast, ) @@ -88,6 +88,7 @@ ) from cylc.flow.workflow_events import ( EventData as WorkflowEventData, + construct_mail_cmd, get_template_variables as get_workflow_template_variables, process_mail_footer, ) @@ -99,19 +100,21 @@ from cylc.flow.scheduler import Scheduler -CustomTaskEventHandlerContext = namedtuple( - "CustomTaskEventHandlerContext", - ["key", "ctx_type", "cmd"]) +class CustomTaskEventHandlerContext(NamedTuple): + key: Union[str, Sequence[str]] + cmd: str -TaskEventMailContext = namedtuple( - "TaskEventMailContext", - ["key", "ctx_type", "mail_from", "mail_to"]) +class TaskEventMailContext(NamedTuple): + key: str + mail_from: str + mail_to: str -TaskJobLogsRetrieveContext = namedtuple( - "TaskJobLogsRetrieveContext", - ["key", "ctx_type", "platform_name", "max_size"]) +class TaskJobLogsRetrieveContext(NamedTuple): + key: Union[str, Sequence[str]] + platform_name: Optional[str] + max_size: Optional[int] class EventKey(NamedTuple): @@ -426,23 +429,23 @@ class TaskEventsManager(): EVENT_SUCCEEDED } + workflow_cfg: Dict[str, Any] + uuid_str: str + mail_interval: float = 0 + mail_smtp: Optional[str] = None + mail_footer: Optional[str] = None + def __init__( self, workflow, proc_pool, workflow_db_mgr, broadcast_mgr, xtrigger_mgr, data_store_mgr, timestamp, bad_hosts, reset_inactivity_timer_func ): self.workflow = workflow - self.workflow_url = None - self.workflow_cfg = {} - self.uuid_str = None self.proc_pool = proc_pool self.workflow_db_mgr = workflow_db_mgr self.broadcast_mgr = broadcast_mgr self.xtrigger_mgr = xtrigger_mgr self.data_store_mgr = data_store_mgr - self.mail_interval = 0.0 - self.mail_smtp = None - self.mail_footer = None self.next_mail_time = None self.reset_inactivity_timer_func = reset_inactivity_timer_func # NOTE: do not mutate directly @@ -563,7 +566,7 @@ def process_events(self, schd: 'Scheduler') -> None: # Avoid flooding user's mail box with mail notification. # Group together as many notifications as possible within a # given interval. - timer.ctx.ctx_type == self.HANDLER_MAIL and + isinstance(timer.ctx, TaskEventMailContext) and not schd.stop_mode and self.next_mail_time is not None and self.next_mail_time > now @@ -571,7 +574,7 @@ def process_events(self, schd: 'Scheduler') -> None: continue timer.set_waiting() - if timer.ctx.ctx_type == self.HANDLER_CUSTOM: + if isinstance(timer.ctx, CustomTaskEventHandlerContext): # Run custom event handlers on their own self.proc_pool.put_command( SubProcContext( @@ -591,11 +594,11 @@ def process_events(self, schd: 'Scheduler') -> None: next_mail_time = now + self.mail_interval for ctx, id_keys in ctx_groups.items(): - if ctx.ctx_type == self.HANDLER_MAIL: + if isinstance(ctx, TaskEventMailContext): # Set next_mail_time if any mail sent self.next_mail_time = next_mail_time self._process_event_email(schd, ctx, id_keys) - elif ctx.ctx_type == self.HANDLER_JOB_LOGS_RETRIEVE: + elif isinstance(ctx, TaskJobLogsRetrieveContext): self._process_job_logs_retrieval(schd, ctx, id_keys) def process_message( @@ -884,15 +887,15 @@ def _process_message_check( ) return False - severity = cast('int', LOG_LEVELS.get(severity, INFO)) + severity_lvl: int = LOG_LEVELS.get(severity, INFO) # Demote log level to DEBUG if this is a message that duplicates what # gets logged by itask state change anyway (and not manual poll) - if severity > DEBUG and flag != self.FLAG_POLLED and message in { + if severity_lvl > DEBUG and flag != self.FLAG_POLLED and message in { self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED, self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR' }: - severity = DEBUG - LOG.log(severity, f"[{itask}] {flag}{message}{timestamp}") + severity_lvl = DEBUG + LOG.log(severity_lvl, f"[{itask}] {flag}{message}{timestamp}") return True def setup_event_handlers(self, itask, event, message): @@ -937,7 +940,7 @@ def _db_events_insert(self, itask, event="", message=""): def _process_event_email( self, schd: 'Scheduler', - ctx, + ctx: TaskEventMailContext, id_keys: List[EventKey], ) -> None: """Process event notification, by email.""" @@ -957,12 +960,6 @@ def _process_event_email( # n events from n tasks subject = "[%d task events] %s" % ( len(id_keys), schd.workflow) - cmd = ["mail", "-s", subject] - - # From: and To: - cmd.append("-r") - cmd.append(ctx.mail_from) - cmd.append(ctx.mail_to) # STDIN for mail, tasks stdin_str = "" @@ -990,16 +987,19 @@ def _process_event_email( id_keys[-1].message, ), ) - self._send_mail(ctx, cmd, stdin_str, id_keys, schd) + self._send_mail(ctx, subject, stdin_str, id_keys, schd) def _send_mail( self, - ctx, - cmd, - stdin_str, + ctx: TaskEventMailContext, + subject: str, + stdin_str: str, id_keys: List[EventKey], schd: 'Scheduler', ) -> None: + cmd = construct_mail_cmd( + subject, from_address=ctx.mail_from, to_address=ctx.mail_to + ) # SMTP server env = dict(os.environ) if self.mail_smtp: @@ -1037,15 +1037,18 @@ def _event_email_callback(self, proc_ctx, schd) -> None: except KeyError as exc: LOG.exception(exc) - def _get_events_conf(self, itask, key, default=None): + def _get_events_conf( + self, itask: 'TaskProxy', key: str, default: Any = None + ) -> Any: """Return an events setting from workflow then global configuration.""" - for getter in [ - self.broadcast_mgr.get_broadcast(itask.tokens).get("events"), - itask.tdef.rtconfig["mail"], - itask.tdef.rtconfig["events"], - glbl_cfg().get(["scheduler", "mail"]), - glbl_cfg().get()["task events"], - ]: + for getter in ( + self.broadcast_mgr.get_broadcast(itask.tokens).get("events"), + itask.tdef.rtconfig["mail"], + itask.tdef.rtconfig["events"], + self.workflow_cfg.get("scheduler", {}).get("mail", {}), + glbl_cfg().get(["scheduler", "mail"]), + glbl_cfg().get()["task events"], + ): try: value = getter.get(key) except (AttributeError, ItemNotFoundError, KeyError): @@ -1058,7 +1061,7 @@ def _get_events_conf(self, itask, key, default=None): def _process_job_logs_retrieval( self, schd: 'Scheduler', - ctx, + ctx: TaskJobLogsRetrieveContext, id_keys: List[EventKey], ) -> None: """Process retrieval of task job logs from remote user@host.""" @@ -1521,10 +1524,11 @@ def _setup_job_logs_retrieval(self, itask, event) -> None: id_key, TaskActionTimer( TaskJobLogsRetrieveContext( - self.HANDLER_JOB_LOGS_RETRIEVE, # key - self.HANDLER_JOB_LOGS_RETRIEVE, # ctx_type - itask.platform['name'], - self._get_remote_conf(itask, "retrieve job logs max size"), + key=self.HANDLER_JOB_LOGS_RETRIEVE, + platform_name=itask.platform['name'], + max_size=self._get_remote_conf( + itask, "retrieve job logs max size" + ), ), retry_delays ) @@ -1555,14 +1559,11 @@ def _setup_event_mail( id_key, TaskActionTimer( TaskEventMailContext( - self.HANDLER_MAIL, # key - self.HANDLER_MAIL, # ctx_type - self._get_events_conf( # mail_from - itask, - "from", - "notifications@" + get_host(), + key=self.HANDLER_MAIL, + mail_from=self._get_events_conf( + itask, "from", f"notifications@{get_host()}" ), - self._get_events_conf(itask, "to", get_user()) # mail_to + mail_to=self._get_events_conf(itask, "to", get_user()) ) ) ) @@ -1632,11 +1633,7 @@ def _setup_custom_event_handlers( self.add_event_timer( id_key, TaskActionTimer( - CustomTaskEventHandlerContext( - key1, - self.HANDLER_CUSTOM, - cmd, - ), + CustomTaskEventHandlerContext(key=key1, cmd=cmd), retry_delays ) ) @@ -1865,12 +1862,7 @@ def remove_event_timer(self, id_key: EventKey) -> None: self.event_timers_updated = True def unset_waiting_event_timer(self, id_key: EventKey) -> None: - """Invoke unset_waiting on an event timer. - - Args: - key (str) - - """ + """Invoke unset_waiting on an event timer.""" self._event_timers[id_key].unset_waiting() self.event_timers_updated = True diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index daa4d8fe00e..db79be4fa5d 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -24,10 +24,12 @@ Dict, Iterable, List, + NamedTuple, Optional, Set, TYPE_CHECKING, Tuple, + Type, Union, ) import logging @@ -592,7 +594,7 @@ def load_db_task_pool_for_restart(self, row_idx, row): self.compute_runahead() self.release_runahead_tasks() - def load_db_task_action_timers(self, row_idx, row) -> None: + def load_db_task_action_timers(self, row_idx: int, row: Iterable) -> None: """Load a task action timer, e.g. event handlers, retry states.""" if row_idx == 0: LOG.info("LOADING task action timers") @@ -607,14 +609,22 @@ def load_db_task_action_timers(self, row_idx, row) -> None: # Extract type namedtuple variables from JSON strings ctx_key = json.loads(str(ctx_key_raw)) ctx_data = json.loads(str(ctx_raw)) - for known_cls in [ - CustomTaskEventHandlerContext, - TaskEventMailContext, - TaskJobLogsRetrieveContext]: + known_cls: Type[NamedTuple] + for known_cls in ( + CustomTaskEventHandlerContext, + TaskEventMailContext, + TaskJobLogsRetrieveContext + ): if ctx_data and ctx_data[0] == known_cls.__name__: - ctx = known_cls(*ctx_data[1]) + ctx_args: list = ctx_data[1] + if len(ctx_args) > len(known_cls._fields): + # BACK COMPAT: no-longer used ctx_type arg + # from: Cylc 7 + # to: 8.3.0 + ctx_args.pop(1) + ctx: tuple = known_cls(*ctx_args) break - else: + else: # no break ctx = ctx_data if ctx is not None: ctx = tuple(ctx) diff --git a/cylc/flow/workflow_events.py b/cylc/flow/workflow_events.py index 63fe8464eca..2b99ef3d2cf 100644 --- a/cylc/flow/workflow_events.py +++ b/cylc/flow/workflow_events.py @@ -18,7 +18,7 @@ from enum import Enum import os from shlex import quote -from typing import Dict, Union, TYPE_CHECKING +from typing import Any, Dict, List, Union, TYPE_CHECKING from cylc.flow import LOG from cylc.flow.cfgspec.glbl_cfg import glbl_cfg @@ -27,6 +27,7 @@ from cylc.flow.subprocctx import SubProcContext if TYPE_CHECKING: + from cylc.flow.config import WorkflowConfig from cylc.flow.scheduler import Scheduler @@ -126,6 +127,18 @@ class EventData(Enum): """ +def construct_mail_cmd( + subject: str, from_address: str, to_address: str +) -> List[str]: + """Construct a mail command.""" + return [ + 'mail', + '-s', subject, + '-r', from_address, + to_address + ] + + def get_template_variables( schd: 'Scheduler', event: str, @@ -211,26 +224,17 @@ def __init__(self, proc_pool): self.proc_pool = proc_pool @staticmethod - def get_events_conf(config, key, default=None): + def get_events_conf( + config: 'WorkflowConfig', key: str, default: Any = None + ) -> Any: """Return a named [scheduler][[events]] configuration.""" - # Mail doesn't have any defaults in workflow.py - if 'mail' in config.cfg['scheduler']: - getters = [ - config.cfg['scheduler']['events'], - config.cfg['scheduler']['mail'], - glbl_cfg().get(['scheduler', 'events']), - glbl_cfg().get(['scheduler', 'mail']) - ] - else: - getters = [ - config.cfg['scheduler']['events'], - glbl_cfg().get(['scheduler', 'events']), - glbl_cfg().get(['scheduler', 'mail']) - ] - value = None - for getter in getters: - if key in getter: - value = getter.get(key) + for getter in ( + config.cfg['scheduler']['events'], + config.cfg['scheduler'].get('mail', {}), + glbl_cfg().get(['scheduler', 'events']), + glbl_cfg().get(['scheduler', 'mail']) + ): + value = getter.get(key) if value is not None and value != []: return value return default @@ -277,19 +281,26 @@ def _run_event_mail(self, schd, template_variables, event): ) self._send_mail(event, subject, stdin_str, schd, env) - def _send_mail(self, event, subject, message, schd, env): + def _send_mail( + self, + event: str, + subject: str, + message: str, + schd: 'Scheduler', + env: Dict[str, str] + ) -> None: proc_ctx = SubProcContext( (self.WORKFLOW_EVENT_HANDLER, event), - [ - 'mail', - '-s', subject, - '-r', self.get_events_conf( - schd.config, - 'from', 'notifications@' + get_host()), - self.get_events_conf(schd.config, 'to', get_user()), - ], + construct_mail_cmd( + subject, + from_address=self.get_events_conf( + schd.config, 'from', f'notifications@{get_host()}' + ), + to_address=self.get_events_conf(schd.config, 'to', get_user()) + ), env=env, - stdin_str=message) + stdin_str=message + ) if self.proc_pool.closed: # Run command in foreground if process pool is closed self.proc_pool.run_command(proc_ctx) diff --git a/tests/integration/test_task_events_mgr.py b/tests/integration/test_task_events_mgr.py index 2f1494ee26b..5798b0c3d2a 100644 --- a/tests/integration/test_task_events_mgr.py +++ b/tests/integration/test_task_events_mgr.py @@ -27,7 +27,6 @@ async def test_process_job_logs_retrieval_warns_no_platform( """Job log retrieval handles `NoHostsError`""" ctx = TaskJobLogsRetrieveContext( - ctx_type='raa', platform_name='skarloey', max_size=256, key='skarloey' diff --git a/tests/unit/test_workflow_db_mgr.py b/tests/unit/test_db_compat.py similarity index 76% rename from tests/unit/test_workflow_db_mgr.py rename to tests/unit/test_db_compat.py index 48de335ac4c..1cb31173371 100644 --- a/tests/unit/test_workflow_db_mgr.py +++ b/tests/unit/test_db_compat.py @@ -14,14 +14,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -""" -Tests for worklfow_db_manager -""" +"""Compatibility tests for handling old workflow databases.""" +from functools import partial +from unittest.mock import Mock import pytest import sqlite3 from cylc.flow.exceptions import CylcError, ServiceFileError +from cylc.flow.task_pool import TaskPool from cylc.flow.workflow_db_mgr import ( CylcWorkflowDAO, WorkflowDatabaseManager, @@ -136,3 +137,41 @@ def test_cylc_7_db_wflow_params_table(_setup_db): checker.get_remote_point_format() assert checker.get_remote_point_format_compat() == ptformat + + +def test_pre_830_task_action_timers(_setup_db): + """Test back compat for task_action_timers table. + + Before 8.3.0, TaskEventMailContext had an extra field "ctx_type" at + index 1. TaskPool.load_db_task_action_timers() should be able to + discard this field. + """ + values = [ + r''' + CREATE TABLE task_action_timers( + cycle TEXT, name TEXT, ctx_key TEXT, ctx TEXT, delays TEXT, + num INTEGER, delay TEXT, timeout TEXT, + PRIMARY KEY(cycle, name, ctx_key) + ); + ''', + r''' + INSERT INTO task_action_timers VALUES( + '1','foo','[["event-mail", "failed"], 9]', + '["TaskEventMailContext", ["event-mail", "event-mail", "notifications@fbc.gov", "jfaden"]]', + '[0.0]',1,'0.0','1709229449.61275' + ); + ''', + r''' + INSERT INTO task_action_timers VALUES( + '1','foo','["try_timers", "execution-retry"]', null, + '[94608000.0]',1,NULL,NULL + ); + ''', + ] + db_file = _setup_db(values) + mock_pool = Mock() + load_db_task_action_timers = partial( + TaskPool.load_db_task_action_timers, mock_pool + ) + with CylcWorkflowDAO(db_file, create_tables=True) as dao: + dao.select_task_action_timers(load_db_task_action_timers) diff --git a/tests/unit/test_subprocpool.py b/tests/unit/test_subprocpool.py index 9e2d4af212b..feffdab19d0 100644 --- a/tests/unit/test_subprocpool.py +++ b/tests/unit/test_subprocpool.py @@ -253,7 +253,7 @@ def _test_callback_255(ctx, foo=''): pytest.param( 'platform: localhost - Could not connect to mouse.', 255, - TaskJobLogsRetrieveContext(['ssh', 'something'], None, None, None), + TaskJobLogsRetrieveContext(['ssh', 'something'], None, None), id="return 255 (log-ret)" ) ] @@ -280,7 +280,6 @@ def test__run_command_exit_no_255_callback(caplog, mock_ctx): def test__run_command_exit_no_gettable_platform(caplog, mock_ctx): """It logs being unable to select a platform""" ret_ctx = TaskJobLogsRetrieveContext( - ctx_type='raa', platform_name='rhenas', max_size=256, key='rhenas' diff --git a/tests/unit/test_task_events_mgr.py b/tests/unit/test_task_events_mgr.py index 6cc34da7faa..b0c989e5af0 100644 --- a/tests/unit/test_task_events_mgr.py +++ b/tests/unit/test_task_events_mgr.py @@ -14,42 +14,184 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import unittest -from unittest import mock -from cylc.flow.task_events_mgr import TaskEventsManager +from typing import Optional +from unittest.mock import Mock, patch + +import pytest + +from cylc.flow.broadcast_mgr import BroadcastMgr from cylc.flow.subprocctx import SubProcContext +from cylc.flow.task_events_mgr import TaskEventsManager +from cylc.flow.task_proxy import TaskProxy +from cylc.flow.taskdef import TaskDef + + +@patch("cylc.flow.task_events_mgr.LOG") +def test_log_error_on_error_exit_code(cylc_log): + """Test that an error log is emitted when the log retrieval command + exited with a code different than zero. + + :param cylc_log: mocked cylc logger + :type cylc_log: mock.MagicMock + """ + task_events_manager = TaskEventsManager( + None, None, None, None, None, None, None, None, None) + proc_ctx = SubProcContext( + cmd_key=None, cmd="error", ret_code=1, err="Error!", id_keys=[]) + task_events_manager._job_logs_retrieval_callback(proc_ctx, None) + assert cylc_log.error.call_count == 1 + assert cylc_log.error.call_args.contains("Error!") + + +@patch("cylc.flow.task_events_mgr.LOG") +def test_log_debug_on_noerror_exit_code(cylc_log): + """Test that a debug log is emitted when the log retrieval command + exited with an non-error code (i.e. 0). + + :param cylc_log: mocked cylc logger + :type cylc_log: mock.MagicMock + """ + task_events_manager = TaskEventsManager( + None, None, None, None, None, None, None, None, None) + proc_ctx = SubProcContext( + cmd_key=None, cmd="ls /tmp/123", ret_code=0, err="", id_keys=[]) + task_events_manager._job_logs_retrieval_callback(proc_ctx, None) + assert cylc_log.debug.call_count == 1 + assert cylc_log.debug.call_args.contains("ls /tmp/123") + + +@pytest.mark.parametrize( + "broadcast, remote, platforms, expected", + [ + ("hpc1", "a", "b", "hpc1"), + (None, "hpc1", "b", "hpc1"), + (None, None, "hpc1", "hpc1"), + (None, None, None, None), + ] +) +def test_get_remote_conf(broadcast, remote, platforms, expected): + """Test TaskEventsManager._get_remote_conf().""" + + task_events_mgr = TaskEventsManager( + None, None, None, None, None, None, None, None, None) + + task_events_mgr.broadcast_mgr = Mock( + get_broadcast=lambda x: { + "remote": { + "host": broadcast + } + } + ) + + itask = Mock( + identity='foo.1', + tdef=Mock( + rtconfig={ + 'remote': { + 'host': remote + } + } + ), + platform={ + 'host': platforms + } + ) + + assert task_events_mgr._get_remote_conf(itask, 'host') == expected + + +@pytest.mark.parametrize( + "broadcast, workflow, platforms, expected", + [ + ([800], [700], [600], [800]), + (None, [700], [600], [700]), + (None, None, [600], [600]), + ] +) +def test_get_workflow_platforms_conf(broadcast, workflow, platforms, expected): + """Test TaskEventsManager._get_polling_interval_conf().""" + + task_events_mgr = TaskEventsManager( + None, None, None, None, None, None, None, None, None) + + KEY = "execution polling intervals" + + task_events_mgr.broadcast_mgr = Mock( + get_broadcast=lambda x: { + KEY: broadcast + } + ) + + itask = Mock( + identity='foo.1', + tdef=Mock( + rtconfig={ + KEY: workflow + } + ), + platform={ + KEY: platforms + } + ) + + assert ( + task_events_mgr._get_workflow_platforms_conf(itask, KEY) == + expected + ) + + +@pytest.mark.parametrize( + 'rt_val, schd_val, glbl_val, expected', + [ + ('rt', 'schd', 'glbl', 'rt'), + (None, 'schd', 'glbl', 'schd'), + (None, None, 'glbl', 'glbl'), + (None, None, None, 'default'), + ] +) +def test_get_events_conf__mail_to_from( + mock_glbl_cfg, + rt_val: Optional[str], + schd_val: Optional[str], + glbl_val: Optional[str], + expected: str +): + """Test order of precedence for [mail]to/from.""" + if glbl_val: + mock_glbl_cfg( + 'cylc.flow.task_events_mgr.glbl_cfg', + f''' + [scheduler] + [[mail]] + from = {glbl_val} + to = {glbl_val} + ''' + ) + mock_task = Mock( + spec=TaskProxy, + tdef=Mock( + spec=TaskDef, + rtconfig={ + 'events': {}, + 'mail': {'to': rt_val, 'from': rt_val} if rt_val else {}, + }, + ), + ) + mock_task_events_mgr = Mock( + spec=TaskEventsManager, + workflow_cfg={ + 'scheduler': { + 'mail': {'to': schd_val, 'from': schd_val}, + }, + } if schd_val else {}, + broadcast_mgr=Mock( + spec_set=BroadcastMgr, + get_broadcast=lambda *a, **k: {}, + ), + ) -class TestTaskEventsManager(unittest.TestCase): - - @mock.patch("cylc.flow.task_events_mgr.LOG") - def test_log_error_on_error_exit_code(self, cylc_log): - """Test that an error log is emitted when the log retrieval command - exited with a code different than zero. - - :param cylc_log: mocked cylc logger - :type cylc_log: mock.MagicMock - """ - task_events_manager = TaskEventsManager( - None, None, None, None, None, None, None, None, None) - proc_ctx = SubProcContext(cmd_key=None, cmd="error", ret_code=1, - err="Error!", id_keys=[]) - task_events_manager._job_logs_retrieval_callback(proc_ctx, None) - self.assertEqual(1, cylc_log.error.call_count) - self.assertTrue(cylc_log.error.call_args.contains("Error!")) - - @mock.patch("cylc.flow.task_events_mgr.LOG") - def test_log_debug_on_noerror_exit_code(self, cylc_log): - """Test that a debug log is emitted when the log retrieval command - exited with an non-error code (i.e. 0). - - :param cylc_log: mocked cylc logger - :type cylc_log: mock.MagicMock - """ - task_events_manager = TaskEventsManager( - None, None, None, None, None, None, None, None, None) - proc_ctx = SubProcContext(cmd_key=None, cmd="ls /tmp/123", ret_code=0, - err="", id_keys=[]) - task_events_manager._job_logs_retrieval_callback(proc_ctx, None) - self.assertEqual(1, cylc_log.debug.call_count) - self.assertTrue(cylc_log.debug.call_args.contains("ls /tmp/123")) + for key in ('to', 'from'): + assert TaskEventsManager._get_events_conf( + mock_task_events_mgr, itask=mock_task, key=key, default='default' + ) == expected diff --git a/tests/unit/test_task_events_mgr_2.py b/tests/unit/test_task_events_mgr_2.py deleted file mode 100644 index e129e3397a3..00000000000 --- a/tests/unit/test_task_events_mgr_2.py +++ /dev/null @@ -1,101 +0,0 @@ -# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. -# Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -from unittest.mock import Mock -import pytest - - -from cylc.flow.task_events_mgr import TaskEventsManager - - -@pytest.mark.parametrize( - "broadcast, remote, platforms, expected", - [ - ("hpc1", "a", "b", "hpc1"), - (None, "hpc1", "b", "hpc1"), - (None, None, "hpc1", "hpc1"), - (None, None, None, None), - ] -) -def test_get_remote_conf(broadcast, remote, platforms, expected): - """Test TaskEventsManager._get_remote_conf().""" - - task_events_mgr = TaskEventsManager( - None, None, None, None, None, None, None, None, None) - - task_events_mgr.broadcast_mgr = Mock( - get_broadcast=lambda x: { - "remote": { - "host": broadcast - } - } - ) - - itask = Mock( - identity='foo.1', - tdef=Mock( - rtconfig={ - 'remote': { - 'host': remote - } - } - ), - platform={ - 'host': platforms - } - ) - - assert task_events_mgr._get_remote_conf(itask, 'host') == expected - - -@pytest.mark.parametrize( - "broadcast, workflow, platforms, expected", - [ - ([800], [700], [600], [800]), - (None, [700], [600], [700]), - (None, None, [600], [600]), - ] -) -def test_get_workflow_platforms_conf(broadcast, workflow, platforms, expected): - """Test TaskEventsManager._get_polling_interval_conf().""" - - task_events_mgr = TaskEventsManager( - None, None, None, None, None, None, None, None, None) - - KEY = "execution polling intervals" - - task_events_mgr.broadcast_mgr = Mock( - get_broadcast=lambda x: { - KEY: broadcast - } - ) - - itask = Mock( - identity='foo.1', - tdef=Mock( - rtconfig={ - KEY: workflow - } - ), - platform={ - KEY: platforms - } - ) - - assert ( - task_events_mgr._get_workflow_platforms_conf(itask, KEY) == - expected - ) diff --git a/tests/unit/test_workflow_events.py b/tests/unit/test_workflow_events.py index 3e6493c393d..89449953f20 100644 --- a/tests/unit/test_workflow_events.py +++ b/tests/unit/test_workflow_events.py @@ -29,22 +29,24 @@ @pytest.mark.parametrize( - 'key, expected, scheduler_mail_defined', + 'key, workflow_cfg, glbl_cfg, expected', [ - ('handlers', ['stall'], True), - ('hotel', None, True), - ('from', 'highway@mixture', True), - ('abort on workflow timeout', True, True), - ('handlers', ['stall'], False), - ('hotel', None, False), - ('abort on workflow timeout', True, False), + ('handlers', True, True, ['stall']), + ('handlers', False, True, None), + ('handlers', False, False, None), + ('from', True, True, 'docklands@railway'), + ('from', False, True, 'highway@mixture'), + ('from', False, False, None), + ('abort on workflow timeout', True, True, True), + ('abort on workflow timeout', False, True, True), + ('abort on workflow timeout', False, False, False), ] ) def test_get_events_handler( - mock_glbl_cfg, key, expected, scheduler_mail_defined + mock_glbl_cfg, key, workflow_cfg, glbl_cfg, expected ): - # It checks that method returns sensible answers. - if scheduler_mail_defined is True: + """Test order of precedence for getting event handler configuration.""" + if glbl_cfg: mock_glbl_cfg( 'cylc.flow.workflow_events.glbl_cfg', ''' @@ -55,23 +57,13 @@ def test_get_events_handler( abort on workflow timeout = True ''' ) - else: - mock_glbl_cfg( - 'cylc.flow.workflow_events.glbl_cfg', - ''' - [scheduler] - [[events]] - abort on workflow timeout = True - ''' - ) config = SimpleNamespace() config.cfg = { 'scheduler': { - 'events': { - 'handlers': ['stall'] - }, - } + 'events': {'handlers': ['stall']}, + 'mail': {'from': 'docklands@railway'}, + } if workflow_cfg else {'events': {}} } assert WorkflowEventHandler.get_events_conf(config, key) == expected