diff --git a/edb/server/protocol/auth_ext/http.py b/edb/server/protocol/auth_ext/http.py index c4f18242979..92f2c0d91bf 100644 --- a/edb/server/protocol/auth_ext/http.py +++ b/edb/server/protocol/auth_ext/http.py @@ -607,18 +607,42 @@ async def handle_resend_verification_email( ): data = self._get_data_from_request(request) - _check_keyset(data, {"verification_token", "provider"}) - ( - identity_id, - _, - verify_url, - maybe_challenge, - maybe_redirect_to, - ) = self._get_data_from_verification_token(data["verification_token"]) - + _check_keyset(data, {"provider"}) local_client = local.Client(db=self.db, provider_name=data["provider"]) - email = await local_client.get_email_by_identity_id(identity_id) - if email is None: + verify_url = data.get("verify_url", f"{self.base_path}/ui/verify") + if "verification_token" in data: + ( + identity_id, + _, + verify_url, + maybe_challenge, + maybe_redirect_to, + ) = self._get_data_from_verification_token( + data["verification_token"] + ) + email = await local_client.get_email_by_identity_id(identity_id) + elif "email" in data: + email = data["email"] + maybe_challenge = None + maybe_redirect_to = data.get("redirect_to") + if maybe_redirect_to and not self._is_url_allowed( + maybe_redirect_to + ): + raise errors.InvalidData( + "Redirect URL does not match any allowed URLs.", + ) + + try: + (identity, _) = await local_client.get_identity_and_secret( + {"email": email} + ) + identity_id = identity.id + except errors.NoIdentityFound: + identity_id = None + else: + raise errors.InvalidData("Missing 'verification_token' or 'email'") + + if identity_id is None or email is None: await auth_emails.send_fake_email(self.tenant) else: await self._send_verification_email( @@ -1364,7 +1388,7 @@ async def _send_verification_email( "iat": issued_at, "challenge": maybe_challenge, "redirect_to": maybe_redirect_to, - "verify_url": verify_url + "verify_url": verify_url, }, expires_in=datetime.timedelta(seconds=0), ) diff --git a/tests/test_http_ext_auth.py b/tests/test_http_ext_auth.py index 4b13141c730..3d9f01ede27 100644 --- a/tests/test_http_ext_auth.py +++ b/tests/test_http_ext_auth.py @@ -2338,6 +2338,103 @@ async def test_http_auth_ext_local_password_authenticate_01(self): auth_data_redirect_on_failure["redirect_on_failure"], ) + async def test_http_auth_ext_resend_verification_email_with_token(self): + with self.http_con() as http_con: + # Register a new user + provider_config = await self.get_builtin_provider_config_by_name( + "local_emailpassword" + ) + provider_name = provider_config.name + email = "test_resend@example.com" + form_data = { + "provider": provider_name, + "email": email, + "password": "test_resend_password", + "challenge": str(uuid.uuid4()), + } + form_data_encoded = urllib.parse.urlencode(form_data).encode() + + self.http_con_request( + http_con, + None, + path="register", + method="POST", + body=form_data_encoded, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + + # Get the verification token from email + test_file = os.environ.get( + "EDGEDB_TEST_EMAIL_FILE", "/tmp/edb-test-email.pickle" + ) + with open(test_file, "rb") as f: + email_args = pickle.load(f) + self.assertEqual(email_args["sender"], "noreply@example.com") + self.assertEqual(email_args["recipients"], form_data["email"]) + html_msg = email_args["message"].get_payload(0).get_payload(1) + html_email = html_msg.get_payload(decode=True).decode("utf-8") + match = re.search(r']+)', html_email) + assert match is not None + verify_url = urllib.parse.urlparse(match.group(1)) + search_params = urllib.parse.parse_qs(verify_url.query) + verification_token = search_params.get( + "verification_token", [None] + )[0] + assert verification_token is not None + + # Resend verification email with the verification token + resend_data = { + "provider": form_data["provider"], + "verification_token": verification_token, + } + resend_data_encoded = urllib.parse.urlencode(resend_data).encode() + + _, _, status = self.http_con_request( + http_con, + None, + path="resend-verification-email", + method="POST", + body=resend_data_encoded, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + + self.assertEqual(status, 200) + + # Resend verification email with just the email + resend_data = { + "provider": form_data["provider"], + "email": email, + } + resend_data_encoded = urllib.parse.urlencode(resend_data).encode() + + _, _, status = self.http_con_request( + http_con, + None, + path="resend-verification-email", + method="POST", + body=resend_data_encoded, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + + self.assertEqual(status, 200) + + # Resend verification email with no email or token + resend_data = { + "provider": form_data["provider"], + } + resend_data_encoded = urllib.parse.urlencode(resend_data).encode() + + _, _, status = self.http_con_request( + http_con, + None, + path="resend-verification-email", + method="POST", + body=resend_data_encoded, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + + self.assertEqual(status, 400) + async def test_http_auth_ext_token_01(self): with self.http_con() as http_con: # Create a PKCE challenge and verifier