From d4f90c35eae3829063c270005aec84f72978f338 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Fri, 11 Aug 2023 09:57:05 +0100 Subject: [PATCH 1/5] id: make Tokens objects immutable * Having Tokens objects mutable seemed like a good idea at the time. * But in practice, Tokens are rarely modified. * And their mutability is a barrier to use as dictionary keys and caching. * Implement the `__hash__` interface and block the `__setitem__` and `update` interfaces. --- cylc/flow/id.py | 83 +++++++++----------------- cylc/flow/id_cli.py | 8 ++- cylc/flow/scripts/completion_server.py | 1 - cylc/flow/task_id.py | 5 +- cylc/flow/task_pool.py | 4 +- tests/integration/test_task_pool.py | 13 +++- tests/unit/test_id.py | 24 ++++++-- tests/unit/test_id_cli.py | 2 +- 8 files changed, 69 insertions(+), 71 deletions(-) diff --git a/cylc/flow/id.py b/cylc/flow/id.py index 222d16f82b6..5e2a7da7d6d 100644 --- a/cylc/flow/id.py +++ b/cylc/flow/id.py @@ -74,7 +74,7 @@ class Tokens(dict): >>> Tokens(workflow='w', cycle='c')['job'] - # Make a copy (note Tokens are mutable): + # Make a copy (note Tokens are immutable): >>> tokens.duplicate() >>> tokens.duplicate(job='02') # make changes at the same time @@ -118,9 +118,10 @@ def __init__( dict.__init__(self, **kwargs) def __setitem__(self, key, value): - if key not in self._KEYS: - raise ValueError(f'Invalid token: {key}') - dict.__setitem__(self, key, value) + raise Exception('Tokens objects are not mutable') + + def update(self, other): + raise Exception('Tokens objects are not mutable') def __getitem__(self, key): try: @@ -151,6 +152,9 @@ def __repr__(self): id_ = self.id return f'' + def __hash__(self): + return hash(tuple(self.values())) + def __eq__(self, other): if not isinstance(other, self.__class__): return False @@ -336,11 +340,9 @@ def is_null(self) -> bool: >>> tokens = Tokens() >>> tokens.is_null True - >>> tokens['job_sel'] = 'x' - >>> tokens.is_null + >>> tokens.duplicate(job_sel='x').is_null True - >>> tokens['job'] = '01' - >>> tokens.is_null + >>> tokens.duplicate(job='01').is_null False """ @@ -348,50 +350,10 @@ def is_null(self) -> bool: self[key] for key in self._REGULAR_KEYS ) - def update_tokens( - self, - tokens: 'Optional[Tokens]' = None, - **kwargs - ) -> None: - """Update the tokens dictionary. - - Similar to dict.update but with an optional Tokens argument. - - Examples: - >>> tokens = Tokens('x') - >>> tokens.update_tokens(workflow='y') - >>> tokens - - >>> tokens.update_tokens(Tokens('z')) - >>> tokens - - >>> tokens.update_tokens(Tokens('a'), cycle='b') - >>> tokens - - - """ - if tokens: - for key, value in tokens.items(): - self[key] = value - for key, value in kwargs.items(): - self[key] = value - - def update(self, other): - """dict.update. - - Example: - >>> tokens = Tokens(workflow='w') - >>> tokens.update({'cycle': 'c'}) - >>> tokens.id - 'w//c' - - """ - return self.update_tokens(**other) - def duplicate( self, - tokens: 'Optional[Tokens]' = None, - **kwargs + *tokens_list, + **kwargs, ) -> 'Tokens': """Duplicate a tokens object. @@ -408,17 +370,28 @@ def duplicate( >>> id(tokens1) == id(tokens2) False - Make a copy and modify it: + Make a copy with a modification: >>> tokens1.duplicate(cycle='1').id '~u/w//1' - Original not changed + The Original is not changed: >>> tokens1.id '~u/w' + + Arguments override in definition order: + >>> Tokens.duplicate( + ... tokens1, + ... Tokens(cycle='c', task='a', job='01'), + ... task='b' + ... 
).id + '~u/w//c/b/01' + """ - ret = Tokens(self) - ret.update_tokens(tokens, **kwargs) - return ret + _kwargs = {} + for tokens in (self, *tokens_list): + _kwargs.update(tokens) + _kwargs.update(kwargs) + return Tokens(**_kwargs) # //cycle[:sel][/task[:sel][/job[:sel]]] diff --git a/cylc/flow/id_cli.py b/cylc/flow/id_cli.py index b47ba739bea..b6d2a9c4150 100644 --- a/cylc/flow/id_cli.py +++ b/cylc/flow/id_cli.py @@ -294,7 +294,7 @@ async def parse_ids_async( # infer the run number if not specified the ID (and if possible) if infer_latest_runs: - _infer_latest_runs(*tokens_list, src_path=src_path) + _infer_latest_runs(tokens_list, src_path=src_path) _validate_number( *tokens_list, @@ -409,12 +409,14 @@ def _validate_workflow_ids(*tokens_list, src_path): detect_both_flow_and_suite(src_path) -def _infer_latest_runs(*tokens_list, src_path): +def _infer_latest_runs(tokens_list, src_path): for ind, tokens in enumerate(tokens_list): if ind == 0 and src_path: # source workflow passed in as a path continue - tokens['workflow'] = infer_latest_run_from_id(tokens['workflow']) + tokens_list[ind] = tokens.duplicate( + workflow=infer_latest_run_from_id(tokens['workflow']) + ) pass diff --git a/cylc/flow/scripts/completion_server.py b/cylc/flow/scripts/completion_server.py index ead311955da..1677adb0d1e 100644 --- a/cylc/flow/scripts/completion_server.py +++ b/cylc/flow/scripts/completion_server.py @@ -390,7 +390,6 @@ async def list_in_workflow(tokens: Tokens, infer_run=True) -> t.List[str]: # list possible IDs cli_detokenise( tokens.duplicate( - tokens=None, # use the workflow ID provided on the CLI to allow # run name inference workflow=input_workflow, diff --git a/cylc/flow/task_id.py b/cylc/flow/task_id.py index 4a17a59548d..dbe003222b6 100644 --- a/cylc/flow/task_id.py +++ b/cylc/flow/task_id.py @@ -107,5 +107,6 @@ def get_standardised_point( def get_standardised_taskid(cls, task_id): """Return task ID with standardised cycle point.""" tokens = Tokens(task_id, relative=True) - tokens['cycle'] = cls.get_standardised_point_string(tokens['cycle']) - return tokens.relative_id + return tokens.duplicate( + cycle=cls.get_standardised_point_string(tokens['cycle']) + ).relative_id diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 851ee1e39b7..0fff50fc93e 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1917,7 +1917,7 @@ def match_future_tasks( # Glob or task state was not matched by active tasks if not tokens['task']: # make task globs explicit to make warnings clearer - tokens['task'] = '*' + tokens = tokens.duplicate(task='*') LOG.warning( 'No active tasks matching:' # preserve :selectors when logging the id @@ -1985,7 +1985,7 @@ def match_taskdefs( point_str = tokens['cycle'] if not tokens['task']: # make task globs explicit to make warnings clearer - tokens['task'] = '*' + tokens = tokens.duplicate(task='*') name_str = tokens['task'] try: point_str = standardise_point_string(point_str) diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 97526088660..4688ca54586 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -216,7 +216,8 @@ async def test_filter_task_proxies( expected_bad_items: List[str], expected_warnings: List[str], mod_example_flow: Scheduler, - caplog: pytest.LogCaptureFixture + caplog: pytest.LogCaptureFixture, + monkeypatch, ) -> None: """Test TaskPool.filter_task_proxies(). 
@@ -256,7 +257,8 @@ async def test_filter_task_proxies_hidden( expected_bad_items: List[str], expected_warnings: List[str], mod_example_flow: Scheduler, - caplog: pytest.LogCaptureFixture + caplog: pytest.LogCaptureFixture, + monkeypatch, ) -> None: """Test TaskPool.filter_task_proxies(). @@ -277,6 +279,13 @@ async def test_filter_task_proxies_hidden( expected_bad_items: Expected to be returned. expected_warnings: Expected to be logged. """ + monkeypatch.setattr( + # make Tokens objects mutable to allow deepcopy to work on TaskProxy + # objects + 'cylc.flow.id.Tokens.__setitem__', + lambda self, key, value: dict.__setitem__(self, key, value), + ) + caplog.set_level(logging.WARNING, CYLC_LOG) task_pool = mod_example_flow.pool diff --git a/tests/unit/test_id.py b/tests/unit/test_id.py index ba47083b02f..4d46bebf725 100644 --- a/tests/unit/test_id.py +++ b/tests/unit/test_id.py @@ -316,9 +316,9 @@ def test_tokens(): with pytest.raises(ValueError): Tokens(foo='a') - Tokens()['cycle'] = 'a' + Tokens().duplicate(cycle='a') with pytest.raises(ValueError): - Tokens()['foo'] = 'a' + Tokens(foo='a') # test equality assert Tokens('a') == Tokens('a') @@ -335,10 +335,24 @@ def test_tokens(): assert not Tokens('a') == 1 tokens = Tokens('a//b') - tokens.update({'cycle': 'c', 'task': 'd'}) - assert tokens == Tokens('a//c/d') - with pytest.raises(ValueError): + new_tokens = tokens.duplicate(cycle='c', task='d') + assert new_tokens == Tokens('a//c/d') + with pytest.raises(Exception): tokens.update({'foo': 'c'}) + with pytest.raises(Exception): + tokens['cycle'] = 'a' + + # test gt/lt + assert sorted( + tokens.id + for tokens in [ + Tokens('~u/c'), + Tokens('~u/b//1'), + Tokens('~u/a'), + Tokens('~u/b'), + Tokens('~u/b//2'), + ] + ) == ['~u/a', '~u/b', '~u/b//1', '~u/b//2', '~u/c'] def test_no_look_behind(): diff --git a/tests/unit/test_id_cli.py b/tests/unit/test_id_cli.py index 9642a3c9874..699f69c4370 100644 --- a/tests/unit/test_id_cli.py +++ b/tests/unit/test_id_cli.py @@ -576,6 +576,6 @@ async def test_expand_workflow_tokens_impl_selector(no_scan): """It should reject filters it can't handle.""" tokens = tokenise('~user/*') await _expand_workflow_tokens([tokens]) - tokens['workflow_sel'] = 'stopped' + tokens = tokens.duplicate(workflow_sel='stopped') with pytest.raises(InputError): await _expand_workflow_tokens([tokens]) From 2fbcf4950ac89331657adff98d41259b3d385f61 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 3 Aug 2023 09:44:47 +0100 Subject: [PATCH 2/5] task_events_mgr: refactor event keys * Use a named tuple to store task event keys. * Rationalise the fields of task event keys. * Centralise event ID code. * Fixes an unrelated bug where the workflow port was not being included in emails. 
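
* For illustration, a minimal sketch of the old and new key shapes (the
  handler, event, cycle, task and job values below are examples only, not
  data taken from a real workflow):

      # before: an anonymous nested tuple
      id_key = (('event-mail', 'failed'), '1', 'foo', 1)

      # after: a NamedTuple carrying the job tokens
      from cylc.flow.id import Tokens
      id_key = EventKey('event-mail', 'failed', Tokens('//1/foo/01'))
      id_key.tokens.relative_id  # -> '1/foo/01'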
---
 cylc/flow/task_events_mgr.py                 | 321 ++++++++++++++-----
 cylc/flow/task_pool.py                       |  40 +--
 cylc/flow/task_proxy.py                      |  12 +-
 cylc/flow/workflow_db_mgr.py                 |  14 +-
 tests/integration/events/test_task_events.py |   5 +-
 5 files changed, 279 insertions(+), 113 deletions(-)

diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py
index ec5e467b8aa..31514f88ca5 100644
--- a/cylc/flow/task_events_mgr.py
+++ b/cylc/flow/task_events_mgr.py
@@ -33,7 +33,16 @@
 from shlex import quote
 import shlex
 from time import time
-from typing import TYPE_CHECKING, List, Optional, Union, cast
+from typing import (
+    Any,
+    Dict,
+    List,
+    NamedTuple,
+    Optional,
+    TYPE_CHECKING,
+    Union,
+    cast,
+)
 
 from cylc.flow import LOG, LOG_LEVELS
 from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
@@ -85,6 +94,7 @@
 
 
 if TYPE_CHECKING:
+    from cylc.flow.id import Tokens
     from cylc.flow.task_proxy import TaskProxy
     from cylc.flow.scheduler import Scheduler
 
@@ -104,6 +114,56 @@
     ["key", "ctx_type", "platform_name", "max_size"])
 
 
+class EventKey(NamedTuple):
+    """Unique identifier for a task event.
+
+    This contains event context information for event handlers.
+    """
+
+    """The event handler name."""
+    handler: str
+
+    """The task event."""
+    event: str
+
+    """The job tokens."""
+    tokens: 'Tokens'
+
+
+def get_event_id(event: str, itask: 'TaskProxy') -> str:
+    """Return a unique event identifier.
+
+    Not all events are unique. Task "started" is unique in that it can
+    only happen once per job; "warning", however, is not, as it is a
+    message severity level which could be associated with any number of
+    custom task messages.
+
+    To handle this, tasks track non-unique events and number them to ensure
+    their EventKeys remain unique for ease of event tracking.
+
+    Examples:
+        >>> from types import SimpleNamespace
+
+        # regular events are passed straight through:
+        >>> get_event_id('whatever', SimpleNamespace())
+        'whatever'
+
+        # non-unique events get an integer added to the end:
+        >>> get_event_id('warning', SimpleNamespace(non_unique_events={
+        ...     'warning': None,
+        ... }))
+        'warning-1'
+        >>> get_event_id('warning', SimpleNamespace(non_unique_events={
+        ...     'warning': 2,
+        ... }))
+        'warning-2'
+
+    """
+    if event in TaskEventsManager.NON_UNIQUE_EVENTS:
+        event = f'{event}-{itask.non_unique_events[event] or 1:d}'
+    return event
+
+
 def log_task_job_activity(ctx, workflow, point, name, submit_num=None):
     """Log an activity for a task job."""
     ctx_str = str(ctx)
@@ -353,6 +413,11 @@ class TaskEventsManager():
     NON_UNIQUE_EVENTS = ('warning', 'critical', 'custom')
     JOB_SUBMIT_SUCCESS_FLAG = 0
     JOB_SUBMIT_FAIL_FLAG = 1
+    JOB_LOGS_RETRIEVAL_EVENTS = {
+        EVENT_FAILED,
+        EVENT_RETRY,
+        EVENT_SUCCEEDED
+    }
 
     def __init__(
         self, workflow, proc_pool, workflow_db_mgr, broadcast_mgr,
@@ -375,7 +440,7 @@ def __init__(
         self.reset_inactivity_timer_func = reset_inactivity_timer_func
         # NOTE: do not mutate directly
         # use the {add,remove,unset_waiting}_event_timers methods
-        self._event_timers = {}
+        self._event_timers: Dict[EventKey, Any] = {}
         # NOTE: flag for DB use
         self.event_timers_updated = True
         # To be set by the task pool:
@@ -458,15 +523,15 @@ def process_events(self, schd: 'Scheduler') -> None:
         ctx_groups: dict = {}
         now = time()
         for id_key, timer in self._event_timers.copy().items():
-            key1, point, name, submit_num = id_key
             if timer.is_waiting:
                 continue
             # Set timer if timeout is None.
if not timer.is_timeout_set(): if timer.next() is None: LOG.warning( - f"{point}/{name}/{submit_num:02d}" - f" handler:{key1[0]} for task event:{key1[1]} failed" + f"{id_key.tokens.relative_id}" + f" handler:{id_key.handler}" + f" for task event:{id_key.event} failed" ) self.remove_event_timer(id_key) continue @@ -474,16 +539,18 @@ def process_events(self, schd: 'Scheduler') -> None: msg = None if timer.num > 1: msg = ( - f"handler:{key1[0]} for task event:{key1[1]} failed," + f"handler:{id_key.handler}" + f" for task event:{id_key.event} failed," f" retrying in {timer.delay_timeout_as_str()}" ) elif timer.delay: msg = ( - f"handler:{key1[0]} for task event:{key1[1]} will" + f"handler:{id_key.handler}" + f" for task event:{id_key.event} will" f" run after {timer.delay_timeout_as_str()}" ) if msg: - LOG.debug(f"{point}/{name}/{submit_num:02d} {msg}") + LOG.debug("%s %s", id_key.tokens.relative_id, msg) # Ready to run? if not timer.is_delay_done() or ( # Avoid flooding user's mail box with mail notification. @@ -501,7 +568,7 @@ def process_events(self, schd: 'Scheduler') -> None: # Run custom event handlers on their own self.proc_pool.put_command( SubProcContext( - (key1, submit_num), + ((id_key.handler, id_key.event), id_key.tokens['job']), timer.ctx.cmd, env=os.environ, shell=True, # nosec @@ -833,10 +900,21 @@ def setup_event_handlers(self, itask, event, message): self._setup_event_mail(itask, event) self._setup_custom_event_handlers(itask, event, message) - def _custom_handler_callback(self, ctx, schd, id_key): + def _custom_handler_callback( + self, + ctx, + schd: 'Scheduler', + id_key: EventKey, + ) -> None: """Callback when a custom event handler is done.""" - _, point, name, submit_num = id_key - log_task_job_activity(ctx, schd.workflow, point, name, submit_num) + tokens = id_key.tokens + log_task_job_activity( + ctx, + schd.workflow, + tokens['cycle'], + tokens['task'], + tokens['job'], + ) if ctx.ret_code == 0: self.remove_event_timer(id_key) else: @@ -849,15 +927,21 @@ def _db_events_insert(self, itask, event="", message=""): "event": event, "message": message}) - def _process_event_email(self, schd: 'Scheduler', ctx, id_keys) -> None: + def _process_event_email( + self, + schd: 'Scheduler', + ctx, + id_keys: List[EventKey], + ) -> None: """Process event notification, by email.""" if len(id_keys) == 1: - # 1 event from 1 task - (_, event), point, name, submit_num = id_keys[0] - subject = "[%s/%s/%02d %s] %s" % ( - point, name, submit_num, event, schd.workflow) + id_key = id_keys[0] + subject = ( + f'[{id_key.tokens.relative_id} {id_key.event}]' + f' {schd.workflow}' + ) else: - event_set = {id_key[0][1] for id_key in id_keys} + event_set = {id_key.event for id_key in id_keys} if len(event_set) == 1: # 1 event from n tasks subject = "[%d tasks %s] %s" % ( @@ -874,28 +958,33 @@ def _process_event_email(self, schd: 'Scheduler', ctx, id_keys) -> None: # STDIN for mail, tasks stdin_str = "" for id_key in sorted(id_keys): - (_, event), point, name, submit_num = id_key - stdin_str += "%s: %s/%s/%02d\n" % (event, point, name, submit_num) + stdin_str += f'{id_key.event}: {id_key.tokens.relative_id}\n' + # STDIN for mail, event info + workflow detail stdin_str += "\n" - for key in ( - WorkflowEventData.Workflow.value, - WorkflowEventData.Host.value, - WorkflowEventData.Port.value, - WorkflowEventData.Owner.value, + for key, value in ( + (WorkflowEventData.Workflow.value, schd.workflow), + (WorkflowEventData.Host.value, schd.host), + (WorkflowEventData.Port.value, schd.server.port), + 
(WorkflowEventData.Owner.value, schd.owner), ): - value = getattr(schd, key, None) - if value: - stdin_str += '%s: %s\n' % (key, value) + stdin_str += '%s: %s\n' % (key, value) if self.mail_footer: stdin_str += process_mail_footer( self.mail_footer, - get_workflow_template_variables(schd, event, ''), + get_workflow_template_variables(schd, id_keys[-1].event, ''), ) self._send_mail(ctx, cmd, stdin_str, id_keys, schd) - def _send_mail(self, ctx, cmd, stdin_str, id_keys, schd): + def _send_mail( + self, + ctx, + cmd, + stdin_str, + id_keys: List[EventKey], + schd: 'Scheduler', + ) -> None: # SMTP server env = dict(os.environ) if self.mail_smtp: @@ -906,17 +995,28 @@ def _send_mail(self, ctx, cmd, stdin_str, id_keys, schd): ), callback=self._event_email_callback, callback_args=[schd]) - def _event_email_callback(self, proc_ctx, schd): + def _event_email_callback(self, proc_ctx, schd) -> None: """Call back when email notification command exits.""" + id_key: EventKey for id_key in proc_ctx.cmd_kwargs["id_keys"]: - key1, point, name, submit_num = id_key try: if proc_ctx.ret_code == 0: self.remove_event_timer(id_key) - log_ctx = SubProcContext((key1, submit_num), None) + log_ctx = SubProcContext( + ( + (id_key.handler, id_key.event), + id_key.tokens['job'] + ), + None, + ) log_ctx.ret_code = 0 log_task_job_activity( - log_ctx, schd.workflow, point, name, submit_num) + log_ctx, + schd.workflow, + id_key.tokens['cycle'], + id_key.tokens['task'], + id_key.tokens['job'], + ) else: self.unset_waiting_event_timer(id_key) except KeyError as exc: @@ -940,7 +1040,12 @@ def _get_events_conf(self, itask, key, default=None): return value return default - def _process_job_logs_retrieval(self, schd, ctx, id_keys): + def _process_job_logs_retrieval( + self, + schd: 'Scheduler', + ctx, + id_keys: List[EventKey], + ) -> None: """Process retrieval of task job logs from remote user@host.""" # get a host to run retrieval on try: @@ -981,12 +1086,29 @@ def _process_job_logs_retrieval(self, schd, ctx, id_keys): cmd.append("--max-size=%s" % (ctx.max_size,)) # Includes and excludes includes = set() - for _, point, name, submit_num in id_keys: + for id_key in id_keys: # Include relevant directories, all levels needed - includes.add("/%s" % (point)) - includes.add("/%s/%s" % (point, name)) - includes.add("/%s/%s/%02d" % (point, name, submit_num)) - includes.add("/%s/%s/%02d/**" % (point, name, submit_num)) + includes.add("/%s" % (id_key.tokens['cycle'])) + includes.add( + "/%s/%s" % ( + id_key.tokens['cycle'], + id_key.tokens['task'] + ) + ) + includes.add( + "/%s/%s/%02d" % ( + id_key.tokens['cycle'], + id_key.tokens['task'], + id_key.tokens['job'], + ) + ) + includes.add( + "/%s/%s/%02d/**" % ( + id_key.tokens['cycle'], + id_key.tokens['task'], + id_key.tokens['job'], + ) + ) cmd += ["--include=%s" % (include) for include in sorted(includes)] cmd.append("--exclude=/**") # exclude everything else # Remote source @@ -1009,16 +1131,15 @@ def _process_job_logs_retrieval(self, schd, ctx, id_keys): callback_255=self._job_logs_retrieval_callback_255 ) - def _job_logs_retrieval_callback_255(self, proc_ctx, schd): + def _job_logs_retrieval_callback_255(self, proc_ctx, schd) -> None: """Call back when log job retrieval fails with a 255 error.""" self.bad_hosts.add(proc_ctx.host) - for id_key in proc_ctx.cmd_kwargs["id_keys"]: - key1, point, name, submit_num = id_key + for _ in proc_ctx.cmd_kwargs["id_keys"]: for key in proc_ctx.cmd_kwargs['id_keys']: timer = self._event_timers[key] timer.reset() - def 
_job_logs_retrieval_callback(self, proc_ctx, schd): + def _job_logs_retrieval_callback(self, proc_ctx, schd) -> None: """Call back when log job retrieval completes.""" if ( (proc_ctx.ret_code and LOG.isEnabledFor(DEBUG)) @@ -1027,20 +1148,31 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd): LOG.error(proc_ctx) else: LOG.debug(proc_ctx) + id_key: EventKey for id_key in proc_ctx.cmd_kwargs["id_keys"]: - key1, point, name, submit_num = id_key try: # All completed jobs are expected to have a "job.out". fnames = [JOB_LOG_OUT] with suppress(TypeError): - if key1[1] not in 'succeeded': + if id_key.event not in 'succeeded': fnames.append(JOB_LOG_ERR) fname_oks = {} for fname in fnames: fname_oks[fname] = os.path.exists(get_task_job_log( - schd.workflow, point, name, submit_num, fname)) + schd.workflow, + id_key.tokens['cycle'], + id_key.tokens['task'], + id_key.tokens['job'], + fname, + )) # All expected paths must exist to record a good attempt - log_ctx = SubProcContext((key1, submit_num), None) + log_ctx = SubProcContext( + ( + (id_key.handler, id_key.event), + id_key.tokens['job'] + ), + None, + ) if all(fname_oks.values()): log_ctx.ret_code = 0 self.remove_event_timer(id_key) @@ -1052,7 +1184,12 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd): log_ctx.err += " %s" % fname self.unset_waiting_event_timer(id_key) log_task_job_activity( - log_ctx, schd.workflow, point, name, submit_num) + log_ctx, + schd.workflow, + id_key.tokens['cycle'], + id_key.tokens['task'], + id_key.tokens['job'], + ) except KeyError as exc: LOG.exception(exc) @@ -1337,23 +1474,29 @@ def _insert_task_job( } ) - def _setup_job_logs_retrieval(self, itask, event): + def _setup_job_logs_retrieval(self, itask, event) -> None: """Set up remote job logs retrieval. For a task with a job completion event, i.e. succeeded, failed, (execution) retry. 
""" - id_key = ( - (self.HANDLER_JOB_LOGS_RETRIEVE, event), - str(itask.point), itask.tdef.name, itask.submit_num) - events = (self.EVENT_FAILED, self.EVENT_RETRY, self.EVENT_SUCCEEDED) if ( - event not in events or - not is_remote_platform(itask.platform) or - not self._get_remote_conf(itask, "retrieve job logs") or - id_key in self._event_timers + event not in self.JOB_LOGS_RETRIEVAL_EVENTS + or not is_remote_platform(itask.platform) + or not self._get_remote_conf(itask, "retrieve job logs") ): + # event does not need to be processed return + + id_key = EventKey( + self.HANDLER_JOB_LOGS_RETRIEVE, + event, + itask.tokens.duplicate(job=itask.submit_num), + ) + if id_key in self._event_timers: + # event already being processed + return + retry_delays = self._get_remote_conf( itask, "retrieve job logs retry delays") if not retry_delays: @@ -1371,18 +1514,19 @@ def _setup_job_logs_retrieval(self, itask, event): ) ) - def _setup_event_mail(self, itask, event): + def _setup_event_mail(self, itask: 'TaskProxy', event: str) -> None: """Set up task event notification, by email.""" - if event in self.NON_UNIQUE_EVENTS: - key1 = ( - self.HANDLER_MAIL, - '%s-%d' % (event, itask.non_unique_events[event] or 1) - ) - else: - key1 = (self.HANDLER_MAIL, event) - id_key = (key1, str(itask.point), itask.tdef.name, itask.submit_num) - if (id_key in self._event_timers or - event not in self._get_events_conf(itask, "mail events", [])): + if event not in self._get_events_conf(itask, "mail events", []): + # event does not need to be processed + return + + id_key = EventKey( + self.HANDLER_MAIL, + get_event_id(event, itask), + itask.tokens.duplicate(job=itask.submit_num), + ) + if id_key in self._event_timers: + # event already being processed return self.add_event_timer( @@ -1401,11 +1545,18 @@ def _setup_event_mail(self, itask, event): ) ) - def _setup_custom_event_handlers(self, itask, event, message): + def _setup_custom_event_handlers( + self, + itask: 'TaskProxy', + event: str, + message: str, + ) -> None: """Set up custom task event handlers.""" handlers = self._get_events_conf(itask, f'{event} handlers') - if (handlers is None and - event in self._get_events_conf(itask, 'handler events', [])): + if ( + handlers is None + and event in self._get_events_conf(itask, 'handler events', []) + ): handlers = self._get_events_conf(itask, 'handlers') if handlers is None: return @@ -1417,15 +1568,12 @@ def _setup_custom_event_handlers(self, itask, event, message): retry_delays = [0] # There can be multiple custom event handlers for i, handler in enumerate(handlers): - if event in self.NON_UNIQUE_EVENTS: - key1 = ( - f'{self.HANDLER_CUSTOM}-{i:02d}', - f'{event}-{itask.non_unique_events[event] or 1:d}' - ) - else: - key1 = (f'{self.HANDLER_CUSTOM}-{i:02d}', event) - id_key = ( - key1, str(itask.point), itask.tdef.name, itask.submit_num) + id_key = EventKey( + f'{self.HANDLER_CUSTOM}-{i:02d}', + get_event_id(event, itask), + itask.tokens.duplicate(job=itask.submit_num), + ) + if id_key in self._event_timers: continue # Note: user@host may not always be set for a submit number, e.g. 
@@ -1444,12 +1592,13 @@ def _setup_custom_event_handlers(self, itask, event, message): message, platform_name, ) + key1 = (id_key.handler, id_key.event) try: cmd = handler % template_variables except KeyError as exc: LOG.error( - f"{itask.point}/{itask.tdef.name}/{itask.submit_num:02d} " - f"{key1} bad template: {exc}") + f'{id_key.tokens.relative_id}' + f" {key1} bad template: {exc}") continue if cmd == handler: @@ -1671,7 +1820,7 @@ def process_execution_polling_intervals( delays += time_limit_polling_intervals return delays - def add_event_timer(self, id_key, event_timer): + def add_event_timer(self, id_key: EventKey, event_timer) -> None: """Add a new event timer. Args: @@ -1682,7 +1831,7 @@ def add_event_timer(self, id_key, event_timer): self._event_timers[id_key] = event_timer self.event_timers_updated = True - def remove_event_timer(self, id_key): + def remove_event_timer(self, id_key: EventKey) -> None: """Remove an event timer. Args: @@ -1692,7 +1841,7 @@ def remove_event_timer(self, id_key): del self._event_timers[id_key] self.event_timers_updated = True - def unset_waiting_event_timer(self, id_key): + def unset_waiting_event_timer(self, id_key: EventKey) -> None: """Invoke unset_waiting on an event timer. Args: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 0fff50fc93e..a7a2faa0a5a 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -43,8 +43,11 @@ from cylc.flow.workflow_status import StopMode from cylc.flow.task_action_timer import TaskActionTimer, TimerFlags from cylc.flow.task_events_mgr import ( - CustomTaskEventHandlerContext, TaskEventMailContext, - TaskJobLogsRetrieveContext) + CustomTaskEventHandlerContext, + EventKey, + TaskEventMailContext, + TaskJobLogsRetrieveContext, +) from cylc.flow.task_id import TaskID from cylc.flow.task_proxy import TaskProxy from cylc.flow.task_state import ( @@ -617,16 +620,17 @@ def load_db_task_pool_for_restart(self, row_idx, row): self.compute_runahead() self.release_runahead_tasks() - def load_db_task_action_timers(self, row_idx, row): + def load_db_task_action_timers(self, row_idx, row) -> None: """Load a task action timer, e.g. 
event handlers, retry states.""" if row_idx == 0: LOG.info("LOADING task action timers") (cycle, name, ctx_key_raw, ctx_raw, delays_raw, num, delay, timeout) = row - id_ = Tokens( + tokens = Tokens( cycle=cycle, task=name, - ).relative_id + ) + id_ = tokens.relative_id try: # Extract type namedtuple variables from JSON strings ctx_key = json.loads(str(ctx_key_raw)) @@ -680,14 +684,13 @@ def load_db_task_action_timers(self, row_idx, row): itask.try_timers[ctx_key[1]] = TaskActionTimer( ctx, delays, num, delay, timeout) elif ctx: - key1, submit_num = ctx_key - # Convert key1 to type tuple - JSON restores as type list - # and this will not previously have been converted back - if isinstance(key1, list): - key1 = tuple(key1) - key = (key1, cycle, name, submit_num) + (handler, event), submit_num = ctx_key self.task_events_mgr.add_event_timer( - key, + EventKey( + handler, + event, + tokens.duplicate(job=submit_num), + ), TaskActionTimer( ctx, delays, num, delay, timeout ) @@ -1091,7 +1094,7 @@ def can_stop(self, stop_mode): for itask in self.get_tasks() ) - def warn_stop_orphans(self): + def warn_stop_orphans(self) -> None: """Log (warning) orphaned tasks on workflow stop.""" orphans = [] orphans_kill_failed = [] @@ -1118,11 +1121,12 @@ def warn_stop_orphans(self): ) ) - for key1, point, name, submit_num in ( - self.task_events_mgr._event_timers - ): - LOG.warning("%s/%s/%s: incomplete task event handler %s" % ( - point, name, submit_num, key1)) + for id_key in self.task_events_mgr._event_timers: + LOG.warning( + f"{id_key.tokens.relative_id}:" + " incomplete task event handler" + f" {(id_key.handler, id_key.event)}" + ) def log_incomplete_tasks(self) -> bool: """Log finished but incomplete tasks; return True if there any.""" diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 3db5c731ec7..0e7fe1d8868 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -20,7 +20,15 @@ from copy import copy from fnmatch import fnmatchcase from typing import ( - Any, Callable, Dict, List, Set, Tuple, Optional, TYPE_CHECKING + Any, + Callable, + Counter as TypingCounter, + Dict, + List, + Optional, + Set, + TYPE_CHECKING, + Tuple, ) from metomi.isodatetime.timezone import get_local_time_zone @@ -242,7 +250,7 @@ def __init__( self.poll_timer: Optional['TaskActionTimer'] = None self.timeout: Optional[float] = None self.try_timers: Dict[str, 'TaskActionTimer'] = {} - self.non_unique_events = Counter() # type: ignore # TODO: figure out + self.non_unique_events: TypingCounter[str] = Counter() self.clock_trigger_times: Dict[str, int] = {} self.expire_time: Optional[float] = None diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index e9402ba058b..99dc91e975d 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -47,6 +47,7 @@ from cylc.flow.cycling import PointBase from cylc.flow.scheduler import Scheduler from cylc.flow.task_pool import TaskPool + from cylc.flow.task_events_mgr import EventKey Version = Any # TODO: narrow down Any (should be str | int) after implementing type @@ -385,16 +386,17 @@ def put_workflow_template_vars( for key, value in template_vars.items() ) - def put_task_event_timers(self, task_events_mgr): + def put_task_event_timers(self, task_events_mgr) -> None: """Put statements to update the task_action_timers table.""" if task_events_mgr.event_timers_updated: self.db_deletes_map[self.TABLE_TASK_ACTION_TIMERS].append({}) - for key, timer in task_events_mgr._event_timers.items(): - key1, point, name, submit_num = key 
+ id_key: 'EventKey' + for id_key, timer in task_events_mgr._event_timers.items(): + key1 = (id_key.handler, id_key.event) self.db_inserts_map[self.TABLE_TASK_ACTION_TIMERS].append({ - "name": name, - "cycle": point, - "ctx_key": json.dumps((key1, submit_num,)), + "name": id_key.tokens['task'], + "cycle": id_key.tokens['cycle'], + "ctx_key": json.dumps((key1, id_key.tokens['job'],)), "ctx": self._namedtuple2json(timer.ctx), "delays": json.dumps(timer.delays), "num": timer.num, diff --git a/tests/integration/events/test_task_events.py b/tests/integration/events/test_task_events.py index 4d155c4a48d..2e7db7efe21 100644 --- a/tests/integration/events/test_task_events.py +++ b/tests/integration/events/test_task_events.py @@ -18,6 +18,9 @@ from .test_workflow_events import TEMPLATES +from cylc.flow.id import Tokens +from cylc.flow.task_events_mgr import EventKey + import pytest @@ -48,7 +51,7 @@ async def test_mail_footer_template( # start the workflow and get it to send an email ctx = SimpleNamespace(mail_to=None, mail_from=None) - id_keys = [((None, 'failed'), '1', 'a', 1)] + id_keys = [EventKey('none', 'failed', Tokens('//1/a'))] async with start(mod_one) as one_log: mod_one.task_events_mgr._process_event_email(mod_one, ctx, id_keys) From 01d6b8f74622245271e1cb0270665a956ff22789 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 12 Oct 2023 17:27:06 +0100 Subject: [PATCH 3/5] task_events_mgr: include event message with event emails * Closes #5566 * Note, the event message is not stored in the database, so when events are restored from the DB on restart, the event message will default to the event name. --- changes.d/5769.feat.md | 1 + cylc/flow/id.py | 6 +++ cylc/flow/task_events_mgr.py | 25 +++++++++-- cylc/flow/task_pool.py | 3 ++ tests/functional/events/09-task-event-mail.t | 20 ++++----- .../functional/events/29-task-event-mail-1.t | 16 ++++---- .../functional/events/30-task-event-mail-2.t | 41 +++++++++++-------- tests/integration/events/test_task_events.py | 29 ++++++++++++- 8 files changed, 102 insertions(+), 39 deletions(-) create mode 100644 changes.d/5769.feat.md diff --git a/changes.d/5769.feat.md b/changes.d/5769.feat.md new file mode 100644 index 00000000000..e0bda65a22c --- /dev/null +++ b/changes.d/5769.feat.md @@ -0,0 +1 @@ +Include task messages and workflow port as appropriate in emails configured by "mail events". diff --git a/cylc/flow/id.py b/cylc/flow/id.py index 5e2a7da7d6d..db6d5ca8d80 100644 --- a/cylc/flow/id.py +++ b/cylc/flow/id.py @@ -163,6 +163,12 @@ def __eq__(self, other): for key in self._KEYS ) + def __lt__(self, other): + return self.id < other.id + + def __gt__(self, other): + return self.id > other.id + def __ne__(self, other): if not isinstance(other, self.__class__): return True diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 31514f88ca5..650f4da21fa 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -126,6 +126,13 @@ class EventKey(NamedTuple): """The task event.""" event: str + """The task event message. + + Warning: This information is not currently preserved in the DB so will be + lost on restart. 
+ """ + message: str + """The job tokens.""" tokens: 'Tokens' @@ -897,7 +904,7 @@ def setup_event_handlers(self, itask, event, message): msg = message self._db_events_insert(itask, event, msg) self._setup_job_logs_retrieval(itask, event) - self._setup_event_mail(itask, event) + self._setup_event_mail(itask, event, message) self._setup_custom_event_handlers(itask, event, message) def _custom_handler_callback( @@ -951,14 +958,18 @@ def _process_event_email( subject = "[%d task events] %s" % ( len(id_keys), schd.workflow) cmd = ["mail", "-s", subject] + # From: and To: cmd.append("-r") cmd.append(ctx.mail_from) cmd.append(ctx.mail_to) + # STDIN for mail, tasks stdin_str = "" for id_key in sorted(id_keys): - stdin_str += f'{id_key.event}: {id_key.tokens.relative_id}\n' + stdin_str += f'job: {id_key.tokens.relative_id}\n' + stdin_str += f'event: {id_key.event}\n' + stdin_str += f'message: {id_key.message}\n\n' # STDIN for mail, event info + workflow detail stdin_str += "\n" @@ -1491,6 +1502,7 @@ def _setup_job_logs_retrieval(self, itask, event) -> None: id_key = EventKey( self.HANDLER_JOB_LOGS_RETRIEVE, event, + event, itask.tokens.duplicate(job=itask.submit_num), ) if id_key in self._event_timers: @@ -1514,7 +1526,12 @@ def _setup_job_logs_retrieval(self, itask, event) -> None: ) ) - def _setup_event_mail(self, itask: 'TaskProxy', event: str) -> None: + def _setup_event_mail( + self, + itask: 'TaskProxy', + event: str, + message: str, + ) -> None: """Set up task event notification, by email.""" if event not in self._get_events_conf(itask, "mail events", []): # event does not need to be processed @@ -1523,6 +1540,7 @@ def _setup_event_mail(self, itask: 'TaskProxy', event: str) -> None: id_key = EventKey( self.HANDLER_MAIL, get_event_id(event, itask), + message, itask.tokens.duplicate(job=itask.submit_num), ) if id_key in self._event_timers: @@ -1571,6 +1589,7 @@ def _setup_custom_event_handlers( id_key = EventKey( f'{self.HANDLER_CUSTOM}-{i:02d}', get_event_id(event, itask), + message, itask.tokens.duplicate(job=itask.submit_num), ) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index a7a2faa0a5a..7cefa96e2cd 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -689,6 +689,9 @@ def load_db_task_action_timers(self, row_idx, row) -> None: EventKey( handler, event, + # NOTE: the event "message" is not preserved in the DB so + # we use the event as a placeholder + event, tokens.duplicate(job=submit_num), ), TaskActionTimer( diff --git a/tests/functional/events/09-task-event-mail.t b/tests/functional/events/09-task-event-mail.t index 9f6195feb20..f859b16c761 100755 --- a/tests/functional/events/09-task-event-mail.t +++ b/tests/functional/events/09-task-event-mail.t @@ -20,7 +20,7 @@ if ! 
command -v mail 2>'/dev/null'; then
     skip_all '"mail" command not available'
 fi
-set_test_number 5
+set_test_number 6
 mock_smtpd_init
 OPT_SET=
 if [[ "${TEST_NAME_BASE}" == *-globalcfg ]]; then
@@ -49,17 +49,15 @@ run_ok "${TEST_NAME_BASE}-validate" \
 workflow_run_ok "${TEST_NAME_BASE}-run" \
     cylc play --reference-test --debug --no-detach ${OPT_SET} "${WORKFLOW_NAME}"
 
-contains_ok "${TEST_SMTPD_LOG}" <<__LOG__
-retry: 1/t1/01
-succeeded: 1/t1/02
-see: http://localhost/stuff/${USER}/${WORKFLOW_NAME}/
-__LOG__
+run_ok "${TEST_NAME_BASE}-grep-log-1" \
+    grep -Pizo 'job: 1/t1/01.*\n.*event: retry' "${TEST_SMTPD_LOG}"
+run_ok "${TEST_NAME_BASE}-grep-log-2" \
+    grep -Pizo 'job: 1/t1/02.*\n.*event: succeeded' "${TEST_SMTPD_LOG}"
 
-
-run_ok "${TEST_NAME_BASE}-grep-log" \
-    grep -qPizo "Subject: \[1/t1/01 retry\]\n? ${WORKFLOW_NAME}" "${TEST_SMTPD_LOG}"
-run_ok "${TEST_NAME_BASE}-grep-log" \
-    grep -qPizo "Subject: \[1/t1/02 succeeded\]\n? ${WORKFLOW_NAME}" "${TEST_SMTPD_LOG}"
+run_ok "${TEST_NAME_BASE}-grep-log-3" \
+    grep -Pizo "Subject: \[1/t1/01 retry\].*(\n)?.* ${WORKFLOW_NAME}" "${TEST_SMTPD_LOG}"
+run_ok "${TEST_NAME_BASE}-grep-log-4" \
+    grep -Pizo "Subject: \[1/t1/02 succeeded\].*(\n)?.* ${WORKFLOW_NAME}" "${TEST_SMTPD_LOG}"
 
 purge
 mock_smtpd_kill
diff --git a/tests/functional/events/29-task-event-mail-1.t b/tests/functional/events/29-task-event-mail-1.t
index 7ca6e76a7c2..94669abc9be 100755
--- a/tests/functional/events/29-task-event-mail-1.t
+++ b/tests/functional/events/29-task-event-mail-1.t
@@ -20,7 +20,7 @@ if ! command -v mail 2>'/dev/null'; then
     skip_all '"mail" command not available'
 fi
-set_test_number 4
+set_test_number 5
 mock_smtpd_init
 create_test_global_config "
@@ -37,13 +37,15 @@ run_ok "${TEST_NAME_BASE}-validate" \
 workflow_run_ok "${TEST_NAME_BASE}-run" \
     cylc play --reference-test --debug --no-detach "$WORKFLOW_NAME"
 
-contains_ok "${TEST_SMTPD_LOG}" <<__LOG__
-retry: 1/t1/01
-see: http://localhost/stuff/${USER}/${WORKFLOW_NAME}/
-__LOG__
+run_ok "${TEST_NAME_BASE}-grep-log-1" \
+    grep -Pizo "job: 1/t1/01.*\n.*event: retry.*\n.*" "${TEST_SMTPD_LOG}"
 
-run_ok "${TEST_NAME_BASE}-grep-log" \
-    grep -qPizo "Subject: \[1/t1/01 retry\]\n? ${WORKFLOW_NAME}" "${TEST_SMTPD_LOG}"
+run_ok "${TEST_NAME_BASE}-grep-log-2" grep \
+    "see: http://localhost/stuff/${USER}/${WORKFLOW_NAME}/" \
+    "${TEST_SMTPD_LOG}"
+
+run_ok "${TEST_NAME_BASE}-grep-log-3" \
+    grep -Pizo "Subject: \\[1/t1/01 retry\\].*(\n)?.*${WORKFLOW_NAME}" "${TEST_SMTPD_LOG}"
 
 purge
 mock_smtpd_kill
diff --git a/tests/functional/events/30-task-event-mail-2.t b/tests/functional/events/30-task-event-mail-2.t
index 85f6b301654..09d2618a515 100755
--- a/tests/functional/events/30-task-event-mail-2.t
+++ b/tests/functional/events/30-task-event-mail-2.t
@@ -20,7 +20,7 @@ if ! 
command -v mail 2>'/dev/null'; then skip_all '"mail" command not available' fi -set_test_number 5 +set_test_number 20 mock_smtpd_init OPT_SET= if [[ "${TEST_NAME_BASE}" == *-globalcfg ]]; then @@ -49,22 +49,29 @@ run_ok "${TEST_NAME_BASE}-validate" \ workflow_run_fail "${TEST_NAME_BASE}-run" \ cylc play --reference-test --debug --no-detach ${OPT_SET} "${WORKFLOW_NAME}" +# 1 - retry +run_ok "${TEST_NAME_BASE}-t1-01" grep -Pizo 'job: 1/t1/01.*\n.*event: retry' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t2-01" grep -Pizo 'job: 1/t2/01.*\n.*event: retry' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t3-01" grep -Pizo 'job: 1/t3/01.*\n.*event: retry' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t4-01" grep -Pizo 'job: 1/t4/01.*\n.*event: retry' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t5-01" grep -Pizo 'job: 1/t5/01.*\n.*event: retry' "${TEST_SMTPD_LOG}" + +# 2 - retry +run_ok "${TEST_NAME_BASE}-t1-02" grep -Pizo 'job: 1/t1/02.*\n.*event: retry' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t2-02" grep -Pizo 'job: 1/t2/02.*\n.*event: retry' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t3-02" grep -Pizo 'job: 1/t3/02.*\n.*event: retry' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t4-02" grep -Pizo 'job: 1/t4/02.*\n.*event: retry' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t5-02" grep -Pizo 'job: 1/t5/02.*\n.*event: retry' "${TEST_SMTPD_LOG}" + +# 3 - fail +run_ok "${TEST_NAME_BASE}-t1-03" grep -Pizo 'job: 1/t1/03.*\n.*event: failed' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t2-03" grep -Pizo 'job: 1/t2/03.*\n.*event: failed' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t3-03" grep -Pizo 'job: 1/t3/03.*\n.*event: failed' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t4-03" grep -Pizo 'job: 1/t4/03.*\n.*event: failed' "${TEST_SMTPD_LOG}" +run_ok "${TEST_NAME_BASE}-t5-03" grep -Pizo 'job: 1/t5/03.*\n.*event: failed' "${TEST_SMTPD_LOG}" + + contains_ok "${TEST_SMTPD_LOG}" <<__LOG__ -retry: 1/t1/01 -retry: 1/t2/01 -retry: 1/t3/01 -retry: 1/t4/01 -retry: 1/t5/01 -retry: 1/t1/02 -retry: 1/t2/02 -retry: 1/t3/02 -retry: 1/t4/02 -retry: 1/t5/02 -failed: 1/t1/03 -failed: 1/t2/03 -failed: 1/t3/03 -failed: 1/t4/03 -failed: 1/t5/03 see: http://localhost/stuff/${USER}/${WORKFLOW_NAME}/ __LOG__ @@ -73,6 +80,6 @@ run_ok "${TEST_NAME_BASE}-grep-log" \ run_ok "${TEST_NAME_BASE}-grep-log" \ grep -qPizo "Subject: \[. tasks failed\]\n? 
${WORKFLOW_NAME}" "${TEST_SMTPD_LOG}"
 
- purge
+purge
 mock_smtpd_kill
 exit
diff --git a/tests/integration/events/test_task_events.py b/tests/integration/events/test_task_events.py
index 2e7db7efe21..3ae30c1fe73 100644
--- a/tests/integration/events/test_task_events.py
+++ b/tests/integration/events/test_task_events.py
@@ -51,7 +51,7 @@ async def test_mail_footer_template(
 
     # start the workflow and get it to send an email
     ctx = SimpleNamespace(mail_to=None, mail_from=None)
-    id_keys = [EventKey('none', 'failed', Tokens('//1/a'))]
+    id_keys = [EventKey('none', 'failed', 'failed', Tokens('//1/a'))]
     async with start(mod_one) as one_log:
         mod_one.task_events_mgr._process_event_email(mod_one, ctx, id_keys)
 
@@ -72,5 +72,32 @@ async def test_mail_footer_template(
     assert len(mail_calls) == 1
 
 
+async def test_event_email_body(
+    mod_one,
+    start,
+    capcall,
+):
+    """It should send an email with the event context."""
+    mail_calls = capcall(
+        'cylc.flow.task_events_mgr.TaskEventsManager._send_mail'
+    )
+
+    # start the workflow and get it to send an email
+    ctx = SimpleNamespace(mail_to=None, mail_from=None)
+    async with start(mod_one):
+        # send a custom task message with the warning severity level
+        id_keys = [EventKey('none', 'warning', 'warning message', Tokens('//1/a/01'))]
+        mod_one.task_events_mgr._process_event_email(mod_one, ctx, id_keys)
+
+    # test the email which would have been sent for this message
+    email_body = mail_calls[0][0][3]
+    assert 'event: warning' in email_body
+    assert 'job: 1/a/01' in email_body
+    assert 'message: warning message' in email_body
+    assert f'workflow: {mod_one.tokens["workflow"]}' in email_body
+    assert f'host: {mod_one.host}' in email_body
+    assert f'port: {mod_one.server.port}' in email_body
+    assert f'owner: {mod_one.owner}' in email_body
+
 # NOTE: we do not test custom event handlers here because these are tested
 # as a part of workflow validation (now also performed by cylc play)

From a4976e1e3c7bda092a2423c1a1d61238224ad375 Mon Sep 17 00:00:00 2001
From: Oliver Sanders
Date: Thu, 12 Oct 2023 17:05:20 +0100
Subject: [PATCH 4/5] tests/f: fix restart/44

* The test had been broken by a positive behaviour change: the re-invoked
  `cylc play` command now includes the `--color=never` option, so the
  expected command string needed updating.
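
* For reference, the command captured in CYLC_WORKFLOW_COMMAND (after the
  test strips the leading "cylc ") now looks like this, with an illustrative
  workflow name:

      play --pause my-workflow/run1 --host=localhost --color=never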
--- tests/functional/restart/44-reinvoke.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/restart/44-reinvoke.t b/tests/functional/restart/44-reinvoke.t index ed9c6905d69..d9043fa6c85 100755 --- a/tests/functional/restart/44-reinvoke.t +++ b/tests/functional/restart/44-reinvoke.t @@ -53,7 +53,7 @@ run_ok "${TEST_NAME_BASE}-cmd" \ sed -n -i 's/CYLC_WORKFLOW_COMMAND=.*cylc //p' "${TEST_NAME_BASE}-cmd.stdout" # ensure the whole workflow ID is present in the command (including the run number) -CMD="^play --pause ${WORKFLOW_NAME}/run1 --host=localhost$" +CMD="^play --pause ${WORKFLOW_NAME}/run1 --host=localhost --color=never$" if grep "${CMD}" "${TEST_NAME_BASE}-cmd.stdout"; then ok "${TEST_NAME_BASE}-re-invoked-id" else From 615013b76e72fc68dcce0314107494d8386c3e90 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 16 Jan 2024 09:56:25 +0000 Subject: [PATCH 5/5] task message: provide message to scheduler mail footer --- cylc/flow/task_events_mgr.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 650f4da21fa..2ff2d7e44d1 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -984,7 +984,11 @@ def _process_event_email( if self.mail_footer: stdin_str += process_mail_footer( self.mail_footer, - get_workflow_template_variables(schd, id_keys[-1].event, ''), + get_workflow_template_variables( + schd, + id_keys[-1].event, + id_keys[-1].message, + ), ) self._send_mail(ctx, cmd, stdin_str, id_keys, schd)
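
For reference, after this series the body of a task event email is assembled
roughly as follows: one job/event/message stanza per event, followed by the
workflow details, and finally the configured mail footer, which now also
receives the event message as a template variable. All values below are
illustrative:

    job: 1/foo/01
    event: warning
    message: my custom task message

    workflow: my-workflow/run1
    host: localhost
    port: 43001
    owner: me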