Skip to content

Commit

Permalink
Get verification data from request body (#6723)
Browse files Browse the repository at this point in the history
Allows getting the necessary verification data directly from the request
body in the case where the application does not have an expired
verification token.
  • Loading branch information
scotttrinh authored and aljazerzen committed Jan 25, 2024
1 parent eef1c67 commit ba1711e
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 12 deletions.
48 changes: 36 additions & 12 deletions edb/server/protocol/auth_ext/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,18 +607,42 @@ async def handle_resend_verification_email(
):
data = self._get_data_from_request(request)

_check_keyset(data, {"verification_token", "provider"})
(
identity_id,
_,
verify_url,
maybe_challenge,
maybe_redirect_to,
) = self._get_data_from_verification_token(data["verification_token"])

_check_keyset(data, {"provider"})
local_client = local.Client(db=self.db, provider_name=data["provider"])
email = await local_client.get_email_by_identity_id(identity_id)
if email is None:
verify_url = data.get("verify_url", f"{self.base_path}/ui/verify")
if "verification_token" in data:
(
identity_id,
_,
verify_url,
maybe_challenge,
maybe_redirect_to,
) = self._get_data_from_verification_token(
data["verification_token"]
)
email = await local_client.get_email_by_identity_id(identity_id)
elif "email" in data:
email = data["email"]
maybe_challenge = None
maybe_redirect_to = data.get("redirect_to")
if maybe_redirect_to and not self._is_url_allowed(
maybe_redirect_to
):
raise errors.InvalidData(
"Redirect URL does not match any allowed URLs.",
)

try:
(identity, _) = await local_client.get_identity_and_secret(
{"email": email}
)
identity_id = identity.id
except errors.NoIdentityFound:
identity_id = None
else:
raise errors.InvalidData("Missing 'verification_token' or 'email'")

if identity_id is None or email is None:
await auth_emails.send_fake_email(self.tenant)
else:
await self._send_verification_email(
Expand Down Expand Up @@ -1364,7 +1388,7 @@ async def _send_verification_email(
"iat": issued_at,
"challenge": maybe_challenge,
"redirect_to": maybe_redirect_to,
"verify_url": verify_url
"verify_url": verify_url,
},
expires_in=datetime.timedelta(seconds=0),
)
Expand Down
97 changes: 97 additions & 0 deletions tests/test_http_ext_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2338,6 +2338,103 @@ async def test_http_auth_ext_local_password_authenticate_01(self):
auth_data_redirect_on_failure["redirect_on_failure"],
)

async def test_http_auth_ext_resend_verification_email_with_token(self):
with self.http_con() as http_con:
# Register a new user
provider_config = await self.get_builtin_provider_config_by_name(
"local_emailpassword"
)
provider_name = provider_config.name
email = "[email protected]"
form_data = {
"provider": provider_name,
"email": email,
"password": "test_resend_password",
"challenge": str(uuid.uuid4()),
}
form_data_encoded = urllib.parse.urlencode(form_data).encode()

self.http_con_request(
http_con,
None,
path="register",
method="POST",
body=form_data_encoded,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)

# Get the verification token from email
test_file = os.environ.get(
"EDGEDB_TEST_EMAIL_FILE", "/tmp/edb-test-email.pickle"
)
with open(test_file, "rb") as f:
email_args = pickle.load(f)
self.assertEqual(email_args["sender"], "[email protected]")
self.assertEqual(email_args["recipients"], form_data["email"])
html_msg = email_args["message"].get_payload(0).get_payload(1)
html_email = html_msg.get_payload(decode=True).decode("utf-8")
match = re.search(r'<a href=[\'"]?([^\'" >]+)', html_email)
assert match is not None
verify_url = urllib.parse.urlparse(match.group(1))
search_params = urllib.parse.parse_qs(verify_url.query)
verification_token = search_params.get(
"verification_token", [None]
)[0]
assert verification_token is not None

# Resend verification email with the verification token
resend_data = {
"provider": form_data["provider"],
"verification_token": verification_token,
}
resend_data_encoded = urllib.parse.urlencode(resend_data).encode()

_, _, status = self.http_con_request(
http_con,
None,
path="resend-verification-email",
method="POST",
body=resend_data_encoded,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)

self.assertEqual(status, 200)

# Resend verification email with just the email
resend_data = {
"provider": form_data["provider"],
"email": email,
}
resend_data_encoded = urllib.parse.urlencode(resend_data).encode()

_, _, status = self.http_con_request(
http_con,
None,
path="resend-verification-email",
method="POST",
body=resend_data_encoded,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)

self.assertEqual(status, 200)

# Resend verification email with no email or token
resend_data = {
"provider": form_data["provider"],
}
resend_data_encoded = urllib.parse.urlencode(resend_data).encode()

_, _, status = self.http_con_request(
http_con,
None,
path="resend-verification-email",
method="POST",
body=resend_data_encoded,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)

self.assertEqual(status, 400)

async def test_http_auth_ext_token_01(self):
with self.http_con() as http_con:
# Create a PKCE challenge and verifier
Expand Down

0 comments on commit ba1711e

Please sign in to comment.