Skip to content

Commit

Permalink
Merge branch 'allow-timeout-as-number' into new-new-copier
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan King committed Jan 16, 2024
2 parents 5d1be8c + 630ffd3 commit 7e29e63
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 13 deletions.
19 changes: 11 additions & 8 deletions hail/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ $(FAST_PYTHON_JAR_EXTRA_CLASSPATH): build.gradle
python-jar: $(PYTHON_JAR)

.PHONY: pytest
pytest: $(PYTHON_VERSION_INFO) $(INIT_SCRIPTS)
pytest: python/README.md $(FAST_PYTHON_JAR) $(FAST_PYTHON_JAR_EXTRA_CLASSPATH)
pytest: install-editable
cd python && \
$(HAIL_PYTHON3) -m pytest \
-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \
Expand All @@ -202,8 +201,7 @@ pytest: python/README.md $(FAST_PYTHON_JAR) $(FAST_PYTHON_JAR_EXTRA_CLASSPATH)

# NOTE: Look at upload-remote-test-resources target if test resources are missing
.PHONY: pytest-inter-cloud
pytest-inter-cloud: install-editable
pytest-inter-cloud: upload-remote-test-resources
pytest-inter-cloud: upload-remote-test-resources install-editable
cd python && \
HAIL_TEST_STORAGE_URI=$(TEST_STORAGE_URI) \
HAIL_TEST_GCS_BUCKET=$(HAIL_TEST_GCS_BUCKET) \
Expand Down Expand Up @@ -349,15 +347,20 @@ upload-artifacts: $(WHEEL)
# NOTE: 1-day expiration of the test bucket means that this
# target must be run at least once a day. To trigger this target to re-run,
# > rm upload-remote-test-resources
upload-remote-test-resources: install-editable
upload-remote-test-resources: $(shell git ls-files src/test/resources)
upload-remote-test-resources: $(shell git ls-files python/hail/docs/data)
# # If hailtop.aiotools.copy gives you trouble:
# gcloud storage cp -r src/test/resources/\* $(CLOUD_HAIL_TEST_RESOURCES_DIR)
# gcloud storage cp -r python/hail/docs/data/\* $(CLOUD_HAIL_DOCTEST_DATA_DIR)
python3 -m hailtop.aiotools.copy -vvv 'null' '[\
{"from":"src/test/resources","to":"$(CLOUD_HAIL_TEST_RESOURCES_DIR)"},\
{"from":"python/hail/docs/data","to":"$(CLOUD_HAIL_DOCTEST_DATA_DIR)"}\
]' --timeout 600
python3 -m hailtop.aiotools.copy \
-vvv \
'null' \
"[\
{\"from\":\"src/test/resources\",\"to\":\"$(CLOUD_HAIL_TEST_RESOURCES_DIR)\"}, \
{\"from\":\"python/hail/docs/data\",\"to\":\"$(CLOUD_HAIL_DOCTEST_DATA_DIR)\"} \
]" \
--timeout 600
touch $@

# NOTE: 1-day expiration of the test bucket means that this
Expand Down
36 changes: 35 additions & 1 deletion hail/python/hailtop/aiocloud/aioaws/fs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
from typing import Any, AsyncIterator, BinaryIO, cast, AsyncContextManager, Dict, List, Optional, Set, Tuple, Type
from typing import (
Any,
AsyncIterator,
BinaryIO,
cast,
AsyncContextManager,
Dict,
List,
Optional,
Set,
Tuple,
Type,
Union,
)
from types import TracebackType
import aiohttp
import sys
from concurrent.futures import ThreadPoolExecutor
import os.path
Expand Down Expand Up @@ -341,12 +355,32 @@ def __init__(
max_workers: Optional[int] = None,
*,
max_pool_connections: int = 10,
timeout: Optional[Union[int, float, aiohttp.ClientTimeout]] = None,
):
if not thread_pool:
thread_pool = ThreadPoolExecutor(max_workers=max_workers)
self._thread_pool = thread_pool

kwargs = {}
if isinstance(timeout, aiohttp.ClientTimeout):
if timeout.sock_read:
kwargs['read_timeout'] = timeout.sock_read
elif timeout.total:
kwargs['read_timeout'] = timeout.total

if timeout.sock_connect:
kwargs['connect_timeout'] = timeout.sock_connect
elif timeout.connect:
kwargs['connect_timeout'] = timeout.connect
elif timeout.total:
kwargs['connect_timeout'] = timeout.total
elif isinstance(timeout, (int, float)):
kwargs['read_timeout'] = timeout
kwargs['connect_timeout'] = timeout

config = botocore.config.Config(
max_pool_connections=max_pool_connections,
**kwargs,
)
self._s3 = boto3.client('s3', config=config)

Expand Down
23 changes: 20 additions & 3 deletions hail/python/hailtop/aiocloud/aioazure/fs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any, AsyncContextManager, AsyncIterator, Dict, List, Optional, Set, Tuple, Type, Union
from types import TracebackType

import aiohttp
import abc
import re
import os
Expand Down Expand Up @@ -372,7 +373,13 @@ async def wrapped(self: 'AzureAsyncFS', url, *args, **kwargs):
class AzureAsyncFS(AsyncFS):
PATH_REGEX = re.compile('/(?P<container>[^/]+)(?P<name>.*)')

def __init__(self, *, credential_file: Optional[str] = None, credentials: Optional[AzureCredentials] = None):
def __init__(
self,
*,
credential_file: Optional[str] = None,
credentials: Optional[AzureCredentials] = None,
timeout: Optional[Union[int, float, aiohttp.ClientTimeout]] = None,
):
if credentials is None:
scopes = ['https://storage.azure.com/.default']
if credential_file is not None:
Expand All @@ -382,6 +389,16 @@ def __init__(self, *, credential_file: Optional[str] = None, credentials: Option
elif credential_file is not None:
raise ValueError('credential and credential_file cannot both be defined')

if isinstance(timeout, aiohttp.ClientTimeout):
self.read_timeout = timeout.sock_read or timeout.total or 5
self.connection_timeout = timeout.sock_connect or timeout.connect or timeout.total or 5
elif isinstance(timeout, (int, float)):
self.read_timeout = timeout
self.connection_timeout = timeout
else:
self.read_timeout = 5
self.connection_timeout = 5

self._credential = credentials.credential
self._blob_service_clients: Dict[Tuple[str, str, Union[AzureCredentials, str, None]], BlobServiceClient] = {}

Expand Down Expand Up @@ -482,8 +499,8 @@ def get_blob_service_client(self, account: str, container: str, token: Optional[
self._blob_service_clients[k] = BlobServiceClient(
f'https://{account}.blob.core.windows.net',
credential=credential, # type: ignore
connection_timeout=5,
read_timeout=5,
connection_timeout=self.connection_timeout,
read_timeout=self.read_timeout,
)
return self._blob_service_clients[k]

Expand Down
1 change: 0 additions & 1 deletion hail/python/hailtop/aiotools/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ async def main() -> None:
timeout = args.timeout
if timeout:
timeout = float(timeout)
print(timeout)
gcs_kwargs = {
'gcs_requester_pays_configuration': requester_pays_project,
'timeout': timeout,
Expand Down

0 comments on commit 7e29e63

Please sign in to comment.