diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d2ce0758..a0e354595 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ and this project adheres to - ✅(frontend) Improve tests coverage #949 - ⬆️(docker) upgrade backend image to python 3.13 #973 - ⬆️(docker) upgrade node images to alpine 3.21 #973 +- ♻️(backend) Refactored permissions check from serializers to permission class #343 ### Fixed - 🐛(y-provider) increase JSON size limits for transcription conversion #989 diff --git a/src/backend/core/api/permissions.py b/src/backend/core/api/permissions.py index 43a0465f4..2ae9a0c06 100644 --- a/src/backend/core/api/permissions.py +++ b/src/backend/core/api/permissions.py @@ -134,3 +134,68 @@ def has_object_permission(self, request, view, obj): raise Http404 return has_permission + + +class CanManageAccessPermission(permissions.BasePermission): + """ + Permission class to control who can create or update access objects for a resource. + """ + + def has_permission(self, request, view): + user = request.user + + if not (bool(request.auth) or user.is_authenticated): + return False + + if view.action != "create": + return True + + try: + resource_id = view.kwargs["resource_id"] + except KeyError as exc: + raise exceptions.ValidationError( + "You must set a resource ID in kwargs to create a new access." + ) from exc + + resource_field = view.resource_field_name + + # Check if user is owner or admin for the resource + if not view.queryset.model.objects.filter( + Q(user=user) | Q(team__in=user.teams), + role__in=[RoleChoices.OWNER, RoleChoices.ADMIN], + **{resource_field: resource_id}, + ).exists(): + raise exceptions.PermissionDenied( + "You are not allowed to manage accesses for this resource." + ) + + if ( + request.data.get("role") == RoleChoices.OWNER + and not view.queryset.model.objects.filter( + Q(user=user) | Q(team__in=user.teams), + role=RoleChoices.OWNER, + **{resource_field: resource_id}, + ).exists() + ): + raise exceptions.PermissionDenied( + "Only owners of a resource can assign other users as owners." + ) + + return True + + def has_object_permission(self, request, view, obj): + if view.action in ["update", "partial_update"]: + requested_role = request.data.get("role") + abilities = obj.get_abilities(request.user) + allowed_roles = abilities.get("set_role_to", []) + + if requested_role and requested_role not in allowed_roles: + if allowed_roles: + raise exceptions.PermissionDenied( + f"You are only allowed to set role to {', '.join(allowed_roles)}." + ) + raise exceptions.PermissionDenied( + "You are not allowed to set this role for this " + f"{getattr(view, 'resource_field_name', 'resource')}." + ) + return True diff --git a/src/backend/core/api/serializers.py b/src/backend/core/api/serializers.py index e86288bb3..4309be68c 100644 --- a/src/backend/core/api/serializers.py +++ b/src/backend/core/api/serializers.py @@ -10,7 +10,7 @@ from django.utils.translation import gettext_lazy as _ import magic -from rest_framework import exceptions, serializers +from rest_framework import serializers from core import enums, models, utils from core.services.ai_services import AI_ACTIONS @@ -67,57 +67,9 @@ def get_abilities(self, access) -> dict: return {} def validate(self, attrs): - """ - Check access rights specific to writing (create/update) - """ - request = self.context.get("request") - user = getattr(request, "user", None) - role = attrs.get("role") - - # Update - if self.instance: - can_set_role_to = self.instance.get_abilities(user)["set_role_to"] - - if role and role not in can_set_role_to: - message = ( - f"You are only allowed to set role to {', '.join(can_set_role_to)}" - if can_set_role_to - else "You are not allowed to set this role for this template." - ) - raise exceptions.PermissionDenied(message) - - # Create - else: - try: - resource_id = self.context["resource_id"] - except KeyError as exc: - raise exceptions.ValidationError( - "You must set a resource ID in kwargs to create a new access." - ) from exc - - if not self.Meta.model.objects.filter( # pylint: disable=no-member - Q(user=user) | Q(team__in=user.teams), - role__in=[models.RoleChoices.OWNER, models.RoleChoices.ADMIN], - **{self.Meta.resource_field_name: resource_id}, # pylint: disable=no-member - ).exists(): - raise exceptions.PermissionDenied( - "You are not allowed to manage accesses for this resource." - ) - - if ( - role == models.RoleChoices.OWNER - and not self.Meta.model.objects.filter( # pylint: disable=no-member - Q(user=user) | Q(team__in=user.teams), - role=models.RoleChoices.OWNER, - **{self.Meta.resource_field_name: resource_id}, # pylint: disable=no-member - ).exists() - ): - raise exceptions.PermissionDenied( - "Only owners of a resource can assign other users as owners." - ) - - # pylint: disable=no-member - attrs[f"{self.Meta.resource_field_name}_id"] = self.context["resource_id"] + """Add the resource ID to the validated data on creation.""" + if not self.instance: + attrs[f"{self.Meta.resource_field_name}_id"] = self.context["resource_id"] # pylint: disable=no-member return attrs diff --git a/src/backend/core/api/viewsets.py b/src/backend/core/api/viewsets.py index cf9930703..c9f126d25 100644 --- a/src/backend/core/api/viewsets.py +++ b/src/backend/core/api/viewsets.py @@ -1497,7 +1497,11 @@ class DocumentAccessViewSet( lookup_field = "pk" pagination_class = Pagination - permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission] + permission_classes = [ + permissions.IsAuthenticated, + permissions.CanManageAccessPermission, + permissions.AccessPermission, + ] queryset = models.DocumentAccess.objects.select_related("user").all() resource_field_name = "document" serializer_class = serializers.DocumentAccessSerializer @@ -1669,7 +1673,11 @@ class TemplateAccessViewSet( lookup_field = "pk" pagination_class = Pagination - permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission] + permission_classes = [ + permissions.IsAuthenticated, + permissions.CanManageAccessPermission, + permissions.AccessPermission, + ] queryset = models.TemplateAccess.objects.select_related("user").all() resource_field_name = "template" serializer_class = serializers.TemplateAccessSerializer