Skip to content

Commit

Permalink
Avoid expanding paths when source and destination are lists (#1349)
Browse files Browse the repository at this point in the history
  • Loading branch information
john-jam authored Sep 2, 2023
1 parent 212c26f commit 1bbbbdf
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 163 deletions.
176 changes: 101 additions & 75 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,31 +344,42 @@ async def _copy(
elif on_error is None:
on_error = "raise"

source_is_str = isinstance(path1, str)
paths = await self._expand_path(path1, maxdepth=maxdepth, recursive=recursive)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
paths = [p for p in paths if not (trailing_sep(p) or await self._isdir(p))]
if not paths:
return
if isinstance(path1, list) and isinstance(path2, list):
# No need to expand paths when both source and destination
# are provided as lists
paths1 = path1
paths2 = path2
else:
source_is_str = isinstance(path1, str)
paths1 = await self._expand_path(
path1, maxdepth=maxdepth, recursive=recursive
)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
paths1 = [
p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
]
if not paths1:
return

source_is_file = len(paths1) == 1
dest_is_dir = isinstance(path2, str) and (
trailing_sep(path2) or await self._isdir(path2)
)

source_is_file = len(paths) == 1
dest_is_dir = isinstance(path2, str) and (
trailing_sep(path2) or await self._isdir(path2)
)
exists = source_is_str and (
(has_magic(path1) and source_is_file)
or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
)
paths2 = other_paths(
paths1,
path2,
exists=exists,
flatten=not source_is_str,
)

exists = source_is_str and (
(has_magic(path1) and source_is_file)
or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
)
path2 = other_paths(
paths,
path2,
exists=exists,
flatten=not source_is_str,
)
batch_size = batch_size or self.batch_size
coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths, path2)]
coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
result = await _run_coros_in_chunks(
coros, batch_size=batch_size, return_exceptions=True, nofiles=True
)
Expand Down Expand Up @@ -503,33 +514,39 @@ async def _put(
constructor, or for all instances by setting the "gather_batch_size" key
in ``fsspec.config.conf``, falling back to 1/8th of the system limit .
"""
source_is_str = isinstance(lpath, str)
if source_is_str:
lpath = make_path_posix(lpath)
fs = LocalFileSystem()
lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
if not lpaths:
return

source_is_file = len(lpaths) == 1
dest_is_dir = isinstance(rpath, str) and (
trailing_sep(rpath) or await self._isdir(rpath)
)
if isinstance(lpath, list) and isinstance(rpath, list):
# No need to expand paths when both source and destination
# are provided as lists
rpaths = rpath
lpaths = lpath
else:
source_is_str = isinstance(lpath, str)
if source_is_str:
lpath = make_path_posix(lpath)
fs = LocalFileSystem()
lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
if not lpaths:
return

source_is_file = len(lpaths) == 1
dest_is_dir = isinstance(rpath, str) and (
trailing_sep(rpath) or await self._isdir(rpath)
)

rpath = self._strip_protocol(rpath)
exists = source_is_str and (
(has_magic(lpath) and source_is_file)
or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
)
rpaths = other_paths(
lpaths,
rpath,
exists=exists,
flatten=not source_is_str,
)
rpath = self._strip_protocol(rpath)
exists = source_is_str and (
(has_magic(lpath) and source_is_file)
or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
)
rpaths = other_paths(
lpaths,
rpath,
exists=exists,
flatten=not source_is_str,
)

is_dir = {l: os.path.isdir(l) for l in lpaths}
rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
Expand Down Expand Up @@ -574,35 +591,44 @@ async def _get(
constructor, or for all instances by setting the "gather_batch_size" key
in ``fsspec.config.conf``, falling back to 1/8th of the system limit .
"""
source_is_str = isinstance(rpath, str)
# First check for rpath trailing slash as _strip_protocol removes it.
source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
rpath = self._strip_protocol(rpath)
rpaths = await self._expand_path(rpath, recursive=recursive, maxdepth=maxdepth)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
rpaths = [
p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
]
if not rpaths:
return
if isinstance(lpath, list) and isinstance(rpath, list):
# No need to expand paths when both source and destination
# are provided as lists
rpaths = rpath
lpaths = lpath
else:
source_is_str = isinstance(rpath, str)
# First check for rpath trailing slash as _strip_protocol removes it.
source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
rpath = self._strip_protocol(rpath)
rpaths = await self._expand_path(
rpath, recursive=recursive, maxdepth=maxdepth
)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
rpaths = [
p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
]
if not rpaths:
return

lpath = make_path_posix(lpath)
source_is_file = len(rpaths) == 1
dest_is_dir = isinstance(lpath, str) and (
trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
)
lpath = make_path_posix(lpath)
source_is_file = len(rpaths) == 1
dest_is_dir = isinstance(lpath, str) and (
trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
)

exists = source_is_str and (
(has_magic(rpath) and source_is_file)
or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
)
lpaths = other_paths(
rpaths,
lpath,
exists=exists,
flatten=not source_is_str,
)

exists = source_is_str and (
(has_magic(rpath) and source_is_file)
or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
)
lpaths = other_paths(
rpaths,
lpath,
exists=exists,
flatten=not source_is_str,
)
[os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
batch_size = kwargs.pop("batch_size", self.batch_size)

Expand Down
Loading

0 comments on commit 1bbbbdf

Please sign in to comment.