Skip to content

DM-31824: Add mtransfer class method #107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/changes/DM-31824.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
* Added ``ResourcePath.mtransfer()`` for doing multiple transfers in parallel.
The number of workers can be controlled using the ``$LSST_RESOURCES_NUM_WORKERS`` environment variable.
* ``transfer_from`` and ``as_local`` now have an additional parameter that can control whether implicit multithreading should be used for a single download.
* ``as_local`` has a new parameter that can be used to explicitly specify the local download location. This can be used for ``transfer_from`` to allow the file to be downloaded to the local destination directory immediately.
3 changes: 3 additions & 0 deletions doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@
]
# Sphinx ``nitpick_ignore`` setting: (domain:role, target) pairs that the
# documentation build should not report as unresolved cross-references.
nitpick_ignore = [
("py:obj", "lsst.daf.butler.core.datastore.DatastoreTransaction"),
("py:obj", "lsst.resources.ResourcePathExpression"),
("py:class", "MTransferResult"),
("py:obj", "MTransferResult"),
]
182 changes: 165 additions & 17 deletions python/lsst/resources/_resourcePath.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import io
import locale
import logging
import multiprocessing
import os
import posixpath
import re
import shutil
import tempfile
import urllib.parse
from functools import cache
from pathlib import Path, PurePath, PurePosixPath
from random import Random

Expand All @@ -36,7 +38,7 @@
AbstractFileSystem = type

from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING, Any, Literal, overload
from typing import TYPE_CHECKING, Any, Literal, NamedTuple, overload

from ._resourceHandles._baseResourceHandle import ResourceHandleProtocol
from .utils import ensure_directory_is_writeable
Expand All @@ -59,6 +61,45 @@
# Upper bound on the number of worker threads used by the bulk
# (thread pool based) operations in this module.
MAX_WORKERS = 10


class MTransferResult(NamedTuple):
"""Report on a bulk transfer."""

success: bool
exception: Exception | None


def _get_int_env_var(env_var: str) -> int | None:
int_value = None
env_value = os.getenv(env_var)
if env_value is not None:
with contextlib.suppress(TypeError):
int_value = int(env_value)

Check warning on line 76 in python/lsst/resources/_resourcePath.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/_resourcePath.py#L75-L76

Added lines #L75 - L76 were not covered by tests
return int_value


@cache
def _get_num_workers() -> int:
    """Calculate the number of workers to use.

    Returns
    -------
    num : `int`
        The number of workers to use. Will use the value of the
        ``LSST_RESOURCES_NUM_WORKERS`` environment variable if set. Will fall
        back to using the CPU count (plus 2). The result is never larger
        than ``MAX_WORKERS`` and never smaller than 1.

    Notes
    -----
    The result is cached, so the environment is only consulted the first
    time this function is called.
    """
    # NOTE: this must be a plain string literal. An f-string in this
    # position is an ordinary expression and is not stored as the docstring.
    num_workers = _get_int_env_var("LSST_RESOURCES_NUM_WORKERS")
    if num_workers is None:
        # CPU_LIMIT is used on nublado.
        cpu_limit = _get_int_env_var("CPU_LIMIT") or multiprocessing.cpu_count()
        num_workers = cpu_limit + 2

    # Never return more than the maximum allowed, and never less than one
    # since a thread pool can not be created with zero or negative workers.
    return max(1, min(num_workers, MAX_WORKERS))


class ResourcePath: # numpydoc ignore=PR02
"""Convenience wrapper around URI parsers.

Expand Down Expand Up @@ -883,17 +924,86 @@
existence : `dict` of [`ResourcePath`, `bool`]
Mapping of original URI to boolean indicating existence.
"""
exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
future_exists = {exists_executor.submit(uri.exists): uri for uri in uris}

results: dict[ResourcePath, bool] = {}
for future in concurrent.futures.as_completed(future_exists):
uri = future_exists[future]
try:
exists = future.result()
except Exception:
exists = False
results[uri] = exists
with concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers()) as exists_executor:
future_exists = {exists_executor.submit(uri.exists): uri for uri in uris}

results: dict[ResourcePath, bool] = {}
for future in concurrent.futures.as_completed(future_exists):
uri = future_exists[future]
try:
exists = future.result()
except Exception:
exists = False

Check warning on line 936 in python/lsst/resources/_resourcePath.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/_resourcePath.py#L935-L936

Added lines #L935 - L936 were not covered by tests
results[uri] = exists
return results

@classmethod
def mtransfer(
    cls,
    transfer: str,
    from_to: Iterable[tuple[ResourcePath, ResourcePath]],
    overwrite: bool = False,
    transaction: TransactionProtocol | None = None,
    do_raise: bool = True,
) -> dict[ResourcePath, MTransferResult]:
    """Transfer many files in bulk.

    Parameters
    ----------
    transfer : `str`
        Mode to use for transferring the resource. Generically there are
        many standard options: copy, link, symlink, hardlink, relsymlink.
        Not all URIs support all modes.
    from_to : `~collections.abc.Iterable` [ `tuple` [ `ResourcePath`, \
            `ResourcePath` ] ]
        Pairs of source URI and target URI, in that order.
    overwrite : `bool`, optional
        Allow an existing file to be overwritten. Defaults to `False`.
    transaction : `~lsst.resources.utils.TransactionProtocol`, optional
        A transaction object that can (depending on implementation)
        rollback transfers on error. Not guaranteed to be implemented.
        The transaction object must be thread safe.
    do_raise : `bool`, optional
        If `True` an `ExceptionGroup` will be raised containing any
        exceptions raised by the individual transfers. Else a dict
        reporting the status of each `ResourcePath` will be returned.

    Returns
    -------
    copy_status : `dict` [ `ResourcePath`, `MTransferResult` ]
        A dict of all the transfer attempts, keyed by target URI, with a
        value indicating whether the transfer succeeded for that target
        and the exception raised if it did not.

    Raises
    ------
    ExceptionGroup
        Raised if ``do_raise`` is `True` and at least one transfer
        failed. Contains the exception from every failed transfer.

    Notes
    -----
    The transfers are run in a thread pool and each individual transfer
    is requested with ``multithreaded=False``.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers()) as transfer_executor:
        future_transfers = {
            transfer_executor.submit(
                to_uri.transfer_from,
                from_uri,
                transfer=transfer,
                overwrite=overwrite,
                transaction=transaction,
                multithreaded=False,
            ): to_uri
            for from_uri, to_uri in from_to
        }
        results: dict[ResourcePath, MTransferResult] = {}
        failures: list[Exception] = []
        for future in concurrent.futures.as_completed(future_transfers):
            to_uri = future_transfers[future]
            try:
                future.result()
            except Exception as e:
                failures.append(e)
                results[to_uri] = MTransferResult(False, e)
            else:
                results[to_uri] = MTransferResult(True, None)

    if do_raise and failures:
        # Report how many transfers failed rather than how many were
        # attempted, so that the message matches the attached exceptions.
        raise ExceptionGroup(
            f"Errors transferring {len(failures)} of {len(results)} artifacts",
            failures,
        )

    return results

def remove(self) -> None:
Expand Down Expand Up @@ -923,11 +1033,23 @@
"""
return self

def _as_local(self) -> tuple[str, bool]:
def _as_local(self, multithreaded: bool = True, tmpdir: ResourcePath | None = None) -> tuple[str, bool]:
"""Return the location of the (possibly remote) resource as local file.

This is a helper function for `as_local` context manager.

Parameters
----------
multithreaded : `bool`, optional
If `True` the transfer will be allowed to attempt to improve
throughput by using parallel download streams. This may have no
effect if the URI scheme does not support parallel streams or
if a global override has been applied. If `False` parallel
streams will be disabled.
tmpdir : `ResourcePath` or `None`, optional
Explicit override of the temporary directory to use for remote
downloads.

Returns
-------
path : `str`
Expand All @@ -941,9 +1063,24 @@
raise NotImplementedError()

@contextlib.contextmanager
def as_local(self) -> Iterator[ResourcePath]:
def as_local(
self, multithreaded: bool = True, tmpdir: ResourcePathExpression | None = None
) -> Iterator[ResourcePath]:
"""Return the location of the (possibly remote) resource as local file.

Parameters
----------
multithreaded : `bool`, optional
If `True` the transfer will be allowed to attempt to improve
throughput by using parallel download streams. This may have no
effect if the URI scheme does not support parallel streams or
if a global override has been applied. If `False` parallel
streams will be disabled.
tmpdir : `lsst.resources.ResourcePathExpression` or `None`, optional
Explicit override of the temporary directory to use for remote
downloads. This directory must be a local POSIX directory and
must exist.

Yields
------
local : `ResourcePath`
Expand All @@ -968,7 +1105,10 @@
"""
if self.isdir():
raise IsADirectoryError(f"Directory-like URI {self} cannot be fetched as local.")
local_src, is_temporary = self._as_local()
temp_dir = ResourcePath(tmpdir, forceDirectory=True) if tmpdir is not None else None
if temp_dir is not None and not temp_dir.isLocal:
raise ValueError(f"Temporary directory for as_local must be local resource not {temp_dir}")
local_src, is_temporary = self._as_local(multithreaded=multithreaded, tmpdir=temp_dir)
local_uri = ResourcePath(local_src, isTemporary=is_temporary)

try:
Expand All @@ -994,8 +1134,9 @@
Parameters
----------
prefix : `ResourcePath`, optional
Prefix to use. Without this the path will be formed as a local
file URI in a temporary directory. Ensuring that the prefix
Temporary directory to use (can be any scheme). Without this the
path will be formed as a local file URI in a temporary directory
created by `tempfile.mkdtemp`. Ensuring that the prefix
location exists is the responsibility of the caller.
suffix : `str`, optional
A file suffix to be used. The ``.`` should be included in this
Expand Down Expand Up @@ -1247,6 +1388,7 @@
transfer: str,
overwrite: bool = False,
transaction: TransactionProtocol | None = None,
multithreaded: bool = True,
) -> None:
"""Transfer to this URI from another.

Expand All @@ -1263,6 +1405,12 @@
transaction : `~lsst.resources.utils.TransactionProtocol`, optional
A transaction object that can (depending on implementation)
rollback transfers on error. Not guaranteed to be implemented.
multithreaded : `bool`, optional
If `True` the transfer will be allowed to attempt to improve
throughput by using parallel download streams. This may have no
effect if the URI scheme does not support parallel streams or
if a global override has been applied. If `False` parallel
streams will be disabled.

Notes
-----
Expand Down
Loading
Loading