Skip to content

Commit 6d0019e

Browse files
committed
Allow multiple S3 endpoints
Allow S3 URLs in the form "s3://profile@bucket/...", with profiles configured via environment variables LSST_RESOURCES_S3_PROFILE_<profile>. This allows users to access multiple S3 services simultaneously.
1 parent fe12363 commit 6d0019e

File tree

3 files changed

+167
-44
lines changed

3 files changed

+167
-44
lines changed

python/lsst/resources/s3.py

+37-20
Original file line numberDiff line numberDiff line change
@@ -192,13 +192,30 @@ def _transfer_config(self) -> TransferConfig:
192192
@property
def client(self) -> boto3.client:
    """Return the boto3 client used to address this remote resource.

    The client is looked up (and cached by ``getS3Client``) based on the
    profile embedded in this URI, if any.
    """
    active_profile = self.profile
    return getS3Client(active_profile)
196+
197+
@property
198+
def profile(self) -> str | None:
199+
return self._uri.username
200+
201+
@property
def bucket(self) -> str:
    """Return the bucket name taken from the URI's host component.

    Raises
    ------
    ValueError
        Raised if the URI has no host component, i.e. no bucket name.
    """
    name = self._uri.hostname
    if name:
        return name
    raise ValueError(f"S3 URI does not include bucket name: '{str(self)}'")
197208

198209
@classmethod
def _mexists(cls, uris: Iterable[ResourcePath]) -> dict[ResourcePath, bool]:
    """Check existence of many URIs at once.

    Instantiates a client for every distinct S3 profile up front so the
    cached clients already exist before worker threads are spawned.
    """
    profiles = {cast(S3ResourcePath, uri).profile for uri in uris if uri.scheme == "s3"}
    for profile in profiles:
        getS3Client(profile)

    return super()._mexists(uris)
204221

@@ -207,16 +224,16 @@ def exists(self) -> bool:
207224
"""Check that the S3 resource exists."""
208225
if self.is_root:
209226
# Only check for the bucket since the path is irrelevant
210-
return bucketExists(self.netloc)
211-
exists, _ = s3CheckFileExists(self, client=self.client)
227+
return bucketExists(self.bucket, self.client)
228+
exists, _ = s3CheckFileExists(self, bucket=self.bucket, client=self.client)
212229
return exists
213230

214231
@backoff.on_exception(backoff.expo, retryable_io_errors, max_time=max_retry_time)
def size(self) -> int:
    """Return the size of the resource in bytes.

    Directory-like URIs report a size of zero; a missing object raises
    `FileNotFoundError`.
    """
    # "Directory" keys have no meaningful size of their own.
    if self.dirLike:
        return 0
    found, nbytes = s3CheckFileExists(self, bucket=self.bucket, client=self.client)
    if not found:
        raise FileNotFoundError(f"Resource {self} does not exist")
    return nbytes
@@ -229,7 +246,7 @@ def remove(self) -> None:
229246
# for checking all the keys again, response is HTTP 204 OK
230247
# response all the time
231248
try:
232-
self.client.delete_object(Bucket=self.netloc, Key=self.relativeToPathRoot)
249+
self.client.delete_object(Bucket=self.bucket, Key=self.relativeToPathRoot)
233250
except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
234251
raise FileNotFoundError("No such resource: {self}") from err
235252

@@ -239,7 +256,7 @@ def read(self, size: int = -1) -> bytes:
239256
if size > 0:
240257
args["Range"] = f"bytes=0-{size-1}"
241258
try:
242-
response = self.client.get_object(Bucket=self.netloc, Key=self.relativeToPathRoot, **args)
259+
response = self.client.get_object(Bucket=self.bucket, Key=self.relativeToPathRoot, **args)
243260
except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
244261
raise FileNotFoundError(f"No such resource: {self}") from err
245262
except ClientError as err:
@@ -255,20 +272,20 @@ def write(self, data: bytes, overwrite: bool = True) -> None:
255272
if not overwrite and self.exists():
256273
raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")
257274
with time_this(log, msg="Write to %s", args=(self,)):
258-
self.client.put_object(Bucket=self.netloc, Key=self.relativeToPathRoot, Body=data)
275+
self.client.put_object(Bucket=self.bucket, Key=self.relativeToPathRoot, Body=data)
259276

260277
@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def mkdir(self) -> None:
    """Write a directory key to S3.

    Raises
    ------
    ValueError
        Raised if the bucket named in this URI does not exist.
    NotADirectoryError
        Raised if this URI is file-like rather than directory-like.
    """
    if not bucketExists(self.bucket, self.client):
        raise ValueError(f"Bucket {self.bucket} does not exist for {self}!")

    if not self.dirLike:
        raise NotADirectoryError(f"Can not create a 'directory' for file-like URI {self}")

    # No key is needed when the "directory" is the bucket root itself.
    if self.path != "/":
        self.client.put_object(Bucket=self.bucket, Key=self.relativeToPathRoot)
272289

273290
@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
274291
def _download_file(self, local_file: IO, progress: ProgressPercentage | None) -> None:
@@ -279,7 +296,7 @@ def _download_file(self, local_file: IO, progress: ProgressPercentage | None) ->
279296
"""
280297
try:
281298
self.client.download_fileobj(
282-
self.netloc,
299+
self.bucket,
283300
self.relativeToPathRoot,
284301
local_file,
285302
Callback=progress,
@@ -324,7 +341,7 @@ def _upload_file(self, local_file: ResourcePath, progress: ProgressPercentage |
324341
"""
325342
try:
326343
self.client.upload_file(
327-
local_file.ospath, self.netloc, self.relativeToPathRoot, Callback=progress
344+
local_file.ospath, self.bucket, self.relativeToPathRoot, Callback=progress
328345
)
329346
except self.client.exceptions.NoSuchBucket as err:
330347
raise NotADirectoryError(f"Target does not exist: {err}") from err
@@ -333,13 +350,13 @@ def _upload_file(self, local_file: ResourcePath, progress: ProgressPercentage |
333350
raise
334351

335352
@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
336-
def _copy_from(self, src: ResourcePath) -> None:
353+
def _copy_from(self, src: S3ResourcePath) -> None:
337354
copy_source = {
338-
"Bucket": src.netloc,
355+
"Bucket": src.bucket,
339356
"Key": src.relativeToPathRoot,
340357
}
341358
try:
342-
self.client.copy_object(CopySource=copy_source, Bucket=self.netloc, Key=self.relativeToPathRoot)
359+
self.client.copy_object(CopySource=copy_source, Bucket=self.bucket, Key=self.relativeToPathRoot)
343360
except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
344361
raise FileNotFoundError("No such resource to transfer: {self}") from err
345362
except ClientError as err:
@@ -469,7 +486,7 @@ def walk(
469486
filenames = []
470487
files_there = False
471488

472-
for page in s3_paginator.paginate(Bucket=self.netloc, Prefix=prefix, Delimiter="/"):
489+
for page in s3_paginator.paginate(Bucket=self.bucket, Prefix=prefix, Delimiter="/"):
473490
# All results are returned as full key names and we must
474491
# convert them back to the root form. The prefix is fixed
475492
# and delimited so that is a simple trim
@@ -507,7 +524,7 @@ def _openImpl(
507524
*,
508525
encoding: str | None = None,
509526
) -> Iterator[ResourceHandleProtocol]:
510-
with S3ResourceHandle(mode, log, self.client, self.netloc, self.relativeToPathRoot) as handle:
527+
with S3ResourceHandle(mode, log, self.client, self.bucket, self.relativeToPathRoot) as handle:
511528
if "b" in mode:
512529
yield handle
513530
else:
@@ -529,6 +546,6 @@ def generate_presigned_put_url(self, *, expiration_time_seconds: int) -> str:
529546
def _generate_presigned_url(self, method: str, expiration_time_seconds: int) -> str:
530547
return self.client.generate_presigned_url(
531548
method,
532-
Params={"Bucket": self.netloc, "Key": self.relativeToPathRoot},
549+
Params={"Bucket": self.bucket, "Key": self.relativeToPathRoot},
533550
ExpiresIn=expiration_time_seconds,
534551
)

python/lsst/resources/s3utils.py

+72-10
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@
3434
from contextlib import contextmanager
3535
from http.client import HTTPException, ImproperConnectionState
3636
from types import ModuleType
37-
from typing import TYPE_CHECKING, Any, cast
37+
from typing import TYPE_CHECKING, Any, NamedTuple, cast
3838
from unittest.mock import patch
3939

4040
from botocore.exceptions import ClientError
4141
from botocore.handlers import validate_bucket_name
4242
from deprecated.sphinx import deprecated
4343
from urllib3.exceptions import HTTPError, RequestError
44+
from urllib3.util import Url, parse_url
4445

4546
if TYPE_CHECKING:
4647
from unittest import TestCase
@@ -178,18 +179,32 @@ def clean_test_environment_for_s3() -> Iterator[None]:
178179
yield
179180

180181

181-
def getS3Client(profile: str | None = None) -> boto3.client:
    """Create a S3 client with AWS (default) or the specified endpoint.

    Parameters
    ----------
    profile : `str`, optional
        The name of an S3 profile describing which S3 service to use.

    Returns
    -------
    s3client : `botocore.client.S3`
        A client of the S3 service.

    Notes
    -----
    If an explicit profile name is given, its configuration is read from an
    environment variable named ``LSST_RESOURCES_S3_PROFILE_<profile>`` (the
    profile name is case sensitive), in the format
    ``https://<access key ID>:<secret key>@<s3 endpoint hostname>``.
    The access key ID and secret key are optional -- when absent they are
    resolved through the usual AWS credentials mechanisms.

    If profile is `None`, the endpoint is taken from the environment
    variable S3_ENDPOINT_URL; when that is unset or empty the default AWS
    endpoint is used.

    If the environment variable LSST_DISABLE_BUCKET_VALIDATION exists and
    has a value that is not empty, "0", "f", "n", or "false" (case
    insensitive), bucket name validation is disabled on the client.
    """
    if botocore is None:
        raise ModuleNotFoundError("Could not find botocore. Are you sure it is installed?")

    if profile is None:
        # An empty S3_ENDPOINT_URL means the same as unset.
        endpoint = os.environ.get("S3_ENDPOINT_URL") or None
    else:
        var_name = f"LSST_RESOURCES_S3_PROFILE_{profile}"
        endpoint = os.environ.get(var_name, None)
        if not endpoint:
            raise RuntimeError(
                f"No configuration found for requested S3 profile '{profile}'."
                f" Set the environment variable '{var_name}' to configure it."
            )

    disable_value = os.environ.get("LSST_DISABLE_BUCKET_VALIDATION", "0")
    skip_validation = not re.search(r"^(0|f|n|false)?$", disable_value, re.I)

    return _get_s3_client(endpoint, skip_validation)
212237

213238

214239
@functools.lru_cache
def _get_s3_client(endpoint: str | None, skip_validation: bool) -> boto3.client:
    """Construct (and cache) a boto3 S3 client for one endpoint config."""
    retry_config = botocore.config.Config(
        read_timeout=180, retries={"mode": "adaptive", "max_attempts": 10}
    )

    parsed = _parse_endpoint_config(endpoint)

    client = boto3.client(
        "s3",
        endpoint_url=parsed.endpoint_url,
        aws_access_key_id=parsed.access_key_id,
        aws_secret_access_key=parsed.secret_access_key,
        config=retry_config,
    )
    if skip_validation:
        # Some non-AWS S3 services allow bucket names that AWS's validator
        # would reject, so optionally drop the validation handler.
        client.meta.events.unregister("before-parameter-build.s3", validate_bucket_name)
    return client

224257

258+
class _EndpointConfig(NamedTuple):
259+
endpoint_url: str | None = None
260+
access_key_id: str | None = None
261+
secret_access_key: str | None = None
262+
263+
264+
def _parse_endpoint_config(endpoint: str | None) -> _EndpointConfig:
265+
if not endpoint:
266+
return _EndpointConfig()
267+
268+
parsed = parse_url(endpoint)
269+
270+
# Strip the username/password portion of the URL from the result.
271+
endpoint_url = Url(host=parsed.host, path=parsed.path, port=parsed.port, scheme=parsed.scheme).url
272+
273+
access_key_id = None
274+
secret_access_key = None
275+
if parsed.auth:
276+
split = parsed.auth.split(":")
277+
if len(split) != 2:
278+
raise ValueError("S3 access key and secret not in expected format.")
279+
access_key_id, secret_access_key = split
280+
281+
return _EndpointConfig(
282+
endpoint_url=endpoint_url, access_key_id=access_key_id, secret_access_key=secret_access_key
283+
)
284+
285+
225286
def s3CheckFileExists(
226287
path: Location | ResourcePath | str,
227288
bucket: str | None = None,
@@ -268,7 +329,8 @@ def s3CheckFileExists(
268329
bucket = uri.netloc
269330
filepath = uri.relativeToPathRoot
270331
elif isinstance(path, ResourcePath | Location):
271-
bucket = path.netloc
332+
if bucket is None:
333+
bucket = path.netloc
272334
filepath = path.relativeToPathRoot
273335
else:
274336
raise TypeError(f"Unsupported path type: {path!r}.")

0 commit comments

Comments
 (0)