-
Notifications
You must be signed in to change notification settings - Fork 3
/
decorators.py
112 lines (86 loc) · 3.63 KB
/
decorators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from functools import wraps
from http import HTTPStatus

from flask import request

# from ecrdb import *
import ecrdb
from error_response import ErrorResponse
def login_required(func):
    """Reject the call unless the current request is marked authenticated.

    The wrapper reads ``request.environ["authenticated"]``. When the value is
    falsy an ``ErrorResponse`` with ``HTTPStatus.UNAUTHORIZED`` is raised;
    otherwise the decorated method is invoked with the arguments it was
    given and its result is returned.

    Args:
        func: The handler method to protect.

    Returns:
        The wrapping function, carrying ``func``'s name and docstring.

    Raises:
        ErrorResponse: If the request is not authenticated.
        KeyError: The ``"authenticated"`` entry is looked up by subscript, so
            a missing entry propagates instead of being treated as anonymous.
    """

    # wraps() keeps the handler's __name__/__doc__, so stacked decorators,
    # logging and introspection see the real handler and not "wrapper2".
    @wraps(func)
    def wrapper2(self, *args, **kwargs):
        if not request.environ["authenticated"]:
            raise ErrorResponse(
                "Not authenticated", status_code=HTTPStatus.UNAUTHORIZED
            )
        return func(self, *args, **kwargs)

    return wrapper2
def approval_required(func):
    """Reject the call unless the current request is flagged as approved.

    The wrapper reads ``request.environ.get("is_approved", False)``; a missing
    or falsy value raises an ``ErrorResponse`` with ``HTTPStatus.FORBIDDEN``.
    Otherwise the decorated method is invoked with the arguments it was given
    and its result is returned.

    Args:
        func: The handler method to protect.

    Returns:
        The wrapping function, carrying ``func``'s name and docstring.

    Raises:
        ErrorResponse: If ``is_approved`` is absent or falsy.
    """

    # wraps() keeps the handler's __name__/__doc__ visible through the
    # decorator instead of reporting the generic name "wrapper".
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if not request.environ.get("is_approved", False):
            raise ErrorResponse(
                "Your account is not approved to perform this action.",
                status_code=HTTPStatus.FORBIDDEN,
            )
        return func(self, *args, **kwargs)

    return wrapper
def has_resource_permission(permission):
    """Build a decorator that enforces ``permission`` on a namespace/repository.

    The decorated method is called as ``(self, namespace, repository,
    version)``. The protected resource is derived from those arguments:

    * ``repository`` given -> resource type ``"repository"``, resource name
      ``"<namespace>/<repository>"`` (``namespace`` is then mandatory);
    * only ``namespace`` given -> resource type ``"namespace"``.

    Access rules, as implemented below:

    * Unauthenticated requests (``request.environ["authenticated"]`` falsy)
      pass only if ``hasPermission`` is truthy for the ``"GROUP"``
      ``"AllUsers"`` on the resource.
    * Authenticated requests pass if ``request.environ["is_admin"]`` is
      truthy, or ``hasPermission`` is truthy for the ``"USER"`` taken from
      ``request.environ["user"]`` on the resource itself, or -- for
      repositories -- on the enclosing namespace.

    On success the handler is called with ``namespace`` only for
    namespace-level resources and with ``namespace, repository, version`` for
    repository-level resources, regardless of whether the caller is
    authenticated.

    Args:
        permission: Permission identifier forwarded unchanged to
            ``EcrDB.hasPermission``.

    Returns:
        A decorator for handler methods.

    Raises:
        ErrorResponse: ``INTERNAL_SERVER_ERROR`` when no namespace is
            supplied; ``UNAUTHORIZED`` when the permission check fails.
        KeyError: ``"authenticated"`` is read by subscript, so a missing
            entry in ``request.environ`` propagates.
    """

    def real_decorator(func):
        # wraps() keeps the handler's __name__/__doc__ visible through the
        # decorator instead of reporting the generic name "wrapper2".
        @wraps(func)
        def wrapper2(self, namespace=None, repository=None, version=None):
            authenticated = request.environ["authenticated"]

            # Work out which resource the request targets.
            if repository:
                if not namespace:
                    raise ErrorResponse(
                        "namespace missing (should not happen, code:a)",
                        status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                    )
                resource_type = "repository"
                resource_name = f"{namespace}/{repository}"
            elif namespace:
                resource_type = "namespace"
                resource_name = namespace
            else:
                raise ErrorResponse(
                    "namespace missing (should not happen, code:b)",
                    status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
                )

            def call_handler():
                # Namespace-level handlers are invoked with the namespace
                # only; doing this on every success path keeps anonymous and
                # authenticated callers consistent, so a handler that only
                # accepts ``namespace`` is never passed extra positionals.
                if repository:
                    return func(self, namespace, repository, version)
                return func(self, namespace)

            ecr_db = ecrdb.EcrDB()

            if not authenticated:
                # Anonymous access is governed solely by the AllUsers group.
                if ecr_db.hasPermission(
                    resource_type, resource_name, "GROUP", "AllUsers", permission
                ):
                    return call_handler()
                raise ErrorResponse(
                    "Not authorized.", status_code=HTTPStatus.UNAUTHORIZED
                )

            request_user = request.environ.get("user", "")
            is_admin = request.environ.get("is_admin", False)

            # Short-circuit order: admin flag first (no DB lookup needed),
            # then the namespace-inherited permission for repositories, then
            # the permission on the resource itself.
            allowed = (
                is_admin
                or (
                    bool(repository)
                    and ecr_db.hasPermission(
                        "namespace", namespace, "USER", request_user, permission
                    )
                )
                or ecr_db.hasPermission(
                    resource_type, resource_name, "USER", request_user, permission
                )
            )
            if allowed:
                return call_handler()

            # NOTE(review): an authenticated user lacking permission gets 401
            # here; 403 may be the more accurate status, but it is left
            # unchanged because clients may depend on the current code.
            raise ErrorResponse(
                f"Not authorized. (User {request_user} does not have permission {permission} for {resource_type} {resource_name})",
                status_code=HTTPStatus.UNAUTHORIZED,
            )

        return wrapper2

    return real_decorator