From c2139a450b4b3920f04ba92f7b8ea5b09cde650b Mon Sep 17 00:00:00 2001 From: Joel McCoy Date: Thu, 4 Jan 2024 17:05:26 -0600 Subject: [PATCH] SSOAdmin: Add customer managed policy functionality (#7186) --- moto/ssoadmin/models.py | 88 ++++++++++ moto/ssoadmin/responses.py | 56 +++++++ moto/ssoadmin/utils.py | 7 + tests/test_ssoadmin/test_ssoadmin_policies.py | 150 ++++++++++++++++++ 4 files changed, 301 insertions(+) diff --git a/moto/ssoadmin/models.py b/moto/ssoadmin/models.py index 7d7433f80d08..fee98d4c7f3d 100644 --- a/moto/ssoadmin/models.py +++ b/moto/ssoadmin/models.py @@ -70,6 +70,7 @@ def __init__( self.created_date = unix_time() self.inline_policy = "" self.managed_policies: List[ManagedPolicy] = list() + self.customer_managed_policies: List[CustomerManagedPolicy] = list() self.total_managed_policies_attached = ( 0 # this will also include customer managed policies ) @@ -107,6 +108,17 @@ def __eq__(self, other: Any) -> bool: return self.arn == other.arn +class CustomerManagedPolicy(BaseModel): + def __init__(self, name: str, path: str = "/"): + self.name = name + self.path = path + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, CustomerManagedPolicy): + return False + return f"{self.path}{self.name}" == f"{other.path}{other.name}" + + class SSOAdminBackend(BaseBackend): """Implementation of SSOAdmin APIs.""" @@ -424,5 +436,81 @@ def detach_managed_policy_from_permission_set( instance_arn, permission_set_arn, managed_policy_arn ) + def attach_customer_managed_policy_reference_to_permission_set( + self, + instance_arn: str, + permission_set_arn: str, + customer_managed_policy_reference: Dict[str, str], + ) -> None: + permissionset = self._find_permission_set( + permission_set_arn=permission_set_arn, instance_arn=instance_arn + ) + + name = customer_managed_policy_reference["Name"] + path = customer_managed_policy_reference.get("Path", "/") # default path is "/" + customer_managed_policy = CustomerManagedPolicy(name=name, path=path) + + if customer_managed_policy in permissionset.customer_managed_policies: + raise ConflictException( + f"Given customer managed policy with name: {name} and path {path} already attached" + ) + + if ( + permissionset.total_managed_policies_attached + >= MAX_MANAGED_POLICIES_PER_PERMISSION_SET + ): + raise ServiceQuotaExceededException( + f"Cannot attach managed policy: number of attached managed policies is already at maximum {MAX_MANAGED_POLICIES_PER_PERMISSION_SET}" + ) + + permissionset.customer_managed_policies.append(customer_managed_policy) + permissionset.total_managed_policies_attached += 1 + + @paginate(pagination_model=PAGINATION_MODEL) # type: ignore[misc] + def list_customer_managed_policy_references_in_permission_set( + self, instance_arn: str, permission_set_arn: str + ) -> List[CustomerManagedPolicy]: + permissionset = self._find_permission_set( + permission_set_arn=permission_set_arn, instance_arn=instance_arn + ) + return permissionset.customer_managed_policies + + def _detach_customer_managed_policy_from_permissionset( + self, + instance_arn: str, + permission_set_arn: str, + customer_managed_policy_reference: Dict[str, str], + ) -> None: + permissionset = self._find_permission_set( + permission_set_arn=permission_set_arn, instance_arn=instance_arn + ) + path: str = customer_managed_policy_reference.get("Path", "/") + name: str = customer_managed_policy_reference["Name"] + + for customer_managed_policy in permissionset.customer_managed_policies: + if ( + customer_managed_policy.name == name + and customer_managed_policy.path == path + ): + permissionset.customer_managed_policies.remove(customer_managed_policy) + permissionset.total_managed_policies_attached -= 1 + return + + raise ResourceNotFoundException( + f"Given managed policy with name: {name} and path {path} does not exist on PermissionSet" + ) + + def detach_customer_managed_policy_reference_from_permission_set( + self, + instance_arn: str, + permission_set_arn: str, + customer_managed_policy_reference: Dict[str, str], + ) -> None: + self._detach_customer_managed_policy_from_permissionset( + instance_arn=instance_arn, + permission_set_arn=permission_set_arn, + customer_managed_policy_reference=customer_managed_policy_reference, + ) + ssoadmin_backends = BackendDict(SSOAdminBackend, "sso") diff --git a/moto/ssoadmin/responses.py b/moto/ssoadmin/responses.py index 3187f803a9ec..86f38a581768 100644 --- a/moto/ssoadmin/responses.py +++ b/moto/ssoadmin/responses.py @@ -244,3 +244,59 @@ def detach_managed_policy_from_permission_set(self) -> str: managed_policy_arn=managed_policy_arn, ) return json.dumps({}) + + def attach_customer_managed_policy_reference_to_permission_set(self) -> str: + instance_arn = self._get_param("InstanceArn") + permission_set_arn = self._get_param("PermissionSetArn") + customer_managed_policy_reference = self._get_param( + "CustomerManagedPolicyReference" + ) + self.ssoadmin_backend.attach_customer_managed_policy_reference_to_permission_set( + instance_arn=instance_arn, + permission_set_arn=permission_set_arn, + customer_managed_policy_reference=customer_managed_policy_reference, + ) + return json.dumps({}) + + def list_customer_managed_policy_references_in_permission_set(self) -> str: + instance_arn = self._get_param("InstanceArn") + permission_set_arn = self._get_param("PermissionSetArn") + max_results = self._get_int_param("MaxResults") + next_token = self._get_param("NextToken") + + ( + customer_managed_policy_references, + next_token, + ) = self.ssoadmin_backend.list_customer_managed_policy_references_in_permission_set( + instance_arn=instance_arn, + permission_set_arn=permission_set_arn, + max_results=max_results, + next_token=next_token, + ) + + customer_managed_policy_references_response = [ + { + "Name": customer_managed_policy_reference.name, + "Path": customer_managed_policy_reference.path, + } + for customer_managed_policy_reference in customer_managed_policy_references + ] + return json.dumps( + { + "CustomerManagedPolicyReferences": customer_managed_policy_references_response, + "NextToken": next_token, + } + ) + + def detach_customer_managed_policy_reference_from_permission_set(self) -> str: + instance_arn = self._get_param("InstanceArn") + permission_set_arn = self._get_param("PermissionSetArn") + customer_managed_policy_reference = self._get_param( + "CustomerManagedPolicyReference" + ) + self.ssoadmin_backend.detach_customer_managed_policy_reference_from_permission_set( + instance_arn=instance_arn, + permission_set_arn=permission_set_arn, + customer_managed_policy_reference=customer_managed_policy_reference, + ) + return json.dumps({}) diff --git a/moto/ssoadmin/utils.py b/moto/ssoadmin/utils.py index 48d2a2952fc2..8ed8891c78a0 100644 --- a/moto/ssoadmin/utils.py +++ b/moto/ssoadmin/utils.py @@ -37,4 +37,11 @@ "result_key": "AttachedManagedPolicies", "unique_attribute": ["arn"], }, + "list_customer_managed_policy_references_in_permission_set": { + "input_token": "next_token", + "limit_key": "max_results", + "limit_default": 100, + "result_key": "CustomerManagedPolicyReferences", + "unique_attribute": ["name", "path"], + }, } diff --git a/tests/test_ssoadmin/test_ssoadmin_policies.py b/tests/test_ssoadmin/test_ssoadmin_policies.py index f9f2add04f5b..e7a8ab531026 100644 --- a/tests/test_ssoadmin/test_ssoadmin_policies.py +++ b/tests/test_ssoadmin/test_ssoadmin_policies.py @@ -328,3 +328,153 @@ def test_detach_managed_policy_from_permission_set(): ) assert len(response["AttachedManagedPolicies"]) == 0 + + +@mock_ssoadmin +def test_attach_customer_managed_policy_reference_to_permission_set(): + client = boto3.client("sso-admin", region_name="us-east-1") + permission_set_arn = create_permissionset(client) + + policy_name = "test-policy" + policy_path = "/test-path/" + + client.attach_customer_managed_policy_reference_to_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + CustomerManagedPolicyReference={ + "Name": policy_name, + "Path": policy_path, + }, + ) + + response = client.list_customer_managed_policy_references_in_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ) + + assert len(response["CustomerManagedPolicyReferences"]) == 1 + assert response["CustomerManagedPolicyReferences"][0]["Name"] == policy_name + assert response["CustomerManagedPolicyReferences"][0]["Path"] == policy_path + + # test for customer managed policy that is already attached + with pytest.raises(ClientError) as e: + client.attach_customer_managed_policy_reference_to_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + CustomerManagedPolicyReference={ + "Name": policy_name, + "Path": policy_path, + }, + ) + err = e.value.response["Error"] + assert err["Code"] == "ConflictException" + assert ( + err["Message"] + == f"Given customer managed policy with name: {policy_name} and path {policy_path} already attached" + ) + + +@mock_ssoadmin +def test_list_customer_managed_policy_references_in_permission_set(): + """ + Tests listing customer managed policies including pagination. + """ + client = boto3.client("sso-admin", region_name="us-east-1") + permission_set_arn = create_permissionset(client) + + policy_name = "test-policy-" + + # attach 3 customer managed policies + for idx in range(3): + client.attach_customer_managed_policy_reference_to_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + CustomerManagedPolicyReference={"Name": f"{policy_name}{idx}"}, + ) + + response = client.list_customer_managed_policy_references_in_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + MaxResults=2, + ) + + customer_managed_policy_names = [] + + assert len(response["CustomerManagedPolicyReferences"]) == 2 + next_token = response["NextToken"] + for name in response["CustomerManagedPolicyReferences"]: + customer_managed_policy_names.append(name["Name"]) + + response = client.list_customer_managed_policy_references_in_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + MaxResults=2, + NextToken=next_token, + ) + for name in response["CustomerManagedPolicyReferences"]: + customer_managed_policy_names.append(name["Name"]) + + assert len(response["CustomerManagedPolicyReferences"]) == 1 + + # ensure the 3 unique customer managed policies were returned + assert len(set(customer_managed_policy_names)) == 3 + + +@mock_ssoadmin +def test_detach_customer_managed_policy_reference_from_permission_set(): + client = boto3.client("sso-admin", region_name="us-east-1") + permission_set_arn = create_permissionset(client) + + # trying to detach a policy that doesn't exist yet + with pytest.raises(ClientError) as e: + client.detach_customer_managed_policy_reference_from_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + CustomerManagedPolicyReference={ + "Name": "test-policy", + }, + ) + err = e.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + assert ( + err["Message"] + == "Given managed policy with name: test-policy and path / does not exist on PermissionSet" + ) + + # attach a policy + client.attach_customer_managed_policy_reference_to_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + CustomerManagedPolicyReference={ + "Name": "test-policy", + "Path": "/some-path/", + }, + ) + + # try to detach the policy but default path (should fail) + with pytest.raises(ClientError) as e: + client.detach_customer_managed_policy_reference_from_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + CustomerManagedPolicyReference={ + "Name": "test-policy", + }, + ) + + # detach the policy + client.detach_customer_managed_policy_reference_from_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + CustomerManagedPolicyReference={ + "Name": "test-policy", + "Path": "/some-path/", + }, + ) + + # ensure policy is detached + response = client.list_customer_managed_policy_references_in_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ) + + assert len(response["CustomerManagedPolicyReferences"]) == 0