Skip to content

Commit

Permalink
Allow Magic Link to specify a custom link URL (#8030)
Browse files Browse the repository at this point in the history
Similar to #8026 , this allows applications to specify a link back to
the application itself to process the Magic Link authentication flow.

Closes #8028
  • Loading branch information
scotttrinh authored Nov 22, 2024
1 parent d7f1f4c commit 5111a08
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 5 deletions.
22 changes: 20 additions & 2 deletions edb/server/protocol/auth_ext/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,13 @@ async def handle_magic_link_register(
data.get("redirect_to")
)

allowed_link_url = self._maybe_make_allowed_url(data.get("link_url"))
link_url = (
allowed_link_url.url
if allowed_link_url
else f"{self.base_path}/magic-link/authenticate"
)

magic_link_client = magic_link.Client(
db=self.db,
issuer=self.base_path,
Expand Down Expand Up @@ -1149,6 +1156,7 @@ async def handle_magic_link_register(
identity_id=email_factor.identity.id,
email_factor_id=email_factor.id,
magic_link_token=magic_link_token,
magic_link_url=link_url,
)
)
logger.info(
Expand All @@ -1157,7 +1165,7 @@ async def handle_magic_link_register(
)
await magic_link_client.send_magic_link(
email=email,
link_url=f"{self.base_path}/magic-link/authenticate",
link_url=link_url,
redirect_on_failure=allowed_redirect_on_failure.url,
token=magic_link_token,
)
Expand Down Expand Up @@ -1239,6 +1247,15 @@ async def handle_magic_link_email(
data.get("redirect_to")
)

allowed_link_url = self._maybe_make_allowed_url(
data.get("link_url")
)
link_url = (
allowed_link_url.url
if allowed_link_url
else f"{self.base_path}/magic-link/authenticate"
)

magic_link_client = magic_link.Client(
db=self.db,
issuer=self.base_path,
Expand Down Expand Up @@ -1268,12 +1285,13 @@ async def handle_magic_link_email(
identity_id=identity_id,
email_factor_id=email_factor.id,
magic_link_token=magic_link_token,
magic_link_url=link_url,
)
)
await magic_link_client.send_magic_link(
email=email,
token=magic_link_token,
link_url=f"{self.base_path}/magic-link/authenticate",
link_url=link_url,
redirect_on_failure=redirect_on_failure,
)
logger.info(
Expand Down
1 change: 1 addition & 0 deletions edb/server/protocol/auth_ext/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class MagicLinkRequested(Event, HasIdentity, HasEmailFactor):
init=False,
)
magic_link_token: str
magic_link_url: str

def __repr__(self) -> str:
return (
Expand Down
232 changes: 229 additions & 3 deletions tests/test_http_ext_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4339,13 +4339,15 @@ async def test_http_auth_ext_webauthn_authenticate_options(self):
)
)

async def test_http_auth_ext_magic_link_01(self):
async def test_http_auth_ext_magic_link_with_link_url(self):
email = f"{uuid.uuid4()}@example.com"
challenge = "test_challenge"
callback_url = "https://example.com/app/auth/callback"
redirect_on_failure = "https://example.com/app/auth/magic-link-failure"
link_url = "https://example.com/app/magic-link/authenticate"

with self.http_con() as http_con:
# Register with link_url
_, _, status = self.http_con_request(
http_con,
method="POST",
Expand All @@ -4357,6 +4359,7 @@ async def test_http_auth_ext_magic_link_01(self):
"challenge": challenge,
"callback_url": callback_url,
"redirect_on_failure": redirect_on_failure,
"link_url": link_url,
}
).encode(),
headers={
Expand Down Expand Up @@ -4387,10 +4390,174 @@ async def test_http_auth_ext_magic_link_01(self):
r'<p style="word-break: break-all">([^<]+)', html_email
)
assert match is not None
verify_url = urllib.parse.urlparse(match.group(1))
search_params = urllib.parse.parse_qs(verify_url.query)
magic_link_url = urllib.parse.urlparse(match.group(1))
search_params = urllib.parse.parse_qs(magic_link_url.query)
token = search_params.get("token", [None])[0]
assert token is not None
self.assertEqual(
urllib.parse.urlunparse(
(
magic_link_url.scheme,
magic_link_url.netloc,
magic_link_url.path,
'',
'',
'',
)
),
link_url,
)

_, headers, status = self.http_con_request(
http_con,
method="GET",
path=f"magic-link/authenticate?token={token}",
)

self.assertEqual(status, 302)
location = headers.get("location")
assert location is not None
parsed_location = urllib.parse.urlparse(location)
self.assertEqual(
urllib.parse.urlunparse(
(
parsed_location.scheme,
parsed_location.netloc,
parsed_location.path,
'',
'',
'',
)
),
callback_url,
)

# Sign in with the registered email and link_url
_, _, status = self.http_con_request(
http_con,
method="POST",
path="magic-link/email",
body=json.dumps(
{
"provider": "builtin::local_magic_link",
"email": email,
"challenge": challenge,
"callback_url": callback_url,
"redirect_on_failure": redirect_on_failure,
"link_url": link_url,
}
).encode(),
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)

# Get the token from email
file_name_hash = hashlib.sha256(
f"{SENDER}{email}".encode()
).hexdigest()
test_file = os.environ.get(
"EDGEDB_TEST_EMAIL_FILE",
f"/tmp/edb-test-email-{file_name_hash}.pickle",
)
with open(test_file, "rb") as f:
email_args = pickle.load(f)
self.assertEqual(email_args["sender"], SENDER)
self.assertEqual(email_args["recipients"], email)
msg = cast(EmailMessage, email_args["message"]).get_body(
("html",)
)
assert msg is not None
html_email = msg.get_payload(decode=True).decode("utf-8")
match = re.search(
r'<p style="word-break: break-all">([^<]+)', html_email
)
assert match is not None
magic_link_url = urllib.parse.urlparse(match.group(1))
search_params = urllib.parse.parse_qs(magic_link_url.query)
token = search_params.get("token", [None])[0]
assert token is not None
self.assertEqual(
urllib.parse.urlunparse(
(
magic_link_url.scheme,
magic_link_url.netloc,
magic_link_url.path,
'',
'',
'',
)
),
link_url,
)

async def test_http_auth_ext_magic_link_without_link_url(self):
email = f"{uuid.uuid4()}@example.com"
challenge = "test_challenge"
callback_url = "https://example.com/app/auth/callback"
redirect_on_failure = "https://example.com/app/auth/magic-link-failure"

with self.http_con() as http_con:
# Register without link_url
_, _, status = self.http_con_request(
http_con,
method="POST",
path="magic-link/register",
body=json.dumps(
{
"provider": "builtin::local_magic_link",
"email": email,
"challenge": challenge,
"callback_url": callback_url,
"redirect_on_failure": redirect_on_failure,
}
).encode(),
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)
self.assertEqual(status, 200)

# Get the token from email
file_name_hash = hashlib.sha256(
f"{SENDER}{email}".encode()
).hexdigest()
test_file = os.environ.get(
"EDGEDB_TEST_EMAIL_FILE",
f"/tmp/edb-test-email-{file_name_hash}.pickle",
)
with open(test_file, "rb") as f:
email_args = pickle.load(f)
self.assertEqual(email_args["sender"], SENDER)
self.assertEqual(email_args["recipients"], email)
msg = cast(EmailMessage, email_args["message"]).get_body(
("html",)
)
assert msg is not None
html_email = msg.get_payload(decode=True).decode("utf-8")
match = re.search(
r'<p style="word-break: break-all">([^<]+)', html_email
)
assert match is not None
magic_link_url = urllib.parse.urlparse(match.group(1))
search_params = urllib.parse.parse_qs(magic_link_url.query)
token = search_params.get("token", [None])[0]
assert token is not None
self.assertEqual(
urllib.parse.urlunparse(
(
magic_link_url.scheme,
magic_link_url.netloc,
magic_link_url.path,
'',
'',
'',
)
),
f"{self.http_addr}/magic-link/authenticate",
)

_, headers, status = self.http_con_request(
http_con,
Expand All @@ -4416,6 +4583,65 @@ async def test_http_auth_ext_magic_link_01(self):
callback_url,
)

# Sign in with the registered email without link_url
_, _, status = self.http_con_request(
http_con,
method="POST",
path="magic-link/email",
body=json.dumps(
{
"provider": "builtin::local_magic_link",
"email": email,
"challenge": challenge,
"callback_url": callback_url,
"redirect_on_failure": redirect_on_failure,
}
).encode(),
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
)

# Get the token from email
file_name_hash = hashlib.sha256(
f"{SENDER}{email}".encode()
).hexdigest()
test_file = os.environ.get(
"EDGEDB_TEST_EMAIL_FILE",
f"/tmp/edb-test-email-{file_name_hash}.pickle",
)
with open(test_file, "rb") as f:
email_args = pickle.load(f)
self.assertEqual(email_args["sender"], SENDER)
self.assertEqual(email_args["recipients"], email)
msg = cast(EmailMessage, email_args["message"]).get_body(
("html",)
)
assert msg is not None
html_email = msg.get_payload(decode=True).decode("utf-8")
match = re.search(
r'<p style="word-break: break-all">([^<]+)', html_email
)
assert match is not None
magic_link_url = urllib.parse.urlparse(match.group(1))
search_params = urllib.parse.parse_qs(magic_link_url.query)
token = search_params.get("token", [None])[0]
assert token is not None
self.assertEqual(
urllib.parse.urlunparse(
(
magic_link_url.scheme,
magic_link_url.netloc,
magic_link_url.path,
'',
'',
'',
)
),
f"{self.http_addr}/magic-link/authenticate",
)

async def test_http_auth_ext_identity_delete_cascade_01(self):
"""
Test deleting a LocalIdentity deletes the associated Factors and
Expand Down

0 comments on commit 5111a08

Please sign in to comment.