diff --git a/src/eduid/scimapi/config.py b/src/eduid/scimapi/config.py index e074735a0..1c3ac7da1 100644 --- a/src/eduid/scimapi/config.py +++ b/src/eduid/scimapi/config.py @@ -60,6 +60,11 @@ class ScimApiConfig(RootConfig, LoggingConfigMixin, AWSMixin): scope_sudo: dict[ScopeName, set[ScopeName]] = Field(default={}) # The expected value of the authn JWT claims['requested_access']['type'] requested_access_type: Optional[str] = "scim-api" + # required saml assurance level for authentications with interaction auth_source + required_saml_assurance_level: list[str] = Field(default=["http://www.swamid.se/policy/assurance/al3"]) + # group name to match saml entitlement for authorization + account_manager_default_group: str = "Account Managers" + account_manager_group_mapping: dict[DataOwnerName, str] = Field(default={}) # Invite config invite_url: str = "" invite_expire: int = 180 * 86400 # 180 days diff --git a/src/eduid/scimapi/middleware.py b/src/eduid/scimapi/middleware.py index 0b11c4fd9..584a42d66 100644 --- a/src/eduid/scimapi/middleware.py +++ b/src/eduid/scimapi/middleware.py @@ -2,12 +2,13 @@ import logging import re from copy import copy +from enum import Enum from typing import Any, Mapping, Optional from fastapi import Request, Response from jwcrypto import jwt from jwcrypto.common import JWException -from pydantic import BaseModel, Field, StrictInt, ValidationError, validator +from pydantic import BaseModel, Field, StrictInt, ValidationError, root_validator, validator from starlette.datastructures import URL from starlette.middleware.base import BaseHTTPMiddleware from starlette.types import Message @@ -17,6 +18,16 @@ from eduid.scimapi.context import Context from eduid.scimapi.context_request import ContextRequestMixin from eduid.scimapi.exceptions import Unauthorized, http_error_detail_handler +from eduid.userdb.scimapi import ScimApiGroupDB + +logger = logging.getLogger(__name__) + + +class AuthSource(str, Enum): + INTERACTION = "interaction" + CONFIG = "config" + MDQ = "mdq" + TLSFED = "tlsfed" class SudoAccess(BaseModel): @@ -24,6 +35,14 @@ class SudoAccess(BaseModel): scope: ScopeName +class AuthenticationError(Exception): + pass + + +class AuthorizationError(Exception): + pass + + class RequestedAccessDenied(Exception): """Break out of get_data_owner when requested access (in the token) is not allowed""" @@ -37,8 +56,18 @@ class AuthnBearerToken(BaseModel): scim_config: ScimApiConfig # must be listed first, used in validators version: StrictInt + auth_source: AuthSource requested_access: list[SudoAccess] = Field(default=[]) scopes: set[ScopeName] = Field(default=set()) + # saml interaction claims + saml_issuer: Optional[str] = None + saml_assurance: Optional[list[str]] = None + saml_entitlement: Optional[list[str]] = None + saml_eppn: Optional[str] = None + saml_unique_id: Optional[str] = None + + # class Config: + # validate_assignment = True def __str__(self): return f"<{self.__class__.__name__}: scopes={self.scopes}, requested_access={self.requested_access}>" @@ -49,6 +78,13 @@ def validate_version(cls, v: int) -> int: raise ValueError("Unknown version") return v + @root_validator(pre=True) + def set_scopes_from_saml_data(cls, values: dict[str, Any]): + # Get scope from saml identifier if the auth source is interaction and set it as scopes + if values.get("auth_source") == AuthSource.INTERACTION.value: + values["scopes"] = cls._get_scope_from_saml_data(values=values) + return values + @validator("scopes") def validate_scopes(cls, v: set[ScopeName], values: Mapping[str, Any]) -> set[ScopeName]: config = values.get("scim_config") @@ -71,7 +107,67 @@ def validate_requested_access(cls, v: list[SudoAccess], values: Mapping[str, Any new_access += [this] return new_access - def get_data_owner(self, logger: logging.Logger) -> Optional[DataOwnerName]: + @staticmethod + def _get_scope_from_saml_data(values: Mapping[str, Any]) -> list[ScopeName]: + saml_identifier = values.get("saml_eppn") or values.get("saml_unique_id") + if not saml_identifier: + return [] + try: + scope = ScopeName(saml_identifier.split("@")[1]) + except IndexError: + return [] + logger.info(f"Scope from saml data: {scope}") + return [scope] + + def validate_auth_source(self) -> None: + """ + Check if the auth source is any of the one we know of. If the auth source is config, mdq or tlsfed we + can just let it through. If the auth source is interaction we need to check the saml data to make sure + the user is allowed access to the data owner. + """ + if self.auth_source in [AuthSource.CONFIG, AuthSource.MDQ, AuthSource.TLSFED]: + logger.info(f"{self.auth_source} is a trusted auth source") + return + + if self.auth_source == AuthSource.INTERACTION: + assurances = self.saml_assurance or [] + # validate that the authentication meets the required assurance level + for assurance_level in self.scim_config.required_saml_assurance_level: + if assurance_level in assurances: + logger.info(f"Allowed assurance level {assurance_level} is in saml data: {assurances}") + return + raise AuthenticationError( + f"Asserted SAML assurance level(s) ({assurances}) not in" + f"allow-list: {self.scim_config.required_saml_assurance_level}" + ) + + raise AuthenticationError(f"Unsupported authentication source: {self.auth_source}") + + def validate_saml_entitlements(self, data_owner: DataOwnerName, groupdb: Optional[ScimApiGroupDB] = None) -> None: + if groupdb is None: + raise AuthenticationError("No groupdb provided, cannot validate saml entitlements.") + + default_name = self.scim_config.account_manager_default_group + account_manager_group_name = self.scim_config.account_manager_group_mapping.get(data_owner, default_name) + logger.debug(f"Checking for account manager group called {account_manager_group_name}") + + account_manager_group = groupdb.get_group_by_display_name(display_name=account_manager_group_name) + if account_manager_group is None: + raise AuthenticationError('No "Account Managers" group found for data owner') + logger.debug(f"Found group {account_manager_group_name} with id {account_manager_group.graph.identifier}") + + # TODO: create a helper function to do this for all places where we do this dance in the repo + # create the expected saml group id + saml_group_id = f"{groupdb.graphdb.scope}:group:{account_manager_group.graph.identifier}#eduid-iam" + # match against users entitlements + entitlements = self.saml_entitlement or [] + if saml_group_id in entitlements: + logger.debug(f"{saml_group_id} in {entitlements}") + return + logger.error(f"{saml_group_id} NOT in {entitlements}") + raise AuthorizationError(f"Not authorized: {saml_group_id} not in saml entitlements") + + def get_data_owner(self) -> Optional[DataOwnerName]: """ Get the data owner to use. @@ -94,7 +190,7 @@ def get_data_owner(self, logger: logging.Logger) -> Optional[DataOwnerName]: requested_access: [{'type': 'scim-api', 'scope': 'example.edu'}]} """ - allowed_scopes = self._get_allowed_scopes(self.scim_config, logger) + allowed_scopes = self._get_allowed_scopes(self.scim_config) logger.debug(f"Request {self}, allowed scopes: {allowed_scopes}") # only support one requested access at a time for now and do not fall back to simple scope check if @@ -126,7 +222,7 @@ def get_data_owner(self, logger: logging.Logger) -> Optional[DataOwnerName]: return None - def _get_allowed_scopes(self, config: ScimApiConfig, logger: logging.Logger) -> set[ScopeName]: + def _get_allowed_scopes(self, config: ScimApiConfig) -> set[ScopeName]: """ Make a set of all the allowed scopes for the requester. @@ -260,7 +356,15 @@ async def dispatch(self, req: Request, call_next) -> Response: return await http_error_detail_handler(req=req, exc=Unauthorized(detail="Bearer token error")) try: - data_owner = token.get_data_owner(self.context.logger) + token.validate_auth_source() + except AuthenticationError as exc: + self.context.logger.error(f"Access denied: {exc}") + return await http_error_detail_handler( + req=req, exc=Unauthorized(detail="Authentication source or assurance level invalid") + ) + + try: + data_owner = token.get_data_owner() except RequestedAccessDenied as exc: self.context.logger.error(f"Access denied: {exc}") return await http_error_detail_handler( @@ -278,4 +382,14 @@ async def dispatch(self, req: Request, call_next) -> Response: req.context.invitedb = self.context.get_invitedb(data_owner) req.context.eventdb = self.context.get_eventdb(data_owner) + # check authorization for interaction authentications + try: + if token.auth_source == AuthSource.INTERACTION: + token.validate_saml_entitlements(data_owner=data_owner, groupdb=req.context.groupdb) + except AuthorizationError as exc: + self.context.logger.error(f"Access denied: {exc}") + return await http_error_detail_handler( + req=req, exc=Unauthorized(detail="Missing correct entitlement in saml data") + ) + return await call_next(req) diff --git a/src/eduid/scimapi/routers/login.py b/src/eduid/scimapi/routers/login.py index b3247f96f..8ab5c90e9 100644 --- a/src/eduid/scimapi/routers/login.py +++ b/src/eduid/scimapi/routers/login.py @@ -6,6 +6,7 @@ from eduid.scimapi.api_router import APIRouter from eduid.scimapi.context_request import ContextRequest from eduid.scimapi.exceptions import ErrorDetail, NotFound, Unauthorized +from eduid.scimapi.middleware import AuthSource from eduid.scimapi.models.login import TokenRequest login_router = APIRouter( @@ -33,6 +34,7 @@ async def get_token(req: ContextRequest, resp: Response, token_req: TokenRequest "exp": expire.timestamp(), "scopes": [token_req.data_owner], "version": 1, + "auth_source": AuthSource.CONFIG, } token = jwt.JWT(header={"alg": "ES256"}, claims=claims) token.make_signed_token(signing_key) diff --git a/src/eduid/scimapi/testing.py b/src/eduid/scimapi/testing.py index 6abcded41..74d4f48d0 100644 --- a/src/eduid/scimapi/testing.py +++ b/src/eduid/scimapi/testing.py @@ -12,6 +12,7 @@ from eduid.common.config.parsers import load_config from eduid.common.models.scim_base import SCIMSchema +from eduid.graphdb.groupdb import User as GraphUser from eduid.graphdb.testing import Neo4jTemporaryInstance from eduid.queue.db.message import MessageDB from eduid.scimapi.app import init_api @@ -83,10 +84,6 @@ def setUpClass(cls) -> None: ) super().setUpClass() - def tearDown(self): - super().tearDown() - self.neo4j_instance.purge_db() - class ScimApiTestCase(MongoNeoTestCase): """Base test case providing the real API""" @@ -107,6 +104,7 @@ def setUp(self) -> None: # TODO: more tests for scoped groups when that is implemented self.data_owner = DataOwnerName("eduid.se") self.userdb = self.context.get_userdb(self.data_owner) + self.groupdb = self.context.get_groupdb(self.data_owner) self.invitedb = self.context.get_invitedb(self.data_owner) self.signup_invitedb = SignupInviteDB(db_uri=config.mongo_uri) self.messagedb = MessageDB(db_uri=config.mongo_uri) @@ -143,6 +141,15 @@ def add_user( self.userdb.save(user) return self.userdb.get_user_by_scim_id(scim_id=identifier) + def add_group_with_member( + self, group_identifier: str, display_name: str, user_identifier: str + ) -> Optional[ScimApiGroup]: + group = ScimApiGroup(scim_id=uuid.UUID(group_identifier), display_name=display_name) + group.add_member(GraphUser(identifier=user_identifier, display_name="Test Member 1")) + assert self.groupdb + self.groupdb.save(group) + return self.groupdb.get_group_by_scim_id(scim_id=group_identifier) + def tearDown(self): super().tearDown() if self.userdb: @@ -155,6 +162,9 @@ def tearDown(self): self.signup_invitedb._drop_whole_collection() if self.messagedb: self.messagedb._drop_whole_collection() + if self.groupdb: + self.groupdb._drop_whole_collection() + self.neo4j_instance.purge_db() def _assertScimError( self, diff --git a/src/eduid/scimapi/tests/test_authn.py b/src/eduid/scimapi/tests/test_authn.py index b0b1c3a60..474849328 100644 --- a/src/eduid/scimapi/tests/test_authn.py +++ b/src/eduid/scimapi/tests/test_authn.py @@ -12,7 +12,7 @@ from eduid.common.config.parsers import load_config from eduid.common.models.scim_base import SCIMSchema from eduid.scimapi.config import DataOwner, DataOwnerName, ScimApiConfig, ScopeName -from eduid.scimapi.middleware import AuthnBearerToken, RequestedAccessDenied, SudoAccess +from eduid.scimapi.middleware import AuthnBearerToken, AuthSource, RequestedAccessDenied, SudoAccess from eduid.scimapi.testing import BaseDBTestCase from eduid.scimapi.tests.test_scimuser import ScimApiTestUserResourceBase from eduid.userdb.scimapi import ScimApiProfile @@ -45,29 +45,40 @@ def test_scopes_canonicalization(self) -> None: config.scope_mapping[ScopeName("example.com")] = DataOwnerName(domain) config.scope_mapping[ScopeName("example.org")] = DataOwnerName(domain) # test no canonization - token = AuthnBearerToken(scim_config=self.config, version=1, scopes={ScopeName(domain)}) + token = AuthnBearerToken( + scim_config=self.config, version=1, scopes={ScopeName(domain)}, auth_source=AuthSource.CONFIG + ) assert token.scopes == {domain} # test no canonization, but normalisation - token = AuthnBearerToken(scim_config=self.config, version=1, scopes={ScopeName(domain.upper())}) + token = AuthnBearerToken( + scim_config=self.config, version=1, scopes={ScopeName(domain.upper())}, auth_source=AuthSource.CONFIG + ) assert token.scopes == {domain} # test canonization - token = AuthnBearerToken(scim_config=self.config, version=1, scopes={ScopeName("example.org")}) + token = AuthnBearerToken( + scim_config=self.config, version=1, scopes={ScopeName("example.org")}, auth_source=AuthSource.CONFIG + ) assert token.scopes == {domain} # test canonization and normalisation - token = AuthnBearerToken(scim_config=self.config, version=1, scopes={ScopeName("Example.Org")}) + token = AuthnBearerToken( + scim_config=self.config, version=1, scopes={ScopeName("Example.Org")}, auth_source=AuthSource.CONFIG + ) assert token.scopes == {domain} # test canonization and normalisation, and de-duplication token = AuthnBearerToken( scim_config=self.config, version=1, scopes={ScopeName("Example.Org"), ScopeName("example.coM"), ScopeName("other.foo")}, + auth_source=AuthSource.CONFIG, ) assert token.scopes == {domain, "other.foo"} def test_invalid_scope(self) -> None: # test too short domain name with pytest.raises(ValueError) as exc_info: - AuthnBearerToken(scim_config=self.config, version=1, scopes={ScopeName(".se")}) + AuthnBearerToken( + scim_config=self.config, version=1, scopes={ScopeName(".se")}, auth_source=AuthSource.CONFIG + ) assert exc_info.value.errors() == [ # type: ignore { "ctx": {"limit_value": 4}, @@ -80,7 +91,9 @@ def test_invalid_scope(self) -> None: def test_invalid_version(self) -> None: # test too short domain name with pytest.raises(ValueError) as exc_info: - AuthnBearerToken(scim_config=self.config, version=99, scopes={ScopeName("eduid.se")}) + AuthnBearerToken( + scim_config=self.config, version=99, scopes={ScopeName("eduid.se")}, auth_source=AuthSource.CONFIG + ) assert exc_info.value.errors() == [ # type: ignore {"loc": ("version",), "msg": "Unknown version", "type": "value_error"} ] @@ -99,6 +112,7 @@ def test_requested_access_canonicalization(self) -> None: version=1, scopes={ScopeName(domain)}, requested_access=[SudoAccess(type=_requested_access_type, scope=domain)], + auth_source=AuthSource.CONFIG, ) assert token.scopes == {domain} assert token.requested_access == [SudoAccess(type=_requested_access_type, scope=domain)] @@ -108,6 +122,7 @@ def test_requested_access_canonicalization(self) -> None: version=1, scopes={ScopeName(domain.capitalize())}, requested_access=[SudoAccess(type=_requested_access_type, scope=ScopeName(domain.upper()))], + auth_source=AuthSource.CONFIG, ) assert token.scopes == {domain} assert token.requested_access == [SudoAccess(type=_requested_access_type, scope=domain)] @@ -117,6 +132,7 @@ def test_requested_access_canonicalization(self) -> None: version=1, scopes={domain}, requested_access=[SudoAccess(type=_requested_access_type, scope=ScopeName("example.org"))], + auth_source=AuthSource.CONFIG, ) assert token.scopes == {domain} assert token.requested_access == [SudoAccess(type=_requested_access_type, scope=domain)] @@ -129,6 +145,7 @@ def test_invalid_requested_access_scope(self): version=1, scopes={"eduid.se"}, requested_access=[SudoAccess(type=self.config.requested_access_type, scope=".se")], + auth_source=AuthSource.CONFIG, ) assert exc_info.value.errors() == [ { @@ -148,6 +165,7 @@ def test_requested_access_not_for_us(self): version=1, scopes={domain}, requested_access=[SudoAccess(type="someone else", scope=domain)], + auth_source=AuthSource.CONFIG, ) assert token.scopes == {domain} assert token.requested_access == [] @@ -155,11 +173,34 @@ def test_requested_access_not_for_us(self): def test_regular_token(self): """Test the normal case. Login with access granted based on the single scope in the request.""" domain = "eduid.se" - claims = {"version": 1, "scopes": [domain]} + claims = { + "version": 1, + "scopes": [domain], + "auth_source": "config", + "requested_access": [{"type": "scim-api", "scope": "eduid.se"}], + } + token = AuthnBearerToken(scim_config=self.config, **claims) + assert token.version == 1 + assert token.scopes == {domain} + assert token.get_data_owner() == domain + assert token.auth_source == AuthSource.CONFIG + assert token.requested_access == [SudoAccess(type="scim-api", scope=ScopeName("eduid.se"))] + + def test_interaction_token(self): + """Test the normal case. Login with access granted based on the single scope in the request.""" + domain = "eduid.se" + claims = { + "version": 1, + "saml_eppn": f"eppn@{domain}", + "auth_source": "interaction", + "requested_access": [{"type": "scim-api", "scope": "eduid.se"}], + } token = AuthnBearerToken(scim_config=self.config, **claims) assert token.version == 1 assert token.scopes == {domain} - assert token.get_data_owner(logger=logger) == domain + assert token.get_data_owner() == domain + assert token.auth_source == AuthSource.INTERACTION + assert token.requested_access == [SudoAccess(type="scim-api", scope=ScopeName("eduid.se"))] def test_regular_token_with_canonisation(self): """Test the normal case. Login with access granted based on the single scope in the request.""" @@ -167,9 +208,19 @@ def test_regular_token_with_canonisation(self): domain_alias = "eduid.example.edu" config = self.config.copy() config.scope_mapping[domain_alias] = domain - claims = {"version": 1, "scopes": [domain_alias]} + claims = {"version": 1, "scopes": [domain_alias], "auth_source": "config"} + token = AuthnBearerToken(scim_config=self.config, **claims) + assert token.get_data_owner() == domain + + def test_interaction_token_with_canonisation(self): + """Test the normal case. Login with access granted based on the single scope in the request.""" + domain = DataOwnerName("eduid.se") + domain_alias = ScopeName("eduid.example.edu") + config = self.config.copy() + config.scope_mapping[domain_alias] = domain + claims = {"version": 1, "auth_source": "interaction", "saml_eppn": f"user@{domain_alias}"} token = AuthnBearerToken(scim_config=self.config, **claims) - assert token.get_data_owner(logger=logger) == domain + assert token.get_data_owner() == domain def test_regular_token_upper_case(self): """ @@ -177,27 +228,34 @@ def test_regular_token_upper_case(self): Scope provided in upper-case in the request. """ domain = "eduid.se" - claims = {"version": 1, "scopes": [domain.upper()]} + claims = {"version": 1, "scopes": [domain.upper()], "auth_source": "config"} token = AuthnBearerToken(scim_config=self.config, **claims) assert token.version == 1 assert token.scopes == {domain} - assert token.get_data_owner(logger=logger) == domain + assert token.get_data_owner() == domain def test_unknown_scope(self): """Test login with a scope that has no data owner in the configuration.""" domain = "example.org" - claims = {"version": 1, "scopes": [domain]} + claims = {"version": 1, "scopes": [domain], "auth_source": "config"} token = AuthnBearerToken(scim_config=self.config, **claims) - assert token.get_data_owner(logger=logger) is None + assert token.get_data_owner() is None + + def test_interaction_token_unknown_scope(self): + """Test login with a scope that has no data owner in the configuration.""" + domain = "example.org" + claims = {"version": 1, "saml_eppn": f"eppn{domain}", "auth_source": "interaction"} + token = AuthnBearerToken(scim_config=self.config, **claims) + assert token.get_data_owner() is None def test_regular_token_multiple_scopes(self): """Test the normal case. Login with access granted based on the scope in the request that has a data owner in configuration (one extra scope provided in the request, named 'aaa' so it is checked first - and skipped). """ domain = "eduid.se" - claims = {"version": 1, "scopes": ["aaa.example.com", domain]} + claims = {"version": 1, "scopes": ["aaa.example.com", domain], "auth_source": "config"} token = AuthnBearerToken(scim_config=self.config, **claims) - assert token.get_data_owner(logger=logger) == domain + assert token.get_data_owner() == domain def test_sudo_allowed(self) -> None: """Test the normal case when sudo:ing.""" @@ -210,9 +268,10 @@ def test_sudo_allowed(self) -> None: "version": 1, "scopes": [sudoer], "requested_access": [{"type": config.requested_access_type, "scope": domain}], + "auth_source": "config", } token = AuthnBearerToken(scim_config=config, **claims) - assert token.get_data_owner(logger=logger) == domain + assert token.get_data_owner() == domain def test_sudo_not_allowed(self) -> None: """Test attempting to sudo, but the target scope (other-domain.example.org) is not in the list of @@ -226,11 +285,12 @@ def test_sudo_not_allowed(self) -> None: "version": 1, "scopes": [sudoer], "requested_access": [{"type": config.requested_access_type, "scope": domain}], + "auth_source": "config", } token = AuthnBearerToken(scim_config=config, **claims) with pytest.raises(RequestedAccessDenied) as exc_info: - assert token.get_data_owner(logger=logger) == None + assert token.get_data_owner() is None assert str(exc_info.value) == ( "Requested access to scope eduid.se not in allow-list: other-domain.example.org, sudoer.example.org" ) @@ -247,12 +307,13 @@ def test_sudo_unknown_scope(self) -> None: "version": 1, "scopes": [sudoer], "requested_access": [{"type": config.requested_access_type, "scope": domain}], + "auth_source": "config", } token = AuthnBearerToken(scim_config=config, **claims) with pytest.raises(RequestedAccessDenied) as exc_info: - assert token.get_data_owner(logger=logger) == None - assert str(exc_info.value) == ("Requested access to scope other-domain.example.org but no data owner found") + assert token.get_data_owner() is None + assert str(exc_info.value) == "Requested access to scope other-domain.example.org but no data owner found" def test_sudo_takes_precedence(self) -> None: """ @@ -269,9 +330,10 @@ def test_sudo_takes_precedence(self) -> None: "version": 1, "scopes": [sudoer], "requested_access": [{"type": config.requested_access_type, "scope": domain}], + "auth_source": "config", } token = AuthnBearerToken(scim_config=config, **claims) - assert token.get_data_owner(logger=logger) == domain + assert token.get_data_owner() == domain def test_sudo_with_canonicalisation(self) -> None: """ @@ -290,9 +352,10 @@ def test_sudo_with_canonicalisation(self) -> None: "version": 1, "scopes": [sudoer], "requested_access": [{"type": config.requested_access_type, "scope": domain_alias}], + "auth_source": "config", } token = AuthnBearerToken(scim_config=config, **claims) - assert token.get_data_owner(logger=logger) == domain + assert token.get_data_owner() == domain class TestAuthnUserResource(ScimApiTestUserResourceBase): @@ -348,7 +411,40 @@ def test_get_user_untrusted_token(self): def test_get_user_correct_token(self): db_user = self.add_user(identifier=str(uuid4()), external_id="test-id-1", profiles={"test": self.test_profile}) - claims = {"scopes": ["eduid.se"], "version": 1} + claims = {"scopes": ["eduid.se"], "version": 1, "auth_source": "config"} + token = self._make_bearer_token(claims=claims) + + response = self._get_user_from_api(user=db_user, bearer_token=token) + + self._assertResponse(response, 200) + + _req = { + SCIMSchema.NUTID_USER_V1.value: {"profiles": {"test": asdict(self.test_profile)}, "linked_accounts": []}, + } + self._assertUserUpdateSuccess(_req, response, db_user) + + def test_get_user_interaction_token(self): + db_user = self.add_user(identifier=str(uuid4()), external_id="test-id-1", profiles={"test": self.test_profile}) + db_group = self.add_group_with_member( + group_identifier=str(uuid4()), + display_name=self.context.config.account_manager_default_group, + user_identifier=str(db_user.scim_id), + ) + + claims = { + "saml_eppn": "eppn@eduid.se", + "version": 1, + "auth_source": "interaction", + "saml_assurance": [ + "http://www.swamid.se/policy/assurance/al1", + "http://www.swamid.se/policy/assurance/al2", + "http://www.swamid.se/policy/assurance/al3", + ], + "saml_entitlement": [ + "urn:mace:some:other:entitlement", + f"{self.groupdb.graphdb.scope}:group:{db_group.graph.identifier}#eduid-iam", + ], + } token = self._make_bearer_token(claims=claims) response = self._get_user_from_api(user=db_user, bearer_token=token) diff --git a/src/eduid/userdb/scimapi/groupdb.py b/src/eduid/userdb/scimapi/groupdb.py index 6b549a5f2..965bca96c 100644 --- a/src/eduid/userdb/scimapi/groupdb.py +++ b/src/eduid/userdb/scimapi/groupdb.py @@ -258,6 +258,14 @@ def get_group_by_scim_id(self, scim_id: str) -> Optional[ScimApiGroup]: return group return None + def get_group_by_display_name(self, display_name: str) -> Optional[ScimApiGroup]: + doc = self._get_document_by_attr("display_name", display_name) + if doc: + group = ScimApiGroup.from_dict(doc) + group.graph = self._get_graph_group(str(group.scim_id)) + return group + return None + def get_groups_by_property( self, key: str, value: Union[str, int], skip=0, limit=100 ) -> tuple[list[ScimApiGroup], int]: