Skip to content

Commit

Permalink
fix: listo composer
Browse files Browse the repository at this point in the history
  • Loading branch information
sgatto committed Mar 4, 2021
1 parent 4c42564 commit 6e9758c
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 224 deletions.
2 changes: 1 addition & 1 deletion antareslauncher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
DESCRIPTION = "Antares_Launcher to run Antares on a remote linux machine"

#: Project's version (:py:class:`Version <rspace.application.Version.Version>`)
VERSION = "1.1.2"
VERSION = "1.1.3"
14 changes: 0 additions & 14 deletions antareslauncher/remote_environnement/iremote_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,6 @@ def get_queue_info(self):
def kill_remote_job(self, job_id):
raise NotImplementedError

@abstractmethod
def compose_command_to_check_state(self, job_id):
raise NotImplementedError

@abstractmethod
def check_job_state(self, arg_study: StudyDTO):
raise NotImplementedError

@abstractmethod
def upload_file(self, src):
raise NotImplementedError
Expand All @@ -73,12 +65,6 @@ def download_final_zip(self, study: StudyDTO) -> str:
def clean_remote_server(self, study: StudyDTO) -> bool:
raise NotImplementedError

def execute_command(self, cmd):
return self.connection.execute_command(cmd)

def check_file_not_empty(self, file_path):
return self.connection.check_file_not_empty(file_path)

@abstractmethod
def submit_job(self, _study: StudyDTO):
raise NotImplementedError
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time
from pathlib import Path

from antareslauncher import definitions
from antareslauncher.remote_environnement import iconnection
from antareslauncher.remote_environnement.iremote_environment import (
IRemoteEnvironment,
Expand Down Expand Up @@ -38,11 +37,16 @@ def __init__(
):
super(RemoteEnvironmentWithSlurm, self).__init__(_connection=_connection)
self.slurm_script_features = slurm_script_features
self.initialise_remote_path()
self.check_remote_script()
self.remote_base_path: str = ""
self._initialise_remote_path()
self._check_remote_script()

def initialise_remote_path(self):
"""Checks if remote base dir exists and create it if not"""
def _initialise_remote_path(self):
self._set_remote_base_path()
if not self.connection.make_dir(self.remote_base_path):
raise NoRemoteBaseDirException

def _set_remote_base_path(self):
remote_home_dir = self.connection.home_dir
self.remote_base_path = (
str(remote_home_dir)
Expand All @@ -51,23 +55,10 @@ def initialise_remote_path(self):
+ "_"
+ socket.gethostname()
)
if not self.connection.make_dir(self.remote_base_path):
raise NoRemoteBaseDirException

def check_remote_script(self):
"""Checks if self.script_filename is not empty.
Returns:
True if file not empty
Raises:
NoLaunchScriptFoundException"""

def _check_remote_script(self):
remote_antares_script = self.slurm_script_features.solver_script_path

if self.connection.check_file_not_empty(remote_antares_script):
return True
else:
if not self.connection.check_file_not_empty(remote_antares_script):
raise NoLaunchScriptFoundException

def get_queue_info(self):
Expand All @@ -76,9 +67,7 @@ def get_queue_info(self):
Returns:
The error if connection.execute_command raises an error, otherwise the slurm queue info
"""

username = self.connection.username

command = f"squeue -u {username} --Format=name:40,state:12,starttime:22,TimeUsed:12,timelimit:12"
output, error = self.connection.execute_command(command)
if error:
Expand Down Expand Up @@ -151,35 +140,18 @@ def submit_job(self, my_study: StudyDTO):
output, error = self.connection.execute_command(command)
if error:
raise SubmitJobErrorException
job_id = self._get_jobid_from_output_of_submit_command(output)
return job_id

@staticmethod
def _get_jobid_from_output_of_submit_command(output):
job_id = None
# squeue SLURM command returns f'Submitted {job_id}' if successful
# SLURM sbatch command prints f'Submitted batch job {job_id}' if successful
stdout_list = str(output).split()
if stdout_list and stdout_list[0] == "Submitted":
job_id = int(stdout_list[-1])
return job_id

def compose_command_to_check_state(self, job_id):
"""Compose the command to ask SLURM the state of a given job
It executes "sacct -j JOB_ID" to get the state
It uses the option "-n --format=state" to select the column STATE
It adds "head -1" to get the state of the global job-tasks and drop the sub task
It adds the awk -F " " '{print $1}' in order to drop the blank before the state string
Args:
job_id: Slurm job_id
Returns:
The Slurm state, it can be "COMPLETED", "CANCELLED BY...", "PENDING", "TIMEOUT" , "RUNNING", ...
"""
return (
f"sacct -j {job_id} -n --format=state | head -1 "
+ "| awk -F\" \" '{print $1}'"
)

@staticmethod
def get_advancement_flags_from_state(state):
"""Converts the slurm state of the job to 3 boolean values
Expand Down Expand Up @@ -215,7 +187,7 @@ def get_advancement_flags_from_state(state):

return started, finished, with_error

def check_job_state(self, job_id: int):
def _check_job_state(self, job_id: int):
"""Checks the slurm state of a study
Args:
Expand All @@ -227,7 +199,7 @@ def check_job_state(self, job_id: int):
Raises:
GetJobStateErrorException if the job_state has not been obtained
"""
command = self.compose_command_to_check_state(job_id)
command = self._compose_command_to_get_state_as_one_word(job_id)
max_number_of_tries = 3
for _ in range(max_number_of_tries):
output, error = self.connection.execute_command(command)
Expand All @@ -240,6 +212,13 @@ def check_job_state(self, job_id: int):

raise GetJobStateOutputException

@staticmethod
def _compose_command_to_get_state_as_one_word(job_id):
return (
f"sacct -j {int(job_id)} -n --format=state | head -1 "
+ "| awk -F\" \" '{print $1}'"
)

def get_job_state_flags(self, study) -> [bool, bool, bool]:
"""Checks the job state of a submitted study and converts it to flags
Expand All @@ -249,7 +228,7 @@ def get_job_state_flags(self, study) -> [bool, bool, bool]:
Returns:
started, finished, with_error: The booleans representing the advancement of the slurm_job
"""
job_state = self.check_job_state(study.job_id)
job_state = self._check_job_state(study.job_id)
return self.get_advancement_flags_from_state(job_state)

def upload_file(self, src):
Expand All @@ -271,17 +250,17 @@ def _list_remote_logs(self, job_id: int):
List containing the files name, empty if none is present
"""

def path_leaf(path):
head, tail = ntpath.split(path)
return tail or ntpath.basename(head)

list_of_files = []
logs_path_signature = self.slurm_script_features.get_logs_path_signature(
self.remote_base_path, job_id
)
command = f"ls " + logs_path_signature
output, error = self.execute_command(command)
output, error = self.connection.execute_command(command)

def path_leaf(path):
head, tail = ntpath.split(path)
return tail or ntpath.basename(head)

list_of_files = []
if output and not error:
list_of_files = [path_leaf(Path(file)) for file in output.splitlines()]

Expand All @@ -303,9 +282,7 @@ def download_logs(self, study: StudyDTO):
for file in file_list:
src = self.remote_base_path + "/" + file
dst = str(Path(study.job_log_dir) / file)
return_flag = return_flag and self.connection.download_file(
src, dst
)
return_flag = return_flag and self.connection.download_file(src, dst)
else:
return_flag = False
return return_flag
Expand All @@ -323,7 +300,7 @@ def check_final_zip_not_empty(self, study: StudyDTO, final_zip_name: str):
return_flag = False
if study.finished:
filename = self.remote_base_path + "/" + final_zip_name
if self.check_file_not_empty(filename):
if self.connection.check_file_not_empty(filename) is True:
return_flag = True
return return_flag

Expand Down
9 changes: 8 additions & 1 deletion antareslauncher/remote_environnement/ssh_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ def __init__(self, config: dict = None):
self.__client = None
self.__home_dir = None
self.timeout = 10
self.host = ""
self.username = ""
self.port = 22
self.password = None
self.private_key = None

if config:
self.logger.info("Loading ssh connection from config dictionary")
Expand Down Expand Up @@ -71,7 +76,9 @@ def __init_from_config(self, config: dict):
key_file_name=key_file_path, key_password=key_password
)
elif self.password is None:
self.logger.debug("self.password is None")
self.logger.debug(
"self.password is None, no key found, now password was given"
)
raise ValueError

def initialize_home_dir(self):
Expand Down
Loading

0 comments on commit 6e9758c

Please sign in to comment.