Skip to content

Commit 9adbd98

Browse files
authored
Merge pull request #91 from lsst/tickets/DM-44547
DM-44547: Add fsspec constructor
2 parents 5ea827e + f02ff63 commit 9adbd98

File tree

12 files changed

+230
-19
lines changed

12 files changed

+230
-19
lines changed

.github/workflows/build.yaml

+6-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
runs-on: ubuntu-latest
1414
strategy:
1515
matrix:
16-
python-version: ["3.11", "3.12"]
16+
python-version: ["3.11", "3.12", "3.13"]
1717

1818
steps:
1919
- uses: actions/checkout@v4
@@ -27,6 +27,7 @@ jobs:
2727
python-version: ${{ matrix.python-version }}
2828
cache: "pip"
2929
cache-dependency-path: "setup.cfg"
30+
allow-prereleases: true
3031

3132
- name: Update pip/wheel infrastructure
3233
shell: bash -l {0}
@@ -46,6 +47,10 @@ jobs:
4647
run: |
4748
uv pip install --system google-cloud-storage
4849
50+
- name: Install fsspec for testing
51+
run: |
52+
uv pip install --system fsspec s3fs
53+
4954
- name: Install dependencies
5055
run: |
5156
uv pip install --system -r requirements.txt

doc/changes/DM-44547.feature.rst

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added a new method ``ResourcePath.to_fsspec()`` to return ``fsspec`` file system objects suitable for use in packages such as Astropy and Pyarrow.

doc/conf.py

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
doxylink = {}
1313
exclude_patterns = ["changes/*"]
1414

15+
intersphinx_mapping["fsspec"] = ("https://filesystem-spec.readthedocs.io/en/latest/", None) # noqa: F405
16+
1517
nitpick_ignore_regex = [
1618
("py:(class|obj)", ".*ResourceHandle.U$"),
1719
("py:(class|obj)", "re.Pattern"),

mypy.ini

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ ignore_missing_imports = True
1717
[mypy-wsgidav.*]
1818
ignore_missing_imports = True
1919

20+
[mypy-fsspec.*]
21+
ignore_missing_imports = True
22+
23+
[mypy-s3fs.*]
24+
ignore_missing_imports = True
25+
2026
[mypy-defusedxml.*]
2127
ignore_missing_imports = True
2228

python/lsst/resources/_resourcePath.py

+23-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
from __future__ import annotations
1313

14+
__all__ = ("ResourcePath", "ResourcePathExpression")
15+
1416
import concurrent.futures
1517
import contextlib
1618
import copy
@@ -26,7 +28,12 @@
2628
from pathlib import Path, PurePath, PurePosixPath
2729
from random import Random
2830

29-
__all__ = ("ResourcePath", "ResourcePathExpression")
31+
try:
32+
import fsspec
33+
from fsspec.spec import AbstractFileSystem
34+
except ImportError:
35+
fsspec = None
36+
AbstractFileSystem = type
3037

3138
from collections.abc import Iterable, Iterator
3239
from typing import TYPE_CHECKING, Any, Literal, overload
@@ -409,6 +416,21 @@ def geturl(self) -> str:
409416
"""
410417
return self._uri.geturl()
411418

419+
def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
420+
"""Return an abstract file system and path that can be used by fsspec.
421+
422+
Returns
423+
-------
424+
fs : `fsspec.spec.AbstractFileSystem`
425+
A file system object suitable for use with the returned path.
426+
path : `str`
427+
A path that can be opened by the file system object.
428+
"""
429+
if fsspec is None:
430+
raise ImportError("fsspec is not available")
431+
# By default give the URL to fsspec and hope.
432+
return fsspec.url_to_fs(self.geturl())
433+
412434
def root_uri(self) -> ResourcePath:
413435
"""Return the base root URI.
414436

python/lsst/resources/file.py

+23
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
from ._resourcePath import ResourcePath
3030
from .utils import NoTransaction, ensure_directory_is_writeable, os2posix, posix2os
3131

32+
try:
33+
import fsspec
34+
from fsspec.spec import AbstractFileSystem
35+
except ImportError:
36+
fsspec = None
37+
AbstractFileSystem = type
38+
3239
if TYPE_CHECKING:
3340
from .utils import TransactionProtocol
3441

@@ -482,6 +489,22 @@ def _openImpl(
482489
with FileResourceHandle(mode=mode, log=log, uri=self, encoding=encoding) as buffer:
483490
yield buffer # type: ignore
484491

492+
def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
493+
"""Return an abstract file system and path that can be used by fsspec.
494+
495+
Returns
496+
-------
497+
fs : `fsspec.spec.AbstractFileSystem`
498+
A file system object suitable for use with the returned path.
499+
path : `str`
500+
A path that can be opened by the file system object.
501+
"""
502+
if fsspec is None:
503+
raise ImportError("fsspec is not available")
504+
# fsspec does not like URL encodings in file URIs so pass it the os
505+
# path instead.
506+
return fsspec.url_to_fs(self.ospath)
507+
485508

486509
def _create_directories(name: str | bytes) -> None:
487510
"""Create a directory and all of its parent directories that don't yet

python/lsst/resources/packageresource.py

+27
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@
1818
import re
1919
from collections.abc import Iterator
2020
from importlib import resources
21+
from typing import TYPE_CHECKING
22+
23+
if TYPE_CHECKING:
24+
try:
25+
import fsspec
26+
from fsspec.spec import AbstractFileSystem
27+
except ImportError:
28+
fsspec = None
29+
AbstractFileSystem = type
2130

2231
from ._resourceHandles._baseResourceHandle import ResourceHandleProtocol
2332
from ._resourcePath import ResourcePath
@@ -163,3 +172,21 @@ def walk(
163172
for dir in dirs:
164173
new_uri = self.join(dir, forceDirectory=True)
165174
yield from new_uri.walk(file_filter)
175+
176+
def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
177+
"""Return an abstract file system and path that can be used by fsspec.
178+
179+
Python package resources are effectively local files in most cases
180+
but can be found inside ZIP files. To support this we would have
181+
to change this API to a context manager (using
182+
``importlib.resources.as_file``) or find an API where fsspec knows
183+
about python package resource.
184+
185+
Returns
186+
-------
187+
fs : `fsspec.spec.AbstractFileSystem`
188+
A file system object suitable for use with the returned path.
189+
path : `str`
190+
A path that can be opened by the file system object.
191+
"""
192+
raise NotImplementedError("fsspec can not be used with python package resources.")

python/lsst/resources/s3.py

+37
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
from ._resourceHandles._s3ResourceHandle import S3ResourceHandle
3232
from ._resourcePath import ResourcePath
3333
from .s3utils import (
34+
_get_s3_connection_parameters,
35+
_s3_disable_bucket_validation,
36+
_s3_should_validate_bucket,
3437
all_retryable_errors,
3538
backoff,
3639
bucketExists,
@@ -46,6 +49,13 @@
4649
except ImportError:
4750
TransferConfig = None
4851

52+
try:
53+
import s3fs
54+
from fsspec.spec import AbstractFileSystem
55+
except ImportError:
56+
s3fs = None
57+
AbstractFileSystem = type
58+
4959
if TYPE_CHECKING:
5060
with contextlib.suppress(ImportError):
5161
import boto3
@@ -307,6 +317,33 @@ def _download_file(
307317
translate_client_error(err, self)
308318
raise
309319

320+
def to_fsspec(self) -> tuple[AbstractFileSystem, str]:
321+
"""Return an abstract file system and path that can be used by fsspec.
322+
323+
Returns
324+
-------
325+
fs : `fsspec.spec.AbstractFileSystem`
326+
A file system object suitable for use with the returned path.
327+
path : `str`
328+
A path that can be opened by the file system object.
329+
"""
330+
if s3fs is None:
331+
raise ImportError("s3fs is not available")
332+
# Must remove the profile from the URL and form it again.
333+
endpoint_config = _get_s3_connection_parameters(self._profile)
334+
s3 = s3fs.S3FileSystem(
335+
profile=endpoint_config.profile,
336+
endpoint_url=endpoint_config.endpoint_url,
337+
key=endpoint_config.access_key_id,
338+
secret=endpoint_config.secret_access_key,
339+
)
340+
if not _s3_should_validate_bucket():
341+
# Accessing the s3 property forces the boto client to be
342+
# constructed and cached and allows the validation to be removed.
343+
_s3_disable_bucket_validation(s3.s3)
344+
345+
return s3, f"{self._bucket}/{self.relativeToPathRoot}"
346+
310347
def _as_local(self) -> tuple[str, bool]:
311348
"""Download object from S3 and place in temporary directory.
312349

python/lsst/resources/s3utils.py

+61-17
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,36 @@ def getS3Client(profile: str | None = None) -> boto3.client:
221221
if botocore is None:
222222
raise ModuleNotFoundError("Could not find botocore. Are you sure it is installed?")
223223

224+
endpoint_config = _get_s3_connection_parameters(profile)
225+
226+
return _get_s3_client(endpoint_config, not _s3_should_validate_bucket())
227+
228+
229+
def _s3_should_validate_bucket() -> bool:
230+
"""Indicate whether bucket validation should be enabled.
231+
232+
Returns
233+
-------
234+
validate : `bool`
235+
If `True` bucket names should be validated.
236+
"""
237+
disable_value = os.environ.get("LSST_DISABLE_BUCKET_VALIDATION", "0")
238+
return bool(re.search(r"^(0|f|n|false)?$", disable_value, re.I))
239+
240+
241+
def _get_s3_connection_parameters(profile: str | None = None) -> _EndpointConfig:
242+
"""Calculate the connection details.
243+
244+
Parameters
245+
----------
246+
profile : `str`, optional
247+
The name of an S3 profile describing which S3 service to use.
248+
249+
Returns
250+
-------
251+
config : _EndPointConfig
252+
All the information necessary to connect to the bucket.
253+
"""
224254
endpoint = None
225255
if profile is not None:
226256
var_name = f"LSST_RESOURCES_S3_PROFILE_{profile}"
@@ -230,26 +260,29 @@ def getS3Client(profile: str | None = None) -> boto3.client:
230260
if not endpoint:
231261
endpoint = None # Handle ""
232262

233-
disable_value = os.environ.get("LSST_DISABLE_BUCKET_VALIDATION", "0")
234-
skip_validation = not re.search(r"^(0|f|n|false)?$", disable_value, re.I)
263+
return _parse_endpoint_config(endpoint, profile)
264+
265+
266+
def _s3_disable_bucket_validation(client: boto3.client) -> None:
267+
"""Disable the bucket name validation in the client.
235268
236-
return _get_s3_client(endpoint, profile, skip_validation)
269+
This removes the ``validate_bucket_name`` handler from the handlers
270+
registered for this client.
271+
272+
Parameters
273+
----------
274+
client : `boto3.client`
275+
The client to modify.
276+
"""
277+
client.meta.events.unregister("before-parameter-build.s3", validate_bucket_name)
237278

238279

239280
@functools.lru_cache
240-
def _get_s3_client(endpoint: str | None, profile: str | None, skip_validation: bool) -> boto3.client:
281+
def _get_s3_client(endpoint_config: _EndpointConfig, skip_validation: bool) -> boto3.client:
241282
# Helper function to cache the client for this endpoint
242283
config = botocore.config.Config(read_timeout=180, retries={"mode": "adaptive", "max_attempts": 10})
243284

244-
endpoint_config = _parse_endpoint_config(endpoint)
245-
246-
if endpoint_config.access_key_id is not None and endpoint_config.secret_access_key is not None:
247-
# We already have the necessary configuration for the profile, so do
248-
# not pass the profile to boto3. boto3 will raise an exception if the
249-
# profile is not defined in its configuration file, whether or not it
250-
# needs to read the configuration from it.
251-
profile = None
252-
session = boto3.Session(profile_name=profile)
285+
session = boto3.Session(profile_name=endpoint_config.profile)
253286

254287
client = session.client(
255288
"s3",
@@ -259,19 +292,20 @@ def _get_s3_client(endpoint: str | None, profile: str | None, skip_validation: b
259292
config=config,
260293
)
261294
if skip_validation:
262-
client.meta.events.unregister("before-parameter-build.s3", validate_bucket_name)
295+
_s3_disable_bucket_validation(client)
263296
return client
264297

265298

266299
class _EndpointConfig(NamedTuple):
267300
endpoint_url: str | None = None
268301
access_key_id: str | None = None
269302
secret_access_key: str | None = None
303+
profile: str | None = None
270304

271305

272-
def _parse_endpoint_config(endpoint: str | None) -> _EndpointConfig:
306+
def _parse_endpoint_config(endpoint: str | None, profile: str | None = None) -> _EndpointConfig:
273307
if not endpoint:
274-
return _EndpointConfig()
308+
return _EndpointConfig(profile=profile)
275309

276310
parsed = parse_url(endpoint)
277311

@@ -288,8 +322,18 @@ def _parse_endpoint_config(endpoint: str | None) -> _EndpointConfig:
288322
access_key_id = urllib.parse.unquote(access_key_id)
289323
secret_access_key = urllib.parse.unquote(secret_access_key)
290324

325+
if access_key_id is not None and secret_access_key is not None:
326+
# We already have the necessary configuration for the profile, so do
327+
# not pass the profile to boto3. boto3 will raise an exception if the
328+
# profile is not defined in its configuration file, whether or not it
329+
# needs to read the configuration from it.
330+
profile = None
331+
291332
return _EndpointConfig(
292-
endpoint_url=endpoint_url, access_key_id=access_key_id, secret_access_key=secret_access_key
333+
endpoint_url=endpoint_url,
334+
access_key_id=access_key_id,
335+
secret_access_key=secret_access_key,
336+
profile=profile,
293337
)
294338

295339

python/lsst/resources/tests.py

+23
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
from collections.abc import Iterable
2424
from typing import TYPE_CHECKING, Any
2525

26+
try:
27+
import fsspec
28+
except ImportError:
29+
fsspec = None
30+
2631
from lsst.resources import ResourcePath
2732
from lsst.resources.utils import makeTestTempDir, removeTestTempDir
2833

@@ -882,6 +887,24 @@ def test_temporary(self) -> None:
882887
with ResourcePath.temporary_uri(prefix=self.root_uri, suffix="xxx/") as tmp:
883888
pass
884889

890+
@unittest.skipIf(fsspec is None, "fsspec is not available.")
891+
def test_fsspec(self) -> None:
892+
"""Simple read of a file."""
893+
uri = self.tmpdir.join("test.txt")
894+
self.assertFalse(uri.exists(), f"{uri} should not exist")
895+
self.assertTrue(uri.path.endswith("test.txt"))
896+
897+
content = "abcdefghijklmnopqrstuv\n"
898+
uri.write(content.encode())
899+
900+
try:
901+
fs, path = uri.to_fsspec()
902+
except NotImplementedError as e:
903+
raise unittest.SkipTest(str(e)) from e
904+
with fs.open(path, "r") as fd:
905+
as_read = fd.read()
906+
self.assertEqual(as_read, content)
907+
885908
def test_open(self) -> None:
886909
tmpdir = ResourcePath(self.tmpdir, forceDirectory=True)
887910
with ResourcePath.temporary_uri(prefix=tmpdir, suffix=".txt") as tmp:

0 commit comments

Comments
 (0)