Skip to content

Commit

Permalink
Ignore iat verification exceptions during auth for now
Browse files Browse the repository at this point in the history
  • Loading branch information
magico13 committed Apr 13, 2024
1 parent 397de30 commit bfbb060
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions pyemvue/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

# These provide AWS cognito authentication support
from pycognito import Cognito
from pycognito.exceptions import TokenVerificationException

CLIENT_ID = "4qte47jbstod8apnfic0bunmrq"
USER_POOL = "us-east-2_ghlOXVLi1"
USER_POOL_URL = f"https://cognito-idp.us-east-2.amazonaws.com/{USER_POOL}"


class Auth:
Expand All @@ -33,6 +35,9 @@ def __init__(
self.initial_retry_delay = max(initial_retry_delay, 0.5)
self.max_retry_delay = max(max_retry_delay, 0)
self.pool_wellknown_jwks = None
self.tokens = tokens or {}

self._password = None

if (
tokens
Expand All @@ -58,12 +63,20 @@ def __init__(

def refresh_tokens(self) -> "dict[str, str]":
"""Refresh and return new tokens."""
if self._password:
self.cognito.authenticate(password=self._password)
try:
if self._password:
self.cognito.authenticate(password=self._password)

self.cognito.renew_access_token()
except TokenVerificationException as ex:
# ignore iat errors (until https://github.com/NabuCasa/pycognito/issues/225 is fixed)
if "The token is not yet valid (iat)" not in ex.args[0]:
raise
finally:
self._password = None

self.cognito.renew_access_token()
tokens = self._extract_tokens_from_cognito()
self.tokens = tokens

if self.token_updater is not None:
self.token_updater(tokens)
Expand Down Expand Up @@ -133,11 +146,11 @@ def _do_request(self, method: str, path: str, **kwargs) -> requests.Response:
timeout=(self.connect_timeout, self.read_timeout),
)

def _decode_token(self, token: str) -> dict:
def _decode_token(self, token: str, verify_exp: bool = False) -> dict:
"""Decode a JWT token and return the payload as a dictionary, without a hard dependency on pycognito."""
if not self.pool_wellknown_jwks:
self.pool_wellknown_jwks = requests.get(
f"https://cognito-idp.us-east-2.amazonaws.com/{USER_POOL}/.well-known/jwks.json",
USER_POOL_URL + "/.well-known/jwks.json",
timeout=5,
).json()

Expand All @@ -149,10 +162,10 @@ def _decode_token(self, token: str) -> dict:
token,
algorithms=["RS256"],
key=hmac_key,
options={"verify_exp": False, "verify_iat": False, "verify_nbf": False},
issuer=self.cognito.user_pool_url,
options={"verify_exp": verify_exp, "verify_iat": False, "verify_nbf": False},
)


class SimulatedAuth(Auth):
def __init__(
self, host: str, username: Optional[str] = None, password: Optional[str] = None
Expand Down

0 comments on commit bfbb060

Please sign in to comment.