diff --git a/src/DIRAC/Core/Utilities/Platform.py b/src/DIRAC/Core/Utilities/Platform.py index 1a42899c777..f68af6b70e9 100644 --- a/src/DIRAC/Core/Utilities/Platform.py +++ b/src/DIRAC/Core/Utilities/Platform.py @@ -5,6 +5,8 @@ import sys import os import re +import subprocess +from pathlib import Path # We need to patch python platform module. It does a string comparison for the libc versions. # it fails when going from 2.9 to 2.10, @@ -144,3 +146,19 @@ def getPlatform(): def getPlatformTuple(): getPlatform() return _gPlatformTuple + + +def availableCgroupsV2Controllers() -> set[str]: + """Get the list of available cgroup2 controllers.""" + controllers = set() + + cmd = ["findmnt", "--source", "cgroup2", "--output", "target", "--noheadings"] + proc = subprocess.run(cmd, check=False, capture_output=True, text=True) + if proc.returncode == 0: + for target in proc.stdout.strip().split("\n"): + subtree_control_path = Path(target) / "cgroup.subtree_control" + if subtree_control_path.is_file(): + subtree_control_info = subtree_control_path.read_text().strip() + controllers.update(subtree_control_info.split(" ")) + + return controllers diff --git a/src/DIRAC/Resources/Computing/SingularityComputingElement.py b/src/DIRAC/Resources/Computing/SingularityComputingElement.py index f0e153d11b4..c01999db6c8 100644 --- a/src/DIRAC/Resources/Computing/SingularityComputingElement.py +++ b/src/DIRAC/Resources/Computing/SingularityComputingElement.py @@ -17,11 +17,13 @@ import os import re import shutil +import stat import sys import tempfile import DIRAC from DIRAC import S_OK, S_ERROR, gConfig, gLogger +from DIRAC.Core.Utilities.Platform import availableCgroupsV2Controllers from DIRAC.Core.Utilities.Subprocess import systemCall from DIRAC.ConfigurationSystem.Client.Helpers import Operations from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler @@ -97,6 +99,7 @@ r"BEARER_TOKEN.*", ] ENV_VAR_WHITELIST = re.compile(r"^(" + r"|".join(ENV_VAR_WHITELIST) + r")$") +CGROUPV2_REQUIRED_VARS = ["XDG_RUNTIME_DIR", "DBUS_SESSION_BUS_ADDRESS"] class SingularityComputingElement(ComputingElement): @@ -163,6 +166,36 @@ def __hasSingularity(self): # No suitable binaries found return False + def _resourceLimitsArgs(self): + """Get singularity arguments for enforcing resource limits with cgroup2. + + If the associated cgroup2 controllers are not available, the corresponding + options are ignored. + """ + controllers = availableCgroupsV2Controllers() + self.log.debug(f"Available cgroup2 controllers: {controllers}") + + args = [] + + env = self.__getEnv() + for var_name in CGROUPV2_REQUIRED_VARS: + if var_name not in env: + self.log.warn(f"{var_name} is required for cgroup2 but is not set, disabling...") + return args + + if "memory" in controllers: + memoryLimit = int(self.ceParameters.get("MemoryLimitMB", 0)) + if memoryLimit: + args.extend(["--memory", f"{memoryLimit}M"]) + memoryRes = int(self.ceParameters.get("MemoryReservationMB", memoryLimit * 4 // 5)) + args.extend(["--memory-reservation", f"{memoryRes}M"]) + swapLimit = int(self.ceParameters.get("SwapLimitMB", 0)) + if swapLimit: + args.extend(["--memory-swap", f"{swapLimit}M"]) + if "cpu" in controllers and self.ceParameters.get("EnforceCPULimit", "no") in ("yes", "true"): + args.extend(["--cpus", str(self.processors)]) + return args + @staticmethod def __findInstallBaseDir(): """Find the path to root of the current DIRAC installation""" @@ -336,6 +369,16 @@ def __getEnv(self): payloadEnv["X509_USER_PROXY"] = os.path.join(self.__innerdir, "proxy") payloadEnv["DIRACSYSCONFIG"] = os.path.join(self.__innerdir, "pilot.cfg") + xdg_runtime_dir = os.environ.get("XDG_RUNTIME_DIR", f"/run/user/{os.getuid()}") + dbus_session_address = os.environ.get("DBUS_SESSION_BUS_ADDRESS", f"unix:path={xdg_runtime_dir}/bus") + if dbus_session_address.startswith("unix:path="): + path = dbus_session_address.split("=", 1)[1] + if not os.path.exists(path) or not stat.S_ISSOCK(os.stat(path).st_mode): + dbus_session_address = None + if dbus_session_address: + payloadEnv["XDG_RUNTIME_DIR"] = xdg_runtime_dir + payloadEnv["DBUS_SESSION_BUS_ADDRESS"] = dbus_session_address + return payloadEnv @staticmethod @@ -407,6 +450,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs): cmd.extend(["--bind", "/cvmfs"]) if not self.__installDIRACInContainer: cmd.extend(["--bind", "{0}:{0}:ro".format(self.__findInstallBaseDir())]) + cmd.extend(self._resourceLimitsArgs()) bindPaths = self.ceParameters.get("ContainerBind", "").split(",") siteName = gConfig.getValue("/LocalSite/Site", "")