Skip to content

Commit de1ecf0

Browse files
committed
Use env var to determine number of workers
1 parent efe6ba9 commit de1ecf0

File tree

1 file changed

+36
-2
lines changed

1 file changed

+36
-2
lines changed

python/lsst/resources/_resourcePath.py

+36-2
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
import io
2020
import locale
2121
import logging
22+
import multiprocessing
2223
import os
2324
import posixpath
2425
import re
2526
import shutil
2627
import tempfile
2728
import urllib.parse
29+
from functools import cache
2830
from pathlib import Path, PurePath, PurePosixPath
2931
from random import Random
3032

@@ -59,6 +61,38 @@
5961
MAX_WORKERS = 10
6062

6163

64+
def _get_int_env_var(env_var: str) -> int | None:
65+
int_value = None
66+
env_value = os.getenv(env_var)
67+
if env_value is not None:
68+
with contextlib.suppress(TypeError):
69+
int_value = int(env_value)
70+
return int_value
71+
72+
73+
@cache
74+
def _get_num_workers() -> int:
75+
f"""Calculate the number of workers to use.
76+
77+
Returns
78+
-------
79+
num : `int`
80+
The number of workers to use. Will use the value of the
81+
``LSST_RESOURCES_NUM_WORKERS`` environment variable if set. Will fall
82+
back to using the CPU count (plus 2) but capped at {MAX_WORKERS}.
83+
"""
84+
num_workers: int | None = None
85+
num_workers = _get_int_env_var("LSST_RESOURCES_NUM_WORKERS")
86+
if num_workers is None:
87+
# CPU_LIMIT is used on nublado.
88+
cpu_limit = _get_int_env_var("CPU_LIMIT") or multiprocessing.cpu_count()
89+
if cpu_limit is not None:
90+
num_workers = cpu_limit + 2
91+
92+
# But don't ever return more than the maximum allowed.
93+
return min([num_workers, MAX_WORKERS])
94+
95+
6296
class ResourcePath: # numpydoc ignore=PR02
6397
"""Convenience wrapper around URI parsers.
6498
@@ -883,7 +917,7 @@ def _mexists(cls, uris: Iterable[ResourcePath]) -> dict[ResourcePath, bool]:
883917
existence : `dict` of [`ResourcePath`, `bool`]
884918
Mapping of original URI to boolean indicating existence.
885919
"""
886-
exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
920+
exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers())
887921
future_exists = {exists_executor.submit(uri.exists): uri for uri in uris}
888922

889923
results: dict[ResourcePath, bool] = {}
@@ -926,7 +960,7 @@ def mtransfer(
926960
A dict of all the transfer attempts with a boolean indicating
927961
whether the transfer succeeded for the target URI.
928962
"""
929-
exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
963+
exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers())
930964
future_transfers = {
931965
exists_executor.submit(
932966
to_uri.transfer_from,

0 commit comments

Comments
 (0)