From b420e9ac126cac6b2a2cc98135a68a0a47a67174 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?H=C3=A5vard=20Berland?=
Date: Tue, 4 Feb 2025 11:46:10 +0100
Subject: [PATCH] Refactor counting cpu-seconds for processtree

---
 .../forward_model_step.py      | 34 ++++++++++++-------
 .../test_forward_model_step.py | 21 ++++++++++++
 2 files changed, 43 insertions(+), 12 deletions(-)

diff --git a/src/_ert/forward_model_runner/forward_model_step.py b/src/_ert/forward_model_runner/forward_model_step.py
index f75e77a3a2e..4f3b3d2aa74 100644
--- a/src/_ert/forward_model_runner/forward_model_step.py
+++ b/src/_ert/forward_model_runner/forward_model_step.py
@@ -10,6 +10,7 @@
 import time
 import uuid
 from collections.abc import Generator, Sequence
+from dataclasses import dataclass, field
 from datetime import datetime as dt
 from pathlib import Path
 from subprocess import Popen, run
@@ -188,7 +189,7 @@ def _run(self) -> Generator[Start | Exited | Running]:
 
         max_memory_usage = 0
         fm_step_pids = {int(process.pid)}
-        cpu_seconds_pr_pid: dict[str, float] = {}
+        cpu_seconds_processtree: ProcesstreeTimer = ProcesstreeTimer()
         while True:
             try:
                 exit_code = process.wait(timeout=self.MEMORY_POLL_PERIOD)
@@ -207,16 +208,7 @@ def _run(self) -> Generator[Start | Exited | Running]:
             (memory_rss, cpu_seconds_snapshot, oom_score, pids) = _get_processtree_data(
                 process
             )
-            for pid, seconds in cpu_seconds_snapshot.items():
-                if cpu_seconds_pr_pid.get(str(pid), 0.0) <= seconds:
-                    cpu_seconds_pr_pid[str(pid)] = seconds
-                else:
-                    # cpu_seconds must be monotonely increasing. Since
-                    # decreasing cpu_seconds was detected, it must be due to pid reuse
-                    cpu_seconds_pr_pid[str(pid) + str(uuid.uuid4())] = (
-                        cpu_seconds_pr_pid[str(pid)]
-                    )
-                    cpu_seconds_pr_pid[str(pid)] = seconds
+            cpu_seconds_processtree.update(cpu_seconds_snapshot)
             fm_step_pids |= pids
             max_memory_usage = max(memory_rss, max_memory_usage)
             yield Running(
@@ -226,7 +218,7 @@ def _run(self) -> Generator[Start | Exited | Running]:
                     max_rss=max_memory_usage,
                     fm_step_id=self.index,
                     fm_step_name=self.job_data.get("name"),
-                    cpu_seconds=sum(cpu_seconds_pr_pid.values()),
+                    cpu_seconds=cpu_seconds_processtree.total_cpu_seconds(),
                     oom_score=oom_score,
                 ),
             )
@@ -427,6 +419,24 @@ def ensure_file_handles_closed(file_handles: Sequence[io.TextIOWrapper | None])
             file_handle.close()
 
 
+@dataclass
+class ProcesstreeTimer:
+    _cpu_seconds_pr_pid: dict[str, float] = field(default_factory=dict, init=False)
+
+    def update(self, cpu_seconds_snapshot: dict[str, float]) -> None:
+        for pid, seconds in cpu_seconds_snapshot.items():
+            if self._cpu_seconds_pr_pid.get(pid, 0.0) > seconds:
+                # cpu_seconds for a process must increase monotonically.
+                # Since decreasing cpu_seconds was detected, it must be due to pid reuse
+                self._cpu_seconds_pr_pid[pid + "-" + str(uuid.uuid4())] = (
+                    self._cpu_seconds_pr_pid[pid]
+                )
+            self._cpu_seconds_pr_pid[pid] = seconds
+
+    def total_cpu_seconds(self) -> float:
+        return sum(self._cpu_seconds_pr_pid.values())
+
+
 def _get_processtree_data(
     process: Process,
 ) -> tuple[int, dict[str, float], int | None, set[int]]:
diff --git a/tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py b/tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py
index f1f2dd5c29a..613b6718f64 100644
--- a/tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py
+++ b/tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py
@@ -11,6 +11,7 @@
 from _ert.forward_model_runner.forward_model_step import (
     ForwardModelStep,
+    ProcesstreeTimer,
     _get_processtree_data,
 )
 from _ert.forward_model_runner.reporting.message import Exited, Running, Start
 
@@ -319,3 +320,23 @@ def test_makedirs(monkeypatch, tmp_path):
         pass
     assert (tmp_path / "a/file").is_file()
     assert (tmp_path / "b/c/file").is_file()
+
+
+@pytest.mark.parametrize(
+    "snapshots, expected_total_seconds",
+    [
+        ([{}], 0.0),
+        ([{"1": 1.1}], 1.1),
+        ([{"1": 1.1}, {"1": 2.1}], 2.1),
+        ([{"1": 1.1}, {"1": 1.1}], 1.1),
+        ([{"1": 1.1}, {"2": 3.1}], 1.1 + 3.1),
+        ([{"1": 1.1}, {"1": 0.2}], 1.1 + 0.2),  # pid reuse
+    ],
+)
+def test_processtree_timer(
+    snapshots: list[dict[str, float]], expected_total_seconds: float
+):
+    timer = ProcesstreeTimer()
+    for snapshot in snapshots:
+        timer.update(snapshot)
+    assert timer.total_cpu_seconds() == expected_total_seconds
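
A minimal usage sketch of the new ProcesstreeTimer, mirroring how _run() feeds it one
cpu-seconds snapshot per MEMORY_POLL_PERIOD; the pid strings and cpu-second values
below are made up for illustration and are not taken from a real process tree.

    from _ert.forward_model_runner.forward_model_step import ProcesstreeTimer

    timer = ProcesstreeTimer()
    # First polling snapshot: cpu-seconds per pid, keyed by pid as a string.
    timer.update({"100": 1.5, "101": 0.5})
    # Second snapshot: pid 101 now reports fewer cpu-seconds than before, so the
    # pid must have been reused; the old 0.5 s is banked under a uuid-suffixed key.
    timer.update({"100": 2.5, "101": 0.25})
    # The total therefore includes the banked value, so no cpu time is lost.
    assert timer.total_cpu_seconds() == 2.5 + 0.5 + 0.25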