Skip to content

Commit

Permalink
Refactor JobPollContext using dataclass
Browse files Browse the repository at this point in the history
Training day exercise
  • Loading branch information
MetRonnie committed Feb 11, 2025
1 parent 915aaef commit 57af3a1
Showing 1 changed file with 33 additions and 50 deletions.
83 changes: 33 additions & 50 deletions cylc/flow/job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
"""

from contextlib import suppress
from dataclasses import (
asdict,
dataclass,
)
import json
import os
from pathlib import Path
Expand All @@ -34,10 +38,12 @@
from subprocess import DEVNULL # nosec
import sys
import traceback
from typing import Optional
from typing import (
List,
Optional,
)

from cylc.flow.cylc_subproc import procopen
from cylc.flow.parsec.OrderedDict import OrderedDict
from cylc.flow.task_job_logs import (
JOB_LOG_ERR,
JOB_LOG_JOB,
Expand All @@ -55,59 +61,36 @@
from cylc.flow.wallclock import get_current_time_string


class JobPollContext():
@dataclass
class JobPollContext:
"""Context object for a job poll."""
CONTEXT_ATTRIBUTES = (
'job_log_dir', # cycle/task/submit_num
'job_runner_name',
'job_id', # job id in job runner
'job_runner_exit_polled', # 0 for false, 1 for true
'run_status', # 0 for success, 1 for failure
'run_signal', # signal received on run failure
'time_submit_exit', # submit (exit) time
'time_run', # run start time
'time_run_exit', # run exit time
'job_runner_call_no_lines', # line count in job runner call stdout
)

__slots__ = CONTEXT_ATTRIBUTES + (
'pid',
'messages'
)

def __init__(self, job_log_dir, **attrs):
self.job_log_dir = job_log_dir
self.job_runner_name: Optional[str] = None
self.job_id = None
self.job_runner_exit_polled: Optional[int] = None
job_log_dir: str # cycle/task/submit_num
job_runner_name: Optional[str] = None
job_id = None # job id in job runner
job_runner_exit_polled: Optional[int] = None # 0 for false, 1 for true
run_status: Optional[int] = None # 0 for success, 1 for failure
run_signal: Optional[str] = None # signal received on run failure
time_submit_exit: Optional[str] = None # submit (exit) time
time_run: Optional[str] = None # run start time
time_run_exit: Optional[str] = None # run exit time
job_runner_call_no_lines = None # line count in job runner call stdout

def __post_init__(self):
self.pid = None
self.run_status: Optional[int] = None
self.run_signal: Optional[str] = None
self.time_submit_exit: Optional[str] = None
self.time_run: Optional[str] = None
self.time_run_exit: Optional[str] = None
self.job_runner_call_no_lines = None
self.messages = []

if attrs:
for key, value in attrs.items():
if key not in self.CONTEXT_ATTRIBUTES:
raise ValueError('Invalid kwarg "%s"' % key)
setattr(self, key, value)

def update(self, other):
self.messages: List[str] = []

def update(self, other: 'JobPollContext') -> None:
"""Update my data from given file context."""
for i in self.__slots__:
for i in self.__dict__:
setattr(self, i, getattr(other, i))

def get_summary_str(self):
def get_summary_str(self) -> str:
"""Return the poll context as a summary string delimited by "|"."""
ret = OrderedDict()
for key in self.CONTEXT_ATTRIBUTES:
value = getattr(self, key)
if key == 'job_log_dir' or value is None:
continue
ret[key] = value
ret = {
key: value
for key, value in asdict(self).items()
if key != 'job_log_dir' and value is not None
}
return '%s|%s' % (self.job_log_dir, json.dumps(ret))


Expand Down Expand Up @@ -235,7 +218,7 @@ def jobs_poll(self, job_log_root, job_log_dirs):
job_log_root = os.path.expandvars(job_log_root)
self.configure_workflow_run_dir(job_log_root.rsplit(os.sep, 2)[0])

ctx_list = [] # Contexts for all relevant jobs
ctx_list: List[JobPollContext] = [] # Contexts for all relevant jobs
ctx_list_by_job_runner = {} # {job_runner_name1: [ctx1, ...], ...}

for job_log_dir in job_log_dirs:
Expand Down

0 comments on commit 57af3a1

Please sign in to comment.