|
19 | 19 | import io
|
20 | 20 | import locale
|
21 | 21 | import logging
|
| 22 | +import multiprocessing |
22 | 23 | import os
|
23 | 24 | import posixpath
|
24 | 25 | import re
|
25 | 26 | import shutil
|
26 | 27 | import tempfile
|
27 | 28 | import urllib.parse
|
| 29 | +from functools import cache |
28 | 30 | from pathlib import Path, PurePath, PurePosixPath
|
29 | 31 | from random import Random
|
30 | 32 |
|
|
59 | 61 | MAX_WORKERS = 10
|
60 | 62 |
|
61 | 63 |
|
| 64 | +@cache |
| 65 | +def _get_num_workers() -> int: |
| 66 | + f"""Calculate the number of workers to use. |
| 67 | +
|
| 68 | + Returns |
| 69 | + ------- |
| 70 | + num : `int` |
| 71 | + The number of workers to use. Will use the value of the |
| 72 | + ``LSST_RESOURCES_NUM_WORKERS`` environment variable if set. Will fall |
| 73 | + back to using the CPU count (plus 2) but capped at {MAX_WORKERS}. |
| 74 | + """ |
| 75 | + num_workers = -1 |
| 76 | + env_value = os.getenv("LSST_RESOURCES_NUM_WORKERS") |
| 77 | + if env_value is not None: |
| 78 | + try: |
| 79 | + num_workers = int(env_value) |
| 80 | + except TypeError: |
| 81 | + pass |
| 82 | + if num_workers == -1: |
| 83 | + # Look at the processor count and add 2. |
| 84 | + num_workers = multiprocessing.cpu_count() + 2 |
| 85 | + # But don't ever return more than the maximum allowed. |
| 86 | + return min([num_workers, MAX_WORKERS]) |
| 87 | + |
| 88 | + |
62 | 89 | class ResourcePath: # numpydoc ignore=PR02
|
63 | 90 | """Convenience wrapper around URI parsers.
|
64 | 91 |
|
@@ -883,7 +910,7 @@ def _mexists(cls, uris: Iterable[ResourcePath]) -> dict[ResourcePath, bool]:
|
883 | 910 | existence : `dict` of [`ResourcePath`, `bool`]
|
884 | 911 | Mapping of original URI to boolean indicating existence.
|
885 | 912 | """
|
886 |
| - exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) |
| 913 | + exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers()) |
887 | 914 | future_exists = {exists_executor.submit(uri.exists): uri for uri in uris}
|
888 | 915 |
|
889 | 916 | results: dict[ResourcePath, bool] = {}
|
@@ -926,7 +953,7 @@ def mtransfer(
|
926 | 953 | A dict of all the transfer attempts with a boolean indicating
|
927 | 954 | whether the transfer succeeded for the target URI.
|
928 | 955 | """
|
929 |
| - exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) |
| 956 | + exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers()) |
930 | 957 | future_transfers = {
|
931 | 958 | exists_executor.submit(
|
932 | 959 | to_uri.transfer_from,
|
|
0 commit comments