diff --git a/hail/Makefile b/hail/Makefile index efbf54d860a..e98ea2795bf 100644 --- a/hail/Makefile +++ b/hail/Makefile @@ -208,6 +208,8 @@ pytest-inter-cloud: upload-remote-test-resources install-editable HAIL_TEST_S3_BUCKET=hail-test-dy5rg \ HAIL_TEST_AZURE_ACCOUNT=hailtest \ HAIL_TEST_AZURE_CONTAINER=hail-test-4nxei \ + HAIL_TEST_AZURE_RESGRP=hail-dev \ + HAIL_TEST_AZURE_SUBID=22cd45fe-f996-4c51-af67-ef329d977519 \ $(HAIL_PYTHON3) -m pytest \ -Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \ --log-cli-level=INFO \ diff --git a/hail/python/hailtop/aiocloud/aioaws/fs.py b/hail/python/hailtop/aiocloud/aioaws/fs.py index f2e369c5cd4..4672bcf6783 100644 --- a/hail/python/hailtop/aiocloud/aioaws/fs.py +++ b/hail/python/hailtop/aiocloud/aioaws/fs.py @@ -92,8 +92,11 @@ def __init__(self, head_object_resp, url: str): self.head_object_resp = head_object_resp self._url = url + def __repr__(self): + return f'S3HeadObjectFileStatus({self.head_object_resp}, {self._url})' + def basename(self) -> str: - return os.path.basename(self._url.rstrip('/')) + return os.path.basename(self._url) def url(self) -> str: return self._url @@ -120,8 +123,11 @@ def __init__(self, item: Dict[str, Any], url: str): self._item = item self._url = url + def __repr__(self): + return f'S3ListFilesFileStatus({self._item}, {self._url})' + def basename(self) -> str: - return os.path.basename(self._url.rstrip('/')) + return os.path.basename(self._url) def url(self) -> str: return self._url @@ -194,8 +200,15 @@ def __init__(self, bucket: str, key: str, item: Optional[Dict[str, Any]]): self._item = item self._status: Optional[S3ListFilesFileStatus] = None + def __repr__(self): + return f'S3FileListEntry({self._bucket}, {self._key}, {self._item})' + def basename(self) -> str: - return os.path.basename(self._key.rstrip('/')) + object_name = self._key + if self._is_dir(): + assert object_name[-1] == '/' + object_name = object_name[:-1] + return os.path.basename(object_name) async def url(self) -> str: return f's3://{self._bucket}/{self._key}' @@ -203,9 +216,12 @@ async def url(self) -> str: async def is_file(self) -> bool: return self._item is not None - async def is_dir(self) -> bool: + def _is_dir(self) -> bool: return self._item is None + async def is_dir(self) -> bool: + return self._is_dir() + async def status(self) -> FileStatus: if self._status is None: if self._item is None: diff --git a/hail/python/hailtop/aiocloud/aioazure/fs.py b/hail/python/hailtop/aiocloud/aioazure/fs.py index 780d2e73eca..d49a351762a 100644 --- a/hail/python/hailtop/aiocloud/aioazure/fs.py +++ b/hail/python/hailtop/aiocloud/aioazure/fs.py @@ -238,8 +238,15 @@ def __init__(self, url: 'AzureAsyncFSURL', blob_props: Optional[BlobProperties]) self._blob_props = blob_props self._status: Optional[AzureFileStatus] = None + def __repr__(self): + return f'AzureFileListEntry({self._url}, {self._blob_props})' + def basename(self) -> str: - return os.path.basename(self._url.base.rstrip('/')) + url_no_sas = self._url.base + if self._is_dir(): + assert url_no_sas[-1] == '/' + url_no_sas = url_no_sas[:-1] + return os.path.basename(url_no_sas) async def url(self) -> str: return self._url.base @@ -250,9 +257,12 @@ async def url_full(self) -> str: async def is_file(self) -> bool: return self._blob_props is not None - async def is_dir(self) -> bool: + def _is_dir(self) -> bool: return self._blob_props is None + async def is_dir(self) -> bool: + return self._is_dir() + async def status(self) -> FileStatus: if self._status is None: if self._blob_props is None: @@ -266,8 +276,11 @@ def __init__(self, blob_props: BlobProperties, url: 'AzureAsyncFSURL'): self.blob_props = blob_props self._url = url + def __repr__(self): + return f'AzureFileStatus({self.blob_props}, {self._url})' + def basename(self) -> str: - return os.path.basename(self._url.base.rstrip('/')) + return os.path.basename(self._url.base) def url(self) -> str: return str(self._url) @@ -546,9 +559,11 @@ async def isfile(self, url: str) -> bool: @handle_public_access_error async def isdir(self, url: str) -> bool: fs_url = self.parse_url(url) - assert not fs_url.path or fs_url.path.endswith('/'), fs_url.path client = self.get_container_client(fs_url) - async for _ in client.walk_blobs(name_starts_with=fs_url.path, include=['metadata'], delimiter='/'): + path = fs_url.path + if path[-1] != '/': + path = path + '/' + async for _ in client.walk_blobs(name_starts_with=path, include=['metadata'], delimiter='/'): return True return False diff --git a/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py b/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py index 8055cc2a71f..992d4f11ec2 100644 --- a/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py +++ b/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py @@ -445,8 +445,11 @@ def __init__(self, items: Dict[str, str], url: str): self._items = items self._url = url + def __repr__(self): + return f'GetObjectFileStatus({self._items}, {self._url})' + def basename(self) -> str: - return os.path.basename(self._url.rstrip('/')) + return os.path.basename(self._url) def url(self) -> str: return self._url @@ -471,8 +474,15 @@ def __init__(self, bucket: str, name: str, items: Optional[Dict[str, Any]]): self._items = items self._status: Optional[GetObjectFileStatus] = None + def __repr__(self): + return f'GoogleStorageFileListEntry({self._bucket}, {self._name}, {self._items})' + def basename(self) -> str: - return os.path.basename(self._name.rstrip('/')) + object_name = self._name + if self._is_dir(): + assert object_name[-1] == '/' + object_name = object_name[:-1] + return os.path.basename(object_name) async def url(self) -> str: return f'gs://{self._bucket}/{self._name}' @@ -480,9 +490,12 @@ async def url(self) -> str: async def is_file(self) -> bool: return self._items is not None - async def is_dir(self) -> bool: + def _is_dir(self) -> bool: return self._items is None + async def is_dir(self) -> bool: + return self._is_dir() + async def status(self) -> FileStatus: if self._status is None: if self._items is None: @@ -798,7 +811,8 @@ async def isfile(self, url: str) -> bool: async def isdir(self, url: str) -> bool: bucket, name = self.get_bucket_and_name(url) - assert not name or name.endswith('/'), name + if name[-1] != '/': + name = name + '/' params = {'prefix': name, 'delimiter': '/', 'includeTrailingDelimiter': 'true', 'maxResults': 1} async for page in await self._storage_client.list_objects(bucket, params=params): prefixes = page.get('prefixes') diff --git a/hail/python/hailtop/aiotools/copy.py b/hail/python/hailtop/aiotools/copy.py index 0e1d5d30b64..840c8b3587a 100644 --- a/hail/python/hailtop/aiotools/copy.py +++ b/hail/python/hailtop/aiotools/copy.py @@ -67,6 +67,14 @@ async def __aexit__(self, exc_type, exc, tb): self.task.cancel() +def only_update_completions(progress: Progress, tid): + def listen(delta: int): + if delta < 0: + progress.update(tid, advance=-delta) + + return listen + + async def copy( *, max_simultaneous_transfers: Optional[int] = None, @@ -76,6 +84,7 @@ async def copy( s3_kwargs: Optional[dict] = None, transfers: List[Transfer], verbose: bool = False, + totals: Optional[Tuple[int, int]] = None, ) -> None: with ThreadPoolExecutor() as thread_pool: if max_simultaneous_transfers is None: @@ -108,15 +117,22 @@ async def copy( ) as sema: file_tid = progress.add_task(description='files', total=0, visible=verbose) bytes_tid = progress.add_task(description='bytes', total=0, visible=verbose) + + if totals: + n_files, n_bytes = totals + progress.update(file_tid, total=n_files) + progress.update(bytes_tid, total=n_bytes) + file_listener = only_update_completions(progress, file_tid) + bytes_listener = only_update_completions(progress, bytes_tid) + else: + file_listener = make_listener(progress, file_tid) + bytes_listener = make_listener(progress, bytes_tid) + copy_report = await Copier.copy( - fs, - sema, - transfers, - files_listener=make_listener(progress, file_tid), - bytes_listener=make_listener(progress, bytes_tid), + fs, sema, transfers, files_listener=file_listener, bytes_listener=bytes_listener ) if verbose: - copy_report.summarize() + copy_report.summarize(include_sources=totals is None) def make_transfer(json_object: Dict[str, str]) -> Transfer: @@ -169,7 +185,7 @@ async def main() -> None: parser.add_argument( '-v', '--verbose', action='store_const', const=True, default=False, help='show logging information' ) - parser.add_argument('--timeout', type=str, default=None, help='show logging information') + parser.add_argument('--timeout', type=str, default=None, help='Set the total timeout for HTTP requests.') args = parser.parse_args() if args.verbose: diff --git a/hail/python/hailtop/aiotools/fs/copier.py b/hail/python/hailtop/aiotools/fs/copier.py index def7933a115..b934675f08f 100644 --- a/hail/python/hailtop/aiotools/fs/copier.py +++ b/hail/python/hailtop/aiotools/fs/copier.py @@ -143,7 +143,7 @@ def mark_done(self): self._end_time = time_msecs() self._duration = self._end_time - self._start_time - def summarize(self): + def summarize(self, include_sources: bool = True): source_reports = [] def add_source_reports(transfer_report): @@ -177,9 +177,10 @@ def add_source_reports(transfer_report): file_rate = total_files / (self._duration / 1000) print(f' Average file rate: {file_rate:,.1f}/s') - print('Sources:') - for sr in source_reports: - print(f' {sr._source}: {sr._files} files, {humanize.naturalsize(sr._bytes)}') + if include_sources: + print('Sources:') + for sr in source_reports: + print(f' {sr._source}: {sr._files} files, {humanize.naturalsize(sr._bytes)}') class SourceCopier: @@ -239,6 +240,7 @@ async def _copy_part( part_creator: MultiPartCreate, return_exceptions: bool, ) -> None: + total_written = 0 try: async with self.xfer_sema.acquire_manager(min(Copier.BUFFER_SIZE, this_part_size)): async with await self.router_fs.open_from( @@ -254,8 +256,9 @@ async def _copy_part( raise UnexpectedEOFError() written = await destf.write(b) assert written == len(b) - source_report.finish_bytes(written) + total_written += written n -= len(b) + source_report.finish_bytes(total_written) except Exception as e: if return_exceptions: source_report.set_exception(e) diff --git a/hail/python/hailtop/aiotools/plan.py b/hail/python/hailtop/aiotools/plan.py new file mode 100644 index 00000000000..1a9bdd6a64e --- /dev/null +++ b/hail/python/hailtop/aiotools/plan.py @@ -0,0 +1,331 @@ +from typing import List, Tuple, Optional +import asyncio +import os +import sys +from contextlib import AsyncExitStack +from hailtop.aiotools import FileAndDirectoryError + +from .router_fs import RouterAsyncFS +from .fs import FileListEntry, FileStatus, AsyncFS, WritableStream +from ..utils.rich_progress_bar import CopyToolProgressBar, Progress + +try: + import uvloop + + uvloop_install = uvloop.install +except ImportError as e: + if not sys.platform.startswith('win32'): + raise e + + def uvloop_install(): + pass + + +class PlanError(ValueError): + pass + + +async def plan( + folder: str, + copy_to: List[Tuple[str, str]], + copy_into: List[Tuple[str, str]], + gcs_requester_pays_project: Optional[str], + verbose: bool, + max_parallelism: int, +): + if gcs_requester_pays_project: + gcs_kwargs = {'gcs_requester_pays_configuration': gcs_requester_pays_project} + else: + gcs_kwargs = {} + + total_n_files = 0 + total_n_bytes = 0 + + async with AsyncExitStack() as cleanups: + fs = await cleanups.enter_async_context(RouterAsyncFS(gcs_kwargs=gcs_kwargs)) + source_destination_pairs = [ + *copy_to, + *((src, copied_into_path(fs, source_path=src, folder_path=dst)) for src, dst in copy_into), + ] + + if any(await asyncio.gather(fs.isfile(folder), fs.isdir(folder.rstrip('/') + '/'))): + raise PlanError(f'plan folder already exists: {folder}', 1) + + await fs.mkdir(folder) + matches = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'matches'))) + differs = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'differs'))) + srconly = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'srconly'))) + dstonly = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'dstonly'))) + plan = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'plan'))) + + progress = cleanups.enter_context(CopyToolProgressBar(transient=True, disable=not verbose)) + + for src, dst in source_destination_pairs: + n_files, n_bytes = await find_all_copy_pairs( + fs, + matches, + differs, + srconly, + dstonly, + plan, + src, + dst, + progress, + asyncio.Semaphore(max_parallelism), + source_must_exist=True, + ) + total_n_files += n_files + total_n_bytes += n_bytes + + summary = await cleanups.enter_async_context(await fs.create(os.path.join(folder, 'summary'))) + await summary.write((f'{total_n_files}\t{total_n_bytes}\n').encode('utf-8')) + + +def copied_into_path(fs: AsyncFS, *, source_path: str, folder_path: str) -> str: + src_url = fs.parse_url(source_path) + dest_url = fs.parse_url(folder_path) + src_basename = os.path.basename(src_url.path.rstrip('/')) + return str(dest_url.with_new_path_component(src_basename)) + + +class FileStat: + def __init__(self, basename, url, is_dir, size): + self.basename = basename + self.url = url + self.is_dir = is_dir + self.size = size + + def __repr__(self) -> str: + return f'FileStat({self.basename}, {self.url}, {self.is_dir}, {self.size})' + + @staticmethod + async def from_file_list_entry(x: FileListEntry) -> 'FileStat': + url, is_dir = await asyncio.gather(x.url(), x.is_dir()) + if is_dir: + size = 0 + else: + size = await (await x.status()).size() + return FileStat(x.basename(), url, is_dir, size) + + @staticmethod + async def from_file_status(x: FileStatus) -> 'FileStat': + return FileStat(x.basename(), x.url(), False, await x.size()) + + +async def listfiles(fs: AsyncFS, x: str) -> List[FileStat]: + try: + it = await fs.listfiles(x) + return [await FileStat.from_file_list_entry(x) async for x in it] + except (FileNotFoundError, NotADirectoryError): + return [] + + +async def statfile(fs: AsyncFS, x: str) -> Optional[FileStat]: + try: + file_status = await fs.statfile(x) + return await FileStat.from_file_status(file_status) + except FileNotFoundError: + return None + + +async def writeline(file: WritableStream, *columns: str): + await file.write(('\t'.join(columns) + '\n').encode('utf-8')) + + +def relativize_url(folder: str, file: str) -> str: + if folder[-1] != '/': + folder = folder + '/' + relative_path = file.removeprefix(folder) + assert relative_path[0] != '/' + return relative_path + + +async def find_all_copy_pairs( + fs: AsyncFS, + matches: WritableStream, + differs: WritableStream, + srconly: WritableStream, + dstonly: WritableStream, + plan: WritableStream, + src: str, + dst: str, + progress: Progress, + sema: asyncio.Semaphore, + source_must_exist: bool, +) -> Tuple[int, int]: + async with sema: + srcstat, srcfiles, dststat, dstfiles = await asyncio.gather( + statfile(fs, src), + listfiles(fs, src), + statfile(fs, dst), + listfiles(fs, dst), + ) + + if srcstat: + if srcstat.url[-1] == '/': + if srcstat.size == 0: + srcstat = None + if srcfiles: + print(f'Not copying size-zero source file which shares a name with a source directory. {src}') + else: + print(f'Not copying size-zero source file whose name looks like a directory. {src}') + else: + raise PlanError( + f'Source is a non-size-zero file whose name looks like a directory. This is not supported. {src} {srcstat}', + 1, + ) + elif srcfiles: + raise PlanError( + f'Source is both a directory and a file (other than a size-zero file whose name ends in "/"). ' + f'This is not supported. {src} {srcstat}', + 1, + ) from FileAndDirectoryError(src) + if dststat: + if dststat.url[-1] == '/': + if dststat.size == 0: + dststat = None + if dstfiles: + print( + f'Ignoring size-zero destination file which shares a name with a destination directory. {dst}' + ) + else: + print(f'Ignoring size-zero destination file whose name looks like a directory. {dst}') + else: + raise PlanError( + f'Destination is a non-size-zero file whose name looks like a directory. This is not supported. {dst} {dststat}', + 1, + ) + elif dstfiles: + raise PlanError( + f'Destination identifies both a directory and a file (other than a size-zero file whose name ends ' + f'in "/"). This is not supported. {dst} {dststat}', + 1, + ) from FileAndDirectoryError(dst) + if srcstat and dstfiles: + raise PlanError( + f'Source is a file but destination is a directory. This is not supported. {src} -> {dst}', 1 + ) from IsADirectoryError(dst) + if srcfiles and dststat: + raise PlanError( + f'Source is a directory but destination is a file. This is not supported. {src} -> {dst}', 1 + ) from IsADirectoryError(src) + if source_must_exist and not srcstat and not srcfiles: + raise PlanError(f'Source is neither a folder nor a file: {src}', 1) from FileNotFoundError(src) + + if srcstat: + assert len(srcfiles) == 0 + assert len(dstfiles) == 0 + + if dststat: + if srcstat.size == dststat.size: + await writeline(matches, srcstat.url, dststat.url) + return 0, 0 + await writeline(differs, srcstat.url, dststat.url, str(srcstat.size), str(dststat.size)) + return 0, 0 + await writeline(srconly, srcstat.url) + await writeline(plan, srcstat.url, dst) + return 1, srcstat.size + + srcfiles.sort(key=lambda x: x.basename) + dstfiles.sort(key=lambda x: x.basename) + + tid = progress.add_task(description=src, total=len(srcfiles) + len(dstfiles)) + + srcidx = 0 + dstidx = 0 + + n_files = 0 + n_bytes = 0 + + async def process_child_directory(new_srcurl: str, new_dsturl: str) -> Tuple[int, int]: + return await find_all_copy_pairs( + fs, + matches, + differs, + srconly, + dstonly, + plan, + new_srcurl, + new_dsturl, + progress, + sema, + source_must_exist=False, + ) + + async def retrieve_child_directory_results(child_directory_task): + nonlocal n_files + nonlocal n_bytes + dir_n_files, dir_n_bytes = await child_directory_task + n_files += dir_n_files + n_bytes += dir_n_bytes + + async with AsyncExitStack() as child_directory_callbacks: + + def background_process_child_dir(new_srcurl: str, new_dsturl: str): + t = asyncio.create_task(process_child_directory(new_srcurl, new_dsturl)) + child_directory_callbacks.push_async_callback(retrieve_child_directory_results, t) + + while srcidx < len(srcfiles) and dstidx < len(dstfiles): + srcf = srcfiles[srcidx] + dstf = dstfiles[dstidx] + if srcf.basename == dstf.basename: + if srcf.is_dir and dstf.is_dir: + background_process_child_dir(srcf.url, dstf.url) + elif srcf.is_dir and not dstf.is_dir: + await writeline(differs, srcf.url, dstf.url, 'dir', 'file') + elif not srcf.is_dir and dstf.is_dir: + await writeline(differs, srcf.url, dstf.url, 'file', 'dir') + elif srcf.size == dstf.size: + await writeline(matches, srcf.url, dstf.url) + else: + await writeline(differs, srcf.url, dstf.url, str(srcf.size), str(dstf.size)) + dstidx += 1 + srcidx += 1 + progress.update(tid, advance=2) + elif srcf.basename < dstf.basename: + if srcf.is_dir: + background_process_child_dir( + srcf.url, + os.path.join(dst, relativize_url(folder=src, file=srcf.url)), + ) + else: + await writeline(srconly, srcf.url) + await writeline(plan, srcf.url, os.path.join(dst, srcf.basename)) + n_files += 1 + n_bytes += srcf.size + srcidx += 1 + progress.update(tid, advance=1) + else: + assert srcf.basename >= dstf.basename + dstidx += 1 + progress.update(tid, advance=1) + await writeline(dstonly, dstf.url) + + while srcidx < len(srcfiles): + srcf = srcfiles[srcidx] + + if srcf.is_dir: + background_process_child_dir( + srcf.url, + os.path.join(dst, relativize_url(folder=src, file=srcf.url)), + ) + else: + await writeline(srconly, srcf.url) + await writeline(plan, srcf.url, os.path.join(dst, srcf.basename)) + n_files += 1 + n_bytes += srcf.size + srcidx += 1 + progress.update(tid, advance=1) + + while dstidx < len(dstfiles): + dstf = dstfiles[dstidx] + + await writeline(dstonly, dstf.url) + dstidx += 1 + progress.update(tid, advance=1) + + # a short sleep ensures the progress bar is visible for a moment to the user + await asyncio.sleep(0.150) + progress.remove_task(tid) + + return n_files, n_bytes diff --git a/hail/python/hailtop/aiotools/sync.py b/hail/python/hailtop/aiotools/sync.py new file mode 100644 index 00000000000..4a509a12b16 --- /dev/null +++ b/hail/python/hailtop/aiotools/sync.py @@ -0,0 +1,71 @@ +from typing import Optional +import asyncio +import os +import sys + +from .fs.copier import Transfer +from .router_fs import RouterAsyncFS, AsyncFS +from .copy import copy + +try: + import uvloop + + uvloop_install = uvloop.install +except ImportError as e: + if not sys.platform.startswith('win32'): + raise e + + def uvloop_install(): + pass + + +class SyncError(ValueError): + pass + + +async def sync( + plan_folder: str, + gcs_requester_pays_project: Optional[str], + verbose: bool, + max_parallelism: int, +) -> None: + gcs_kwargs = {'gcs_requester_pays_configuration': gcs_requester_pays_project} + s3_kwargs = {'max_pool_connections': max_parallelism * 5, 'max_workers': max_parallelism} + + async with RouterAsyncFS(gcs_kwargs=gcs_kwargs, s3_kwargs=s3_kwargs) as fs: + if not all( + await asyncio.gather( + *( + fs.exists(os.path.join(plan_folder, x)) + for x in ('matches', 'differs', 'srconly', 'dstonly', 'plan', 'summary') + ) + ) + ): + raise SyncError('Run hailctl fs sync --make-plan first.', 1) + results = (await fs.read(os.path.join(plan_folder, 'summary'))).decode('utf-8') + n_files, n_bytes = (int(x) for x in results.split('\t')) + await copy( + max_simultaneous_transfers=max_parallelism, + local_kwargs=None, + gcs_kwargs=gcs_kwargs, + azure_kwargs={}, + s3_kwargs=s3_kwargs, + transfers=[ + Transfer(src, dst, treat_dest_as=Transfer.DEST_IS_TARGET) + async for src, dst in iterate_plan_file(plan_folder, fs) + ], + verbose=verbose, + totals=(n_files, n_bytes), + ) + + +async def iterate_plan_file(plan_folder: str, fs: AsyncFS): + lineno = 0 + plan = (await fs.read(os.path.join(plan_folder, 'plan'))).decode('utf-8') + for line in plan.split('\n'): + if not line: + continue + parts = line.strip().split('\t') + if len(parts) != 2: + raise SyncError(f'Malformed plan line, {lineno}, must have exactly one tab: {line}', 1) + yield parts diff --git a/hail/python/hailtop/hailctl/__main__.py b/hail/python/hailtop/hailctl/__main__.py index 999c10c60d2..14426f0948a 100644 --- a/hail/python/hailtop/hailctl/__main__.py +++ b/hail/python/hailtop/hailctl/__main__.py @@ -1,4 +1,6 @@ +from typing import cast import typer +import click import os from .auth import cli as auth_cli @@ -8,6 +10,7 @@ from .dataproc import cli as dataproc_cli from .dev import cli as dev_cli from .hdinsight import cli as hdinsight_cli +from .fs import cli as fs_cli app = typer.Typer( @@ -69,4 +72,6 @@ async def _curl( def main(): - app() + click_app = cast(click.Group, typer.main.get_command(app)) + click_app.add_command(fs_cli.app) + click_app() diff --git a/hail/python/hailtop/hailctl/fs/__init__.py b/hail/python/hailtop/hailctl/fs/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hail/python/hailtop/hailctl/fs/cli.py b/hail/python/hailtop/hailctl/fs/cli.py new file mode 100644 index 00000000000..6aee48caa47 --- /dev/null +++ b/hail/python/hailtop/hailctl/fs/cli.py @@ -0,0 +1,116 @@ +from typing import Optional, List, Tuple, cast +import asyncio +import click +import sys +import typer + +from hailtop.aiotools.plan import plan, PlanError +from hailtop.aiotools.sync import sync as aiotools_sync, SyncError + + +app_without_click = typer.Typer( + name='fs', + no_args_is_help=True, + help='Use cloud object stores and local filesystems.', + pretty_exceptions_show_locals=False, +) + + +@app_without_click.callback() +def callback(): + """ + Typer app, including Click subapp + """ + + +@click.command() +@click.option( + '--copy-to', + help='Pairs of source and destination URL. May be specified multiple times. The destination is always treated as a file. See --copy-into to copy into a directory', + type=(str, str), + required=False, + multiple=True, + default=(), +) +@click.option( + '--copy-into', + help='Copies the source path into the target path. The target must not be a file.', + type=(str, str), + required=False, + multiple=True, + default=(), +) +@click.option( + '-v', + '--verbose', + help='The Google project to which to charge egress costs.', + is_flag=True, + required=False, + default=False, +) +@click.option( + '--max-parallelism', help='The maximum number of concurrent requests.', type=int, required=False, default=75 +) +@click.option( + '--make-plan', + help='The folder in which to create a new synchronization plan. Must not exist.', + type=str, + required=False, +) +@click.option('--use-plan', help='The plan to execute. Must exist.', type=str, required=False) +@click.option( + '--gcs-requester-pays-project', help='The Google project to which to charge egress costs.', type=str, required=False +) +def sync( + copy_to: List[Tuple[str, str]], + copy_into: List[Tuple[str, str]], + verbose: bool, + max_parallelism: int, + make_plan: Optional[str] = None, + use_plan: Optional[str] = None, + gcs_requester_pays_project: Optional[str] = None, +): + """Synchronize files between one or more pairs of locations. + + If a corresponding file already exists at the destination with the same size in bytes, this + command will not copy it. If you want to replace files that have the exact same size in bytes, + delete the destination files first. THIS COMMAND DOES NOT CHECK MD5s OR SHAs! + + First generate a plan with --make-plan, then use the plan with --use-plan. + + Copy all the files under gs://gcs-bucket/a to s3://s3-bucket/b. For example, + gs://gcs-bucket/a/b/c will appear at s3://s3-bucket/b/b/c: + + + + $ hailctl fs sync --make-plan plan1 --copy gs://gcs-bucket/a s3://s3-bucket/b + + + + $ hailctl fs sync --use-plan plan1 + """ + if (make_plan is None and use_plan is None) or (make_plan is not None and use_plan is not None): + print('Must specify one of --make-plan or --use-plan. See hailctl fs sync --help.') + raise typer.Exit(1) + + if make_plan: + try: + asyncio.run(plan(make_plan, copy_to, copy_into, gcs_requester_pays_project, verbose, max_parallelism)) + except PlanError as err: + print('ERROR: ' + err.args[0]) + sys.exit(err.args[1]) + if use_plan: + if copy_to or copy_into: + print( + 'Do not specify --copy-to or --copy-into with --use-plan. Create the plan with --make-plan then call --use-plan without any --copy-to and --copy-into.' + ) + raise typer.Exit(1) + try: + asyncio.run(aiotools_sync(use_plan, gcs_requester_pays_project, verbose, max_parallelism)) + except SyncError as err: + print('ERROR: ' + err.args[0]) + sys.exit(err.args[1]) + + +app = cast(click.Group, typer.main.get_command(app_without_click)) +app.add_command(sync) diff --git a/hail/python/test/hailtop/inter_cloud/test_copy.py b/hail/python/test/hailtop/inter_cloud/test_copy.py index f9c15e85669..49a56dca2e5 100644 --- a/hail/python/test/hailtop/inter_cloud/test_copy.py +++ b/hail/python/test/hailtop/inter_cloud/test_copy.py @@ -3,18 +3,23 @@ import secrets from concurrent.futures import ThreadPoolExecutor import asyncio +import tempfile import functools import pytest from hailtop.utils import url_scheme, bounded_gather2 -from hailtop.aiotools import LocalAsyncFS, Transfer, FileAndDirectoryError, Copier, AsyncFS, FileListEntry +from hailtop.aiotools.plan import plan, PlanError +from hailtop.aiotools.sync import sync, SyncError from hailtop.aiotools.router_fs import RouterAsyncFS -from hailtop.aiocloud.aiogoogle import GoogleStorageAsyncFS -from hailtop.aiocloud.aioaws import S3AsyncFS -from hailtop.aiocloud.aioazure import AzureAsyncFS +from hailtop.aiotools import ( + Transfer, + FileAndDirectoryError, + Copier, + AsyncFS, + FileListEntry, +) from .generate_copy_test_specs import run_test_spec, create_test_file, create_test_dir - from .copy_test_specs import COPY_TEST_SPECS @@ -36,7 +41,9 @@ async def cloud_scheme(request): @pytest.fixture(scope='module') -async def router_filesystem(request) -> AsyncIterator[Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]]]: +async def router_filesystem( + request, +) -> AsyncIterator[Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]]]: token = secrets.token_hex(16) with ThreadPoolExecutor() as thread_pool: @@ -57,23 +64,28 @@ async def router_filesystem(request) -> AsyncIterator[Tuple[asyncio.Semaphore, A azure_container = os.environ['HAIL_TEST_AZURE_CONTAINER'] azure_base = f'https://{azure_account}.blob.core.windows.net/{azure_container}/tmp/{token}/' - bases = {'file': file_base, 'gs': gs_base, 's3': s3_base, 'azure-https': azure_base} + bases = { + 'file': file_base, + 'gs': gs_base, + 's3': s3_base, + 'azure-https': azure_base, + } sema = asyncio.Semaphore(50) async with sema: yield (sema, fs, bases) - await bounded_gather2( - sema, - functools.partial(fs.rmtree, sema, file_base), - functools.partial(fs.rmtree, sema, gs_base), - functools.partial(fs.rmtree, sema, s3_base), - functools.partial(fs.rmtree, sema, azure_base), - ) + # await bounded_gather2( + # sema, + # functools.partial(fs.rmtree, sema, file_base), + # functools.partial(fs.rmtree, sema, gs_base), + # functools.partial(fs.rmtree, sema, s3_base), + # functools.partial(fs.rmtree, sema, azure_base), + # ) - assert not await fs.isdir(file_base) - assert not await fs.isdir(gs_base) - assert not await fs.isdir(s3_base) - assert not await fs.isdir(azure_base) + # assert not await fs.isdir(file_base) + # assert not await fs.isdir(gs_base) + # assert not await fs.isdir(s3_base) + # assert not await fs.isdir(azure_base) async def fresh_dir(fs, bases, scheme): @@ -145,6 +157,61 @@ async def expect_file(fs, path, expected): assert actual == expected, (actual, expected) +async def copier_copy(fs, sema, transfer): + await Copier.copy(fs, sema, transfer) + + +async def sync_tool(fs: RouterAsyncFS, sema, transfer: Transfer): + max_parallelism = sema._value + + sources = transfer.src + if isinstance(sources, str): + sources = [sources] + + copy_to = [] + copy_into = [] + + if transfer.treat_dest_as == Transfer.DEST_DIR or ( + transfer.treat_dest_as == Transfer.INFER_DEST and await fs.isdir(transfer.dest) + ): + copy_into = [(src, transfer.dest) for src in sources] + elif transfer.treat_dest_as == Transfer.DEST_IS_TARGET or ( + transfer.treat_dest_as == Transfer.INFER_DEST and not await fs.isdir(transfer.dest) + ): + if await fs.isfile(transfer.dest): + if len(sources) > 1 or await fs.isdir(sources[0]): + raise NotADirectoryError(transfer.dest) + copy_to = [(src, transfer.dest) for src in sources] + else: + raise ValueError(f'unsupported treat_dest_as: {transfer.treat_dest_as}') + + with tempfile.TemporaryDirectory() as folder: + try: + await plan( + os.path.join(folder, 'planfolder'), + copy_to, + copy_into, + None, + True, + max_parallelism, + ) + await sync(os.path.join(folder, 'planfolder'), None, True, max_parallelism) + except (PlanError, SyncError) as err: + if err.__cause__: + raise err.__cause__ + else: + raise err + + +@pytest.fixture(params=['Copier.copy', 'hailctl_sync']) +def copy_tool(request): + if request.param == 'Copier.copy': + return copier_copy + if request.param == 'hailctl_sync': + return sync_tool + raise ValueError('bad: ' + request.param) + + class DidNotRaiseError(Exception): pass @@ -174,24 +241,24 @@ def __exit__(self, type, value, traceback): return True -async def test_copy_doesnt_exist(copy_test_context): +async def test_copy_doesnt_exist(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context with pytest.raises(FileNotFoundError): - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base)) -async def test_copy_file(copy_test_context): +async def test_copy_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a', 'src/a') -async def test_copy_large_file(copy_test_context): +async def test_copy_large_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context # mainly needs to be larger than the transfer block size (8K) @@ -199,44 +266,54 @@ async def test_copy_large_file(copy_test_context): async with await fs.create(f'{src_base}a') as f: await f.write(contents) - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) async with await fs.open(f'{dest_base}a') as f: copy_contents = await f.read() assert copy_contents == contents -async def test_copy_rename_file(copy_test_context): +async def test_copy_rename_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) await expect_file(fs, f'{dest_base}x', 'src/a') -async def test_copy_rename_file_dest_target_file(copy_test_context): +async def test_copy_rename_file_dest_target_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET), + ) await expect_file(fs, f'{dest_base}x', 'src/a') -async def test_copy_file_dest_target_directory_doesnt_exist(copy_test_context): +async def test_copy_file_dest_target_directory_doesnt_exist(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') # SourceCopier._copy_file creates destination directories as needed - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_DIR)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_DIR), + ) await expect_file(fs, f'{dest_base}x/a', 'src/a') -async def test_overwrite_rename_file(copy_test_context): +async def test_overwrite_rename_file( + copy_test_context, +): # hailctl fs sync does not support overwriting sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') @@ -247,153 +324,183 @@ async def test_overwrite_rename_file(copy_test_context): await expect_file(fs, f'{dest_base}x', 'src/a') -async def test_copy_rename_dir(copy_test_context): +async def test_copy_rename_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) + await copy_tool(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x')) await expect_file(fs, f'{dest_base}x/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}x/subdir/file2', 'src/a/subdir/file2') -async def test_copy_rename_dir_dest_is_target(copy_test_context): +async def test_copy_rename_dir_dest_is_target(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET), + ) await expect_file(fs, f'{dest_base}x/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}x/subdir/file2', 'src/a/subdir/file2') async def test_overwrite_rename_dir(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support overwrite sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') await create_test_dir(fs, 'dest', dest_base, 'x/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', f'{dest_base}x', treat_dest_as=Transfer.DEST_IS_TARGET), + ) await expect_file(fs, f'{dest_base}x/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}x/subdir/file2', 'src/a/subdir/file2') await expect_file(fs, f'{dest_base}x/file3', 'dest/x/file3') -async def test_copy_file_dest_trailing_slash_target_dir(copy_test_context): +async def test_copy_file_dest_trailing_slash_target_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base, treat_dest_as=Transfer.DEST_DIR)) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base, treat_dest_as=Transfer.DEST_DIR)) await expect_file(fs, f'{dest_base}a', 'src/a') -async def test_copy_file_dest_target_dir(copy_test_context): +async def test_copy_file_dest_target_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_DIR)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_DIR), + ) await expect_file(fs, f'{dest_base}a', 'src/a') -async def test_copy_file_dest_target_file(copy_test_context): +async def test_copy_file_dest_target_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', f'{dest_base}a', treat_dest_as=Transfer.DEST_IS_TARGET)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}a', f'{dest_base}a', treat_dest_as=Transfer.DEST_IS_TARGET), + ) await expect_file(fs, f'{dest_base}a', 'src/a') -async def test_copy_dest_target_file_is_dir(copy_test_context): +async def test_copy_dest_target_file_is_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') with RaisesOrObjectStore(dest_base, IsADirectoryError): - await Copier.copy( - fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET) + await copy_tool( + fs, + sema, + Transfer( + f'{src_base}a', + dest_base.rstrip('/'), + treat_dest_as=Transfer.DEST_IS_TARGET, + ), ) async def test_overwrite_file(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support overwriting sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') await create_test_file(fs, 'dest', dest_base, 'a') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a', 'src/a') async def test_copy_file_src_trailing_slash(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support trailing slashes on files sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') with pytest.raises(FileNotFoundError): - await Copier.copy(fs, sema, Transfer(f'{src_base}a/', dest_base)) + await copy_tool(fs, sema, Transfer(f'{src_base}a/', dest_base)) -async def test_copy_dir(copy_test_context): +async def test_copy_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}a/subdir/file2', 'src/a/subdir/file2') async def test_overwrite_dir(copy_test_context): + copy_tool = Copier.copy # hailctl fs sync does not support overwrite sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') await create_test_dir(fs, 'dest', dest_base, 'a/') - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a/file1', 'src/a/file1') await expect_file(fs, f'{dest_base}a/subdir/file2', 'src/a/subdir/file2') await expect_file(fs, f'{dest_base}a/file3', 'dest/a/file3') -async def test_copy_multiple(copy_test_context): +async def test_copy_multiple(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') await create_test_file(fs, 'src', src_base, 'b') - await Copier.copy(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], dest_base.rstrip('/'))) await expect_file(fs, f'{dest_base}a', 'src/a') await expect_file(fs, f'{dest_base}b', 'src/b') -async def test_copy_multiple_dest_target_file(copy_test_context): +async def test_copy_multiple_dest_target_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') await create_test_file(fs, 'src', src_base, 'b') with RaisesOrObjectStore(dest_base, NotADirectoryError): - await Copier.copy( + await copy_tool( fs, sema, - Transfer([f'{src_base}a', f'{src_base}b'], dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET), + Transfer( + [f'{src_base}a', f'{src_base}b'], + dest_base.rstrip('/'), + treat_dest_as=Transfer.DEST_IS_TARGET, + ), ) -async def test_copy_multiple_dest_file(copy_test_context): +async def test_copy_multiple_dest_file(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') @@ -401,22 +508,28 @@ async def test_copy_multiple_dest_file(copy_test_context): await create_test_file(fs, 'dest', dest_base, 'x') with RaisesOrObjectStore(dest_base, NotADirectoryError): - await Copier.copy(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], f'{dest_base}x')) + await copy_tool(fs, sema, Transfer([f'{src_base}a', f'{src_base}b'], f'{dest_base}x')) -async def test_file_overwrite_dir(copy_test_context): +async def test_file_overwrite_dir(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_file(fs, 'src', src_base, 'a') with RaisesOrObjectStore(dest_base, IsADirectoryError): - await Copier.copy( - fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_IS_TARGET) + await copy_tool( + fs, + sema, + Transfer( + f'{src_base}a', + dest_base.rstrip('/'), + treat_dest_as=Transfer.DEST_IS_TARGET, + ), ) async def test_file_and_directory_error( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str, copy_tool ): sema, fs, bases = router_filesystem @@ -427,18 +540,22 @@ async def test_file_and_directory_error( await create_test_file(fs, 'src', src_base, 'a/subfile') with pytest.raises(FileAndDirectoryError): - await Copier.copy(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) + await copy_tool(fs, sema, Transfer(f'{src_base}a', dest_base.rstrip('/'))) -async def test_copy_src_parts(copy_test_context): +async def test_copy_src_parts(copy_test_context, copy_tool): sema, fs, src_base, dest_base = copy_test_context await create_test_dir(fs, 'src', src_base, 'a/') - await Copier.copy( + await copy_tool( fs, sema, - Transfer([f'{src_base}a/file1', f'{src_base}a/subdir'], dest_base.rstrip('/'), treat_dest_as=Transfer.DEST_DIR), + Transfer( + [f'{src_base}a/file1', f'{src_base}a/subdir'], + dest_base.rstrip('/'), + treat_dest_as=Transfer.DEST_DIR, + ), ) await expect_file(fs, f'{dest_base}file1', 'src/a/file1') @@ -454,39 +571,72 @@ async def collect_files(it: AsyncIterator[FileListEntry]) -> List[str]: return [await x.url() async for x in it] -async def test_file_and_directory_error_with_slash_empty_file( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str +async def test_size_zero_directory_named_files_are_not_copied_but_do_not_impair_other_copies( + router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + cloud_scheme: str, + copy_tool, ): sema, fs, bases = router_filesystem src_base = await fresh_dir(fs, bases, cloud_scheme) - await write_file(fs, f'{src_base}empty/', '') - await write_file(fs, f'{src_base}empty/foo', b'foo') + await write_file(fs, f'{src_base}folder-and-file/', '') + await write_file(fs, f'{src_base}folder-and-file/just-a-file', b'just-a-file') + await write_file(fs, f'{src_base}folder-and-file/nested-folder-and-file/', '') + await write_file(fs, f'{src_base}folder-and-file/nested-folder-and-file/nested-just-a-file', b'nested-just-a-file') + await write_file(fs, f'{src_base}folder-and-file/file-with-trailing-slash-but-not-a-folder/', '') await collect_files(await fs.listfiles(f'{src_base}')) await collect_files(await fs.listfiles(f'{src_base}', recursive=True)) - await collect_files(await fs.listfiles(f'{src_base}empty/')) - await collect_files(await fs.listfiles(f'{src_base}empty/', recursive=True)) + await collect_files(await fs.listfiles(f'{src_base}folder-and-file/')) + await collect_files(await fs.listfiles(f'{src_base}folder-and-file/', recursive=True)) - for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): - dest_base = await fresh_dir(fs, bases, cloud_scheme) + for transfer_type in ( + Transfer.DEST_IS_TARGET, + Transfer.DEST_DIR, + Transfer.INFER_DEST, + ): + destination = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}empty/', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}folder-and-file/', destination, treat_dest_as=transfer_type), + ) - await collect_files(await fs.listfiles(f'{dest_base}')) - await collect_files(await fs.listfiles(f'{dest_base}', recursive=True)) + if transfer_type == Transfer.DEST_DIR or (transfer_type == Transfer.INFER_DEST and destination[-1] == '/'): + destination = destination + 'folder-and-file/' - if transfer_type == Transfer.DEST_DIR: - exp_dest = f'{dest_base}empty/foo' - await expect_file(fs, exp_dest, 'foo') - assert not await fs.isfile(f'{dest_base}empty/') - assert await fs.isdir(f'{dest_base}empty/') - await collect_files(await fs.listfiles(f'{dest_base}empty/')) - await collect_files(await fs.listfiles(f'{dest_base}empty/', recursive=True)) - else: - exp_dest = f'{dest_base}foo' - await expect_file(fs, exp_dest, 'foo') + assert not await fs.isfile(destination), (transfer_type, destination) + assert await fs.isdir(destination), (transfer_type, destination) + + just_a_file_url = f'{destination}just-a-file' + assert await fs.read(just_a_file_url) == b'just-a-file', (transfer_type, destination) + + nested_folder_url = f'{destination}nested-folder-and-file/' + assert not await fs.exists(nested_folder_url), (transfer_type, destination) + + nested_just_a_file_url = f'{destination}nested-folder-and-file/nested-just-a-file' + assert await fs.read(nested_just_a_file_url) == b'nested-just-a-file', (transfer_type, destination) + + file_with_trailing_slash_url = f'{destination}file-with-trailing-slash-but-not-a-folder/' + assert not await fs.exists(file_with_trailing_slash_url), (transfer_type, destination) + + expected = [nested_folder_url, just_a_file_url] + actual = await collect_files(await fs.listfiles(destination)) + assert actual == expected, (transfer_type, destination) + + expected = [just_a_file_url, nested_just_a_file_url] + actual = await collect_files(await fs.listfiles(destination, recursive=True)) + assert actual == expected, (transfer_type, destination) + + expected = [nested_just_a_file_url] + actual = await collect_files(await fs.listfiles(nested_folder_url)) + assert actual == expected, (transfer_type, destination) + + expected = [nested_just_a_file_url] + actual = await collect_files(await fs.listfiles(nested_folder_url, recursive=True)) + assert actual == expected, (transfer_type, destination) async def test_file_and_directory_error_with_slash_non_empty_file_for_google_non_recursive( @@ -507,7 +657,9 @@ async def test_file_and_directory_error_with_slash_non_empty_file_for_google_non async def test_file_and_directory_error_with_slash_non_empty_file( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + cloud_scheme: str, + copy_tool, ): sema, fs, bases = router_filesystem @@ -522,11 +674,21 @@ async def test_file_and_directory_error_with_slash_non_empty_file( with pytest.raises(FileAndDirectoryError): await collect_files(await fs.listfiles(f'{src_base}not-empty/', recursive=True)) - for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): + for transfer_type in ( + Transfer.DEST_IS_TARGET, + Transfer.DEST_DIR, + Transfer.INFER_DEST, + ): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy( - fs, sema, Transfer(f'{src_base}not-empty/bar', dest_base.rstrip('/'), treat_dest_as=transfer_type) + await copy_tool( + fs, + sema, + Transfer( + f'{src_base}not-empty/bar', + dest_base.rstrip('/'), + treat_dest_as=transfer_type, + ), ) if transfer_type == Transfer.DEST_DIR: exp_dest = f'{dest_base}bar' @@ -540,16 +702,26 @@ async def test_file_and_directory_error_with_slash_non_empty_file( with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy( - fs, sema, Transfer(f'{src_base}not-empty/', dest_base.rstrip('/'), treat_dest_as=transfer_type) + await copy_tool( + fs, + sema, + Transfer( + f'{src_base}not-empty/', + dest_base.rstrip('/'), + treat_dest_as=transfer_type, + ), ) with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type), + ) -async def test_file_and_directory_error_with_slash_non_empty_file_only_for_google_non_recursive( +async def test_copying_a_folder_with_only_a_size_zero_directory_named_file_copies_nothing_and_does_not_error_in_google( router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], ): sema, fs, bases = router_filesystem @@ -561,10 +733,22 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_for_googl await collect_files(await fs.listfiles(f'{src_base}')) await collect_files(await fs.listfiles(f'{src_base}empty-only/')) - for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): + for transfer_type in ( + Transfer.DEST_IS_TARGET, + Transfer.DEST_DIR, + Transfer.INFER_DEST, + ): dest_base = await fresh_dir(fs, bases, 'gs') - await Copier.copy( - fs, sema, Transfer(f'{src_base}empty-only/', dest_base.rstrip('/'), treat_dest_as=transfer_type) + await Copier.copy( # hailctl fs sync errors when the source is an empty directory or + # non-extant file (excluding the size-zero, ending in trailing slash + # files). + fs, + sema, + Transfer( + f'{src_base}empty-only/', + dest_base.rstrip('/'), + treat_dest_as=transfer_type, + ), ) # We ignore empty directories when copying @@ -572,8 +756,9 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_for_googl await collect_files(await fs.listfiles(f'{dest_base}empty-only/')) -async def test_file_and_directory_error_with_slash_empty_file_only( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str +async def test_copying_a_folder_with_only_a_size_zero_directory_named_file_copies_nothing_and_does_not_error( + router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + cloud_scheme: str, ): sema, fs, bases = router_filesystem @@ -584,17 +769,35 @@ async def test_file_and_directory_error_with_slash_empty_file_only( await collect_files(await fs.listfiles(f'{src_base}', recursive=True)) await collect_files(await fs.listfiles(f'{src_base}empty-only/', recursive=True)) - for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): + for transfer_type in ( + Transfer.DEST_IS_TARGET, + Transfer.DEST_DIR, + Transfer.INFER_DEST, + ): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy( - fs, sema, Transfer(f'{src_base}empty-only/', dest_base.rstrip('/'), treat_dest_as=transfer_type) + await Copier.copy( # hailctl fs sync errors when the source is an empty directory or + # non-extant file (excluding the size-zero, ending in trailing slash + # files). + fs, + sema, + Transfer( + f'{src_base}empty-only/', + dest_base.rstrip('/'), + treat_dest_as=transfer_type, + ), ) with pytest.raises(FileNotFoundError): await collect_files(await fs.listfiles(f'{dest_base}empty-only/', recursive=True)) dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await Copier.copy( # hailctl fs sync errors when the source is an empty directory or + # non-extant file (excluding the size-zero, ending in trailing slash + # files). + fs, + sema, + Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type), + ) async def test_file_and_directory_error_with_slash_non_empty_file_only_google_non_recursive( @@ -614,7 +817,9 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_google_no async def test_file_and_directory_error_with_slash_non_empty_file_only( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + cloud_scheme: str, + copy_tool, ): sema, fs, bases = router_filesystem @@ -628,15 +833,27 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only( with pytest.raises(FileAndDirectoryError): await collect_files(await fs.listfiles(f'{src_base}not-empty-file-w-slash/', recursive=True)) - for transfer_type in (Transfer.DEST_IS_TARGET, Transfer.DEST_DIR, Transfer.INFER_DEST): + for transfer_type in ( + Transfer.DEST_IS_TARGET, + Transfer.DEST_DIR, + Transfer.INFER_DEST, + ): with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy( + await copy_tool( fs, sema, - Transfer(f'{src_base}not-empty-file-w-slash/', dest_base.rstrip('/'), treat_dest_as=transfer_type), + Transfer( + f'{src_base}not-empty-file-w-slash/', + dest_base.rstrip('/'), + treat_dest_as=transfer_type, + ), ) with pytest.raises(FileAndDirectoryError): dest_base = await fresh_dir(fs, bases, cloud_scheme) - await Copier.copy(fs, sema, Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type)) + await copy_tool( + fs, + sema, + Transfer(f'{src_base}', dest_base.rstrip('/'), treat_dest_as=transfer_type), + ) diff --git a/hail/python/test/hailtop/inter_cloud/test_fs.py b/hail/python/test/hailtop/inter_cloud/test_fs.py index 44d5c09f2d6..5adf4b112fb 100644 --- a/hail/python/test/hailtop/inter_cloud/test_fs.py +++ b/hail/python/test/hailtop/inter_cloud/test_fs.py @@ -534,6 +534,27 @@ async def test_basename_is_not_path(filesystem: Tuple[asyncio.Semaphore, AsyncFS assert (await fs.statfile(str(base.with_new_path_component('abc123')))).basename() == 'abc123' +async def test_basename_of_file_ending_in_slash_is_empty_in_cloud( + filesystem: Tuple[asyncio.Semaphore, AsyncFS, AsyncFSURL], +): + _, fs, base = filesystem + + if base.scheme in ('', 'file'): + return + + await fs.write(str(base.with_new_path_component('file-is-folder/')), b'') + + files = [x async for x in await fs.listfiles(str(base))] + assert len(files) == 1 + file = files[0] + assert file.basename() == 'file-is-folder' and file.is_dir() + + files = [x async for x in await fs.listfiles(str(await file.url()), exclude_trailing_slash_files=False)] + assert len(files) == 1 + file = files[0] + assert file.basename() == '' and file.is_file() + + async def test_listfiles(filesystem: Tuple[asyncio.Semaphore, AsyncFS, AsyncFSURL]): _, fs, base = filesystem diff --git a/hail/python/test/hailtop/inter_cloud/test_hailctl_fs_sync.py b/hail/python/test/hailtop/inter_cloud/test_hailctl_fs_sync.py new file mode 100644 index 00000000000..e3bdd041aa6 --- /dev/null +++ b/hail/python/test/hailtop/inter_cloud/test_hailctl_fs_sync.py @@ -0,0 +1,128 @@ +from typing import Tuple, Dict, AsyncIterator, List +import pytest +import os.path +import tempfile +import secrets +import asyncio +import pytest +from hailtop.utils import url_scheme, check_exec_output +from hailtop.aiotools import Transfer, FileAndDirectoryError, Copier, AsyncFS, FileListEntry + + +from .generate_copy_test_specs import run_test_spec, create_test_file, create_test_dir +from .test_copy import cloud_scheme, router_filesystem, fresh_dir + + +@pytest.mark.asyncio +async def test_cli_file_and_dir(router_filesystem, cloud_scheme): + sema, fs, bases = router_filesystem + + test_dir = await fresh_dir(fs, bases, cloud_scheme) + plandir = await fresh_dir(fs, bases, cloud_scheme) + plandir2 = await fresh_dir(fs, bases, cloud_scheme) + + await fs.write(f"{test_dir}file1", b"hello world\n") + + await check_exec_output( + 'hailctl', + 'fs', + 'sync', + '--make-plan', + plandir, + '--copy-to', + f'{test_dir}file1', + f'{test_dir}file2', + '--copy-into', + f'{test_dir}file1', + f'{test_dir}dir1', + ) + + await check_exec_output( + 'hailctl', + 'fs', + 'sync', + '--use-plan', + plandir, + ) + + expected_files = [f"{test_dir}file1", f"{test_dir}file2", f"{test_dir}dir1/file1"] + for url in expected_files: + assert await fs.read(url) == b"hello world\n" + + await check_exec_output( + 'hailctl', + 'fs', + 'sync', + '--make-plan', + plandir2, + '--copy-to', + f'{test_dir}file1', + f'{test_dir}file2', + '--copy-into', + f'{test_dir}file1', + f'{test_dir}dir1', + ) + assert await fs.read(plandir2 + 'differs') == b'' + assert await fs.read(plandir2 + 'dstonly') == b'' + assert await fs.read(plandir2 + 'srconly') == b'' + + +@pytest.mark.asyncio +async def test_cli_subdir(router_filesystem, cloud_scheme): + sema, fs, bases = router_filesystem + + test_dir = await fresh_dir(fs, bases, cloud_scheme) + plandir = await fresh_dir(fs, bases, cloud_scheme) + plandir2 = await fresh_dir(fs, bases, cloud_scheme) + + await fs.makedirs(f"{test_dir}dir") + await fs.makedirs(f"{test_dir}dir/subdir") + await fs.write(f"{test_dir}dir/subdir/file1", b"hello world\n") + + await check_exec_output( + 'hailctl', 'fs', 'sync', '--make-plan', plandir, '--copy-to', f'{test_dir}dir', f'{test_dir}dir2' + ) + + await check_exec_output( + 'hailctl', + 'fs', + 'sync', + '--use-plan', + plandir, + ) + + assert await fs.read(f"{test_dir}dir2/subdir/file1") == b"hello world\n" + + await check_exec_output( + 'hailctl', 'fs', 'sync', '--make-plan', plandir2, '--copy-to', f'{test_dir}dir', f'{test_dir}dir2' + ) + + assert await fs.read(plandir2 + 'plan') == b'' + assert await fs.read(plandir2 + 'summary') == b'0\t0\n' + assert await fs.read(plandir2 + 'differs') == b'' + assert await fs.read(plandir2 + 'dstonly') == b'' + assert await fs.read(plandir2 + 'srconly') == b'' + assert await fs.read(plandir2 + 'matches') == f'{test_dir}dir/subdir/file1\t{test_dir}dir2/subdir/file1\n'.encode() + + +@pytest.mark.asyncio +async def test_cli_already_synced(router_filesystem, cloud_scheme): + sema, fs, bases = router_filesystem + + test_dir = await fresh_dir(fs, bases, cloud_scheme) + plandir = await fresh_dir(fs, bases, cloud_scheme) + + await fs.makedirs(f"{test_dir}dir") + await fs.write(f"{test_dir}dir/foo", b"hello world\n") + await fs.write(f"{test_dir}bar", b"hello world\n") + + await check_exec_output( + 'hailctl', 'fs', 'sync', '--make-plan', plandir, '--copy-to', f'{test_dir}dir/foo', f'{test_dir}bar' + ) + + assert await fs.read(plandir + 'plan') == b'' + assert await fs.read(plandir + 'summary') == b'0\t0\n' + assert await fs.read(plandir + 'differs') == b'' + assert await fs.read(plandir + 'dstonly') == b'' + assert await fs.read(plandir + 'srconly') == b'' + assert await fs.read(plandir + 'matches') == f'{test_dir}dir/foo\t{test_dir}bar\n'.encode()