diff --git a/DESCRIPTION b/DESCRIPTION index c078f1e..4dd2d1d 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,5 +1,5 @@ Package: parallelly -Version: 1.42.0-9002 +Version: 1.42.0-9003 Title: Enhancing the 'parallel' Package Imports: parallel, diff --git a/NEWS.md b/NEWS.md index 9549117..330b91b 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,10 +1,16 @@ # Version (development version) +## Significant Changes + + * Now `availableCores()` memoizes the values of all its components. + This means that as soon as it has been called, environment variables + such as `NSLOTS` will no longer be queried. + ## Documentation -* Add more help on the R option `parallelly.maxWorkers.localhost` - limits. Improved the warning and error messages that are produced - when these settings are exceeded. + * Add more help on the R option `parallelly.maxWorkers.localhost` + limits. Improved the warning and error messages that are produced + when these settings are exceeded. # Version 1.42.0 [2025-01-30] diff --git a/R/availableCores.R b/R/availableCores.R index 0022c04..e9622e5 100644 --- a/R/availableCores.R +++ b/R/availableCores.R @@ -438,36 +438,50 @@ availableCores <- function(constraints = NULL, methods = getOption2("parallelly. } # availableCores() -getNproc <- function(ignore = c("OMP_NUM_THREADS", "OMP_THREAD_LIMIT")) { - ## 'nproc' is limited by 'OMP_NUM_THREADS' and 'OMP_THREAD_LIMIT', if set. - ## However, that is not what we want for availableCores(). Because of - ## this, we unset those while querying 'nproc'. - if (length(ignore) > 0) { - ignore <- intersect(ignore, names(Sys.getenv())) +getNproc <- local({ + res <- NULL + + function(ignore = c("OMP_NUM_THREADS", "OMP_THREAD_LIMIT")) { + if (!is.null(res)) return(res) + + ## 'nproc' is limited by 'OMP_NUM_THREADS' and 'OMP_THREAD_LIMIT', if set. + ## However, that is not what we want for availableCores(). Because of + ## this, we unset those while querying 'nproc'. if (length(ignore) > 0) { - oignore <- Sys.getenv(ignore, names = TRUE) - oignore <- as.list(oignore) - on.exit(do.call(Sys.setenv, args = oignore), add = TRUE) - Sys.unsetenv(ignore) + ignore <- intersect(ignore, names(Sys.getenv())) + if (length(ignore) > 0) { + oignore <- Sys.getenv(ignore, names = TRUE) + oignore <- as.list(oignore) + on.exit(do.call(Sys.setenv, args = oignore), add = TRUE) + Sys.unsetenv(ignore) + } + } + + systems <- list(linux = "nproc 2>/dev/null") + os <- names(systems) + m <- pmatch(os, table = R.version$os, nomatch = NA_integer_) + m <- os[!is.na(m)] + if (length(m) == 0L) { + res <<- NA_integer_ + return(res) } - } - systems <- list(linux = "nproc 2>/dev/null") - os <- names(systems) - m <- pmatch(os, table = R.version$os, nomatch = NA_integer_) - m <- os[!is.na(m)] - if (length(m) == 0L) return(NA_integer_) - - for (cmd in systems[[m]]) { - tryCatch({ - res <- suppressWarnings(system(cmd, intern=TRUE)) - res <- gsub("(^[[:space:]]+|[[:space:]]+$)", "", res[1]) - if (grepl("^[[:digit:]]+$", res)) return(as.integer(res)) - }, error = identity) + for (cmd in systems[[m]]) { + tryCatch({ + value <- suppressWarnings(system(cmd, intern=TRUE)) + value <- gsub("(^[[:space:]]+|[[:space:]]+$)", "", value[1]) + if (grepl("^[[:digit:]]+$", value)) { + res <<- as.integer(value) + return(res) + } + }, error = identity) + } + + res <<- NA_integer_ + + res } - - NA_integer_ -} +}) checkNumberOfLocalWorkers <- function(workers) { @@ -534,103 +548,122 @@ getopt_int <- function(name, mode = "integer") { # High-Performance Compute (HPC) Schedulers # -------------------------------------------------------------------------- ## Number of slots assigned by LSF -availableCoresLSF <- function() { - n <- getenv_int("LSB_DJOB_NUMPROC") - n -} +availableCoresLSF <- local({ + n <- NULL + function() { + if (!is.null(n)) return(n) + n <<- getenv_int("LSB_DJOB_NUMPROC") + n + } +}) ## Number of cores assigned by TORQUE/PBS -availableCoresPBS <- function() { - n <- getenv_int("PBS_NUM_PPN") - if (is.na(n)) { - ## PBSPro sets 'NCPUS' but not 'PBS_NUM_PPN' - n <- getenv_int("NCPUS") +availableCoresPBS <- local({ + n <- NULL + function() { + n <<- getenv_int("PBS_NUM_PPN") + if (is.na(n)) { + ## PBSPro sets 'NCPUS' but not 'PBS_NUM_PPN' + n <<- getenv_int("NCPUS") + } + n } - n -} +}) ## Number of slots assigned by Fujitsu Technical Computing Suite ## We choose to call this job scheduler "PJM" based on the prefix ## it's environment variables use. -availableCoresPJM <- function() { - ## PJM_VNODE_CORE: e.g. pjsub -L vnode-core=8 - ## "This environment variable is set only when virtual nodes - ## are allocated, and it is not set when nodes are allocated." - n <- getenv_int("PJM_VNODE_CORE") - if (is.na(n)) { - ## PJM_PROC_BY_NODE: e.g. pjsub -L vnode-core=8 - ## "Maximum number of processes that are generated per node by - ## an MPI program. However, if a single node (node=1) or virtual - ## node (vnode=1) is allocated and the mpi option of the pjsub - ## command is not specified, this environment variable is not set." - n <- getenv_int("PJM_PROC_BY_NODE") +availableCoresPJM <- local({ + n <- NULL + function() { + ## PJM_VNODE_CORE: e.g. pjsub -L vnode-core=8 + ## "This environment variable is set only when virtual nodes + ## are allocated, and it is not set when nodes are allocated." + n <<- getenv_int("PJM_VNODE_CORE") + if (is.na(n)) { + ## PJM_PROC_BY_NODE: e.g. pjsub -L vnode-core=8 + ## "Maximum number of processes that are generated per node by + ## an MPI program. However, if a single node (node=1) or virtual + ## node (vnode=1) is allocated and the mpi option of the pjsub + ## command is not specified, this environment variable is not set." + n <<- getenv_int("PJM_PROC_BY_NODE") + } + n } - n -} +}) ## Number of cores assigned by Oracle/Son/Sun/Univa Grid Engine (SGE/UGE) -availableCoresSGE <- function() { - n <- getenv_int("NSLOTS") - n -} +availableCoresSGE <- local({ + n <- NULL + function() { + n <<- getenv_int("NSLOTS") + n + } +}) ## Number of cores assigned by Slurm -availableCoresSlurm <- function() { - ## The assumption is that the following works regardless of - ## number of nodes requested /HB 2020-09-18 - ## Example: --cpus-per-task={n} - n <- getenv_int("SLURM_CPUS_PER_TASK") - if (is.na(n)) { - ## Example: --nodes={nnodes} (defaults to 1, short: -N {nnodes}) - ## From 'man sbatch': - ## SLURM_JOB_NUM_NODES (and SLURM_NNODES for backwards compatibility) - ## Total number of nodes in the job's resource allocation. - nnodes <- getenv_int("SLURM_JOB_NUM_NODES") - if (is.na(nnodes)) nnodes <- getenv_int("SLURM_NNODES") - if (is.na(nnodes)) nnodes <- 1L ## Can this happen? /HB 2020-09-18 - - if (nnodes == 1L) { - ## Example: --nodes=1 --ntasks={n} (short: -n {n}) - ## IMPORTANT: 'SLURM_CPUS_ON_NODE' appears to be rounded up when nodes > 1. - ## Example 1: With --nodes=2 --cpus-per-task=3 we see SLURM_CPUS_ON_NODE=4 - ## although SLURM_CPUS_PER_TASK=3. - ## Example 2: With --nodes=2 --ntasks=7, we see SLURM_CPUS_ON_NODE=6, - ## SLURM_JOB_CPUS_PER_NODE=6,2, no SLURM_CPUS_PER_TASK, and - ## SLURM_TASKS_PER_NODE=5,2. - ## Conclusions: We can only use 'SLURM_CPUS_ON_NODE' for nnodes = 1. - n <- getenv_int("SLURM_CPUS_ON_NODE") - } else { - ## Parse `SLURM_TASKS_PER_NODE` - nodecounts <- getenv_int("SLURM_TASKS_PER_NODE", mode = "character") - if (!is.na(nodecounts)) { - ## Examples: - ## SLURM_TASKS_PER_NODE=5,2 - ## SLURM_TASKS_PER_NODE=2(x2),1(x3) # Source: 'man sbatch' - n <- slurm_expand_nodecounts(nodecounts) - if (anyNA(n)) return(NA_real_) - - ## ASSUMPTION: We assume that it is the first component on the list that - ## corresponds to the current machine. /HB 2021-03-05 - n <- n[1] +availableCoresSlurm <- local({ + n <- NULL + function() { + ## The assumption is that the following works regardless of + ## number of nodes requested /HB 2020-09-18 + ## Example: --cpus-per-task={n} + n <<- getenv_int("SLURM_CPUS_PER_TASK") + if (is.na(n)) { + ## Example: --nodes={nnodes} (defaults to 1, short: -N {nnodes}) + ## From 'man sbatch': + ## SLURM_JOB_NUM_NODES (and SLURM_NNODES for backwards compatibility) + ## Total number of nodes in the job's resource allocation. + nnodes <- getenv_int("SLURM_JOB_NUM_NODES") + if (is.na(nnodes)) nnodes <- getenv_int("SLURM_NNODES") + if (is.na(nnodes)) nnodes <- 1L ## Can this happen? /HB 2020-09-18 + + if (nnodes == 1L) { + ## Example: --nodes=1 --ntasks={n} (short: -n {n}) + ## IMPORTANT: 'SLURM_CPUS_ON_NODE' appears to be rounded up when nodes > 1. + ## Example 1: With --nodes=2 --cpus-per-task=3 we see SLURM_CPUS_ON_NODE=4 + ## although SLURM_CPUS_PER_TASK=3. + ## Example 2: With --nodes=2 --ntasks=7, we see SLURM_CPUS_ON_NODE=6, + ## SLURM_JOB_CPUS_PER_NODE=6,2, no SLURM_CPUS_PER_TASK, and + ## SLURM_TASKS_PER_NODE=5,2. + ## Conclusions: We can only use 'SLURM_CPUS_ON_NODE' for nnodes = 1. + n <<- getenv_int("SLURM_CPUS_ON_NODE") + } else { + ## Parse `SLURM_TASKS_PER_NODE` + nodecounts <- getenv_int("SLURM_TASKS_PER_NODE", mode = "character") + if (!is.na(nodecounts)) { + ## Examples: + ## SLURM_TASKS_PER_NODE=5,2 + ## SLURM_TASKS_PER_NODE=2(x2),1(x3) # Source: 'man sbatch' + n <<- slurm_expand_nodecounts(nodecounts) + if (anyNA(n)) { + n <<- NA_real_ + return(n) + } + + ## ASSUMPTION: We assume that it is the first component on the list that + ## corresponds to the current machine. /HB 2021-03-05 + n <<- n[1] + } } } - } - - ## TODO?: Can we validate above assumptions/results? /HB 2020-09-18 - if (FALSE && !is.na(n)) { - ## Is any of the following useful? + + ## TODO?: Can we validate above assumptions/results? /HB 2020-09-18 + if (FALSE && !is.na(n)) { + ## Is any of the following useful? + + ## Example: --ntasks={ntasks} (no default, short: -n {ntasks}) + ## From 'man sbatch': + ## SLURM_NTASKS (and SLURM_NPROCS for backwards compatibility) + ## Same as -n, --ntasks + ntasks <- getenv_int("SLURM_NTASKS") + if (is.na(ntasks)) ntasks <- getenv_int("SLURM_NPROCS") + } - ## Example: --ntasks={ntasks} (no default, short: -n {ntasks}) - ## From 'man sbatch': - ## SLURM_NTASKS (and SLURM_NPROCS for backwards compatibility) - ## Same as -n, --ntasks - ntasks <- getenv_int("SLURM_NTASKS") - if (is.na(ntasks)) ntasks <- getenv_int("SLURM_NPROCS") + n } - - n -} ## availableCoresSlurm() +}) ## availableCoresSlurm() diff --git a/R/cgroups.R b/R/cgroups.R index 41aad13..da6c447 100644 --- a/R/cgroups.R +++ b/R/cgroups.R @@ -250,6 +250,20 @@ withCGroups <- function(tarball, expr = NULL, envir = parent.frame(), tmpdir = N uid <- as.integer(uid) message(sprintf(" - UID: %d", uid)) + ## Clear all memoization caches + fcns <- list( + getCGroupsMounts, getCGroups, getCGroupsVersion, + getCGroups1CpuSet, getCGroups1CpuPeriodMicroseconds, getCGroups1CpuQuota, + getCGroups2CpuMax + ) + for (fcn in fcns) { + environment(fcn)$.cache <- NULL + } + fcns <- list(getCGroupsRoot, getCGroupsPath, getCGroupsValue) + for (fcn in fcns) { + environment(fcn)$.cache <- list() + } + ## Adjust /proc accordingly old_procPath <- procPath(file.path(tmpdir, "proc")) on.exit(procPath(old_procPath), add = TRUE) @@ -417,10 +431,10 @@ getCGroupsMounts <- local({ # #' @importFrom utils file_test getCGroups <- local({ - .data <- NULL + .cache <- NULL function() { - data <- .data + data <- .cache if (!is.null(data)) return(data) ## Get cgroups @@ -429,7 +443,7 @@ getCGroups <- local({ ## cgroups is not set? if (!file_test("-f", file)) { data <- data.frame(hierarchy_id = integer(0L), controller = character(0L), path = character(0L), stringsAsFactors = FALSE) - .data <<- data + .cache <<- data return(data) } @@ -461,7 +475,7 @@ getCGroups <- local({ ## Order by hierarchy ID data <- data[order(data$hierarchy_id), ] - .data <<- data + .cache <<- data data } @@ -476,39 +490,53 @@ getCGroups <- local({ # If no folder could be found, `NA_character_` is returned. # #' @importFrom utils file_test -getCGroupsPath <- function(controller) { - root <- getCGroupsRoot(controller = controller) - if (is.na(root)) return(NA_character_) - - data <- getCGroups() - - set <- data[data$controller == controller, ] - if (nrow(set) == 0L) { - return(NA_character_) - } - - set <- set$path - path <- file.path(root, set) - while (set != "/") { - if (file_test("-d", path)) { - break +getCGroupsPath <- local({ + .cache <- list() + + function(controller) { + res <- .cache[[controller]] + if (!is.null(res)) return(res) + + root <- getCGroupsRoot(controller = controller) + if (is.na(root)) { + res <- NA_character_ + .cache[[controller]] <<- res + return(res) } - ## Should this ever happen? - set_prev <- set - set <- dirname(set) - if (set == set_prev) break - path <- file.path(root, set) - } - - ## Should the following ever happen? - if (!file_test("-d", path)) { - return(NA_character_) - } - path <- normalizePath(path, mustWork = FALSE) + data <- getCGroups() - path -} + set <- data[data$controller == controller, ] + if (nrow(set) == 0L) { + res <- NA_character_ + .cache[[controller]] <<- res + return(res) + } + + set <- set$path + path <- file.path(root, set) + while (set != "/") { + if (file_test("-d", path)) { + break + } + ## Should this ever happen? + set_prev <- set + set <- dirname(set) + if (set == set_prev) break + path <- file.path(root, set) + } + + ## Should the following ever happen? + if (!file_test("-d", path)) { + res <- NA_character_ + .cache[[controller]] <<- res + } + + res <- normalizePath(path, mustWork = FALSE) + .cache[[controller]] <<- res + res + } +}) # Get the value of specific cgroups controller and field @@ -522,25 +550,45 @@ getCGroupsPath <- function(controller) { # NA_character_ is returned. # #' @importFrom utils file_test -getCGroupsValue <- function(controller, field) { - path <- getCGroupsPath(controller = controller) - if (is.na(path)) return(NA_character_) - - path_prev <- "" - while (path != path_prev) { - file <- file.path(path, field) - if (file_test("-f", file)) { - value <- readLines(file, warn = FALSE) - if (length(value) == 0L) value <- NA_character_ - attr(value, "path") <- path - return(value) +getCGroupsValue <- local({ + .cache <- list() + + function(controller, field) { + cache_controller <- .cache[[controller]] + if (!is.null(cache_controller)) { + res <- cache_controller[[field]] + if (!is.null(res)) return(res) + } + + path <- getCGroupsPath(controller = controller) + if (is.na(path)) { + res <- NA_character_ + cache_controller[[field]] <- res + .cache[[controller]] <<- cache_controller + return(res) } - path_prev <- path - path <- dirname(path) - } - NA_character_ -} + path_prev <- "" + while (path != path_prev) { + file <- file.path(path, field) + if (file_test("-f", file)) { + value <- readLines(file, warn = FALSE) + if (length(value) == 0L) value <- NA_character_ + attr(value, "path") <- path + cache_controller[[field]] <- value + .cache[[controller]] <<- cache_controller + return(value) + } + path_prev <- path + path <- dirname(path) + } + + res <- NA_character_ + cache_controller[[field]] <- res + .cache[[controller]] <<- cache_controller + res + } +}) # Get the value of specific cgroups v1 field @@ -575,12 +623,28 @@ getCGroups2Value <- function(field) { # If it is under cgroups v2, then `2L` is returned. # If not under cgroups control, then `-1L` is returned. # -getCGroupsVersion <- function() { - cgroups <- getCGroups() - if (nrow(cgroups) == 0) return(-1L) - if (nrow(cgroups) == 1 && cgroups$controller == "") return(2L) - 1L -} +getCGroupsVersion <- local({ + .cache <- NULL + + function() { + res <- .cache + + if (!is.null(res)) return(res) + + cgroups <- getCGroups() + if (nrow(cgroups) == 0) { + res <- -1L + } else if (nrow(cgroups) == 1 && cgroups$controller == "") { + res <- 2L + } else { + res <- 1L + } + + .cache <<- res + + res + } +}) @@ -598,55 +662,65 @@ getCGroupsVersion <- function() { # # [1] https://www.kernel.org/doc/Documentation/cgroup-v1/cpusets.txt # -getCGroups1CpuSet <- function() { - ## TEMPORARY: In case the cgroups options causes problems, make - ## it possible to override their values via hidden options - cpuset <- get_package_option("cgroups.cpuset", NULL) - - if (!is.null(cpuset)) return(cpuset) - - ## e.g. /sys/fs/cgroup/cpuset/cpuset.cpus - value0 <- getCGroups1Value("cpuset", "cpuset.cpus") - if (is.na(value0)) { - return(integer(0L)) - } +getCGroups1CpuSet <- local({ + .cache <- NULL - ## Parse 0-63; 0-7,9; 0-7,10-12; etc. - code <- gsub("-", ":", value0, fixed = TRUE) - code <- sprintf("c(%s)", code) - expr <- tryCatch({ - parse(text = code) - }, error = function(ex) { - warning(sprintf("Syntax error parsing %s: %s", sQuote(file), sQuote(value0))) - integer(0L) - }) - - value <- tryCatch({ - suppressWarnings(as.integer(eval(expr))) - }, error = function(ex) { - warning(sprintf("Failed to parse %s: %s", sQuote(file), sQuote(value0))) - integer(0L) - }) + function() { + res <- .cache + if (!is.null(res)) return(res) + + ## TEMPORARY: In case the cgroups options causes problems, make + ## it possible to override their values via hidden options + cpuset <- get_package_option("cgroups.cpuset", NULL) + if (!is.null(cpuset)) return(cpuset) + + ## e.g. /sys/fs/cgroup/cpuset/cpuset.cpus + value0 <- getCGroups1Value("cpuset", "cpuset.cpus") + if (is.na(value0)) { + res <- integer(0L) + .cache <<- res + return(res) + } + + ## Parse 0-63; 0-7,9; 0-7,10-12; etc. + code <- gsub("-", ":", value0, fixed = TRUE) + code <- sprintf("c(%s)", code) + expr <- tryCatch({ + parse(text = code) + }, error = function(ex) { + warning(sprintf("Syntax error parsing %s: %s", sQuote(file), sQuote(value0))) + integer(0L) + }) + + value <- tryCatch({ + suppressWarnings(as.integer(eval(expr))) + }, error = function(ex) { + warning(sprintf("Failed to parse %s: %s", sQuote(file), sQuote(value0))) + integer(0L) + }) + + ## Sanity checks + max_cores <- maxCores() + if (any(value < 0L | value >= max_cores)) { + warning(sprintf("[INTERNAL]: Will ignore the cgroups CPU set, because it contains one or more CPU indices that is out of range [0,%d]: %s", max_cores - 1L, value0)) + value <- integer(0L) + } + + if (any(duplicated(value))) { + warning(sprintf("[INTERNAL]: Detected and dropped duplicated CPU indices in the cgroups CPU set: %s", value0)) + value <- unique(value) + } + + cpuset <- value + + ## Should never happen, but just in case + stop_if_not(length(cpuset) <= max_cores) - ## Sanity checks - max_cores <- maxCores() - if (any(value < 0L | value >= max_cores)) { - warning(sprintf("[INTERNAL]: Will ignore the cgroups CPU set, because it contains one or more CPU indices that is out of range [0,%d]: %s", max_cores - 1L, value0)) - value <- integer(0L) - } + .cache <<- cpuset - if (any(duplicated(value))) { - warning(sprintf("[INTERNAL]: Detected and dropped duplicated CPU indices in the cgroups CPU set: %s", value0)) - value <- unique(value) + cpuset } - - cpuset <- value - - ## Should never happen, but just in case - stop_if_not(length(cpuset) <= max_cores) - - cpuset -} +}) # @@ -681,46 +755,60 @@ getCGroups1CpuQuotaMicroseconds <- function() { } -getCGroups1CpuPeriodMicroseconds <- function() { - value <- suppressWarnings({ - ## e.g. /sys/fs/cgroup/cpu/cpu.cfs_period_us - as.integer(getCGroups1Value("cpu", "cpu.cfs_period_us")) - }) +getCGroups1CpuPeriodMicroseconds <- local({ + .cache <- NULL + + function() { + res <- .cache + if (!is.null(res)) return(res) - value -} + value <- suppressWarnings({ + ## e.g. /sys/fs/cgroup/cpu/cpu.cfs_period_us + as.integer(getCGroups1Value("cpu", "cpu.cfs_period_us")) + }) + + .cache <<- value + value + } +}) # @return A non-negative numeric. # If cgroups is not in use, or could not be queried, NA_real_ is returned. # -getCGroups1CpuQuota <- function() { - ## TEMPORARY: In case the cgroups options causes problems, make - ## it possible to override their values via hidden options - quota <- get_package_option("cgroups.cpuquota", NULL) +getCGroups1CpuQuota <- local({ + .cache <- NULL - if (!is.null(quota)) return(quota) + function() { + res <- .cache + if (!is.null(res)) return(res) + + ## TEMPORARY: In case the cgroups options causes problems, make + ## it possible to override their values via hidden options + quota <- get_package_option("cgroups.cpuquota", NULL) + if (!is.null(quota)) return(quota) - ms <- getCGroups1CpuQuotaMicroseconds() - if (!is.na(ms) && ms < 0) ms <- NA_integer_ - - total <- getCGroups1CpuPeriodMicroseconds() - if (!is.na(total) && total < 0) total <- NA_integer_ + ms <- getCGroups1CpuQuotaMicroseconds() + if (!is.na(ms) && ms < 0) ms <- NA_integer_ + + total <- getCGroups1CpuPeriodMicroseconds() + if (!is.na(total) && total < 0) total <- NA_integer_ + + value <- ms / total - value <- ms / total - - if (!is.na(value)) { - max_cores <- maxCores() - if (!is.finite(value) || value <= 0.0 || value > max_cores) { - warning(sprintf("[INTERNAL]: Will ignore the cgroups CPU quota, because it is out of range [1,%d]: %s", max_cores, value)) - value <- NA_real_ + if (!is.na(value)) { + max_cores <- maxCores() + if (!is.finite(value) || value <= 0.0 || value > max_cores) { + warning(sprintf("[INTERNAL]: Will ignore the cgroups CPU quota, because it is out of range [1,%d]: %s", max_cores, value)) + value <- NA_real_ + } } - } - - quota <- value - quota -} + .cache <<- value + + value + } +}) # -------------------------------------------------------------------------- @@ -745,48 +833,58 @@ getCGroups1CpuQuota <- function() { # # [1] https://docs.kernel.org/admin-guide/cgroup-v2.html # -getCGroups2CpuMax <- function() { - ## TEMPORARY: In case the cgroups options causes problems, make - ## it possible to override their values via hidden options - quota <- get_package_option("cgroups2.cpu.max", NULL) +getCGroups2CpuMax <- local({ + .cache <- NULL - if (!is.null(quota)) return(quota) - - raw <- suppressWarnings({ - ## e.g. /sys/fs/cgroup/cpu.max - getCGroups2Value("cpu.max") - }) - - if (is.na(raw)) { - return(NA_real_) - } + function() { + res <- .cache + if (!is.null(res)) return(res) + + ## TEMPORARY: In case the cgroups options causes problems, make + ## it possible to override their values via hidden options + quota <- get_package_option("cgroups2.cpu.max", NULL) + if (!is.null(quota)) return(quota) - values <- strsplit(raw, split = "[[:space:]]+")[[1]] - if (length(values) != 2L) { - return(NA_real_) - } - - period <- as.integer(values[2]) - if (is.na(period) && period <= 0L) { - return(NA_real_) - } + raw <- suppressWarnings({ + ## e.g. /sys/fs/cgroup/cpu.max + getCGroups2Value("cpu.max") + }) - max <- values[1] - if (max == "max") { - return(NA_real_) - } + if (is.na(raw)) { + .cache <<- NA_real_ + return(.cache) + } + + values <- strsplit(raw, split = "[[:space:]]+")[[1]] + if (length(values) != 2L) { + .cache <<- NA_real_ + return(.cache) + } - max <- as.integer(max) - value <- max / period - if (!is.na(value)) { - max_cores <- maxCores() - if (!is.finite(value) || value <= 0.0 || value > max_cores) { - warning(sprintf("[INTERNAL]: Will ignore the cgroups v2 CPU quota, because it is out of range [1,%d]: %s", max_cores, value)) - value <- NA_real_ + period <- as.integer(values[2]) + if (is.na(period) && period <= 0L) { + .cache <<- NA_real_ + return(.cache) + } + + max <- values[1] + if (max == "max") { + .cache <<- NA_real_ + return(.cache) + } + + max <- as.integer(max) + value <- max / period + if (!is.na(value)) { + max_cores <- maxCores() + if (!is.finite(value) || value <= 0.0 || value > max_cores) { + warning(sprintf("[INTERNAL]: Will ignore the cgroups v2 CPU quota, because it is out of range [1,%d]: %s", max_cores, value)) + value <- NA_real_ + } } - } - - quota <- value - quota -} + .cache <<- value + + value + } +}) diff --git a/R/detectCores.R b/R/detectCores.R index 46a39c4..0b9feea 100644 --- a/R/detectCores.R +++ b/R/detectCores.R @@ -1,9 +1,12 @@ detectCores <- local({ - cache <- list() + cache <- list(NULL, NULL) function(logical = TRUE) { + key <- as.integer(logical) + 1L + value <- cache[[key]] + if (!is.null(value)) return(value) + stop_if_not(is.logical(logical), length(logical) == 1L, !is.na(logical)) - key <- paste("logical=", logical, sep = "") ## Get number of system cores from option, cache, and finally ## detectCores(). This is also designed such that it is indeed @@ -14,20 +17,20 @@ detectCores <- local({ return(value) } - value <- cache[[key]] - if (is.null(value)) { - value <- parallel::detectCores(logical = logical) + value <- parallel::detectCores(logical = logical) - ## If unknown, set default to 1L - if (is.na(value)) value <- 1L + ## If unknown, set default to 1L + if (is.na(value)) { + value <- 1L + } else { value <- as.integer(value) + } - ## Assert positive integer - stop_if_not(length(value) == 1L, is.numeric(value), + ## Assert positive integer + stop_if_not(length(value) == 1L, is.numeric(value), is.finite(value), value >= 1L) - cache[[key]] <<- value - } + cache[[key]] <<- value value } diff --git a/tests/availableCores.R b/tests/availableCores.R index c09c1df..2468731 100644 --- a/tests/availableCores.R +++ b/tests/availableCores.R @@ -48,6 +48,8 @@ ncores0 <- 42L message("*** LSF ...") message(" - LSB_DJOB_NUMPROC") Sys.setenv(LSB_DJOB_NUMPROC = as.character(ncores0)) +env <- environment(parallelly:::availableCoresLSF) +env$n <- NULL ncores <- availableCores(methods = "LSF") print(ncores) stopifnot(ncores == ncores0) @@ -56,6 +58,8 @@ message("*** LSF ... done") message("*** PJM (Fujitsu Technical Computing Suite) ...") message(" - PJM_VNODE_CORE") Sys.setenv(PJM_VNODE_CORE = as.character(ncores0)) +env <- environment(parallelly:::availableCoresPJM) +env$n <- NULL ncores <- availableCores(methods = "PJM") print(ncores) stopifnot(ncores == ncores0) @@ -63,6 +67,8 @@ Sys.unsetenv("PJM_VNODE_CORE") message(" - PJM_PROC_BY_NODE") Sys.setenv(PJM_PROC_BY_NODE = as.character(ncores0)) +env <- environment(parallelly:::availableCoresPJM) +env$n <- NULL ncores <- availableCores(methods = "PJM") print(ncores) stopifnot(ncores == ncores0) @@ -76,7 +82,7 @@ message("*** Internal detectCores() ...") ## Reset internal cache env <- environment(parallelly:::detectCores) -env$cache <- list() +env$cache <- list(NULL, NULL) options(parallelly.availableCores.system = 2L) n <- detectCores() @@ -86,7 +92,7 @@ options(parallelly.availableCores.system = NULL) ## Reset env <- environment(parallelly:::detectCores) -env$cache <- list() +env$cache <- list(NULL, NULL) n <- detectCores() print(n) diff --git a/tests/cgroups.R b/tests/cgroups.R index 1b51b62..a0b4c6e 100644 --- a/tests/cgroups.R +++ b/tests/cgroups.R @@ -101,10 +101,10 @@ for (dir in c("no-cgroups", "mixed-cgroups", "cgroups1", "cgroups2")) { for (name in names(tarballs)) { parallelly:::withCGroups(tarballs[name], { - file <- file.path(path, sprintf("%s.R", name)) - if (file_test("-f", file)) { - source(file, local = FALSE) - } + file <- file.path(path, sprintf("%s.R", name)) + if (file_test("-f", file)) { + source(file, local = FALSE) + } }) } message(sprintf("%s - real-world ... done", dir))