-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathauthorizer.py
157 lines (130 loc) · 5.54 KB
/
authorizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import argparse
import ConfigParser
import errno
import json
import logging
import re
import traceback

from flask import Flask, request, Response
import scitokens
# Root logger emits everything from DEBUG upwards.
logging.basicConfig(level=logging.DEBUG)

# Issuer URL -> per-issuer settings dict ('base_path', optionally
# 'map_subject'); filled in by config().
g_authorized_issuers = dict()

# Audience passed to the SciToken deserializer and Enforcer; config() may
# replace it with a string or a list read from the [Global] section.
g_global_audience = str()

# Flask application on which the /auth route below is registered.
app = Flask(__name__)
@app.route('/auth')
def flask_listener():
    """
    Authorize a request described by the X-Original-* headers.

    The HTTP method in ``X-Original-Method`` is mapped to a token operation
    (``read`` or ``write``), the bearer token in ``Authorization`` is
    deserialized into a SciToken, and the token is checked against the path
    in ``X-Original-URI``.

    :returns: a ``(body, status)`` pair: 200 when the token authorizes the
              request, 401 when the token is missing, malformed or invalid,
              and 403 when the token is valid but does not allow the
              operation on the path.
    """
    # Convert the operation to something that the token will know,
    # like read or write.  Unrecognised methods leave op empty.
    orig_op = request.headers.get('X-Original-Method')
    op = ""
    if orig_op == "GET":
        op = 'read'
    elif orig_op in ["PUT", "POST", "DELETE", "MKCOL", "COPY", "MOVE"]:
        op = 'write'

    # Look at the path as well
    orig_path = request.headers.get('X-Original-URI')

    # Convert the token to a SciToken (also check for errors with the token)
    if 'Authorization' not in request.headers:
        resp = Response("No Authorization header")
        resp.headers['WWW-Authenticate'] = 'Bearer realm="scitokens"'
        logging.error("No Authorization header presented")
        return resp, 401

    # Expect "<scheme> <token>".  A header with no token part used to raise
    # IndexError (HTTP 500); reject it explicitly instead.
    auth_parts = request.headers['Authorization'].split(None, 1)
    if len(auth_parts) != 2 or not auth_parts[1].strip():
        resp = Response("Malformed Authorization header")
        resp.headers['WWW-Authenticate'] = (
            'Bearer realm="scitokens",error="invalid_token",'
            'error_description="Malformed Authorization header"')
        logging.error("Malformed Authorization header presented")
        return resp, 401
    raw_token = auth_parts[1].strip()

    # Convert the token
    # Send a 401 error code if there is any problem
    try:
        token = scitokens.SciToken.deserialize(raw_token, audience=g_global_audience)
    except Exception as e:
        # The exception text ends up inside a quoted header value, so replace
        # every character RFC 6750 forbids in error_description (quotes,
        # backslashes, control characters, non-ASCII) with a space.
        description = re.sub(r'[^\x20-\x21\x23-\x5b\x5d-\x7e]', ' ', str(e))
        resp = Response("Invalid token")
        resp.headers['WWW-Authenticate'] = 'Bearer realm="scitokens",error="invalid_token",error_description="{0}"'.format(description)
        # logging.exception records the message together with the traceback.
        logging.exception("Failed to deserialize SciToken")
        return resp, 401

    # Without the original URI there is nothing to authorize against; deny
    # rather than let the path check fail on None.
    if not orig_path:
        logging.error("No X-Original-URI header presented")
        return "No X-Original-URI header", 403

    (successful, message) = test_operation_path(op, orig_path, token)
    has_token_id = 'jti' in token._claims
    if successful:
        if has_token_id:
            logging.info("Allowed token with Token ID: %s", str(token['jti']))
        return message, 200

    if has_token_id:
        logging.error("Failed to authenticate SciToken ID %s because %s", token['jti'], message)
    else:
        logging.error("Failed to authenticate SciToken because %s", message)
    return message, 403
def test_operation_path(op, path, token):
    """
    Test whether an operation and path is allowed by this scitoken.

    :param op: operation name to test (the caller passes 'read', 'write'
               or an empty string).
    :param path: full request path; expected to start with the base path
                 configured for the token's issuer.
    :param token: deserialized SciToken; its 'iss' claim selects the issuer
                  configuration.
    :returns: (successful, message) true if the scitoken allows for this path & op, else false
    """
    issuer = token['iss']
    if issuer not in g_authorized_issuers:
        return (False, "Issuer not in configuration")
    base_path = g_authorized_issuers[issuer]['base_path']

    # The path above should consist of:
    # $base_path + / + $auth_path + / + $request_path = path
    # A missing path (None or empty) can never match.
    if not path or not path.startswith(base_path):
        logging.error("Requested path does not start with base_path")
        return (False, "The requested path does not start with the base path")

    # Now remove the base path so we just get the auth_path + request_path
    auth_requested = path[len(base_path):]

    # Only accept the prefix at a path boundary, so that base path "/foo"
    # does not also match "/foobar".  "?" is allowed because the URI may
    # carry a query string directly after the base path.
    at_boundary = (not auth_requested
                   or base_path.endswith("/")
                   or auth_requested[0] in "/?")
    if not at_boundary:
        logging.error("Requested path does not start with base_path")
        return (False, "The requested path does not start with the base path")

    logging.debug("Path to authorize: %s", auth_requested)

    # Setup a SciToken Enforcer
    enforcer = scitokens.scitokens.Enforcer(issuer, audience=g_global_audience)
    try:
        if enforcer.test(token, op, auth_requested):
            return (True, "")
        return (False, "Path not allowed")
    except scitokens.scitokens.EnforcementError as e:
        logging.error("Enforcement error: %s", e)
        return (False, str(e))
# From xrootd-scitokens, we want the same configuration
def config(fname):
    """
    Load issuer and audience settings from the configuration file ``fname``.

    Every section whose name starts with "issuer " (case-insensitive) and
    that has both an ``issuer`` and a ``base_path`` option is recorded in
    ``g_authorized_issuers``.  The optional ``[Global]`` section may set the
    audience through ``audience_json`` (parsed as JSON) or ``audience``
    (a string, split into a list when it contains commas).

    :param fname: path of the configuration file.  A missing file is not an
                  error; the function returns without changing anything.
    :raises IOError: when the file exists but cannot be read.
    """
    global g_global_audience

    logging.info("Trying to load configuration from %s", fname)
    cp = ConfigParser.SafeConfigParser()
    try:
        with open(fname, "r") as fp:
            cp.readfp(fp)
    except IOError as ie:
        if ie.errno == errno.ENOENT:
            logging.warning("Configuration file %s does not exist", fname)
            return
        raise

    for section in cp.sections():
        if not section.lower().startswith("issuer "):
            continue
        if not cp.has_option(section, 'issuer'):
            logging.warning("Ignoring section %s as it has no `issuer` option set.", section)
            continue
        if not cp.has_option(section, 'base_path'):
            logging.warning("Ignoring section %s as it has no `base_path` option set.", section)
            continue

        issuer = cp.get(section, 'issuer')
        base_path = scitokens.urltools.normalize_path(cp.get(section, 'base_path'))

        # Several sections may name the same issuer; later ones update the
        # existing entry instead of replacing it.
        issuer_info = g_authorized_issuers.setdefault(issuer, {})
        issuer_info['base_path'] = base_path
        if cp.has_option(section, 'map_subject'):
            issuer_info['map_subject'] = cp.getboolean(section, 'map_subject')
        logging.info("Configured token access for %s (issuer %s): %s",
                     section, issuer, str(issuer_info))

    # The [Global] section is optional; asking for its options when it is
    # absent would raise NoSectionError.
    if not cp.has_section("Global"):
        return

    if cp.has_option("Global", "audience_json"):
        # Read in the audience as json. Hopefully it's in list format or a string
        g_global_audience = json.loads(cp.get("Global", "audience_json"))
    elif cp.has_option("Global", "audience"):
        audience = cp.get("Global", "audience")
        if ',' in audience:
            # Split the audience list, trimming whitespace and dropping
            # empty entries such as the one after a trailing comma.
            audience = [item.strip() for item in audience.split(",") if item.strip()]
        g_global_audience = audience
def main():
    """
    Parse the command line, load the configuration and start the listener.

    Options:
      -c/--config  configuration file (default /etc/scitokens/authorizer.cfg)
      --host       address to listen on (default localhost)
      --port       TCP port to listen on (default 1234)
    """
    parser = argparse.ArgumentParser(description='Authenticate HTTP Requests')
    parser.add_argument('-c', '--config', dest='config', type=str,
                        default="/etc/scitokens/authorizer.cfg",
                        help="Location of the configuration file")
    # Listen address and port were hard-coded; the defaults keep the
    # previous behaviour when the options are not given.
    parser.add_argument('--host', dest='host', type=str, default='localhost',
                        help="Address to listen on")
    parser.add_argument('--port', dest='port', type=int, default=1234,
                        help="TCP port to listen on")
    args = parser.parse_args()

    # Read in configuration
    config(args.config)

    # Set up listener for events
    app.run(host=args.host, port=args.port)


if __name__ == "__main__":
    main()