Skip to content

Commit

Permalink
feat: add new method signatures (#105)
Browse files Browse the repository at this point in the history
Co-authored-by: Chris Tran <[email protected]>
  • Loading branch information
tdeshong and ctran88 authored Dec 5, 2024
1 parent 9cf20e4 commit 9246e3c
Show file tree
Hide file tree
Showing 8 changed files with 519 additions and 470 deletions.
84 changes: 84 additions & 0 deletions passageidentity/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Provides the Auth class for interacting with the Passage API."""

from __future__ import annotations

from typing import TYPE_CHECKING

import jwt
import jwt.algorithms

from passageidentity.errors import PassageError
from passageidentity.helper import fetch_app
from passageidentity.openapi_client.api.magic_links_api import MagicLinksApi
from passageidentity.openapi_client.exceptions import ApiException
from passageidentity.openapi_client.models.create_magic_link_request import CreateMagicLinkRequest

if TYPE_CHECKING:
from passageidentity.openapi_client.models.magic_link import MagicLink

CreateMagicLinkArgs = CreateMagicLinkRequest


class Auth:
"""Auth class for handling operations to authenticate and validate JWTs."""

def __init__(self, app_id: str, request_headers: dict[str, str]) -> None:
"""Initialize the Auth class with the app ID and request headers."""
self.app_id = app_id
self.request_headers = request_headers
self.jwks = jwt.PyJWKClient(
f"https://auth.passage.id/v1/apps/{self.app_id}/.well-known/jwks.json",
# must set a user agent to avoid 403 from CF
headers={"User-Agent": "passageidentity/python"},
)
self.app = fetch_app(self.app_id)

self.magic_links_api = MagicLinksApi()

def validate_jwt(self, token: str) -> str:
"""Verify the JWT and return the user ID for the authenticated user, or throw a PassageError."""
try:
kid = jwt.get_unverified_header(token)["kid"]
public_key = self.jwks.get_signing_key(kid)
claims = jwt.decode(
token,
public_key,
audience=[self.app_id] if self.app["hosted"] else self.app["auth_origin"],
algorithms=["RS256"],
)

return claims["sub"]
except Exception as e:
msg = f"JWT is not valid: {e}"
raise PassageError(msg) from e

def create_magic_link(self, args: CreateMagicLinkArgs) -> MagicLink:
"""Create a Magic Link for your app."""
magic_link_req = {}
args_dict = args.to_dict() if isinstance(args, CreateMagicLinkRequest) else args

magic_link_req["user_id"] = args_dict.get("user_id") or ""
magic_link_req["email"] = args_dict.get("email") or ""
magic_link_req["phone"] = args_dict.get("phone") or ""

magic_link_req["language"] = args_dict.get("language") or ""
magic_link_req["magic_link_path"] = args_dict.get("magic_link_path") or ""
magic_link_req["redirect_url"] = args_dict.get("redirect_url") or ""
magic_link_req["send"] = args_dict.get("send") or False
magic_link_req["ttl"] = args_dict.get("ttl") or 0
magic_link_req["type"] = args_dict.get("type") or "login"

if args_dict.get("email"):
magic_link_req["channel"] = args_dict.get("channel") or "email"
elif args_dict.get("phone"):
magic_link_req["channel"] = args_dict.get("channel") or "phone"

try:
return self.magic_links_api.create_magic_link(
self.app_id,
magic_link_req, # type: ignore[arg-type]
_headers=self.request_headers,
).magic_link
except ApiException as e:
msg = "Could not create a magic link for this app"
raise PassageError.from_response_error(e, msg) from e
27 changes: 25 additions & 2 deletions passageidentity/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,45 @@

from __future__ import annotations

from typing import TYPE_CHECKING

import typing_extensions

if TYPE_CHECKING:
from passageidentity.openapi_client.exceptions import ApiException


class PassageError(Exception):
"""Error class for handling Passage errors."""

@typing_extensions.deprecated(
"This should only be constructed by the Passage SDK. Use this type just for type checking.",
)
def __init__(
self,
message: str,
status_code: int | None = None,
status_text: str | None = None,
body: dict | None = None,
error_code: str | None = None,
) -> None:
"""Initialize the error with a message, status code, status text, and optional body."""
self.message = message
self.status_code = status_code
self.status_text = status_text

self.error_code = error_code
self.error = None

if body is not None:
self.error = body["error"]
else:
self.error = None
self.error_code = body["code"]

@classmethod
def from_response_error(cls, response_error: ApiException, message: str | None = None) -> PassageError:
"""Initialize the error with a response body and optional message."""
error_code = response_error.body["code"] if response_error.body else None
error_msg = response_error.body["error"] if response_error.body else None
msg = ": ".join(filter(None, [message, error_msg]))

return cls(message=msg, status_code=response_error.status, error_code=error_code)
6 changes: 3 additions & 3 deletions passageidentity/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from passageidentity.errors import PassageError

BEARER_PATTERN = r"Bearer ([^\s,]+)"
BASE_URL = "https://api.passage.id/v1/apps/"


def extract_token(auth_header: str) -> str:
Expand Down Expand Up @@ -50,9 +49,10 @@ def get_auth_token_from_request(request: Request, auth_strategy: int) -> str:
def fetch_app(app_id: str) -> dict:
"""Fetch the public key for the given app id from Passage."""
# unauthenticated request to get the public key
r = requests.get(BASE_URL + app_id)
r = requests.get(f"https://api.passage.id/v1/apps/{app_id}")

if r.status_code != HTTPStatus.OK:
raise PassageError("Could not fetch app information for app id " + app_id)
msg = f"Could not fetch app information for app id {app_id}"
raise PassageError(msg)

return r.json()["app"]
Loading

0 comments on commit 9246e3c

Please sign in to comment.