diff --git a/slurm_state/helpers/clusters_helper.py b/slurm_state/helpers/clusters_helper.py index ce65d219..c17a8293 100644 --- a/slurm_state/helpers/clusters_helper.py +++ b/slurm_state/helpers/clusters_helper.py @@ -65,6 +65,7 @@ def _load_clusters_from_config(): clusters_valid.add_field("sacct_path", optional_string) clusters_valid.add_field("sinfo_path", optional_string) + clusters_valid.add_field("slurm_version", optional_string, default=None) # Load the clusters from the configuration file, asserting that it uses the # predefined format diff --git a/slurm_state/helpers/parser_helper.py b/slurm_state/helpers/parser_helper.py index 7ad8d1a3..5750b7e7 100644 --- a/slurm_state/helpers/parser_helper.py +++ b/slurm_state/helpers/parser_helper.py @@ -19,3 +19,154 @@ def renamer(k, v, res): res[name] = v return renamer + + +def copy_and_stringify(k, v, res): + res[k] = str(v) + + +def rename_subitems(subitem_dict): + def renamer(k, v, res): + for subitem, name in subitem_dict.items(): + res[name] = v[subitem] + + return renamer + + +def translate_with_value_modification(v_modification, translator, **args): + """ + This function returns a translator that includes a modification + on the value which will be transmitted to it + + Parameters: + v_modification The function modifying the value before it + is transmitted to the translator + translator The translator called + + Returns: + A translator function including the expected value modification + """ + # Some translator can depend on specific arguments. Thus, we call + # them before to apply it and get the translator which has to be + # used + final_translator = translator(**args) + + # This helper is used to update the value v before applying the + # translator on the triplet (k, v, res) + def combiner(k, v, res): + final_translator(k, v_modification(v), res) + + return combiner + + +def zero_to_null(v): + """ + Convert the value from 0 to null + + Parameter: + v The values to be converted if appliable + + Return: + The converted values + """ + # If a value of v equals 0, transform it to None + for (v_k, v_v) in v.items(): + if v_v == 0: + v[v_k] = None + # Return v + return v + + +def rename_and_stringify_subitems(subitem_dict): + def renamer(k, v, res): + for subitem, name in subitem_dict.items(): + res[name] = str(v[subitem]) + + return renamer + + +def join_subitems(separator, name): + def joiner(k, v, res): + values = [] + for _, value in v.items(): + values.append(str(value)) + res[name] = separator.join(values) + + return joiner + + +def extract_tres_data(k, v, res): + """ + Extract count of the elements present in the value associated to the key "tres" + in the input dictionary. Such a dictionary would present a structure similar as depicted below: + "tres": { + 'allocated': [ + {'type': 'cpu', 'name': None, 'id': 1, 'count': 4}, + {'type': 'mem', 'name': None, 'id': 2, 'count': 40960}, + {'type': 'node', 'name': None, 'id': 4, 'count': 1}, + {'type': 'billing', 'name': None, 'id': 5, 'count': 1}, + {'type': 'gres', 'name': 'gpu', 'id': 1001, 'count': 1} + ], + 'requested': [ + {'type': 'cpu', 'name': None, 'id': 1, 'count': 4}, + {'type': 'mem', 'name': None, 'id': 2, 'count': 40960}, + {'type': 'node', 'name': None, 'id': 4, 'count': 1}, + {'type': 'billing', 'name': None, 'id': 5, 'count': 1}, + {'type': 'gres', 'name': 'gpu', 'id': 1001, 'count': 1} + ] + } + + The dictionaries (and their associated keys) inserted in the job result by this function + for this input should be: + "tres_allocated": { + "billing": 1, + "mem": 40960, + "num_cpus": 4, + "num_gpus": 1, + "num_nodes": 1 + } + "tres_requested": { + "billing": 1, + "mem": 40960, + "num_cpus": 4, + "num_gpus": 1, + "num_nodes": 1 + } + """ + + def get_tres_key(tres_type, tres_name): + """ + Basically, this function is used to rename the element + we want to retrieve regarding the TRES type (as we are + for now only interested by the "count" of the entity) + """ + if tres_type == "mem" or tres_type == "billing": + return tres_type + elif tres_type == "cpu": + return "num_cpus" + elif tres_type == "gres": + if tres_name == "gpu": + return "num_gpus" + else: + return "gres" + elif tres_type == "node": + return "num_nodes" + else: + return None + + tres_subdict_names = [ + {"sacct_name": "allocated", "cw_name": "tres_allocated"}, + {"sacct_name": "requested", "cw_name": "tres_requested"}, + ] + for tres_subdict_name in tres_subdict_names: + res[ + tres_subdict_name["cw_name"] + ] = {} # Initialize the "tres_allocated" and the "tres_requested" subdicts + for tres_subdict in v[tres_subdict_name["sacct_name"]]: + tres_key = get_tres_key( + tres_subdict["type"], tres_subdict["name"] + ) # Define the key associated to the TRES + if tres_key: + res[tres_subdict_name["cw_name"]][tres_key] = tres_subdict[ + "count" + ] # Associate the count of the element, as value associated to the key defined previously diff --git a/slurm_state/helpers/ssh_helper.py b/slurm_state/helpers/ssh_helper.py index 4211a815..851ea878 100644 --- a/slurm_state/helpers/ssh_helper.py +++ b/slurm_state/helpers/ssh_helper.py @@ -43,3 +43,62 @@ def open_connection(hostname, username, ssh_key_path, port=22): ssh_client = None return ssh_client + + +def launch_slurm_command(command, hostname, username, ssh_key_filename, port=22): + """ + Launch a Slurm command through SSH and retrieve its response. + + Parameters: + command The Slurm command to launch through SSH + hostname The hostname used for the SSH connection to launch the Slurm command + username The username used for the SSH connection to launch the Slurm command + ssh_key_filename The name of the private key in .ssh folder used for the SSH connection to launch the Slurm command + port The port used for the SSH connection to launch the sinfo command + """ + # Print the command to use + print(f"The command launched through SSH is:\n{command}") + + # Check the given SSH key + assert ssh_key_filename, "Missing ssh_key_filename from config." + + # Now this is the private ssh key that we are using with Paramiko. + ssh_key_path = os.path.join(os.path.expanduser("~"), ".ssh", ssh_key_filename) + + # Connect through SSH + try: + ssh_client = open_connection( + hostname, username, ssh_key_path=ssh_key_path, port=port + ) + except Exception as inst: + print( + f"Error. Failed to connect to {hostname} to launch the command:\n{command}" + ) + print(inst) + return [] + + # If a connection has been established + if ssh_client: + ssh_stdin, ssh_stdout, ssh_stderr = ssh_client.exec_command(command) + + # We should find a better option to retrieve stderr + """ + response_stderr = "".join(ssh_stderr.readlines()) + if len(response_stderr): + print( + f"Stderr in sinfo call on {hostname}. This doesn't mean that the call failed entirely, though.\n{response_stderr}" + ) + """ + stdout = ssh_stdout.readlines() + ssh_client.close() + return stdout + + else: + print( + f"Error. Failed to connect to {hostname} to make the call. Returned `None` but no exception was thrown." + ) + + # If no SSH connection has been established, raise an exception + raise Exception( + f"No SSH connection has been established while trying to run {command}." + ) diff --git a/slurm_state/mongo_update.py b/slurm_state/mongo_update.py index ffdadab5..2e9d0471 100644 --- a/slurm_state/mongo_update.py +++ b/slurm_state/mongo_update.py @@ -9,8 +9,9 @@ from slurm_state.helpers.gpu_helper import get_cw_gres_description from slurm_state.helpers.clusters_helper import get_all_clusters -from slurm_state.sinfo_parser import node_parser, generate_node_report -from slurm_state.sacct_parser import job_parser, generate_job_report +# Import parser classes +from slurm_state.parsers.job_parser import JobParser +from slurm_state.parsers.node_parser import NodeParser def pprint_bulk_result(result): @@ -20,11 +21,13 @@ def pprint_bulk_result(result): print(result.bulk_api_result) -def fetch_slurm_report(parser, cluster_name, report_path): +def fetch_slurm_report(parser, report_path): """ Yields elements ready to be slotted into the "slurm" field, but they have to be processed further before committing to MongoDB. """ + # Retrieve the cluster name + cluster_name = parser.cluster["name"] assert os.path.exists(report_path), f"The report path {report_path} is missing." @@ -32,7 +35,7 @@ def fetch_slurm_report(parser, cluster_name, report_path): assert ctx is not None, f"{cluster_name} not configured" with open(report_path, "r") as f: - for e in parser(f): + for e in parser.parser(f): e["cluster_name"] = cluster_name yield e @@ -136,6 +139,10 @@ def main_read_report_and_update_collection( # Initialize the time of this operation's beginning timestamp_start = time.time() + # Retrieve clusters data from the configuration file + clusters = get_all_clusters() + assert cluster_name in clusters + # Check the input parameters assert entity in ["jobs", "nodes"] @@ -143,26 +150,24 @@ def main_read_report_and_update_collection( id_key = ( "job_id" # The id_key is used to determine how to retrieve the ID of a job ) - parser = job_parser # This parser is used to retrieve and format useful information from a sacct job + parser = JobParser( + cluster_name + ) # This parser is used to retrieve and format useful information from a sacct job from_slurm_to_clockwork = slurm_job_to_clockwork_job # This function is used to translate a Slurm job (created through the parser) to a Clockwork job - generate_report = generate_job_report # This function is used to generate the file gathering the job information which will be explained later elif entity == "nodes": id_key = ( "name" # The id_key is used to determine how to retrieve the ID of a node ) - parser = node_parser # This parser is used to retrieve and format useful information from a sacct node + parser = NodeParser( + cluster_name + ) # This parser is used to retrieve and format useful information from a sacct node from_slurm_to_clockwork = slurm_node_to_clockwork_node # This function is used to translate a Slurm node (created through the parser) to a Clockwork node - generate_report = generate_node_report # This function is used to generate the file gathering the node information which will be explained later else: # Raise an error because it should not happen raise ValueError( f'Incorrect value for entity in main_read_sacct_and_update_collection: "{entity}" when it should be "jobs" or "nodes".' ) - # Retrieve clusters data from the configuration file - clusters = get_all_clusters() - assert cluster_name in clusters - ## Retrieve entities ## # Generate a report file if required @@ -170,13 +175,13 @@ def main_read_report_and_update_collection( print( f"Generate report file for the {cluster_name} cluster at location {report_file_path}." ) - generate_report(cluster_name, report_file_path) + parser.generate_report(report_file_path) # Construct an iterator over the list of entities in the report file, # each one of them is turned into a clockwork job or node, according to applicability I_clockwork_entities_from_report = map( from_slurm_to_clockwork, - fetch_slurm_report(parser, cluster_name, report_file_path), + fetch_slurm_report(parser, report_file_path), ) L_updates_to_do = [] # Entity updates to store in the database if requested diff --git a/slurm_state/parsers/job_parser.py b/slurm_state/parsers/job_parser.py new file mode 100644 index 00000000..d186992d --- /dev/null +++ b/slurm_state/parsers/job_parser.py @@ -0,0 +1,119 @@ +# These functions are translators used in order to handle the values +# we could encounter while parsing a job dictionary retrieved from a +# sacct command. +from slurm_state.helpers.parser_helper import ( + copy, + copy_and_stringify, + extract_tres_data, + join_subitems, + rename, + rename_subitems, + rename_and_stringify_subitems, + translate_with_value_modification, + zero_to_null, +) + +from slurm_state.parsers.slurm_parser import SlurmParser + +# Common imports +import json, re + + +class JobParser(SlurmParser): + """ """ + + def __init__(self, cluster_name, slurm_version=None): + super().__init__("jobs", "sacct", cluster_name, slurm_version=slurm_version) + + def generate_report(self, file_name): + + # Retrieve the allocations associated to the cluster + allocations = self.cluster["allocations"] + + if allocations == []: + # If the cluster has no associated allocation, nothing is requested + print( + f"The cluster {self.cluster['name']} has no allocation related to it. Thus, no job has been retrieved. Associated allocations can be provided in the Clockwork configuration file." + ) + return [] + else: + # Set the sacct command + # -S is a condition on the start time, 600 being in seconds + # -E is a condition on the end time + # -X means "Only show statistics relevant to the job allocation itself, not taking steps into consideration." + # --associations is used in order to limit the fetched jobs to the ones related to Mila and/or professors who + # may use Clockwork + if allocations == "*": + # We do not provide --associations information because the default for this parameter + # is "all associations" + remote_command = ( + f"{self.slurm_command_path} -S now-600 -E now -X --allusers --json" + ) + else: + accounts_list = ",".join(allocations) + remote_command = f"{self.slurm_command_path} -S now-600 -E now -X --accounts={accounts_list} --allusers --json" + print(f"remote_command is\n{remote_command}") + + return super().generate_report(remote_command, file_name) + + def parser(self, f): + """ """ + if re.search(r"^22\..*$", self.slurm_version): + return self.parser_v22_and_23(f) + elif re.search(r"^23\..*$", self.slurm_version): + return self.parser_v22_and_23(f) + else: + raise Exception( + f'The {self.entity} parser is not implemented for the Slurm version "{self.slurm_version}".' + ) + + def parser_v22_and_23(self, f): + JOB_FIELD_MAP = { + "account": copy, + "array": rename_and_stringify_subitems( + {"job_id": "array_job_id", "task_id": "array_task_id"} + ), + "cluster": rename("cluster_name"), + "exit_code": join_subitems(":", "exit_code"), + "job_id": copy_and_stringify, + "name": copy, + "nodes": copy, + "partition": copy, + "state": rename_subitems({"current": "job_state"}), + "time": translate_with_value_modification( + zero_to_null, + rename_subitems, + subitem_dict={ + "limit": "time_limit", + "submission": "submit_time", + "start": "start_time", + "end": "end_time", + }, + ), + "tres": extract_tres_data, + "user": rename("username"), + "working_directory": copy, + } + + # Load the JSON file generated using the Slurm command + # (At this point, slurm_data is a hierarchical structure of dictionaries and lists) + slurm_data = json.load(f) + + slurm_entities = slurm_data[self.entity] + + for slurm_entity in slurm_entities: + res_entity = ( + dict() + ) # Initialize the dictionary which will store the newly formatted Slurm data + + for k, v in slurm_entity.items(): + # We will use a handler mapping to translate this + translator = JOB_FIELD_MAP.get(k, None) + + if translator is not None: + # Translate using the translator retrieved from the fields map + translator(k, v, res_entity) + + # If no translator has been provided: ignore the field + + yield res_entity diff --git a/slurm_state/parsers/node_parser.py b/slurm_state/parsers/node_parser.py new file mode 100644 index 00000000..74e4ee36 --- /dev/null +++ b/slurm_state/parsers/node_parser.py @@ -0,0 +1,79 @@ +from slurm_state.parsers.slurm_parser import SlurmParser + +# These functions are translators used in order to handle the values +# we could encounter while parsing a node dictionary retrieved from a +# sinfo command. +from slurm_state.helpers.parser_helper import ( + copy, + copy_with_none_as_empty_string, + rename, +) + +# Common imports +import json, re + + +class NodeParser(SlurmParser): + """ """ + + def __init__(self, cluster_name, slurm_version=None): + super().__init__("nodes", "sinfo", cluster_name, slurm_version=slurm_version) + + def generate_report(self, file_name): + # The command to be launched through SSH is "sinfo --json" + remote_command = f"{self.slurm_command_path} --json" + + return super().generate_report(remote_command, file_name) + + def parser(self, f): + """ """ + if re.search(r"^22\..*$", self.slurm_version): + return self.parser_v22(f) + else: + raise Exception( + f'The {self.entity} parser is not implemented for the Slurm version "{self.slurm_version}".' + ) + + def parser_v22(self, f): + NODE_FIELD_MAP = { + "architecture": rename("arch"), + "comment": copy, + "cores": copy, + "cpus": copy, + "last_busy": copy, + "features": copy, + "gres": copy_with_none_as_empty_string, + "gres_used": copy, + "name": copy, + "address": rename("addr"), + "state": copy, + "state_flags": copy, + "real_memory": rename("memory"), + "reason": copy, + "reason_changed_at": copy, + "tres": copy, + "tres_used": copy, + } + + # Load the JSON file generated using the Slurm command + # (At this point, slurm_data is a hierarchical structure of dictionaries and lists) + slurm_data = json.load(f) + + slurm_entities = slurm_data[self.entity] + + for slurm_entity in slurm_entities: + res_entity = ( + dict() + ) # Initialize the dictionary which will store the newly formatted Slurm data + + for k, v in slurm_entity.items(): + # We will use a handler mapping to translate this + translator = NODE_FIELD_MAP.get(k, None) + + if translator is not None: + # Translate using the translator retrieved from the fields map + translator(k, v, res_entity) + + # If no translator has been provided: ignore the field + + yield res_entity diff --git a/slurm_state/parsers/slurm_parser.py b/slurm_state/parsers/slurm_parser.py new file mode 100644 index 00000000..56c9bb62 --- /dev/null +++ b/slurm_state/parsers/slurm_parser.py @@ -0,0 +1,94 @@ +# Imports to retrieve the values related to Slurm command +from slurm_state.helpers.ssh_helper import launch_slurm_command, open_connection +from slurm_state.helpers.clusters_helper import get_all_clusters + +# Common imports +import os, re + + +class SlurmParser: + """ + A parser for Slurm entities + """ + + def __init__(self, entity, slurm_command, cluster_name, slurm_version=None): + self.entity = entity + assert entity in ["jobs", "nodes"] + + self.cluster = get_all_clusters()[cluster_name] + self.cluster["name"] = cluster_name + + self.slurm_command = slurm_command + # Retrieve the path to the Slurm command we want to launch on the cluster + # It is stored in the cluster data under the key "sacct_path" for the sacct command + # and "sinfo_path" for the sinfo command + self.slurm_command_path = self.cluster[f"{self.slurm_command}_path"] + # Check if slurm_command_path exists + assert ( + self.slurm_command_path + ), f"Error. We have called the function to make updates with {self.slurm_command} but the {self.slurm_command}_path config is empty." + assert self.slurm_command_path.endswith( + self.slurm_command + ), f"Error. The {self.slurm_command}_path configuration needs to end with '{self.slurm_command}'. It is currently {self.slurm_command_path} ." + + if slurm_version is not None: + self.slurm_version = slurm_version + else: + # If no Slurm version is provided, retrieve the version of Slurm installed on the current cluster + self.slurm_version = self.get_slurm_version() + + def get_slurm_version(self): + """ + Get the Slurm version + """ + if ( + "slurm_version" in self.cluster + and self.cluster["slurm_version"] is not None + ): + # If the Slurm version has been added to the configuration file, + # return the value of the configuration + return self.cluster["slurm_version"] + else: + print("3") + # Launch the sacct or sinfo command to get its version + remote_command = f"{self.slurm_command_path} -V" + response = self.launch_slurm_command(remote_command) + assert len(response) == 1 + version_regex = re.compile(r"^slurm (\d+\.\d+\.\d+)$") + if m := version_regex.match(response): + return m.group(1) + # If the version has not been identified, raise an error + raise Exception( + f'The version "{response}" has not been recognized as a Slurm version.' + ) + + def launch_slurm_command(self, remote_command): + """ """ + return launch_slurm_command( + remote_command, + self.cluster["remote_hostname"], + self.cluster["remote_user"], + self.cluster["ssh_key_filename"], + self.cluster["ssh_port"], + ) + + def generate_report(self, remote_command, file_name): + """ + Launch a Slurm command in order to retrieve JSON report containing + jobs or nodes information + + Parameters: + cluster_name The name of the cluster on which the Slurm command will be launched + remote_command The command used to retrieve the data from Slurm + file_name The path of the report file to write + """ + # Launch the requested command in order to retrieve Slurm information + stdout = self.launch_slurm_command(remote_command) + + # Create directories if needed + os.makedirs(os.path.dirname(file_name), exist_ok=True) + + # Write the command output to a file + with open(file_name, "w") as outfile: + for line in stdout: + outfile.write(line) diff --git a/slurm_state/sinfo_parser.py b/slurm_state/sinfo_parser.py index 5e00997c..9c75d2cc 100644 --- a/slurm_state/sinfo_parser.py +++ b/slurm_state/sinfo_parser.py @@ -147,7 +147,7 @@ def node_parser(f): translator = NODE_FIELD_MAP.get(k, None) if translator is not None: - # Translate using the translator retrieved from JOB_FIELD_MAP + # Translate using the translator retrieved from NODE_FIELD_MAP translator(k, v, res_node) # If no translator has been provided: ignore the field @@ -156,8 +156,6 @@ def node_parser(f): # The functions used to create the report file, gathering the information to parse - - def generate_node_report( cluster_name, file_name, diff --git a/slurm_state_test/test_mongo_update.py b/slurm_state_test/test_mongo_update.py index 4d40a19a..739c3757 100644 --- a/slurm_state_test/test_mongo_update.py +++ b/slurm_state_test/test_mongo_update.py @@ -2,20 +2,20 @@ from slurm_state.mongo_client import get_mongo_client from slurm_state.config import get_config -from datetime import datetime -from slurm_state.sinfo_parser import node_parser -from slurm_state.sacct_parser import job_parser +# Import jobs and nodes parsers +from slurm_state.parsers.job_parser import JobParser +from slurm_state.parsers.node_parser import NodeParser +# Common imports +from datetime import datetime import pytest -import pprint def test_fetch_slurm_report_jobs(): res = list( fetch_slurm_report( - job_parser, - "cedar", - "slurm_state_test/files/sacct_1", + JobParser("cedar", slurm_version="23.02.6"), # parser + "slurm_state_test/files/sacct_1", # report path ) ) @@ -90,8 +90,7 @@ def test_fetch_slurm_report_jobs(): def test_fetch_slurm_report_nodes(): res = list( fetch_slurm_report( - node_parser, - "mila", + NodeParser("mila", slurm_version="22.05.9"), "slurm_state_test/files/sinfo_1", ) ) @@ -229,13 +228,23 @@ def test_main_read_jobs_and_update_collection(): db.drop_collection("test_jobs") main_read_report_and_update_collection( - "jobs", db.test_jobs, db.test_users, "cedar", "slurm_state_test/files/sacct_1" + "jobs", + db.test_jobs, + db.test_users, + "cedar", + "slurm_state_test/files/sacct_1", + from_file=True, ) assert db.test_jobs.count_documents({}) == 2 main_read_report_and_update_collection( - "jobs", db.test_jobs, db.test_users, "cedar", "slurm_state_test/files/sacct_2" + "jobs", + db.test_jobs, + db.test_users, + "cedar", + "slurm_state_test/files/sacct_2", + from_file=True, ) assert db.test_jobs.count_documents({}) == 3 @@ -255,6 +264,7 @@ def test_main_read_nodes_and_update_collection(): None, "mila", "slurm_state_test/files/sinfo_1", + from_file=True, ) assert db.test_nodes.count_documents({}) == 2 @@ -265,6 +275,7 @@ def test_main_read_nodes_and_update_collection(): None, "mila", "slurm_state_test/files/sinfo_2", + from_file=True, ) assert db.test_nodes.count_documents({}) == 3 diff --git a/test_config.toml b/test_config.toml index 551ddd70..3d3b2e91 100644 --- a/test_config.toml +++ b/test_config.toml @@ -53,6 +53,7 @@ remote_hostname="clockwork-stats" sacct_enabled=true sacct_path="/opt/slurm/bin/sacct" sinfo_path="/opt/slurm/bin/sinfo" +slurm_version="22.05.9" ssh_key_filename="id_clockwork" ssh_port=22 @@ -110,6 +111,7 @@ remote_hostname="cedar.computecanada.ca" sacct_enabled=false sacct_path="/opt/software/slurm/bin/sacct" sinfo_path="/opt/software/slurm/bin/sinfo" +slurm_version="23.02.6" ssh_key_filename="id_clockwork" ssh_port=22