-
Notifications
You must be signed in to change notification settings - Fork 1
/
nginx-odoo.py
255 lines (232 loc) · 9.84 KB
/
nginx-odoo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#!/bin/env python
# Copyright 2020 Sunflower IT
# TODO: for extra safety, don't let session_id be the Odoo session id,
# but make a new JWT token that includes the Odoo session id,
# and translate in NGINX.
# TODO: prevent people from logging out, then setting session_id again
# with a cookie manager, reaching the Odoo login screen and
# guessing the admin password.
from tornado.web import Application, RequestHandler, StaticFileHandler, MissingArgumentError
import tornado.ioloop
import tornado.escape
import asyncio
import logging
import pyotp
import lib.config as config
# Module-level shortcuts for objects supplied by lib.config: `db` is used by
# the request handlers in this file for session and HOTP bookkeeping, and
# `OdooAuthHandler` is instantiated per request to check credentials.
db = config.db
OdooAuthHandler = config.OdooAuthHandler
from lib.email import email
# Login page and login check
class LoginHandler(RequestHandler):
    """Serve the HTML login form and drive the two-step login.

    Step one checks username/password through ``OdooAuthHandler`` and issues
    a HOTP code; step two checks the submitted HOTP code and, on success,
    stores the session and sets the ``session_id`` cookie.
    """

    def _render_login_error(self, message):
        """Re-render the login form with ``message`` shown as the error."""
        return self.render(
            r"./templates/login.html", **config.theme_params, error=message
        )

    def get(self):
        """Render the login form, or bounce an already verified session.

        When a ``redirect`` query argument is present and the ``session_id``
        cookie passes ``db.verify_session``, the browser is redirected to
        ``<redirect>/auth/<session_id>`` instead of being shown the form.
        """
        # TODO: extra protection eg. by IP or browser signature

        # Redirect to other service
        # needed when using the bouncer to authenticate a user with the bouncer
        # NOTE(review): the redirect target is taken verbatim from the query
        # string and receives the session id; consider an allow-list.
        redirect_url = self.get_argument("redirect", None)
        if redirect_url is not None:
            session_id = self.get_cookie("session_id")
            if db.verify_session(session_id):
                if not redirect_url.endswith("/"):
                    redirect_url += "/"
                return self.redirect(f"{redirect_url}auth/{session_id}")

        self.render(r"./templates/login.html", **config.theme_params)

    async def post(self):
        """Handle either step of the login form.

        A body carrying ``username`` and ``password`` runs the credential
        step; otherwise a body carrying ``counter``, ``code`` and
        ``hotp_code`` runs the HOTP step. Anything else redirects to ``/``.
        """
        # TODO: CSRF protection
        username = self.get_body_argument("username", default=None)
        password = self.get_body_argument("password", default=None)
        if username and password:
            return await self._check_credentials(username, password)

        counter = self.get_body_argument("counter", default=None)
        code = self.get_body_argument("code", default=None)
        hotp_code = self.get_body_argument("hotp_code", default=None)
        if code and counter and hotp_code:
            return self._check_hotp(counter, code, hotp_code)

        return self.redirect("/")

    async def _check_credentials(self, username, password):
        """Verify username/password and, on success, hand out a HOTP code."""
        logging.info("Verifying username %s and password...", username)
        handler = OdooAuthHandler()
        _data, session_id = await handler.check_login(username, password)
        if not session_id:
            # TODO: brute force protection
            #       (block for X minutes after X attempts)
            message = "Invalid username or password."
            logging.info(message)
            return self._render_login_error(message)

        hotp = pyotp.HOTP(config.HOTP_SECRET)
        counter, code = db.next_hotp_id(session_id)
        key = hotp.at(counter)
        if config.disable_email:
            # Display hotp in the log instead of sending an email
            logging.info(f"HOTP code: {key}")
        elif not await email.send(username, key):
            message = "Mail with security code not sent."
            logging.error(message)
            return self._render_login_error(message)

        # The counter and code are embedded in the HOTP form so the second
        # POST can present them again.
        return self.render(
            r"./templates/hotp.html",
            **config.theme_params,
            counter=counter,
            code=code,
        )

    def _check_hotp(self, counter, code, hotp_code):
        """Verify the submitted HOTP code and establish the session cookie."""
        hotp = pyotp.HOTP(config.HOTP_SECRET)

        # ``counter`` is client-controlled text: a non-numeric or negative
        # value is treated as a wrong code instead of raising (HTTP 500).
        try:
            counter_value = int(counter)
        except ValueError:
            return self._render_login_error("Invalid security code.")
        if counter_value < 0 or not hotp.verify(hotp_code, counter_value):
            return self._render_login_error("Invalid security code.")

        session_id = db.verify_code_and_expiry(counter, code)
        if not session_id:
            return self._render_login_error("Invalid security code (2).")

        db.save_session(session_id, config.EXPIRY_INTERVAL)
        logging.info("Setting session cookie: %s", session_id)
        self.set_cookie("session_id", session_id, path="/")

        # Redirect to other service
        # needed when using the bouncer to authenticate a user
        redirect_url = self.get_argument("redirect", "/")
        if redirect_url != "/":
            if not redirect_url.endswith("/"):
                redirect_url += "/"
            redirect_url = f"{redirect_url}auth/{session_id}"
        return self.redirect(redirect_url)
# Session verification
class VerifySessionHandler(RequestHandler):
    """Answer 200 when the ``session_id`` cookie verifies, 401 otherwise."""

    def get(self):
        """Check the ``session_id`` cookie with ``db.verify_session``."""
        cookie_value = self.get_cookie("session_id")
        if not db.verify_session(cookie_value):
            # Failed lookups are logged together with the offending value.
            logging.error(f"Failed to verify session: {cookie_value}")
            self.set_status(401)
        else:
            self.set_status(200)
        self.finish()
# Session logout
class LogoutHandler(RequestHandler):
    """Drop the caller's session from the store and go back to ``/``."""

    def get(self):
        """Remove the session named by the ``session_id`` cookie, then redirect."""
        db.remove_session(self.get_cookie("session_id"))
        return self.redirect("/")
# Session login
class AuthenticateHandler(RequestHandler):
    """JSON login endpoint with an additional HOTP step."""

    async def post(self):
        """Run one step of the JSON login.

        The request body must be a JSON object with a ``params`` object
        holding ``login`` and ``password`` and, for the second step,
        ``hotp_code``, ``hotp_counter`` and ``hotp_csrf``. A ``db`` entry in
        ``params`` is not used by this handler.

        Step one (no HOTP fields): checks the credentials, issues a HOTP code
        and writes ``{"result": {"hotp_counter": ..., "hotp_csrf": ...}}``.
        Step two (all HOTP fields present): verifies the HOTP code and the
        counter/csrf pair, logs in again, saves the new session and writes
        the data returned by ``check_login``.

        Status codes: 400 for a malformed body or missing credentials, 401
        for any authentication failure.
        """
        try:
            params = tornado.escape.json_decode(self.request.body)["params"]
        except (ValueError, TypeError, KeyError):
            # Not JSON, not an object, or no "params" member.
            return self.set_status(400)
        if not isinstance(params, dict):
            return self.set_status(400)

        username = params.get("login")
        password = params.get("password")
        hotp_code = params.get("hotp_code")
        hotp_counter = params.get("hotp_counter")
        hotp_csrf = params.get("hotp_csrf")

        # Both credentials are required in both steps.
        if not username or not password:
            return self.set_status(400)

        hotp = pyotp.HOTP(config.HOTP_SECRET)

        if not (hotp_code and hotp_counter and hotp_csrf):
            handler = OdooAuthHandler()
            _data, session_id = await handler.check_login(username, password)
            if not session_id:
                return self.set_status(401)
            hotp_counter, hotp_csrf = db.next_hotp_id(session_id)
            hotp_code = hotp.at(hotp_counter)
            if config.disable_email:
                # Display hotp in the log instead of sending an email
                logging.info(f"HOTP code: {hotp_code}")
            elif not await email.send(username, hotp_code):
                # for obfuscation, this needs to be the same as above
                return self.set_status(401)
            return self.write(
                {
                    "result": {
                        "hotp_counter": hotp_counter,
                        "hotp_csrf": hotp_csrf,
                    }
                }
            )

        # ``hotp_counter`` is client-controlled: anything that is not a
        # non-negative integer is treated as a failed verification.
        try:
            counter_value = int(hotp_counter)
        except (TypeError, ValueError):
            return self.set_status(401)
        if counter_value < 0 or not hotp.verify(hotp_code, counter_value):
            return self.set_status(401)

        # The counter/csrf pair must still be valid; a falsy result means the
        # check failed, so the login is refused.
        if not db.verify_code_and_expiry(hotp_counter, hotp_csrf):
            return self.set_status(401)

        # login again and return new session id
        # TODO: memory leaks?
        handler = OdooAuthHandler()
        data, session_id = await handler.check_login(username, password)
        if not session_id:
            # for obfuscation, this needs to be the same as above
            return self.set_status(401)

        # save new session id, not old one
        db.save_session(session_id, config.EXPIRY_INTERVAL)
        return self.write(data)
# Coupa punchout login
class PunchoutLoginHandler(RequestHandler):
    """Log a user in from a punchout token passed in the query string."""

    async def get(self):
        """Exchange the ``token`` argument for a session.

        Responds 401 when the token is missing or empty, or when
        ``punchout_login`` yields no session id. Otherwise saves the session,
        sets the ``session_id`` cookie and sends ``resp.text`` as the body.
        """
        # A default of None keeps a missing token on the 401 path below
        # instead of letting get_argument raise MissingArgumentError.
        token = self.get_argument("token", None)
        if not token:
            return self.set_status(401)

        handler = OdooAuthHandler()
        resp, session_id = await handler.punchout_login(token)
        if not session_id:
            return self.set_status(401)

        db.save_session(session_id, config.EXPIRY_INTERVAL)
        self.set_status(200)
        self.set_cookie("session_id", session_id, path="/")
        self.finish(resp.text)
# Coupa punchout signup
class PunchoutSignupHandler(RequestHandler):
    """Relay the punchout signup page (GET) and its submission (POST)."""

    async def get(self):
        """Fetch the signup page for ``signup_token``.

        Responds 401 when the token is missing or empty, or when
        ``punchout_signup`` yields no session id. Otherwise sets the
        ``session_id`` cookie and sends ``resp.text`` as the body.
        """
        # A default of None maps a missing token to 401, same as an empty one.
        token = self.get_argument("signup_token", None)
        if not token:
            return self.set_status(401)

        handler = OdooAuthHandler()
        # preserve session if we have it
        session_id = self.get_cookie("session_id")
        resp, session_id = await handler.punchout_signup(token, session_id)
        if not session_id:
            return self.set_status(401)

        # NOTE(review): unlike post(), the session is not saved with
        # db.save_session here -- confirm this is intended.
        self.set_status(200)
        self.set_cookie("session_id", session_id, path="/")
        self.finish(resp.text)

    async def post(self):
        """Submit the signup form for ``signup_token``.

        All request arguments are forwarded to ``punchout_signup_post``.
        Responds 401 when the token is missing or empty, or when no session
        id comes back. Otherwise saves the session, sets the ``session_id``
        cookie and sends ``resp.text`` as the body.
        """
        # Same missing-token handling as get(): 401 rather than an exception.
        token = self.get_argument("signup_token", None)
        if not token:
            return self.set_status(401)

        handler = OdooAuthHandler()
        params = {k: self.get_argument(k) for k in self.request.arguments}
        # preserve session if we have it (to prevent CSRF validation problems)
        session_id = self.get_cookie("session_id")
        resp, session_id = await handler.punchout_signup_post(
            token, params, session_id
        )
        if not session_id:
            return self.set_status(401)

        db.save_session(session_id, config.EXPIRY_INTERVAL)
        self.set_status(200)
        self.set_cookie("session_id", session_id, path="/")
        self.finish(resp.text)
# URL routing table. Every route accepts an optional trailing slash.
app = Application(
    [
        (r"/", LoginHandler),
        (r"/auth/?", VerifySessionHandler),
        (r"/logout/?", LogoutHandler),
        # The extension alternation is non-capturing so the handler receives
        # only the file path as its single positional argument.
        (
            r"/static/(.*\.(?:css|jpg|png))/?",
            StaticFileHandler,
            {"path": r"./static"},
        ),
        (r"/web/session/authenticate/?", AuthenticateHandler),
        (r"/punchouttokenlogin/?", PunchoutLoginHandler),
        # "/?" (optional slash), not "signup?" which made the final "p" optional.
        (r"/punchout/signup/?", PunchoutSignupHandler),
    ],
    # NOTE(review): debug mode is left on as in the original; confirm it is
    # switched off for production deployments.
    debug=True,
)
if __name__ == "__main__":
    # Check connection with email service (currently disabled)
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(email.test())

    # Bind the configured port, announce it, then block in the IO loop.
    listen_port = config.LISTEN_PORT
    app.listen(listen_port)
    print(f"Listening at port {listen_port}")
    tornado.ioloop.IOLoop.current().start()