Merge pull request #983 from AntaresSimulatorTeam/dev
v2.6.0
pl-buiquang authored Jul 27, 2022
2 parents 34779e6 + 028b611 commit 29c5d9c
Showing 132 changed files with 4,743 additions and 1,286 deletions.
2 changes: 1 addition & 1 deletion antarest/__init__.py
@@ -1,4 +1,4 @@
__version__ = "2.5.1"
__version__ = "2.6.0"

from pathlib import Path

2 changes: 2 additions & 0 deletions antarest/core/config.py
@@ -163,6 +163,7 @@ class SlurmConfig:
default_n_cpu: int = 1
default_json_db_name: str = ""
slurm_script_path: str = ""
max_cores: int = 64
antares_versions_on_remote_server: List[str] = field(
default_factory=lambda: []
)
@@ -185,6 +186,7 @@ def from_dict(data: JSON) -> "SlurmConfig":
antares_versions_on_remote_server=data[
"antares_versions_on_remote_server"
],
max_cores=data.get("max_cores", 64),
)


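Note: max_cores is the denominator of the new launcher-load computation added in antarest/launcher/service.py further down. A minimal sketch of the parsing fallback above, using a hypothetical stand-in class rather than the real SlurmConfig:

from dataclasses import dataclass
from typing import Any, Dict

@dataclass
class SlurmConfigSketch:
    # mirrors the new field: assume at most 64 cores unless configured
    max_cores: int = 64

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> "SlurmConfigSketch":
        # a missing "max_cores" key falls back to the default of 64
        return SlurmConfigSketch(max_cores=data.get("max_cores", 64))

assert SlurmConfigSketch.from_dict({}).max_cores == 64
assert SlurmConfigSketch.from_dict({"max_cores": 128}).max_cores == 128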
6 changes: 3 additions & 3 deletions antarest/core/logging/utils.py
@@ -110,9 +110,9 @@ def filter(self, record: logging.LogRecord) -> bool:
request: Optional[Request] = _request.get()
request_id: Optional[str] = _request_id.get()
if request is not None:
record.ip = request.scope.get("client", ("undefined"))[0] # type: ignore
record.trace_id = request_id # type: ignore
record.pid = os.getpid() # type: ignore
record.ip = request.scope.get("client", ("undefined"))[0]
record.trace_id = request_id
record.pid = os.getpid()
return True


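Note: the change above only drops # type: ignore markers; the filter itself stamps request-scoped values onto every LogRecord. A self-contained sketch of that pattern, with simplified names that are not the project's exact ones:

import logging
from contextvars import ContextVar
from typing import Optional

_request_id: ContextVar[Optional[str]] = ContextVar("request_id", default=None)

class ContextFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        # annotate rather than drop: attach the current request id to the record
        record.trace_id = _request_id.get()
        return True

handler = logging.StreamHandler()
handler.addFilter(ContextFilter())
handler.setFormatter(logging.Formatter("%(trace_id)s %(message)s"))
logger = logging.getLogger("demo")
logger.addHandler(handler)

_request_id.set("abc-123")
logger.warning("hello")  # -> "abc-123 hello"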
52 changes: 29 additions & 23 deletions antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
@@ -311,11 +311,12 @@ def _check_studies_state(self) -> None:
study_list = self.data_repo_tinydb.get_list_of_studies()

nb_study_done = 0

studies_to_cleanup = []
for study in study_list:
nb_study_done += 1 if (study.finished or study.with_error) else 0
if study.done:
try:
studies_to_cleanup.append(study.name)
self.log_tail_manager.stop_tracking(
SlurmLauncher._get_log_path(study)
)
@@ -347,17 +348,20 @@ def _check_studies_state(self) -> None:
f"Failed to finalize study {study.name} launch",
exc_info=e,
)
finally:
self._clean_up_study(study.name)
else:
self.log_tail_manager.track(
SlurmLauncher._get_log_path(study),
self.create_update_log(study.name),
)

# we refetch the study list here because, by the time import_output is done, new studies may have been added
if nb_study_done == len(self.data_repo_tinydb.get_list_of_studies()):
self.stop()
# we also clean up the studies afterwards, because cleanup removes the study from the database
with self.antares_launcher_lock:
nb_studies = self.data_repo_tinydb.get_list_of_studies()
for study_id in studies_to_cleanup:
self._clean_up_study(study_id)
if nb_study_done == len(nb_studies):
self.stop()

@staticmethod
def _get_log_path(
@@ -444,9 +448,9 @@ def _run_study(
) -> None:
study_path = Path(self.launcher_args.studies_in) / str(launch_uuid)

try:
# export study
with self.antares_launcher_lock:
with self.antares_launcher_lock:
try:
# export study
self.callbacks.export_study(
launch_uuid, study_uuid, study_path, launcher_params
)
@@ -468,25 +472,27 @@
)
logger.info("Study exported and run with launcher")

self.callbacks.update_status(
str(launch_uuid), JobStatus.RUNNING, None, None
)
except Exception as e:
logger.error(f"Failed to launch study {study_uuid}", exc_info=e)
self.callbacks.append_after_log(
launch_uuid,
f"Unexpected error when launching study : {str(e)}",
)
self.callbacks.update_status(
str(launch_uuid), JobStatus.FAILED, str(e), None
)
self._clean_up_study(str(launch_uuid))
self.callbacks.update_status(
str(launch_uuid), JobStatus.RUNNING, None, None
)
except Exception as e:
logger.error(
f"Failed to launch study {study_uuid}", exc_info=e
)
self.callbacks.append_after_log(
launch_uuid,
f"Unexpected error when launching study : {str(e)}",
)
self.callbacks.update_status(
str(launch_uuid), JobStatus.FAILED, str(e), None
)
self._clean_up_study(str(launch_uuid))
finally:
self._delete_workspace_file(study_path)

if not self.thread:
self.start()

self._delete_workspace_file(study_path)

def _check_and_apply_launcher_params(
self, launcher_params: LauncherParametersDTO
) -> argparse.Namespace:
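Note: the restructuring above moves the export/submit sequence under the launcher lock, defers study cleanup until after the study list is re-fetched, and deletes the exported workspace file in a finally block. A condensed sketch of the resulting control flow in _run_study, with illustrative stub functions:

import threading

lock = threading.Lock()

def export_study(launch_id: str) -> None: print("exported", launch_id)
def submit_to_slurm(launch_id: str) -> None: print("submitted", launch_id)
def update_status(launch_id: str, status: str) -> None: print(launch_id, status)
def clean_up_study(launch_id: str) -> None: print("cleaned", launch_id)
def delete_workspace_file(launch_id: str) -> None: print("workspace deleted")

def run_study(launch_id: str) -> None:
    with lock:
        try:
            export_study(launch_id)      # may raise
            submit_to_slurm(launch_id)   # may raise
            update_status(launch_id, "RUNNING")
        except Exception as exc:
            update_status(launch_id, f"FAILED: {exc}")
            clean_up_study(launch_id)
        finally:
            # runs on success and failure alike, as in the new code
            delete_workspace_file(launch_id)

run_study("1234")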
@@ -37,11 +37,11 @@ remove_data <- function(path_prefix, data_type, data_list, include_id) {
if (!data_list[[item]]) {
item_data <- paste(c(path_prefix, data_type, item), collapse="/")
cat(paste0("Removing from ", data_type, " ", item, " in ", path_prefix, "\n"))
unlink(file.path(paste0(item_data, "values-hourly.txt", collapse="/")))
unlink(file.path(paste0(c(item_data, "values-hourly.txt"), collapse="/")))
if (include_id) {
unlink(file.path(paste0(item_data, "id-hourly.txt", collapse="/")))
unlink(file.path(paste0(c(item_data, "id-hourly.txt"), collapse="/")))
}
unlink(file.path(paste0(item_data, "details-hourly.txt", collapse="/")))
unlink(file.path(paste0(c(item_data, "details-hourly.txt"), collapse="/")))
if (length(list.files(file.path(item_data))) == 0) {
unlink(file.path(item_data))
}
@@ -39,11 +39,11 @@ remove_data <- function(path_prefix, data_type, data_list, include_id) {
if (!data_list[[item]]) {
item_data <- paste(c(path_prefix, data_type, item), collapse="/")
cat(paste0("Removing from ", data_type, " ", item, " in ", path_prefix, "\n"))
unlink(file.path(paste0(item_data, "values-hourly.txt", collapse="/")))
unlink(file.path(paste0(c(item_data, "values-hourly.txt"), collapse="/")))
if (include_id) {
unlink(file.path(paste0(item_data, "id-hourly.txt", collapse="/")))
unlink(file.path(paste0(c(item_data, "id-hourly.txt"), collapse="/")))
}
unlink(file.path(paste0(item_data, "details-hourly.txt", collapse="/")))
unlink(file.path(paste0(c(item_data, "details-hourly.txt"), collapse="/")))
if (length(list.files(file.path(item_data))) == 0) {
unlink(file.path(item_data))
}
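Note on the R fix repeated in the two scripts above: paste0(item_data, "values-hourly.txt", collapse="/") pastes its arguments element-wise with an empty separator, so for length-1 inputs collapse never applies and the directory is glued directly to the file name. Wrapping the parts in c(...) first gives collapse="/" two elements to join, producing the intended <item_data>/values-hourly.txt path.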
1 change: 1 addition & 0 deletions antarest/launcher/model.py
@@ -18,6 +18,7 @@ class LauncherParametersDTO(BaseModel):
xpansion: bool = False
xpansion_r_version: bool = False
archive_output: bool = True
auto_unzip: bool = True
output_suffix: Optional[str] = None
other_options: Optional[str] = None
# add extensions field here
7 changes: 7 additions & 0 deletions antarest/launcher/repository.py
@@ -44,6 +44,13 @@ def get_all(
job_results: List[JobResult] = query.all()
return job_results

def get_running(self) -> List[JobResult]:
query = db.session.query(JobResult).where(
JobResult.completion_date == None
)
job_results: List[JobResult] = query.all()
return job_results

def find_by_study(self, study_id: str) -> List[JobResult]:
logger.debug(f"Retrieving JobResults from study {study_id}")
job_results: List[JobResult] = (
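Note: JobResult.completion_date == None is the deliberate SQLAlchemy idiom here — the overloaded == compiles to SQL IS NULL (the lint-friendly equivalent is .is_(None)), whereas a plain Python `is None` would not produce a SQL expression. Jobs with no completion date are exactly the still-running ones this query returns.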
61 changes: 54 additions & 7 deletions antarest/launcher/service.py
@@ -72,6 +72,7 @@ def __init__(self, engine: str):

ORPHAN_JOBS_VISIBILITY_THRESHOLD = 10 # days
LAUNCHER_PARAM_NAME_SUFFIX = "output_suffix"
EXECUTION_INFO_FILE = "execution_info.ini"


class LauncherService:
@@ -328,7 +329,7 @@ def _filter_from_user_permission(
allowed_job_results.append(job_result)
except StudyNotFoundError:
if (
(user and user.is_site_admin())
(user and (user.is_site_admin() or user.is_admin_token()))
or job_result.creation_date >= orphan_visibility_threshold
):
allowed_job_results.append(job_result)
@@ -519,7 +520,7 @@ def _save_solver_stats(
self, job_result: JobResult, output_path: Path
) -> None:
try:
measurement_file = output_path / "time_measurement.txt"
measurement_file = output_path / EXECUTION_INFO_FILE
if measurement_file.exists():
job_result.solver_stats = measurement_file.read_text(
encoding="utf-8"
@@ -551,20 +552,18 @@ def _import_output(
output_true_path,
job_launch_params,
)
self._save_solver_stats(job_result, output_path)
self._save_solver_stats(job_result, output_true_path)

if additional_logs:
for log_name, log_paths in additional_logs.items():
concat_files(
log_paths,
output_path / log_name,
output_true_path / log_name,
)

zip_path: Optional[Path] = None
stopwatch = StopWatch()
if LauncherParametersDTO.parse_raw(
job_result.launcher_params or "{}"
).archive_output:
if job_launch_params.archive_output:
logger.info("Re zipping output for transfer")
zip_path = (
output_true_path.parent
@@ -593,6 +592,7 @@
None,
),
),
job_launch_params.auto_unzip,
)
except StudyNotFoundError:
return self._import_fallback_output(
@@ -607,6 +607,9 @@
),
),
)
finally:
if zip_path:
os.unlink(zip_path)
raise JobNotFound()

def _download_fallback_output(
@@ -669,6 +672,50 @@ def download_output(
)
raise JobNotFound()

def get_load(self, from_cluster: bool = False) -> Dict[str, float]:
all_running_jobs = self.job_result_repository.get_running()
local_running_jobs = []
slurm_running_jobs = []
for job in all_running_jobs:
if job.launcher == "slurm":
slurm_running_jobs.append(job)
elif job.launcher == "local":
local_running_jobs.append(job)
else:
logger.warning(f"Unknown job launcher {job.launcher}")
load = {}
if self.config.launcher.slurm:
if from_cluster:
raise NotImplementedError
slurm_used_cpus = reduce(
lambda count, j: count
+ (
LauncherParametersDTO.parse_raw(
j.launcher_params or "{}"
).nb_cpu
or self.config.launcher.slurm.default_n_cpu # type: ignore
),
slurm_running_jobs,
0,
)
load["slurm"] = (
float(slurm_used_cpus) / self.config.launcher.slurm.max_cores
)
if self.config.launcher.local:
local_used_cpus = reduce(
lambda count, j: count
+ (
LauncherParametersDTO.parse_raw(
j.launcher_params or "{}"
).nb_cpu
or 1
),
local_running_jobs,
0,
)
load["local"] = float(local_used_cpus) / (os.cpu_count() or 1)
return load

def get_versions(self, params: RequestParameters) -> Dict[str, List[str]]:
version_dict = {}
if self.config.launcher.local:
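Note: get_load divides the CPUs requested by running jobs by the capacity (max_cores for SLURM, os.cpu_count() for local). A worked example of the reduce above, with hypothetical job parameters:

from functools import reduce

requested = [4, 8, None]  # nb_cpu of three running SLURM jobs; None = not set
default_n_cpu = 1
max_cores = 64

used = reduce(lambda count, n: count + (n or default_n_cpu), requested, 0)
print(used)              # 13
print(used / max_cores)  # 0.203125, i.e. about 20% cluster load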
13 changes: 13 additions & 0 deletions antarest/launcher/web.py
@@ -171,6 +171,19 @@ def get_engines() -> Any:
logger.info(f"Listing launch engines")
return LauncherEnginesDTO(engines=service.get_launchers())

@bp.get(
"/launcher/load",
tags=[APITag.launcher],
summary="Get the cluster load in usage percent",
)
def get_load(
from_cluster: bool = False,
current_user: JWTUser = Depends(auth.get_current_user),
) -> Dict[str, float]:
params = RequestParameters(user=current_user)
logger.info("Fetching launcher load")
return service.get_load(from_cluster)

@bp.get(
"/launcher/_versions",
tags=[APITag.launcher],
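Note: a hypothetical client call for the new route; the base URL, API prefix and token are illustrative, not taken from this diff:

import requests

resp = requests.get(
    "http://localhost:8080/v1/launcher/load",
    params={"from_cluster": False},
    headers={"Authorization": "Bearer <token>"},
)
print(resp.json())  # e.g. {"slurm": 0.2, "local": 0.5}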
11 changes: 11 additions & 0 deletions antarest/study/business/area_management.py
@@ -46,6 +46,7 @@ class AreaCreationDTO(BaseModel):
class ClusterInfoDTO(PatchCluster):
id: str
name: str
enabled: bool = True
unitcount: int = 0
nominalcapacity: int = 0
group: Optional[str] = None
@@ -187,11 +188,21 @@ def update_area_ui(
data=area_ui.x,
command_context=self.storage_service.variant_study_service.command_factory.command_context,
),
UpdateConfig(
target=f"input/areas/{area_id}/ui/layerX/0",
data=area_ui.x,
command_context=self.storage_service.variant_study_service.command_factory.command_context,
),
UpdateConfig(
target=f"input/areas/{area_id}/ui/ui/y",
data=area_ui.y,
command_context=self.storage_service.variant_study_service.command_factory.command_context,
),
UpdateConfig(
target=f"input/areas/{area_id}/ui/layerY/0",
data=area_ui.y,
command_context=self.storage_service.variant_study_service.command_factory.command_context,
),
UpdateConfig(
target=f"input/areas/{area_id}/ui/ui/color_r",
data=area_ui.color_rgb[0],
4 changes: 2 additions & 2 deletions antarest/study/business/config_management.py
@@ -104,7 +104,7 @@ def get_thematic_trimming(self, study: Study) -> Dict[str, bool]:
storage_service = self.storage_service.get_storage(study)
file_study = storage_service.get_raw(study)
config = file_study.tree.get(["settings", "generaldata"])
trimming_config = config.get("variable selection", None)
trimming_config = config.get("variables selection", None)
variable_list = self.get_output_variables(study)
if trimming_config:
if trimming_config.get("selected_vars_reset", True):
@@ -137,7 +137,7 @@ def set_thematic_trimming(
"select_var +": state_by_active[True],
}
command = UpdateConfig(
target="settings/generaldata/variable selection",
target="settings/generaldata/variables selection",
data=config_data,
command_context=self.storage_service.variant_study_service.command_factory.command_context,
)
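Note: the rename above matches the plural section name, [variables selection], that Antares uses in settings/generaldata; with the singular key, get_thematic_trimming found no trimming config and set_thematic_trimming wrote to a section the simulator does not read.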
4 changes: 3 additions & 1 deletion antarest/study/common/studystorage.py
@@ -283,5 +283,7 @@ def archive_study_output(self, study: T, output_id: str) -> bool:
raise NotImplementedError()

@abstractmethod
def unarchive_study_output(self, study: T, output_id: str) -> bool:
def unarchive_study_output(
self, study: T, output_id: str, keep_src_zip: bool
) -> bool:
raise NotImplementedError()