From 87ad1b11e95eb157fba66bd108bf5a7d99227cb7 Mon Sep 17 00:00:00 2001 From: notoraptor Date: Wed, 4 Dec 2024 19:26:58 -0500 Subject: [PATCH] [sarc-386] Acquire node_to_gpu and gpu_billing from cached slurm conf files. (#138) * [sarc-386] Acquire node_to_gpu and gpu_billing from cached slurm conf files. * Display an error message and halt script for cluster mila which is not yet correctly supported. --------- Co-authored-by: Bruno Carrez --- config/sarc-dev.json | 2 - config/sarc-prod.json | 2 - sarc/cli/acquire/__init__.py | 2 + sarc/cli/acquire/slurmconfig.py | 272 ++++++++++++++++++ sarc/cli/db/init.py | 28 ++ sarc/client/__init__.py | 2 + sarc/client/gpumetrics.py | 99 +++++++ sarc/config.py | 14 - sarc/jobs/node_gpu_mapping.py | 165 ++++++----- sarc/jobs/sacct.py | 18 +- .../cli/acquire/test_acquire_slurmconfig.py | 250 ++++++++++++++++ tests/functional/cli/db/test_db_init.py | 20 +- tests/functional/conftest.py | 2 + tests/functional/jobs/test_func_sacct.py | 36 +++ .../node_to_gpu_raisin_no_prometheus.json | 15 - .../nodes_raisin_no_prometheus.txt | 1 - tests/sarc-test.json | 3 +- 17 files changed, 821 insertions(+), 110 deletions(-) create mode 100644 sarc/cli/acquire/slurmconfig.py create mode 100644 sarc/client/gpumetrics.py create mode 100644 tests/functional/cli/acquire/test_acquire_slurmconfig.py delete mode 100644 tests/not-so-secrets/raisin_no_prometheus/node_to_gpu_raisin_no_prometheus.json delete mode 100644 tests/not-so-secrets/raisin_no_prometheus/nodes_raisin_no_prometheus.txt diff --git a/config/sarc-dev.json b/config/sarc-dev.json index d061ad2e..29eb5fad 100644 --- a/config/sarc-dev.json +++ b/config/sarc-dev.json @@ -69,7 +69,6 @@ "prometheus_url": null, "prometheus_headers_file": null, "start_date": "2022-04-01", - "nodes_info_file": "secrets/nodes_graham.txt", "rgu_start_date": "2024-04-03", "gpu_to_rgu_billing": "secrets/gpu_to_rgu_billing_graham.json" }, @@ -84,7 +83,6 @@ "prometheus_url": null, "prometheus_headers_file": null, "start_date": "2022-04-01", - "nodes_info_file": "secrets/nodes_cedar.txt", "rgu_start_date": "2024-04-03", "gpu_to_rgu_billing": "secrets/gpu_to_rgu_billing_cedar.json" } diff --git a/config/sarc-prod.json b/config/sarc-prod.json index 898411cc..dfd4ffb7 100644 --- a/config/sarc-prod.json +++ b/config/sarc-prod.json @@ -69,7 +69,6 @@ "prometheus_url": null, "prometheus_headers_file": null, "start_date": "2022-04-01", - "nodes_info_file": "secrets/nodes_graham.txt", "rgu_start_date": "2024-04-03", "gpu_to_rgu_billing": "secrets/gpu_to_rgu_billing_graham.json" }, @@ -84,7 +83,6 @@ "prometheus_url": null, "prometheus_headers_file": null, "start_date": "2022-04-01", - "nodes_info_file": "secrets/nodes_cedar.txt", "rgu_start_date": "2024-04-03", "gpu_to_rgu_billing": "secrets/gpu_to_rgu_billing_cedar.json" } diff --git a/sarc/cli/acquire/__init__.py b/sarc/cli/acquire/__init__.py index 0f497882..711d28f9 100644 --- a/sarc/cli/acquire/__init__.py +++ b/sarc/cli/acquire/__init__.py @@ -5,6 +5,7 @@ from sarc.cli.acquire.allocations import AcquireAllocations from sarc.cli.acquire.jobs import AcquireJobs +from sarc.cli.acquire.slurmconfig import AcquireSlurmConfig from sarc.cli.acquire.storages import AcquireStorages from sarc.cli.acquire.users import AcquireUsers @@ -17,6 +18,7 @@ class Acquire: "jobs": AcquireJobs, "storages": AcquireStorages, "users": AcquireUsers, + "slurmconfig": AcquireSlurmConfig, } ) diff --git a/sarc/cli/acquire/slurmconfig.py b/sarc/cli/acquire/slurmconfig.py new file mode 100644 index 00000000..9f479a4c --- /dev/null +++ b/sarc/cli/acquire/slurmconfig.py @@ -0,0 +1,272 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import Dict, List + +from hostlist import expand_hostlist +from simple_parsing import field + +from sarc.cache import CachePolicy, with_cache +from sarc.client.gpumetrics import _gpu_billing_collection +from sarc.config import config +from sarc.jobs.node_gpu_mapping import _node_gpu_mapping_collection + +logger = logging.getLogger(__name__) + + +@dataclass +class AcquireSlurmConfig: + cluster_name: str = field(alias=["-c"]) + day: str = field(alias=["-d"]) + + def execute(self) -> int: + if self.cluster_name == "mila": + logger.error("Cluster `mila` not yet supported.") + return -1 + + parser = SlurmConfigParser(self.cluster_name, self.day) + slurm_conf = parser.get_slurm_config() + _gpu_billing_collection().save_gpu_billing( + self.cluster_name, self.day, slurm_conf.gpu_to_billing + ) + _node_gpu_mapping_collection().save_node_gpu_mapping( + self.cluster_name, self.day, slurm_conf.node_to_gpu + ) + return 0 + + +class SlurmConfigParser: + def __init__(self, cluster_name: str, day: str): + self.cluster_name = cluster_name + self.day = day + + def get_slurm_config(self) -> SlurmConfig: + return with_cache( + self._get_slurm_conf, + subdirectory="slurm_conf", + key=self._cache_key, + formatter=self, + )(cache_policy=CachePolicy.always) + + def _get_slurm_conf(self): + raise RuntimeError( + f"Please add cluster slurm.conf file into cache, at location: " + f"{config().cache}/slurm_conf/{self._cache_key()}" + ) + + def _cache_key(self): + return f"slurm.{self.cluster_name}.{self.day}.conf" + + def load(self, file) -> SlurmConfig: + """ + Parse cached slurm conf file and return a SlurmConfig object + containing node_to_gpu and gpu_to_billing. + """ + + partitions: List[Partition] = [] + node_to_gpu = {} + + # Parse lines: extract partitions and node_to_gpu + for line_number, line in enumerate(file): + line = line.strip() + if line.startswith("PartitionName="): + partitions.append( + Partition( + line_number=line_number + 1, + line=line, + info=dict( + option.split("=", maxsplit=1) for option in line.split() + ), + ) + ) + elif line.startswith("NodeName="): + nodes_config = dict( + option.split("=", maxsplit=1) for option in line.split() + ) + gpu_type = nodes_config.get("Gres") + if gpu_type: + node_to_gpu.update( + { + node_name: gpu_type + for node_name in expand_hostlist(nodes_config["NodeName"]) + } + ) + + # Parse partitions: extract gpu_to_billing + gpu_to_billing = self._parse_gpu_to_billing(partitions, node_to_gpu) + + # Return parsed data + return SlurmConfig(node_to_gpu=node_to_gpu, gpu_to_billing=gpu_to_billing) + + def _parse_gpu_to_billing( + self, partitions: List[Partition], node_to_gpu: Dict[str, str] + ) -> Dict[str, float]: + + # Mapping of GPU to partition billing. + # ALlow to check that inferred billing for a GPU is the same across partitions. + # If not, an error will be raised with additional info about involved partitions. + gpu_to_partition_billing: Dict[str, PartitionGPUBilling] = {} + + # Collection for all GPUs found in partition nodes. + # We will later iterate on this collection to resolve any GPU without billing. + all_partition_node_gpus = set() + + for partition in partitions: + # Get all GPUs in partition nodes and all partition GPU billings. + local_gres, local_gpu_to_billing = ( + partition.get_gpus_and_partition_billings(node_to_gpu) + ) + + # Merge local GPUs into global partition node GPUs. + all_partition_node_gpus.update(local_gres) + + # Merge local GPU billings into global GPU billings + for gpu_type, value in local_gpu_to_billing.items(): + new_billing = PartitionGPUBilling( + gpu_type=gpu_type, value=value, partition=partition + ) + if gpu_type not in gpu_to_partition_billing: + # New GPU found, add it + gpu_to_partition_billing[gpu_type] = new_billing + elif gpu_to_partition_billing[gpu_type].value != value: + # GPU already found, with a different billing. Problem. + raise InconsistentGPUBillingError( + gpu_type, gpu_to_partition_billing[gpu_type], new_billing + ) + + # Generate GPU->billing mapping + gpu_to_billing = { + gpu_type: billing.value + for gpu_type, billing in gpu_to_partition_billing.items() + } + + # Resolve GPUs without billing + for gpu_desc in all_partition_node_gpus: + if gpu_desc not in gpu_to_billing: + if gpu_desc.startswith("gpu:") and gpu_desc.count(":") == 2: + # GPU resource with format `gpu::` + _, gpu_type, gpu_count = gpu_desc.split(":") + if gpu_type in gpu_to_billing: + billing = gpu_to_billing[gpu_type] * int(gpu_count) + gpu_to_billing[gpu_desc] = billing + logger.info(f"Inferred billing for {gpu_desc} := {billing}") + else: + logger.warning( + f"Cannot find GPU billing for GPU type {gpu_type} in GPU resource {gpu_desc}" + ) + else: + logger.warning(f"Cannot infer billing for GPU: {gpu_desc}") + + # We can finally return GPU->billing mapping. + return gpu_to_billing + + +@dataclass +class SlurmConfig: + """Parsed data from slurm config file""" + + node_to_gpu: Dict[str, str] + gpu_to_billing: Dict[str, float] + + +@dataclass +class Partition: + """Partition entry in slurm config file""" + + line_number: int + line: str + info: Dict[str, str] + + def get_gpus_and_partition_billings(self, node_to_gpu: Dict[str, str]): + """ + Parse and return: + - partition node GPUs + - partition GPU billings inferred from field `TRESBillingWeights` + """ + + # Get partition node GPUs + local_gres = self._get_node_gpus(node_to_gpu) + + # Get GPU weights from TRESBillingWeights + tres_billing_weights = dict( + option.split("=", maxsplit=1) + for option in self.info.get("TRESBillingWeights", "").split(",") + if option + ) + gpu_weights = { + key: value + for key, value in tres_billing_weights.items() + if key.startswith("GRES/gpu") + } + + # Parse local GPU billings + local_gpu_to_billing = {} + for key, value in gpu_weights.items(): + value = float(value) + if key == "GRES/gpu": + if len(gpu_weights) == 1: + # We only have `GRES/gpu=` + # Let's map value to each GPU found in partition nodes + local_gpu_to_billing.update( + {gpu_name: value for gpu_name in local_gres} + ) + else: + # We have both `GRES/gpu=` and at least one `GRES/gpu:a_gpu=a_value`. + # Ambiguous case, cannot map `GRES/gpu=` to a specific GPU. + logger.debug( + f"[line {self.line_number}] " + f"Ignored ambiguous GPU billing (cannot match to a specific GPU): `{key}={value}` " + f"| partition: {self.info['PartitionName']} " + # f"| nodes: {partition.info['Nodes']}, " + f"| nodes GPUs: {', '.join(local_gres)} " + f"| TRESBillingWeights: {self.info['TRESBillingWeights']}" + ) + else: + # We have `GRES/gpu:a_gpu=a_value`. + # We can parse. + _, gpu_name = key.split(":", maxsplit=1) + local_gpu_to_billing[gpu_name] = value + + return local_gres, local_gpu_to_billing + + def _get_node_gpus(self, node_to_gpu: Dict[str, str]) -> List[str]: + """Return all GPUs found in partition nodes""" + return sorted( + { + gres + for node_name in expand_hostlist(self.info["Nodes"]) + for gres in node_to_gpu.get(node_name, "").split(",") + if gres + } + ) + + +@dataclass +class PartitionGPUBilling: + """Represents a GPU billing found in a specific partition entry.""" + + partition: Partition + gpu_type: str + value: float + + +class InconsistentGPUBillingError(Exception): + def __init__( + self, + gpu_type: str, + prev_billing: PartitionGPUBilling, + new_billing: PartitionGPUBilling, + ): + super().__init__( + f"\n" + f"GPU billing differs.\n" + f"GPU name: {gpu_type}\n" + f"Previous value: {prev_billing.value}\n" + f"From line: {prev_billing.partition.line_number}\n" + f"{prev_billing.partition.line}\n" + f"\n" + f"New value: {new_billing.value}\n" + f"From line: {new_billing.partition.line_number}\n" + f"{new_billing.partition.line}\n" + ) diff --git a/sarc/cli/db/init.py b/sarc/cli/db/init.py index 25e340ba..5611ff4c 100644 --- a/sarc/cli/db/init.py +++ b/sarc/cli/db/init.py @@ -52,6 +52,10 @@ def execute(self) -> int: create_users_indices(db) + create_gpu_billing_indices(db) + + create_node_gpu_mapping_indices(db) + return 0 def create_acount(self, client, db): @@ -92,6 +96,8 @@ def create_readonly_role(self, db): "users", "jobs", "clusters", + "gpu_billing", + "node_gpu_mapping", ] try: @@ -143,6 +149,28 @@ def create_users_indices(db): ) +def create_gpu_billing_indices(db): + db_collection = db.gpu_billing + db_collection.create_index( + [ + ("cluster_name", pymongo.ASCENDING), + ("since", pymongo.ASCENDING), + ], + unique=True, + ) + + +def create_node_gpu_mapping_indices(db): + db_collection = db.node_gpu_mapping + db_collection.create_index( + [ + ("cluster_name", pymongo.ASCENDING), + ("since", pymongo.ASCENDING), + ], + unique=True, + ) + + def create_allocations_indices(db): db_collection = AllocationsRepository(database=db).get_collection() diff --git a/sarc/client/__init__.py b/sarc/client/__init__.py index 25ef68db..43c06561 100644 --- a/sarc/client/__init__.py +++ b/sarc/client/__init__.py @@ -1,3 +1,4 @@ +from .gpumetrics import get_cluster_gpu_billings from .job import count_jobs, get_available_clusters, get_job, get_jobs from .users.api import get_user, get_users @@ -8,4 +9,5 @@ "get_jobs", "get_user", "get_users", + "get_cluster_gpu_billings", ] diff --git a/sarc/client/gpumetrics.py b/sarc/client/gpumetrics.py new file mode 100644 index 00000000..490c602a --- /dev/null +++ b/sarc/client/gpumetrics.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import logging +from datetime import datetime, time +from typing import Dict, List + +from pydantic import validator +from pydantic_mongo import AbstractRepository, ObjectIdField + +from sarc.config import MTL, UTC, BaseModel, config, scraping_mode_required + +logger = logging.getLogger(__name__) + + +class GPUBilling(BaseModel): + """Holds data for a GPU Billing.""" + + # # Database ID + id: ObjectIdField = None + + cluster_name: str + since: datetime + gpu_to_billing: Dict[str, float] + + @validator("since", pre=True) + @classmethod + def _ensure_since(cls, value): + """Parse `since` from stored string to Python datetime.""" + if isinstance(value, str): + return datetime.combine(datetime.fromisoformat(value), time.min).replace( + tzinfo=MTL + ) + else: + assert isinstance(value, datetime) + return value.replace(tzinfo=UTC).astimezone(MTL) + + +class GPUBillingRepository(AbstractRepository[GPUBilling]): + class Meta: + collection_name = "gpu_billing" + + @scraping_mode_required + def save_gpu_billing( + self, + cluster_name: str, + since: str, + gpu_to_billing: Dict[str, float], + ): + """Save GPU->billing mapping into database.""" + + billing = GPUBilling( + cluster_name=cluster_name, + since=since, + gpu_to_billing=gpu_to_billing, + ) + # Check if a GPU->billing mapping was already registered + # for given cluster and date. + exists = list( + self.find_by( + { + "cluster_name": billing.cluster_name, + "since": billing.since, + } + ) + ) + if exists: + # If a record was found, update it if changed. + (prev_billing,) = exists + if prev_billing.gpu_to_billing != billing.gpu_to_billing: + self.get_collection().update_one( + { + "cluster_name": billing.cluster_name, + "since": billing.since, + }, + {"$set": self.to_document(billing)}, + ) + logger.info( + f"[{billing.cluster_name}] GPU<->billing updated for: {billing.since}" + ) + else: + # If no record found, create a new one. + self.save(billing) + logger.info( + f"[{billing.cluster_name}] GPU<->billing saved for: {billing.since}" + ) + + +def _gpu_billing_collection(): + """Return the gpu_billing collection in the current MongoDB.""" + db = config().mongo.database_instance + return GPUBillingRepository(database=db) + + +def get_cluster_gpu_billings(cluster_name: str) -> List[GPUBilling]: + """Return GPU->billing mapping records for a cluster, sorted by ascending `since`.""" + return sorted( + _gpu_billing_collection().find_by({"cluster_name": cluster_name}), + key=lambda b: b.since, + ) diff --git a/sarc/config.py b/sarc/config.py index 54d84cac..a980354f 100644 --- a/sarc/config.py +++ b/sarc/config.py @@ -75,7 +75,6 @@ class ClusterConfig(BaseModel): timezone: Union[str, zoneinfo.ZoneInfo] # | does not work with Pydantic's eval prometheus_url: str = None prometheus_headers_file: str = None - nodes_info_file: str = None name: str = None sacct_bin: str = "sacct" accounts: list[str] = None @@ -127,19 +126,6 @@ def prometheus(self): ) return PrometheusConnect(url=self.prometheus_url, headers=headers) - @cached_property - def node_to_gpu(self): - """ - Return a dictionary-like mapping a cluster's node name to a GPU type. - - Parsed from self.nodes_info_file if available. - - Dict-like object will return None if queried node name is not found. - """ - from .jobs.node_gpu_mapping import NodeToGPUMapping - - return NodeToGPUMapping(self.name, self.nodes_info_file) - class MongoConfig(BaseModel): connection_string: str diff --git a/sarc/jobs/node_gpu_mapping.py b/sarc/jobs/node_gpu_mapping.py index f89989c9..6550a6c3 100644 --- a/sarc/jobs/node_gpu_mapping.py +++ b/sarc/jobs/node_gpu_mapping.py @@ -1,77 +1,108 @@ -""" -This module provides a dict-like class NodeToGPUMapping -to map a cluster's node name to GPU type -by parsing TXT files containing node descriptions like: - NodeName= Gres= ... -""" +from __future__ import annotations -import json -import os +import bisect +import logging +from datetime import datetime, time +from typing import Dict, Optional -from hostlist import expand_hostlist +from pydantic import validator +from pydantic_mongo import AbstractRepository, ObjectIdField +from sarc.config import MTL, UTC, BaseModel, config, scraping_mode_required -class NodeToGPUMapping: - """Helper class to generate JSON file, load it in memory, and query GPU type for a nodename.""" +logger = logging.getLogger(__name__) - def __init__(self, cluster_name, nodes_info_file): - """Initialize with cluster name and TXT file path to parse.""" - # Mapping is empty by default. - self.mapping = {} - self.json_path = None +class NodeGPUMapping(BaseModel): + """Holds data for a mapping -> .""" - # Mapping is filled only if TXT file is available. - if nodes_info_file and os.path.exists(nodes_info_file): - nodes_info_file = os.path.abspath(nodes_info_file) - # JSON file is expected to be located in same folder as TXT file. - self.json_path = os.path.join( - os.path.dirname(nodes_info_file), f"node_to_gpu_{cluster_name}.json" + # # Database ID + id: ObjectIdField = None + + cluster_name: str + since: datetime + node_to_gpu: Dict[str, str] + + @validator("since", pre=True) + @classmethod + def _ensure_since(cls, value): + """Parse `since` from stored string to Python datetime.""" + if isinstance(value, str): + return datetime.combine(datetime.fromisoformat(value), time.min).replace( + tzinfo=MTL ) + else: + assert isinstance(value, datetime) + return value.replace(tzinfo=UTC).astimezone(MTL) + + def __lt__(self, other): + return self.since < other.since + - info_file_stat = os.stat(nodes_info_file) - # JSON file is (re)generated if it does not yet exist - # or if it's older than TXT file. - if ( - not os.path.exists(self.json_path) - or os.stat(self.json_path).st_mtime < info_file_stat.st_mtime - ): - # Pase TXT file into self.mapping. - self._parse_nodenames(nodes_info_file, self.mapping) - # Save self.mapping into JSON file. - with open(self.json_path, "w", encoding="utf-8") as file: - json.dump(self.mapping, file, indent=1) - else: - # Otherwise, just load existing JSON file. - with open(self.json_path, encoding="utf-8") as file: - self.mapping = json.load(file) - - def __getitem__(self, nodename): - """Return GPU type for nodename, or None if not found.""" - return self.mapping.get(nodename, None) - - @staticmethod - def _parse_nodenames(path: str, output: dict): - """ - Parse node-to-GPU mapping from a path and save parsed nodes into output dict. - - Path should be a txt file containing lines like: - - NodeName= Gres= <...other key=value will be ignored> - - Other lines (e.g. blank lines or commented lines) will be ignored. - """ - with open(path, encoding="utf-8") as file: - for line in file: - # Parse only lines starting with "NodeName" - line = line.strip() - - if not line.startswith("NodeName"): - continue - - nodes_config = dict( - option.split("=", maxsplit=1) for option in line.split() +class NodeGPUMappingRepository(AbstractRepository[NodeGPUMapping]): + class Meta: + collection_name = "node_gpu_mapping" + + @scraping_mode_required + def save_node_gpu_mapping( + self, cluster_name: str, since: str, node_to_gpu: Dict[str, str] + ): + mapping = NodeGPUMapping( + cluster_name=cluster_name, since=since, node_to_gpu=node_to_gpu + ) + # Check if a node->GPU mapping was already registered + # for given cluster and date. + exists = list( + self.find_by( + { + "cluster_name": mapping.cluster_name, + "since": mapping.since, + } + ) + ) + if exists: + # If a record was found, update it if changed. + (prev_mapping,) = exists + if prev_mapping.node_to_gpu != mapping.node_to_gpu: + self.get_collection().update_one( + { + "cluster_name": mapping.cluster_name, + "since": mapping.since, + }, + {"$set": self.to_document(mapping)}, + ) + logger.info( + f"[{mapping.cluster_name}] node<->GPU updated for: {mapping.since}" ) - all_nodenames = expand_hostlist(nodes_config["NodeName"]) - gres = nodes_config["Gres"] - output.update({node_name: gres for node_name in all_nodenames}) + else: + # If no record found, create a new one. + self.save(mapping) + logger.info( + f"[{mapping.cluster_name}] node<->GPU saved for: {mapping.since}" + ) + + +def _node_gpu_mapping_collection(): + """Return the node_gpu_mapping collection in the current MongoDB.""" + db = config().mongo.database_instance + return NodeGPUMappingRepository(database=db) + + +def get_node_to_gpu( + cluster_name: str, required_date: Optional[datetime] = None +) -> Optional[NodeGPUMapping]: + mappings = sorted( + _node_gpu_mapping_collection().find_by({"cluster_name": cluster_name}), + key=lambda b: b.since, + ) + if not mappings: + return None + + if required_date is None: + return mappings[-1] + + index_mapping = max( + 0, + bisect.bisect_right([mapping.since for mapping in mappings], required_date) - 1, + ) + return mappings[index_mapping] diff --git a/sarc/jobs/sacct.py b/sarc/jobs/sacct.py index 6adbb213..b244c89a 100644 --- a/sarc/jobs/sacct.py +++ b/sarc/jobs/sacct.py @@ -10,6 +10,7 @@ from sarc.cache import with_cache from sarc.client.job import SlurmJob, _jobs_collection from sarc.config import UTC, ClusterConfig +from sarc.jobs.node_gpu_mapping import get_node_to_gpu from sarc.jobs.series import get_job_time_series from sarc.traces import trace_decorator, using_trace @@ -308,10 +309,17 @@ def update_allocated_gpu_type(cluster: ClusterConfig, entry: SlurmJob) -> Option entry.allocated.gpu_type = output[0]["metric"]["gpu_type"] else: # No prometheus config. Try to get GPU type from local JSON file. - gpu_types = {cluster.node_to_gpu[nodename] for nodename in entry.nodes} - # We should not have more than 1 GPU type per job. - assert len(gpu_types) <= 1 - if gpu_types: - entry.allocated.gpu_type = gpu_types.pop() + node_gpu_mapping = get_node_to_gpu(cluster.name, entry.start_time) + if node_gpu_mapping: + node_to_gpu = node_gpu_mapping.node_to_gpu + gpu_types = { + node_to_gpu[nodename] + for nodename in entry.nodes + if nodename in node_to_gpu + } + # We should not have more than 1 GPU type per job. + assert len(gpu_types) <= 1 + if gpu_types: + entry.allocated.gpu_type = gpu_types.pop() return entry.allocated.gpu_type diff --git a/tests/functional/cli/acquire/test_acquire_slurmconfig.py b/tests/functional/cli/acquire/test_acquire_slurmconfig.py new file mode 100644 index 00000000..85304425 --- /dev/null +++ b/tests/functional/cli/acquire/test_acquire_slurmconfig.py @@ -0,0 +1,250 @@ +import re +from datetime import datetime, time +from typing import List + +import pytest +from hostlist import expand_hostlist + +from sarc.cache import CacheException +from sarc.cli.acquire.slurmconfig import SlurmConfigParser +from sarc.client.gpumetrics import GPUBilling, get_cluster_gpu_billings +from sarc.config import MTL, config +from sarc.jobs.node_gpu_mapping import NodeGPUMapping, get_node_to_gpu + +SLURM_CONF_RAISIN_2020_01_01 = """ +NodeName=mynode[1,2,5-20,30,40-43] UselessParam=UselessValue Gres=gpu1 + +# No Gres for this node, should be totally ignored +NodeName=cpu_node[1-100] UselessParam=UselessValue + +NodeName=myothernode[1,2,5-20,30,40-43] UselessParam=UselessValue Gres=gpu2 +NodeName=myothernode2[1,2,5-20,30,50] UselessParam=UselessValue Gres=gpu2,gpu:gpu1:5 + +# Node present in a GPU partition below, but: +# - GPU `bad_named_gpu` not billed. Should generate a GPU billing warning. +# - GPU `gpu:what:2` based on GPU `what` which is not billed. Should generate a GPU billing warning. +NodeName=myothernode20 Gres=bad_named_gpu,gpu:what:2 + +# Node with a GPU, but not present in GPU partitions below. GPU should be ignored. +NodeName=myothernode[100-102] Gres=gpu3 + +NodeName=alone_node UselessParam=UselessValue Gres=gpu:gpu2:1,gpu:gpu1:9 + + +PartitionName=partition1 Nodes=mynode[1,5,6,29-41] TRESBillingWeights=x=1,GRES/gpu=5000,y=2 +PartitionName=partition2 Nodes=mynode[2,8-11,42] TRESBillingWeights=x=1,GRES/gpu:gpu1=5000,y=2 +PartitionName=partition3 Nodes=myothernode[10,20] TRESBillingWeights=GRES/gpu:gpu2=7500 + +# No gres specified, but billing for node GPUs (gpu:gpu2:1,gpu:gpu1:9) should be inferred +# thanks to GPU billings parsed in partitions above (gpu1 and gpu2). +PartitionName=partition4 Nodes=alone_node +""" + + +SLURM_CONF_RAISIN_2020_05_01 = """ +NodeName=mynode[1,2,5-20,30,40-43] UselessParam=UselessValue Gres=gpu1 + +# No Gres for this node, should be totally ignored +NodeName=cpu_node[1-100] UselessParam=UselessValue + +NodeName=myothernode[1,2,5-20,30,40-43] UselessParam=UselessValue Gres=gpu2 +NodeName=myothernode2[1,2,5-20,30,50] UselessParam=UselessValue Gres=gpu2,gpu:gpu1:5 + +# Node present in a GPU partition below, but: +# - GPU `bad_named_gpu` not billed. Should generate a GPU billing warning. +# - GPU `gpu:what:2` based on GPU `what` which is not billed. Should generate a GPU billing warning. +NodeName=myothernode20 Gres=bad_named_gpu,gpu:what:2 + +# Node with a GPU, but not present in GPU partitions below. GPU should be ignored. +NodeName=myothernode[100-102] Gres=gpu3 + +# Node retired in this config. Neither node nor node GPUs should appear in parsed data. +# NodeName=alone_node UselessParam=UselessValue Gres=gpu:gpu2:1,gpu:gpu1:9 + + +PartitionName=partition1 Nodes=mynode[1,5,6,29-41] TRESBillingWeights=x=1,GRES/gpu=4000,y=2 +PartitionName=partition2 Nodes=mynode[2,8-11,42] TRESBillingWeights=x=1,GRES/gpu:gpu1=4000,y=2 +PartitionName=partition3 Nodes=myothernode[10,20] TRESBillingWeights=GRES/gpu:gpu2=9000 + +# No gres specified, but billing for node GPUs (gpu:gpu2:1,gpu:gpu1:9) should be inferred +# thanks to GPU billings parsed in partitions above (gpu1 and gpu2). +PartitionName=partition4 Nodes=alone_node +""" + + +@pytest.mark.usefixtures("empty_read_write_db") +def test_acquire_slurmconfig(cli_main, caplog): + assert get_cluster_gpu_billings("raisin") == [] + assert get_node_to_gpu("raisin") == None + + _save_slurm_conf("raisin", "2020-01-01", SLURM_CONF_RAISIN_2020_01_01) + + with pytest.raises(CacheException): + cli_main(["acquire", "slurmconfig", "-c", "unknown_raisin", "-d", "2020-01-01"]) + + with pytest.raises(CacheException): + cli_main(["acquire", "slurmconfig", "-c", "raisin", "-d", "1999-01-01"]) + + assert ( + cli_main(["-v", "acquire", "slurmconfig", "-c", "raisin", "-d", "2020-01-01"]) + == 0 + ) + + assert re.search( + r"WARNING +sarc\.cli\.acquire\.slurmconfig:slurmconfig\.py:[0-9]+ +" + r"Cannot infer billing for GPU: bad_named_gpu", + caplog.text, + ) + assert re.search( + r"WARNING +sarc\.cli\.acquire\.slurmconfig:slurmconfig\.py:[0-9]+ +" + r"Cannot find GPU billing for GPU type what in GPU resource gpu:what:2", + caplog.text, + ) + + expected_gpu_billing_1 = GPUBilling( + cluster_name="raisin", + since="2020-01-01", + gpu_to_billing={ + "gpu1": 5000, + "gpu:gpu1:9": 9 * 5000, + "gpu2": 7500, + "gpu:gpu2:1": 1 * 7500, + }, + ) + assert_same_billings(get_cluster_gpu_billings("raisin"), [expected_gpu_billing_1]) + + expected_node_to_gpu_1 = NodeGPUMapping( + cluster_name="raisin", + since="2020-01-01", + node_to_gpu={ + **{ + node_name: "gpu1" + for node_name in expand_hostlist("mynode[1,2,5-20,30,40-43]") + }, + **{ + node_name: "gpu2" + for node_name in expand_hostlist("myothernode[1,2,5-20,30,40-43]") + }, + **{ + node_name: "gpu2,gpu:gpu1:5" + for node_name in expand_hostlist("myothernode2[1,2,5-20,30,50]") + }, + "myothernode20": "bad_named_gpu,gpu:what:2", + **{ + node_name: "gpu3" + for node_name in expand_hostlist("myothernode[100-102]") + }, + "alone_node": "gpu:gpu2:1,gpu:gpu1:9", + }, + ) + assert_same_node_gpu_mapping(get_node_to_gpu("raisin"), expected_node_to_gpu_1) + + # Save next conf file + _save_slurm_conf("raisin", "2020-05-01", SLURM_CONF_RAISIN_2020_05_01) + assert ( + cli_main(["-v", "acquire", "slurmconfig", "-c", "raisin", "-d", "2020-05-01"]) + == 0 + ) + expected_gpu_billing_2 = GPUBilling( + cluster_name="raisin", + since="2020-05-01", + gpu_to_billing={ + "gpu1": 4000, + "gpu2": 9000, + }, + ) + assert_same_billings( + get_cluster_gpu_billings("raisin"), + [expected_gpu_billing_1, expected_gpu_billing_2], + ) + + expected_node_to_gpu_2 = NodeGPUMapping( + cluster_name="raisin", + since="2020-05-01", + node_to_gpu=expected_node_to_gpu_1.node_to_gpu.copy(), + ) + del expected_node_to_gpu_2.node_to_gpu["alone_node"] + assert_same_node_gpu_mapping(get_node_to_gpu("raisin"), expected_node_to_gpu_2) + + # Check that we get the right node_to_gpu for a given date + def _parse_date(value: str): + return datetime.combine(datetime.fromisoformat(value), time.min).replace( + tzinfo=MTL + ) + + assert_same_node_gpu_mapping( + get_node_to_gpu("raisin", _parse_date("2019-12-01")), expected_node_to_gpu_1 + ) + assert_same_node_gpu_mapping( + get_node_to_gpu("raisin", _parse_date("2020-01-01")), expected_node_to_gpu_1 + ) + assert_same_node_gpu_mapping( + get_node_to_gpu("raisin", _parse_date("2020-03-07")), expected_node_to_gpu_1 + ) + assert_same_node_gpu_mapping( + get_node_to_gpu("raisin", _parse_date("2020-05-01")), expected_node_to_gpu_2 + ) + assert_same_node_gpu_mapping( + get_node_to_gpu("raisin", _parse_date("2020-05-20")), expected_node_to_gpu_2 + ) + assert_same_node_gpu_mapping( + get_node_to_gpu("raisin", _parse_date("2020-10-10")), expected_node_to_gpu_2 + ) + + +@pytest.mark.usefixtures("empty_read_write_db") +def test_acuire_slurmconfig_inconsistent_billing(cli_main, caplog): + _save_slurm_conf( + "raisin", + "2020-01-01", + """ + NodeName=mynode[1,2,5-20,30,40-43] UselessParam=UselessValue Gres=gpu1 + + PartitionName=partition1 Nodes=mynode[1,5,6,29-41] TRESBillingWeights=x=1,GRES/gpu=5000,y=2 + PartitionName=partition2 Nodes=mynode[2,8-11,42] TRESBillingWeights=x=1,GRES/gpu:gpu1=6000,y=2 + """, + ) + + with pytest.raises(CacheException): + cli_main(["acquire", "slurmconfig", "-c", "raisin", "-d", "2020-01-01"]) + + assert ( + """InconsistentGPUBillingError: +GPU billing differs. +GPU name: gpu1 +Previous value: 5000.0 +From line: 4 +PartitionName=partition1 Nodes=mynode[1,5,6,29-41] TRESBillingWeights=x=1,GRES/gpu=5000,y=2 + +New value: 6000.0 +From line: 5 +PartitionName=partition2 Nodes=mynode[2,8-11,42] TRESBillingWeights=x=1,GRES/gpu:gpu1=6000,y=2 +""" + in caplog.text + ) + + +def assert_same_billings(given: List[GPUBilling], expected: List[GPUBilling]): + assert len(given) == len(expected) + for given_billing, expected_billing in zip(given, expected): + assert given_billing.since == expected_billing.since + assert given_billing.gpu_to_billing == expected_billing.gpu_to_billing + + +def assert_same_node_gpu_mapping( + given_billing: NodeGPUMapping, expected_billing: NodeGPUMapping +): + assert given_billing.since == expected_billing.since + assert given_billing.node_to_gpu == expected_billing.node_to_gpu + + +def _save_slurm_conf(cluster_name: str, day: str, content: str): + scp = SlurmConfigParser(cluster_name, day) + folder = "slurm_conf" + filename = scp._cache_key() + cache_dir = config().cache + file_dir = cache_dir / folder + file_dir.mkdir(parents=True, exist_ok=True) + file_path = file_dir / filename + with file_path.open("w") as file: + file.write(content) diff --git a/tests/functional/cli/db/test_db_init.py b/tests/functional/cli/db/test_db_init.py index bc961943..5ca7ff85 100644 --- a/tests/functional/cli/db/test_db_init.py +++ b/tests/functional/cli/db/test_db_init.py @@ -13,12 +13,28 @@ def test_db_init(cli_main): # from sarc.jobs.job import SlurmJobRepository # from sarc.storage.diskusage import ClusterDiskUsageRepository - for collection_name in ["jobs", "allocations", "diskusage", "users", "clusters"]: + for collection_name in [ + "jobs", + "allocations", + "diskusage", + "users", + "clusters", + "gpu_billing", + "node_gpu_mapping", + ]: collection = db[collection_name] assert not collection.index_information() cli_main(["db", "init"]) - for collection_name in ["jobs", "allocations", "diskusage", "users", "clusters"]: + for collection_name in [ + "jobs", + "allocations", + "diskusage", + "users", + "clusters", + "gpu_billing", + "node_gpu_mapping", + ]: collection = db[collection_name] assert collection.index_information() diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py index 95188fc6..8effb64a 100644 --- a/tests/functional/conftest.py +++ b/tests/functional/conftest.py @@ -39,6 +39,8 @@ def clear_db(db): db.diskusage.drop() db.users.drop() db.clusters.drop() + db.gpu_billing.drop() + db.node_gpu_mapping.drop() def fill_db(db, with_users=False, with_clusters=False, job_patch=None): diff --git a/tests/functional/jobs/test_func_sacct.py b/tests/functional/jobs/test_func_sacct.py index 2f8ced7d..5e2cbfde 100644 --- a/tests/functional/jobs/test_func_sacct.py +++ b/tests/functional/jobs/test_func_sacct.py @@ -431,6 +431,27 @@ def test_get_gpu_type_without_prometheus( # Import here so that config() is setup correctly when CLI is created. import sarc.cli + # Save slurm config in cache. + _save_slurm_conf( + "raisin_no_prometheus", + "2023-02-15", + "NodeName=cn-c0[18-30] Param1=Anything1 Param2=Anything2 Gres=gpu:asupergpu:4 Param3=Anything3", + ) + # Acquire slurm config. + assert ( + cli_main( + [ + "acquire", + "slurmconfig", + "--cluster_name", + "raisin_no_prometheus", + "--day", + "2023-02-15", + ] + ) + == 0 + ) + assert ( cli_main( [ @@ -459,6 +480,21 @@ def test_get_gpu_type_without_prometheus( ) +def _save_slurm_conf(cluster_name: str, day: str, content: str): + from sarc.cli.acquire.slurmconfig import SlurmConfigParser + + scp = SlurmConfigParser(cluster_name, day) + folder = "slurm_conf" + filename = scp._cache_key() + cache_dir = config().cache + file_dir = cache_dir / folder + file_dir.mkdir(parents=True, exist_ok=True) + file_path = file_dir / filename + print(file_path) + with file_path.open("w") as file: + file.write(content) + + @pytest.mark.parametrize( "test_config", [{"clusters": {"raisin": {"host": "raisin"}}}], indirect=True ) diff --git a/tests/not-so-secrets/raisin_no_prometheus/node_to_gpu_raisin_no_prometheus.json b/tests/not-so-secrets/raisin_no_prometheus/node_to_gpu_raisin_no_prometheus.json deleted file mode 100644 index 57ace0f4..00000000 --- a/tests/not-so-secrets/raisin_no_prometheus/node_to_gpu_raisin_no_prometheus.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "cn-c018": "gpu:asupergpu:4", - "cn-c019": "gpu:asupergpu:4", - "cn-c020": "gpu:asupergpu:4", - "cn-c021": "gpu:asupergpu:4", - "cn-c022": "gpu:asupergpu:4", - "cn-c023": "gpu:asupergpu:4", - "cn-c024": "gpu:asupergpu:4", - "cn-c025": "gpu:asupergpu:4", - "cn-c026": "gpu:asupergpu:4", - "cn-c027": "gpu:asupergpu:4", - "cn-c028": "gpu:asupergpu:4", - "cn-c029": "gpu:asupergpu:4", - "cn-c030": "gpu:asupergpu:4" -} \ No newline at end of file diff --git a/tests/not-so-secrets/raisin_no_prometheus/nodes_raisin_no_prometheus.txt b/tests/not-so-secrets/raisin_no_prometheus/nodes_raisin_no_prometheus.txt deleted file mode 100644 index 77a572c9..00000000 --- a/tests/not-so-secrets/raisin_no_prometheus/nodes_raisin_no_prometheus.txt +++ /dev/null @@ -1 +0,0 @@ -NodeName=cn-c0[18-30] Param1=Anything1 Param2=Anything2 Gres=gpu:asupergpu:4 Param3=Anything3 diff --git a/tests/sarc-test.json b/tests/sarc-test.json index e6f286f2..cd64bbc4 100644 --- a/tests/sarc-test.json +++ b/tests/sarc-test.json @@ -41,8 +41,7 @@ "duc_inodes_command": null, "duc_storage_command": null, "diskusage_report_command": null, - "prometheus_url": null, - "nodes_info_file": "tests/not-so-secrets/raisin_no_prometheus/nodes_raisin_no_prometheus.txt" + "prometheus_url": null }, "fromage": { "host": "fromage",