Skip to content

Commit 31881f6

Browse files
authored
Merge pull request #99 from lsst/tickets/DM-47357
DM-47357: Explicitly configure timeouts for fsspec
2 parents d73b774 + 89e2b36 commit 31881f6

File tree

3 files changed

+158
-35
lines changed

3 files changed

+158
-35
lines changed

python/lsst/resources/http.py

+84-8
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
try:
4141
import fsspec
42-
from aiohttp import ClientSession, TCPConnector
42+
from aiohttp import ClientSession, ClientTimeout, TCPConnector
4343
from fsspec.implementations.http import HTTPFileSystem
4444
from fsspec.spec import AbstractFileSystem
4545
except ImportError:
@@ -103,7 +103,7 @@ class HttpResourcePathConfig:
103103
"""
104104

105105
# Default timeouts for all HTTP requests (seconds).
106-
DEFAULT_TIMEOUT_CONNECT: float = 30.0
106+
DEFAULT_TIMEOUT_CONNECT: float = 60.0
107107
DEFAULT_TIMEOUT_READ: float = 1_500.0
108108

109109
# Default lower and upper bounds for the backoff interval (seconds).
@@ -125,6 +125,7 @@ def __init__(self) -> None:
125125
self._back_end_connections: int | None = None
126126
self._digest_algorithm: str | None = None
127127
self._send_expect_on_put: bool | None = None
128+
self._fsspec_is_enabled: bool | None = None
128129
self._timeout: tuple[float, float] | None = None
129130
self._collect_memory_usage: bool | None = None
130131
self._backoff_min: float | None = None
@@ -206,6 +207,20 @@ def send_expect_on_put(self) -> bool:
206207
self._send_expect_on_put = "LSST_HTTP_PUT_SEND_EXPECT_HEADER" in os.environ
207208
return self._send_expect_on_put
208209

210+
@property
211+
def fsspec_is_enabled(self) -> bool:
212+
"""Return True if `fsspec` is enabled for objects of class
213+
HttpResourcePath.
214+
215+
To determine if `fsspec` is enabled, this method inspects the presence
216+
of the environment variable `LSST_HTTP_ENABLE_FSSPEC` (with any value).
217+
"""
218+
if self._fsspec_is_enabled is not None:
219+
return self._fsspec_is_enabled
220+
221+
self._fsspec_is_enabled = "LSST_HTTP_ENABLE_FSSPEC" in os.environ
222+
return self._fsspec_is_enabled
223+
209224
@property
210225
def timeout(self) -> tuple[float, float]:
211226
"""Return a tuple with the values of timeouts for connecting to the
@@ -734,6 +749,10 @@ class HttpResourcePath(ResourcePath):
734749
via a PUT request. No digest is requested if this variable is not set
735750
or is set to an invalid value.
736751
Valid values are those in ACCEPTED_DIGESTS.
752+
753+
- LSST_HTTP_ENABLE_FSSPEC: the presence of this environment variable
754+
activates the usage of `fsspec` compatible file system to read
755+
a HTTP URL. The value of the variable is not inspected.
737756
"""
738757

739758
# WebDAV servers known to be able to sign URLs. The values are lowercased
@@ -742,7 +761,7 @@ class HttpResourcePath(ResourcePath):
742761
SUPPORTED_URL_SIGNERS = ("dcache", "xrootd")
743762

744763
# Configuration items for this class instances.
745-
_config = HttpResourcePathConfig()
764+
_config: HttpResourcePathConfig = HttpResourcePathConfig()
746765

747766
# The session for metadata requests is used for interacting with
748767
# the front end servers for requests such as PROPFIND, HEAD, etc. Those
@@ -897,6 +916,15 @@ def server_signs_urls(self) -> bool:
897916
"""
898917
return self.server in HttpResourcePath.SUPPORTED_URL_SIGNERS
899918

919+
@classmethod
920+
def _reload_config(cls) -> None:
921+
"""Reload the configuration for all instances of this class. That
922+
configuration is instantiated from the environment.
923+
924+
This is an internal method mainly intended for tests.
925+
"""
926+
HttpResourcePath._config = HttpResourcePathConfig()
927+
900928
def exists(self) -> bool:
901929
"""Check that a remote HTTP resource exists."""
902930
log.debug("Checking if resource exists: %s", self.geturl())
@@ -1308,6 +1336,26 @@ def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
13081336
f"method HttpResourcePath.to_fsspec() not implemented for directory {self}"
13091337
)
13101338

1339+
# If usage of fsspec-compatible file system is disabled in the
1340+
# configuration we raise an exception which signals the caller
1341+
# that it cannot use fsspec. An example of such a caller is
1342+
# `lsst.daf.butler.formatters.ParquetFormatter`.
1343+
#
1344+
# Note that we don't call super().to_fsspec() since that method
1345+
# assumes that fsspec can be used provided fsspec package is
1346+
# importable.
1347+
#
1348+
# The motivation for making this configurable is that for HTTP
1349+
# URLs fsspec.HTTPFileSystem uses async I/O and we have found
1350+
# unexpected behavior by clients when used against dCache for reading
1351+
# parquet files via a ParquetFormatter instance. That behavior cannot
1352+
# be reproduced when using other callers.
1353+
#
1354+
# This needs more investigation to discard the possibility that async
1355+
# I/O, used by fsspec.HTTPFileSystem, is related to this behavior.
1356+
if not self._config.fsspec_is_enabled:
1357+
raise ImportError("fsspec is disabled for HttpResourcePath objects with webDAV back end")
1358+
13111359
async def get_client_session(**kwargs: Any) -> ClientSession:
13121360
"""Return a aiohttp.ClientSession configured to use an
13131361
`aiohttp.TCPConnector` shared by all instances of this class.
@@ -1325,14 +1373,42 @@ async def get_client_session(**kwargs: Any) -> ClientSession:
13251373
TCP connections to the server.
13261374
"""
13271375
if HttpResourcePath._tcp_connector is None:
1328-
HttpResourcePath._tcp_connector = TCPConnector(ssl=self._config.ssl_context)
1376+
HttpResourcePath._tcp_connector = TCPConnector(
1377+
# SSL context equipped with client credentials and
1378+
# configured to validate server certificates.
1379+
ssl=self._config.ssl_context,
1380+
# Total number of simultaneous connections this connector
1381+
# keeps open with any host.
1382+
#
1383+
# The default is 100 but we deliberately reduced it to
1384+
# avoid keeping a large number of open connexions to file
1385+
# servers when thousands of quanta execute simultaneously.
1386+
#
1387+
# In any case, new connexions are automatically established
1388+
# when needed.
1389+
limit=10,
1390+
# Number of simultaneous connections to a single host:port.
1391+
limit_per_host=1,
1392+
# Close network connection after usage
1393+
force_close=True,
1394+
)
13291395

1330-
return ClientSession(connector=HttpResourcePath._tcp_connector, **kwargs)
1396+
connect_timeout, read_timeout = self._config.timeout
1397+
return ClientSession(
1398+
connector=HttpResourcePath._tcp_connector,
1399+
timeout=ClientTimeout(
1400+
connect=connect_timeout,
1401+
sock_connect=connect_timeout,
1402+
sock_read=read_timeout,
1403+
total=2 * read_timeout,
1404+
),
1405+
**kwargs,
1406+
)
13311407

1332-
# Retrieve a signed URL for download valid for 1 hour.
1333-
url = self.generate_presigned_get_url(expiration_time_seconds=3_600)
1408+
# Retrieve a signed URL for download valid for 2 hours.
1409+
url = self.generate_presigned_get_url(expiration_time_seconds=2 * 3_600)
13341410

1335-
# HTTPFileSystem constructors accepts the argument 'block_size'. The
1411+
# HTTPFileSystem constructor accepts the argument 'block_size'. The
13361412
# default value is 'fsspec.utils.DEFAULT_BLOCK_SIZE' which is 5 MB.
13371413
# That seems to be a reasonable block size for downloading files.
13381414
return HTTPFileSystem(get_client=get_client_session), url

python/lsst/resources/tests.py

+4
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,10 @@ def test_fsspec(self) -> None:
920920
fs, path = uri.to_fsspec()
921921
except NotImplementedError as e:
922922
raise unittest.SkipTest(str(e)) from e
923+
except ImportError as e:
924+
# HttpResourcePath.to_fsspec() raises if support
925+
# of fsspec for webDAV back ends is disabled.
926+
raise unittest.SkipTest(str(e)) from e
923927
with fs.open(path, "r") as fd:
924928
as_read = fd.read()
925929
self.assertEqual(as_read, content)

tests/test_http.py

+70-27
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@
3939
HttpReadResourceHandle,
4040
parse_content_range_header,
4141
)
42-
from lsst.resources.http import BearerTokenAuth, HttpResourcePathConfig, SessionStore, _is_protected
42+
from lsst.resources.http import (
43+
BearerTokenAuth,
44+
HttpResourcePath,
45+
HttpResourcePathConfig,
46+
SessionStore,
47+
_is_protected,
48+
)
4349
from lsst.resources.tests import GenericReadWriteTestCase, GenericTestCase
4450
from lsst.resources.utils import makeTestTempDir, removeTestTempDir
4551

@@ -411,7 +417,7 @@ def test_dav_delete(self):
411417
os.remove(local_file)
412418

413419
def test_dav_to_fsspec(self):
414-
# Upload a randomly-generated file via write() with overwrite
420+
# Upload a randomly-generated file via write() with overwrite.
415421
local_file, file_size = self._generate_file()
416422
with open(local_file, "rb") as f:
417423
data = f.read()
@@ -420,31 +426,57 @@ def test_dav_to_fsspec(self):
420426
self.assertIsNone(remote_file.write(data, overwrite=True))
421427
self.assertTrue(remote_file.exists())
422428
self.assertEqual(remote_file.size(), file_size)
429+
remote_file_url = remote_file.geturl()
423430

424-
try:
425-
# Ensure that the contents of the remote file we just uploaded is
426-
# identical to the contents of that file when retrieved via
427-
# fsspec.open().
428-
fsys, url = remote_file.to_fsspec()
429-
with fsys.open(url) as f:
430-
self.assertEqual(data, f.read())
431-
432-
# Ensure the contents is identical to the result of fsspec.cat()
433-
self.assertEqual(data, fsys.cat(url))
434-
except NotImplementedError as e:
435-
# to_fsspec() must succeed if remote server knows how to sign URLs
436-
if remote_file.server_signs_urls:
437-
raise e
438-
finally:
439-
os.remove(local_file)
440-
441-
# Ensure that attempting to modify a remote via via fsspec fails.
442-
# fsspect.rm() raises NotImplementedError if it cannot remove the
443-
# remote file.
444-
if remote_file.server_signs_urls:
445-
fsys, url = remote_file.to_fsspec()
446-
with self.assertRaises(NotImplementedError):
447-
fsys.rm(url)
431+
# to_fsspec() may raise if that feature is not specifically
432+
# enabled in the environment and remote server is one of the
433+
# webDAV servers that support signing URLs.
434+
with unittest.mock.patch.dict(os.environ, {}, clear=True):
435+
try:
436+
# Force reinitialization of the config from the environment
437+
HttpResourcePath._reload_config()
438+
fsys, url = ResourcePath(remote_file_url).to_fsspec()
439+
self.assertEqual(data, fsys.cat(url))
440+
except ImportError as e:
441+
self.assertTrue("disable" in str(e))
442+
443+
# Ensure to_fsspec() works if that feature is enabled in the
444+
# environment.
445+
with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_ENABLE_FSSPEC": "true"}, clear=True):
446+
try:
447+
# Force reinitialization of the config from the environment.
448+
HttpResourcePath._reload_config()
449+
rpath = ResourcePath(remote_file_url)
450+
451+
# Ensure that the contents of the remote file we just
452+
# uploaded is identical to the contents of that file when
453+
# retrieved via fsspec.open().
454+
fsys, url = rpath.to_fsspec()
455+
with fsys.open(url) as f:
456+
self.assertEqual(data, f.read())
457+
458+
# Ensure the contents is identical to the result of
459+
# fsspec.cat()
460+
self.assertEqual(data, fsys.cat(url))
461+
462+
# Ensure that attempting to modify a remote via via fsspec
463+
# fails, since the returned URL is signed for download only.
464+
# fsspec.rm() raises NotImplementedError if it cannot remove
465+
# the remote file.
466+
if rpath.server_signs_urls:
467+
with self.assertRaises(NotImplementedError):
468+
fsys, url = rpath.to_fsspec()
469+
fsys.rm(url)
470+
except NotImplementedError as e:
471+
# to_fsspec() must succeed if remote server knows how to
472+
# sign URLs
473+
if rpath.server_signs_urls:
474+
raise e
475+
476+
# Force reinitialization of the config from the environment and
477+
# clean up local file.
478+
HttpResourcePath._reload_config()
479+
os.remove(local_file)
448480

449481
@responses.activate
450482
def test_is_webdav_endpoint(self):
@@ -619,7 +651,7 @@ def tearDown(self):
619651

620652
def test_send_expect_header(self):
621653
# Ensure environment variable LSST_HTTP_PUT_SEND_EXPECT_HEADER is
622-
# inspected to initialize the HttpResourcePathConfig config class.
654+
# inspected to initialize the HttpResourcePathConfig class.
623655
with unittest.mock.patch.dict(os.environ, {}, clear=True):
624656
config = HttpResourcePathConfig()
625657
self.assertFalse(config.send_expect_on_put)
@@ -628,6 +660,17 @@ def test_send_expect_header(self):
628660
config = HttpResourcePathConfig()
629661
self.assertTrue(config.send_expect_on_put)
630662

663+
def test_enable_fsspec(self):
664+
# Ensure environment variable LSST_HTTP_ENABLE_FSSPEC is
665+
# inspected to initialize the HttpResourcePathConfig class.
666+
with unittest.mock.patch.dict(os.environ, {}, clear=True):
667+
config = HttpResourcePathConfig()
668+
self.assertFalse(config.fsspec_is_enabled)
669+
670+
with unittest.mock.patch.dict(os.environ, {"LSST_HTTP_ENABLE_FSSPEC": "any value"}, clear=True):
671+
config = HttpResourcePathConfig()
672+
self.assertTrue(config.fsspec_is_enabled)
673+
631674
def test_collect_memory_usage(self):
632675
# Ensure environment variable LSST_HTTP_COLLECT_MEMORY_USAGE is
633676
# inspected to initialize the HttpResourcePathConfig class.

0 commit comments

Comments
 (0)