diff --git a/src/blueapi/service/authentication.py b/src/blueapi/service/authentication.py index 42456ea35..3bf70aae8 100644 --- a/src/blueapi/service/authentication.py +++ b/src/blueapi/service/authentication.py @@ -5,6 +5,7 @@ import os import time from abc import ABC, abstractmethod +from functools import cached_property from http import HTTPStatus from pathlib import Path from typing import Any, cast @@ -70,13 +71,16 @@ def __init__( server_config: OIDCConfig, ) -> None: self._server_config = server_config - self._client = jwt.PyJWKClient(server_config.jwks_uri) self._token_manager: TokenManager = ( CliTokenManager(server_config.token_file_path) if isinstance(server_config, CLIClientConfig) else NoOpTokenManager() ) + @cached_property + def client(self): + return jwt.PyJWKClient(self._server_config.jwks_uri) + def get_token(self) -> dict[str, Any]: return self._token_manager.load_token() @@ -84,7 +88,7 @@ def logout(self) -> None: self._token_manager.delete_token() def decode_jwt(self, json_web_token: str): - signing_key = self._client.get_signing_key(json_web_token) + signing_key = self.client.get_signing_key_from_jwt(json_web_token) return jwt.decode( json_web_token, signing_key.key, @@ -101,6 +105,8 @@ def decode_token(self, token: dict[str, Any]) -> dict[str, Any]: except jwt.DecodeError: # Else, we check if the id_token is still valid return self.decode_jwt(token["id_token"]) + except Exception as e: + print(e) def refresh_auth_token(self) -> None: token = self._token_manager.load_token() diff --git a/tests/unit_tests/service/test_authentication.py b/tests/unit_tests/service/test_authentication.py index dbad58373..b08b45f30 100644 --- a/tests/unit_tests/service/test_authentication.py +++ b/tests/unit_tests/service/test_authentication.py @@ -55,13 +55,15 @@ def test_poll_for_token( @patch("time.sleep") def test_poll_for_token_timeout( mock_sleep, + valid_oidc_config: dict[str, Any], mock_authn_server: responses.RequestsMock, session_manager: SessionManager, device_code: str, ): mock_authn_server.stop() + mock_authn_server.remove(responses.POST, valid_oidc_config["token_endpoint"]) mock_authn_server.post( - url="https://example.com/token", + url=valid_oidc_config["token_endpoint"], json={"error": "authorization_pending"}, status=HTTP_403_FORBIDDEN, )