Skip to content

Commit

Permalink
feat: If possible, use cgroups v2 to enforce resource limits in SingularityCE
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisburr committed Jul 12, 2024
1 parent 71def4b commit 2d9a677
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 0 deletions.
18 changes: 18 additions & 0 deletions src/DIRAC/Core/Utilities/Platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import sys
import os
import re
import subprocess
from pathlib import Path

# We need to patch python platform module. It does a string comparison for the libc versions.
# it fails when going from 2.9 to 2.10,
Expand Down Expand Up @@ -144,3 +146,19 @@ def getPlatform():
def getPlatformTuple():
    """Return the module-level ``_gPlatformTuple`` after calling ``getPlatform()``.

    NOTE(review): ``getPlatform()`` is presumably called first so that the
    module-level value is populated before it is returned -- confirm against
    the implementation of ``getPlatform``.
    """
    getPlatform()
    return _gPlatformTuple


def availableCgroupsV2Controllers() -> set[str]:
    """Get the set of cgroup2 controllers enabled for child cgroups.

    Every ``cgroup2`` mount point reported by ``findmnt`` is inspected and the
    controllers listed in its ``cgroup.subtree_control`` file are collected.

    :return: names of the enabled controllers (e.g. ``{"cpu", "memory"}``).
             The set is empty if ``findmnt`` is missing or fails, if no cgroup2
             filesystem is mounted, or if no ``cgroup.subtree_control`` file
             can be read.
    """
    controllers: set[str] = set()

    cmd = ["findmnt", "--source", "cgroup2", "--output", "target", "--noheadings"]
    try:
        proc = subprocess.run(cmd, check=False, capture_output=True, text=True)
    except OSError:
        # findmnt is not installed (or cannot be executed): cgroup2 cannot be
        # probed, which is reported as "no controllers" rather than a crash.
        return controllers
    if proc.returncode != 0:
        return controllers

    for line in proc.stdout.splitlines():
        target = line.strip()
        if not target:
            # An empty target would resolve relative to the working directory.
            continue
        try:
            subtree_control_info = (Path(target) / "cgroup.subtree_control").read_text()
        except OSError:
            # Missing, unreadable or not a regular file (it may also disappear
            # between being listed and being read).
            continue
        # split() without an argument drops empty fields, so an empty file
        # contributes nothing instead of the empty string.
        controllers.update(subtree_control_info.split())

    return controllers
44 changes: 44 additions & 0 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
import os
import re
import shutil
import stat
import sys
import tempfile

import DIRAC
from DIRAC import S_OK, S_ERROR, gConfig, gLogger
from DIRAC.Core.Utilities.Platform import availableCgroupsV2Controllers
from DIRAC.Core.Utilities.Subprocess import systemCall
from DIRAC.ConfigurationSystem.Client.Helpers import Operations
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
Expand Down Expand Up @@ -97,6 +99,7 @@
r"BEARER_TOKEN.*",
]
ENV_VAR_WHITELIST = re.compile(r"^(" + r"|".join(ENV_VAR_WHITELIST) + r")$")
# Environment variables that must be present in the payload environment for
# cgroup2 resource limits to be applied: _resourceLimitsArgs() returns no
# limit arguments as soon as one of them is missing.
CGROUPV2_REQUIRED_VARS = ["XDG_RUNTIME_DIR", "DBUS_SESSION_BUS_ADDRESS"]


class SingularityComputingElement(ComputingElement):
Expand Down Expand Up @@ -163,6 +166,36 @@ def __hasSingularity(self):
# No suitable binaries found
return False

def _resourceLimitsArgs(self):
    """Build the singularity options that enforce resource limits via cgroup2.

    Options are only produced for controllers reported by
    ``availableCgroupsV2Controllers()``. If any variable listed in
    ``CGROUPV2_REQUIRED_VARS`` is absent from the environment returned by
    ``__getEnv()``, a warning is logged and no option is produced.

    CE parameters read:

    * ``MemoryLimitMB``: value for ``--memory`` (0 or unset: no memory options)
    * ``MemoryReservationMB``: value for ``--memory-reservation``, defaulting to
      four fifths of the memory limit (integer division)
    * ``SwapLimitMB``: value for ``--memory-swap`` (0 or unset: option omitted)
    * ``EnforceCPULimit``: ``"yes"`` or ``"true"`` adds ``--cpus`` with
      ``self.processors``

    :return: list of command line arguments, possibly empty
    """
    controllers = availableCgroupsV2Controllers()
    self.log.debug(f"Available cgroup2 controllers: {controllers}")

    limitArgs = []

    # Without these variables in the payload environment no limit is requested.
    payloadEnv = self.__getEnv()
    for var_name in CGROUPV2_REQUIRED_VARS:
        if var_name in payloadEnv:
            continue
        self.log.warn(f"{var_name} is required for cgroup2 but is not set, disabling...")
        return limitArgs

    # Memory related options: only looked up when the controller is available.
    memoryLimit = int(self.ceParameters.get("MemoryLimitMB", 0)) if "memory" in controllers else 0
    if memoryLimit:
        defaultReservation = memoryLimit * 4 // 5
        memoryRes = int(self.ceParameters.get("MemoryReservationMB", defaultReservation))
        limitArgs += ["--memory", f"{memoryLimit}M", "--memory-reservation", f"{memoryRes}M"]
        swapLimit = int(self.ceParameters.get("SwapLimitMB", 0))
        if swapLimit:
            limitArgs += ["--memory-swap", f"{swapLimit}M"]

    # CPU limit: needs both the controller and an explicit opt-in.
    if "cpu" not in controllers:
        return limitArgs
    if self.ceParameters.get("EnforceCPULimit", "no") in ("yes", "true"):
        limitArgs += ["--cpus", str(self.processors)]
    return limitArgs

@staticmethod
def __findInstallBaseDir():
"""Find the path to root of the current DIRAC installation"""
Expand Down Expand Up @@ -336,6 +369,16 @@ def __getEnv(self):
payloadEnv["X509_USER_PROXY"] = os.path.join(self.__innerdir, "proxy")
payloadEnv["DIRACSYSCONFIG"] = os.path.join(self.__innerdir, "pilot.cfg")

xdg_runtime_dir = os.environ.get("XDG_RUNTIME_DIR", f"/run/user/{os.getuid()}")
dbus_session_address = os.environ.get("DBUS_SESSION_BUS_ADDRESS", f"unix:path={xdg_runtime_dir}/bus")
if dbus_session_address.startswith("unix:path="):
path = dbus_session_address.split("=", 1)[1]
if not os.path.exists(path) or not stat.S_ISSOCK(os.stat(path).st_mode):
dbus_session_address = None
if dbus_session_address:
payloadEnv["XDG_RUNTIME_DIR"] = xdg_runtime_dir
payloadEnv["DBUS_SESSION_BUS_ADDRESS"] = dbus_session_address

return payloadEnv

@staticmethod
Expand Down Expand Up @@ -407,6 +450,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
cmd.extend(["--bind", "/cvmfs"])
if not self.__installDIRACInContainer:
cmd.extend(["--bind", "{0}:{0}:ro".format(self.__findInstallBaseDir())])
cmd.extend(self._resourceLimitsArgs())

bindPaths = self.ceParameters.get("ContainerBind", "").split(",")
siteName = gConfig.getValue("/LocalSite/Site", "")
Expand Down

0 comments on commit 2d9a677

Please sign in to comment.