From a3bd4b3f6bd184ea65fdd066ea28292fd8f146a1 Mon Sep 17 00:00:00 2001 From: belthlemar Date: Fri, 17 May 2024 17:16:14 +0200 Subject: [PATCH] feat(launcher): support solver option -z --- .../adapters/slurm_launcher/slurm_launcher.py | 9 +++--- antarest/launcher/service.py | 29 +++++++++---------- antarest/study/storage/utils.py | 19 +++++++----- requirements.txt | 2 +- tests/launcher/test_service.py | 16 +++++----- tests/launcher/test_slurm_launcher.py | 19 ++++++++++-- 6 files changed, 54 insertions(+), 40 deletions(-) diff --git a/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py b/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py index 6aba6f21b3..b871f36446 100644 --- a/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py +++ b/antarest/launcher/adapters/slurm_launcher/slurm_launcher.py @@ -28,6 +28,7 @@ from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType, XpansionParametersDTO from antarest.study.storage.rawstudy.ini_reader import IniReader from antarest.study.storage.rawstudy.ini_writer import IniWriter +from antarest.study.storage.utils import retrieve_output_path logger = logging.getLogger(__name__) logging.getLogger("paramiko").setLevel("WARN") @@ -209,7 +210,7 @@ def _import_study_output( xpansion_mode: Optional[str] = None, log_dir: Optional[str] = None, ) -> Optional[str]: - if xpansion_mode is not None: + if xpansion_mode: self._import_xpansion_result(job_id, xpansion_mode) launcher_logs: Dict[str, List[Path]] = {} @@ -235,14 +236,12 @@ def _import_study_output( # `antarest.launcher.service.LauncherService._import_output` return self.callbacks.import_output( job_id, - self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id / "output", + self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id, launcher_logs, ) def _import_xpansion_result(self, job_id: str, xpansion_mode: str) -> None: - output_path = self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id / "output" - if output_path.exists() and len(os.listdir(output_path)) == 1: - output_path = output_path / os.listdir(output_path)[0] + if output_path := retrieve_output_path(self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id): if output_path.name.endswith(".zip"): logger.info("Unzipping zipped output for xpansion result storage") unzipped_output_path = ( diff --git a/antarest/launcher/service.py b/antarest/launcher/service.py index d3a439e17c..9ae3f0028e 100644 --- a/antarest/launcher/service.py +++ b/antarest/launcher/service.py @@ -2,6 +2,7 @@ import logging import os import shutil +import zipfile from datetime import datetime, timedelta from http import HTTPStatus from pathlib import Path @@ -42,7 +43,7 @@ from antarest.launcher.ssh_config import SSHConfigDTO from antarest.study.repository import AccessPermissions, StudyFilter from antarest.study.service import StudyService -from antarest.study.storage.utils import assert_permission, extract_output_name, find_single_output_path +from antarest.study.storage.utils import assert_permission, extract_output_name, retrieve_output_path logger = logging.getLogger(__name__) @@ -510,9 +511,9 @@ def _import_output( study_id = job_result.study_id job_launch_params = LauncherParametersDTO.from_launcher_params(job_result.launcher_params) - # this now can be a zip file instead of a directory ! - output_true_path = find_single_output_path(output_path) - output_is_zipped = is_zip(output_true_path) + # this now can be a zip file instead of a directory! + output_true_path = retrieve_output_path(output_path) + output_is_zipped = output_true_path.suffix.lower() == ".zip" output_suffix = cast( Optional[str], getattr( @@ -535,6 +536,12 @@ def _import_output( log_paths, output_true_path / log_name, ) + if additional_logs and output_is_zipped: + with zipfile.ZipFile(output_true_path, "a") as zf: + for log_paths in additional_logs.values(): + for path in log_paths: + dest_name = path.name[: path.name.rfind("-")] + ".log" + zf.write(filename=path, arcname=dest_name) if study_id: zip_path: Optional[Path] = None @@ -548,18 +555,6 @@ def _import_output( final_output_path = zip_path or output_true_path with db(): try: - if additional_logs and output_is_zipped: - for log_name, log_paths in additional_logs.items(): - log_type = LogType.from_filename(log_name) - log_suffix = log_name - if log_type: - log_suffix = log_type.to_suffix() - self.study_service.save_logs( - study_id, - job_id, - log_suffix, - concat_files_to_str(log_paths), - ) return self.study_service.import_output( study_id, final_output_path, @@ -576,6 +571,8 @@ def _import_output( finally: if zip_path: os.unlink(zip_path) + if output_is_zipped: + os.unlink(output_true_path) raise JobNotFound() def _download_fallback_output(self, job_id: str, params: RequestParameters) -> FileDownloadTaskDTO: diff --git a/antarest/study/storage/utils.py b/antarest/study/storage/utils.py index 5dc97b081b..62a8491c78 100644 --- a/antarest/study/storage/utils.py +++ b/antarest/study/storage/utils.py @@ -93,13 +93,18 @@ def fix_study_root(study_path: Path) -> None: shutil.rmtree(sub_root_path) -def find_single_output_path(all_output_path: Path) -> Path: - children = os.listdir(all_output_path) - if len(children) == 1: - if children[0].endswith(".zip"): - return all_output_path / children[0] - return find_single_output_path(all_output_path / children[0]) - return all_output_path +def retrieve_output_path(job_path: Path) -> Path: + output_already_zipped_path = job_path.with_suffix(".zip") + if output_already_zipped_path.exists(): + return output_already_zipped_path + + output_inside_study = job_path / "output" + if output_inside_study.is_dir(): + output_folders = os.listdir(output_inside_study) + if len(output_folders) == 1: + return output_inside_study / output_folders[0] + + return Path() def extract_output_name(path_output: Path, new_suffix_name: t.Optional[str] = None) -> str: diff --git a/requirements.txt b/requirements.txt index 835af45e10..3723ca1313 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -Antares-Launcher~=1.3.2 +Antares-Launcher~=1.4.0 alembic~=1.7.5 asgi-ratelimit[redis]==0.7.0 diff --git a/tests/launcher/test_service.py b/tests/launcher/test_service.py index 9095673070..e53b2283fc 100644 --- a/tests/launcher/test_service.py +++ b/tests/launcher/test_service.py @@ -730,17 +730,15 @@ def test_manage_output(self, tmp_path: Path) -> None: ) output_path = tmp_path / "output" - zipped_output_path = tmp_path / "zipped_output" os.mkdir(output_path) - os.mkdir(zipped_output_path) new_output_path = output_path / "new_output" os.mkdir(new_output_path) (new_output_path / "log").touch() (new_output_path / "data").touch() additional_log = tmp_path / "output.log" additional_log.write_text("some log") - new_output_zipped_path = zipped_output_path / "test.zip" - with ZipFile(new_output_zipped_path, "w", ZIP_DEFLATED) as output_data: + zipped_path = tmp_path / "test.zip" + with ZipFile(zipped_path, "w", ZIP_DEFLATED) as output_data: output_data.writestr("some output", "0\n1") job_id = "job_id" zipped_job_id = "zipped_job_id" @@ -766,9 +764,9 @@ def test_manage_output(self, tmp_path: Path) -> None: ), ] with pytest.raises(JobNotFound): - launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]}) + launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]}) - launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]}) + launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]}) assert not launcher_service._get_job_output_fallback_path(job_id).exists() launcher_service.study_service.import_output.assert_called() @@ -777,7 +775,7 @@ def test_manage_output(self, tmp_path: Path) -> None: launcher_service._import_output( zipped_job_id, - zipped_output_path, + zipped_path, { "out.log": [additional_log], "antares-out": [additional_log], @@ -797,10 +795,10 @@ def test_manage_output(self, tmp_path: Path) -> None: StudyNotFoundError(""), ] - assert launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]}) is None + assert launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]}) is None (new_output_path / "info.antares-output").write_text(f"[general]\nmode=eco\nname=foo\ntimestamp={time.time()}") - output_name = launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]}) + output_name = launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]}) assert output_name is not None assert output_name.endswith("-hello") assert launcher_service._get_job_output_fallback_path(job_id).exists() diff --git a/tests/launcher/test_slurm_launcher.py b/tests/launcher/test_slurm_launcher.py index e1c69f63d4..5eb92abdac 100644 --- a/tests/launcher/test_slurm_launcher.py +++ b/tests/launcher/test_slurm_launcher.py @@ -2,6 +2,7 @@ import shutil import textwrap import uuid +import zipfile from argparse import Namespace from pathlib import Path from unittest.mock import ANY, Mock, patch @@ -386,7 +387,7 @@ def test_import_study_output(launcher_config, tmp_path) -> None: slurm_launcher.callbacks.import_output.assert_called_once_with( "1", - launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1" / "output", + launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1", {}, ) assert res == "output" @@ -413,6 +414,20 @@ def test_import_study_output(launcher_config, tmp_path) -> None: assert (output_dir / "results" / "something_else").exists() assert (output_dir / "results" / "something_else").read_text() == "world" + # asserts that a xpansion output zipped can be imported + xpansion_zip_dir = launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "2" + xpansion_zip_dir.mkdir(parents=True) + (xpansion_zip_dir / "input" / "links").mkdir(parents=True) + xpansion_out_put_dir = xpansion_zip_dir / "output" + xpansion_out_put_dir.mkdir(parents=True) + xpansion_output_file = xpansion_out_put_dir / "xpansion.zip" + with zipfile.ZipFile(xpansion_output_file, "w") as zipf: + zipf.write(xpansion_dir / "something_else", "some_file.txt") + slurm_launcher._import_study_output("2", "cpp") + assert ( + launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "2" / "output" / xpansion_output_file.name[:-4] + ).exists() + log_dir = tmp_path / "logs" log_dir.mkdir() log_info = log_dir / "antares-out-xxxx.txt" @@ -423,7 +438,7 @@ def test_import_study_output(launcher_config, tmp_path) -> None: slurm_launcher._import_study_output("1", None, str(log_dir)) slurm_launcher.callbacks.import_output.assert_called_once_with( "1", - launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1" / "output", + launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1", { "antares-out.log": [log_info], "antares-err.log": [log_error],