From 6dd2278c81d0f3fea5512d6e8639afc89f6a5ca4 Mon Sep 17 00:00:00 2001 From: roland Date: Mon, 19 Dec 2016 15:01:39 +0100 Subject: [PATCH] Making federated client request and handling such a request now works. --- src/oic/extension/fed_client.py | 34 +++-- src/oic/extension/fed_provider.py | 5 +- tests/test_oic_fed_client.py | 110 ---------------- tests/test_oic_fed_entities.py | 207 ++++++++++++++++++++++++++++++ 4 files changed, 230 insertions(+), 126 deletions(-) delete mode 100644 tests/test_oic_fed_client.py create mode 100644 tests/test_oic_fed_entities.py diff --git a/src/oic/extension/fed_client.py b/src/oic/extension/fed_client.py index ad1b2b52b..225391e9d 100644 --- a/src/oic/extension/fed_client.py +++ b/src/oic/extension/fed_client.py @@ -79,6 +79,25 @@ def handle_registration_info(self, response): return resp + def federated_client_registration_request(self, **kwargs): + req = ClientMetadataStatement() + + try: + pp = kwargs['fo_pattern'] + except KeyError: + pp = '.' + req['metadata_statements'] = self.pick_signed_metadata_statements(pp) + + try: + req['redirect_uris'] = kwargs['redirect_uris'] + except KeyError: + try: + req["redirect_uris"] = self.redirect_uris + except AttributeError: + raise MissingRequiredAttribute("redirect_uris", kwargs) + + return req + def register(self, url, **kwargs): try: reg_type = kwargs['registration_type'] @@ -86,20 +105,7 @@ def register(self, url, **kwargs): reg_type = 'core' if reg_type == 'federation': - req = ClientMetadataStatement() - - try: - pp = kwargs['fo_pattern'] - except KeyError: - pp = '.' - req['metadata_statements'] = self.pick_signed_metadata_statements(pp) - - if "redirect_uris" not in kwargs: - try: - req["redirect_uris"] = self.redirect_uris - except AttributeError: - raise MissingRequiredAttribute("redirect_uris", kwargs) - + req = self.federated_client_registration_request(**kwargs) else: req = self.create_registration_request(**kwargs) diff --git a/src/oic/extension/fed_provider.py b/src/oic/extension/fed_provider.py index 254ff3398..7e916cfb7 100644 --- a/src/oic/extension/fed_provider.py +++ b/src/oic/extension/fed_provider.py @@ -3,7 +3,7 @@ from oic.oic import provider from oic.extension.oidc_fed import ClientMetadataStatement from oic.extension.oidc_fed import FederationEntity -from oic.oic.message import OpenIDSchema +from oic.oic.message import OpenIDSchema, RegistrationRequest from oic.utils.http_util import Created from oic.utils.http_util import Response from oic.utils.sanitize import sanitize @@ -46,7 +46,8 @@ def registration_endpoint(self, request, authn=None, **kwargs): logger.info("registration_request:%s" % sanitize(request.to_dict())) - request = self.get_metadata_statement(request) + request_args = self.get_metadata_statement(request) + request = RegistrationRequest(**request_args) result = self.client_registration_setup(request) if isinstance(result, Response): diff --git a/tests/test_oic_fed_client.py b/tests/test_oic_fed_client.py deleted file mode 100644 index ce0387bcb..000000000 --- a/tests/test_oic_fed_client.py +++ /dev/null @@ -1,110 +0,0 @@ -import os - -import pytest -from jwkest.jwk import rsa_load -from oic.extension.oidc_fed import Operator, ClientMetadataStatement - -from oic.oic import DEF_SIGN_ALG - -from oic.utils.keyio import KeyBundle, build_keyjar, KeyJar - -from oic.extension.fed_client import Client -from oic.utils.authn.client import CLIENT_AUTHN_METHOD - -BASE_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "data/keys")) -_key = rsa_load(os.path.join(BASE_PATH, "rsa.key")) -KC_RSA = KeyBundle({"key": _key, "kty": "RSA", "use": "sig"}) - -CLIENT_ID = "client_1" - -KEYDEFS = [ - {"type": "RSA", "key": '', "use": ["sig"]}, - {"type": "EC", "crv": "P-256", "use": ["sig"]} -] - -KEYS = {} -ISSUER = {} -OPERATOR = {} - -for entity in ['fo', 'fo1', 'org', 'inter', 'admin', 'ligo', 'op']: - fname = os.path.join(BASE_PATH, "{}.key".format(entity)) - _keydef = KEYDEFS[:] - _keydef[0]['key'] = fname - - _jwks, _keyjar, _kidd = build_keyjar(_keydef) - KEYS[entity] = {'jwks': _jwks, 'keyjar': _keyjar, 'kidd': _kidd} - ISSUER[entity] = 'https://{}.example.org'.format(entity) - OPERATOR[entity] = Operator(keyjar=_keyjar, iss=ISSUER[entity], jwks=_jwks) - -FOP = OPERATOR['fo'] -FOP.fo_keyjar = FOP.keyjar -FO1P = OPERATOR['fo1'] -FO1P.fo_keyjar = FO1P.keyjar -ORGOP = OPERATOR['org'] -ADMINOP = OPERATOR['admin'] -INTEROP = OPERATOR['inter'] -LIGOOP = OPERATOR['ligo'] -OPOP = OPERATOR['op'] - - -def fo_member(*args): - _kj = KeyJar() - for fo in args: - _kj.import_jwks(fo.jwks, fo.iss) - - return Operator(fo_keyjar=_kj) - - -def create_compound_metadata_statement(spec): - _ms = None - for op, op_args, signer, sig_args in spec: - _cms = ClientMetadataStatement(signing_keys=op.jwks, **op_args) - if _ms: - sig_args['metadata_statements'] = [_ms] - _ms = signer.pack_metadata_statement(_cms, **sig_args) - return _ms - -SPEC = [ - [ORGOP, {'contacts':['info@example.com']}, - FOP, {'alg':'RS256', 'scope':['openid']}], - [INTEROP, {'tos_uri':['https://inter.example.com/tos.html']}, - ORGOP, {'alg':'RS256'}], - [ADMINOP, {'redirect_uris':['https://rp.example.com/auth_cb']}, - INTEROP, {'alg':'RS256'}] -] - - -class TestClient(object): - @pytest.fixture(autouse=True) - def create_client(self): - sms = [create_compound_metadata_statement(SPEC)] - self.redirect_uri = "http://example.com/redirect" - self.client = Client(CLIENT_ID, - client_authn_method=CLIENT_AUTHN_METHOD, - fo_keyjar=fo_member(FOP, FO1P).fo_keyjar, - signed_metadata_statements=sms, - fo_priority_order=[FOP.iss, FO1P.iss] - ) - self.client.redirect_uris = [self.redirect_uri] - self.client.authorization_endpoint = \ - "http://example.com/authorization" - self.client.token_endpoint = "http://example.com/token" - self.client.userinfo_endpoint = "http://example.com/userinfo" - self.client.client_secret = "abcdefghijklmnop" - self.client.keyjar[""] = KC_RSA - self.client.behaviour = { - "request_object_signing_alg": DEF_SIGN_ALG[ - "openid_request_object"]} - - def test_init(self): - receiver = fo_member(FOP, FO1P) - ms = receiver.unpack_metadata_statement( - jwt_ms=self.client.signed_metadata_statements[0]) - res = receiver.evaluate_metadata_statement(ms) - assert FOP.iss in res - - def test_create_registration_request(self): - self.client.create_registration_request( - redirect_uris=['https://rp.example.com/auth_cb'], - metadata_statements=[self.client.signed_metadata_statements] - ) \ No newline at end of file diff --git a/tests/test_oic_fed_entities.py b/tests/test_oic_fed_entities.py new file mode 100644 index 000000000..fb140ad73 --- /dev/null +++ b/tests/test_oic_fed_entities.py @@ -0,0 +1,207 @@ +import json +import os +from time import time + +import pytest +from jwkest.jwk import rsa_load +from oic.oic.message import RegistrationResponse + +from oic import rndstr +from oic.oic import DEF_SIGN_ALG +from oic.extension.fed_client import Client +from oic.extension.fed_provider import Provider +from oic.extension.oidc_fed import ClientMetadataStatement +from oic.extension.oidc_fed import Operator +from oic.utils.authn.authn_context import AuthnBroker +from oic.utils.authn.client import CLIENT_AUTHN_METHOD +from oic.utils.authn.client import verify_client +from oic.utils.authn.user import UserAuthnMethod +from oic.utils.authz import AuthzHandling +from oic.utils.keyio import build_keyjar +from oic.utils.keyio import KeyBundle +from oic.utils.keyio import KeyJar +from oic.utils.sdb import SessionDB +from oic.utils.userinfo import UserInfo + +BASE_PATH = os.path.abspath( + os.path.join(os.path.dirname(__file__), "data/keys")) +_key = rsa_load(os.path.join(BASE_PATH, "rsa.key")) +KC_RSA = KeyBundle({"key": _key, "kty": "RSA", "use": "sig"}) + +CLIENT_ID = "client_1" + +KEYDEFS = [ + {"type": "RSA", "key": '', "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]} +] + +CONSUMER_CONFIG = { + "authz_page": "/authz", + "scope": ["openid"], + "response_type": ["code"], + "user_info": { + "name": None, + "email": None, + "nickname": None + }, + "request_method": "param" +} + +SERVER_INFO = { + "version": "3.0", + "issuer": "https://connect-op.heroku.com", + "authorization_endpoint": "http://localhost:8088/authorization", + "token_endpoint": "http://localhost:8088/token", + "flows_supported": ["code", "token", "code token"], +} + +BASE_PATH = os.path.abspath( + os.path.join(os.path.dirname(__file__), "data/keys")) + +KEYJAR = KeyJar() +KEYJAR[""] = KC_RSA + +CDB = {} + +USERDB = { + "username": { + "name": "Linda Lindgren", + "nickname": "Linda", + "email": "linda@example.com", + "verified": True, + "sub": "username" + } +} + +URLMAP = {CLIENT_ID: ["https://example.com/authz"]} + + +def _eq(l1, l2): + return set(l1) == set(l2) + + +class DummyAuthn(UserAuthnMethod): + def __init__(self, srv, user): + UserAuthnMethod.__init__(self, srv) + self.user = user + + def authenticated_as(self, cookie=None, **kwargs): + if cookie == "FAIL": + return None, 0 + else: + return {"uid": self.user}, time() + + +# AUTHN = UsernamePasswordMako(None, "login.mako", tl, PASSWD, "authenticated") +AUTHN_BROKER = AuthnBroker() +AUTHN_BROKER.add("UNDEFINED", DummyAuthn(None, "username")) + +# dealing with authorization +AUTHZ = AuthzHandling() +SYMKEY = rndstr(16) # symmetric key used to encrypt cookie info +USERINFO = UserInfo(USERDB) + +KEYS = {} +ISSUER = {} +OPERATOR = {} + +for entity in ['fo', 'fo1', 'org', 'inter', 'admin', 'ligo', 'op']: + fname = os.path.join(BASE_PATH, "{}.key".format(entity)) + _keydef = KEYDEFS[:] + _keydef[0]['key'] = fname + + _jwks, _keyjar, _kidd = build_keyjar(_keydef) + KEYS[entity] = {'jwks': _jwks, 'keyjar': _keyjar, 'kidd': _kidd} + ISSUER[entity] = 'https://{}.example.org'.format(entity) + OPERATOR[entity] = Operator(keyjar=_keyjar, iss=ISSUER[entity], jwks=_jwks) + +FOP = OPERATOR['fo'] +FOP.fo_keyjar = FOP.keyjar +FO1P = OPERATOR['fo1'] +FO1P.fo_keyjar = FO1P.keyjar +ORGOP = OPERATOR['org'] +ADMINOP = OPERATOR['admin'] +INTEROP = OPERATOR['inter'] +LIGOOP = OPERATOR['ligo'] +OPOP = OPERATOR['op'] + + +def fo_keyjar(*args): + _kj = KeyJar() + for fo in args: + _kj.import_jwks(fo.jwks, fo.iss) + return _kj + +def fo_member(*args): + return Operator(fo_keyjar=fo_keyjar(*args)) + + +def create_compound_metadata_statement(spec): + _ms = None + root_signer = '' + for op, op_args, signer, sig_args in spec: + _cms = ClientMetadataStatement(signing_keys=op.jwks, **op_args) + if _ms: + sig_args['metadata_statements'] = [_ms] + else: # root signed + root_signer = signer.iss + _ms = signer.pack_metadata_statement(_cms, **sig_args) + return root_signer, _ms + + +SPEC = [ + [ORGOP, {'contacts': ['info@example.com']}, + FOP, {'alg': 'RS256', 'scope': ['openid']}], + [INTEROP, {'tos_uri': ['https://rp.example.com/tos.html']}, + ORGOP, {'alg': 'RS256'}], + [ADMINOP, {'redirect_uris': ['https://rp.example.com/auth_cb']}, + INTEROP, {'alg': 'RS256'}] +] + + +class TestClient(object): + @pytest.fixture(autouse=True) + def create_client(self): + signer, ms = create_compound_metadata_statement(SPEC) + sms = {signer: [ms]} + + self.redirect_uri = "http://example.com/redirect" + self.client = Client(CLIENT_ID, + client_authn_method=CLIENT_AUTHN_METHOD, + fo_keyjar=fo_member(FOP, FO1P).fo_keyjar, + signed_metadata_statements=sms, + fo_priority_order=[FOP.iss, FO1P.iss] + ) + self.client.redirect_uris = [self.redirect_uri] + self.client.authorization_endpoint = \ + "http://example.com/authorization" + self.client.token_endpoint = "http://example.com/token" + self.client.userinfo_endpoint = "http://example.com/userinfo" + self.client.client_secret = "abcdefghijklmnop" + self.client.keyjar[""] = KC_RSA + self.client.behaviour = { + "request_object_signing_alg": DEF_SIGN_ALG[ + "openid_request_object"]} + + self.provider = Provider( + SERVER_INFO["issuer"], SessionDB(SERVER_INFO["issuer"]), CDB, + AUTHN_BROKER, USERINFO, AUTHZ, verify_client, SYMKEY, urlmap=URLMAP, + keyjar=KEYJAR, fo_keyjar=fo_keyjar(FOP, FO1P), + fo_priority_order=[FOP.iss, FO1P.iss]) + self.provider.baseurl = self.provider.name + + def test_init(self): + receiver = fo_member(FOP, FO1P) + ms = receiver.unpack_metadata_statement( + jwt_ms=self.client.signed_metadata_statements[FOP.iss][0]) + res = receiver.evaluate_metadata_statement(ms) + assert FOP.iss in res + + def test_create_registration_request(self): + req = self.client.federated_client_registration_request( + redirect_uris=['https://rp.example.com/auth_cb'] + ) + msg = self.provider.registration_endpoint(req.to_json()) + assert msg.status == '201 Created' + reqresp = RegistrationResponse(**json.loads(msg.message)) + assert reqresp['response_types'] == ['code'] \ No newline at end of file