From 3af09d4e3a1cfc3e5af5b8dcb69043499d7ce645 Mon Sep 17 00:00:00 2001 From: mutantsan Date: Tue, 3 Oct 2023 16:35:45 +0300 Subject: [PATCH] feature: make otl link expirable after usage --- ckanext/let_me_in/logic/action.py | 21 +++++++++++++------- ckanext/let_me_in/views.py | 32 +++++++++++++++++++++++-------- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/ckanext/let_me_in/logic/action.py b/ckanext/let_me_in/logic/action.py index add9425..22aac16 100644 --- a/ckanext/let_me_in/logic/action.py +++ b/ckanext/let_me_in/logic/action.py @@ -13,6 +13,8 @@ import ckanext.let_me_in.logic.schema as schema import ckanext.let_me_in.utils as lmi_utils +# import ckanext.let_me_in.model as lmi_model + @validate(schema.lmi_generate_otl) def lmi_generate_otl( @@ -26,12 +28,17 @@ def lmi_generate_otl( tk.check_access("lmi_generate_otl", context, data_dict) user = cast(model.User, model.User.get(data_dict["user"])) - - token_data = { - "user_id": user.id, - "exp": dt.utcnow() + td(hours=24) - } - - token = jwt.encode(token_data, lmi_utils.get_secret(True), algorithm="HS256") + now = dt.utcnow() + expires_at = now + td(hours=24) + + token = jwt.encode( + {"user_id": user.id, "exp": expires_at, "created_at": now.timestamp()}, + lmi_utils.get_secret(True), + algorithm="HS256", + ) + + # lmi_model.OneTimeLoginToken.create( + # {"token": token, "user_id": user.id, "expires_at": expires_at} + # ) return {"url": tk.url_for("lmi.login_with_token", token=token, _external=True)} diff --git a/ckanext/let_me_in/views.py b/ckanext/let_me_in/views.py index 0e17be5..ff983f0 100644 --- a/ckanext/let_me_in/views.py +++ b/ckanext/let_me_in/views.py @@ -5,6 +5,7 @@ import jwt +import ckan.model as model from ckan.plugins import toolkit as tk from flask import Blueprint @@ -17,19 +18,34 @@ @lmi.route("/lmi/") def login_with_token(token): try: - data = jwt.decode(token, lmi_utils.get_secret(False), algorithms=["HS256"]) - - if dt.utcnow() > dt.fromtimestamp(data["exp"]): - raise (jwt.ExpiredSignatureError) - - tk.login_user(lmi_utils.get_user(data["user_id"])) - tk.h.flash_success("You have been logged in.") - + token = jwt.decode(token, lmi_utils.get_secret(False), algorithms=["HS256"]) except jwt.ExpiredSignatureError: tk.h.flash_error(tk._("The login link has expired. Please request a new one.")) except jwt.DecodeError: tk.h.flash_error(tk._("Invalid login link.")) else: + user = lmi_utils.get_user(token["user_id"]) + + if user.state != model.State.ACTIVE: + tk.h.flash_error(tk._("User is not active. Can't login")) + return tk.h.redirect_to("user.login") + + if user.last_active > dt.fromtimestamp(token["created_at"]): + tk.h.flash_error( + tk._("You have tried to use a one-time login link that has expired.") + ) + return tk.h.redirect_to("user.login") + + tk.login_user(user) + _update_user_last_active(user) + + tk.h.flash_success("You have been logged in.") + return tk.h.redirect_to("user.me") return tk.h.redirect_to("user.login") + + +def _update_user_last_active(user: model.User) -> None: + user.last_active = dt.utcnow() + model.Session.commit()