Skip to content

Commit

Permalink
feature: add ttl param to lmi_generate_otl, update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mutantsan committed Oct 10, 2023
1 parent e179c1d commit d2062e9
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 29 deletions.
12 changes: 8 additions & 4 deletions ckanext/let_me_in/logic/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,23 @@ def lmi_generate_otl(
"""Generate a one-time login link for a specified user
:param uid: user ID
:type uid: string
:type uid: str
:param name: username
:type name: string
:type name: str
:param mail: user email
:type mail: string
:type mail: str
:param ttl: TTL for OTL link in seconds
:type ttl: int
"""
tk.check_access("lmi_generate_otl", context, data_dict)

uid: str = data_dict.get("uid", "")
name: str = data_dict.get("name", "")
mail: str = data_dict.get("mail", "")
ttl: int = data_dict.get("ttl", lmi_config.get_default_otl_link_ttl())

if not any([uid, name, mail]):
raise tk.ValidationError(
Expand All @@ -56,7 +60,7 @@ def lmi_generate_otl(
token = jwt.encode(
{
"user_id": user.id,
"exp": now + td(seconds=lmi_config.get_default_otl_link_ttl()),
"exp": now + td(seconds=ttl),
"created_at": now.timestamp(),
},
lmi_utils.get_secret(True),
Expand Down
3 changes: 3 additions & 0 deletions ckanext/let_me_in/logic/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ def lmi_generate_otl(
user_id_exists,
user_name_exists,
user_email_exists,
int_validator,
is_positive_integer,
) -> Schema:
return {
"uid": [ignore_missing, unicode_safe, user_id_exists],
"name": [ignore_missing, unicode_safe, user_name_exists],
"mail": [ignore_missing, unicode_safe, user_email_exists],
"ttl": [ignore_missing, int_validator, is_positive_integer],
}
15 changes: 15 additions & 0 deletions ckanext/let_me_in/tests/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations

import pytest

import ckanext.let_me_in.config as lmi_config


@pytest.mark.usefixtures("with_plugins")
class TestOTLConfig(object):
@pytest.mark.ckan_config(lmi_config.CONF_OTL_LINK_TTL, 999)
def test_set_default_ttl(self):
assert lmi_config.get_default_otl_link_ttl() == 999

def test_default_ttl(self):
assert lmi_config.get_default_otl_link_ttl() == lmi_config.DEFAULT_OTL_LINK_TTL
4 changes: 4 additions & 0 deletions ckanext/let_me_in/tests/test_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ def test_by_name(self, user):
@pytest.mark.usefixtures("clean_db")
def test_by_male(self, user):
call_action("lmi_generate_otl", mail=user["email"])

@pytest.mark.usefixtures("clean_db")
def test_ttl_option(self, user):
call_action("lmi_generate_otl", mail=user["email"])
31 changes: 31 additions & 0 deletions ckanext/let_me_in/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
import ckan.model as model
from ckan.tests.helpers import call_action

HOUR = 3600
SECOND = 1
EXPIRED = True


@pytest.mark.usefixtures("non_clean_db", "with_plugins")
class TestOTLViews(object):
Expand Down Expand Up @@ -62,3 +66,30 @@ def test_otl_time_expiration(self, app, freezer, user, delta_kwargs, expired):

err_msg = "The login link has expired. Please request a new one"
assert err_msg in resp_body if expired else err_msg not in resp_body

def test_user_is_not_active(self, app, user_factory):
"""If user is not Active, we can't login"""
user = user_factory(state=model.State.DELETED)
otl = call_action("lmi_generate_otl", uid=user["id"])

assert "User is not active" in app.get(otl["url"]).body

@pytest.mark.parametrize(
"delta_kwargs,ttl,expired",
[
({"seconds": SECOND}, SECOND, EXPIRED), # 1 seconds, immediately expires
({"hours": 2}, HOUR, EXPIRED),
({"hours": 2}, HOUR * 3, not EXPIRED),
],
)
def test_custom_otl_ttl(self, app, freezer, user, delta_kwargs, ttl, expired):
"""We can set custom TTL for each generated OTL link"""

otl = call_action("lmi_generate_otl", uid=user["id"], ttl=ttl)

freezer.move_to(timedelta(**delta_kwargs))

resp_body: str = app.get(otl["url"]).body

err_msg = "The login link has expired. Please request a new one"
assert err_msg in resp_body if expired else err_msg not in resp_body
48 changes: 23 additions & 25 deletions ckanext/let_me_in/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,38 @@ def login_with_token(token):
token = jwt.decode(token, lmi_utils.get_secret(False), algorithms=["HS256"])
except jwt.ExpiredSignatureError:
tk.h.flash_error(tk._("The login link has expired. Please request a new one."))
return tk.h.redirect_to("user.login")
except jwt.DecodeError:
tk.h.flash_error(tk._("Invalid login link."))
else:
user = lmi_utils.get_user(token["user_id"])
return tk.h.redirect_to("user.login")

if not user:
tk.h.flash_error(tk._("Invalid login link."))
return tk.h.redirect_to("user.login")
user = lmi_utils.get_user(token["user_id"])

context = {}
if not user:
tk.h.flash_error(tk._("Invalid login link."))
return tk.h.redirect_to("user.login")

for plugin in p.PluginImplementations(ILetMeIn):
user = plugin.manage_user(user, context)
context = {}

if user.state != model.State.ACTIVE:
tk.h.flash_error(tk._("User is not active. Can't login"))
return tk.h.redirect_to("user.login")
for plugin in p.PluginImplementations(ILetMeIn):
user = plugin.manage_user(user, context)

if user.last_active and user.last_active > dt.fromtimestamp(
token["created_at"]
):
tk.h.flash_error(
tk._("You have tried to use a one-time login link that has expired.")
)
return tk.h.redirect_to("user.login")
if user.state != model.State.ACTIVE:
tk.h.flash_error(tk._("User is not active. Can't login"))
return tk.h.redirect_to("user.login")

for plugin in p.PluginImplementations(ILetMeIn):
plugin.before_otl_login(user, context)
if user.last_active and user.last_active > dt.fromtimestamp(token["created_at"]):
tk.h.flash_error(
tk._("You have tried to use a one-time login link that has expired.")
)
return tk.h.redirect_to("user.login")

tk.login_user(user)
for plugin in p.PluginImplementations(ILetMeIn):
plugin.before_otl_login(user, context)

for plugin in p.PluginImplementations(ILetMeIn):
plugin.after_otl_login(user, context)
tk.login_user(user)

return tk.h.redirect_to("user.me")
for plugin in p.PluginImplementations(ILetMeIn):
plugin.after_otl_login(user, context)

return tk.h.redirect_to("user.login")
return tk.h.redirect_to("user.me")

0 comments on commit d2062e9

Please sign in to comment.