Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-47357: Explicitly configure timeouts for fsspec #99

Merged
merged 5 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 84 additions & 8 deletions python/lsst/resources/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

try:
import fsspec
from aiohttp import ClientSession, TCPConnector
from aiohttp import ClientSession, ClientTimeout, TCPConnector
from fsspec.implementations.http import HTTPFileSystem
from fsspec.spec import AbstractFileSystem
except ImportError:
Expand Down Expand Up @@ -103,7 +103,7 @@
"""

# Default timeouts for all HTTP requests (seconds).
DEFAULT_TIMEOUT_CONNECT: float = 30.0
DEFAULT_TIMEOUT_CONNECT: float = 60.0
DEFAULT_TIMEOUT_READ: float = 1_500.0

# Default lower and upper bounds for the backoff interval (seconds).
Expand All @@ -125,6 +125,7 @@
self._back_end_connections: int | None = None
self._digest_algorithm: str | None = None
self._send_expect_on_put: bool | None = None
self._fsspec_is_enabled: bool | None = None
self._timeout: tuple[float, float] | None = None
self._collect_memory_usage: bool | None = None
self._backoff_min: float | None = None
Expand Down Expand Up @@ -206,6 +207,20 @@
self._send_expect_on_put = "LSST_HTTP_PUT_SEND_EXPECT_HEADER" in os.environ
return self._send_expect_on_put

@property
def fsspec_is_enabled(self) -> bool:
"""Return True if `fsspec` is enabled for objects of class
HttpResourcePath.

To determine if `fsspec` is enabled, this method inspects the presence
of the environment variable `LSST_HTTP_ENABLE_FSSPEC` (with any value).
"""
if self._fsspec_is_enabled is not None:
return self._fsspec_is_enabled

Check warning on line 219 in python/lsst/resources/http.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/http.py#L219

Added line #L219 was not covered by tests

self._fsspec_is_enabled = "LSST_HTTP_ENABLE_FSSPEC" in os.environ
return self._fsspec_is_enabled

@property
def timeout(self) -> tuple[float, float]:
"""Return a tuple with the values of timeouts for connecting to the
Expand Down Expand Up @@ -734,6 +749,10 @@
via a PUT request. No digest is requested if this variable is not set
or is set to an invalid value.
Valid values are those in ACCEPTED_DIGESTS.

- LSST_HTTP_ENABLE_FSSPEC: the presence of this environment variable
activates the usage of `fsspec` compatible file system to read
a HTTP URL. The value of the variable is not inspected.
"""

# WebDAV servers known to be able to sign URLs. The values are lowercased
Expand All @@ -742,7 +761,7 @@
SUPPORTED_URL_SIGNERS = ("dcache", "xrootd")

# Configuration items for this class instances.
_config = HttpResourcePathConfig()
_config: HttpResourcePathConfig = HttpResourcePathConfig()

# The session for metadata requests is used for interacting with
# the front end servers for requests such as PROPFIND, HEAD, etc. Those
Expand Down Expand Up @@ -897,6 +916,15 @@
"""
return self.server in HttpResourcePath.SUPPORTED_URL_SIGNERS

@classmethod
def _reload_config(cls) -> None:
"""Reload the configuration for all instances of this class. That
configuration is instantiated from the environment.

This is an internal method mainly intended for tests.
"""
HttpResourcePath._config = HttpResourcePathConfig()

def exists(self) -> bool:
"""Check that a remote HTTP resource exists."""
log.debug("Checking if resource exists: %s", self.geturl())
Expand Down Expand Up @@ -1308,6 +1336,26 @@
f"method HttpResourcePath.to_fsspec() not implemented for directory {self}"
)

# If usage of fsspec-compatible file system is disabled in the
# configuration we raise an exception which signals the caller
# that it cannot use fsspec. An example of such a caller is
# `lsst.daf.butler.formatters.ParquetFormatter`.
#
# Note that we don't call super().to_fsspec() since that method
# assumes that fsspec can be used provided fsspec package is
# importable.
#
# The motivation for making this configurable is that for HTTP
# URLs fsspec.HTTPFileSystem uses async I/O and we have found
# unexpected behavior by clients when used against dCache for reading
# parquet files via a ParquetFormatter instance. That behavior cannot
# be reproduced when using other callers.
#
# This needs more investigation to discard the possibility that async
# I/O, used by fsspec.HTTPFileSystem, is related to this behavior.
if not self._config.fsspec_is_enabled:
raise ImportError("fsspec is disabled for HttpResourcePath objects with webDAV back end")

Check warning on line 1357 in python/lsst/resources/http.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/http.py#L1357

Added line #L1357 was not covered by tests

async def get_client_session(**kwargs: Any) -> ClientSession:
"""Return a aiohttp.ClientSession configured to use an
`aiohttp.TCPConnector` shared by all instances of this class.
Expand All @@ -1325,14 +1373,42 @@
TCP connections to the server.
"""
if HttpResourcePath._tcp_connector is None:
HttpResourcePath._tcp_connector = TCPConnector(ssl=self._config.ssl_context)
HttpResourcePath._tcp_connector = TCPConnector(

Check warning on line 1376 in python/lsst/resources/http.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/http.py#L1376

Added line #L1376 was not covered by tests
# SSL context equipped with client credentials and
# configured to validate server certificates.
ssl=self._config.ssl_context,
# Total number of simultaneous connections this connector
# keeps open with any host.
#
# The default is 100 but we deliberately reduced it to
# avoid keeping a large number of open connexions to file
# servers when thousands of quanta execute simultaneously.
#
# In any case, new connexions are automatically established
# when needed.
limit=10,
# Number of simultaneous connections to a single host:port.
limit_per_host=1,
# Close network connection after usage
force_close=True,
)

return ClientSession(connector=HttpResourcePath._tcp_connector, **kwargs)
connect_timeout, read_timeout = self._config.timeout
return ClientSession(

Check warning on line 1397 in python/lsst/resources/http.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/http.py#L1396-L1397

Added lines #L1396 - L1397 were not covered by tests
connector=HttpResourcePath._tcp_connector,
timeout=ClientTimeout(
connect=connect_timeout,
sock_connect=connect_timeout,
sock_read=read_timeout,
total=2 * read_timeout,
),
**kwargs,
)

# Retrieve a signed URL for download valid for 1 hour.
url = self.generate_presigned_get_url(expiration_time_seconds=3_600)
# Retrieve a signed URL for download valid for 2 hours.
url = self.generate_presigned_get_url(expiration_time_seconds=2 * 3_600)

Check warning on line 1409 in python/lsst/resources/http.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/http.py#L1409

Added line #L1409 was not covered by tests

# HTTPFileSystem constructors accepts the argument 'block_size'. The
# HTTPFileSystem constructor accepts the argument 'block_size'. The
# default value is 'fsspec.utils.DEFAULT_BLOCK_SIZE' which is 5 MB.
# That seems to be a reasonable block size for downloading files.
return HTTPFileSystem(get_client=get_client_session), url
Expand Down
4 changes: 4 additions & 0 deletions python/lsst/resources/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,10 @@
fs, path = uri.to_fsspec()
except NotImplementedError as e:
raise unittest.SkipTest(str(e)) from e
except ImportError as e:

Check warning on line 923 in python/lsst/resources/tests.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/tests.py#L923

Added line #L923 was not covered by tests
# HttpResourcePath.to_fsspec() raises if support
# of fsspec for webDAV back ends is disabled.
raise unittest.SkipTest(str(e)) from e

Check warning on line 926 in python/lsst/resources/tests.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/resources/tests.py#L926

Added line #L926 was not covered by tests
with fs.open(path, "r") as fd:
as_read = fd.read()
self.assertEqual(as_read, content)
Expand Down
97 changes: 70 additions & 27 deletions tests/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@
HttpReadResourceHandle,
parse_content_range_header,
)
from lsst.resources.http import BearerTokenAuth, HttpResourcePathConfig, SessionStore, _is_protected
from lsst.resources.http import (
BearerTokenAuth,
HttpResourcePath,
HttpResourcePathConfig,
SessionStore,
_is_protected,
)
from lsst.resources.tests import GenericReadWriteTestCase, GenericTestCase
from lsst.resources.utils import makeTestTempDir, removeTestTempDir

Expand Down Expand Up @@ -411,7 +417,7 @@
os.remove(local_file)

def test_dav_to_fsspec(self):
# Upload a randomly-generated file via write() with overwrite
# Upload a randomly-generated file via write() with overwrite.
local_file, file_size = self._generate_file()
with open(local_file, "rb") as f:
data = f.read()
Expand All @@ -420,31 +426,57 @@
self.assertIsNone(remote_file.write(data, overwrite=True))
self.assertTrue(remote_file.exists())
self.assertEqual(remote_file.size(), file_size)
remote_file_url = remote_file.geturl()

try:
# Ensure that the contents of the remote file we just uploaded is
# identical to the contents of that file when retrieved via
# fsspec.open().
fsys, url = remote_file.to_fsspec()
with fsys.open(url) as f:
self.assertEqual(data, f.read())

# Ensure the contents is identical to the result of fsspec.cat()
self.assertEqual(data, fsys.cat(url))
except NotImplementedError as e:
# to_fsspec() must succeed if remote server knows how to sign URLs
if remote_file.server_signs_urls:
raise e
finally:
os.remove(local_file)

# Ensure that attempting to modify a remote via via fsspec fails.
# fsspect.rm() raises NotImplementedError if it cannot remove the
# remote file.
if remote_file.server_signs_urls:
fsys, url = remote_file.to_fsspec()
with self.assertRaises(NotImplementedError):
fsys.rm(url)
# to_fsspec() may raise if that feature is not specifically
# enabled in the environment and remote server is one of the
# webDAV servers that support signing URLs.
with unittest.mock.patch.dict(os.environ, {}, clear=True):
try:
# Force reinitialization of the config from the environment
HttpResourcePath._reload_config()
fsys, url = ResourcePath(remote_file_url).to_fsspec()
self.assertEqual(data, fsys.cat(url))
except ImportError as e:
self.assertTrue("disable" in str(e))

Check warning on line 441 in tests/test_http.py

View check run for this annotation

Codecov / codecov/patch

tests/test_http.py#L440-L441

Added lines #L440 - L441 were not covered by tests

# Ensure to_fsspec() works if that feature is enabled in the
# environment.
with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_ENABLE_FSSPEC": "true"}, clear=True):
try:
# Force reinitialization of the config from the environment.
HttpResourcePath._reload_config()
rpath = ResourcePath(remote_file_url)

# Ensure that the contents of the remote file we just
# uploaded is identical to the contents of that file when
# retrieved via fsspec.open().
fsys, url = rpath.to_fsspec()
with fsys.open(url) as f:
self.assertEqual(data, f.read())

# Ensure the contents is identical to the result of
# fsspec.cat()
self.assertEqual(data, fsys.cat(url))

# Ensure that attempting to modify a remote via via fsspec
# fails, since the returned URL is signed for download only.
# fsspec.rm() raises NotImplementedError if it cannot remove
# the remote file.
if rpath.server_signs_urls:
with self.assertRaises(NotImplementedError):
fsys, url = rpath.to_fsspec()
fsys.rm(url)
except NotImplementedError as e:

Check warning on line 470 in tests/test_http.py

View check run for this annotation

Codecov / codecov/patch

tests/test_http.py#L467-L470

Added lines #L467 - L470 were not covered by tests
# to_fsspec() must succeed if remote server knows how to
# sign URLs
if rpath.server_signs_urls:
raise e

Check warning on line 474 in tests/test_http.py

View check run for this annotation

Codecov / codecov/patch

tests/test_http.py#L474

Added line #L474 was not covered by tests

# Force reinitialization of the config from the environment and
# clean up local file.
HttpResourcePath._reload_config()
os.remove(local_file)

@responses.activate
def test_is_webdav_endpoint(self):
Expand Down Expand Up @@ -619,7 +651,7 @@

def test_send_expect_header(self):
# Ensure environment variable LSST_HTTP_PUT_SEND_EXPECT_HEADER is
# inspected to initialize the HttpResourcePathConfig config class.
# inspected to initialize the HttpResourcePathConfig class.
with unittest.mock.patch.dict(os.environ, {}, clear=True):
config = HttpResourcePathConfig()
self.assertFalse(config.send_expect_on_put)
Expand All @@ -628,6 +660,17 @@
config = HttpResourcePathConfig()
self.assertTrue(config.send_expect_on_put)

def test_enable_fsspec(self):
# Ensure environment variable LSST_HTTP_ENABLE_FSSPEC is
# inspected to initialize the HttpResourcePathConfig class.
with unittest.mock.patch.dict(os.environ, {}, clear=True):
config = HttpResourcePathConfig()
self.assertFalse(config.fsspec_is_enabled)

with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_ENABLE_FSSPEC": "any value"}, clear=True):
config = HttpResourcePathConfig()
self.assertTrue(config.fsspec_is_enabled)

def test_collect_memory_usage(self):
# Ensure environment variable LSST_HTTP_COLLECT_MEMORY_USAGE is
# inspected to initialize the HttpResourcePathConfig class.
Expand Down
Loading