diff --git a/hail/Makefile b/hail/Makefile index 0398e46b59a..ec6127e6a3f 100644 --- a/hail/Makefile +++ b/hail/Makefile @@ -186,6 +186,8 @@ pytest-inter-cloud: upload-remote-test-resources install-editable HAIL_TEST_S3_BUCKET=hail-test-dy5rg \ HAIL_TEST_AZURE_ACCOUNT=hailtest \ HAIL_TEST_AZURE_CONTAINER=hail-test-4nxei \ + HAIL_TEST_AZURE_RESGRP=hail-dev \ + HAIL_TEST_AZURE_SUBID=22cd45fe-f996-4c51-af67-ef329d977519 \ $(HAIL_PYTHON3) -m pytest \ -Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \ --log-cli-level=INFO \ diff --git a/hail/python/hailtop/aiocloud/aioaws/fs.py b/hail/python/hailtop/aiocloud/aioaws/fs.py index b45bcbae7cc..c7a7f71f4dc 100644 --- a/hail/python/hailtop/aiocloud/aioaws/fs.py +++ b/hail/python/hailtop/aiocloud/aioaws/fs.py @@ -93,8 +93,11 @@ def __init__(self, head_object_resp, url: str): self.head_object_resp = head_object_resp self._url = url + def __repr__(self): + return f'S3HeadObjectFileStatus({self.head_object_resp}, {self._url})' + def basename(self) -> str: - return os.path.basename(self._url.rstrip('/')) + return os.path.basename(self._url) def url(self) -> str: return self._url @@ -121,8 +124,11 @@ def __init__(self, item: Dict[str, Any], url: str): self._item = item self._url = url + def __repr__(self): + return f'S3ListFilesFileStatus({self._item}, {self._url})' + def basename(self) -> str: - return os.path.basename(self._url.rstrip('/')) + return os.path.basename(self._url) def url(self) -> str: return self._url @@ -195,8 +201,15 @@ def __init__(self, bucket: str, key: str, item: Optional[Dict[str, Any]]): self._item = item self._status: Optional[S3ListFilesFileStatus] = None + def __repr__(self): + return f'S3FileListEntry({self._bucket}, {self._key}, {self._item})' + def basename(self) -> str: - return os.path.basename(self._key.rstrip('/')) + object_name = self._key + if self._is_dir(): + assert object_name[-1] == '/' + object_name = object_name[:-1] + return os.path.basename(object_name) async def url(self) -> str: return f's3://{self._bucket}/{self._key}' @@ -204,9 +217,12 @@ async def url(self) -> str: async def is_file(self) -> bool: return self._item is not None - async def is_dir(self) -> bool: + def _is_dir(self) -> bool: return self._item is None + async def is_dir(self) -> bool: + return self._is_dir() + async def status(self) -> FileStatus: if self._status is None: if self._item is None: diff --git a/hail/python/hailtop/aiocloud/aioazure/fs.py b/hail/python/hailtop/aiocloud/aioazure/fs.py index 9db30e9cc1d..8acdc0c7257 100644 --- a/hail/python/hailtop/aiocloud/aioazure/fs.py +++ b/hail/python/hailtop/aiocloud/aioazure/fs.py @@ -243,8 +243,15 @@ def __init__(self, url: 'AzureAsyncFSURL', blob_props: Optional[BlobProperties]) self._blob_props = blob_props self._status: Optional[AzureFileStatus] = None + def __repr__(self): + return f'AzureFileListEntry({self._url}, {self._blob_props})' + def basename(self) -> str: - return os.path.basename(self._url.base.rstrip('/')) + url_no_sas = self._url.base + if self._is_dir(): + assert url_no_sas[-1] == '/' + url_no_sas = url_no_sas[:-1] + return os.path.basename(url_no_sas) async def url(self) -> str: return self._url.base @@ -255,9 +262,12 @@ async def url_full(self) -> str: async def is_file(self) -> bool: return self._blob_props is not None - async def is_dir(self) -> bool: + def _is_dir(self) -> bool: return self._blob_props is None + async def is_dir(self) -> bool: + return self._is_dir() + async def status(self) -> FileStatus: if self._status is None: if self._blob_props is None: @@ 
-271,8 +281,11 @@ def __init__(self, blob_props: BlobProperties, url: 'AzureAsyncFSURL'): self.blob_props = blob_props self._url = url + def __repr__(self): + return f'AzureFileStatus({self.blob_props}, {self._url})' + def basename(self) -> str: - return os.path.basename(self._url.base.rstrip('/')) + return os.path.basename(self._url.base) def url(self) -> str: return str(self._url) @@ -546,9 +559,11 @@ async def isfile(self, url: str) -> bool: @handle_public_access_error async def isdir(self, url: str) -> bool: fs_url = self.parse_url(url, error_if_bucket=True) - assert not fs_url.path or fs_url.path.endswith('/'), fs_url.path client = await self.get_container_client(fs_url) - async for _ in client.walk_blobs(name_starts_with=fs_url.path, include=['metadata'], delimiter='/'): + prefix = fs_url.path + if prefix[-1] != '/': + prefix = prefix + '/' + async for _ in client.walk_blobs(name_starts_with=prefix, include=['metadata'], delimiter='/'): return True return False diff --git a/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py b/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py index 382c5541735..d72b7ee9e52 100644 --- a/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py +++ b/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py @@ -80,12 +80,13 @@ async def _cleanup_future(fut: asyncio.Future): class InsertObjectStream(WritableStream): - def __init__(self, it: FeedableAsyncIterable[bytes], request_task: asyncio.Task[aiohttp.ClientResponse]): + def __init__(self, it: FeedableAsyncIterable[bytes], request_task: asyncio.Task[aiohttp.ClientResponse], url: str): super().__init__() self._it = it self._request_task = request_task self._value = None self._exit_stack = AsyncExitStack() + self.url = url async def cleanup_request_task(): if not self._request_task.cancelled(): @@ -364,7 +365,7 @@ async def insert_object(self, bucket: str, name: str, **kwargs) -> WritableStrea f'https://storage.googleapis.com/upload/storage/v1/b/{bucket}/o', retry=False, **kwargs ) ) - return InsertObjectStream(it, request_task) + return InsertObjectStream(it, request_task, 'gs://' + bucket + '/' + name) # Write using resumable uploads. 
See: # https://cloud.google.com/storage/docs/performing-resumable-uploads @@ -461,8 +462,11 @@ def __init__(self, items: Dict[str, str], url: str): self._items = items self._url = url + def __repr__(self): + return f'GetObjectFileStatus({self._items}, {self._url})' + def basename(self) -> str: - return os.path.basename(self._url.rstrip('/')) + return os.path.basename(self._url) def url(self) -> str: return self._url @@ -487,8 +491,15 @@ def __init__(self, bucket: str, name: str, items: Optional[Dict[str, Any]]): self._items = items self._status: Optional[GetObjectFileStatus] = None + def __repr__(self): + return f'GoogleStorageFileListEntry({self._bucket}, {self._name}, {self._items})' + def basename(self) -> str: - return os.path.basename(self._name.rstrip('/')) + object_name = self._name + if self._is_dir(): + assert object_name[-1] == '/' + object_name = object_name[:-1] + return os.path.basename(object_name) async def url(self) -> str: return f'gs://{self._bucket}/{self._name}' @@ -496,9 +507,12 @@ async def url(self) -> str: async def is_file(self) -> bool: return self._items is not None - async def is_dir(self) -> bool: + def _is_dir(self) -> bool: return self._items is None + async def is_dir(self) -> bool: + return self._is_dir() + async def status(self) -> FileStatus: if self._status is None: if self._items is None: @@ -856,7 +870,9 @@ async def isfile(self, url: str) -> bool: async def isdir(self, url: str) -> bool: fsurl = self.parse_url(url, error_if_bucket=True) - assert not fsurl._path or fsurl.path.endswith('/'), fsurl._path + prefix = fsurl._path + if len(prefix) > 0 and prefix[-1] != '/': + prefix += '/' - params = {'prefix': fsurl._path, 'delimiter': '/', 'includeTrailingDelimiter': 'true', 'maxResults': 1} + params = {'prefix': prefix, 'delimiter': '/', 'includeTrailingDelimiter': 'true', 'maxResults': 1} async for page in await self._storage_client.list_objects(fsurl._bucket, params=params): prefixes = page.get('prefixes') diff --git a/hail/python/hailtop/aiotools/plan.py b/hail/python/hailtop/aiotools/plan.py new file mode 100644 index 00000000000..6545c3791fb --- /dev/null +++ b/hail/python/hailtop/aiotools/plan.py @@ -0,0 +1,276 @@ +import asyncio +import os +from contextlib import AsyncExitStack +from typing import List, Optional, Tuple + +from hailtop.aiotools import FileAndDirectoryError + +from ..utils.rich_progress_bar import CopyToolProgressBar, Progress +from .fs import AsyncFS, FileListEntry, FileStatus, WritableStream +from .router_fs import RouterAsyncFS + + +class PlanError(ValueError): + pass + + +async def plan( + folder: str, + copy_to: List[Tuple[str, str]], + copy_into: List[Tuple[str, str]], + gcs_requester_pays_project: Optional[str], + verbose: bool, + max_parallelism: int, +): + if gcs_requester_pays_project: + gcs_kwargs = {'gcs_requester_pays_configuration': gcs_requester_pays_project} + else: + gcs_kwargs = {} + + total_n_files = 0 + total_n_bytes = 0 + + async with AsyncExitStack() as cleanups: + fs = await cleanups.enter_async_context(RouterAsyncFS(gcs_kwargs=gcs_kwargs)) + source_destination_pairs = [ + *copy_to, + *((src, copied_into_path(fs, source_path=src, folder_path=dst)) for src, dst in copy_into), + ] + + if any(await asyncio.gather(fs.isfile(folder), fs.isdir(folder.rstrip('/') + '/'))): + raise PlanError(f'plan folder already exists: {folder}', 1) + + await fs.mkdir(folder) + matches = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'matches'))) + differs = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'differs'))) + srconly = await cleanups.enter_async_context(await
fs.create(os.path.join(folder, 'srconly'))) + dstonly = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'dstonly'))) + plan = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'plan'))) + + progress = cleanups.enter_context(CopyToolProgressBar(transient=True, disable=not verbose)) + + for src, dst in source_destination_pairs: + n_files, n_bytes = await find_all_copy_pairs( + fs, + matches, + differs, + srconly, + dstonly, + plan, + src, + dst, + progress, + asyncio.Semaphore(max_parallelism), + source_must_exist=True, + ) + total_n_files += n_files + total_n_bytes += n_bytes + + summary = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'summary'))) + await summary.write((f'{total_n_files}\t{total_n_bytes}\n').encode('utf-8')) + + +def copied_into_path(fs: AsyncFS, *, source_path: str, folder_path: str) -> str: + src_url = fs.parse_url(source_path) + dest_url = fs.parse_url(folder_path) + src_basename = os.path.basename(src_url.path.rstrip('/')) + return str(dest_url.with_new_path_component(src_basename)) + + +class FileStat: + def __init__(self, basename, url, is_dir, size): + self.basename = basename + self.url = url + self.is_dir = is_dir + self.size = size + + def __repr__(self) -> str: + return f'FileStat({self.basename}, {self.url}, {self.is_dir}, {self.size})' + + @staticmethod + async def from_file_list_entry(x: FileListEntry) -> 'FileStat': + url, is_dir = await asyncio.gather(x.url(), x.is_dir()) + if is_dir: + size = 0 + else: + size = await (await x.status()).size() + return FileStat(x.basename(), url, is_dir, size) + + @staticmethod + async def from_file_status(x: FileStatus) -> 'FileStat': + return FileStat(x.basename(), x.url(), False, await x.size()) + + +async def listfiles(fs: AsyncFS, x: str) -> List[FileStat]: + try: + it = await fs.listfiles(x, recursive=True) + return [await FileStat.from_file_list_entry(x) async for x in it] + except (FileNotFoundError, NotADirectoryError): + return [] + + +async def statfile(fs: AsyncFS, x: str) -> Optional[FileStat]: + try: + file_status = await fs.statfile(x) + return await FileStat.from_file_status(file_status) + except FileNotFoundError: + return None + + +async def writeline(file: WritableStream, *columns: str): + await file.write(('\t'.join(columns) + '\n').encode('utf-8')) + + +def relativize_url(folder: str, file: str) -> str: + if folder[-1] != '/': + folder = folder + '/' + relative_path = file.removeprefix(folder) + assert relative_path[0] != '/', (relative_path, folder, file) + return relative_path + + +async def find_all_copy_pairs( + fs: AsyncFS, + matches: WritableStream, + differs: WritableStream, + srconly: WritableStream, + dstonly: WritableStream, + plan: WritableStream, + src: str, + dst: str, + progress: Progress, + sema: asyncio.Semaphore, + source_must_exist: bool, +) -> Tuple[int, int]: + async with sema: + srcstat, srcfiles, dststat, dstfiles = await asyncio.gather( + statfile(fs, src), + listfiles(fs, src), + statfile(fs, dst), + listfiles(fs, dst), + ) + + if srcstat: + if srcstat.url[-1] == '/': + if srcstat.size == 0: + srcstat = None + if srcfiles: + print(f'Not copying size-zero source file which shares a name with a source directory. {src}') + else: + print(f'Not copying size-zero source file whose name looks like a directory. {src}') + else: + raise PlanError( + f'Source is a non-size-zero file whose name looks like a directory. This is not supported. 
{src} {srcstat}', + 1, + ) + elif srcfiles: + raise PlanError( + f'Source is both a directory and a file (other than a size-zero file whose name ends in "/"). ' + f'This is not supported. {src} {srcstat}', + 1, + ) from FileAndDirectoryError(src) + if dststat: + if dststat.url[-1] == '/': + if dststat.size == 0: + dststat = None + if dstfiles: + print( + f'Ignoring size-zero destination file which shares a name with a destination directory. {dst}' + ) + else: + print(f'Ignoring size-zero destination file whose name looks like a directory. {dst}') + else: + raise PlanError( + f'Destination is a non-size-zero file whose name looks like a directory. This is not supported. {dst} {dststat}', + 1, + ) + elif dstfiles: + raise PlanError( + f'Destination identifies both a directory and a file (other than a size-zero file whose name ends ' + f'in "/"). This is not supported. {dst} {dststat}', + 1, + ) from FileAndDirectoryError(dst) + if srcstat and dstfiles: + raise PlanError( + f'Source is a file but destination is a directory. This is not supported. {src} -> {dst}', 1 + ) from IsADirectoryError(dst) + if srcfiles and dststat: + raise PlanError( + f'Source is a directory but destination is a file. This is not supported. {src} -> {dst}', 1 + ) from IsADirectoryError(src) + if source_must_exist and not srcstat and not srcfiles: + raise PlanError(f'Source is neither a folder nor a file: {src}', 1) from FileNotFoundError(src) + + if srcstat: + assert len(srcfiles) == 0 + assert len(dstfiles) == 0 + + if dststat: + if srcstat.size == dststat.size: + await writeline(matches, srcstat.url, dststat.url) + return 0, 0 + await writeline(differs, srcstat.url, dststat.url, str(srcstat.size), str(dststat.size)) + return 0, 0 + await writeline(srconly, srcstat.url) + await writeline(plan, srcstat.url, dst) + return 1, srcstat.size + + srcfiles.sort(key=lambda x: x.url) + dstfiles.sort(key=lambda x: x.url) + + tid = progress.add_task(description=src, total=len(srcfiles) + len(dstfiles)) + + srcidx = 0 + dstidx = 0 + + n_files = 0 + n_bytes = 0 + + while srcidx < len(srcfiles) and dstidx < len(dstfiles): + srcf = srcfiles[srcidx] + dstf = dstfiles[dstidx] + relsrcf = relativize_url(folder=src, file=srcf.url) + reldstf = relativize_url(folder=dst, file=dstf.url) + if relsrcf == reldstf: + if srcf.size == dstf.size: + await writeline(matches, srcf.url, dstf.url) + else: + await writeline(differs, srcf.url, dstf.url, str(srcf.size), str(dstf.size)) + dstidx += 1 + srcidx += 1 + progress.update(tid, advance=2) + elif relsrcf < reldstf: + await writeline(srconly, srcf.url) + await writeline(plan, srcf.url, os.path.join(dst, relativize_url(folder=src, file=srcf.url))) + n_files += 1 + n_bytes += srcf.size + srcidx += 1 + progress.update(tid, advance=1) + else: + assert relsrcf >= reldstf + dstidx += 1 + progress.update(tid, advance=1) + await writeline(dstonly, dstf.url) + + while srcidx < len(srcfiles): + srcf = srcfiles[srcidx] + + await writeline(srconly, srcf.url) + await writeline(plan, srcf.url, os.path.join(dst, relativize_url(folder=src, file=srcf.url))) + n_files += 1 + n_bytes += srcf.size + srcidx += 1 + progress.update(tid, advance=1) + + while dstidx < len(dstfiles): + dstf = dstfiles[dstidx] + + await writeline(dstonly, dstf.url) + dstidx += 1 + progress.update(tid, advance=1) + + # a short sleep ensures the progress bar is visible for a moment to the user + await asyncio.sleep(0.150) + progress.remove_task(tid) + + return n_files, n_bytes diff --git a/hail/python/hailtop/aiotools/sync.py 
b/hail/python/hailtop/aiotools/sync.py new file mode 100644 index 00000000000..d98fd43c3f5 --- /dev/null +++ b/hail/python/hailtop/aiotools/sync.py @@ -0,0 +1,191 @@ +import asyncio +import functools +import os +from typing import Optional + +from ..utils import bounded_gather2, retry_transient_errors +from ..utils.rich_progress_bar import CopyToolProgressBar, make_listener +from .copy import GrowingSempahore +from .fs.copier import Copier +from .fs.exceptions import UnexpectedEOFError +from .fs.fs import MultiPartCreate +from .router_fs import AsyncFS, RouterAsyncFS + + +class SyncError(ValueError): + pass + + +async def _copy_file_one_part( + fs: RouterAsyncFS, + srcfile: str, + size: int, + destfile: str, + files_listener, + bytes_listener, +) -> None: + assert not destfile.endswith('/') + + total_written = 0 + async with await fs.open(srcfile) as srcf: + try: + dest_cm = await fs.create(destfile, retry_writes=False) + except FileNotFoundError: + await fs.makedirs(os.path.dirname(destfile), exist_ok=True) + dest_cm = await fs.create(destfile) + + async with dest_cm as destf: + while True: + b = await srcf.read(Copier.BUFFER_SIZE) + if not b: + files_listener(-1) + bytes_listener(-total_written) + return + written = await destf.write(b) + assert written == len(b) + total_written += written + assert total_written == size + + +async def _copy_part( + fs: RouterAsyncFS, + part_size: int, + srcfile: str, + part_number: int, + this_part_size: int, + part_creator: MultiPartCreate, + bytes_listener, +) -> None: + total_written = 0 + async with await fs.open_from(srcfile, part_number * part_size, length=this_part_size) as srcf: + async with await part_creator.create_part( + part_number, part_number * part_size, size_hint=this_part_size + ) as destf: + n = this_part_size + while n > 0: + b = await srcf.read(min(Copier.BUFFER_SIZE, n)) + if len(b) == 0: + raise UnexpectedEOFError() + written = await destf.write(b) + assert written == len(b) + total_written += written + n -= len(b) + bytes_listener(-total_written) + + +async def _copy_file( + fs: RouterAsyncFS, + transfer_sema: asyncio.Semaphore, + srcfile: str, + destfile: str, + files_listener, + bytes_listener, +): + srcstat = await fs.statfile(srcfile) + + size = await srcstat.size() + part_size = fs.copy_part_size(destfile) + + if size <= part_size: + return await retry_transient_errors( + _copy_file_one_part, fs, srcfile, size, destfile, files_listener, bytes_listener + ) + + n_parts, rem = divmod(size, part_size) + if rem: + n_parts += 1 + + try: + part_creator = await fs.multi_part_create(transfer_sema, destfile, n_parts) + except FileNotFoundError: + await fs.makedirs(os.path.dirname(destfile), exist_ok=True) + part_creator = await fs.multi_part_create(transfer_sema, destfile, n_parts) + + async with part_creator: + + async def f(i): + this_part_size = rem if i == n_parts - 1 and rem else part_size + await retry_transient_errors( + _copy_part, fs, part_size, srcfile, i, this_part_size, part_creator, bytes_listener + ) + + await bounded_gather2( + transfer_sema, + *[functools.partial(f, i) for i in range(n_parts)], + cancel_on_error=True, + ) + files_listener(-1) + + +async def sync( + plan_folder: str, + gcs_requester_pays_project: Optional[str], + verbose: bool, + max_parallelism: int, +) -> None: + gcs_kwargs = {'gcs_requester_pays_configuration': gcs_requester_pays_project} + s3_kwargs = {'max_pool_connections': max_parallelism * 5, 'max_workers': max_parallelism} + + async with RouterAsyncFS(gcs_kwargs=gcs_kwargs, s3_kwargs=s3_kwargs) 
as fs: + if not all( + await asyncio.gather( + *( + fs.exists(os.path.join(plan_folder, x)) + for x in ('matches', 'differs', 'srconly', 'dstonly', 'plan', 'summary') + ) + ) + ): + raise SyncError('Run hailctl fs sync --make-plan first.', 1) + results = (await fs.read(os.path.join(plan_folder, 'summary'))).decode('utf-8') + n_files, n_bytes = (int(x) for x in results.split('\t')) + with CopyToolProgressBar(transient=True, disable=not verbose) as progress: + files_tid = progress.add_task(description='files', total=n_files, visible=verbose) + bytes_tid = progress.add_task(description='bytes', total=n_bytes, visible=verbose) + + files_listener = make_listener(progress, files_tid) + bytes_listener = make_listener(progress, bytes_tid) + + max_file_parallelism = max(1, max_parallelism // 10) + + initial_parallelism = min(10, max_parallelism) + initial_file_parallelism = min(10, max_file_parallelism) + + parallelism_tid = progress.add_task( + description='transfer parallelism', + completed=initial_parallelism, + total=max_parallelism, + visible=verbose, + ) + file_parallelism_tid = progress.add_task( + description='file parallelism', + completed=initial_file_parallelism, + total=max_file_parallelism, + visible=verbose, + ) + + async with GrowingSempahore( + initial_file_parallelism, max_file_parallelism, (progress, file_parallelism_tid) + ) as file_sema: + async with GrowingSempahore( + initial_parallelism, max_parallelism, (progress, parallelism_tid) + ) as transfer_sema: + await bounded_gather2( + file_sema, + *[ + functools.partial(_copy_file, fs, transfer_sema, src, dst, files_listener, bytes_listener) + async for src, dst in iterate_plan_file(plan_folder, fs) + ], + cancel_on_error=True, + ) + + +async def iterate_plan_file(plan_folder: str, fs: AsyncFS): + lineno = 0 + plan = (await fs.read(os.path.join(plan_folder, 'plan'))).decode('utf-8') + for line in plan.split('\n'): + lineno += 1 + if not line: + continue + parts = line.strip().split('\t') + if len(parts) != 2: + raise SyncError(f'Malformed plan line, {lineno}, must have exactly one tab: {line}', 1) + yield parts diff --git a/hail/python/hailtop/hailctl/__main__.py b/hail/python/hailtop/hailctl/__main__.py index 98e80043868..3f514d2b224 100644 --- a/hail/python/hailtop/hailctl/__main__.py +++ b/hail/python/hailtop/hailctl/__main__.py @@ -1,5 +1,7 @@ import os +from typing import cast +import click import typer from .auth import cli as auth_cli @@ -8,6 +10,7 @@ from .dataproc import cli as dataproc_cli from .describe import describe from .dev import cli as dev_cli +from .fs import cli as fs_cli from .hdinsight import cli as hdinsight_cli app = typer.Typer( @@ -69,4 +72,6 @@ async def _curl( def main(): - app() + click_app = cast(click.Group, typer.main.get_command(app)) + click_app.add_command(fs_cli.app) + click_app() diff --git a/hail/python/hailtop/hailctl/fs/__init__.py b/hail/python/hailtop/hailctl/fs/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hail/python/hailtop/hailctl/fs/cli.py b/hail/python/hailtop/hailctl/fs/cli.py new file mode 100644 index 00000000000..f27a7e57526 --- /dev/null +++ b/hail/python/hailtop/hailctl/fs/cli.py @@ -0,0 +1,190 @@ +import asyncio +import sys +from typing import List, Optional, Tuple, cast + +import click +import typer + +from hailtop import uvloopx +from hailtop.aiotools.plan import PlanError, plan +from hailtop.aiotools.sync import SyncError +from hailtop.aiotools.sync import sync as aiotools_sync + +app_without_click = typer.Typer( + name='fs', + no_args_is_help=True,
help='Use cloud object stores and local filesystems.', + pretty_exceptions_show_locals=False, +) + + +@app_without_click.callback() +def callback(): + """ + Typer app, including Click subapp + """ + + +@click.command() +@click.option( + '--copy-to', + help='Pairs of source and destination URL. May be specified multiple times. The destination is always treated as a file. See --copy-into to copy into a directory', + type=(str, str), + required=False, + multiple=True, + default=(), +) +@click.option( + '--copy-into', + help='Copies the source path into the target path. The target must not be a file.', + type=(str, str), + required=False, + multiple=True, + default=(), +) +@click.option( + '-v', + '--verbose', + help='Enable extra logging information including a progress bar.', + is_flag=True, + required=False, + default=False, +) +@click.option( + '--max-parallelism', help='The maximum number of concurrent requests.', type=int, required=False, default=75 +) +@click.option( + '--make-plan', + help='The folder in which to create a new synchronization plan. Must not exist.', + type=str, + required=False, +) +@click.option('--use-plan', help='The plan to execute. Must exist.', type=str, required=False) +@click.option( + '--gcs-requester-pays-project', help='The Google project to which to charge egress costs.', type=str, required=False +) +def sync( + copy_to: List[Tuple[str, str]], + copy_into: List[Tuple[str, str]], + verbose: bool, + max_parallelism: int, + make_plan: Optional[str] = None, + use_plan: Optional[str] = None, + gcs_requester_pays_project: Optional[str] = None, +): + """Synchronize files between one or more pairs of locations. + + If a corresponding file already exists at the destination with the same size in bytes, this + command will not copy it. If you want to replace files that have the exact same size in bytes, + delete the destination files first. THIS COMMAND DOES NOT CHECK MD5s OR SHAs! + + First generate a plan with --make-plan, then use the plan with --use-plan. + + ----- + + Plans + + ----- + + The command, `hailctl fs sync --make-plan` creates a "plan". A plan is a folder containing six + files: + + 1. matches: files whose names and sizes match. Two columns: source URL, destination URL. + + 2. differs: files or folders whose names match but either differ in size or differ in type. + Four columns: source URL, destination URL, source state, destination state. The states + are either: file, dir, or a size. If either state is a size, both states are sizes. + + 3. srconly: files only present in the source. One column: source URL. + + 4. dstonly: files only present in the destination. One column: destination URL. + + 5. plan: a proposed set of object-to-object copies. Two columns: source URL, destination URL. + + 6. summary: a one-line file containing the total number of copies in plan and the total number of bytes which would be copied. + + + -------- + + Examples + + -------- + + Copy all the files under gs://gcs-bucket/a to s3://s3-bucket/b. For example, + gs://gcs-bucket/a/b/c will appear at s3://s3-bucket/b/b/c: + + + + $ hailctl fs sync --make-plan plan1 --copy-to gs://gcs-bucket/a s3://s3-bucket/b + + + + $ hailctl fs sync --use-plan plan1 + + + + + Copy all the files under gs://gcs-bucket/a into the (possibly not existing) s3://s3-bucket/b folder.
+ For example, gs://gcs-bucket/a/b/c will appear at s3://s3-bucket/b/a/b/c: + + + + $ hailctl fs sync --make-plan plan1 --copy-into gs://gcs-bucket/a s3://s3-bucket/b + + + + $ hailctl fs sync --use-plan plan1 + + + + + Sync to cloud locations and then regenerate a plan folder to verify that all copies completed + successfully: + + + + $ hailctl fs sync --make-plan plan1 --copy-to gs://gcs-bucket/a s3://s3-bucket/b + + + + $ hailctl fs sync --use-plan plan1 + + + + $ hailctl fs sync --make-plan plan1 --copy-to gs://gcs-bucket/a s3://s3-bucket/b + + + + $ cat plan1/summary + + + + $ less plan1/plan + """ + uvloopx.install() + + if (make_plan is None and use_plan is None) or (make_plan is not None and use_plan is not None): + print('Must specify one of --make-plan or --use-plan. See hailctl fs sync --help.') + raise typer.Exit(1) + + if make_plan: + try: + asyncio.run(plan(make_plan, copy_to, copy_into, gcs_requester_pays_project, verbose, max_parallelism)) + except PlanError as err: + print('ERROR: ' + err.args[0]) + sys.exit(err.args[1]) + if use_plan: + if copy_to or copy_into: + print( + 'Do not specify --copy-to or --copy-into with --use-plan. Create the plan with --make-plan then call --use-plan without any --copy-to and --copy-into.' + ) + raise typer.Exit(1) + try: + asyncio.run(aiotools_sync(use_plan, gcs_requester_pays_project, verbose, max_parallelism)) + except SyncError as err: + print('ERROR: ' + err.args[0]) + sys.exit(err.args[1]) + + +app = cast(click.Group, typer.main.get_command(app_without_click)) +app.add_command(sync) diff --git a/hail/python/test/hailtop/inter_cloud/conftest.py b/hail/python/test/hailtop/inter_cloud/conftest.py index 841650d9b6e..65caca0053a 100644 --- a/hail/python/test/hailtop/inter_cloud/conftest.py +++ b/hail/python/test/hailtop/inter_cloud/conftest.py @@ -45,3 +45,8 @@ async def router_filesystem() -> AsyncIterator[Tuple[asyncio.Semaphore, AsyncFS, assert not await fs.isdir(gs_base) assert not await fs.isdir(s3_base) assert not await fs.isdir(azure_base) + + +@pytest.fixture(params=['gs', 's3', 'azure-https'], scope='module') +async def cloud_scheme(request): + yield request.param diff --git a/hail/python/test/hailtop/inter_cloud/test_copy.py b/hail/python/test/hailtop/inter_cloud/test_copy.py index baaa4fbb1d3..49d6ac29b98 100644 --- a/hail/python/test/hailtop/inter_cloud/test_copy.py +++ b/hail/python/test/hailtop/inter_cloud/test_copy.py @@ -1,10 +1,21 @@ import asyncio +import os import secrets +import tempfile from typing import AsyncIterator, Dict, List, Tuple import pytest -from hailtop.aiotools import AsyncFS, Copier, FileAndDirectoryError, FileListEntry, Transfer +from hailtop.aiotools import ( + AsyncFS, + Copier, + FileAndDirectoryError, + FileListEntry, + Transfer, +) +from hailtop.aiotools.plan import PlanError, plan +from hailtop.aiotools.router_fs import RouterAsyncFS +from hailtop.aiotools.sync import SyncError, sync from hailtop.utils import url_scheme from .copy_test_specs import COPY_TEST_SPECS @@ -24,11 +35,6 @@ async def test_spec(request): return request.param -@pytest.fixture(params=['gs', 's3', 'azure-https']) -async def cloud_scheme(request): - yield request.param - - @pytest.fixture( params=[ 'file/file', @@ -91,6 +97,61 @@ async def expect_file(fs, path, expected): assert actual == expected, (actual, expected) +async def copier_copy(fs, sema, transfer): + await Copier.copy(fs, sema, transfer) + + +async def sync_tool(fs: RouterAsyncFS, sema, transfer: Transfer): + max_parallelism = sema._value + + sources = transfer.src 
+ if isinstance(sources, str): + sources = [sources] + + copy_to = [] + copy_into = [] + + if transfer.treat_dest_as == Transfer.DEST_DIR or ( + transfer.treat_dest_as == Transfer.INFER_DEST and await fs.isdir(transfer.dest) + ): + copy_into = [(src, transfer.dest) for src in sources] + elif transfer.treat_dest_as == Transfer.DEST_IS_TARGET or ( + transfer.treat_dest_as == Transfer.INFER_DEST and not await fs.isdir(transfer.dest) + ): + if await fs.isfile(transfer.dest): + if len(sources) > 1 or await fs.isdir(sources[0]): + raise NotADirectoryError(transfer.dest) + copy_to = [(src, transfer.dest) for src in sources] + else: + raise ValueError(f'unsupported treat_dest_as: {transfer.treat_dest_as}') + + with tempfile.TemporaryDirectory() as folder: + try: + await plan( + os.path.join(folder, 'planfolder'), + copy_to, + copy_into, + None, + True, + max_parallelism, + ) + await sync(os.path.join(folder, 'planfolder'), None, True, max_parallelism) + except (PlanError, SyncError) as err: + if err.__cause__: + raise err.__cause__ + else: + raise err + + +@pytest.fixture(params=['Copier.copy', 'hailctl_sync']) +def copy_tool(request): + if request.param == 'Copier.copy': + return copier_copy + if request.param == 'hailctl_sync': + return sync_tool + raise ValueError('bad: ' + request.param) + + class DidNotRaiseError(Exception): pass @@ -120,24 +181,24 @@ def __exit__(self, type, value, traceback): return True -async def test_copy_doesnt_exist(copy_test_context): +async def test_copy_doesnt_exist(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context with pytest.raises(FileNotFoundError): - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base)) -async def test_copy_file(copy_test_context): +async def test_copy_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a', 'src/a') -async def test_copy_large_file(copy_test_context): +async def test_copy_large_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context # mainly needs to be larger than the transfer block size (8K) @@ -145,44 +206,54 @@ async def test_copy_large_file(copy_test_context): async with await fs.create(f'{src_base}a') as f: await f.write(contents) - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) async with await fs.open(f'{dest_base}a') as f: copy_contents = await f.read() assert copy_contents == contents -async def test_copy_rename_file(copy_test_context): +async def test_copy_rename_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) await expect_file(fs, f'{dest_base}x', 'src/a') -async def test_copy_rename_file_dest_target_file(copy_test_context): +async def test_copy_rename_file_dest_target_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', 
treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET), + ) await expect_file(fs, f'{dest_base}x', 'src/a') -async def test_copy_file_dest_target_directory_doesnt_exist(copy_test_context): +async def test_copy_file_dest_target_directory_doesnt_exist(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') # SourceCopier._copy_file creates destination directories as needed - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_DIR)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_DIR), + ) await expect_file(fs, f'{dest_base}x/a', 'src/a') -async def test_overwrite_rename_file(copy_test_context): +async def test_overwrite_rename_file( + copy_test_context, +): # hailctl fs sync does not support overwriting sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') @@ -193,153 +264,183 @@ async def test_overwrite_rename_file(copy_test_context): await expect_file(fs, f'{dest_base}x', 'src/a') -async def test_copy_rename_dir(copy_test_context): +async def test_copy_rename_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) await expect_file(fs, f'{dest_base}x/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}x/subdir/file2', 'src/a/subdir/file2') -async def test_copy_rename_dir_dest_is_target(copy_test_context): +async def test_copy_rename_dir_dest_is_target(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET), + ) await expect_file(fs, f'{dest_base}x/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}x/subdir/file2', 'src/a/subdir/file2') async def test_overwrite_rename_dir(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support overwrite sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') await create_test_dir(fs, 'dest', dest_base, 'x/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET), + ) await expect_file(fs, f'{dest_base}x/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}x/subdir/file2', 'src/a/subdir/file2') await expect_file(fs, f'{dest_base}x/file3', 'dest/x/file3') -async def test_copy_file_dest_trailing_slash_target_dir(copy_test_context): +async def test_copy_file_dest_trailing_slash_target_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base, treat_dest_as=Transfer.DEST_DIR)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base, treat_dest_as=Transfer.DEST_DIR)) await expect_file(fs, f'{dest_base}a', 'src/a') -async def 
test_copy_file_dest_target_dir(copy_test_context): +async def test_copy_file_dest_target_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_DIR)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_DIR), + ) await expect_file(fs, f'{dest_base}a', 'src/a') -async def test_copy_file_dest_target_file(copy_test_context): +async def test_copy_file_dest_target_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}a', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', f'{dest_base}a', treat_dest_as=Transfer.DEST_IS_TARGET), + ) await expect_file(fs, f'{dest_base}a', 'src/a') -async def test_copy_dest_target_file_is_dir(copy_test_context): +async def test_copy_dest_target_file_is_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') with RaisesOrObjectStore(dest_base, IsADirectoryError): - await Copier.copy( - fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET) + await copy_tool( + fs, + sema, + Transfer( + f'{src_base}a', + dest_base.rstrip('/'), + treat_dest_as=Transfer.DEST_IS_TARGET, + ), ) async def test_overwrite_file(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support overwriting sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') await create_test_file(fs, 'dest', dest_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a', 'src/a') async def test_copy_file_src_trailing_slash(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support trailing slashes on files sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') with pytest.raises(FileNotFoundError): - await Copier.copy(fs, sema, Transfer(f'{src_base}a/', dest_base)) + await copy_tool(fs, sema, Transfer(f'{src_base}a/', dest_base)) -async def test_copy_dir(copy_test_context): +async def test_copy_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}a/subdir/file2', 'src/a/subdir/file2') async def test_overwrite_dir(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support overwrite sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') await create_test_dir(fs, 'dest', dest_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}a/subdir/file2', 'src/a/subdir/file2') await expect_file(fs, 
f'{dest_base}a/file3', 'dest/a/file3') -async def test_copy_multiple(copy_test_context): +async def test_copy_multiple(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') await create_test_file(fs, 'src', src_base, 'b') - await Copier.copy(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a', 'src/a') await expect_file(fs, f'{dest_base}b', 'src/b') -async def test_copy_multiple_dest_target_file(copy_test_context): +async def test_copy_multiple_dest_target_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') await create_test_file(fs, 'src', src_base, 'b') with RaisesOrObjectStore(dest_base, NotADirectoryError): - await Copier.copy( + await copy_tool( fs, sema, - Transfer([f'{src_base}a', f'{src_base}b'], dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET), + Transfer( + [f'{src_base}a', f'{src_base}b'], + dest_base.rstrip('/'), + treat_dest_as=Transfer.DEST_IS_TARGET, + ), ) -async def test_copy_multiple_dest_file(copy_test_context): +async def test_copy_multiple_dest_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') @@ -347,22 +448,28 @@ async def test_copy_multiple_dest_file(copy_test_context): await create_test_file(fs, 'dest', dest_base, 'x') with RaisesOrObjectStore(dest_base, NotADirectoryError): - await Copier.copy(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], f'{dest_base}x')) + await copy_tool(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], f'{dest_base}x')) -async def test_file_overwrite_dir(copy_test_context): +async def test_file_overwrite_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') with RaisesOrObjectStore(dest_base, IsADirectoryError): - await Copier.copy( - fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET) + await copy_tool( + fs, + sema, + Transfer( + f'{src_base}a', + dest_base.rstrip('/'), + treat_dest_as=Transfer.DEST_IS_TARGET, + ), ) async def test_file_and_directory_error( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str, copy_tool ): sema, fs, bases = router_filesystem @@ -373,18 +480,22 @@ async def test_file_and_directory_error( await create_test_file(fs, 'src', src_base, 'a/subfile') with pytest.raises(FileAndDirectoryError): - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) -async def test_copy_src_parts(copy_test_context): +async def test_copy_src_parts(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy( + await copy_tool( fs, sema, - Transfer([f'{src_base}a/file1', f'{src_base}a/subdir'], dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_DIR), + Transfer( + [f'{src_base}a/file1', f'{src_base}a/subdir'], + dest_base.rstrip('/'), + treat_dest_as=Transfer.DEST_DIR, + ), ) await expect_file(fs, f'{dest_base}file1', 'src/a/file1') @@ -400,39 +511,72 @@ async def collect_files(it: 
AsyncIterator[FileListEntry]) -> List[str]: return [await x.url() async for x in it] -async def test_file_and_directory_error_with_slash_empty_file( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str +async def test_size_zero_directory_named_files_are_not_copied_but_do_not_impair_other_copies( + router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + cloud_scheme: str, + copy_tool, ): sema, fs, bases = router_filesystem src_base = await fresh_dir(fs, bases, cloud_scheme) - await write_file(fs, f'{src_base}empty/', '') - await write_file(fs, f'{src_base}empty/foo', b'foo') + await write_file(fs, f'{src_base}folder-and-file/', '') + await write_file(fs, f'{src_base}folder-and-file/just-a-file', b'just-a-file') + await write_file(fs, f'{src_base}folder-and-file/nested-folder-and-file/', '') + await write_file(fs, f'{src_base}folder-and-file/nested-folder-and-file/nested-just-a-file', b'nested-just-a-file') + await write_file(fs, f'{src_base}folder-and-file/file-with-trailing-slash-but-not-a-folder/', '') await collect_files(await fs.listfiles(f'{src_base}')) await collect_files(await fs.listfiles(f'{src_base}', recursive=True)) - await collect_files(await fs.listfiles(f'{src_base}empty/')) - await collect_files(await fs.listfiles(f'{src_base}empty/', recursive=True)) + await collect_files(await fs.listfiles(f'{src_base}folder-and-file/')) + await collect_files(await fs.listfiles(f'{src_base}folder-and-file/', recursive=True)) - for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): - dest_base = await fresh_dir(fs, bases, cloud_scheme) + for transfer_type in ( + Transfer.DEST_IS_TARGET, + Transfer.DEST_DIR, + Transfer.INFER_DEST, + ): + destination = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}empty/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}folder-and-file/', destination, treat_dest_as=transfer_type), + ) - await collect_files(await fs.listfiles(f'{dest_base}')) - await collect_files(await fs.listfiles(f'{dest_base}', recursive=True)) + if transfer_type == Transfer.DEST_DIR or (transfer_type == Transfer.INFER_DEST and destination[-1] == '/'): + destination = destination + 'folder-and-file/' - if transfer_type == Transfer.DEST_DIR: - exp_dest = f'{dest_base}empty/foo' - await expect_file(fs, exp_dest, 'foo') - assert not await fs.isfile(f'{dest_base}empty/') - assert await fs.isdir(f'{dest_base}empty/') - await collect_files(await fs.listfiles(f'{dest_base}empty/')) - await collect_files(await fs.listfiles(f'{dest_base}empty/', recursive=True)) - else: - exp_dest = f'{dest_base}foo' - await expect_file(fs, exp_dest, 'foo') + assert not await fs.isfile(destination), (transfer_type, destination) + assert await fs.isdir(destination), (transfer_type, destination) + + just_a_file_url = f'{destination}just-a-file' + assert await fs.read(just_a_file_url) == b'just-a-file', (transfer_type, destination) + + nested_folder_url = f'{destination}nested-folder-and-file/' + assert not await fs.exists(nested_folder_url), (transfer_type, destination) + + nested_just_a_file_url = f'{destination}nested-folder-and-file/nested-just-a-file' + assert await fs.read(nested_just_a_file_url) == b'nested-just-a-file', (transfer_type, destination) + + file_with_trailing_slash_url = f'{destination}file-with-trailing-slash-but-not-a-folder/' + assert not await fs.exists(file_with_trailing_slash_url), (transfer_type, 
destination) + + expected = [nested_folder_url, just_a_file_url] + actual = await collect_files(await fs.listfiles(destination)) + assert actual == expected, (transfer_type, destination) + + expected = [just_a_file_url, nested_just_a_file_url] + actual = await collect_files(await fs.listfiles(destination, recursive=True)) + assert actual == expected, (transfer_type, destination) + + expected = [nested_just_a_file_url] + actual = await collect_files(await fs.listfiles(nested_folder_url)) + assert actual == expected, (transfer_type, destination) + + expected = [nested_just_a_file_url] + actual = await collect_files(await fs.listfiles(nested_folder_url, recursive=True)) + assert actual == expected, (transfer_type, destination) async def test_file_and_directory_error_with_slash_non_empty_file_for_google_non_recursive( @@ -453,7 +597,9 @@ async def test_file_and_directory_error_with_slash_non_empty_file_for_google_non async def test_file_and_directory_error_with_slash_non_empty_file( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + cloud_scheme: str, + copy_tool, ): sema, fs, bases = router_filesystem @@ -468,11 +614,21 @@ async def test_file_and_directory_error_with_slash_non_empty_file( with pytest.raises(FileAndDirectoryError): await collect_files(await fs.listfiles(f'{src_base}not-empty/', recursive=True)) - for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): + for transfer_type in ( + Transfer.DEST_IS_TARGET, + Transfer.DEST_DIR, + Transfer.INFER_DEST, + ): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy( - fs, sema, Transfer(f'{src_base}not-empty/bar', dest_base.rstrip('/'), treat_dest_as=transfer_type) + await copy_tool( + fs, + sema, + Transfer( + f'{src_base}not-empty/bar', + dest_base.rstrip('/'), + treat_dest_as=transfer_type, + ), ) if transfer_type == Transfer.DEST_DIR: exp_dest = f'{dest_base}bar' @@ -486,16 +642,26 @@ async def test_file_and_directory_error_with_slash_non_empty_file( with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy( - fs, sema, Transfer(f'{src_base}not-empty/', dest_base.rstrip('/'), treat_dest_as=transfer_type) + await copy_tool( + fs, + sema, + Transfer( + f'{src_base}not-empty/', + dest_base.rstrip('/'), + treat_dest_as=transfer_type, + ), ) with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type), + ) -async def test_file_and_directory_error_with_slash_non_empty_file_only_for_google_non_recursive( +async def test_copying_a_folder_with_only_a_size_zero_directory_named_file_copies_nothing_and_does_not_error_in_google( router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], ): sema, fs, bases = router_filesystem @@ -507,10 +673,22 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_for_googl await collect_files(await fs.listfiles(f'{src_base}')) await collect_files(await fs.listfiles(f'{src_base}empty-only/')) - for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): + for transfer_type in ( + Transfer.DEST_IS_TARGET, + Transfer.DEST_DIR, + Transfer.INFER_DEST, + ): dest_base = await fresh_dir(fs, bases, 
'gs') - await Copier.copy( - fs, sema, Transfer(f'{src_base}empty-only/', dest_base.rstrip('/'), treat_dest_as=transfer_type) + await Copier.copy( # hailctl fs sync errors when the source is an empty directory or + # non-extant file (excluding the size-zero, ending in trailing slash + # files). + fs, + sema, + Transfer( + f'{src_base}empty-only/', + dest_base.rstrip('/'), + treat_dest_as=transfer_type, + ), ) # We ignore empty directories when copying @@ -518,8 +696,9 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_for_googl await collect_files(await fs.listfiles(f'{dest_base}empty-only/')) -async def test_file_and_directory_error_with_slash_empty_file_only( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str +async def test_copying_a_folder_with_only_a_size_zero_directory_named_file_copies_nothing_and_does_not_error( + router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + cloud_scheme: str, ): sema, fs, bases = router_filesystem @@ -530,17 +709,35 @@ async def test_file_and_directory_error_with_slash_empty_file_only( await collect_files(await fs.listfiles(f'{src_base}', recursive=True)) await collect_files(await fs.listfiles(f'{src_base}empty-only/', recursive=True)) - for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): + for transfer_type in ( + Transfer.DEST_IS_TARGET, + Transfer.DEST_DIR, + Transfer.INFER_DEST, + ): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy( - fs, sema, Transfer(f'{src_base}empty-only/', dest_base.rstrip('/'), treat_dest_as=transfer_type) + await Copier.copy( # hailctl fs sync errors when the source is an empty directory or + # non-extant file (excluding the size-zero, ending in trailing slash + # files). + fs, + sema, + Transfer( + f'{src_base}empty-only/', + dest_base.rstrip('/'), + treat_dest_as=transfer_type, + ), ) with pytest.raises(FileNotFoundError): await collect_files(await fs.listfiles(f'{dest_base}empty-only/', recursive=True)) dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await Copier.copy( # hailctl fs sync errors when the source is an empty directory or + # non-extant file (excluding the size-zero, ending in trailing slash + # files). 
+ fs, + sema, + Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type), + ) async def test_file_and_directory_error_with_slash_non_empty_file_only_google_non_recursive( @@ -560,7 +757,9 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_google_no async def test_file_and_directory_error_with_slash_non_empty_file_only( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + cloud_scheme: str, + copy_tool, ): sema, fs, bases = router_filesystem @@ -574,15 +773,27 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only( with pytest.raises(FileAndDirectoryError): await collect_files(await fs.listfiles(f'{src_base}not-empty-file-w-slash/', recursive=True)) - for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): + for transfer_type in ( + Transfer.DEST_IS_TARGET, + Transfer.DEST_DIR, + Transfer.INFER_DEST, + ): with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy( + await copy_tool( fs, sema, - Transfer(f'{src_base}not-empty-file-w-slash/', dest_base.rstrip('/'), treat_dest_as=transfer_type), + Transfer( + f'{src_base}not-empty-file-w-slash/', + dest_base.rstrip('/'), + treat_dest_as=transfer_type, + ), ) with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type), + ) diff --git a/hail/python/test/hailtop/inter_cloud/test_fs.py b/hail/python/test/hailtop/inter_cloud/test_fs.py index 689de9b5d28..e504e925626 100644 --- a/hail/python/test/hailtop/inter_cloud/test_fs.py +++ b/hail/python/test/hailtop/inter_cloud/test_fs.py @@ -537,6 +537,27 @@ async def test_basename_is_not_path(filesystem: Tuple[asyncio.Semaphore, AsyncFS assert (await fs.statfile(str(base.with_new_path_component('abc123')))).basename() == 'abc123' +async def test_basename_of_file_ending_in_slash_is_empty_in_cloud( + filesystem: Tuple[asyncio.Semaphore, AsyncFS, AsyncFSURL], +): + _, fs, base = filesystem + + if base.scheme in ('', 'file'): + return + + await fs.write(str(base.with_new_path_component('file-is-folder/')), b'') + + files = [x async for x in await fs.listfiles(str(base))] + assert len(files) == 1 + file = files[0] + assert file.basename() == 'file-is-folder' and file.is_dir() + + files = [x async for x in await fs.listfiles(str(await file.url()), exclude_trailing_slash_files=False)] + assert len(files) == 1 + file = files[0] + assert file.basename() == '' and file.is_file() + + async def test_listfiles(filesystem: Tuple[asyncio.Semaphore, AsyncFS, AsyncFSURL]): _, fs, base = filesystem diff --git a/hail/python/test/hailtop/inter_cloud/test_hailctl_fs_sync.py b/hail/python/test/hailtop/inter_cloud/test_hailctl_fs_sync.py new file mode 100644 index 00000000000..1b6701aba4f --- /dev/null +++ b/hail/python/test/hailtop/inter_cloud/test_hailctl_fs_sync.py @@ -0,0 +1,119 @@ +import asyncio +from typing import Dict, Tuple + +from hailtop.aiotools import AsyncFS +from hailtop.utils import check_exec_output + +from .utils import fresh_dir + + +async def test_cli_file_and_dir(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme): + sema, fs, bases = router_filesystem + + test_dir = 
await fresh_dir(fs, bases, cloud_scheme) + plandir = await fresh_dir(fs, bases, cloud_scheme) + plandir2 = await fresh_dir(fs, bases, cloud_scheme) + + await fs.write(f"{test_dir}file1", b"hello world\n") + + await check_exec_output( + 'hailctl', + 'fs', + 'sync', + '--make-plan', + plandir, + '--copy-to', + f'{test_dir}file1', + f'{test_dir}file2', + '--copy-into', + f'{test_dir}file1', + f'{test_dir}dir1', + ) + + await check_exec_output( + 'hailctl', + 'fs', + 'sync', + '--use-plan', + plandir, + ) + + expected_files = [f"{test_dir}file1", f"{test_dir}file2", f"{test_dir}dir1/file1"] + for url in expected_files: + assert await fs.read(url) == b"hello world\n" + + await check_exec_output( + 'hailctl', + 'fs', + 'sync', + '--make-plan', + plandir2, + '--copy-to', + f'{test_dir}file1', + f'{test_dir}file2', + '--copy-into', + f'{test_dir}file1', + f'{test_dir}dir1', + ) + assert await fs.read(plandir2 + 'differs') == b'' + assert await fs.read(plandir2 + 'dstonly') == b'' + assert await fs.read(plandir2 + 'srconly') == b'' + + +async def test_cli_subdir(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme): + sema, fs, bases = router_filesystem + + test_dir = await fresh_dir(fs, bases, cloud_scheme) + plandir = await fresh_dir(fs, bases, cloud_scheme) + plandir2 = await fresh_dir(fs, bases, cloud_scheme) + + await fs.makedirs(f"{test_dir}dir") + await fs.makedirs(f"{test_dir}dir/subdir") + await fs.write(f"{test_dir}dir/subdir/file1", b"hello world\n") + + await check_exec_output( + 'hailctl', 'fs', 'sync', '--make-plan', plandir, '--copy-to', f'{test_dir}dir', f'{test_dir}dir2' + ) + + await check_exec_output( + 'hailctl', + 'fs', + 'sync', + '--use-plan', + plandir, + ) + + assert await fs.read(f"{test_dir}dir2/subdir/file1") == b"hello world\n" + + await check_exec_output( + 'hailctl', 'fs', 'sync', '--make-plan', plandir2, '--copy-to', f'{test_dir}dir', f'{test_dir}dir2' + ) + + assert await fs.read(plandir2 + 'plan') == b'' + assert await fs.read(plandir2 + 'summary') == b'0\t0\n' + assert await fs.read(plandir2 + 'differs') == b'' + assert await fs.read(plandir2 + 'dstonly') == b'' + assert await fs.read(plandir2 + 'srconly') == b'' + assert await fs.read(plandir2 + 'matches') == f'{test_dir}dir/subdir/file1\t{test_dir}dir2/subdir/file1\n'.encode() + + +async def test_cli_already_synced(router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme): + sema, fs, bases = router_filesystem + + test_dir = await fresh_dir(fs, bases, cloud_scheme) + plandir = await fresh_dir(fs, bases, cloud_scheme) + + await fs.makedirs(f"{test_dir}dir") + await fs.write(f"{test_dir}dir/foo", b"hello world\n") + await fs.write(f"{test_dir}bar", b"hello world\n") + + await check_exec_output( + 'hailctl', 'fs', 'sync', '--make-plan', plandir, '--copy-to', f'{test_dir}dir/foo', f'{test_dir}bar' + ) + + assert await fs.read(plandir + 'plan') == b'' + assert await fs.read(plandir + 'summary') == b'0\t0\n' + assert await fs.read(plandir + 'differs') == b'' + assert await fs.read(plandir + 'dstonly') == b'' + assert await fs.read(plandir + 'srconly') == b'' + assert await fs.read(plandir + 'matches') == f'{test_dir}dir/foo\t{test_dir}bar\n'.encode()
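
The plan/sync entry points added above are also importable directly from hailtop.aiotools (the test_copy.py sync_tool helper exercises them this way). A minimal sketch of driving them without the hailctl wrapper, assuming default cloud credentials; the bucket URLs and plan-folder path below are hypothetical:

import asyncio

from hailtop.aiotools.plan import plan
from hailtop.aiotools.sync import sync


async def main():
    # Phase 1: write the plan folder (matches, differs, srconly, dstonly, plan, summary).
    # The plan folder must not already exist; all URLs here are placeholders.
    await plan(
        'gs://my-bucket/sync-plan-1',
        copy_to=[],                                              # (source, destination-file) pairs
        copy_into=[('gs://src-bucket/a', 's3://dst-bucket/b')],  # gs://src-bucket/a/x lands at s3://dst-bucket/b/a/x
        gcs_requester_pays_project=None,
        verbose=True,
        max_parallelism=75,
    )
    # Phase 2: execute only the copies proposed in <plan folder>/plan.
    await sync(
        'gs://my-bucket/sync-plan-1',
        gcs_requester_pays_project=None,
        verbose=True,
        max_parallelism=75,
    )


asyncio.run(main())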