-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
118 lines (97 loc) · 3.74 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from flask import Blueprint
import json
import os
from six.moves.urllib.request import urlopen
from functools import wraps
from config import PROD_AUTH0_DOMAIN, DEV_AUTH0_DOMAIN
from flask import request, jsonify, _request_ctx_stack
from jose import jwt
if os.getenv("FLASK_ENV") == 'production':
AUTH0_DOMAIN = PROD_AUTH0_DOMAIN
else:
AUTH0_DOMAIN = DEV_AUTH0_DOMAIN
API_AUDIENCE = f"https://{AUTH0_DOMAIN}/api/v2/"
ALGORITHMS = ["RS256"]
auth = Blueprint("auth", __name__)
# Error handler
class AuthError(Exception):
def __init__(self, error, status_code):
self.error = error
self.status_code = status_code
@auth.errorhandler(AuthError)
def handle_auth_error(ex):
response = jsonify(ex.error)
response.status_code = ex.status_code
return response
# Format error response and append status code
def get_token_auth_header():
"""Obtains the Access Token from the Authorization Header
"""
auth = request.headers.get("Authorization", None)
if not auth:
raise AuthError({"code": "authorization_header_missing",
"description":
"Authorization header is expected"}, 401)
parts = auth.split()
if parts[0].lower() != "bearer":
raise AuthError({"code": "invalid_header",
"description":
"Authorization header must start with"
" Bearer"}, 401)
elif len(parts) == 1:
raise AuthError({"code": "invalid_header",
"description": "Token not found"}, 401)
elif len(parts) > 2:
raise AuthError({"code": "invalid_header",
"description":
"Authorization header must be"
" Bearer token"}, 401)
token = parts[1]
return token
def requires_auth(f):
"""Determines if the Access Token is valid
"""
@wraps(f)
def decorated(*args, **kwargs):
token = get_token_auth_header()
jsonurl = urlopen("https://"+AUTH0_DOMAIN+"/.well-known/jwks.json")
jwks = json.loads(jsonurl.read())
print(token)
unverified_header = jwt.get_unverified_header(token)
rsa_key = {}
for key in jwks["keys"]:
if key["kid"] == unverified_header["kid"]:
rsa_key = {
"kty": key["kty"],
"kid": key["kid"],
"use": key["use"],
"n": key["n"],
"e": key["e"]
}
if rsa_key:
try:
payload = jwt.decode(
token,
rsa_key,
algorithms=ALGORITHMS,
audience=API_AUDIENCE,
issuer="https://"+AUTH0_DOMAIN+"/"
)
except jwt.ExpiredSignatureError:
raise AuthError({"code": "token_expired",
"description": "token is expired"}, 401)
except jwt.JWTClaimsError:
raise AuthError({"code": "invalid_claims",
"description":
"incorrect claims,"
"please check the audience and issuer"}, 401)
except Exception:
raise AuthError({"code": "invalid_header",
"description":
"Unable to parse authentication"
" token."}, 401)
_request_ctx_stack.top.current_user = payload
return f(*args, **kwargs)
raise AuthError({"code": "invalid_header",
"description": "Unable to find appropriate key"}, 401)
return decorated