Skip to content

Commit

Permalink
feat(launcher): support solver option -z
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBelthle committed May 17, 2024
1 parent 428961e commit a3bd4b3
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 40 deletions.
9 changes: 4 additions & 5 deletions antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType, XpansionParametersDTO
from antarest.study.storage.rawstudy.ini_reader import IniReader
from antarest.study.storage.rawstudy.ini_writer import IniWriter
from antarest.study.storage.utils import retrieve_output_path

logger = logging.getLogger(__name__)
logging.getLogger("paramiko").setLevel("WARN")
Expand Down Expand Up @@ -209,7 +210,7 @@ def _import_study_output(
xpansion_mode: Optional[str] = None,
log_dir: Optional[str] = None,
) -> Optional[str]:
if xpansion_mode is not None:
if xpansion_mode:
self._import_xpansion_result(job_id, xpansion_mode)

launcher_logs: Dict[str, List[Path]] = {}
Expand All @@ -235,14 +236,12 @@ def _import_study_output(
# `antarest.launcher.service.LauncherService._import_output`
return self.callbacks.import_output(
job_id,
self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id / "output",
self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id,
launcher_logs,
)

def _import_xpansion_result(self, job_id: str, xpansion_mode: str) -> None:
output_path = self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id / "output"
if output_path.exists() and len(os.listdir(output_path)) == 1:
output_path = output_path / os.listdir(output_path)[0]
if output_path := retrieve_output_path(self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id):
if output_path.name.endswith(".zip"):
logger.info("Unzipping zipped output for xpansion result storage")
unzipped_output_path = (
Expand Down
29 changes: 13 additions & 16 deletions antarest/launcher/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import shutil
import zipfile
from datetime import datetime, timedelta
from http import HTTPStatus
from pathlib import Path
Expand Down Expand Up @@ -42,7 +43,7 @@
from antarest.launcher.ssh_config import SSHConfigDTO
from antarest.study.repository import AccessPermissions, StudyFilter
from antarest.study.service import StudyService
from antarest.study.storage.utils import assert_permission, extract_output_name, find_single_output_path
from antarest.study.storage.utils import assert_permission, extract_output_name, retrieve_output_path

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -510,9 +511,9 @@ def _import_output(
study_id = job_result.study_id
job_launch_params = LauncherParametersDTO.from_launcher_params(job_result.launcher_params)

# this now can be a zip file instead of a directory !
output_true_path = find_single_output_path(output_path)
output_is_zipped = is_zip(output_true_path)
# this now can be a zip file instead of a directory!
output_true_path = retrieve_output_path(output_path)
output_is_zipped = output_true_path.suffix.lower() == ".zip"
output_suffix = cast(
Optional[str],
getattr(
Expand All @@ -535,6 +536,12 @@ def _import_output(
log_paths,
output_true_path / log_name,
)
if additional_logs and output_is_zipped:
with zipfile.ZipFile(output_true_path, "a") as zf:
for log_paths in additional_logs.values():
for path in log_paths:
dest_name = path.name[: path.name.rfind("-")] + ".log"
zf.write(filename=path, arcname=dest_name)

if study_id:
zip_path: Optional[Path] = None
Expand All @@ -548,18 +555,6 @@ def _import_output(
final_output_path = zip_path or output_true_path
with db():
try:
if additional_logs and output_is_zipped:
for log_name, log_paths in additional_logs.items():
log_type = LogType.from_filename(log_name)
log_suffix = log_name
if log_type:
log_suffix = log_type.to_suffix()
self.study_service.save_logs(
study_id,
job_id,
log_suffix,
concat_files_to_str(log_paths),
)
return self.study_service.import_output(
study_id,
final_output_path,
Expand All @@ -576,6 +571,8 @@ def _import_output(
finally:
if zip_path:
os.unlink(zip_path)
if output_is_zipped:
os.unlink(output_true_path)
raise JobNotFound()

def _download_fallback_output(self, job_id: str, params: RequestParameters) -> FileDownloadTaskDTO:
Expand Down
19 changes: 12 additions & 7 deletions antarest/study/storage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,18 @@ def fix_study_root(study_path: Path) -> None:
shutil.rmtree(sub_root_path)


def find_single_output_path(all_output_path: Path) -> Path:
children = os.listdir(all_output_path)
if len(children) == 1:
if children[0].endswith(".zip"):
return all_output_path / children[0]
return find_single_output_path(all_output_path / children[0])
return all_output_path
def retrieve_output_path(job_path: Path) -> Path:
output_already_zipped_path = job_path.with_suffix(".zip")
if output_already_zipped_path.exists():
return output_already_zipped_path

output_inside_study = job_path / "output"
if output_inside_study.is_dir():
output_folders = os.listdir(output_inside_study)
if len(output_folders) == 1:
return output_inside_study / output_folders[0]

return Path()


def extract_output_name(path_output: Path, new_suffix_name: t.Optional[str] = None) -> str:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Antares-Launcher~=1.3.2
Antares-Launcher~=1.4.0

alembic~=1.7.5
asgi-ratelimit[redis]==0.7.0
Expand Down
16 changes: 7 additions & 9 deletions tests/launcher/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,17 +730,15 @@ def test_manage_output(self, tmp_path: Path) -> None:
)

output_path = tmp_path / "output"
zipped_output_path = tmp_path / "zipped_output"
os.mkdir(output_path)
os.mkdir(zipped_output_path)
new_output_path = output_path / "new_output"
os.mkdir(new_output_path)
(new_output_path / "log").touch()
(new_output_path / "data").touch()
additional_log = tmp_path / "output.log"
additional_log.write_text("some log")
new_output_zipped_path = zipped_output_path / "test.zip"
with ZipFile(new_output_zipped_path, "w", ZIP_DEFLATED) as output_data:
zipped_path = tmp_path / "test.zip"
with ZipFile(zipped_path, "w", ZIP_DEFLATED) as output_data:
output_data.writestr("some output", "0\n1")
job_id = "job_id"
zipped_job_id = "zipped_job_id"
Expand All @@ -766,9 +764,9 @@ def test_manage_output(self, tmp_path: Path) -> None:
),
]
with pytest.raises(JobNotFound):
launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]})
launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]})

launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]})
launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]})
assert not launcher_service._get_job_output_fallback_path(job_id).exists()
launcher_service.study_service.import_output.assert_called()

Expand All @@ -777,7 +775,7 @@ def test_manage_output(self, tmp_path: Path) -> None:

launcher_service._import_output(
zipped_job_id,
zipped_output_path,
zipped_path,
{
"out.log": [additional_log],
"antares-out": [additional_log],
Expand All @@ -797,10 +795,10 @@ def test_manage_output(self, tmp_path: Path) -> None:
StudyNotFoundError(""),
]

assert launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]}) is None
assert launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]}) is None

(new_output_path / "info.antares-output").write_text(f"[general]\nmode=eco\nname=foo\ntimestamp={time.time()}")
output_name = launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]})
output_name = launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]})
assert output_name is not None
assert output_name.endswith("-hello")
assert launcher_service._get_job_output_fallback_path(job_id).exists()
Expand Down
19 changes: 17 additions & 2 deletions tests/launcher/test_slurm_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import shutil
import textwrap
import uuid
import zipfile
from argparse import Namespace
from pathlib import Path
from unittest.mock import ANY, Mock, patch
Expand Down Expand Up @@ -386,7 +387,7 @@ def test_import_study_output(launcher_config, tmp_path) -> None:

slurm_launcher.callbacks.import_output.assert_called_once_with(
"1",
launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1" / "output",
launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1",
{},
)
assert res == "output"
Expand All @@ -413,6 +414,20 @@ def test_import_study_output(launcher_config, tmp_path) -> None:
assert (output_dir / "results" / "something_else").exists()
assert (output_dir / "results" / "something_else").read_text() == "world"

# asserts that a xpansion output zipped can be imported
xpansion_zip_dir = launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "2"
xpansion_zip_dir.mkdir(parents=True)
(xpansion_zip_dir / "input" / "links").mkdir(parents=True)
xpansion_out_put_dir = xpansion_zip_dir / "output"
xpansion_out_put_dir.mkdir(parents=True)
xpansion_output_file = xpansion_out_put_dir / "xpansion.zip"
with zipfile.ZipFile(xpansion_output_file, "w") as zipf:
zipf.write(xpansion_dir / "something_else", "some_file.txt")
slurm_launcher._import_study_output("2", "cpp")
assert (
launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "2" / "output" / xpansion_output_file.name[:-4]
).exists()

log_dir = tmp_path / "logs"
log_dir.mkdir()
log_info = log_dir / "antares-out-xxxx.txt"
Expand All @@ -423,7 +438,7 @@ def test_import_study_output(launcher_config, tmp_path) -> None:
slurm_launcher._import_study_output("1", None, str(log_dir))
slurm_launcher.callbacks.import_output.assert_called_once_with(
"1",
launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1" / "output",
launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1",
{
"antares-out.log": [log_info],
"antares-err.log": [log_error],
Expand Down

0 comments on commit a3bd4b3

Please sign in to comment.