Ensure task mail respects [scheduler][mail]to/from #6008

Merged (3 commits) on Mar 11, 2024
Changes from 2 commits
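
This change makes task event emails honour the workflow's own [scheduler][mail] `to`/`from` settings rather than only the global defaults. For orientation, the sketch below (illustrative addresses, not taken from this PR) shows the slice of the parsed workflow config that the task event manager now consults:

    # Illustrative slice of the parsed workflow config (workflow_cfg);
    # the addresses are invented for the example.
    workflow_cfg = {
        "scheduler": {
            "mail": {
                "from": "notifications@workflow-host.example",
                "to": "some.user@example.org",
            },
        },
    }

    # Mirrors the new lookup added to _get_events_conf in the diff below:
    mail_to = workflow_cfg.get("scheduler", {}).get("mail", {}).get("to")
    print(mail_to)  # -> some.user@example.org
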
1 change: 0 additions & 1 deletion cylc/flow/scheduler.py
@@ -467,7 +467,6 @@ async def configure(self, params):
"task event batch interval"]
self.task_events_mgr.mail_smtp = self._get_events_conf("smtp")
self.task_events_mgr.mail_footer = self._get_events_conf("footer")
self.task_events_mgr.workflow_url = self.config.cfg['meta']['URL']
self.task_events_mgr.workflow_cfg = self.config.cfg
if self.options.genref:
LOG.addHandler(ReferenceLogFileHandler(
126 changes: 59 additions & 67 deletions cylc/flow/task_events_mgr.py
@@ -26,20 +26,20 @@
"""

from contextlib import suppress
from collections import namedtuple
from enum import Enum
from logging import DEBUG, INFO, getLevelName
import os
from shlex import quote
import shlex
from time import time
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
NamedTuple,
Optional,
TYPE_CHECKING,
Sequence,
Union,
cast,
)
@@ -88,6 +88,7 @@
)
from cylc.flow.workflow_events import (
EventData as WorkflowEventData,
construct_mail_cmd,
get_template_variables as get_workflow_template_variables,
process_mail_footer,
)
@@ -99,19 +100,21 @@
from cylc.flow.scheduler import Scheduler


CustomTaskEventHandlerContext = namedtuple(
"CustomTaskEventHandlerContext",
["key", "ctx_type", "cmd"])
class CustomTaskEventHandlerContext(NamedTuple):
key: Union[str, Sequence[str]]
cmd: str


TaskEventMailContext = namedtuple(
"TaskEventMailContext",
["key", "ctx_type", "mail_from", "mail_to"])
class TaskEventMailContext(NamedTuple):
key: str
mail_from: str
mail_to: str


TaskJobLogsRetrieveContext = namedtuple(
"TaskJobLogsRetrieveContext",
["key", "ctx_type", "platform_name", "max_size"])
class TaskJobLogsRetrieveContext(NamedTuple):
key: Union[str, Sequence[str]]
platform_name: Optional[str]
max_size: Optional[int]


class EventKey(NamedTuple):
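
The three contexts above become typed NamedTuple subclasses and lose the redundant `ctx_type` field. A minimal sketch (illustrative values; the key string stands in for the HANDLER_MAIL constant used later in the diff) of why this stays compatible with the tuple-based persistence handled in task_pool.py below:

    from typing import NamedTuple

    class TaskEventMailContext(NamedTuple):
        key: str
        mail_from: str
        mail_to: str

    ctx = TaskEventMailContext(
        key="event-mail",  # stand-in for TaskEventsManager.HANDLER_MAIL
        mail_from="notifications@host.example",
        mail_to="some.user",
    )

    # NamedTuple subclasses still behave as plain tuples, so they serialise
    # the same way and can be rebuilt positionally on restart.
    assert tuple(ctx) == ("event-mail", "notifications@host.example", "some.user")
    assert TaskEventMailContext(*tuple(ctx)) == ctx
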
@@ -426,23 +429,23 @@
EVENT_SUCCEEDED
}

workflow_cfg: Dict[str, Any]
uuid_str: str
mail_interval: float = 0
mail_smtp: Optional[str] = None
mail_footer: Optional[str] = None

def __init__(
self, workflow, proc_pool, workflow_db_mgr, broadcast_mgr,
xtrigger_mgr, data_store_mgr, timestamp, bad_hosts,
reset_inactivity_timer_func
):
self.workflow = workflow
self.workflow_url = None
Member Author comment: This was unused.

self.workflow_cfg = {}
self.uuid_str = None
self.proc_pool = proc_pool
self.workflow_db_mgr = workflow_db_mgr
self.broadcast_mgr = broadcast_mgr
self.xtrigger_mgr = xtrigger_mgr
self.data_store_mgr = data_store_mgr
self.mail_interval = 0.0
self.mail_smtp = None
self.mail_footer = None
self.next_mail_time = None
self.reset_inactivity_timer_func = reset_inactivity_timer_func
# NOTE: do not mutate directly
@@ -563,15 +566,15 @@
# Avoid flooding user's mail box with mail notification.
# Group together as many notifications as possible within a
# given interval.
timer.ctx.ctx_type == self.HANDLER_MAIL and
isinstance(timer.ctx, TaskEventMailContext) and
not schd.stop_mode and
self.next_mail_time is not None and
self.next_mail_time > now
):
continue

timer.set_waiting()
if timer.ctx.ctx_type == self.HANDLER_CUSTOM:
if isinstance(timer.ctx, CustomTaskEventHandlerContext):
# Run custom event handlers on their own
self.proc_pool.put_command(
SubProcContext(
@@ -591,11 +594,11 @@

next_mail_time = now + self.mail_interval
for ctx, id_keys in ctx_groups.items():
if ctx.ctx_type == self.HANDLER_MAIL:
if isinstance(ctx, TaskEventMailContext):
# Set next_mail_time if any mail sent
self.next_mail_time = next_mail_time
self._process_event_email(schd, ctx, id_keys)
elif ctx.ctx_type == self.HANDLER_JOB_LOGS_RETRIEVE:
elif isinstance(ctx, TaskJobLogsRetrieveContext):
self._process_job_logs_retrieval(schd, ctx, id_keys)

def process_message(
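
For context on the batching described by the comments in the hunk above: mail-context timers are held back until the current window expires, so several task events collapse into a single message. A self-contained sketch of that window logic, with invented values, assuming the interval is in seconds (it corresponds to the "task event batch interval" setting seen in scheduler.py):

    from time import time

    mail_interval = 300.0  # seconds; invented example value
    next_mail_time = None  # set whenever a batch is sent

    now = time()
    if next_mail_time is not None and next_mail_time > now:
        pass  # defer: the timer stays queued; this event joins the next batch
    else:
        # send one email covering all queued task events, then open a new window
        next_mail_time = now + mail_interval
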
@@ -884,15 +887,15 @@
)
return False

severity = cast('int', LOG_LEVELS.get(severity, INFO))
severity_lvl: int = LOG_LEVELS.get(severity, INFO)
# Demote log level to DEBUG if this is a message that duplicates what
# gets logged by itask state change anyway (and not manual poll)
if severity > DEBUG and flag != self.FLAG_POLLED and message in {
if severity_lvl > DEBUG and flag != self.FLAG_POLLED and message in {
self.EVENT_SUBMITTED, self.EVENT_STARTED, self.EVENT_SUCCEEDED,
self.EVENT_SUBMIT_FAILED, f'{FAIL_MESSAGE_PREFIX}ERR'
}:
severity = DEBUG
LOG.log(severity, f"[{itask}] {flag}{message}{timestamp}")
severity_lvl = DEBUG
LOG.log(severity_lvl, f"[{itask}] {flag}{message}{timestamp}")
return True

def setup_event_handlers(self, itask, event, message):
@@ -937,7 +940,7 @@
def _process_event_email(
self,
schd: 'Scheduler',
ctx,
ctx: TaskEventMailContext,
id_keys: List[EventKey],
) -> None:
"""Process event notification, by email."""
@@ -957,12 +960,6 @@
# n events from n tasks
subject = "[%d task events] %s" % (
len(id_keys), schd.workflow)
cmd = ["mail", "-s", subject]

# From: and To:
cmd.append("-r")
cmd.append(ctx.mail_from)
cmd.append(ctx.mail_to)

# STDIN for mail, tasks
stdin_str = ""
@@ -990,16 +987,19 @@
id_keys[-1].message,
),
)
self._send_mail(ctx, cmd, stdin_str, id_keys, schd)
self._send_mail(ctx, subject, stdin_str, id_keys, schd)

def _send_mail(
self,
ctx,
cmd,
stdin_str,
ctx: TaskEventMailContext,
subject: str,
stdin_str: str,
id_keys: List[EventKey],
schd: 'Scheduler',
) -> None:
cmd = construct_mail_cmd(
subject, from_address=ctx.mail_from, to_address=ctx.mail_to
)
# SMTP server
env = dict(os.environ)
if self.mail_smtp:
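
The inline `mail` command assembly deleted above now comes from `construct_mail_cmd`, imported from `cylc.flow.workflow_events`. Its implementation is not part of this diff; judging by the removed lines it presumably builds something like the sketch below (a stand-in, not the real helper):

    from typing import List

    def construct_mail_cmd(
        subject: str, from_address: str, to_address: str
    ) -> List[str]:
        """Sketch of the command the removed inline code used to build."""
        return ["mail", "-s", subject, "-r", from_address, to_address]

    cmd = construct_mail_cmd(
        "[2 task events] my-workflow",  # illustrative subject
        from_address="notifications@host.example",
        to_address="some.user",
    )
    # -> ['mail', '-s', '[2 task events] my-workflow', '-r',
    #     'notifications@host.example', 'some.user']
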
@@ -1037,15 +1037,18 @@
except KeyError as exc:
LOG.exception(exc)

def _get_events_conf(self, itask, key, default=None):
def _get_events_conf(
self, itask: 'TaskProxy', key: str, default: Any = None
) -> Any:
"""Return an events setting from workflow then global configuration."""
for getter in [
self.broadcast_mgr.get_broadcast(itask.tokens).get("events"),
itask.tdef.rtconfig["mail"],
itask.tdef.rtconfig["events"],
glbl_cfg().get(["scheduler", "mail"]),
glbl_cfg().get()["task events"],
]:
for getter in (
self.broadcast_mgr.get_broadcast(itask.tokens).get("events"),
itask.tdef.rtconfig["mail"],
itask.tdef.rtconfig["events"],
self.workflow_cfg.get("scheduler", {}).get("mail", {}),
glbl_cfg().get(["scheduler", "mail"]),
glbl_cfg().get()["task events"],
):
try:
value = getter.get(key)
except (AttributeError, ItemNotFoundError, KeyError):
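
The getter list above is walked in order and the first source that yields a value wins; the new entry puts the workflow's own [scheduler][mail] section ahead of the global configuration. A standalone sketch of that first-match-wins walk (the dicts are placeholders with invented values, not Cylc APIs):

    def get_events_setting(key, default=None):
        sources = (
            {},                                    # broadcast for this task
            {},                                    # task runtime [mail]
            {},                                    # task runtime [events]
            {"to": "workflow.owner@example.org"},  # workflow [scheduler][mail]
            {"to": "site.default@example.org"},    # global [scheduler][mail]
            {},                                    # global [task events]
        )
        for source in sources:
            value = source.get(key)
            if value is not None:
                return value
        return default

    print(get_events_setting("to"))  # -> workflow.owner@example.org
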
@@ -1058,7 +1061,7 @@
def _process_job_logs_retrieval(
self,
schd: 'Scheduler',
ctx,
ctx: TaskJobLogsRetrieveContext,
id_keys: List[EventKey],
) -> None:
"""Process retrieval of task job logs from remote user@host."""
@@ -1521,10 +1524,11 @@
id_key,
TaskActionTimer(
TaskJobLogsRetrieveContext(
self.HANDLER_JOB_LOGS_RETRIEVE, # key
self.HANDLER_JOB_LOGS_RETRIEVE, # ctx_type
itask.platform['name'],
self._get_remote_conf(itask, "retrieve job logs max size"),
key=self.HANDLER_JOB_LOGS_RETRIEVE,
platform_name=itask.platform['name'],
max_size=self._get_remote_conf(
itask, "retrieve job logs max size"
),
),
retry_delays
)
@@ -1555,14 +1559,11 @@
id_key,
TaskActionTimer(
TaskEventMailContext(
self.HANDLER_MAIL, # key
self.HANDLER_MAIL, # ctx_type
self._get_events_conf( # mail_from
itask,
"from",
"notifications@" + get_host(),
key=self.HANDLER_MAIL,
mail_from=self._get_events_conf(
itask, "from", f"notifications@{get_host()}"
),
self._get_events_conf(itask, "to", get_user()) # mail_to
mail_to=self._get_events_conf(itask, "to", get_user())
)
)
)
@@ -1632,11 +1633,7 @@
self.add_event_timer(
id_key,
TaskActionTimer(
CustomTaskEventHandlerContext(
key1,
self.HANDLER_CUSTOM,
cmd,
),
CustomTaskEventHandlerContext(key=key1, cmd=cmd),
retry_delays
)
)
@@ -1865,12 +1862,7 @@
self.event_timers_updated = True

def unset_waiting_event_timer(self, id_key: EventKey) -> None:
"""Invoke unset_waiting on an event timer.

Args:
key (str)

"""
"""Invoke unset_waiting on an event timer."""
self._event_timers[id_key].unset_waiting()
self.event_timers_updated = True

24 changes: 17 additions & 7 deletions cylc/flow/task_pool.py
@@ -24,10 +24,12 @@
Dict,
Iterable,
List,
NamedTuple,
Optional,
Set,
TYPE_CHECKING,
Tuple,
Type,
Union,
)
import logging
@@ -592,7 +594,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):
self.compute_runahead()
self.release_runahead_tasks()

def load_db_task_action_timers(self, row_idx, row) -> None:
def load_db_task_action_timers(self, row_idx: int, row: Iterable) -> None:
"""Load a task action timer, e.g. event handlers, retry states."""
if row_idx == 0:
LOG.info("LOADING task action timers")
@@ -607,14 +609,22 @@ def load_db_task_action_timers(self, row_idx, row) -> None:
# Extract type namedtuple variables from JSON strings
ctx_key = json.loads(str(ctx_key_raw))
ctx_data = json.loads(str(ctx_raw))
for known_cls in [
CustomTaskEventHandlerContext,
TaskEventMailContext,
TaskJobLogsRetrieveContext]:
known_cls: Type[NamedTuple]
for known_cls in (
CustomTaskEventHandlerContext,
TaskEventMailContext,
TaskJobLogsRetrieveContext
):
if ctx_data and ctx_data[0] == known_cls.__name__:
ctx = known_cls(*ctx_data[1])
ctx_args: list = ctx_data[1]
if len(ctx_args) > len(known_cls._fields):
# BACK COMPAT: no-longer used ctx_type arg
# from: Cylc 7
# to: 8.3.0
ctx_args.pop(1)
ctx: tuple = known_cls(*ctx_args)
break
else:
else: # no break
ctx = ctx_data
if ctx is not None:
ctx = tuple(ctx)
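
Databases written before this change (Cylc 7 up to 8.3.0, per the BACK COMPAT note above) stored these contexts with the now-removed `ctx_type` as their second element, so the loader drops it when the stored tuple has one field too many. A hedged illustration with an invented stored row:

    import json
    from typing import NamedTuple, Sequence, Union

    class CustomTaskEventHandlerContext(NamedTuple):
        key: Union[str, Sequence[str]]
        cmd: str

    # Invented example of an old stored row: the ctx_type string sat at
    # index 1, between the key and the command.
    ctx_raw = json.dumps(
        ["CustomTaskEventHandlerContext",
         [["event-handler-00", "failed"], "event-handler", "echo hello"]]
    )

    ctx_data = json.loads(ctx_raw)
    ctx_args = ctx_data[1]
    if len(ctx_args) > len(CustomTaskEventHandlerContext._fields):
        ctx_args.pop(1)  # BACK COMPAT: discard the unused ctx_type
    ctx = CustomTaskEventHandlerContext(*ctx_args)
    print(ctx)
    # CustomTaskEventHandlerContext(key=['event-handler-00', 'failed'],
    #                               cmd='echo hello')
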