Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fs] basic sync tool #13834

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hail/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ pytest-inter-cloud: upload-remote-test-resources install-editable
HAIL_TEST_S3_BUCKET=hail-test-dy5rg \
HAIL_TEST_AZURE_ACCOUNT=hailtest \
HAIL_TEST_AZURE_CONTAINER=hail-test-4nxei \
HAIL_TEST_AZURE_RESGRP=hail-dev \
HAIL_TEST_AZURE_SUBID=22cd45fe-f996-4c51-af67-ef329d977519 \
$(HAIL_PYTHON3) -m pytest \
-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \
--log-cli-level=INFO \
Expand Down
24 changes: 20 additions & 4 deletions hail/python/hailtop/aiocloud/aioaws/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ def __init__(self, head_object_resp, url: str):
self.head_object_resp = head_object_resp
self._url = url

def __repr__(self):
return f'S3HeadObjectFileStatus({self.head_object_resp}, {self._url})'

def basename(self) -> str:
return os.path.basename(self._url.rstrip('/'))
return os.path.basename(self._url)

def url(self) -> str:
return self._url
Expand All @@ -120,8 +123,11 @@ def __init__(self, item: Dict[str, Any], url: str):
self._item = item
self._url = url

def __repr__(self):
return f'S3ListFilesFileStatus({self._item}, {self._url})'

def basename(self) -> str:
return os.path.basename(self._url.rstrip('/'))
return os.path.basename(self._url)

def url(self) -> str:
return self._url
Expand Down Expand Up @@ -194,18 +200,28 @@ def __init__(self, bucket: str, key: str, item: Optional[Dict[str, Any]]):
self._item = item
self._status: Optional[S3ListFilesFileStatus] = None

def __repr__(self):
return f'S3FileListEntry({self._bucket}, {self._key}, {self._item})'

def basename(self) -> str:
return os.path.basename(self._key.rstrip('/'))
object_name = self._key
if self._is_dir():
assert object_name[-1] == '/'
object_name = object_name[:-1]
return os.path.basename(object_name)

async def url(self) -> str:
return f's3://{self._bucket}/{self._key}'

async def is_file(self) -> bool:
return self._item is not None

async def is_dir(self) -> bool:
def _is_dir(self) -> bool:
return self._item is None

async def is_dir(self) -> bool:
return self._is_dir()

async def status(self) -> FileStatus:
if self._status is None:
if self._item is None:
Expand Down
25 changes: 20 additions & 5 deletions hail/python/hailtop/aiocloud/aioazure/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,15 @@ def __init__(self, url: 'AzureAsyncFSURL', blob_props: Optional[BlobProperties])
self._blob_props = blob_props
self._status: Optional[AzureFileStatus] = None

def __repr__(self):
return f'AzureFileListEntry({self._url}, {self._blob_props})'

def basename(self) -> str:
return os.path.basename(self._url.base.rstrip('/'))
url_no_sas = self._url.base
if self._is_dir():
assert url_no_sas[-1] == '/'
url_no_sas = url_no_sas[:-1]
return os.path.basename(url_no_sas)

async def url(self) -> str:
return self._url.base
Expand All @@ -250,9 +257,12 @@ async def url_full(self) -> str:
async def is_file(self) -> bool:
return self._blob_props is not None

async def is_dir(self) -> bool:
def _is_dir(self) -> bool:
return self._blob_props is None

async def is_dir(self) -> bool:
return self._is_dir()

async def status(self) -> FileStatus:
if self._status is None:
if self._blob_props is None:
Expand All @@ -266,8 +276,11 @@ def __init__(self, blob_props: BlobProperties, url: 'AzureAsyncFSURL'):
self.blob_props = blob_props
self._url = url

def __repr__(self):
return f'AzureFileStatus({self.blob_props}, {self._url})'

def basename(self) -> str:
return os.path.basename(self._url.base.rstrip('/'))
return os.path.basename(self._url.base)

def url(self) -> str:
return str(self._url)
Expand Down Expand Up @@ -546,9 +559,11 @@ async def isfile(self, url: str) -> bool:
@handle_public_access_error
async def isdir(self, url: str) -> bool:
fs_url = self.parse_url(url)
assert not fs_url.path or fs_url.path.endswith('/'), fs_url.path
client = self.get_container_client(fs_url)
async for _ in client.walk_blobs(name_starts_with=fs_url.path, include=['metadata'], delimiter='/'):
path = fs_url.path
if path[-1] != '/':
path = path + '/'
async for _ in client.walk_blobs(name_starts_with=path, include=['metadata'], delimiter='/'):
return True
return False

Expand Down
22 changes: 18 additions & 4 deletions hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,11 @@ def __init__(self, items: Dict[str, str], url: str):
self._items = items
self._url = url

def __repr__(self):
return f'GetObjectFileStatus({self._items}, {self._url})'

def basename(self) -> str:
return os.path.basename(self._url.rstrip('/'))
return os.path.basename(self._url)

def url(self) -> str:
return self._url
Expand All @@ -471,18 +474,28 @@ def __init__(self, bucket: str, name: str, items: Optional[Dict[str, Any]]):
self._items = items
self._status: Optional[GetObjectFileStatus] = None

def __repr__(self):
return f'GoogleStorageFileListEntry({self._bucket}, {self._name}, {self._items})'

def basename(self) -> str:
return os.path.basename(self._name.rstrip('/'))
object_name = self._name
if self._is_dir():
assert object_name[-1] == '/'
object_name = object_name[:-1]
return os.path.basename(object_name)

async def url(self) -> str:
return f'gs://{self._bucket}/{self._name}'

async def is_file(self) -> bool:
return self._items is not None

async def is_dir(self) -> bool:
def _is_dir(self) -> bool:
return self._items is None

async def is_dir(self) -> bool:
return self._is_dir()

async def status(self) -> FileStatus:
if self._status is None:
if self._items is None:
Expand Down Expand Up @@ -798,7 +811,8 @@ async def isfile(self, url: str) -> bool:

async def isdir(self, url: str) -> bool:
bucket, name = self.get_bucket_and_name(url)
assert not name or name.endswith('/'), name
if name[-1] != '/':
name = name + '/'
params = {'prefix': name, 'delimiter': '/', 'includeTrailingDelimiter': 'true', 'maxResults': 1}
async for page in await self._storage_client.list_objects(bucket, params=params):
prefixes = page.get('prefixes')
Expand Down
30 changes: 23 additions & 7 deletions hail/python/hailtop/aiotools/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ async def __aexit__(self, exc_type, exc, tb):
self.task.cancel()


def only_update_completions(progress: Progress, tid):
def listen(delta: int):
if delta < 0:
progress.update(tid, advance=-delta)

return listen


async def copy(
*,
max_simultaneous_transfers: Optional[int] = None,
Expand All @@ -76,6 +84,7 @@ async def copy(
s3_kwargs: Optional[dict] = None,
transfers: List[Transfer],
verbose: bool = False,
totals: Optional[Tuple[int, int]] = None,
) -> None:
with ThreadPoolExecutor() as thread_pool:
if max_simultaneous_transfers is None:
Expand Down Expand Up @@ -108,15 +117,22 @@ async def copy(
) as sema:
file_tid = progress.add_task(description='files', total=0, visible=verbose)
bytes_tid = progress.add_task(description='bytes', total=0, visible=verbose)

if totals:
n_files, n_bytes = totals
progress.update(file_tid, total=n_files)
progress.update(bytes_tid, total=n_bytes)
file_listener = only_update_completions(progress, file_tid)
bytes_listener = only_update_completions(progress, bytes_tid)
else:
file_listener = make_listener(progress, file_tid)
bytes_listener = make_listener(progress, bytes_tid)

copy_report = await Copier.copy(
fs,
sema,
transfers,
files_listener=make_listener(progress, file_tid),
bytes_listener=make_listener(progress, bytes_tid),
fs, sema, transfers, files_listener=file_listener, bytes_listener=bytes_listener
)
if verbose:
copy_report.summarize()
copy_report.summarize(include_sources=totals is None)


def make_transfer(json_object: Dict[str, str]) -> Transfer:
Expand Down Expand Up @@ -169,7 +185,7 @@ async def main() -> None:
parser.add_argument(
'-v', '--verbose', action='store_const', const=True, default=False, help='show logging information'
)
parser.add_argument('--timeout', type=str, default=None, help='show logging information')
parser.add_argument('--timeout', type=str, default=None, help='Set the total timeout for HTTP requests.')
args = parser.parse_args()

if args.verbose:
Expand Down
13 changes: 8 additions & 5 deletions hail/python/hailtop/aiotools/fs/copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def mark_done(self):
self._end_time = time_msecs()
self._duration = self._end_time - self._start_time

def summarize(self):
def summarize(self, include_sources: bool = True):
source_reports = []

def add_source_reports(transfer_report):
Expand Down Expand Up @@ -177,9 +177,10 @@ def add_source_reports(transfer_report):
file_rate = total_files / (self._duration / 1000)
print(f' Average file rate: {file_rate:,.1f}/s')

print('Sources:')
for sr in source_reports:
print(f' {sr._source}: {sr._files} files, {humanize.naturalsize(sr._bytes)}')
if include_sources:
print('Sources:')
for sr in source_reports:
print(f' {sr._source}: {sr._files} files, {humanize.naturalsize(sr._bytes)}')


class SourceCopier:
Expand Down Expand Up @@ -239,6 +240,7 @@ async def _copy_part(
part_creator: MultiPartCreate,
return_exceptions: bool,
) -> None:
total_written = 0
try:
async with self.xfer_sema.acquire_manager(min(Copier.BUFFER_SIZE, this_part_size)):
async with await self.router_fs.open_from(
Expand All @@ -254,8 +256,9 @@ async def _copy_part(
raise UnexpectedEOFError()
written = await destf.write(b)
assert written == len(b)
source_report.finish_bytes(written)
total_written += written
n -= len(b)
source_report.finish_bytes(total_written)
except Exception as e:
if return_exceptions:
source_report.set_exception(e)
Expand Down
Loading
Loading