From 5111a08f86aeb161573f477139bc04f2513659fa Mon Sep 17 00:00:00 2001 From: Scott Trinh Date: Fri, 22 Nov 2024 15:09:36 -0500 Subject: [PATCH] Allow Magic Link to specify a custom link URL (#8030) Similar to #8026 , this allows applications to specify a link back to the application itself to process the Magic Link authentication flow. Closes #8028 --- edb/server/protocol/auth_ext/http.py | 22 ++- edb/server/protocol/auth_ext/webhook.py | 1 + tests/test_http_ext_auth.py | 232 +++++++++++++++++++++++- 3 files changed, 250 insertions(+), 5 deletions(-) diff --git a/edb/server/protocol/auth_ext/http.py b/edb/server/protocol/auth_ext/http.py index fd91dc21d35..b35a3868f60 100644 --- a/edb/server/protocol/auth_ext/http.py +++ b/edb/server/protocol/auth_ext/http.py @@ -1104,6 +1104,13 @@ async def handle_magic_link_register( data.get("redirect_to") ) + allowed_link_url = self._maybe_make_allowed_url(data.get("link_url")) + link_url = ( + allowed_link_url.url + if allowed_link_url + else f"{self.base_path}/magic-link/authenticate" + ) + magic_link_client = magic_link.Client( db=self.db, issuer=self.base_path, @@ -1149,6 +1156,7 @@ async def handle_magic_link_register( identity_id=email_factor.identity.id, email_factor_id=email_factor.id, magic_link_token=magic_link_token, + magic_link_url=link_url, ) ) logger.info( @@ -1157,7 +1165,7 @@ async def handle_magic_link_register( ) await magic_link_client.send_magic_link( email=email, - link_url=f"{self.base_path}/magic-link/authenticate", + link_url=link_url, redirect_on_failure=allowed_redirect_on_failure.url, token=magic_link_token, ) @@ -1239,6 +1247,15 @@ async def handle_magic_link_email( data.get("redirect_to") ) + allowed_link_url = self._maybe_make_allowed_url( + data.get("link_url") + ) + link_url = ( + allowed_link_url.url + if allowed_link_url + else f"{self.base_path}/magic-link/authenticate" + ) + magic_link_client = magic_link.Client( db=self.db, issuer=self.base_path, @@ -1268,12 +1285,13 @@ async def handle_magic_link_email( identity_id=identity_id, email_factor_id=email_factor.id, magic_link_token=magic_link_token, + magic_link_url=link_url, ) ) await magic_link_client.send_magic_link( email=email, token=magic_link_token, - link_url=f"{self.base_path}/magic-link/authenticate", + link_url=link_url, redirect_on_failure=redirect_on_failure, ) logger.info( diff --git a/edb/server/protocol/auth_ext/webhook.py b/edb/server/protocol/auth_ext/webhook.py index 1e0fc977e65..d33eee5d8df 100644 --- a/edb/server/protocol/auth_ext/webhook.py +++ b/edb/server/protocol/auth_ext/webhook.py @@ -175,6 +175,7 @@ class MagicLinkRequested(Event, HasIdentity, HasEmailFactor): init=False, ) magic_link_token: str + magic_link_url: str def __repr__(self) -> str: return ( diff --git a/tests/test_http_ext_auth.py b/tests/test_http_ext_auth.py index 4d102879678..560085607ff 100644 --- a/tests/test_http_ext_auth.py +++ b/tests/test_http_ext_auth.py @@ -4339,13 +4339,15 @@ async def test_http_auth_ext_webauthn_authenticate_options(self): ) ) - async def test_http_auth_ext_magic_link_01(self): + async def test_http_auth_ext_magic_link_with_link_url(self): email = f"{uuid.uuid4()}@example.com" challenge = "test_challenge" callback_url = "https://example.com/app/auth/callback" redirect_on_failure = "https://example.com/app/auth/magic-link-failure" + link_url = "https://example.com/app/magic-link/authenticate" with self.http_con() as http_con: + # Register with link_url _, _, status = self.http_con_request( http_con, method="POST", @@ -4357,6 +4359,7 @@ async def test_http_auth_ext_magic_link_01(self): "challenge": challenge, "callback_url": callback_url, "redirect_on_failure": redirect_on_failure, + "link_url": link_url, } ).encode(), headers={ @@ -4387,10 +4390,174 @@ async def test_http_auth_ext_magic_link_01(self): r'

([^<]+)', html_email ) assert match is not None - verify_url = urllib.parse.urlparse(match.group(1)) - search_params = urllib.parse.parse_qs(verify_url.query) + magic_link_url = urllib.parse.urlparse(match.group(1)) + search_params = urllib.parse.parse_qs(magic_link_url.query) + token = search_params.get("token", [None])[0] + assert token is not None + self.assertEqual( + urllib.parse.urlunparse( + ( + magic_link_url.scheme, + magic_link_url.netloc, + magic_link_url.path, + '', + '', + '', + ) + ), + link_url, + ) + + _, headers, status = self.http_con_request( + http_con, + method="GET", + path=f"magic-link/authenticate?token={token}", + ) + + self.assertEqual(status, 302) + location = headers.get("location") + assert location is not None + parsed_location = urllib.parse.urlparse(location) + self.assertEqual( + urllib.parse.urlunparse( + ( + parsed_location.scheme, + parsed_location.netloc, + parsed_location.path, + '', + '', + '', + ) + ), + callback_url, + ) + + # Sign in with the registered email and link_url + _, _, status = self.http_con_request( + http_con, + method="POST", + path="magic-link/email", + body=json.dumps( + { + "provider": "builtin::local_magic_link", + "email": email, + "challenge": challenge, + "callback_url": callback_url, + "redirect_on_failure": redirect_on_failure, + "link_url": link_url, + } + ).encode(), + headers={ + "Content-Type": "application/json", + "Accept": "application/json", + }, + ) + + # Get the token from email + file_name_hash = hashlib.sha256( + f"{SENDER}{email}".encode() + ).hexdigest() + test_file = os.environ.get( + "EDGEDB_TEST_EMAIL_FILE", + f"/tmp/edb-test-email-{file_name_hash}.pickle", + ) + with open(test_file, "rb") as f: + email_args = pickle.load(f) + self.assertEqual(email_args["sender"], SENDER) + self.assertEqual(email_args["recipients"], email) + msg = cast(EmailMessage, email_args["message"]).get_body( + ("html",) + ) + assert msg is not None + html_email = msg.get_payload(decode=True).decode("utf-8") + match = re.search( + r'

([^<]+)', html_email + ) + assert match is not None + magic_link_url = urllib.parse.urlparse(match.group(1)) + search_params = urllib.parse.parse_qs(magic_link_url.query) + token = search_params.get("token", [None])[0] + assert token is not None + self.assertEqual( + urllib.parse.urlunparse( + ( + magic_link_url.scheme, + magic_link_url.netloc, + magic_link_url.path, + '', + '', + '', + ) + ), + link_url, + ) + + async def test_http_auth_ext_magic_link_without_link_url(self): + email = f"{uuid.uuid4()}@example.com" + challenge = "test_challenge" + callback_url = "https://example.com/app/auth/callback" + redirect_on_failure = "https://example.com/app/auth/magic-link-failure" + + with self.http_con() as http_con: + # Register without link_url + _, _, status = self.http_con_request( + http_con, + method="POST", + path="magic-link/register", + body=json.dumps( + { + "provider": "builtin::local_magic_link", + "email": email, + "challenge": challenge, + "callback_url": callback_url, + "redirect_on_failure": redirect_on_failure, + } + ).encode(), + headers={ + "Content-Type": "application/json", + "Accept": "application/json", + }, + ) + self.assertEqual(status, 200) + + # Get the token from email + file_name_hash = hashlib.sha256( + f"{SENDER}{email}".encode() + ).hexdigest() + test_file = os.environ.get( + "EDGEDB_TEST_EMAIL_FILE", + f"/tmp/edb-test-email-{file_name_hash}.pickle", + ) + with open(test_file, "rb") as f: + email_args = pickle.load(f) + self.assertEqual(email_args["sender"], SENDER) + self.assertEqual(email_args["recipients"], email) + msg = cast(EmailMessage, email_args["message"]).get_body( + ("html",) + ) + assert msg is not None + html_email = msg.get_payload(decode=True).decode("utf-8") + match = re.search( + r'

([^<]+)', html_email + ) + assert match is not None + magic_link_url = urllib.parse.urlparse(match.group(1)) + search_params = urllib.parse.parse_qs(magic_link_url.query) token = search_params.get("token", [None])[0] assert token is not None + self.assertEqual( + urllib.parse.urlunparse( + ( + magic_link_url.scheme, + magic_link_url.netloc, + magic_link_url.path, + '', + '', + '', + ) + ), + f"{self.http_addr}/magic-link/authenticate", + ) _, headers, status = self.http_con_request( http_con, @@ -4416,6 +4583,65 @@ async def test_http_auth_ext_magic_link_01(self): callback_url, ) + # Sign in with the registered email without link_url + _, _, status = self.http_con_request( + http_con, + method="POST", + path="magic-link/email", + body=json.dumps( + { + "provider": "builtin::local_magic_link", + "email": email, + "challenge": challenge, + "callback_url": callback_url, + "redirect_on_failure": redirect_on_failure, + } + ).encode(), + headers={ + "Content-Type": "application/json", + "Accept": "application/json", + }, + ) + + # Get the token from email + file_name_hash = hashlib.sha256( + f"{SENDER}{email}".encode() + ).hexdigest() + test_file = os.environ.get( + "EDGEDB_TEST_EMAIL_FILE", + f"/tmp/edb-test-email-{file_name_hash}.pickle", + ) + with open(test_file, "rb") as f: + email_args = pickle.load(f) + self.assertEqual(email_args["sender"], SENDER) + self.assertEqual(email_args["recipients"], email) + msg = cast(EmailMessage, email_args["message"]).get_body( + ("html",) + ) + assert msg is not None + html_email = msg.get_payload(decode=True).decode("utf-8") + match = re.search( + r'

([^<]+)', html_email + ) + assert match is not None + magic_link_url = urllib.parse.urlparse(match.group(1)) + search_params = urllib.parse.parse_qs(magic_link_url.query) + token = search_params.get("token", [None])[0] + assert token is not None + self.assertEqual( + urllib.parse.urlunparse( + ( + magic_link_url.scheme, + magic_link_url.netloc, + magic_link_url.path, + '', + '', + '', + ) + ), + f"{self.http_addr}/magic-link/authenticate", + ) + async def test_http_auth_ext_identity_delete_cascade_01(self): """ Test deleting a LocalIdentity deletes the associated Factors and