Skip to content

Commit cda7a56

Browse files
authored
Merge pull request #82 from lsst/tickets/DM-42704
DM-42704: Support multiple S3 endpoints
2 parents fe12363 + fc81205 commit cda7a56

File tree

6 files changed

+246
-47
lines changed

6 files changed

+246
-47
lines changed

.github/workflows/lint.yaml

-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ on:
77
pull_request:
88

99
jobs:
10-
call-workflow:
11-
uses: lsst/rubin_workflows/.github/workflows/lint.yaml@main
1210
ruff:
1311
runs-on: ubuntu-latest
1412
steps:

doc/changes/DM-42704.feature.rst

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
``S3ResourcePath`` now supports using multiple S3 endpoints simultaneously. This is configured using URIs in the form ``s3://profile@bucket/path`` and environment variables ``LSST_RESOURCES_S3_PROFILE_<profile>=https://<access key ID>:<secret key>@<s3 endpoint hostname>``.

python/lsst/resources/s3.py

+56-21
Original file line numberDiff line numberDiff line change
@@ -192,13 +192,48 @@ def _transfer_config(self) -> TransferConfig:
192192
@property
193193
def client(self) -> boto3.client:
194194
"""Client object to address remote resource."""
195-
# Defer import for circular dependencies
196-
return getS3Client()
195+
return getS3Client(self._profile)
196+
197+
@property
198+
def _profile(self) -> str | None:
199+
"""Profile name to use for looking up S3 credentials and endpoint."""
200+
return self._uri.username
201+
202+
@property
203+
def _bucket(self) -> str:
204+
"""S3 bucket where the files are stored."""
205+
# Notionally the bucket is stored in the 'hostname' part of the URI.
206+
# However, Ceph S3 uses a "multi-tenant" syntax for bucket names in the
207+
# form 'tenant:bucket'. The part after the colon is parsed as the port
208+
# portion of the URI, and urllib throws an exception if you try to read
209+
# a non-integer port value. So manually split off this portion of the
210+
# URI.
211+
split = self._uri.netloc.split("@")
212+
num_components = len(split)
213+
if num_components == 2:
214+
# There is a profile@ portion of the URL, so take the second half.
215+
bucket = split[1]
216+
elif num_components == 1:
217+
# There is no profile@, so take the whole netloc.
218+
bucket = split[0]
219+
else:
220+
raise ValueError(f"Unexpected extra '@' in S3 URI: '{str(self)}'")
221+
222+
if not bucket:
223+
raise ValueError(f"S3 URI does not include bucket name: '{str(self)}'")
224+
225+
return bucket
197226

198227
@classmethod
199228
def _mexists(cls, uris: Iterable[ResourcePath]) -> dict[ResourcePath, bool]:
200-
# Force client to be created before creating threads.
201-
getS3Client()
229+
# Force client to be created for each profile before creating threads.
230+
profiles = set[str | None]()
231+
for path in uris:
232+
if path.scheme == "s3":
233+
path = cast(S3ResourcePath, path)
234+
profiles.add(path._profile)
235+
for profile in profiles:
236+
getS3Client(profile)
202237

203238
return super()._mexists(uris)
204239

@@ -207,16 +242,16 @@ def exists(self) -> bool:
207242
"""Check that the S3 resource exists."""
208243
if self.is_root:
209244
# Only check for the bucket since the path is irrelevant
210-
return bucketExists(self.netloc)
211-
exists, _ = s3CheckFileExists(self, client=self.client)
245+
return bucketExists(self._bucket, self.client)
246+
exists, _ = s3CheckFileExists(self, bucket=self._bucket, client=self.client)
212247
return exists
213248

214249
@backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
215250
def size(self) -> int:
216251
"""Return the size of the resource in bytes."""
217252
if self.dirLike:
218253
return 0
219-
exists, sz = s3CheckFileExists(self, client=self.client)
254+
exists, sz = s3CheckFileExists(self, bucket=self._bucket, client=self.client)
220255
if not exists:
221256
raise FileNotFoundError(f"Resource {self} does not exist")
222257
return sz
@@ -229,7 +264,7 @@ def remove(self) -> None:
229264
# for checking all the keys again, response is HTTP 204 OK
230265
# response all the time
231266
try:
232-
self.client.delete_object(Bucket=self.netloc, Key=self.relativeToPathRoot)
267+
self.client.delete_object(Bucket=self._bucket, Key=self.relativeToPathRoot)
233268
except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
234269
raise FileNotFoundError("No such resource: {self}") from err
235270

@@ -239,7 +274,7 @@ def read(self, size: int = -1) -> bytes:
239274
if size > 0:
240275
args["Range"] = f"bytes=0-{size-1}"
241276
try:
242-
response = self.client.get_object(Bucket=self.netloc, Key=self.relativeToPathRoot, **args)
277+
response = self.client.get_object(Bucket=self._bucket, Key=self.relativeToPathRoot, **args)
243278
except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
244279
raise FileNotFoundError(f"No such resource: {self}") from err
245280
except ClientError as err:
@@ -255,20 +290,20 @@ def write(self, data: bytes, overwrite: bool = True) -> None:
255290
if not overwrite and self.exists():
256291
raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")
257292
with time_this(log, msg="Write to %s", args=(self,)):
258-
self.client.put_object(Bucket=self.netloc, Key=self.relativeToPathRoot, Body=data)
293+
self.client.put_object(Bucket=self._bucket, Key=self.relativeToPathRoot, Body=data)
259294

260295
@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
261296
def mkdir(self) -> None:
262297
"""Write a directory key to S3."""
263-
if not bucketExists(self.netloc):
264-
raise ValueError(f"Bucket {self.netloc} does not exist for {self}!")
298+
if not bucketExists(self._bucket, self.client):
299+
raise ValueError(f"Bucket {self._bucket} does not exist for {self}!")
265300

266301
if not self.dirLike:
267302
raise NotADirectoryError(f"Can not create a 'directory' for file-like URI {self}")
268303

269304
# don't create S3 key when root is at the top-level of a bucket
270305
if self.path != "/":
271-
self.client.put_object(Bucket=self.netloc, Key=self.relativeToPathRoot)
306+
self.client.put_object(Bucket=self._bucket, Key=self.relativeToPathRoot)
272307

273308
@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
274309
def _download_file(self, local_file: IO, progress: ProgressPercentage | None) -> None:
@@ -279,7 +314,7 @@ def _download_file(self, local_file: IO, progress: ProgressPercentage | None) ->
279314
"""
280315
try:
281316
self.client.download_fileobj(
282-
self.netloc,
317+
self._bucket,
283318
self.relativeToPathRoot,
284319
local_file,
285320
Callback=progress,
@@ -324,7 +359,7 @@ def _upload_file(self, local_file: ResourcePath, progress: ProgressPercentage |
324359
"""
325360
try:
326361
self.client.upload_file(
327-
local_file.ospath, self.netloc, self.relativeToPathRoot, Callback=progress
362+
local_file.ospath, self._bucket, self.relativeToPathRoot, Callback=progress
328363
)
329364
except self.client.exceptions.NoSuchBucket as err:
330365
raise NotADirectoryError(f"Target does not exist: {err}") from err
@@ -333,13 +368,13 @@ def _upload_file(self, local_file: ResourcePath, progress: ProgressPercentage |
333368
raise
334369

335370
@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
336-
def _copy_from(self, src: ResourcePath) -> None:
371+
def _copy_from(self, src: S3ResourcePath) -> None:
337372
copy_source = {
338-
"Bucket": src.netloc,
373+
"Bucket": src._bucket,
339374
"Key": src.relativeToPathRoot,
340375
}
341376
try:
342-
self.client.copy_object(CopySource=copy_source, Bucket=self.netloc, Key=self.relativeToPathRoot)
377+
self.client.copy_object(CopySource=copy_source, Bucket=self._bucket, Key=self.relativeToPathRoot)
343378
except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
344379
raise FileNotFoundError("No such resource to transfer: {self}") from err
345380
except ClientError as err:
@@ -469,7 +504,7 @@ def walk(
469504
filenames = []
470505
files_there = False
471506

472-
for page in s3_paginator.paginate(Bucket=self.netloc, Prefix=prefix, Delimiter="/"):
507+
for page in s3_paginator.paginate(Bucket=self._bucket, Prefix=prefix, Delimiter="/"):
473508
# All results are returned as full key names and we must
474509
# convert them back to the root form. The prefix is fixed
475510
# and delimited so that is a simple trim
@@ -507,7 +542,7 @@ def _openImpl(
507542
*,
508543
encoding: str | None = None,
509544
) -> Iterator[ResourceHandleProtocol]:
510-
with S3ResourceHandle(mode, log, self.client, self.netloc, self.relativeToPathRoot) as handle:
545+
with S3ResourceHandle(mode, log, self.client, self._bucket, self.relativeToPathRoot) as handle:
511546
if "b" in mode:
512547
yield handle
513548
else:
@@ -529,6 +564,6 @@ def generate_presigned_put_url(self, *, expiration_time_seconds: int) -> str:
529564
def _generate_presigned_url(self, method: str, expiration_time_seconds: int) -> str:
530565
return self.client.generate_presigned_url(
531566
method,
532-
Params={"Bucket": self.netloc, "Key": self.relativeToPathRoot},
567+
Params={"Bucket": self._bucket, "Key": self.relativeToPathRoot},
533568
ExpiresIn=expiration_time_seconds,
534569
)

python/lsst/resources/s3utils.py

+81-9
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,19 @@
3030
import functools
3131
import os
3232
import re
33+
import urllib.parse
3334
from collections.abc import Callable, Iterator
3435
from contextlib import contextmanager
3536
from http.client import HTTPException, ImproperConnectionState
3637
from types import ModuleType
37-
from typing import TYPE_CHECKING, Any, cast
38+
from typing import TYPE_CHECKING, Any, NamedTuple, cast
3839
from unittest.mock import patch
3940

4041
from botocore.exceptions import ClientError
4142
from botocore.handlers import validate_bucket_name
4243
from deprecated.sphinx import deprecated
4344
from urllib3.exceptions import HTTPError, RequestError
45+
from urllib3.util import Url, parse_url
4446

4547
if TYPE_CHECKING:
4648
from unittest import TestCase
@@ -178,18 +180,35 @@ def clean_test_environment_for_s3() -> Iterator[None]:
178180
yield
179181

180182

181-
def getS3Client() -> boto3.client:
183+
def getS3Client(profile: str | None = None) -> boto3.client:
182184
"""Create a S3 client with AWS (default) or the specified endpoint.
183185
186+
Parameters
187+
----------
188+
profile : `str`, optional
189+
The name of an S3 profile describing which S3 service to use.
190+
184191
Returns
185192
-------
186193
s3client : `botocore.client.S3`
187194
A client of the S3 service.
188195
189196
Notes
190197
-----
191-
The endpoint URL is from the environment variable S3_ENDPOINT_URL.
192-
If none is specified, the default AWS one is used.
198+
If an explicit profile name is specified, its configuration will be read
199+
from an environment variable named ``LSST_RESOURCES_S3_PROFILE_<profile>``
200+
if it exists. Note that the name of the profile is case sensitive. This
201+
configuration is specified in the format: ``https://<access key ID>:<secret
202+
key>@<s3 endpoint hostname>``. If the access key ID or secret key values
203+
contain slashes, the slashes must be URI-encoded (replace "/" with "%2F").
204+
205+
If profile is `None` or the profile environment variable was not set, the
206+
configuration is read from the environment variable ``S3_ENDPOINT_URL``.
207+
If it is not specified, the default AWS endpoint is used.
208+
209+
The access key ID and secret key are optional -- if not specified, they
210+
will be looked up via the `AWS credentials file
211+
<https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html>`_.
193212
194213
If the environment variable LSST_DISABLE_BUCKET_VALIDATION exists
195214
and has a value that is not empty, "0", "f", "n", or "false"
@@ -202,26 +221,78 @@ def getS3Client() -> boto3.client:
202221
if botocore is None:
203222
raise ModuleNotFoundError("Could not find botocore. Are you sure it is installed?")
204223

205-
endpoint = os.environ.get("S3_ENDPOINT_URL", None)
224+
endpoint = None
225+
if profile is not None:
226+
var_name = f"LSST_RESOURCES_S3_PROFILE_{profile}"
227+
endpoint = os.environ.get(var_name, None)
228+
if not endpoint:
229+
endpoint = os.environ.get("S3_ENDPOINT_URL", None)
206230
if not endpoint:
207231
endpoint = None # Handle ""
232+
208233
disable_value = os.environ.get("LSST_DISABLE_BUCKET_VALIDATION", "0")
209234
skip_validation = not re.search(r"^(0|f|n|false)?$", disable_value, re.I)
210235

211-
return _get_s3_client(endpoint, skip_validation)
236+
return _get_s3_client(endpoint, profile, skip_validation)
212237

213238

214239
@functools.lru_cache
215-
def _get_s3_client(endpoint: str, skip_validation: bool) -> boto3.client:
240+
def _get_s3_client(endpoint: str | None, profile: str | None, skip_validation: bool) -> boto3.client:
216241
# Helper function to cache the client for this endpoint
217242
config = botocore.config.Config(read_timeout=180, retries={"mode": "adaptive", "max_attempts": 10})
218243

219-
client = boto3.client("s3", endpoint_url=endpoint, config=config)
244+
endpoint_config = _parse_endpoint_config(endpoint)
245+
246+
if endpoint_config.access_key_id is not None and endpoint_config.secret_access_key is not None:
247+
# We already have the necessary configuration for the profile, so do
248+
# not pass the profile to boto3. boto3 will raise an exception if the
249+
# profile is not defined in its configuration file, whether or not it
250+
# needs to read the configuration from it.
251+
profile = None
252+
session = boto3.Session(profile_name=profile)
253+
254+
client = session.client(
255+
"s3",
256+
endpoint_url=endpoint_config.endpoint_url,
257+
aws_access_key_id=endpoint_config.access_key_id,
258+
aws_secret_access_key=endpoint_config.secret_access_key,
259+
config=config,
260+
)
220261
if skip_validation:
221262
client.meta.events.unregister("before-parameter-build.s3", validate_bucket_name)
222263
return client
223264

224265

266+
class _EndpointConfig(NamedTuple):
267+
endpoint_url: str | None = None
268+
access_key_id: str | None = None
269+
secret_access_key: str | None = None
270+
271+
272+
def _parse_endpoint_config(endpoint: str | None) -> _EndpointConfig:
273+
if not endpoint:
274+
return _EndpointConfig()
275+
276+
parsed = parse_url(endpoint)
277+
278+
# Strip the username/password portion of the URL from the result.
279+
endpoint_url = Url(host=parsed.host, path=parsed.path, port=parsed.port, scheme=parsed.scheme).url
280+
281+
access_key_id = None
282+
secret_access_key = None
283+
if parsed.auth:
284+
split = parsed.auth.split(":")
285+
if len(split) != 2:
286+
raise ValueError("S3 access key and secret not in expected format.")
287+
access_key_id, secret_access_key = split
288+
access_key_id = urllib.parse.unquote(access_key_id)
289+
secret_access_key = urllib.parse.unquote(secret_access_key)
290+
291+
return _EndpointConfig(
292+
endpoint_url=endpoint_url, access_key_id=access_key_id, secret_access_key=secret_access_key
293+
)
294+
295+
225296
def s3CheckFileExists(
226297
path: Location | ResourcePath | str,
227298
bucket: str | None = None,
@@ -268,7 +339,8 @@ def s3CheckFileExists(
268339
bucket = uri.netloc
269340
filepath = uri.relativeToPathRoot
270341
elif isinstance(path, ResourcePath | Location):
271-
bucket = path.netloc
342+
if bucket is None:
343+
bucket = path.netloc
272344
filepath = path.relativeToPathRoot
273345
else:
274346
raise TypeError(f"Unsupported path type: {path!r}.")

0 commit comments

Comments
 (0)