From a1a9940b8a8941d79e13cf22135e767253a35dce Mon Sep 17 00:00:00 2001 From: Mike Marvin Date: Sat, 13 Apr 2024 19:44:31 -0400 Subject: [PATCH] Tweaks to Auth Handling (#75) Ignore iat verification exceptions for now and return False for login when it fails due to wrong credentials. Initialize the Auth object first, get tokens in a separate call. --- pyemvue/__version__.py | 2 +- pyemvue/auth.py | 32 ++++++++++++++++++++++---------- pyemvue/pyemvue.py | 5 +++++ 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/pyemvue/__version__.py b/pyemvue/__version__.py index 3d5ecb9..9f6fbca 100644 --- a/pyemvue/__version__.py +++ b/pyemvue/__version__.py @@ -1 +1 @@ -VERSION = "0.18.4" +VERSION = "0.18.5" diff --git a/pyemvue/auth.py b/pyemvue/auth.py index 06517ab..4daa7d5 100644 --- a/pyemvue/auth.py +++ b/pyemvue/auth.py @@ -6,9 +6,11 @@ # These provide AWS cognito authentication support from pycognito import Cognito +from pycognito.exceptions import TokenVerificationException CLIENT_ID = "4qte47jbstod8apnfic0bunmrq" USER_POOL = "us-east-2_ghlOXVLi1" +USER_POOL_URL = f"https://cognito-idp.us-east-2.amazonaws.com/{USER_POOL}" class Auth: @@ -34,6 +36,8 @@ def __init__( self.max_retry_delay = max(max_retry_delay, 0) self.pool_wellknown_jwks = None + self._password = None + if ( tokens and tokens["access_token"] @@ -54,14 +58,24 @@ def __init__( self.cognito = Cognito( USER_POOL, CLIENT_ID, user_pool_region="us-east-2", username=username ) - self.cognito.authenticate(password=password) - - self.tokens = self.refresh_tokens() + self._password = password def refresh_tokens(self) -> "dict[str, str]": """Refresh and return new tokens.""" - self.cognito.renew_access_token() + try: + if self._password: + self.cognito.authenticate(password=self._password) + + self.cognito.renew_access_token() + except TokenVerificationException as ex: + # ignore iat errors (until https://github.com/NabuCasa/pycognito/issues/225 is fixed) + if "The token is not yet valid (iat)" not in ex.args[0]: + raise + finally: + self._password = None + tokens = self._extract_tokens_from_cognito() + self.tokens = tokens if self.token_updater is not None: self.token_updater(tokens) @@ -131,11 +145,11 @@ def _do_request(self, method: str, path: str, **kwargs) -> requests.Response: timeout=(self.connect_timeout, self.read_timeout), ) - def _decode_token(self, token: str) -> dict: + def _decode_token(self, token: str, verify_exp: bool = False) -> dict: """Decode a JWT token and return the payload as a dictionary, without a hard dependency on pycognito.""" if not self.pool_wellknown_jwks: self.pool_wellknown_jwks = requests.get( - f"https://cognito-idp.us-east-2.amazonaws.com/{USER_POOL}/.well-known/jwks.json", + USER_POOL_URL + "/.well-known/jwks.json", timeout=5, ).json() @@ -147,10 +161,10 @@ def _decode_token(self, token: str) -> dict: token, algorithms=["RS256"], key=hmac_key, - options={"verify_exp": False, "verify_iat": False, "verify_nbf": False}, + issuer=self.cognito.user_pool_url, + options={"verify_exp": verify_exp, "verify_iat": False, "verify_nbf": False}, ) - class SimulatedAuth(Auth): def __init__( self, host: str, username: Optional[str] = None, password: Optional[str] = None @@ -161,8 +175,6 @@ def __init__( self.connect_timeout = 6.03 self.read_timeout = 10.03 - self.tokens = self.refresh_tokens() - def refresh_tokens(self) -> dict[str, str]: return {"id_token": "simulator"} diff --git a/pyemvue/pyemvue.py b/pyemvue/pyemvue.py index 9c486c4..e49e327 100644 --- a/pyemvue/pyemvue.py +++ b/pyemvue/pyemvue.py @@ -366,6 +366,11 @@ def login( token_updater=self._store_tokens, ) + try: + self.auth.refresh_tokens() + except self.auth.cognito.client.exceptions.NotAuthorizedException as ex: + return False + if self.auth.tokens: self.username = self.auth.get_username() self.customer = self.get_customer_details()