Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-31824: Add experimental mtransfer class method #107

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
]
nitpick_ignore = [
("py:obj", "lsst.daf.butler.core.datastore.DatastoreTransaction"),
("py:obj", "lsst.resources.ResourcePathExpression"),
]
138 changes: 132 additions & 6 deletions python/lsst/resources/_resourcePath.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import io
import locale
import logging
import multiprocessing
import os
import posixpath
import re
import shutil
import tempfile
import urllib.parse
from functools import cache
from pathlib import Path, PurePath, PurePosixPath
from random import Random

Expand Down Expand Up @@ -59,6 +61,38 @@
MAX_WORKERS = 10


def _get_int_env_var(env_var: str) -> int | None:
int_value = None
env_value = os.getenv(env_var)
if env_value is not None:
with contextlib.suppress(TypeError):
int_value = int(env_value)

Check warning on line 69 in python/lsst/resources/_resourcePath.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/_resourcePath.py#L68-L69

Added lines #L68 - L69 were not covered by tests
return int_value


@cache
def _get_num_workers() -> int:
    """Calculate the number of workers to use.

    Returns
    -------
    num : `int`
        The number of workers to use. Will use the value of the
        ``LSST_RESOURCES_NUM_WORKERS`` environment variable if set. Will fall
        back to using the CPU count (plus 2) but capped at ``MAX_WORKERS``.
    """
    # NOTE: a plain string literal must be used for the docstring above;
    # an f-string is not recognized as a docstring by Python.
    num_workers = _get_int_env_var("LSST_RESOURCES_NUM_WORKERS")
    if num_workers is None:
        # CPU_LIMIT is used on nublado. multiprocessing.cpu_count() always
        # returns an int (it raises rather than returning None), so the
        # `or` fallback guarantees an integer here.
        cpu_limit = _get_int_env_var("CPU_LIMIT") or multiprocessing.cpu_count()
        num_workers = cpu_limit + 2

    # But don't ever return more than the maximum allowed.
    return min(num_workers, MAX_WORKERS)


class ResourcePath: # numpydoc ignore=PR02
"""Convenience wrapper around URI parsers.

Expand Down Expand Up @@ -883,7 +917,7 @@
existence : `dict` of [`ResourcePath`, `bool`]
Mapping of original URI to boolean indicating existence.
"""
exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
exists_executor = concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers())
future_exists = {exists_executor.submit(uri.exists): uri for uri in uris}

results: dict[ResourcePath, bool] = {}
Expand All @@ -896,6 +930,60 @@
results[uri] = exists
return results

@classmethod
def mtransfer(
    cls,
    transfer: str,
    from_to: Iterable[tuple[ResourcePath, ResourcePath]],
    overwrite: bool = False,
    transaction: TransactionProtocol | None = None,
) -> dict[ResourcePath, bool]:
    """Transfer many files in bulk.

    Parameters
    ----------
    transfer : `str`
        Mode to use for transferring the resource. Generically there are
        many standard options: copy, link, symlink, hardlink, relsymlink.
        Not all URIs support all modes.
    from_to : `list` [ `tuple` [ `ResourcePath`, `ResourcePath` ] ]
        A sequence of the source URIs and the target URIs.
    overwrite : `bool`, optional
        Allow an existing file to be overwritten. Defaults to `False`.
    transaction : `~lsst.resources.utils.TransactionProtocol`, optional
        A transaction object that can (depending on implementation)
        rollback transfers on error. Not guaranteed to be implemented.

    Returns
    -------
    copy_status : `dict` [ `ResourcePath`, `bool` ]
        A dict of all the transfer attempts with a boolean indicating
        whether the transfer succeeded for the target URI.
    """
    results: dict[ResourcePath, bool] = {}
    # Use the executor as a context manager so the worker threads are
    # joined when all transfers have completed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=_get_num_workers()) as transfer_executor:
        future_transfers = {
            transfer_executor.submit(
                to_uri.transfer_from,
                from_uri,
                transfer=transfer,
                overwrite=overwrite,
                transaction=transaction,
                # Each transfer already runs on its own worker thread so
                # per-transfer parallelism is disabled.
                multithreaded=False,
            ): to_uri
            for from_uri, to_uri in from_to
        }
        for future in concurrent.futures.as_completed(future_transfers):
            to_uri = future_transfers[future]
            try:
                future.result()
            except Exception:
                # The failure reason is not propagated to the caller;
                # only a success/failure flag is reported per target.
                transferred = False
            else:
                transferred = True
            results[to_uri] = transferred
    return results

Check warning on line 985 in python/lsst/resources/_resourcePath.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/_resourcePath.py#L983-L985

Added lines #L983 - L985 were not covered by tests

def remove(self) -> None:
"""Remove the resource."""
raise NotImplementedError()
Expand Down Expand Up @@ -923,11 +1011,23 @@
"""
return self

def _as_local(self) -> tuple[str, bool]:
def _as_local(self, multithreaded: bool = True, tmpdir: ResourcePath | None = None) -> tuple[str, bool]:
"""Return the location of the (possibly remote) resource as local file.

This is a helper function for `as_local` context manager.

Parameters
----------
multithreaded : `bool`, optional
If `True` the transfer will be allowed to attempt to improve
throughput by using parallel download streams. This may have no
effect if the URI scheme does not support parallel streams or
if a global override has been applied. If `False` parallel
streams will be disabled.
tmpdir : `ResourcePath` or `None`, optional
Explicit override of the temporary directory to use for remote
downloads.

Returns
-------
path : `str`
Expand All @@ -941,9 +1041,24 @@
raise NotImplementedError()

@contextlib.contextmanager
def as_local(self) -> Iterator[ResourcePath]:
def as_local(
self, multithreaded: bool = True, tmpdir: ResourcePathExpression | None = None
) -> Iterator[ResourcePath]:
"""Return the location of the (possibly remote) resource as local file.

Parameters
----------
multithreaded : `bool`, optional
If `True` the transfer will be allowed to attempt to improve
throughput by using parallel download streams. This may have no
effect if the URI scheme does not support parallel streams or
if a global override has been applied. If `False` parallel
streams will be disabled.
tmpdir : `lsst.resources.ResourcePathExpression` or `None`, optional
Explicit override of the temporary directory to use for remote
downloads. This directory must be a local POSIX directory and
must exist.

Yields
------
local : `ResourcePath`
Expand All @@ -968,7 +1083,10 @@
"""
if self.isdir():
raise IsADirectoryError(f"Directory-like URI {self} cannot be fetched as local.")
local_src, is_temporary = self._as_local()
temp_dir = ResourcePath(tmpdir, forceDirectory=True) if tmpdir is not None else None
if temp_dir is not None and not temp_dir.isLocal:
raise ValueError(f"Temporary directory for as_local must be local resource not {temp_dir}")

Check warning on line 1088 in python/lsst/resources/_resourcePath.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/_resourcePath.py#L1088

Added line #L1088 was not covered by tests
local_src, is_temporary = self._as_local(multithreaded=multithreaded, tmpdir=temp_dir)
local_uri = ResourcePath(local_src, isTemporary=is_temporary)

try:
Expand All @@ -994,8 +1112,9 @@
Parameters
----------
prefix : `ResourcePath`, optional
Prefix to use. Without this the path will be formed as a local
file URI in a temporary directory. Ensuring that the prefix
Temporary directory to use (can be any scheme). Without this the
path will be formed as a local file URI in a temporary directory
created by `tempfile.mkdtemp`. Ensuring that the prefix
location exists is the responsibility of the caller.
suffix : `str`, optional
A file suffix to be used. The ``.`` should be included in this
Expand Down Expand Up @@ -1247,6 +1366,7 @@
transfer: str,
overwrite: bool = False,
transaction: TransactionProtocol | None = None,
multithreaded: bool = True,
) -> None:
"""Transfer to this URI from another.

Expand All @@ -1263,6 +1383,12 @@
transaction : `~lsst.resources.utils.TransactionProtocol`, optional
A transaction object that can (depending on implementation)
rollback transfers on error. Not guaranteed to be implemented.
multithreaded : `bool`, optional
If `True` the transfer will be allowed to attempt to improve
throughput by using parallel download streams. This may have no
effect if the URI scheme does not support parallel streams or
if a global override has been applied. If `False` parallel
streams will be disabled.

Notes
-----
Expand Down
99 changes: 57 additions & 42 deletions python/lsst/resources/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,18 @@ def remove(self) -> None:
"""Remove the resource."""
os.remove(self.ospath)

def _as_local(self) -> tuple[str, bool]:
def _as_local(self, multithreaded: bool = True, tmpdir: ResourcePath | None = None) -> tuple[str, bool]:
"""Return the local path of the file.

This is an internal helper for ``as_local()``.

Parameters
----------
multithreaded : `bool`, optional
Unused.
tmpdir : `ResourcePath` or `None`, optional
Unused.

Returns
-------
path : `str`
Expand Down Expand Up @@ -141,6 +148,7 @@ def transfer_from(
transfer: str,
overwrite: bool = False,
transaction: TransactionProtocol | None = None,
multithreaded: bool = True,
) -> None:
"""Transfer the current resource to a local file.

Expand All @@ -155,6 +163,8 @@ def transfer_from(
Allow an existing file to be overwritten. Defaults to `False`.
transaction : `~lsst.resources.utils.TransactionProtocol`, optional
If a transaction is provided, undo actions will be registered.
multithreaded : `bool`, optional
Whether threads are allowed to be used or not.
"""
# Fail early to prevent delays if remote resources are requested
if transfer not in self.transferModes:
Expand All @@ -172,9 +182,53 @@ def transfer_from(
transfer,
)

# The output location should not exist unless overwrite=True.
# Rather than use `exists()`, use os.stat since we might need
# the full answer later.
dest_stat: os.stat_result | None
try:
# Do not read through links of the file itself.
dest_stat = os.lstat(self.ospath)
except FileNotFoundError:
dest_stat = None

# It is possible that the source URI and target URI refer
# to the same file. This can happen for a number of reasons
# (such as soft links in the path, or they really are the same).
# In that case log a message and return as if the transfer
# completed (it technically did). A temporary file download
# can't be the same so the test can be skipped.
if dest_stat and src.isLocal and not src.isTemporary:
# Be consistent and use lstat here (even though realpath
# has been called). It does not harm.
local_src_stat = os.lstat(src.ospath)
if dest_stat.st_ino == local_src_stat.st_ino and dest_stat.st_dev == local_src_stat.st_dev:
log.debug(
"Destination URI %s is the same file as source URI %s, returning immediately."
" No further action required.",
self,
src,
)
return

if not overwrite and dest_stat:
raise FileExistsError(
f"Destination path '{self}' already exists. Transfer from {src} cannot be completed."
)

# Make the destination path absolute (but don't follow links since
# that would possibly cause us to end up in the wrong place if the
# file existed already as a soft link)
newFullPath = os.path.abspath(self.ospath)
outputDir = os.path.dirname(newFullPath)

# We do not have to special case FileResourcePath here because
# as_local handles that.
with src.as_local() as local_uri:
# as_local handles that. If remote download, download it to the
# destination directory to allow an atomic rename but only if that
# directory exists because we do not want to create a directory
# but then end up with the download failing.
tmpdir = outputDir if os.path.exists(outputDir) else None
with src.as_local(multithreaded=multithreaded, tmpdir=tmpdir) as local_uri:
is_temporary = local_uri.isTemporary
local_src = local_uri.ospath

Expand Down Expand Up @@ -228,45 +282,6 @@ def transfer_from(
if src != local_uri and is_temporary and transfer == "copy":
transfer = "move"

# The output location should not exist unless overwrite=True.
# Rather than use `exists()`, use os.stat since we might need
# the full answer later.
dest_stat: os.stat_result | None
try:
# Do not read through links of the file itself.
dest_stat = os.lstat(self.ospath)
except FileNotFoundError:
dest_stat = None

# It is possible that the source URI and target URI refer
# to the same file. This can happen for a number of reasons
# (such as soft links in the path, or they really are the same).
# In that case log a message and return as if the transfer
# completed (it technically did). A temporary file download
# can't be the same so the test can be skipped.
if dest_stat and not is_temporary:
# Be consistent and use lstat here (even though realpath
# has been called). It does not harm.
local_src_stat = os.lstat(local_src)
if dest_stat.st_ino == local_src_stat.st_ino and dest_stat.st_dev == local_src_stat.st_dev:
log.debug(
"Destination URI %s is the same file as source URI %s, returning immediately."
" No further action required.",
self,
local_uri,
)
return

if not overwrite and dest_stat:
raise FileExistsError(
f"Destination path '{self}' already exists. Transfer from {src} cannot be completed."
)

# Make the path absolute (but don't follow links since that
# would possibly cause us to end up in the wrong place if the
# file existed already as a soft link)
newFullPath = os.path.abspath(self.ospath)
outputDir = os.path.dirname(newFullPath)
if not os.path.isdir(outputDir):
# Must create the directory -- this can not be rolled back
# since another transfer running concurrently may
Expand Down
10 changes: 7 additions & 3 deletions python/lsst/resources/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@
# Should this method do anything at all?
self.blob.upload_from_string(b"", retry=_RETRY_POLICY)

def _as_local(self) -> tuple[str, bool]:
def _as_local(self, multithreaded: bool = True, tmpdir: ResourcePath | None = None) -> tuple[str, bool]:
with (
ResourcePath.temporary_uri(suffix=self.getExtension(), delete=False) as tmp_uri,
ResourcePath.temporary_uri(prefix=tmpdir, suffix=self.getExtension(), delete=False) as tmp_uri,
time_this(log, msg="Downloading %s to local file", args=(self,)),
):
try:
Expand All @@ -220,6 +220,7 @@
transfer: str = "copy",
overwrite: bool = False,
transaction: TransactionProtocol | None = None,
multithreaded: bool = True,
) -> None:
if transfer not in self.transferModes:
raise ValueError(f"Transfer mode '{transfer}' not supported by URI scheme {self.scheme}")
Expand Down Expand Up @@ -271,7 +272,10 @@
break
else:
# Use local file and upload it
with src.as_local() as local_uri, time_this(log, msg=timer_msg, args=timer_args):
with (

Check warning on line 275 in python/lsst/resources/gs.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/gs.py#L275

Added line #L275 was not covered by tests
src.as_local(multithreaded=multithreaded) as local_uri,
time_this(log, msg=timer_msg, args=timer_args),
):
self.blob.upload_from_filename(local_uri.ospath, retry=_RETRY_POLICY)

# This was an explicit move requested from a remote resource
Expand Down
Loading
Loading