From 0b10421bd18b7d56f40c69b5d4b81afce03d923a Mon Sep 17 00:00:00 2001 From: Dan King Date: Tue, 12 Dec 2023 20:07:53 -0500 Subject: [PATCH] works at last --- hail/python/hailtop/aiotools/plan.py | 27 ++- hail/python/hailtop/hailctl/fs/cli.py | 26 ++- .../inter_cloud/copier_test_utilities.py | 13 +- .../test/hailtop/inter_cloud/test_copy.py | 154 +++++++++++------- 4 files changed, 141 insertions(+), 79 deletions(-) diff --git a/hail/python/hailtop/aiotools/plan.py b/hail/python/hailtop/aiotools/plan.py index 5048497956c..423eee51def 100644 --- a/hail/python/hailtop/aiotools/plan.py +++ b/hail/python/hailtop/aiotools/plan.py @@ -24,7 +24,8 @@ class PlanError(ValueError): async def plan( folder: str, - copy: List[Tuple[str, str]], + copy_to: List[Tuple[str, str]], + copy_into: List[Tuple[str, str]], gcs_requester_pays_project: Optional[str], verbose: bool, max_parallelism: int, @@ -38,6 +39,16 @@ async def plan( total_n_bytes = 0 async with RouterAsyncFS(gcs_kwargs=gcs_kwargs) as fs: + def create_copy_into(copy_into_tuple: Tuple[str, str]) -> Tuple[str, str]: + src, dest = copy_into_tuple + src_url = fs.parse_url(src) + dest_url = fs.parse_url(dest) + src_basename = os.path.basename(src_url.path) + destination_file = dest_url.with_new_path_component(src_basename) + return (src, str(destination_file)) + + copy = [*copy_to, *(create_copy_into(x) for x in copy_into)] + if any(await asyncio.gather(fs.isfile(folder), fs.isdir(folder.rstrip('/') + '/'))): raise PlanError(f'plan folder already exists: {folder}', 1) @@ -121,16 +132,16 @@ async def find_all_copy_pairs( listfiles(fs, dst), ) - print((srcstat, srcfiles, dststat, dstfiles)) - if srcstat and srcfiles: raise PlanError(f'Source is both a directory and a file. This is not supported. {src}', 1) if dststat and dstfiles: raise PlanError(f'Destination is both a directory and a file. This is not supported. {dst}', 1) if srcstat and dstfiles: - raise PlanError(f'Source is a file but destination is a directory. This is not supported. {src} -> {dst}', 1) + raise PlanError(f'Source is a file but destination is a directory. This is not supported. {src} -> {dst}', 1) from ( + IsADirectoryError(dst)) if srcfiles and dststat: - raise PlanError(f'Source is a directory but destination is a file. This is not supported. {src} -> {dst}', 1) + raise PlanError(f'Source is a directory but destination is a file. This is not supported. {src} -> {dst}', 1) from ( + IsADirectoryError(src)) if srcstat: assert len(srcfiles) == 0 assert len(dstfiles) == 0 @@ -147,9 +158,9 @@ async def find_all_copy_pairs( await srconly.write((srcurl + '\n').encode('utf-8')) await plan.write((srcurl + '\t' + dst + '\n').encode('utf-8')) return 1, srcsize - elif dststat: - await dstonly.write((dst + '\n').encode('utf-8')) - return 0, 0 + elif not srcfiles: + assert srcstat is None + raise PlanError(f'Source is neither a folder nor a file: {src}') from FileNotFoundError(src) srcfiles.sort(key=lambda x: x[0]) dstfiles.sort(key=lambda x: x[0]) diff --git a/hail/python/hailtop/hailctl/fs/cli.py b/hail/python/hailtop/hailctl/fs/cli.py index fdd29bd8e45..d2277409615 100644 --- a/hail/python/hailtop/hailctl/fs/cli.py +++ b/hail/python/hailtop/hailctl/fs/cli.py @@ -1,8 +1,9 @@ from typing import Optional, List, Tuple, cast import asyncio -import typer import click +import os import sys +import typer from hailtop.aiotools.plan import plan, PlanError from hailtop.aiotools.sync import sync as aiotools_sync, SyncError @@ -24,8 +25,16 @@ def callback(): @click.command() @click.option( - '--copy', - help='Pairs of source and destination URL. May be specified multiple times.', + '--copy-to', + help='Pairs of source and destination URL. May be specified multiple times. The destination is always treated as a file. See --copy-into to copy into a directory', + type=(str, str), + required=False, + multiple=True, + default=(), +) +@click.option( + '--copy-into', + help='Copies the source path into the target path. The target must not be a file.', type=(str, str), required=False, multiple=True, @@ -37,7 +46,8 @@ def callback(): @click.option('--use-plan', help='The plan to execute. Must exist.', type=str, required=False) @click.option('--gcs-requester-pays-project', help='The Google project to which to charge egress costs.', type=str, required=False) def sync( - copy: List[Tuple[str, str]], + copy_to: List[Tuple[str, str]], + copy_into: List[Tuple[str, str]], verbose: bool, max_parallelism: int, make_plan: Optional[str] = None, @@ -69,7 +79,13 @@ def sync( if make_plan: try: - asyncio.run(plan(make_plan, copy, gcs_requester_pays_project, verbose, max_parallelism)) + asyncio.run(plan( + make_plan, + copy_to, + copy_into, + gcs_requester_pays_project, + verbose, + max_parallelism)) except PlanError as err: print(err.args[0]) sys.exit(err.args[1]) diff --git a/hail/python/test/hailtop/inter_cloud/copier_test_utilities.py b/hail/python/test/hailtop/inter_cloud/copier_test_utilities.py index 8e062a6236e..8b69976037e 100644 --- a/hail/python/test/hailtop/inter_cloud/copier_test_utilities.py +++ b/hail/python/test/hailtop/inter_cloud/copier_test_utilities.py @@ -78,14 +78,15 @@ async def router_filesystem(request) -> AsyncIterator[Tuple[asyncio.Semaphore, A yield (sema, fs, bases) await bounded_gather2(sema, functools.partial(fs.rmtree, sema, file_base), - functools.partial(fs.rmtree, sema, gs_base), - functools.partial(fs.rmtree, sema, s3_base), - functools.partial(fs.rmtree, sema, azure_base)) + # functools.partial(fs.rmtree, sema, gs_base), + # functools.partial(fs.rmtree, sema, s3_base), + # functools.partial(fs.rmtree, sema, azure_base) + ) assert not await fs.isdir(file_base) - assert not await fs.isdir(gs_base) - assert not await fs.isdir(s3_base) - assert not await fs.isdir(azure_base) + # assert not await fs.isdir(gs_base) + # assert not await fs.isdir(s3_base) + # assert not await fs.isdir(azure_base) async def fresh_dir(fs, bases, scheme): diff --git a/hail/python/test/hailtop/inter_cloud/test_copy.py b/hail/python/test/hailtop/inter_cloud/test_copy.py index f0cc06679b0..525364ca7b8 100644 --- a/hail/python/test/hailtop/inter_cloud/test_copy.py +++ b/hail/python/test/hailtop/inter_cloud/test_copy.py @@ -1,10 +1,12 @@ from typing import Tuple, Dict, AsyncIterator, List -import secrets import asyncio +import os import pytest +import secrets +import tempfile from hailtop.utils import url_scheme -from hailtop.aiotools.plan import plan -from hailtop.aiotools.sync import sync +from hailtop.aiotools.plan import plan, PlanError +from hailtop.aiotools.sync import sync, SyncError from hailtop.aiotools import Transfer, FileAndDirectoryError, Copier, AsyncFS, FileListEntry @@ -42,14 +44,42 @@ async def copier_copy(fs, sema, transfer): await Copier.copy(fs, sema, transfer) -async def sync_tool(fs, sema, transfer): +async def sync_tool(fs, sema, transfer: Transfer): del fs max_parallelism = sema._value - copies = [ - (transfer.src, transfer.dest) # FIXME: what about treat_dest_as? - ] - await plan('plan1', copies, None, True, max_parallelism) - await sync('plan1', None, True, max_parallelism) + + sources = transfer.src + if isinstance(sources, str): + sources = [sources] + + copy_to = [] + copy_into = [] + + if transfer.treat_dest_as == Transfer.DEST_DIR or (transfer.treat_dest_as == Transfer.INFER_DEST and os.path.isdir(transfer.dest)): + copy_into = [ + (src, transfer.dest) + for src in sources + ] + elif transfer.treat_dest_as == Transfer.DEST_IS_TARGET or (transfer.treat_dest_as == Transfer.INFER_DEST and not os.path.isdir(transfer.dest)): + if os.path.isfile(transfer.dest): + if len(sources) > 1 or os.path.isdir(sources[0]): + raise NotADirectoryError(transfer.dest) + copy_to = [ + (src, transfer.dest) + for src in sources + ] + else: + raise ValueError(f'unsupported treat_dest_as: {transfer.treat_dest_as}') + + with tempfile.TemporaryDirectory() as folder: + try: + await plan(os.path.join(folder, 'plan'), copy_to, copy_into, None, True, max_parallelism) + await sync(os.path.join(folder, 'plan'), None, True, max_parallelism) + except (PlanError, SyncError) as err: + if err.__cause__: + raise err.__cause__ + else: + raise err @pytest.fixture(params=['Copier.copy', 'hailctl_sync']) @@ -126,40 +156,40 @@ async def test_copy_large_file(copy_test_context, copy_tool): @pytest.mark.asyncio -async def test_copy_rename_file(copy_test_context): +async def test_copy_rename_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) await expect_file(fs, f'{dest_base}x', 'src/a') @pytest.mark.asyncio -async def test_copy_rename_file_dest_target_file(copy_test_context): +async def test_copy_rename_file_dest_target_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) await expect_file(fs, f'{dest_base}x', 'src/a') @pytest.mark.asyncio -async def test_copy_file_dest_target_directory_doesnt_exist(copy_test_context): +async def test_copy_file_dest_target_directory_doesnt_exist(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') # SourceCopier._copy_file creates destination directories as needed - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_DIR)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_DIR)) await expect_file(fs, f'{dest_base}x/a', 'src/a') @pytest.mark.asyncio -async def test_overwrite_rename_file(copy_test_context): +async def test_overwrite_rename_file(copy_test_context): # hailctl fs sync does not support overwriting sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') @@ -171,24 +201,24 @@ async def test_overwrite_rename_file(copy_test_context): @pytest.mark.asyncio -async def test_copy_rename_dir(copy_test_context): +async def test_copy_rename_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) await expect_file(fs, f'{dest_base}x/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}x/subdir/file2', 'src/a/subdir/file2') @pytest.mark.asyncio -async def test_copy_rename_dir_dest_is_target(copy_test_context): +async def test_copy_rename_dir_dest_is_target(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) await expect_file(fs, f'{dest_base}x/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}x/subdir/file2', 'src/a/subdir/file2') @@ -196,12 +226,13 @@ async def test_copy_rename_dir_dest_is_target(copy_test_context): @pytest.mark.asyncio async def test_overwrite_rename_dir(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support overwrite sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') await create_test_dir(fs, 'dest', dest_base, 'x/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) await expect_file(fs, f'{dest_base}x/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}x/subdir/file2', 'src/a/subdir/file2') @@ -209,77 +240,79 @@ async def test_overwrite_rename_dir(copy_test_context): @pytest.mark.asyncio -async def test_copy_file_dest_trailing_slash_target_dir(copy_test_context): +async def test_copy_file_dest_trailing_slash_target_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base, treat_dest_as=Transfer.DEST_DIR)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base, treat_dest_as=Transfer.DEST_DIR)) await expect_file(fs, f'{dest_base}a', 'src/a') @pytest.mark.asyncio -async def test_copy_file_dest_target_dir(copy_test_context): +async def test_copy_file_dest_target_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_DIR)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_DIR)) await expect_file(fs, f'{dest_base}a', 'src/a') @pytest.mark.asyncio -async def test_copy_file_dest_target_file(copy_test_context): +async def test_copy_file_dest_target_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}a', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}a', treat_dest_as=Transfer.DEST_IS_TARGET)) await expect_file(fs, f'{dest_base}a', 'src/a') @pytest.mark.asyncio -async def test_copy_dest_target_file_is_dir(copy_test_context): +async def test_copy_dest_target_file_is_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') with RaisesOrObjectStore(dest_base, IsADirectoryError): - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET)) @pytest.mark.asyncio async def test_overwrite_file(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support overwriting sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') await create_test_file(fs, 'dest', dest_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a', 'src/a') @pytest.mark.asyncio async def test_copy_file_src_trailing_slash(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support trailing slashes on files sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') with pytest.raises(FileNotFoundError): - await Copier.copy(fs, sema, Transfer(f'{src_base}a/', dest_base)) + await copy_tool(fs, sema, Transfer(f'{src_base}a/', dest_base)) @pytest.mark.asyncio -async def test_copy_dir(copy_test_context): +async def test_copy_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}a/subdir/file2', 'src/a/subdir/file2') @@ -287,12 +320,13 @@ async def test_copy_dir(copy_test_context): @pytest.mark.asyncio async def test_overwrite_dir(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support overwrite sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') await create_test_dir(fs, 'dest', dest_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}a/subdir/file2', 'src/a/subdir/file2') @@ -300,31 +334,31 @@ async def test_overwrite_dir(copy_test_context): @pytest.mark.asyncio -async def test_copy_multiple(copy_test_context): +async def test_copy_multiple(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') await create_test_file(fs, 'src', src_base, 'b') - await Copier.copy(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a', 'src/a') await expect_file(fs, f'{dest_base}b', 'src/b') @pytest.mark.asyncio -async def test_copy_multiple_dest_target_file(copy_test_context): +async def test_copy_multiple_dest_target_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') await create_test_file(fs, 'src', src_base, 'b') with RaisesOrObjectStore(dest_base, NotADirectoryError): - await Copier.copy(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET)) @pytest.mark.asyncio -async def test_copy_multiple_dest_file(copy_test_context): +async def test_copy_multiple_dest_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') @@ -332,17 +366,17 @@ async def test_copy_multiple_dest_file(copy_test_context): await create_test_file(fs, 'dest', dest_base, 'x') with RaisesOrObjectStore(dest_base, NotADirectoryError): - await Copier.copy(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], f'{dest_base}x')) + await copy_tool(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], f'{dest_base}x')) @pytest.mark.asyncio -async def test_file_overwrite_dir(copy_test_context): +async def test_file_overwrite_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') with RaisesOrObjectStore(dest_base, IsADirectoryError): - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET)) @pytest.mark.asyncio @@ -356,16 +390,16 @@ async def test_file_and_directory_error(router_filesystem: Tuple[asyncio.Semapho await create_test_file(fs, 'src', src_base, 'a/subfile') with pytest.raises(FileAndDirectoryError): - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) @pytest.mark.asyncio -async def test_copy_src_parts(copy_test_context): +async def test_copy_src_parts(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy(fs, sema, Transfer([f'{src_base}a/file1', f'{src_base}a/subdir'], dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_DIR)) + await copy_tool(fs, sema, Transfer([f'{src_base}a/file1', f'{src_base}a/subdir'], dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_DIR)) await expect_file(fs, f'{dest_base}file1', 'src/a/file1') await expect_file(fs, f'{dest_base}subdir/file2', 'src/a/subdir/file2') @@ -381,7 +415,7 @@ async def collect_files(it: AsyncIterator[FileListEntry]) -> List[str]: @pytest.mark.asyncio -async def test_file_and_directory_error_with_slash_empty_file(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str): +async def test_file_and_directory_error_with_slash_empty_file(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str, copy_tool): sema, fs, bases = router_filesystem src_base = await fresh_dir(fs, bases, cloud_scheme) @@ -397,11 +431,11 @@ async def test_file_and_directory_error_with_slash_empty_file(router_filesystem: for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}empty/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool(fs, sema, Transfer(f'{src_base}empty/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) await collect_files(await fs.listfiles(f'{dest_base}')) await collect_files(await fs.listfiles(f'{dest_base}', recursive=True)) @@ -434,7 +468,7 @@ async def test_file_and_directory_error_with_slash_non_empty_file_for_google_non @pytest.mark.asyncio -async def test_file_and_directory_error_with_slash_non_empty_file(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str): +async def test_file_and_directory_error_with_slash_non_empty_file(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str, copy_tool): sema, fs, bases = router_filesystem src_base = await fresh_dir(fs, bases, cloud_scheme) @@ -451,7 +485,7 @@ async def test_file_and_directory_error_with_slash_non_empty_file(router_filesys for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}not-empty/bar', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool(fs, sema, Transfer(f'{src_base}not-empty/bar', dest_base.rstrip('/'), treat_dest_as=transfer_type)) if transfer_type == Transfer.DEST_DIR: exp_dest = f'{dest_base}bar' await expect_file(fs, exp_dest, 'bar') @@ -464,15 +498,15 @@ async def test_file_and_directory_error_with_slash_non_empty_file(router_filesys with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}not-empty/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool(fs, sema, Transfer(f'{src_base}not-empty/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) @pytest.mark.asyncio -async def test_file_and_directory_error_with_slash_non_empty_file_only_for_google_non_recursive(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]]): +async def test_file_and_directory_error_with_slash_non_empty_file_only_for_google_non_recursive(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], copy_tool): sema, fs, bases = router_filesystem src_base = await fresh_dir(fs, bases, 'gs') @@ -484,7 +518,7 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_for_googl for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): dest_base = await fresh_dir(fs, bases, 'gs') - await Copier.copy(fs, sema, Transfer(f'{src_base}empty-only/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool(fs, sema, Transfer(f'{src_base}empty-only/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) # We ignore empty directories when copying with pytest.raises(FileNotFoundError): @@ -492,7 +526,7 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_for_googl @pytest.mark.asyncio -async def test_file_and_directory_error_with_slash_empty_file_only(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str): +async def test_file_and_directory_error_with_slash_empty_file_only(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str, copy_tool): sema, fs, bases = router_filesystem src_base = await fresh_dir(fs, bases, cloud_scheme) @@ -504,13 +538,13 @@ async def test_file_and_directory_error_with_slash_empty_file_only(router_filesy for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}empty-only/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool(fs, sema, Transfer(f'{src_base}empty-only/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) with pytest.raises(FileNotFoundError): await collect_files(await fs.listfiles(f'{dest_base}empty-only/', recursive=True)) dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) @pytest.mark.asyncio @@ -529,7 +563,7 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_google_no @pytest.mark.asyncio -async def test_file_and_directory_error_with_slash_non_empty_file_only(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str): +async def test_file_and_directory_error_with_slash_non_empty_file_only(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str, copy_tool): sema, fs, bases = router_filesystem src_base = await fresh_dir(fs, bases, cloud_scheme) @@ -545,8 +579,8 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only(router_fi for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}not-empty-file-w-slash/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool(fs, sema, Transfer(f'{src_base}not-empty-file-w-slash/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type))