Skip to content

Commit

Permalink
IFC-668 Add manage account related objects permission (#4534)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmazoyer authored Oct 7, 2024
1 parent ebd9633 commit 30f4ddb
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 7 deletions.
1 change: 1 addition & 0 deletions backend/infrahub/core/constants/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class GlobalPermissions(InfrahubStringEnum):
SUPER_ADMIN = "super_admin"
MERGE_BRANCH = "merge_branch"
MERGE_PROPOSED_CHANGE = "merge_proposed_change"
MANAGE_ACCOUNTS = "manage_accounts"


class PermissionAction(InfrahubStringEnum):
Expand Down
6 changes: 5 additions & 1 deletion backend/infrahub/graphql/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from ..auth.query_permission_checker.default_branch_checker import DefaultBranchPermissionChecker
from ..auth.query_permission_checker.default_checker import DefaultGraphQLPermissionChecker
from ..auth.query_permission_checker.merge_operation_checker import MergeBranchPermissionChecker
from ..auth.query_permission_checker.object_permission_checker import ObjectPermissionChecker
from ..auth.query_permission_checker.object_permission_checker import (
AccountManagerPermissionChecker,
ObjectPermissionChecker,
)
from ..auth.query_permission_checker.read_only_checker import ReadOnlyGraphQLPermissionChecker
from ..auth.query_permission_checker.read_write_checker import ReadWriteGraphQLPermissionChecker
from ..auth.query_permission_checker.super_admin_checker import SuperAdminPermissionChecker
Expand All @@ -25,6 +28,7 @@ def build_graphql_query_permission_checker() -> GraphQLQueryPermissionChecker:
SuperAdminPermissionChecker(),
DefaultBranchPermissionChecker(),
MergeBranchPermissionChecker(),
AccountManagerPermissionChecker(),
ObjectPermissionChecker(),
ReadWriteGraphQLPermissionChecker(), # Deprecated, will be replace by either a global permission or object permissions
ReadOnlyGraphQLPermissionChecker(), # Deprecated, will be replace by either a global permission or object permissions
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from infrahub.auth import AccountSession
from infrahub.core import registry
from infrahub.core.branch import Branch
from infrahub.core.constants import GLOBAL_BRANCH_NAME
from infrahub.core.constants import GLOBAL_BRANCH_NAME, GlobalPermissions, PermissionDecision
from infrahub.database import InfrahubDatabase
from infrahub.exceptions import PermissionDeniedError
from infrahub.graphql.analyzer import InfrahubGraphQLQueryAnalyzer
Expand All @@ -11,7 +11,9 @@


class DefaultBranchPermissionChecker(GraphQLQueryPermissionCheckerInterface):
permission_required = "global:edit_default_branch:allow"
"""Checker that makes sure a user account can edit data in the default branch."""

permission_required = f"global:{GlobalPermissions.EDIT_DEFAULT_BRANCH.value}:{PermissionDecision.ALLOW.value}"
exempt_operations = ["BranchCreate"]

async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@


class MergeBranchPermissionChecker(GraphQLQueryPermissionCheckerInterface):
"""Checker that makes sure a user account can merge a branch without going through a proposed change."""

permission_required = f"global:{GlobalPermissions.MERGE_BRANCH.value}:allow"

async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from infrahub.core import registry
from infrahub.core.account import ObjectPermission
from infrahub.core.branch import Branch
from infrahub.core.constants import PermissionDecision
from infrahub.core.constants import GlobalPermissions, InfrahubKind, PermissionDecision
from infrahub.core.manager import get_schema
from infrahub.core.schema.node_schema import NodeSchema
from infrahub.database import InfrahubDatabase
from infrahub.exceptions import PermissionDeniedError
from infrahub.graphql.analyzer import InfrahubGraphQLQueryAnalyzer
Expand All @@ -13,6 +15,8 @@


class ObjectPermissionChecker(GraphQLQueryPermissionCheckerInterface):
"""Checker that makes sure a user account can perform some action on some kind of objects."""

async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool:
return account_session.authenticated

Expand Down Expand Up @@ -71,3 +75,52 @@ async def check(
raise PermissionDeniedError(f"You do not have the following permission: {permission}")

return CheckerResolution.NEXT_CHECKER


class AccountManagerPermissionChecker(GraphQLQueryPermissionCheckerInterface):
"""Checker that makes sure a user account can perform actions on account related objects.
This is similar to object permission checker except that we care for any operations on any account related kinds.
"""

permission_required = f"global:{GlobalPermissions.MANAGE_ACCOUNTS.value}:{PermissionDecision.ALLOW.value}"

async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool:
return account_session.authenticated

async def check(
self,
db: InfrahubDatabase,
account_session: AccountSession,
analyzed_query: InfrahubGraphQLQueryAnalyzer,
query_parameters: GraphqlParams,
branch: Branch,
) -> CheckerResolution:
is_account_operation = False
kinds = await analyzed_query.get_models_in_use(types=query_parameters.context.types)
operation_names = [operation.name for operation in analyzed_query.operations]

for kind in kinds:
schema = get_schema(db=db, branch=branch, node_schema=kind)
if is_account_operation := kind in (
InfrahubKind.GENERICACCOUNT,
InfrahubKind.ACCOUNTGROUP,
InfrahubKind.ACCOUNTROLE,
) or (isinstance(schema, NodeSchema) and InfrahubKind.GENERICACCOUNT in schema.inherit_from):
break

# Ignore non-account related operation or viewing account own profile
if not is_account_operation or operation_names == ["AccountProfile"]:
return CheckerResolution.NEXT_CHECKER

has_permission = False
for permission_backend in registry.permission_backends:
if has_permission := await permission_backend.has_permission(
db=db, account_id=account_session.account_id, permission=self.permission_required, branch=branch
):
break

if not has_permission and analyzed_query.contains_mutation:
raise PermissionDeniedError("You do not have the permission to manage user accounts, groups or roles")

return CheckerResolution.NEXT_CHECKER
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@


class SuperAdminPermissionChecker(GraphQLQueryPermissionCheckerInterface):
"""Checker allows a user to do anything (if the checker runs first)."""

permission_required = f"global:{GlobalPermissions.SUPER_ADMIN.value}:allow"

async def supports(self, db: InfrahubDatabase, account_session: AccountSession, branch: Branch) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from infrahub.auth import AccountSession, AuthType
from infrahub.core.account import ObjectPermission
from infrahub.core.constants import (
AccountRole,
GlobalPermissions,
InfrahubKind,
PermissionAction,
PermissionDecision,
Expand All @@ -16,7 +18,11 @@
from infrahub.core.registry import registry
from infrahub.exceptions import PermissionDeniedError
from infrahub.graphql.analyzer import InfrahubGraphQLQueryAnalyzer
from infrahub.graphql.auth.query_permission_checker.object_permission_checker import ObjectPermissionChecker
from infrahub.graphql.auth.query_permission_checker.interface import CheckerResolution
from infrahub.graphql.auth.query_permission_checker.object_permission_checker import (
AccountManagerPermissionChecker,
ObjectPermissionChecker,
)
from infrahub.graphql.initialization import prepare_graphql_params
from infrahub.permissions.local_backend import LocalPermissionBackend

Expand Down Expand Up @@ -80,6 +86,47 @@
}
"""

MUTATION_ACCOUNT = """
mutation {
CoreAccountCreate(data: {
name: {value: "test"}
password: {value: "test"}
}) {
ok
}
}
"""

MUTATION_ACCOUNT_GROUP = """
mutation {
CoreAccountGroupCreate(data: {
name: {value: "test"}
}) {
ok
}
}
"""

MUTATION_ACCOUNT_ROLE = """
mutation {
CoreAccountRoleCreate(data: {
name: {value: "test"}
}) {
ok
}
}
"""

QUERY_ACCOUNT_PROFILE = """
query {
AccountProfile {
name {
value
}
}
}
"""


class TestObjectPermissions:
async def test_setup(
Expand All @@ -90,9 +137,8 @@ async def test_setup(
permissions_helper: PermissionsHelper,
first_account: CoreAccount,
):
permissions_helper._first = first_account
permissions_helper._default_branch = default_branch
registry.permission_backends = [LocalPermissionBackend()]
permissions_helper._default_branch = default_branch

permissions = []
for object_permission in [
Expand Down Expand Up @@ -136,6 +182,8 @@ async def test_setup(
await group.members.add(db=db, data={"id": first_account.id})
await group.members.save(db=db)

permissions_helper._first = first_account

async def test_first_account_tags(self, db: InfrahubDatabase, permissions_helper: PermissionsHelper) -> None:
gql_params = prepare_graphql_params(db=db, include_mutation=True, branch=permissions_helper.default_branch)
analyzed_query = InfrahubGraphQLQueryAnalyzer(
Expand Down Expand Up @@ -228,3 +276,118 @@ async def test_first_account_graphql_and_repos(
branch=permissions_helper.default_branch,
query_parameters=gql_params,
)


class TestAccountManagerPermissions:
async def test_setup(
self,
db: InfrahubDatabase,
register_core_models_schema: None,
default_branch: Branch,
permissions_helper: PermissionsHelper,
first_account: CoreAccount,
second_account: CoreAccount,
):
registry.permission_backends = [LocalPermissionBackend()]
permissions_helper._default_branch = default_branch

permission = await Node.init(db=db, schema=InfrahubKind.GLOBALPERMISSION)
await permission.new(
db=db, name=GlobalPermissions.EDIT_DEFAULT_BRANCH.value, action=GlobalPermissions.MANAGE_ACCOUNTS.value
)
await permission.save(db=db)

role = await Node.init(db=db, schema=InfrahubKind.ACCOUNTROLE)
await role.new(db=db, name="admin", permissions=[permission])
await role.save(db=db)

group = await Node.init(db=db, schema=InfrahubKind.ACCOUNTGROUP)
await group.new(db=db, name="admin", roles=[role])
await group.save(db=db)

await group.members.add(db=db, data={"id": first_account.id})
await group.members.save(db=db)

permissions_helper._first = first_account
permissions_helper._second = second_account

@pytest.mark.parametrize(
"user",
[
AccountSession(account_id="abc", auth_type=AuthType.JWT, role=AccountRole.ADMIN),
AccountSession(authenticated=False, account_id="anonymous", auth_type=AuthType.NONE),
],
)
async def test_supports_manage_accounts_permission_accounts(
self, user: AccountSession, db: InfrahubDatabase, permissions_helper: PermissionsHelper
):
checker = AccountManagerPermissionChecker()
is_supported = await checker.supports(db=db, account_session=user, branch=permissions_helper.default_branch)
assert is_supported == user.authenticated

@pytest.mark.parametrize("operation", [MUTATION_ACCOUNT, MUTATION_ACCOUNT_GROUP, MUTATION_ACCOUNT_ROLE])
async def test_account_with_permission(
self, db: InfrahubDatabase, permissions_helper: PermissionsHelper, operation: str
):
checker = AccountManagerPermissionChecker()
session = AccountSession(
authenticated=True, account_id=permissions_helper.first.id, session_id=str(uuid4()), auth_type=AuthType.JWT
)

gql_params = prepare_graphql_params(db=db, include_mutation=True, branch=permissions_helper.default_branch)
analyzed_query = InfrahubGraphQLQueryAnalyzer(
query=operation, schema=gql_params.schema, branch=permissions_helper.default_branch
)

resolution = await checker.check(
db=db,
account_session=session,
analyzed_query=analyzed_query,
query_parameters=gql_params,
branch=permissions_helper.default_branch,
)
assert resolution == CheckerResolution.NEXT_CHECKER

@pytest.mark.parametrize(
"operation,must_raise",
[
(MUTATION_ACCOUNT, True),
(MUTATION_ACCOUNT_GROUP, True),
(MUTATION_ACCOUNT_ROLE, True),
(QUERY_TAGS, False),
(QUERY_ACCOUNT_PROFILE, False),
],
)
async def test_account_without_permission(
self, db: InfrahubDatabase, permissions_helper: PermissionsHelper, operation: str, must_raise: bool
):
checker = AccountManagerPermissionChecker()
session = AccountSession(
authenticated=True, account_id=permissions_helper.second.id, session_id=str(uuid4()), auth_type=AuthType.JWT
)

gql_params = prepare_graphql_params(db=db, include_mutation=True, branch=permissions_helper.default_branch)
analyzed_query = InfrahubGraphQLQueryAnalyzer(
query=operation, schema=gql_params.schema, branch=permissions_helper.default_branch
)

if not must_raise:
resolution = await checker.check(
db=db,
account_session=session,
analyzed_query=analyzed_query,
query_parameters=gql_params,
branch=permissions_helper.default_branch,
)
assert resolution == CheckerResolution.NEXT_CHECKER
else:
with pytest.raises(
PermissionDeniedError, match=r"You do not have the permission to manage user accounts, groups or roles"
):
await checker.check(
db=db,
account_session=session,
analyzed_query=analyzed_query,
query_parameters=gql_params,
branch=permissions_helper.default_branch,
)

0 comments on commit 30f4ddb

Please sign in to comment.