From b7d26f5b63626e164e8b7291241f5fb463ca6af0 Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Tue, 15 Oct 2024 17:33:02 +0200 Subject: [PATCH] IFC-761 Use flags to handle branch permissions (#4601) --- backend/infrahub/api/schema.py | 14 +- backend/infrahub/core/account.py | 32 ++--- backend/infrahub/core/attribute.py | 7 +- backend/infrahub/core/constants/__init__.py | 10 +- backend/infrahub/core/enums.py | 2 +- backend/infrahub/core/initialization.py | 4 +- backend/infrahub/core/node/permissions.py | 11 +- backend/infrahub/core/protocols.py | 1 - .../infrahub/core/schema/definitions/core.py | 13 +- .../default_branch_checker.py | 5 +- .../merge_operation_checker.py | 7 +- .../object_permission_checker.py | 42 +++--- .../super_admin_checker.py | 7 +- backend/infrahub/graphql/enums.py | 6 +- .../graphql/mutations/proposed_change.py | 17 ++- backend/infrahub/graphql/queries/account.py | 1 - backend/infrahub/graphql/types/attribute.py | 3 +- backend/infrahub/graphql/types/permission.py | 16 ++- backend/infrahub/permissions/backend.py | 12 +- backend/infrahub/permissions/constants.py | 8 ++ backend/infrahub/permissions/local_backend.py | 86 ++++++----- backend/infrahub/permissions/report.py | 103 ++++++++----- backend/infrahub/permissions/types.py | 26 +++- backend/tests/unit/conftest.py | 5 +- backend/tests/unit/core/test_enums.py | 2 +- .../test_object_permission_checker.py | 14 +- .../graphql/queries/test_list_permissions.py | 120 +++++++--------- .../tests/unit/graphql/test_core_account.py | 5 +- .../tests/unit/permissions/test_backends.py | 136 +++++++++++++++--- 29 files changed, 461 insertions(+), 254 deletions(-) diff --git a/backend/infrahub/api/schema.py b/backend/infrahub/api/schema.py index 015f116329..a577264282 100644 --- a/backend/infrahub/api/schema.py +++ b/backend/infrahub/api/schema.py @@ -16,8 +16,9 @@ from infrahub.api.dependencies import get_branch_dep, get_current_user, get_db from infrahub.api.exceptions import SchemaNotValidError from infrahub.core import registry +from infrahub.core.account import GlobalPermission from infrahub.core.branch import Branch # noqa: TCH001 -from infrahub.core.constants import GlobalPermissions, PermissionDecision +from infrahub.core.constants import GLOBAL_BRANCH_NAME, GlobalPermissions, PermissionDecision from infrahub.core.migrations.schema.models import SchemaApplyMigrationData from infrahub.core.models import ( # noqa: TCH001 SchemaBranchHash, @@ -247,7 +248,16 @@ async def load_schema( if not await permission_backend.has_permission( db=db, account_id=account_session.account_id, - permission=f"global:{GlobalPermissions.MANAGE_SCHEMA.value}:{PermissionDecision.ALLOW.value}", + permission=GlobalPermission( + id="", + name="", + action=GlobalPermissions.MANAGE_SCHEMA.value, + decision=( + PermissionDecision.ALLOW_DEFAULT + if branch.name in (GLOBAL_BRANCH_NAME, registry.default_branch) + else PermissionDecision.ALLOW_OTHER + ).value, + ), branch=branch, ): raise PermissionDeniedError("You are not allowed to manage the schema") diff --git a/backend/infrahub/core/account.py b/backend/infrahub/core/account.py index dc249e4a57..31cbb1cc93 100644 --- a/backend/infrahub/core/account.py +++ b/backend/infrahub/core/account.py @@ -3,7 +3,7 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Optional, Union -from infrahub.core.constants import InfrahubKind +from infrahub.core.constants import InfrahubKind, PermissionDecision from infrahub.core.query import Query from infrahub.core.registry import registry @@ -21,22 +21,23 @@ class Permission: id: str name: str action: str - decision: str + decision: int @dataclass class GlobalPermission(Permission): def __str__(self) -> str: - return f"global:{self.action}:{self.decision}" + decision = PermissionDecision(self.decision) + return f"global:{self.action}:{decision.name.lower()}" @dataclass class ObjectPermission(Permission): - branch: str namespace: str def __str__(self) -> str: - return f"object:{self.branch}:{self.namespace}:{self.name}:{self.action}:{self.decision}" + decision = PermissionDecision(self.decision) + return f"object:{self.namespace}:{self.name}:{self.action}:{decision.name.lower()}" class AccountGlobalPermissionQuery(Query): @@ -234,17 +235,6 @@ async def query_init(self, db: InfrahubDatabase, **kwargs: Any) -> None: RETURN object_permission } WITH object_permission - CALL { - WITH object_permission - MATCH (object_permission)-[r1:HAS_ATTRIBUTE]->(:Attribute {name: "branch"})-[r2:HAS_VALUE]->(object_permission_branch:AttributeValue) - WHERE all(r IN [r1, r2] WHERE (%(branch_filter)s)) - WITH object_permission_branch, r1, r2, (r1.status = "active" AND r2.status = "active") AS is_active - ORDER BY object_permission_branch.uuid, r2.branch_level DESC, r2.from DESC, r1.branch_level DESC, r1.from DESC - WITH object_permission_branch, head(collect(is_active)) as latest_is_active - WHERE latest_is_active = TRUE - RETURN object_permission_branch - } - WITH object_permission, object_permission_branch CALL { WITH object_permission @@ -254,7 +244,7 @@ async def query_init(self, db: InfrahubDatabase, **kwargs: Any) -> None: ORDER BY r2.branch_level DESC, r2.from DESC, r1.branch_level DESC, r1.from DESC LIMIT 1 } - WITH object_permission, object_permission_branch, object_permission_namespace, is_active AS opn_is_active + WITH object_permission, object_permission_namespace, is_active AS opn_is_active WHERE opn_is_active = TRUE CALL { WITH object_permission @@ -264,7 +254,7 @@ async def query_init(self, db: InfrahubDatabase, **kwargs: Any) -> None: ORDER BY r2.branch_level DESC, r2.from DESC, r1.branch_level DESC, r1.from DESC LIMIT 1 } - WITH object_permission, object_permission_branch, object_permission_namespace, object_permission_name, is_active AS opn_is_active + WITH object_permission, object_permission_namespace, object_permission_name, is_active AS opn_is_active WHERE opn_is_active = TRUE CALL { WITH object_permission @@ -274,7 +264,7 @@ async def query_init(self, db: InfrahubDatabase, **kwargs: Any) -> None: ORDER BY r2.branch_level DESC, r2.from DESC, r1.branch_level DESC, r1.from DESC LIMIT 1 } - WITH object_permission, object_permission_branch, object_permission_namespace, object_permission_name, object_permission_action, is_active AS opa_is_active + WITH object_permission, object_permission_namespace, object_permission_name, object_permission_action, is_active AS opa_is_active WHERE opa_is_active = TRUE CALL { WITH object_permission @@ -284,7 +274,7 @@ async def query_init(self, db: InfrahubDatabase, **kwargs: Any) -> None: ORDER BY r2.branch_level DESC, r2.from DESC, r1.branch_level DESC, r1.from DESC LIMIT 1 } - WITH object_permission, object_permission_branch, object_permission_namespace, object_permission_name, object_permission_action, object_permission_decision, is_active AS opd_is_active + WITH object_permission, object_permission_namespace, object_permission_name, object_permission_action, object_permission_decision, is_active AS opd_is_active WHERE opd_is_active = TRUE """ % { "branch_filter": branch_filter, @@ -298,7 +288,6 @@ async def query_init(self, db: InfrahubDatabase, **kwargs: Any) -> None: self.return_labels = [ "object_permission", - "object_permission_branch", "object_permission_namespace", "object_permission_name", "object_permission_action", @@ -311,7 +300,6 @@ def get_permissions(self) -> list[ObjectPermission]: permissions.append( ObjectPermission( id=result.get("object_permission").get("uuid"), - branch=result.get("object_permission_branch").get("value"), namespace=result.get("object_permission_namespace").get("value"), name=result.get("object_permission_name").get("value"), action=result.get("object_permission_action").get("value"), diff --git a/backend/infrahub/core/attribute.py b/backend/infrahub/core/attribute.py index 3fdff96860..637b3cb35e 100644 --- a/backend/infrahub/core/attribute.py +++ b/backend/infrahub/core/attribute.py @@ -465,9 +465,7 @@ async def to_graphql( """Generate GraphQL Payload for this attribute.""" # pylint: disable=too-many-branches - response: dict[str, Any] = { - "id": self.id, - } + response: dict[str, Any] = {"id": self.id} if fields and isinstance(fields, dict): field_names = fields.keys() @@ -508,6 +506,9 @@ async def to_graphql( ) continue + if field_name == "permissions": + response["permissions"] = {"view": "ALLOW", "update": "ALLOW"} + if field_name.startswith("_"): field = getattr(self, field_name[1:]) else: diff --git a/backend/infrahub/core/constants/__init__.py b/backend/infrahub/core/constants/__init__.py index 7c8a8811f1..b258cfb8a5 100644 --- a/backend/infrahub/core/constants/__init__.py +++ b/backend/infrahub/core/constants/__init__.py @@ -4,7 +4,7 @@ from infrahub.core.constants import infrahubkind as InfrahubKind from infrahub.exceptions import ValidationError -from infrahub.utils import InfrahubStringEnum +from infrahub.utils import InfrahubNumberEnum, InfrahubStringEnum from .schema import FlagProperty, NodeProperty, SchemaElementPathType, UpdateSupport, UpdateValidationErrorType @@ -69,9 +69,11 @@ class PermissionAction(InfrahubStringEnum): VIEW = "view" -class PermissionDecision(InfrahubStringEnum): - ALLOW = "allow" - DENY = "deny" +class PermissionDecision(InfrahubNumberEnum): + DENY = 1 + ALLOW_DEFAULT = 2 + ALLOW_OTHER = 4 + ALLOW_ALL = 6 class AccountRole(InfrahubStringEnum): diff --git a/backend/infrahub/core/enums.py b/backend/infrahub/core/enums.py index 83798cf2a0..b4c7430add 100644 --- a/backend/infrahub/core/enums.py +++ b/backend/infrahub/core/enums.py @@ -9,7 +9,7 @@ def generate_python_enum(name: str, options: list[Any]) -> type[enum.Enum]: main_attrs = {} for option in options: if isinstance(option, int): - enum_name = str(option) + enum_name = f"Value_{option!s}" else: enum_name = "_".join(re.findall(ENUM_NAME_REGEX, option)).upper() diff --git a/backend/infrahub/core/initialization.py b/backend/infrahub/core/initialization.py index eef11ef412..1d7e40693c 100644 --- a/backend/infrahub/core/initialization.py +++ b/backend/infrahub/core/initialization.py @@ -300,7 +300,7 @@ async def create_initial_permission(db: InfrahubDatabase) -> Node: db=db, name=format_label(GlobalPermissions.SUPER_ADMIN.value), action=GlobalPermissions.SUPER_ADMIN.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, ) await permission.save(db=db) log.info(f"Created global permission: {GlobalPermissions.SUPER_ADMIN}") @@ -329,7 +329,7 @@ async def create_super_administrator_role(db: InfrahubDatabase) -> Node: db=db, name=format_label(GlobalPermissions.SUPER_ADMIN.value), action=GlobalPermissions.SUPER_ADMIN.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, ) await permission.save(db=db) log.info(f"Created global permission: {GlobalPermissions.SUPER_ADMIN}") diff --git a/backend/infrahub/core/node/permissions.py b/backend/infrahub/core/node/permissions.py index cc0536b7da..b4864e3bdc 100644 --- a/backend/infrahub/core/node/permissions.py +++ b/backend/infrahub/core/node/permissions.py @@ -2,6 +2,8 @@ from typing import TYPE_CHECKING, Optional +from infrahub.permissions.constants import PermissionDecisionFlag + from . import Node if TYPE_CHECKING: @@ -27,7 +29,8 @@ async def to_graphql( if fields: if "identifier" in fields: - response["identifier"] = {"value": f"global:{self.action.value}:{self.decision.value.value}"} # type: ignore[attr-defined] + decision = PermissionDecisionFlag(value=self.decision.value.value) # type: ignore[attr-defined] + response["identifier"] = {"value": f"global:{self.action.value}:{decision.name.lower()}"} # type: ignore[attr-defined,union-attr] return response @@ -51,11 +54,9 @@ async def to_graphql( if fields: if "identifier" in fields: + decision = PermissionDecisionFlag(value=self.decision.value.value) # type: ignore[attr-defined] response["identifier"] = { - "value": ( - f"object:{self.branch.value}:{self.namespace.value}:{self.name.value}:{self.action.value.value}:" # type: ignore[attr-defined] - f"{self.decision.value.value}" # type: ignore[attr-defined] - ) + "value": f"object:{self.namespace.value}:{self.name.value}:{self.action.value.value}:{decision.name.lower()}" # type: ignore[attr-defined,union-attr] } return response diff --git a/backend/infrahub/core/protocols.py b/backend/infrahub/core/protocols.py index a49de3ff52..cc091b6b72 100644 --- a/backend/infrahub/core/protocols.py +++ b/backend/infrahub/core/protocols.py @@ -391,7 +391,6 @@ class CoreNumberPool(CoreResourcePool, LineageSource): class CoreObjectPermission(CoreBasePermission): - branch: String namespace: String name: String action: Enum diff --git a/backend/infrahub/core/schema/definitions/core.py b/backend/infrahub/core/schema/definitions/core.py index c839c1ccf6..b17cc5bae6 100644 --- a/backend/infrahub/core/schema/definitions/core.py +++ b/backend/infrahub/core/schema/definitions/core.py @@ -933,9 +933,9 @@ "attributes": [ { "name": "decision", - "kind": "Text", + "kind": "Number", "enum": PermissionDecision.available_types(), - "default_value": PermissionDecision.ALLOW.value, + "default_value": PermissionDecision.ALLOW_ALL.value, "order_weight": 5000, }, { @@ -2186,15 +2186,12 @@ "description": "A permission that grants rights to perform actions on objects", "label": "Object permission", "include_in_menu": False, - "order_by": ["branch__value", "namespace__value", "name__value", "action__value", "decision__value"], - "display_labels": ["branch__value", "namespace__value", "name__value", "action__value", "decision__value"], - "uniqueness_constraints": [ - ["branch__value", "namespace__value", "name__value", "action__value", "decision__value"] - ], + "order_by": ["namespace__value", "name__value", "action__value", "decision__value"], + "display_labels": ["namespace__value", "name__value", "action__value", "decision__value"], + "uniqueness_constraints": [["namespace__value", "name__value", "action__value", "decision__value"]], "generate_profile": False, "inherit_from": [InfrahubKind.BASEPERMISSION], "attributes": [ - {"name": "branch", "kind": "Text", "order_weight": 1000}, {"name": "namespace", "kind": "Text", "order_weight": 2000}, {"name": "name", "kind": "Text", "order_weight": 3000}, { diff --git a/backend/infrahub/graphql/auth/query_permission_checker/default_branch_checker.py b/backend/infrahub/graphql/auth/query_permission_checker/default_branch_checker.py index ae04806ea4..a25dfd6425 100644 --- a/backend/infrahub/graphql/auth/query_permission_checker/default_branch_checker.py +++ b/backend/infrahub/graphql/auth/query_permission_checker/default_branch_checker.py @@ -1,5 +1,6 @@ from infrahub.auth import AccountSession from infrahub.core import registry +from infrahub.core.account import GlobalPermission from infrahub.core.branch import Branch from infrahub.core.constants import GLOBAL_BRANCH_NAME, GlobalPermissions, PermissionDecision from infrahub.database import InfrahubDatabase @@ -13,7 +14,9 @@ class DefaultBranchPermissionChecker(GraphQLQueryPermissionCheckerInterface): """Checker that makes sure a user account can edit data in the default branch.""" - permission_required = f"global:{GlobalPermissions.EDIT_DEFAULT_BRANCH.value}:{PermissionDecision.ALLOW.value}" + permission_required = GlobalPermission( + id="", name="", action=GlobalPermissions.EDIT_DEFAULT_BRANCH.value, decision=PermissionDecision.ALLOW_ALL.value + ) exempt_operations = ["BranchCreate"] async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool: diff --git a/backend/infrahub/graphql/auth/query_permission_checker/merge_operation_checker.py b/backend/infrahub/graphql/auth/query_permission_checker/merge_operation_checker.py index dbadc65112..878bf8840b 100644 --- a/backend/infrahub/graphql/auth/query_permission_checker/merge_operation_checker.py +++ b/backend/infrahub/graphql/auth/query_permission_checker/merge_operation_checker.py @@ -1,7 +1,8 @@ from infrahub.auth import AccountSession from infrahub.core import registry +from infrahub.core.account import GlobalPermission from infrahub.core.branch import Branch -from infrahub.core.constants import GlobalPermissions +from infrahub.core.constants import GlobalPermissions, PermissionDecision from infrahub.database import InfrahubDatabase from infrahub.exceptions import PermissionDeniedError from infrahub.graphql.analyzer import InfrahubGraphQLQueryAnalyzer @@ -13,7 +14,9 @@ class MergeBranchPermissionChecker(GraphQLQueryPermissionCheckerInterface): """Checker that makes sure a user account can merge a branch without going through a proposed change.""" - permission_required = f"global:{GlobalPermissions.MERGE_BRANCH.value}:allow" + permission_required = GlobalPermission( + id="", name="", action=GlobalPermissions.MERGE_BRANCH.value, decision=PermissionDecision.ALLOW_ALL.value + ) async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool: return account_session.authenticated diff --git a/backend/infrahub/graphql/auth/query_permission_checker/object_permission_checker.py b/backend/infrahub/graphql/auth/query_permission_checker/object_permission_checker.py index 2f31be77b4..7fee47e3c8 100644 --- a/backend/infrahub/graphql/auth/query_permission_checker/object_permission_checker.py +++ b/backend/infrahub/graphql/auth/query_permission_checker/object_permission_checker.py @@ -1,14 +1,15 @@ from infrahub.auth import AccountSession from infrahub.core import registry -from infrahub.core.account import ObjectPermission +from infrahub.core.account import GlobalPermission, ObjectPermission from infrahub.core.branch import Branch -from infrahub.core.constants import GlobalPermissions, InfrahubKind, PermissionDecision +from infrahub.core.constants import GLOBAL_BRANCH_NAME, GlobalPermissions, InfrahubKind, PermissionDecision from infrahub.core.manager import get_schema from infrahub.core.schema.node_schema import NodeSchema from infrahub.database import InfrahubDatabase from infrahub.exceptions import PermissionDeniedError from infrahub.graphql.analyzer import InfrahubGraphQLQueryAnalyzer from infrahub.graphql.initialization import GraphqlParams +from infrahub.permissions.constants import PermissionDecisionFlag from infrahub.utils import extract_camelcase_words from .interface import CheckerResolution, GraphQLQueryPermissionCheckerInterface @@ -28,6 +29,13 @@ async def check( query_parameters: GraphqlParams, branch: Branch, ) -> CheckerResolution: + required_decision = ( + PermissionDecisionFlag.ALLOW_DEFAULT + if analyzed_query.branch is None + or analyzed_query.branch.name in (GLOBAL_BRANCH_NAME, registry.default_branch) + else PermissionDecisionFlag.ALLOW_OTHER + ) + kinds = await analyzed_query.get_models_in_use(types=query_parameters.context.types) # Identify which operations are performed. As we don't have a mapping between kinds and the @@ -47,21 +55,17 @@ async def check( actions.add(query_action) # Infer required permissions from the kind/operation map - permissions: list[str] = [] + permissions: list[ObjectPermission] = [] for action in actions: for kind in kinds: extracted_words = extract_camelcase_words(kind) permissions.append( - str( - # Create a object permission instance just to get its string representation - ObjectPermission( - id="", - branch=branch.name, - namespace=extracted_words[0], - name="".join(extracted_words[1:]), - action=action.lower(), - decision=PermissionDecision.ALLOW.value, - ) + ObjectPermission( + id="", + namespace=extracted_words[0], + name="".join(extracted_words[1:]), + action=action.lower(), + decision=required_decision, ) ) @@ -83,7 +87,9 @@ class AccountManagerPermissionChecker(GraphQLQueryPermissionCheckerInterface): This is similar to object permission checker except that we care for any operations on any account related kinds. """ - permission_required = f"global:{GlobalPermissions.MANAGE_ACCOUNTS.value}:{PermissionDecision.ALLOW.value}" + permission_required = GlobalPermission( + id="", name="", action=GlobalPermissions.MANAGE_ACCOUNTS.value, decision=PermissionDecision.ALLOW_ALL.value + ) async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool: return account_session.authenticated @@ -132,7 +138,9 @@ class PermissionManagerPermissionChecker(GraphQLQueryPermissionCheckerInterface) This is similar to object permission checker except that we care for any operations on any permission related kinds. """ - permission_required = f"global:{GlobalPermissions.MANAGE_PERMISSIONS.value}:{PermissionDecision.ALLOW.value}" + permission_required = GlobalPermission( + id="", name="", action=GlobalPermissions.MANAGE_PERMISSIONS.value, decision=PermissionDecision.ALLOW_ALL.value + ) async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool: return account_session.authenticated @@ -175,7 +183,9 @@ class RepositoryManagerPermissionChecker(GraphQLQueryPermissionCheckerInterface) This is similar to object permission checker except that we only care about mutations on repositories. """ - permission_required = f"global:{GlobalPermissions.MANAGE_REPOSITORIES.value}:{PermissionDecision.ALLOW.value}" + permission_required = GlobalPermission( + id="", name="", action=GlobalPermissions.MANAGE_REPOSITORIES.value, decision=PermissionDecision.ALLOW_ALL.value + ) async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool: return account_session.authenticated diff --git a/backend/infrahub/graphql/auth/query_permission_checker/super_admin_checker.py b/backend/infrahub/graphql/auth/query_permission_checker/super_admin_checker.py index a0e5982b26..9871f2ade9 100644 --- a/backend/infrahub/graphql/auth/query_permission_checker/super_admin_checker.py +++ b/backend/infrahub/graphql/auth/query_permission_checker/super_admin_checker.py @@ -1,7 +1,8 @@ from infrahub.auth import AccountSession from infrahub.core import registry +from infrahub.core.account import GlobalPermission from infrahub.core.branch import Branch -from infrahub.core.constants import GlobalPermissions +from infrahub.core.constants import GlobalPermissions, PermissionDecision from infrahub.database import InfrahubDatabase from infrahub.graphql.analyzer import InfrahubGraphQLQueryAnalyzer from infrahub.graphql.initialization import GraphqlParams @@ -12,7 +13,9 @@ class SuperAdminPermissionChecker(GraphQLQueryPermissionCheckerInterface): """Checker allows a user to do anything (if the checker runs first).""" - permission_required = f"global:{GlobalPermissions.SUPER_ADMIN.value}:allow" + permission_required = GlobalPermission( + id="", name="", action=GlobalPermissions.SUPER_ADMIN.value, decision=PermissionDecision.ALLOW_ALL.value + ) async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool: return account_session.authenticated diff --git a/backend/infrahub/graphql/enums.py b/backend/infrahub/graphql/enums.py index 6c4a80bda3..a664a39d19 100644 --- a/backend/infrahub/graphql/enums.py +++ b/backend/infrahub/graphql/enums.py @@ -16,7 +16,11 @@ def get_enum_attribute_type_name(node_schema: MainSchemaTypes, attr_schema: Attr def generate_graphql_enum(name: str, options: List[Any]) -> graphene.Enum: def description_func(value: Any) -> str: if value: - return value.value + try: + int(value.value) + return value.name + except ValueError: + return value.value return f"Enum for {name}" py_enum = generate_python_enum(name=name, options=options) diff --git a/backend/infrahub/graphql/mutations/proposed_change.py b/backend/infrahub/graphql/mutations/proposed_change.py index 449211140d..f3af9bde16 100644 --- a/backend/infrahub/graphql/mutations/proposed_change.py +++ b/backend/infrahub/graphql/mutations/proposed_change.py @@ -4,8 +4,16 @@ from graphql import GraphQLResolveInfo from infrahub import lock +from infrahub.core.account import GlobalPermission from infrahub.core.branch import Branch -from infrahub.core.constants import CheckType, GlobalPermissions, InfrahubKind, ProposedChangeState, ValidatorConclusion +from infrahub.core.constants import ( + CheckType, + GlobalPermissions, + InfrahubKind, + PermissionDecision, + ProposedChangeState, + ValidatorConclusion, +) from infrahub.core.diff.ipam_diff_parser import IpamDiffParser from infrahub.core.manager import NodeManager from infrahub.core.merge import BranchMerger @@ -101,7 +109,12 @@ async def mutate_update( # pylint: disable=too-many-branches if has_merge_permission := await permission_backend.has_permission( db=context.db, account_id=context.active_account_session.account_id, - permission=f"global:{GlobalPermissions.MERGE_PROPOSED_CHANGE.value}:allow", + permission=GlobalPermission( + id="", + name="", + action=GlobalPermissions.EDIT_DEFAULT_BRANCH.value, + decision=PermissionDecision.ALLOW_ALL.value, + ), branch=branch, ): break diff --git a/backend/infrahub/graphql/queries/account.py b/backend/infrahub/graphql/queries/account.py index 792f868c02..89f8296a25 100644 --- a/backend/infrahub/graphql/queries/account.py +++ b/backend/infrahub/graphql/queries/account.py @@ -150,7 +150,6 @@ async def resolve_account_permissions( { "node": { "id": obj.id, - "branch": obj.branch, "namespace": obj.namespace, "name": obj.name, "action": obj.action, diff --git a/backend/infrahub/graphql/types/attribute.py b/backend/infrahub/graphql/types/attribute.py index bc22d05cd8..f722fb8e91 100644 --- a/backend/infrahub/graphql/types/attribute.py +++ b/backend/infrahub/graphql/types/attribute.py @@ -7,6 +7,7 @@ from infrahub.core import registry +from .enums import PermissionDecision from .interface import InfrahubInterface @@ -56,7 +57,7 @@ class RelatedPrefixNodeInput(InputObjectType): class PermissionType(ObjectType): - update_value = String(required=False) + update_value = Field(PermissionDecision, required=False) class AttributeInterface(InfrahubInterface): diff --git a/backend/infrahub/graphql/types/permission.py b/backend/infrahub/graphql/types/permission.py index 2e92a1a12d..1708ab038f 100644 --- a/backend/infrahub/graphql/types/permission.py +++ b/backend/infrahub/graphql/types/permission.py @@ -2,15 +2,21 @@ from graphene import Field, Int, List, ObjectType, String -from .enums import PermissionDecision +from infrahub.graphql.types.enums import PermissionDecision class ObjectPermission(ObjectType): kind = Field(String, required=True, description="The kind this permission refers to.") - view = Field(PermissionDecision, required=True, description="Indicates if the account has the read permission") - create = Field(PermissionDecision, required=True, description="Indicates if the account has the create permission") - update = Field(PermissionDecision, required=True, description="Indicates if the account has the update permission") - delete = Field(PermissionDecision, required=True, description="Indicates if the account has the delete permission") + view = Field(PermissionDecision, required=True, description="Indicates the permission level for the read action.") + create = Field( + PermissionDecision, required=True, description="Indicates the permission level for the create action." + ) + update = Field( + PermissionDecision, required=True, description="Indicates the permission level for the update action." + ) + delete = Field( + PermissionDecision, required=True, description="Indicates the permission level for the delete action." + ) class ObjectPermissionNode(ObjectType): diff --git a/backend/infrahub/permissions/backend.py b/backend/infrahub/permissions/backend.py index ed404f6bc8..db4d7a57fc 100644 --- a/backend/infrahub/permissions/backend.py +++ b/backend/infrahub/permissions/backend.py @@ -4,9 +4,10 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: + from infrahub.core.account import GlobalPermission, ObjectPermission from infrahub.core.branch import Branch from infrahub.database import InfrahubDatabase - from infrahub.permissions.constants import AssignedPermissions + from infrahub.permissions.constants import AssignedPermissions, PermissionDecisionFlag class PermissionBackend(ABC): @@ -14,4 +15,11 @@ class PermissionBackend(ABC): async def load_permissions(self, db: InfrahubDatabase, account_id: str, branch: Branch) -> AssignedPermissions: ... @abstractmethod - async def has_permission(self, db: InfrahubDatabase, account_id: str, permission: str, branch: Branch) -> bool: ... + def report_object_permission( + self, permissions: list[ObjectPermission], namespace: str, name: str, action: str + ) -> PermissionDecisionFlag: ... + + @abstractmethod + async def has_permission( + self, db: InfrahubDatabase, account_id: str, permission: GlobalPermission | ObjectPermission, branch: Branch + ) -> bool: ... diff --git a/backend/infrahub/permissions/constants.py b/backend/infrahub/permissions/constants.py index 3dad122d62..4de88e3464 100644 --- a/backend/infrahub/permissions/constants.py +++ b/backend/infrahub/permissions/constants.py @@ -1,5 +1,6 @@ from __future__ import annotations +from enum import IntFlag from typing import TYPE_CHECKING, TypedDict if TYPE_CHECKING: @@ -9,3 +10,10 @@ class AssignedPermissions(TypedDict): global_permissions: list[GlobalPermission] object_permissions: list[ObjectPermission] + + +class PermissionDecisionFlag(IntFlag): + DENY = 1 + ALLOW_DEFAULT = 2 + ALLOW_OTHER = 4 + ALLOW_ALL = ALLOW_DEFAULT | ALLOW_OTHER diff --git a/backend/infrahub/permissions/local_backend.py b/backend/infrahub/permissions/local_backend.py index 76df5b334f..964ad090f0 100644 --- a/backend/infrahub/permissions/local_backend.py +++ b/backend/infrahub/permissions/local_backend.py @@ -4,6 +4,7 @@ from infrahub.core.account import GlobalPermission, ObjectPermission, fetch_permissions from infrahub.core.constants import GlobalPermissions, PermissionDecision +from infrahub.permissions.constants import PermissionDecisionFlag from .backend import PermissionBackend @@ -17,55 +18,65 @@ class LocalPermissionBackend(PermissionBackend): wildcard_values = ["*"] wildcard_actions = ["any"] - def compute_specificity(self, permission: ObjectPermission) -> int: + def _compute_specificity(self, permission: ObjectPermission) -> int: specificity = 0 - if permission.branch not in self.wildcard_values: - specificity += 1 if permission.namespace not in self.wildcard_values: specificity += 1 if permission.name not in self.wildcard_values: specificity += 1 if permission.action not in self.wildcard_actions: specificity += 1 + if not permission.decision & PermissionDecisionFlag.ALLOW_ALL: + specificity += 1 return specificity - def resolve_object_permission(self, permissions: list[ObjectPermission], permission_to_check: str) -> bool: - """Compute the permissions and check if the one provided is allowed.""" - if not permission_to_check.startswith("object:"): - return False - - most_specific_permission: str | None = None + def report_object_permission( + self, permissions: list[ObjectPermission], namespace: str, name: str, action: str + ) -> PermissionDecisionFlag: + """Given a set of permissions, return the permission decision for a given kind and action.""" highest_specificity: int = -1 - _, branch, namespace, name, action, _ = permission_to_check.split(":") + combined_decision = PermissionDecisionFlag.DENY for permission in permissions: if ( - permission.branch in [branch, *self.wildcard_values] - and permission.namespace in [namespace, *self.wildcard_values] + permission.namespace in [namespace, *self.wildcard_values] and permission.name in [name, *self.wildcard_values] and permission.action in [action, *self.wildcard_actions] ): + permission_decision = PermissionDecisionFlag(value=permission.decision) # Compute the specifity of a permission to keep the decision of the most specific if two or more permissions overlap - specificity = self.compute_specificity(permission=permission) + specificity = self._compute_specificity(permission=permission) if specificity > highest_specificity: - most_specific_permission = permission.decision + combined_decision = permission_decision highest_specificity = specificity - elif specificity == highest_specificity and permission.decision == PermissionDecision.DENY.value: - most_specific_permission = permission.decision + elif specificity == highest_specificity: + combined_decision |= permission_decision + + return combined_decision - return most_specific_permission == PermissionDecision.ALLOW.value + def resolve_object_permission( + self, permissions: list[ObjectPermission], permission_to_check: ObjectPermission + ) -> bool: + """Compute the permissions and check if the one provided is allowed.""" + required_decision = PermissionDecisionFlag(value=permission_to_check.decision) + combined_decision = self.report_object_permission( + permissions=permissions, + namespace=permission_to_check.namespace, + name=permission_to_check.name, + action=permission_to_check.action, + ) - def resolve_global_permission(self, permissions: list[GlobalPermission], permission_to_check: str) -> bool: - if not permission_to_check.startswith("global:"): - return False + return combined_decision & required_decision == required_decision - _, action, _ = permission_to_check.split(":") + def resolve_global_permission( + self, permissions: list[GlobalPermission], permission_to_check: GlobalPermission + ) -> bool: grant_permission = False for permission in permissions: - if permission.action == action: + if permission.action == permission_to_check.action: # Early exit on deny as deny preempt allow - if permission.decision == PermissionDecision.DENY.value: + if permission.decision == PermissionDecisionFlag.DENY: return False grant_permission = True @@ -74,19 +85,28 @@ def resolve_global_permission(self, permissions: list[GlobalPermission], permiss async def load_permissions(self, db: InfrahubDatabase, account_id: str, branch: Branch) -> AssignedPermissions: return await fetch_permissions(db=db, account_id=account_id, branch=branch) - async def has_permission(self, db: InfrahubDatabase, account_id: str, permission: str, branch: Branch) -> bool: + async def has_permission( + self, db: InfrahubDatabase, account_id: str, permission: GlobalPermission | ObjectPermission, branch: Branch + ) -> bool: granted_permissions = await self.load_permissions(db=db, account_id=account_id, branch=branch) + is_super_admin = self.resolve_global_permission( + permissions=granted_permissions["global_permissions"], + permission_to_check=GlobalPermission( + id="", name="", action=GlobalPermissions.SUPER_ADMIN, decision=PermissionDecision.ALLOW_ALL + ), + ) - # Check for a final super admin permission at the end if no permissions have matched before - return ( - self.resolve_global_permission( - permissions=granted_permissions["global_permissions"], permission_to_check=permission + if isinstance(permission, GlobalPermission): + return ( + self.resolve_global_permission( + permissions=granted_permissions["global_permissions"], permission_to_check=permission + ) + or is_super_admin ) - or self.resolve_object_permission( + + return ( + self.resolve_object_permission( permissions=granted_permissions["object_permissions"], permission_to_check=permission ) - or self.resolve_global_permission( - permissions=granted_permissions["global_permissions"], - permission_to_check=f"global:{GlobalPermissions.SUPER_ADMIN.value}:{PermissionDecision.ALLOW.value}", - ) + or is_super_admin ) diff --git a/backend/infrahub/permissions/report.py b/backend/infrahub/permissions/report.py index fc9427cee0..052410554e 100644 --- a/backend/infrahub/permissions/report.py +++ b/backend/infrahub/permissions/report.py @@ -2,8 +2,9 @@ from typing import TYPE_CHECKING +from infrahub.core.account import GlobalPermission from infrahub.core.constants import GlobalPermissions, PermissionDecision -from infrahub.core.registry import registry +from infrahub.permissions.constants import AssignedPermissions, PermissionDecisionFlag from infrahub.permissions.local_backend import LocalPermissionBackend if TYPE_CHECKING: @@ -11,57 +12,91 @@ from infrahub.core.branch import Branch from infrahub.core.schema import MainSchemaTypes from infrahub.database import InfrahubDatabase + from infrahub.permissions.backend import PermissionBackend from infrahub.permissions.types import KindPermissions +def get_permission_report( + backend: PermissionBackend, + permissions: AssignedPermissions, + node: MainSchemaTypes, + action: str, + is_super_admin: bool = False, + can_edit_default_branch: bool = False, # pylint: disable=unused-argument +) -> PermissionDecisionFlag: + if is_super_admin: + return PermissionDecisionFlag.ALLOW_ALL + + decision = backend.report_object_permission( + permissions=permissions["object_permissions"], namespace=node.namespace, name=node.name, action=action + ) + + # What do we do if edit default branch global permission is set? + # if can_edit_default_branch: + # decision |= PermissionDecisionFlag.ALLOW_DEFAULT + + return decision + + async def report_schema_permissions( db: InfrahubDatabase, schemas: list[MainSchemaTypes], account_session: AccountSession, branch: Branch ) -> list[KindPermissions]: perm_backend = LocalPermissionBackend() permissions = await perm_backend.load_permissions(db=db, account_id=account_session.account_id, branch=branch) - # Check for super admin permission and handle default branch edition if account is not super admin is_super_admin = perm_backend.resolve_global_permission( permissions=permissions["global_permissions"], - permission_to_check=f"global:{GlobalPermissions.SUPER_ADMIN.value}:allow", + permission_to_check=GlobalPermission( + id="", name="", action=GlobalPermissions.SUPER_ADMIN.value, decision=PermissionDecision.ALLOW_ALL.value + ), + ) + can_edit_default_branch = perm_backend.resolve_global_permission( + permissions=permissions["global_permissions"], + permission_to_check=GlobalPermission( + id="", + name="", + action=GlobalPermissions.EDIT_DEFAULT_BRANCH.value, + decision=PermissionDecision.ALLOW_ALL.value, + ), ) - restrict_changes = False - if branch.name == registry.default_branch and not is_super_admin: - restrict_changes = not perm_backend.resolve_global_permission( - permissions=permissions["global_permissions"], - permission_to_check=f"global:{GlobalPermissions.EDIT_DEFAULT_BRANCH.value}:allow", - ) permission_objects: list[KindPermissions] = [] for node in schemas: - permission_base = f"object:{branch.name}:{node.namespace}:{node.name}" - - has_create = perm_backend.resolve_object_permission( - permissions=permissions["object_permissions"], permission_to_check=f"{permission_base}:create:allow" - ) - has_delete = perm_backend.resolve_object_permission( - permissions=permissions["object_permissions"], permission_to_check=f"{permission_base}:delete:allow" - ) - has_update = perm_backend.resolve_object_permission( - permissions=permissions["object_permissions"], permission_to_check=f"{permission_base}:update:allow" - ) - has_view = perm_backend.resolve_object_permission( - permissions=permissions["object_permissions"], permission_to_check=f"{permission_base}:view:allow" - ) - permission_objects.append( { "kind": node.kind, - "create": PermissionDecision.ALLOW - if is_super_admin or (has_create and not restrict_changes) - else PermissionDecision.DENY, - "delete": PermissionDecision.ALLOW - if is_super_admin or (has_delete and not restrict_changes) - else PermissionDecision.DENY, - "update": PermissionDecision.ALLOW - if is_super_admin or (has_update and not restrict_changes) - else PermissionDecision.DENY, - "view": PermissionDecision.ALLOW if is_super_admin or has_view else PermissionDecision.DENY, + "create": get_permission_report( + backend=perm_backend, + permissions=permissions, + node=node, + action="create", + is_super_admin=is_super_admin, + can_edit_default_branch=can_edit_default_branch, + ), + "delete": get_permission_report( + backend=perm_backend, + permissions=permissions, + node=node, + action="delete", + is_super_admin=is_super_admin, + can_edit_default_branch=can_edit_default_branch, + ), + "update": get_permission_report( + backend=perm_backend, + permissions=permissions, + node=node, + action="update", + is_super_admin=is_super_admin, + can_edit_default_branch=can_edit_default_branch, + ), + "view": get_permission_report( + backend=perm_backend, + permissions=permissions, + node=node, + action="view", + is_super_admin=is_super_admin, + can_edit_default_branch=can_edit_default_branch, + ), } ) diff --git a/backend/infrahub/permissions/types.py b/backend/infrahub/permissions/types.py index 097ed1602e..178b3f9433 100644 --- a/backend/infrahub/permissions/types.py +++ b/backend/infrahub/permissions/types.py @@ -1,14 +1,30 @@ from __future__ import annotations +from dataclasses import dataclass from typing import TYPE_CHECKING, TypedDict if TYPE_CHECKING: - from infrahub.core.constants import PermissionDecision + from infrahub.permissions.constants import PermissionDecisionFlag class KindPermissions(TypedDict): kind: str - create: PermissionDecision - delete: PermissionDecision - update: PermissionDecision - view: PermissionDecision + create: PermissionDecisionFlag + delete: PermissionDecisionFlag + update: PermissionDecisionFlag + view: PermissionDecisionFlag + + +@dataclass +class GlobalPermissionToVerify: + name: str + action: str + decision: PermissionDecisionFlag + + +@dataclass +class ObjectPermissionToVerify: + namespace: str + name: str + action: str + decision: PermissionDecisionFlag diff --git a/backend/tests/unit/conftest.py b/backend/tests/unit/conftest.py index 8639921bf4..1b807f2322 100644 --- a/backend/tests/unit/conftest.py +++ b/backend/tests/unit/conftest.py @@ -2543,7 +2543,7 @@ async def create_test_admin(db: InfrahubDatabase, register_core_models_schema, d db=db, name=format_label(GlobalPermissions.SUPER_ADMIN.value), action=GlobalPermissions.SUPER_ADMIN.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, ) await global_permission.save(db=db) permissions.append(global_permission) @@ -2551,11 +2551,10 @@ async def create_test_admin(db: InfrahubDatabase, register_core_models_schema, d object_permission = await Node.init(db=db, schema=InfrahubKind.OBJECTPERMISSION) await object_permission.new( db=db, - branch="*", namespace="*", name="*", action=PermissionAction.ANY.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, ) await object_permission.save(db=db) permissions.append(object_permission) diff --git a/backend/tests/unit/core/test_enums.py b/backend/tests/unit/core/test_enums.py index b3d1d3b110..9f65b6534d 100644 --- a/backend/tests/unit/core/test_enums.py +++ b/backend/tests/unit/core/test_enums.py @@ -18,5 +18,5 @@ def test_generate_python_enum_with_integers(): enum_two = enum_class(2) assert isinstance(enum_two, enum.Enum) - assert {enum.name for enum in enum_class} == {"2", "5", "14"} + assert {enum.name for enum in enum_class} == {"Value_2", "Value_5", "Value_14"} assert {enum.value for enum in enum_class} == {2, 5, 14} diff --git a/backend/tests/unit/graphql/auth/query_permission_checker/test_object_permission_checker.py b/backend/tests/unit/graphql/auth/query_permission_checker/test_object_permission_checker.py index ab0a12665e..8ef865e5e0 100644 --- a/backend/tests/unit/graphql/auth/query_permission_checker/test_object_permission_checker.py +++ b/backend/tests/unit/graphql/auth/query_permission_checker/test_object_permission_checker.py @@ -233,25 +233,22 @@ async def test_setup( for object_permission in [ ObjectPermission( id="", - branch="main", namespace="Builtin", name="*", action=PermissionAction.ANY.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_DEFAULT.value, ), ObjectPermission( id="", - branch="main", namespace="Core", name="GraphQLQuery", action=PermissionAction.VIEW.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_DEFAULT.value, ), ]: obj = await Node.init(db=db, schema=InfrahubKind.OBJECTPERMISSION) await obj.new( db=db, - branch=object_permission.branch, namespace=object_permission.namespace, name=object_permission.name, action=object_permission.action, @@ -307,10 +304,7 @@ async def test_first_account_repos(self, db: InfrahubDatabase, permissions_helpe auth_type=AuthType.JWT, ) - with pytest.raises( - PermissionDeniedError, - match="You do not have the following permission: object:main:Core:Repository:view:allow", - ): + with pytest.raises(PermissionDeniedError, match=r":Repository:view:"): await perms.check( db=db, account_session=session, @@ -357,7 +351,7 @@ async def test_first_account_graphql_and_repos( auth_type=AuthType.JWT, ) - with pytest.raises(PermissionDeniedError, match="Repository:view:allow"): + with pytest.raises(PermissionDeniedError, match=r"Repository:view:"): await perms.check( db=db, account_session=session, diff --git a/backend/tests/unit/graphql/queries/test_list_permissions.py b/backend/tests/unit/graphql/queries/test_list_permissions.py index 9d5cb2a69b..b29ceed6e2 100644 --- a/backend/tests/unit/graphql/queries/test_list_permissions.py +++ b/backend/tests/unit/graphql/queries/test_list_permissions.py @@ -7,15 +7,13 @@ from infrahub.auth import AccountSession, AuthType from infrahub.core.account import ObjectPermission -from infrahub.core.constants import ( - InfrahubKind, - PermissionAction, - PermissionDecision, -) +from infrahub.core.constants import InfrahubKind, PermissionAction from infrahub.core.initialization import create_branch from infrahub.core.node import Node from infrahub.core.registry import registry from infrahub.graphql.initialization import prepare_graphql_params +from infrahub.graphql.types.permission import PermissionDecision +from infrahub.permissions.constants import PermissionDecisionFlag from infrahub.permissions.local_backend import LocalPermissionBackend if TYPE_CHECKING: @@ -82,41 +80,43 @@ async def test_setup( for object_permission in [ ObjectPermission( id="", - branch="*", namespace="Builtin", name="*", action=PermissionAction.VIEW.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecisionFlag.ALLOW_ALL, ), ObjectPermission( id="", - branch="*", namespace="Builtin", name="*", action=PermissionAction.CREATE.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecisionFlag.ALLOW_OTHER, ), ObjectPermission( id="", - branch="*", namespace="Builtin", name="*", action=PermissionAction.DELETE.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecisionFlag.ALLOW_OTHER, ), ObjectPermission( id="", - branch="*", namespace="Core", name="*", action=PermissionAction.ANY.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecisionFlag.ALLOW_OTHER, + ), + ObjectPermission( + id="", + namespace="Core", + name="*", + action=PermissionAction.VIEW.value, + decision=PermissionDecisionFlag.ALLOW_ALL, ), ]: obj = await Node.init(db=db, schema=InfrahubKind.OBJECTPERMISSION) await obj.new( db=db, - branch=object_permission.branch, namespace=object_permission.namespace, name=object_permission.name, action=object_permission.action, @@ -136,31 +136,28 @@ async def test_setup( await group.members.add(db=db, data={"id": first_account.id}) await group.members.save(db=db) - async def test_first_account_tags_main_branch( - self, db: InfrahubDatabase, permissions_helper: PermissionsHelper - ) -> None: - """In the main branch the first account doesn't have the permission to make changes""" + async def test_first_account_tags(self, db: InfrahubDatabase, permissions_helper: PermissionsHelper) -> None: + """In the main branch the first account doesn't have the permission to make changes, but it has in the other branches""" session = AccountSession( - authenticated=True, - account_id=permissions_helper.first.id, - session_id=str(uuid4()), - auth_type=AuthType.JWT, + authenticated=True, account_id=permissions_helper.first.id, session_id=str(uuid4()), auth_type=AuthType.JWT ) gql_params = prepare_graphql_params( db=db, include_mutation=True, branch=permissions_helper.default_branch, account_session=session ) - result = await graphql( - schema=gql_params.schema, - source=QUERY_TAGS, - context_value=gql_params.context, - ) + result = await graphql(schema=gql_params.schema, source=QUERY_TAGS, context_value=gql_params.context) assert not result.errors assert result.data assert result.data["BuiltinTag"]["permissions"]["count"] == 1 assert result.data["BuiltinTag"]["permissions"]["edges"][0] == { - "node": {"kind": "BuiltinTag", "create": "DENY", "update": "DENY", "delete": "DENY", "view": "ALLOW"} + "node": { + "kind": "BuiltinTag", + "create": PermissionDecision.ALLOW_OTHER.name, + "update": PermissionDecision.DENY.name, + "delete": PermissionDecision.ALLOW_OTHER.name, + "view": PermissionDecision.ALLOW_ALL.name, + } } async def test_first_account_tags_non_main_branch( @@ -169,24 +166,21 @@ async def test_first_account_tags_non_main_branch( """In other branches the permissions for the first account is less restrictive""" branch2 = await create_branch(branch_name="pr-12345", db=db) session = AccountSession( - authenticated=True, - account_id=permissions_helper.first.id, - session_id=str(uuid4()), - auth_type=AuthType.JWT, + authenticated=True, account_id=permissions_helper.first.id, session_id=str(uuid4()), auth_type=AuthType.JWT ) gql_params = prepare_graphql_params(db=db, include_mutation=True, branch=branch2, account_session=session) - - result = await graphql( - schema=gql_params.schema, - source=QUERY_TAGS, - context_value=gql_params.context, - ) - + result = await graphql(schema=gql_params.schema, source=QUERY_TAGS, context_value=gql_params.context) assert not result.errors assert result.data assert result.data["BuiltinTag"]["permissions"]["count"] == 1 assert result.data["BuiltinTag"]["permissions"]["edges"][0] == { - "node": {"kind": "BuiltinTag", "create": "ALLOW", "update": "DENY", "delete": "ALLOW", "view": "ALLOW"} + "node": { + "kind": "BuiltinTag", + "create": PermissionDecision.ALLOW_OTHER.name, + "update": PermissionDecision.DENY.name, + "delete": PermissionDecision.ALLOW_OTHER.name, + "view": PermissionDecision.ALLOW_ALL.name, + } } async def test_first_account_list_permissions_for_generics( @@ -194,10 +188,7 @@ async def test_first_account_list_permissions_for_generics( ) -> None: """In the main branch the first account doesn't have the permission to make changes""" session = AccountSession( - authenticated=True, - account_id=permissions_helper.first.id, - session_id=str(uuid4()), - auth_type=AuthType.JWT, + authenticated=True, account_id=permissions_helper.first.id, session_id=str(uuid4()), auth_type=AuthType.JWT ) gql_params = prepare_graphql_params( db=db, include_mutation=True, branch=permissions_helper.default_branch, account_session=session @@ -215,28 +206,28 @@ async def test_first_account_list_permissions_for_generics( assert { "node": { "kind": "CoreGenericRepository", - "create": "DENY", - "update": "DENY", - "delete": "DENY", - "view": "ALLOW", + "create": PermissionDecision.ALLOW_OTHER.name, + "update": PermissionDecision.ALLOW_OTHER.name, + "delete": PermissionDecision.ALLOW_OTHER.name, + "view": PermissionDecision.ALLOW_ALL.name, } } in result.data["CoreGenericRepository"]["permissions"]["edges"] assert { "node": { "kind": "CoreRepository", - "create": "DENY", - "update": "DENY", - "delete": "DENY", - "view": "ALLOW", + "create": PermissionDecision.ALLOW_OTHER.name, + "update": PermissionDecision.ALLOW_OTHER.name, + "delete": PermissionDecision.ALLOW_OTHER.name, + "view": PermissionDecision.ALLOW_ALL.name, } } in result.data["CoreGenericRepository"]["permissions"]["edges"] assert { "node": { "kind": "CoreReadOnlyRepository", - "create": "DENY", - "update": "DENY", - "delete": "DENY", - "view": "ALLOW", + "create": PermissionDecision.ALLOW_OTHER.name, + "update": PermissionDecision.ALLOW_OTHER.name, + "delete": PermissionDecision.ALLOW_OTHER.name, + "view": PermissionDecision.ALLOW_ALL.name, } } in result.data["CoreGenericRepository"]["permissions"]["edges"] @@ -277,41 +268,36 @@ async def test_setup( for object_permission in [ ObjectPermission( id="", - branch="*", namespace="Builtin", name="*", action=PermissionAction.VIEW.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecisionFlag.ALLOW_ALL, ), ObjectPermission( id="", - branch="*", namespace="Builtin", name="*", action=PermissionAction.CREATE.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecisionFlag.ALLOW_ALL, ), ObjectPermission( id="", - branch="*", namespace="Builtin", name="*", action=PermissionAction.DELETE.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecisionFlag.ALLOW_ALL, ), ObjectPermission( id="", - branch="pr-12345", namespace="Builtin", name="*", action=PermissionAction.UPDATE.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecisionFlag.ALLOW_OTHER, ), ]: obj = await Node.init(db=db, schema=InfrahubKind.OBJECTPERMISSION) await obj.new( db=db, - branch=object_permission.branch, namespace=object_permission.namespace, name=object_permission.name, action=object_permission.action, @@ -355,7 +341,7 @@ async def test_first_account_tags_main_branch( assert result.data assert result.data["BuiltinTag"]["count"] == 1 assert result.data["BuiltinTag"]["edges"][0]["node"]["name"]["permissions"] == { - "update_value": PermissionDecision.DENY + "update_value": PermissionDecision.ALLOW_OTHER.name } async def test_first_account_tags_non_main_branch( @@ -377,5 +363,5 @@ async def test_first_account_tags_non_main_branch( assert result.data assert result.data["BuiltinTag"]["count"] == 1 assert result.data["BuiltinTag"]["edges"][0]["node"]["name"]["permissions"] == { - "update_value": PermissionDecision.ALLOW + "update_value": PermissionDecision.ALLOW_OTHER.name } diff --git a/backend/tests/unit/graphql/test_core_account.py b/backend/tests/unit/graphql/test_core_account.py index 14a0a80c28..6bd79b9894 100644 --- a/backend/tests/unit/graphql/test_core_account.py +++ b/backend/tests/unit/graphql/test_core_account.py @@ -91,7 +91,7 @@ async def test_permissions( id="", name=GlobalPermissions.SUPER_ADMIN.value, action=GlobalPermissions.SUPER_ADMIN.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, ) ) ] @@ -101,11 +101,10 @@ async def test_permissions( str( ObjectPermission( id="", - branch="*", namespace="*", name="*", action=PermissionAction.ANY.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, ) ) ] diff --git a/backend/tests/unit/permissions/test_backends.py b/backend/tests/unit/permissions/test_backends.py index f222399083..826fb375e0 100644 --- a/backend/tests/unit/permissions/test_backends.py +++ b/backend/tests/unit/permissions/test_backends.py @@ -5,6 +5,7 @@ from infrahub.core.protocols import CoreAccount from infrahub.database import InfrahubDatabase from infrahub.permissions import LocalPermissionBackend +from infrahub.permissions.constants import PermissionDecisionFlag async def test_load_permissions(db: InfrahubDatabase, default_branch: Branch, create_test_admin, first_account): @@ -19,11 +20,10 @@ async def test_load_permissions(db: InfrahubDatabase, default_branch: Branch, cr assert str(permissions["object_permissions"][0]) == str( ObjectPermission( id="", - branch="*", namespace="*", name="*", action=PermissionAction.ANY.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, ) ) @@ -49,7 +49,7 @@ async def test_has_permission_global( allow_default_branch_edition = GlobalPermission( id="", action=GlobalPermissions.EDIT_DEFAULT_BRANCH.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, name="Edit default branch", ) @@ -102,10 +102,10 @@ async def test_has_permission_global( await group2.members.save(db=db) assert await backend.has_permission( - db=db, account_id=first_account.id, permission=str(allow_default_branch_edition), branch=default_branch + db=db, account_id=first_account.id, permission=allow_default_branch_edition, branch=default_branch ) assert not await backend.has_permission( - db=db, account_id=second_account.id, permission=str(allow_default_branch_edition), branch=default_branch + db=db, account_id=second_account.id, permission=allow_default_branch_edition, branch=default_branch ) @@ -123,15 +123,13 @@ async def test_has_permission_object( for p in [ ObjectPermission( id="", - branch="*", namespace="*", name="*", action=PermissionAction.ANY.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, ), ObjectPermission( id="", - branch="*", namespace="Builtin", name="Tag", action=PermissionAction.ANY.value, @@ -139,7 +137,7 @@ async def test_has_permission_object( ), ]: obj = await Node.init(db=db, schema=InfrahubKind.OBJECTPERMISSION) - await obj.new(db=db, branch=p.branch, namespace=p.namespace, name=p.name, action=p.action, decision=p.decision) + await obj.new(db=db, namespace=p.namespace, name=p.name, action=p.action, decision=p.decision) await obj.save(db=db) role1_permissions.append(obj) @@ -158,7 +156,6 @@ async def test_has_permission_object( for p in [ ObjectPermission( id="", - branch="*", namespace="*", name="*", action=PermissionAction.ANY.value, @@ -166,15 +163,14 @@ async def test_has_permission_object( ), ObjectPermission( id="", - branch="*", namespace="Builtin", name="Tag", action=PermissionAction.ANY.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, ), ]: obj = await Node.init(db=db, schema=InfrahubKind.OBJECTPERMISSION) - await obj.new(db=db, branch=p.branch, namespace=p.namespace, name=p.name, action=p.action, decision=p.decision) + await obj.new(db=db, namespace=p.namespace, name=p.name, action=p.action, decision=p.decision) await obj.save(db=db) role2_permissions.append(obj) @@ -191,15 +187,121 @@ async def test_has_permission_object( permission = ObjectPermission( id="", - branch="*", namespace="Builtin", name="Tag", action=PermissionAction.CREATE.value, - decision=PermissionDecision.ALLOW.value, + decision=PermissionDecision.ALLOW_ALL.value, ) assert not await backend.has_permission( - db=db, account_id=first_account.id, permission=str(permission), branch=default_branch + db=db, account_id=first_account.id, permission=permission, branch=default_branch ) assert await backend.has_permission( - db=db, account_id=second_account.id, permission=str(permission), branch=default_branch + db=db, account_id=second_account.id, permission=permission, branch=default_branch + ) + + +async def test_report_permission_object( + db: InfrahubDatabase, + default_branch: Branch, + register_core_models_schema: None, + create_test_admin: CoreAccount, + first_account: CoreAccount, + second_account: CoreAccount, +): + backend = LocalPermissionBackend() + + role1_permissions = [] + for p in [ + ObjectPermission( + id="", + namespace="*", + name="*", + action=PermissionAction.ANY.value, + decision=PermissionDecision.ALLOW_ALL.value, + ), + ObjectPermission( + id="", + namespace="Builtin", + name="Tag", + action=PermissionAction.ANY.value, + decision=PermissionDecision.DENY.value, + ), + ]: + obj = await Node.init(db=db, schema=InfrahubKind.OBJECTPERMISSION) + await obj.new(db=db, namespace=p.namespace, name=p.name, action=p.action, decision=p.decision) + await obj.save(db=db) + role1_permissions.append(obj) + + role1 = await Node.init(db=db, schema=InfrahubKind.ACCOUNTROLE) + await role1.new(db=db, name="anything but tags", permissions=role1_permissions) + await role1.save(db=db) + + group1 = await Node.init(db=db, schema=InfrahubKind.ACCOUNTGROUP) + await group1.new(db=db, name="group1", roles=[role1]) + await group1.save(db=db) + + await group1.members.add(db=db, data={"id": first_account.id}) + await group1.members.save(db=db) + + role2_permissions = [] + for p in [ + ObjectPermission( + id="", + namespace="*", + name="*", + action=PermissionAction.ANY.value, + decision=PermissionDecision.DENY.value, + ), + ObjectPermission( + id="", + namespace="Builtin", + name="Tag", + action=PermissionAction.ANY.value, + decision=PermissionDecision.ALLOW_ALL.value, + ), + ]: + obj = await Node.init(db=db, schema=InfrahubKind.OBJECTPERMISSION) + await obj.new(db=db, namespace=p.namespace, name=p.name, action=p.action, decision=p.decision) + await obj.save(db=db) + role2_permissions.append(obj) + + role2 = await Node.init(db=db, schema=InfrahubKind.ACCOUNTROLE) + await role2.new(db=db, name="only tags", permissions=role2_permissions) + await role2.save(db=db) + + group2 = await Node.init(db=db, schema=InfrahubKind.ACCOUNTGROUP) + await group2.new(db=db, name="group2", roles=[role2]) + await group2.save(db=db) + + await group2.members.add(db=db, data={"id": second_account.id}) + await group2.members.save(db=db) + + first_permissions = await backend.load_permissions(db=db, account_id=first_account.id, branch=default_branch) + + assert ( + backend.report_object_permission( + permissions=first_permissions["object_permissions"], namespace="Builtin", name="Tag", action="create" + ) + == PermissionDecisionFlag.DENY + ) + assert ( + backend.report_object_permission( + permissions=first_permissions["object_permissions"], namespace="Core", name="Account", action="view" + ) + == PermissionDecisionFlag.ALLOW_ALL + ) + + second_permissions = await backend.load_permissions(db=db, account_id=second_account.id, branch=default_branch) + + assert ( + backend.report_object_permission( + permissions=second_permissions["object_permissions"], namespace="Builtin", name="Tag", action="create" + ) + == PermissionDecisionFlag.ALLOW_ALL + ) + assert ( + backend.report_object_permission( + permissions=second_permissions["object_permissions"], namespace="Core", name="Account", action="view" + ) + == PermissionDecisionFlag.DENY )