Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fs] basic sync tool #14248

Open
wants to merge 66 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
8621664
[fs] basic sync tool
Oct 17, 2023
86b5a4b
more docs
Feb 5, 2024
8fbc0f2
fix bad help string
Feb 5, 2024
861ca13
use recursive=True for rapid listing
Feb 8, 2024
7660033
fix
Feb 8, 2024
f956f0a
no prints
Feb 8, 2024
5b0c8a9
allow isdir without trailing slash
Feb 9, 2024
46b1b34
simplify sync.py dramatically
Feb 12, 2024
55c83e0
update listeners before return
Feb 12, 2024
eac543c
use uvloop
Feb 12, 2024
2bc9d1d
uvloopx
Feb 13, 2024
9588f91
import fix
Feb 13, 2024
b905f30
maybe get InsertObjectStream right
Feb 13, 2024
c4162cb
await _cleanup_future too
Feb 13, 2024
c8388b1
use async with instead of async with await
Feb 13, 2024
527488f
smaller part size maybe helps?
Feb 13, 2024
c8891ec
prints
Feb 13, 2024
0e6ae95
use order of magnitude less file parallelism than partition parallelism
Feb 13, 2024
04448ed
files are 1
Feb 13, 2024
22ef264
prints
Feb 13, 2024
9a43627
debugging
Feb 13, 2024
f5a7af2
debugging
Feb 13, 2024
e6fffd7
fix
Feb 13, 2024
470884d
revert prints
Feb 13, 2024
01e0759
fewer prints
Feb 13, 2024
52fd7b6
debug
Feb 13, 2024
e5ce4d6
wtf
Feb 13, 2024
279dcfa
debug
Feb 13, 2024
d1ef63c
debug
Feb 13, 2024
41d9a9a
debug
Feb 13, 2024
927ad35
debug
Feb 13, 2024
8e0d44c
debug
Feb 13, 2024
cab8b8f
debug
Feb 13, 2024
4a848b4
debug
Feb 13, 2024
f89a9e5
debug
Feb 13, 2024
188fff7
debug
Feb 13, 2024
9ef4b32
debug
Feb 13, 2024
7086648
debug
Feb 13, 2024
2c9d79d
debug
Feb 13, 2024
6268348
debug
Feb 13, 2024
edbf8cd
debug
Feb 13, 2024
c9b07de
debug
Feb 13, 2024
7aa4a5a
debug
Feb 13, 2024
2a886aa
debug
Feb 13, 2024
7fc7003
debug
Feb 13, 2024
e564b5d
async with await
Feb 15, 2024
40f96c5
pyright fixes
Feb 22, 2024
4c69ec6
remove uvloopx changes
Feb 22, 2024
e3425e1
Revert "also front_end.py"
Feb 22, 2024
eeb861f
remove cruft
Feb 22, 2024
ba77330
remove debug cruft
Feb 22, 2024
44c9364
fix Self import
Feb 27, 2024
f6eacea
fix bad imports
Feb 28, 2024
f502b8f
Merge remote-tracking branch 'hi/main' into new-new-copier
Feb 28, 2024
2bd85ec
[uvloopx] consolidate uvloop initialization code to one place
Feb 22, 2024
4c05471
also front_end.py
Feb 22, 2024
4bba249
revert unnecessary changes to copy and copier
Feb 28, 2024
7ae6107
add assertion about total size and also fix lint about unused variable
Feb 29, 2024
38727c9
Merge remote-tracking branch 'upstream/main' into new-new-copier
chrisvittal Jun 25, 2024
9c25bc0
fix
chrisvittal Jun 25, 2024
1a33173
Merge branch 'main' into new-new-copier
chrisvittal Aug 5, 2024
44ef794
lint fixes
chrisvittal Aug 7, 2024
7087fc0
test fixes
chrisvittal Aug 8, 2024
5a32d23
fix?
chrisvittal Aug 12, 2024
c66533f
lint
chrisvittal Aug 13, 2024
a447e22
Merge branch 'main' into new-new-copier
chrisvittal Sep 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hail/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ pytest-inter-cloud: upload-remote-test-resources install-editable
HAIL_TEST_S3_BUCKET=hail-test-dy5rg \
HAIL_TEST_AZURE_ACCOUNT=hailtest \
HAIL_TEST_AZURE_CONTAINER=hail-test-4nxei \
HAIL_TEST_AZURE_RESGRP=hail-dev \
HAIL_TEST_AZURE_SUBID=22cd45fe-f996-4c51-af67-ef329d977519 \
$(HAIL_PYTHON3) -m pytest \
-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \
--log-cli-level=INFO \
Expand Down
24 changes: 20 additions & 4 deletions hail/python/hailtop/aiocloud/aioaws/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,11 @@ def __init__(self, head_object_resp, url: str):
self.head_object_resp = head_object_resp
self._url = url

def __repr__(self):
return f'S3HeadObjectFileStatus({self.head_object_resp}, {self._url})'

def basename(self) -> str:
return os.path.basename(self._url.rstrip('/'))
return os.path.basename(self._url)

def url(self) -> str:
return self._url
Expand All @@ -121,8 +124,11 @@ def __init__(self, item: Dict[str, Any], url: str):
self._item = item
self._url = url

def __repr__(self):
return f'S3ListFilesFileStatus({self._item}, {self._url})'

def basename(self) -> str:
return os.path.basename(self._url.rstrip('/'))
return os.path.basename(self._url)

def url(self) -> str:
return self._url
Expand Down Expand Up @@ -195,18 +201,28 @@ def __init__(self, bucket: str, key: str, item: Optional[Dict[str, Any]]):
self._item = item
self._status: Optional[S3ListFilesFileStatus] = None

def __repr__(self):
return f'S3FileListEntry({self._bucket}, {self._key}, {self._item})'

def basename(self) -> str:
return os.path.basename(self._key.rstrip('/'))
object_name = self._key
if self._is_dir():
assert object_name[-1] == '/'
object_name = object_name[:-1]
return os.path.basename(object_name)

async def url(self) -> str:
return f's3://{self._bucket}/{self._key}'

async def is_file(self) -> bool:
return self._item is not None

async def is_dir(self) -> bool:
def _is_dir(self) -> bool:
return self._item is None

async def is_dir(self) -> bool:
return self._is_dir()

async def status(self) -> FileStatus:
if self._status is None:
if self._item is None:
Expand Down
25 changes: 20 additions & 5 deletions hail/python/hailtop/aiocloud/aioazure/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,15 @@ def __init__(self, url: 'AzureAsyncFSURL', blob_props: Optional[BlobProperties])
self._blob_props = blob_props
self._status: Optional[AzureFileStatus] = None

def __repr__(self):
return f'AzureFileListEntry({self._url}, {self._blob_props})'

def basename(self) -> str:
return os.path.basename(self._url.base.rstrip('/'))
url_no_sas = self._url.base
if self._is_dir():
assert url_no_sas[-1] == '/'
url_no_sas = url_no_sas[:-1]
return os.path.basename(url_no_sas)

async def url(self) -> str:
return self._url.base
Expand All @@ -255,9 +262,12 @@ async def url_full(self) -> str:
async def is_file(self) -> bool:
return self._blob_props is not None

async def is_dir(self) -> bool:
def _is_dir(self) -> bool:
return self._blob_props is None

async def is_dir(self) -> bool:
return self._is_dir()

async def status(self) -> FileStatus:
if self._status is None:
if self._blob_props is None:
Expand All @@ -271,8 +281,11 @@ def __init__(self, blob_props: BlobProperties, url: 'AzureAsyncFSURL'):
self.blob_props = blob_props
self._url = url

def __repr__(self):
return f'AzureFileStatus({self.blob_props}, {self._url})'

def basename(self) -> str:
return os.path.basename(self._url.base.rstrip('/'))
return os.path.basename(self._url.base)

def url(self) -> str:
return str(self._url)
Expand Down Expand Up @@ -546,9 +559,11 @@ async def isfile(self, url: str) -> bool:
@handle_public_access_error
async def isdir(self, url: str) -> bool:
fs_url = self.parse_url(url, error_if_bucket=True)
assert not fs_url.path or fs_url.path.endswith('/'), fs_url.path
client = await self.get_container_client(fs_url)
async for _ in client.walk_blobs(name_starts_with=fs_url.path, include=['metadata'], delimiter='/'):
prefix = fs_url.path
if prefix[-1] != '/':
prefix = prefix + '/'
async for _ in client.walk_blobs(name_starts_with=prefix, include=['metadata'], delimiter='/'):
return True
return False

Expand Down
28 changes: 22 additions & 6 deletions hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ async def _cleanup_future(fut: asyncio.Future):


class InsertObjectStream(WritableStream):
def __init__(self, it: FeedableAsyncIterable[bytes], request_task: asyncio.Task[aiohttp.ClientResponse]):
def __init__(self, it: FeedableAsyncIterable[bytes], request_task: asyncio.Task[aiohttp.ClientResponse], url: str):
super().__init__()
self._it = it
self._request_task = request_task
self._value = None
self._exit_stack = AsyncExitStack()
self.url = url

async def cleanup_request_task():
if not self._request_task.cancelled():
Expand Down Expand Up @@ -364,7 +365,7 @@ async def insert_object(self, bucket: str, name: str, **kwargs) -> WritableStrea
f'https://storage.googleapis.com/upload/storage/v1/b/{bucket}/o', retry=False, **kwargs
)
)
return InsertObjectStream(it, request_task)
return InsertObjectStream(it, request_task, 'gs://' + bucket + '/' + name)

# Write using resumable uploads. See:
# https://cloud.google.com/storage/docs/performing-resumable-uploads
Expand Down Expand Up @@ -461,8 +462,11 @@ def __init__(self, items: Dict[str, str], url: str):
self._items = items
self._url = url

def __repr__(self):
return f'GetObjectFileStatus({self._items}, {self._url})'

def basename(self) -> str:
return os.path.basename(self._url.rstrip('/'))
return os.path.basename(self._url)

def url(self) -> str:
return self._url
Expand All @@ -487,18 +491,28 @@ def __init__(self, bucket: str, name: str, items: Optional[Dict[str, Any]]):
self._items = items
self._status: Optional[GetObjectFileStatus] = None

def __repr__(self):
return f'GoogleStorageFileListEntry({self._bucket}, {self._name}, {self._items})'

def basename(self) -> str:
return os.path.basename(self._name.rstrip('/'))
object_name = self._name
if self._is_dir():
assert object_name[-1] == '/'
object_name = object_name[:-1]
return os.path.basename(object_name)

async def url(self) -> str:
return f'gs://{self._bucket}/{self._name}'

async def is_file(self) -> bool:
return self._items is not None

async def is_dir(self) -> bool:
def _is_dir(self) -> bool:
return self._items is None

async def is_dir(self) -> bool:
return self._is_dir()

async def status(self) -> FileStatus:
if self._status is None:
if self._items is None:
Expand Down Expand Up @@ -856,7 +870,9 @@ async def isfile(self, url: str) -> bool:

async def isdir(self, url: str) -> bool:
fsurl = self.parse_url(url, error_if_bucket=True)
assert not fsurl._path or fsurl.path.endswith('/'), fsurl._path
prefix = fsurl._path
if len(prefix) > 0 and prefix[-1] != '/':
prefix += '/'
params = {'prefix': fsurl._path, 'delimiter': '/', 'includeTrailingDelimiter': 'true', 'maxResults': 1}
async for page in await self._storage_client.list_objects(fsurl._bucket, params=params):
prefixes = page.get('prefixes')
Expand Down
Loading