Skip to content

Commit

Permalink
AmpelVault: restore get_named_secret (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
jvansanten authored Feb 7, 2025
1 parent 1fb1396 commit fe98790
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
3 changes: 2 additions & 1 deletion ampel/secret/AmpelVault.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def get_named_secret(self, label: str, ValueType: type[T]) -> None | NamedSecret

def get_named_secret(self, label, ValueType=object):
""" Returns a resolved NamedSecret using provided label """
ns = NamedSecret[ValueType](label=label) # type: ignore[valid-type]
with NamedSecret.resolve_with(None):
ns = NamedSecret[ValueType](label=label) # type: ignore[valid-type]
if self.resolve_secret(ns, ValueType):
return ns
return None
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "ampel-interface"
version = "0.10.5a2"
version = "0.10.5a3"
description = "Base classes for the Ampel analysis platform"
authors = ["Valery Brinnel"]
maintainers = ["Jakob van Santen <[email protected]>"]
Expand Down
11 changes: 10 additions & 1 deletion tests/test_AmpelVault.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,21 @@ def test_secret_validation_hook() -> None:
class HasSecret(AmpelBaseModel):
secret: NamedSecret[str]

with NamedSecret.resolve_with(AmpelVault([DummySecretProvider({"foo": "bar"})])):
vault = AmpelVault([DummySecretProvider({"foo": "bar"})])

with NamedSecret.resolve_with(vault):
resolved = HasSecret.model_validate({"secret": {"label": "foo"}})
unresolved = HasSecret.model_validate({"secret": {"label": "foo"}})
assert resolved.secret.get() == "bar"
with pytest.raises(ValueError, match="Secret not yet resolved"):
unresolved.secret.get()

# get_named_secret can be used to fetch optional secrets even in a resolving context
with NamedSecret.resolve_with(vault):
ns = vault.get_named_secret("foo", str)
assert ns is not None
assert ns.get() == "bar"
assert vault.get_named_secret("foo", int) is None


def test_secret_resolution() -> None:
Expand Down

0 comments on commit fe98790

Please sign in to comment.