Skip to content

Commit

Permalink
Merge pull request #249 from italia/multipleta
Browse files Browse the repository at this point in the history
Multiple TAs - Gain PoC interop stage
  • Loading branch information
Giuseppe De Marco authored Feb 14, 2023
2 parents 725967e + b78d01e commit 48a191d
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 62 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ jobs:
fail-fast: false
matrix:
python-version:
- '3.8'
- '3.9'
- '3.10'

steps:
Expand Down
2 changes: 1 addition & 1 deletion spid_cie_oidc/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.8.10"
__version__ = "0.8.11"
7 changes: 6 additions & 1 deletion spid_cie_oidc/entity/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,12 @@ def validate_by_superiors(
logger.info(f"Getting entity statements from {_url}")
jwts = get_entity_statements([_url], self.httpc_params)
jwt = jwts[0]
self.validate_by_superior_statement(jwt, ec)
if jwt:
self.validate_by_superior_statement(jwt, ec)
else:
logger.error(
f"Empty response for {_url}"
)

return self.verified_by_superiors

Expand Down
4 changes: 3 additions & 1 deletion spid_cie_oidc/entity/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@


import datetime
import json
import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -38,7 +39,8 @@ def get_jwks(metadata: dict, federation_jwks:list = []) -> dict:
jwks_uri = metadata["jwks_uri"]
jwks_list = get_http_url(
[jwks_uri], httpc_params=HTTPC_PARAMS
).json()
)
jwks_list = json.loads(jwks_list[0])
except Exception as e:
logger.error(f"Failed to download jwks from {jwks_uri}: {e}")
elif metadata.get('signed_jwks_uri'):
Expand Down
5 changes: 2 additions & 3 deletions spid_cie_oidc/provider/schemas/authn_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ class AuthenticationRequest(BaseModel):
iat: int
exp: Optional[int]
jti: Optional[str]
aud: List[HttpUrl]
aud: str | List[HttpUrl]
acr_values: List[AcrValues]
prompt: Optional[Literal["consent", "consent login"]]

@validator("claims")
def validate_claims(cls, claims):
Expand All @@ -174,7 +175,6 @@ def validate_scope(cls, scope):

class AuthenticationRequestSpid(AuthenticationRequest):
scope: List[ScopeSpid]
prompt: Literal["consent", "consent login"]

def get_claims() -> dict:
return CLAIMS_SPID
Expand Down Expand Up @@ -204,7 +204,6 @@ def example(): # pragma: no cover

class AuthenticationRequestCie(AuthenticationRequest):
scope: List[ScopeCie]
prompt: Literal["consent", "consent login"]

def get_claims() -> dict:
return CLAIMS_CIE
Expand Down
34 changes: 17 additions & 17 deletions spid_cie_oidc/provider/tests/test_02_authn_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def setUp(self):
is_active=True,
)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_unknown_error(self):
jws = create_jws(self.REQUEST_OBJECT_PAYLOAD, RP_METADATA_JWK1)
client = Client()
Expand All @@ -88,7 +88,7 @@ def test_auth_request_unknown_error(self):
self.assertTrue(res.status_code == 302)
self.assertTrue("error=invalid_request" in res.url)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
@override_settings(OIDCFED_DEFAULT_PROVIDER_PROFILE="cie")
def test_auth_request_id_token_claim(self):
jws = create_jws(self.REQUEST_OBJECT_PAYLOAD, RP_METADATA_JWK1)
Expand All @@ -115,14 +115,14 @@ def test_auth_request_id_token_claim(self):
self.assertEqual(id_token.get("email"), "[email protected]")


@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_no_request(self):
client = Client()
url = reverse("oidc_provider_authnrequest")
res = client.get(url, {})
self.assertTrue(res.status_code == 400)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_ok(self):
jws = create_jws(self.REQUEST_OBJECT_PAYLOAD, RP_METADATA_JWK1)
client = Client()
Expand All @@ -144,7 +144,7 @@ def test_auth_request_ok(self):
self.assertTrue(res.status_code == 302)
self.assertTrue("code" in res.url)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_user_rejected_consent(self):
jws = create_jws(self.REQUEST_OBJECT_PAYLOAD, RP_METADATA_JWK1)
client = Client()
Expand All @@ -167,7 +167,7 @@ def test_auth_request_user_rejected_consent(self):
# TODO: this is not normative
self.assertTrue("error=rejected_by_user" in res.url)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_no_session_in_post_consent(self):
jws = create_jws(self.REQUEST_OBJECT_PAYLOAD, RP_METADATA_JWK1)
client = Client()
Expand All @@ -189,7 +189,7 @@ def test_auth_request_no_session_in_post_consent(self):
res = client.post(consent_page_url, {"agree": True})
self.assertTrue(res.status_code == 403)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_no_session_in_get_consent(self):
jws = create_jws(self.REQUEST_OBJECT_PAYLOAD, RP_METADATA_JWK1)
client = Client()
Expand All @@ -213,7 +213,7 @@ def test_auth_request_no_session_in_get_consent(self):
res = client.get(consent_page_url)
self.assertTrue(res.status_code == 403)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_auth_code_already_used(self):
jws = create_jws(self.REQUEST_OBJECT_PAYLOAD, RP_METADATA_JWK1)
client = Client()
Expand All @@ -233,7 +233,7 @@ def test_auth_request_auth_code_already_used(self):
res = client.get(consent_page_url)
self.assertTrue(res.status_code == 403)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_wrong_login(self):
jws = create_jws(self.REQUEST_OBJECT_PAYLOAD, RP_METADATA_JWK1)
client = Client()
Expand All @@ -247,7 +247,7 @@ def test_auth_request_wrong_login(self):
)
self.assertIn("error", res.content.decode())

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_preexistent_authz(self):
jws = create_jws(self.REQUEST_OBJECT_PAYLOAD, RP_METADATA_JWK1)
client = Client()
Expand All @@ -264,7 +264,7 @@ def test_auth_request_preexistent_authz(self):
self.assertIn("error=invalid_request", res.url)
self.assertIn("state", res.url)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_trust_chain_no_active(self):
self.trust_chain.is_active = False
self.trust_chain.save()
Expand All @@ -276,7 +276,7 @@ def test_auth_request_trust_chain_no_active(self):
self.assertIn("error=invalid_request", res.url)
self.assertIn("state", res.url)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_invalid_jwk(self):
jws = create_jws(self.REQUEST_OBJECT_PAYLOAD, RP_METADATA_JWK1)
get_jwks(self.trust_chain.metadata['openid_relying_party'])[0][
Expand All @@ -294,7 +294,7 @@ def test_auth_request_invalid_jwk(self):
] = RP_METADATA_JWK1['kid']
self.trust_chain.save()

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_no_correct_payload(self):
NO_CORRECT_OBJECT_PAYLOAD = deepcopy(self.REQUEST_OBJECT_PAYLOAD)
NO_CORRECT_OBJECT_PAYLOAD["response_type"] = "test"
Expand All @@ -306,7 +306,7 @@ def test_auth_request_no_correct_payload(self):
self.assertIn("error", res.url)
self.assertIn("state", res.url)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_invalid_session(self):
client = Client()
url = reverse("oidc_provider_consent")
Expand All @@ -315,7 +315,7 @@ def test_auth_request_invalid_session(self):
res = client.post(url)
self.assertTrue(res.status_code == 403)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_no_correct_refresh_request(self):
local_payload = deepcopy(self.REQUEST_OBJECT_PAYLOAD)
local_payload["scope"] = "openid offline_access"
Expand All @@ -326,7 +326,7 @@ def test_auth_request_no_correct_refresh_request(self):
res = client.get(url, {"request": jws})
self.assertTrue(res.status_code == 403)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_user_staff(self):
self.user.is_staff = True
self.user.save()
Expand All @@ -339,7 +339,7 @@ def test_auth_request_user_staff(self):
self.assertTrue(res.status_code == 302)
self.assertTrue("/oidc/op/rp-test/landing/" == res.url)

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_auth_request_no_correct_authz_request(self):
self.user.is_staff = True
self.user.save()
Expand Down
9 changes: 5 additions & 4 deletions spid_cie_oidc/provider/tests/test_03_authn_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ def test_validate_spid_no_nonce(self):
def test_validate_spid_no_correct_nonce(self):
with self.assertRaises(ValidationError):
AuthenticationRequestSpid(**AUTHN_REQUEST_SPID_NO_CORRECT_NONCE)

def test_validate_spid_no_prompt(self):
with self.assertRaises(ValidationError):
AuthenticationRequestSpid(**AUTHN_REQUEST_SPID_NO_PROMPT)

# removed, relaxed with the GAIN-PoC integration
# def test_validate_spid_no_prompt(self):
# with self.assertRaises(ValidationError):
# AuthenticationRequestSpid(**AUTHN_REQUEST_SPID_NO_PROMPT)

def test_validate_spid_no_correct_prompt(self):
with self.assertRaises(ValidationError):
Expand Down
55 changes: 42 additions & 13 deletions spid_cie_oidc/provider/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
from spid_cie_oidc.entity.trust_chain_operations import get_or_create_trust_chain
from spid_cie_oidc.entity.utils import datetime_from_timestamp, exp_from_now, iat_now
from spid_cie_oidc.entity.utils import get_jwks
from spid_cie_oidc.provider.exceptions import AuthzRequestReplay, ExpiredAuthCode, InvalidSession, RevokedSession, ValidationException
from spid_cie_oidc.entity.exceptions import TrustchainMissingMetadata
from spid_cie_oidc.provider.exceptions import (
AuthzRequestReplay,
ExpiredAuthCode,
InvalidSession,
RevokedSession,
ValidationException
)
from spid_cie_oidc.provider.models import OidcSession

from spid_cie_oidc.provider.settings import (
Expand All @@ -32,12 +39,17 @@ class OpBase:
"""

def redirect_response_data(self, redirect_uri:str, **kwargs) -> HttpResponseRedirect:
url = f'{redirect_uri}?{urllib.parse.urlencode(kwargs)}'
if "?" in redirect_uri:
qstring = "&"
else:
qstring = "?"
url = f'{redirect_uri}{qstring}{urllib.parse.urlencode(kwargs)}'
return HttpResponseRedirect(url)

def find_jwk(self, header: dict, jwks: list) -> dict:
for jwk in jwks:
if header["kid"] == jwk["kid"]:
valid_jwk = jwk.get("kid", None)
if valid_jwk and header["kid"] == valid_jwk:
return jwk

def validate_authz_request_object(self, req) -> TrustChain:
Expand All @@ -58,7 +70,7 @@ def validate_authz_request_object(self, req) -> TrustChain:
rp_trust_chain = TrustChain.objects.filter(
metadata__openid_relying_party__isnull=False,
sub=self.payload["iss"],
trust_anchor__sub=settings.OIDCFED_DEFAULT_TRUST_ANCHOR
trust_anchor__sub__in=settings.OIDCFED_TRUST_ANCHORS
).first()
if rp_trust_chain and not rp_trust_chain.is_active:
_msg = (
Expand All @@ -70,15 +82,26 @@ def validate_authz_request_object(self, req) -> TrustChain:
raise Exception(_msg)

elif not rp_trust_chain or rp_trust_chain.is_expired:
rp_trust_chain = get_or_create_trust_chain(
subject=self.payload["iss"],
trust_anchor=settings.OIDCFED_DEFAULT_TRUST_ANCHOR,
httpc_params=HTTPC_PARAMS,
required_trust_marks=getattr(
settings, "OIDCFED_REQUIRED_TRUST_MARKS", []
),
)
if not rp_trust_chain.is_valid:
rp_trust_chain = None
# TODO: get async here
for ta in settings.OIDCFED_TRUST_ANCHORS:
try:
rp_trust_chain = get_or_create_trust_chain(
subject=self.payload["iss"],
trust_anchor=ta,
httpc_params=HTTPC_PARAMS,
required_trust_marks=getattr(
settings, "OIDCFED_REQUIRED_TRUST_MARKS", []
),
)
if rp_trust_chain and rp_trust_chain.metadata:
break
except TrustchainMissingMetadata as e:
logger.debug(f"TrustchainMissingMetadata: {e}")
# unless we find the good TA
continue

if not rp_trust_chain or not rp_trust_chain.is_valid:
_msg = (
f"Failed trust chain validation for {self.payload['iss']}. "
"error=unauthorized_client, "
Expand Down Expand Up @@ -346,6 +369,12 @@ def attributes_names_to_release(self, request, session: OidcSession) -> dict:
for claim in claims:
if claim in user_claims:
filtered_user_claims[claim] = user_claims[claim]
# IDA support/overload of the claims, the verified has priorities and overwrite the unverified
if "verified_claims" in claims:
claims = claims["verified_claims"].get('claims', {})
for claim in claims:
if claim in user_claims:
filtered_user_claims[claim] = user_claims[claim]

# mapping with human names
i18n_user_claims = [
Expand Down
2 changes: 1 addition & 1 deletion spid_cie_oidc/provider/views/authz_request_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def get(self, request, *args, **kwargs):
return HttpResponseForbidden()

acr_value = AcrValues(self.payload["acr_values"][0])
prompt = self.payload["prompt"]
prompt = self.payload.get("prompt", "login")
if request.user:
if (
request.user.is_authenticated and
Expand Down
2 changes: 1 addition & 1 deletion spid_cie_oidc/provider/views/userinfo_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def get(self, request, *args, **kwargs):
).first()
if not rp_tc:
return HttpResponseForbidden()

issuer = self.get_issuer()
access_token_data = unpad_jwt_payload(token.access_token)

Expand Down
6 changes: 3 additions & 3 deletions spid_cie_oidc/relying_party/tests/test_03_rp_begin.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ def test_no_request(self):
self.assertTrue("Missing provider url" in res.content.decode())

# I changed the code to get a smarter solution
@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB)
@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB, OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_no_unallowed_tc(self):
client = Client()
url = reverse("spid_cie_rp_begin")
res = client.get(url, {"provider": "provider"})
self.assertTrue(res.status_code == 404)
self.assertTrue(res.status_code == 403)
self.assertTrue("request rejected" in res.content.decode())
self.assertTrue("Unallowed Trust Anchor" in res.content.decode())
# self.assertTrue("Unallowed Trust Anchor" in res.content.decode())

@override_settings(OIDCFED_DEFAULT_TRUST_ANCHOR=TA_SUB, OIDCFED_TRUST_ANCHORS=[TA_SUB])
def test_no_rp_entity_conf(self):
Expand Down
22 changes: 13 additions & 9 deletions spid_cie_oidc/relying_party/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,21 @@ def get_oidc_op(self, request) -> TrustChain:
raise InvalidTrustchain(
"Missing provider url. Please try '?provider=https://provider-subject/'"
)
trust_anchor = request.GET.get(
"trust_anchor",
settings.OIDCFED_IDENTITY_PROVIDERS.get(
request.GET["provider"],
settings.OIDCFED_DEFAULT_TRUST_ANCHOR
)
)

if trust_anchor not in settings.OIDCFED_TRUST_ANCHORS:

trust_anchor = request.GET.get("trust_anchor", None)
if trust_anchor != None and trust_anchor not in settings.OIDCFED_TRUST_ANCHORS:
logger.warning("Unallowed Trust Anchor")
raise InvalidTrustchain("Unallowed Trust Anchor")

if not trust_anchor:
for profile,value in settings.OIDCFED_IDENTITY_PROVIDERS.items():
if request.GET["provider"] in value:
trust_anchor = value[request.GET["provider"]]

if not trust_anchor:
trust_anchor = settings.OIDCFED_DEFAULT_TRUST_ANCHOR



tc = TrustChain.objects.filter(
sub=request.GET["provider"],
Expand Down
Loading

0 comments on commit 48a191d

Please sign in to comment.