diff --git a/cylc/flow/job_runner_mgr.py b/cylc/flow/job_runner_mgr.py index 6aa6403c98..088a52fdaf 100644 --- a/cylc/flow/job_runner_mgr.py +++ b/cylc/flow/job_runner_mgr.py @@ -24,6 +24,10 @@ """ from contextlib import suppress +from dataclasses import ( + asdict, + dataclass, +) import json import os from pathlib import Path @@ -34,10 +38,12 @@ from subprocess import DEVNULL # nosec import sys import traceback -from typing import Optional +from typing import ( + List, + Optional, +) from cylc.flow.cylc_subproc import procopen -from cylc.flow.parsec.OrderedDict import OrderedDict from cylc.flow.task_job_logs import ( JOB_LOG_ERR, JOB_LOG_JOB, @@ -55,59 +61,36 @@ from cylc.flow.wallclock import get_current_time_string -class JobPollContext(): +@dataclass +class JobPollContext: """Context object for a job poll.""" - CONTEXT_ATTRIBUTES = ( - 'job_log_dir', # cycle/task/submit_num - 'job_runner_name', - 'job_id', # job id in job runner - 'job_runner_exit_polled', # 0 for false, 1 for true - 'run_status', # 0 for success, 1 for failure - 'run_signal', # signal received on run failure - 'time_submit_exit', # submit (exit) time - 'time_run', # run start time - 'time_run_exit', # run exit time - 'job_runner_call_no_lines', # line count in job runner call stdout - ) - - __slots__ = CONTEXT_ATTRIBUTES + ( - 'pid', - 'messages' - ) - - def __init__(self, job_log_dir, **attrs): - self.job_log_dir = job_log_dir - self.job_runner_name: Optional[str] = None - self.job_id = None - self.job_runner_exit_polled: Optional[int] = None + job_log_dir: str # cycle/task/submit_num + job_runner_name: Optional[str] = None + job_id = None # job id in job runner + job_runner_exit_polled: Optional[int] = None # 0 for false, 1 for true + run_status: Optional[int] = None # 0 for success, 1 for failure + run_signal: Optional[str] = None # signal received on run failure + time_submit_exit: Optional[str] = None # submit (exit) time + time_run: Optional[str] = None # run start time + time_run_exit: Optional[str] = None # run exit time + job_runner_call_no_lines = None # line count in job runner call stdout + + def __post_init__(self): self.pid = None - self.run_status: Optional[int] = None - self.run_signal: Optional[str] = None - self.time_submit_exit: Optional[str] = None - self.time_run: Optional[str] = None - self.time_run_exit: Optional[str] = None - self.job_runner_call_no_lines = None - self.messages = [] - - if attrs: - for key, value in attrs.items(): - if key not in self.CONTEXT_ATTRIBUTES: - raise ValueError('Invalid kwarg "%s"' % key) - setattr(self, key, value) - - def update(self, other): + self.messages: List[str] = [] + + def update(self, other: 'JobPollContext') -> None: """Update my data from given file context.""" - for i in self.__slots__: + for i in self.__dict__: setattr(self, i, getattr(other, i)) - def get_summary_str(self): + def get_summary_str(self) -> str: """Return the poll context as a summary string delimited by "|".""" - ret = OrderedDict() - for key in self.CONTEXT_ATTRIBUTES: - value = getattr(self, key) - if key == 'job_log_dir' or value is None: - continue - ret[key] = value + ret = { + key: value + for key, value in asdict(self).items() + if key != 'job_log_dir' and value is not None + } return '%s|%s' % (self.job_log_dir, json.dumps(ret)) @@ -235,7 +218,7 @@ def jobs_poll(self, job_log_root, job_log_dirs): job_log_root = os.path.expandvars(job_log_root) self.configure_workflow_run_dir(job_log_root.rsplit(os.sep, 2)[0]) - ctx_list = [] # Contexts for all relevant jobs + ctx_list: List[JobPollContext] = [] # Contexts for all relevant jobs ctx_list_by_job_runner = {} # {job_runner_name1: [ctx1, ...], ...} for job_log_dir in job_log_dirs: