DM-31824: Add experimental mtransfer class method #107

Open · wants to merge 13 commits into base: main
3 changes: 3 additions & 0 deletions doc/conf.py
@@ -20,4 +20,7 @@
]
nitpick_ignore = [
("py:obj", "lsst.daf.butler.core.datastore.DatastoreTransaction"),
("py:obj", "lsst.resources.ResourcePathExpression"),
("py:class", "MTransferResult"),
("py:obj", "MTransferResult"),
]
161 changes: 154 additions & 7 deletions python/lsst/resources/_resourcePath.py
@@ -19,12 +19,14 @@
import io
import locale
import logging
import multiprocessing
import os
import posixpath
import re
import shutil
import tempfile
import urllib.parse
from functools import cache
from pathlib import Path, PurePath, PurePosixPath
from random import Random

@@ -36,7 +38,7 @@
AbstractFileSystem = type

from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING, Any, Literal, overload
from typing import TYPE_CHECKING, Any, Literal, NamedTuple, overload

from ._resourceHandles._baseResourceHandle import ResourceHandleProtocol
from .utils import ensure_directory_is_writeable
@@ -59,6 +61,45 @@
MAX_WORKERS = 10


class MTransferResult(NamedTuple):
"""Report on a bulk transfer."""

success: bool
exception: Exception | None


def _get_int_env_var(env_var: str) -> int | None:
int_value = None
env_value = os.getenv(env_var)
if env_value is not None:
        # int() raises ValueError for a non-numeric string.
        with contextlib.suppress(ValueError):
int_value = int(env_value)

return int_value


@cache
def _get_num_workers() -> int:
f"""Calculate the number of workers to use.

Returns
-------
num : `int`
The number of workers to use. Will use the value of the
``LSST_RESOURCES_NUM_WORKERS`` environment variable if set. Will fall
back to using the CPU count (plus 2) but capped at {MAX_WORKERS}.
"""
    num_workers: int | None = _get_int_env_var("LSST_RESOURCES_NUM_WORKERS")
    if num_workers is None:
        # CPU_LIMIT is used on nublado.
        cpu_limit = _get_int_env_var("CPU_LIMIT") or multiprocessing.cpu_count()
        num_workers = cpu_limit + 2

# But don't ever return more than the maximum allowed.
return min([num_workers, MAX_WORKERS])
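
A quick sketch of how the worker-count resolution above is intended to behave. It imports the private helpers purely for illustration, assumes ``MAX_WORKERS = 10`` as defined earlier in this file, and uses made-up environment values; ``cache_clear()`` is needed because ``@cache`` memoizes the first answer.

import os

from lsst.resources._resourcePath import MAX_WORKERS, _get_num_workers

# Explicit override via the environment variable wins.
os.environ["LSST_RESOURCES_NUM_WORKERS"] = "4"
_get_num_workers.cache_clear()
assert _get_num_workers() == 4

# Without the override: CPU count (or CPU_LIMIT) plus 2, capped at MAX_WORKERS.
del os.environ["LSST_RESOURCES_NUM_WORKERS"]
_get_num_workers.cache_clear()
assert 1 <= _get_num_workers() <= MAX_WORKERS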


class ResourcePath: # numpydoc ignore=PR02
"""Convenience wrapper around URI parsers.

@@ -883,7 +924,7 @@
existence : `dict` of [`ResourcePath`, `bool`]
Mapping of original URI to boolean indicating existence.
"""
exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers())
future_exists = {exists_executor.submit(uri.exists): uri for uri in uris}

results: dict[ResourcePath, bool] = {}
@@ -896,6 +937,74 @@
results[uri] = exists
return results
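
For context, a minimal usage sketch of the bulk existence check whose body is shown above, assuming the enclosing ``mexists`` classmethod accepts an iterable of URIs as its Returns section describes; the URIs are illustrative.

from lsst.resources import ResourcePath

uris = [
    ResourcePath("/data/raw/exposure_01.fits"),
    ResourcePath("s3://some-bucket/raw/exposure_02.fits"),
]
existence = ResourcePath.mexists(uris)
missing = [uri for uri, exists in existence.items() if not exists]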

@classmethod
def mtransfer(
cls,
transfer: str,
from_to: Iterable[tuple[ResourcePath, ResourcePath]],
overwrite: bool = False,
transaction: TransactionProtocol | None = None,
do_raise: bool = True,
) -> dict[ResourcePath, MTransferResult]:
"""Transfer many files in bulk.

Parameters
----------
transfer : `str`
Mode to use for transferring the resource. Generically there are
many standard options: copy, link, symlink, hardlink, relsymlink.
Not all URIs support all modes.
        from_to : `~collections.abc.Iterable` [ `tuple` [ `ResourcePath`, `ResourcePath` ] ]
            A sequence of (source URI, target URI) pairs.
overwrite : `bool`, optional
Allow an existing file to be overwritten. Defaults to `False`.
transaction : `~lsst.resources.utils.TransactionProtocol`, optional
A transaction object that can (depending on implementation)
rollback transfers on error. Not guaranteed to be implemented.
        do_raise : `bool`, optional
            If `True` an `ExceptionGroup` will be raised if any of the
            individual transfers fail, containing the exceptions those
            transfers raised. If `False` failures are only reported through
            the returned dict.

Returns
-------
        copy_status : `dict` [ `ResourcePath`, `MTransferResult` ]
            A dict keyed by target URI indicating whether each transfer
            succeeded and, on failure, the exception that was raised.
"""
        transfer_executor = concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers())
        future_transfers = {
            transfer_executor.submit(
to_uri.transfer_from,
from_uri,
transfer=transfer,
overwrite=overwrite,
transaction=transaction,
multithreaded=False,
): to_uri
for from_uri, to_uri in from_to
}
results: dict[ResourcePath, MTransferResult] = {}
failed = False
for future in concurrent.futures.as_completed(future_transfers):
to_uri = future_transfers[future]
try:
future.result()
except Exception as e:
transferred = MTransferResult(False, e)
failed = True
else:
transferred = MTransferResult(True, None)
results[to_uri] = transferred

        if do_raise and failed:
            n_failed = sum(1 for res in results.values() if not res.success)
            raise ExceptionGroup(
                f"Errors transferring {n_failed} out of {len(results)} artifacts",
                tuple(res.exception for res in results.values() if res.exception is not None),
            )

return results
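
A usage sketch for the new bulk-transfer classmethod, matching the signature above; the file paths are invented, and ``do_raise=False`` is used so per-file results can be inspected rather than letting an ``ExceptionGroup`` propagate.

from lsst.resources import ResourcePath

pairs = [
    (ResourcePath(f"/data/in/file{i}.fits"), ResourcePath(f"/data/out/file{i}.fits"))
    for i in range(3)
]
results = ResourcePath.mtransfer("copy", pairs, overwrite=True, do_raise=False)
for to_uri, result in results.items():
    if not result.success:
        print(f"Transfer to {to_uri} failed: {result.exception}")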

def remove(self) -> None:
"""Remove the resource."""
raise NotImplementedError()
@@ -923,11 +1032,23 @@
"""
return self

def _as_local(self) -> tuple[str, bool]:
def _as_local(self, multithreaded: bool = True, tmpdir: ResourcePath | None = None) -> tuple[str, bool]:
"""Return the location of the (possibly remote) resource as local file.

This is a helper function for `as_local` context manager.

Parameters
----------
multithreaded : `bool`, optional
If `True` the transfer will be allowed to attempt to improve
            throughput by using parallel download streams. This may have no
effect if the URI scheme does not support parallel streams or
if a global override has been applied. If `False` parallel
streams will be disabled.
tmpdir : `ResourcePath` or `None`, optional
Explicit override of the temporary directory to use for remote
downloads.

Returns
-------
path : `str`
@@ -941,9 +1062,24 @@
raise NotImplementedError()

@contextlib.contextmanager
def as_local(self) -> Iterator[ResourcePath]:
def as_local(
self, multithreaded: bool = True, tmpdir: ResourcePathExpression | None = None
) -> Iterator[ResourcePath]:
"""Return the location of the (possibly remote) resource as local file.

Parameters
----------
multithreaded : `bool`, optional
If `True` the transfer will be allowed to attempt to improve
            throughput by using parallel download streams. This may have no
effect if the URI scheme does not support parallel streams or
if a global override has been applied. If `False` parallel
streams will be disabled.
tmpdir : `lsst.resources.ResourcePathExpression` or `None`, optional
Explicit override of the temporary directory to use for remote
downloads. This directory must be a local POSIX directory and
must exist.

Yields
------
local : `ResourcePath`
@@ -968,7 +1104,10 @@
"""
if self.isdir():
raise IsADirectoryError(f"Directory-like URI {self} cannot be fetched as local.")
local_src, is_temporary = self._as_local()
temp_dir = ResourcePath(tmpdir, forceDirectory=True) if tmpdir is not None else None
if temp_dir is not None and not temp_dir.isLocal:
raise ValueError(f"Temporary directory for as_local must be local resource not {temp_dir}")
local_src, is_temporary = self._as_local(multithreaded=multithreaded, tmpdir=temp_dir)
local_uri = ResourcePath(local_src, isTemporary=is_temporary)

try:
@@ -994,8 +1133,9 @@
Parameters
----------
prefix : `ResourcePath`, optional
Prefix to use. Without this the path will be formed as a local
file URI in a temporary directory. Ensuring that the prefix
Temporary directory to use (can be any scheme). Without this the
path will be formed as a local file URI in a temporary directory
created by `tempfile.mkdtemp`. Ensuring that the prefix
location exists is the responsibility of the caller.
suffix : `str`, optional
A file suffix to be used. The ``.`` should be included in this
@@ -1247,6 +1387,7 @@
transfer: str,
overwrite: bool = False,
transaction: TransactionProtocol | None = None,
multithreaded: bool = True,
) -> None:
"""Transfer to this URI from another.

Expand All @@ -1263,6 +1404,12 @@
transaction : `~lsst.resources.utils.TransactionProtocol`, optional
A transaction object that can (depending on implementation)
rollback transfers on error. Not guaranteed to be implemented.
multithreaded : `bool`, optional
If `True` the transfer will be allowed to attempt to improve
            throughput by using parallel download streams. This may have no
effect if the URI scheme does not support parallel streams or
if a global override has been applied. If `False` parallel
streams will be disabled.

Notes
-----
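
To show how the new keyword arguments in this file are expected to be driven by calling code, a brief sketch follows; the URIs and the scratch directory are illustrative, and the scratch directory must already exist locally.

from lsst.resources import ResourcePath

src = ResourcePath("s3://some-bucket/raw/exposure.fits")
dest = ResourcePath("/repo/ingest/exposure.fits")

# Single transfer with parallel download streams explicitly disabled,
# mirroring what mtransfer() does for its per-file calls.
dest.transfer_from(src, transfer="copy", overwrite=True, multithreaded=False)

# Materialize a remote resource locally, staging the download in a chosen
# local directory instead of the default temporary directory.
with src.as_local(tmpdir="/repo/scratch") as local:
    print(local.ospath)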
99 changes: 57 additions & 42 deletions python/lsst/resources/file.py
@@ -79,11 +79,18 @@ def remove(self) -> None:
"""Remove the resource."""
os.remove(self.ospath)

def _as_local(self) -> tuple[str, bool]:
def _as_local(self, multithreaded: bool = True, tmpdir: ResourcePath | None = None) -> tuple[str, bool]:
"""Return the local path of the file.

This is an internal helper for ``as_local()``.

Parameters
----------
multithreaded : `bool`, optional
Unused.
tmpdir : `ResourcePath` or `None`, optional
Unused.

Returns
-------
path : `str`
@@ -141,6 +148,7 @@ def transfer_from(
transfer: str,
overwrite: bool = False,
transaction: TransactionProtocol | None = None,
multithreaded: bool = True,
) -> None:
"""Transfer the current resource to a local file.

@@ -155,6 +163,8 @@
Allow an existing file to be overwritten. Defaults to `False`.
transaction : `~lsst.resources.utils.TransactionProtocol`, optional
If a transaction is provided, undo actions will be registered.
multithreaded : `bool`, optional
            Whether parallel download streams may be used when fetching the source.
"""
# Fail early to prevent delays if remote resources are requested
if transfer not in self.transferModes:
@@ -172,9 +182,53 @@
transfer,
)

# The output location should not exist unless overwrite=True.
        # Rather than use `exists()`, use os.lstat since we might need
        # the full stat result later.
dest_stat: os.stat_result | None
try:
# Do not read through links of the file itself.
dest_stat = os.lstat(self.ospath)
except FileNotFoundError:
dest_stat = None

# It is possible that the source URI and target URI refer
# to the same file. This can happen for a number of reasons
# (such as soft links in the path, or they really are the same).
# In that case log a message and return as if the transfer
# completed (it technically did). A temporary file download
# can't be the same so the test can be skipped.
if dest_stat and src.isLocal and not src.isTemporary:
# Be consistent and use lstat here (even though realpath
            # has been called). It does no harm.
local_src_stat = os.lstat(src.ospath)
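            # (This st_ino/st_dev comparison is what os.path.samestat() does.)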
if dest_stat.st_ino == local_src_stat.st_ino and dest_stat.st_dev == local_src_stat.st_dev:
log.debug(
"Destination URI %s is the same file as source URI %s, returning immediately."
" No further action required.",
self,
src,
)
return

if not overwrite and dest_stat:
raise FileExistsError(
f"Destination path '{self}' already exists. Transfer from {src} cannot be completed."
)

# Make the destination path absolute (but don't follow links since
# that would possibly cause us to end up in the wrong place if the
# file existed already as a soft link)
newFullPath = os.path.abspath(self.ospath)
outputDir = os.path.dirname(newFullPath)

# We do not have to special case FileResourcePath here because
# as_local handles that.
with src.as_local() as local_uri:
        # as_local handles that. For a remote download, fetch into the
        # destination directory so the final step can be an atomic rename,
        # but only if that directory already exists: we do not want to
        # create a directory only to have the download fail.
tmpdir = outputDir if os.path.exists(outputDir) else None
with src.as_local(multithreaded=multithreaded, tmpdir=tmpdir) as local_uri:
is_temporary = local_uri.isTemporary
local_src = local_uri.ospath

@@ -228,45 +282,6 @@ def transfer_from(
if src != local_uri and is_temporary and transfer == "copy":
transfer = "move"

# The output location should not exist unless overwrite=True.
# Rather than use `exists()`, use os.stat since we might need
# the full answer later.
dest_stat: os.stat_result | None
try:
# Do not read through links of the file itself.
dest_stat = os.lstat(self.ospath)
except FileNotFoundError:
dest_stat = None

# It is possible that the source URI and target URI refer
# to the same file. This can happen for a number of reasons
# (such as soft links in the path, or they really are the same).
# In that case log a message and return as if the transfer
# completed (it technically did). A temporary file download
# can't be the same so the test can be skipped.
if dest_stat and not is_temporary:
# Be consistent and use lstat here (even though realpath
# has been called). It does not harm.
local_src_stat = os.lstat(local_src)
if dest_stat.st_ino == local_src_stat.st_ino and dest_stat.st_dev == local_src_stat.st_dev:
log.debug(
"Destination URI %s is the same file as source URI %s, returning immediately."
" No further action required.",
self,
local_uri,
)
return

if not overwrite and dest_stat:
raise FileExistsError(
f"Destination path '{self}' already exists. Transfer from {src} cannot be completed."
)

# Make the path absolute (but don't follow links since that
# would possibly cause us to end up in the wrong place if the
# file existed already as a soft link)
newFullPath = os.path.abspath(self.ospath)
outputDir = os.path.dirname(newFullPath)
if not os.path.isdir(outputDir):
# Must create the directory -- this can not be rolled back
# since another transfer running concurrently may
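
The comment in ``transfer_from`` about downloading into the destination directory exists so the final step can be an atomic rename. A minimal, self-contained illustration of that general pattern follows; it is not code from this package and the helper name is invented.

import os
import tempfile


def atomic_write(path: str, data: bytes) -> None:
    """Write ``data`` to ``path`` so readers never see a partial file."""
    # Stage the temporary file in the same directory as the target so that
    # os.replace() stays on one filesystem and is therefore atomic.
    out_dir = os.path.dirname(os.path.abspath(path))
    fd, tmp_name = tempfile.mkstemp(dir=out_dir)
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        os.replace(tmp_name, path)
    except BaseException:
        # The rename did not happen; clean up the staged file.
        os.unlink(tmp_name)
        raise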