6
6
7
7
# These provide AWS cognito authentication support
8
8
from pycognito import Cognito
9
+ from pycognito .exceptions import TokenVerificationException
9
10
10
11
CLIENT_ID = "4qte47jbstod8apnfic0bunmrq"
11
12
USER_POOL = "us-east-2_ghlOXVLi1"
13
+ USER_POOL_URL = f"https://cognito-idp.us-east-2.amazonaws.com/{ USER_POOL } "
12
14
13
15
14
16
class Auth :
@@ -34,6 +36,8 @@ def __init__(
34
36
self .max_retry_delay = max (max_retry_delay , 0 )
35
37
self .pool_wellknown_jwks = None
36
38
39
+ self ._password = None
40
+
37
41
if (
38
42
tokens
39
43
and tokens ["access_token" ]
@@ -54,14 +58,24 @@ def __init__(
54
58
self .cognito = Cognito (
55
59
USER_POOL , CLIENT_ID , user_pool_region = "us-east-2" , username = username
56
60
)
57
- self .cognito .authenticate (password = password )
58
-
59
- self .tokens = self .refresh_tokens ()
61
+ self ._password = password
60
62
61
63
def refresh_tokens (self ) -> "dict[str, str]" :
62
64
"""Refresh and return new tokens."""
63
- self .cognito .renew_access_token ()
65
+ try :
66
+ if self ._password :
67
+ self .cognito .authenticate (password = self ._password )
68
+
69
+ self .cognito .renew_access_token ()
70
+ except TokenVerificationException as ex :
71
+ # ignore iat errors (until https://github.com/NabuCasa/pycognito/issues/225 is fixed)
72
+ if "The token is not yet valid (iat)" not in ex .args [0 ]:
73
+ raise
74
+ finally :
75
+ self ._password = None
76
+
64
77
tokens = self ._extract_tokens_from_cognito ()
78
+ self .tokens = tokens
65
79
66
80
if self .token_updater is not None :
67
81
self .token_updater (tokens )
@@ -131,11 +145,11 @@ def _do_request(self, method: str, path: str, **kwargs) -> requests.Response:
131
145
timeout = (self .connect_timeout , self .read_timeout ),
132
146
)
133
147
134
- def _decode_token (self , token : str ) -> dict :
148
+ def _decode_token (self , token : str , verify_exp : bool = False ) -> dict :
135
149
"""Decode a JWT token and return the payload as a dictionary, without a hard dependency on pycognito."""
136
150
if not self .pool_wellknown_jwks :
137
151
self .pool_wellknown_jwks = requests .get (
138
- f"https://cognito-idp.us-east-2.amazonaws.com/ { USER_POOL } /.well-known/jwks.json" ,
152
+ USER_POOL_URL + " /.well-known/jwks.json" ,
139
153
timeout = 5 ,
140
154
).json ()
141
155
@@ -147,10 +161,10 @@ def _decode_token(self, token: str) -> dict:
147
161
token ,
148
162
algorithms = ["RS256" ],
149
163
key = hmac_key ,
150
- options = {"verify_exp" : False , "verify_iat" : False , "verify_nbf" : False },
164
+ issuer = self .cognito .user_pool_url ,
165
+ options = {"verify_exp" : verify_exp , "verify_iat" : False , "verify_nbf" : False },
151
166
)
152
167
153
-
154
168
class SimulatedAuth (Auth ):
155
169
def __init__ (
156
170
self , host : str , username : Optional [str ] = None , password : Optional [str ] = None
@@ -161,8 +175,6 @@ def __init__(
161
175
self .connect_timeout = 6.03
162
176
self .read_timeout = 10.03
163
177
164
- self .tokens = self .refresh_tokens ()
165
-
166
178
def refresh_tokens (self ) -> dict [str , str ]:
167
179
return {"id_token" : "simulator" }
168
180
0 commit comments