|
19 | 19 | import io
|
20 | 20 | import locale
|
21 | 21 | import logging
|
| 22 | +import multiprocessing |
22 | 23 | import os
|
23 | 24 | import posixpath
|
24 | 25 | import re
|
25 | 26 | import shutil
|
26 | 27 | import tempfile
|
27 | 28 | import urllib.parse
|
| 29 | +from functools import cache |
28 | 30 | from pathlib import Path, PurePath, PurePosixPath
|
29 | 31 | from random import Random
|
30 | 32 |
|
|
59 | 61 | MAX_WORKERS = 10
|
60 | 62 |
|
61 | 63 |
|
| 64 | +@cache |
| 65 | +def _get_num_workers() -> int: |
| 66 | + f"""Calculate the number of workers to use. |
| 67 | +
|
| 68 | + Returns |
| 69 | + ------- |
| 70 | + num : `int` |
| 71 | + The number of workers to use. Will use the value of the |
| 72 | + ``LSST_RESOURCES_NUM_WORKERS`` environment variable if set. Will fall |
| 73 | + back to using the CPU count (plus 2) but capped at {MAX_WORKERS}. |
| 74 | + """ |
| 75 | + num_workers = -1 |
| 76 | + # CPU_LIMIT is used on nublado. |
| 77 | + for env_var in ("LSST_RESOURCES_NUM_WORKERS", "CPU_LIMIT"): |
| 78 | + env_value = os.getenv(env_var) |
| 79 | + if env_value is not None: |
| 80 | + try: |
| 81 | + num_workers = int(env_value) |
| 82 | + except TypeError: |
| 83 | + pass |
| 84 | + else: |
| 85 | + break |
| 86 | + if num_workers == -1: |
| 87 | + # Look at the processor count and add 2. |
| 88 | + # The processor count will be wrong in pods. |
| 89 | + num_workers = multiprocessing.cpu_count() + 2 |
| 90 | + # But don't ever return more than the maximum allowed. |
| 91 | + return min([num_workers, MAX_WORKERS]) |
| 92 | + |
| 93 | + |
62 | 94 | class ResourcePath: # numpydoc ignore=PR02
|
63 | 95 | """Convenience wrapper around URI parsers.
|
64 | 96 |
|
@@ -883,7 +915,7 @@ def _mexists(cls, uris: Iterable[ResourcePath]) -> dict[ResourcePath, bool]:
|
883 | 915 | existence : `dict` of [`ResourcePath`, `bool`]
|
884 | 916 | Mapping of original URI to boolean indicating existence.
|
885 | 917 | """
|
886 |
| - exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) |
| 918 | + exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers()) |
887 | 919 | future_exists = {exists_executor.submit(uri.exists): uri for uri in uris}
|
888 | 920 |
|
889 | 921 | results: dict[ResourcePath, bool] = {}
|
@@ -926,7 +958,7 @@ def mtransfer(
|
926 | 958 | A dict of all the transfer attempts with a boolean indicating
|
927 | 959 | whether the transfer succeeded for the target URI.
|
928 | 960 | """
|
929 |
| - exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) |
| 961 | + exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers()) |
930 | 962 | future_transfers = {
|
931 | 963 | exists_executor.submit(
|
932 | 964 | to_uri.transfer_from,
|
|
0 commit comments