diff --git a/ckanext/let_me_in/logic/action.py b/ckanext/let_me_in/logic/action.py index 9719fc2..eb82e10 100644 --- a/ckanext/let_me_in/logic/action.py +++ b/ckanext/let_me_in/logic/action.py @@ -22,19 +22,23 @@ def lmi_generate_otl( """Generate a one-time login link for a specified user :param uid: user ID - :type uid: string + :type uid: str :param name: username - :type name: string + :type name: str :param mail: user email - :type mail: string + :type mail: str + + :param ttl: TTL for OTL link in seconds + :type ttl: int """ tk.check_access("lmi_generate_otl", context, data_dict) uid: str = data_dict.get("uid", "") name: str = data_dict.get("name", "") mail: str = data_dict.get("mail", "") + ttl: int = data_dict.get("ttl", lmi_config.get_default_otl_link_ttl()) if not any([uid, name, mail]): raise tk.ValidationError( @@ -56,7 +60,7 @@ def lmi_generate_otl( token = jwt.encode( { "user_id": user.id, - "exp": now + td(seconds=lmi_config.get_default_otl_link_ttl()), + "exp": now + td(seconds=ttl), "created_at": now.timestamp(), }, lmi_utils.get_secret(True), diff --git a/ckanext/let_me_in/logic/schema.py b/ckanext/let_me_in/logic/schema.py index ba90926..8386446 100644 --- a/ckanext/let_me_in/logic/schema.py +++ b/ckanext/let_me_in/logic/schema.py @@ -14,9 +14,12 @@ def lmi_generate_otl( user_id_exists, user_name_exists, user_email_exists, + int_validator, + is_positive_integer, ) -> Schema: return { "uid": [ignore_missing, unicode_safe, user_id_exists], "name": [ignore_missing, unicode_safe, user_name_exists], "mail": [ignore_missing, unicode_safe, user_email_exists], + "ttl": [ignore_missing, int_validator, is_positive_integer], } diff --git a/ckanext/let_me_in/tests/test_config.py b/ckanext/let_me_in/tests/test_config.py new file mode 100644 index 0000000..805b114 --- /dev/null +++ b/ckanext/let_me_in/tests/test_config.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +import pytest + +import ckanext.let_me_in.config as lmi_config + + +@pytest.mark.usefixtures("with_plugins") +class TestOTLConfig(object): + @pytest.mark.ckan_config(lmi_config.CONF_OTL_LINK_TTL, 999) + def test_set_default_ttl(self): + assert lmi_config.get_default_otl_link_ttl() == 999 + + def test_default_ttl(self): + assert lmi_config.get_default_otl_link_ttl() == lmi_config.DEFAULT_OTL_LINK_TTL diff --git a/ckanext/let_me_in/tests/test_logic.py b/ckanext/let_me_in/tests/test_logic.py index 1c765a1..7cb1144 100644 --- a/ckanext/let_me_in/tests/test_logic.py +++ b/ckanext/let_me_in/tests/test_logic.py @@ -45,3 +45,7 @@ def test_by_name(self, user): @pytest.mark.usefixtures("clean_db") def test_by_male(self, user): call_action("lmi_generate_otl", mail=user["email"]) + + @pytest.mark.usefixtures("clean_db") + def test_ttl_option(self, user): + call_action("lmi_generate_otl", mail=user["email"]) diff --git a/ckanext/let_me_in/tests/test_views.py b/ckanext/let_me_in/tests/test_views.py index 49455b0..9450442 100644 --- a/ckanext/let_me_in/tests/test_views.py +++ b/ckanext/let_me_in/tests/test_views.py @@ -8,6 +8,10 @@ import ckan.model as model from ckan.tests.helpers import call_action +HOUR = 3600 +SECOND = 1 +EXPIRED = True + @pytest.mark.usefixtures("non_clean_db", "with_plugins") class TestOTLViews(object): @@ -62,3 +66,30 @@ def test_otl_time_expiration(self, app, freezer, user, delta_kwargs, expired): err_msg = "The login link has expired. Please request a new one" assert err_msg in resp_body if expired else err_msg not in resp_body + + def test_user_is_not_active(self, app, user_factory): + """If user is not Active, we can't login""" + user = user_factory(state=model.State.DELETED) + otl = call_action("lmi_generate_otl", uid=user["id"]) + + assert "User is not active" in app.get(otl["url"]).body + + @pytest.mark.parametrize( + "delta_kwargs,ttl,expired", + [ + ({"seconds": SECOND}, SECOND, EXPIRED), # 1 seconds, immediately expires + ({"hours": 2}, HOUR, EXPIRED), + ({"hours": 2}, HOUR * 3, not EXPIRED), + ], + ) + def test_custom_otl_ttl(self, app, freezer, user, delta_kwargs, ttl, expired): + """We can set custom TTL for each generated OTL link""" + + otl = call_action("lmi_generate_otl", uid=user["id"], ttl=ttl) + + freezer.move_to(timedelta(**delta_kwargs)) + + resp_body: str = app.get(otl["url"]).body + + err_msg = "The login link has expired. Please request a new one" + assert err_msg in resp_body if expired else err_msg not in resp_body diff --git a/ckanext/let_me_in/views.py b/ckanext/let_me_in/views.py index eeedb48..53ae6e4 100644 --- a/ckanext/let_me_in/views.py +++ b/ckanext/let_me_in/views.py @@ -23,40 +23,38 @@ def login_with_token(token): token = jwt.decode(token, lmi_utils.get_secret(False), algorithms=["HS256"]) except jwt.ExpiredSignatureError: tk.h.flash_error(tk._("The login link has expired. Please request a new one.")) + return tk.h.redirect_to("user.login") except jwt.DecodeError: tk.h.flash_error(tk._("Invalid login link.")) - else: - user = lmi_utils.get_user(token["user_id"]) + return tk.h.redirect_to("user.login") - if not user: - tk.h.flash_error(tk._("Invalid login link.")) - return tk.h.redirect_to("user.login") + user = lmi_utils.get_user(token["user_id"]) - context = {} + if not user: + tk.h.flash_error(tk._("Invalid login link.")) + return tk.h.redirect_to("user.login") - for plugin in p.PluginImplementations(ILetMeIn): - user = plugin.manage_user(user, context) + context = {} - if user.state != model.State.ACTIVE: - tk.h.flash_error(tk._("User is not active. Can't login")) - return tk.h.redirect_to("user.login") + for plugin in p.PluginImplementations(ILetMeIn): + user = plugin.manage_user(user, context) - if user.last_active and user.last_active > dt.fromtimestamp( - token["created_at"] - ): - tk.h.flash_error( - tk._("You have tried to use a one-time login link that has expired.") - ) - return tk.h.redirect_to("user.login") + if user.state != model.State.ACTIVE: + tk.h.flash_error(tk._("User is not active. Can't login")) + return tk.h.redirect_to("user.login") - for plugin in p.PluginImplementations(ILetMeIn): - plugin.before_otl_login(user, context) + if user.last_active and user.last_active > dt.fromtimestamp(token["created_at"]): + tk.h.flash_error( + tk._("You have tried to use a one-time login link that has expired.") + ) + return tk.h.redirect_to("user.login") - tk.login_user(user) + for plugin in p.PluginImplementations(ILetMeIn): + plugin.before_otl_login(user, context) - for plugin in p.PluginImplementations(ILetMeIn): - plugin.after_otl_login(user, context) + tk.login_user(user) - return tk.h.redirect_to("user.me") + for plugin in p.PluginImplementations(ILetMeIn): + plugin.after_otl_login(user, context) - return tk.h.redirect_to("user.login") + return tk.h.redirect_to("user.me")