Skip to content

Commit

Permalink
generated with codegen at box/box-codegen@5aebe76 and spec at box/box…
Browse files Browse the repository at this point in the history
  • Loading branch information
box-sdk-build committed Sep 1, 2023
1 parent 7170a5f commit f838e43
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 48 deletions.
9 changes: 7 additions & 2 deletions box_sdk_gen/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
from typing import Optional

from .network import NetworkSession
from .schemas import AccessToken


class Authentication:
@abstractmethod
def retrieve_token(self, network_session: Optional[NetworkSession] = None) -> str:
def retrieve_token(
self, network_session: Optional[NetworkSession] = None
) -> AccessToken:
pass

@abstractmethod
def refresh(self, network_session: Optional[NetworkSession] = None) -> str:
def refresh_token(
self, network_session: Optional[NetworkSession] = None
) -> AccessToken:
pass
20 changes: 0 additions & 20 deletions box_sdk_gen/auth_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,3 @@ class FileScope(str, Enum):
ITEM_PREVIEW = 'item_preview'
ITEM_RENAME = 'item_rename'
ITEM_SHARE = 'item_share'


class AccessToken(BaseObject):
def __init__(
self,
access_token: Union[None, str] = None,
expires_in: Union[None, int] = None,
token_type: Union[None, str] = None,
restricted_to: Union[None, List[FileScope]] = None,
refresh_token: Union[None, str] = None,
issued_token_type: Union[None, str] = None,
**kwargs
):
super().__init__(**kwargs)
self.access_token = access_token
self.expires_in = expires_in
self.token_type = token_type
self.restricted_to = restricted_to
self.refresh_token = refresh_token
self.issued_token_type = issued_token_type
18 changes: 11 additions & 7 deletions box_sdk_gen/ccg_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
TokenRequestBoxSubjectType,
TokenRequest,
TokenRequestGrantType,
AccessToken,
)
from .fetch import fetch, FetchResponse, FetchOptions
from .network import NetworkSession
from .schemas import AccessToken


class CCGConfig:
Expand Down Expand Up @@ -62,7 +62,7 @@ def __init__(self, config: CCGConfig):
Configuration object of Client Credentials Grant auth.
"""
self.config = config
self.token: Union[None, str] = None
self.token: Union[None, AccessToken] = None

if config.user_id:
self.subject_id = self.config.user_id
Expand All @@ -71,17 +71,21 @@ def __init__(self, config: CCGConfig):
self.subject_type = TokenRequestBoxSubjectType.ENTERPRISE
self.subject_id = self.config.enterprise_id

def retrieve_token(self, network_session: Optional[NetworkSession] = None) -> str:
def retrieve_token(
self, network_session: Optional[NetworkSession] = None
) -> AccessToken:
"""
Return a current token or get a new one when not available.
:return:
Access token
"""
if self.token is None:
return self.refresh(network_session=network_session)
self.refresh_token(network_session=network_session)
return self.token

def refresh(self, network_session: Optional[NetworkSession] = None) -> str:
def refresh_token(
self, network_session: Optional[NetworkSession] = None
) -> AccessToken:
"""
Fetch a new access token
:return:
Expand All @@ -106,8 +110,8 @@ def refresh(self, network_session: Optional[NetworkSession] = None) -> str:
)

token_response = AccessToken.from_dict(json.loads(response.text))
self.token = token_response.access_token
return self.token
self.token = token_response
return token_response

def as_user(self, user_id: str):
"""
Expand Down
11 changes: 8 additions & 3 deletions box_sdk_gen/developer_token_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@

from .auth import Authentication
from .network import NetworkSession
from .schemas import AccessToken


class DeveloperTokenAuth(Authentication):
def __init__(self, token: str):
self.token: str = token
self.token: AccessToken = AccessToken(access_token=token)

def retrieve_token(self, network_session: Optional[NetworkSession] = None) -> str:
def retrieve_token(
self, network_session: Optional[NetworkSession] = None
) -> AccessToken:
return self.token

def refresh(self, network_session: Optional[NetworkSession] = None) -> str:
def refresh_token(
self, network_session: Optional[NetworkSession] = None
) -> AccessToken:
raise Exception("Developer token has expired. Please provide a new one.")
5 changes: 3 additions & 2 deletions box_sdk_gen/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def fetch(url: str, options: FetchOptions) -> FetchResponse:
)

if response.reauthentication_needed:
options.auth.refresh(options.network_session)
options.auth.refresh_token(options.network_session)
elif response.status_code != 429 and response.status_code < 500:
__raise_on_unsuccessful_request(
network_response=response.network_response,
Expand Down Expand Up @@ -161,7 +161,8 @@ def __compose_headers_for_request(options: FetchOptions) -> Dict[str, str]:
headers = options.headers or {}
if options.auth:
headers['Authorization'] = (
f'Bearer {options.auth.retrieve_token(options.network_session)}'
'Bearer'
f' {options.auth.retrieve_token(options.network_session).access_token}'
)

headers['User-Agent'] = USER_AGENT_HEADER
Expand Down
16 changes: 10 additions & 6 deletions box_sdk_gen/jwt_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
TokenRequestBoxSubjectType,
TokenRequest,
TokenRequestGrantType,
AccessToken,
)
from .fetch import fetch, FetchResponse, FetchOptions
from .network import NetworkSession
from .schemas import AccessToken


class JWTConfig:
Expand Down Expand Up @@ -142,7 +142,7 @@ def __init__(self, config: JWTConfig):
)

self.config = config
self.token = None
self.token: Union[None, AccessToken] = None

if config.enterprise_id:
self.subject_type = TokenRequestBoxSubjectType.ENTERPRISE
Expand All @@ -155,17 +155,21 @@ def __init__(self, config: JWTConfig):
config.private_key, config.private_key_passphrase
)

def retrieve_token(self, network_session: Optional[NetworkSession] = None) -> str:
def retrieve_token(
self, network_session: Optional[NetworkSession] = None
) -> AccessToken:
"""
Return a current token or get a new one when not available.
:return:
Access token
"""
if self.token is None:
return self.refresh(network_session=network_session)
self.refresh_token(network_session=network_session)
return self.token

def refresh(self, network_session: Optional[NetworkSession] = None) -> str:
def refresh_token(
self, network_session: Optional[NetworkSession] = None
) -> AccessToken:
"""
Fetch a new access token
:return:
Expand Down Expand Up @@ -215,7 +219,7 @@ def refresh(self, network_session: Optional[NetworkSession] = None) -> str:
)

token_response = AccessToken.from_dict(json.loads(response.text))
self.token = token_response.access_token
self.token = token_response
return self.token

def as_user(self, user_id: str):
Expand Down
19 changes: 13 additions & 6 deletions box_sdk_gen/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from typing import Union, Optional

from .auth import Authentication
from .auth_schemas import TokenRequest, TokenRequestGrantType, AccessToken
from .auth_schemas import TokenRequest, TokenRequestGrantType
from .fetch import fetch, FetchResponse, FetchOptions
from .network import NetworkSession
from .schemas import AccessToken


class OAuthConfig:
Expand Down Expand Up @@ -120,7 +121,9 @@ def get_tokens_authorization_code_grant(
self.token = self._send_token_request(request_body, network_session)
return self.token.access_token

def retrieve_token(self, network_session: Optional[NetworkSession] = None) -> str:
def retrieve_token(
self, network_session: Optional[NetworkSession] = None
) -> AccessToken:
"""
Return a current token or get a new one when not available.
:param network_session: An object to keep network session state
Expand All @@ -131,9 +134,13 @@ def retrieve_token(self, network_session: Optional[NetworkSession] = None) -> st
"Access and refresh tokens not available. Authenticate before making"
" any API call first."
)
return self.token.access_token
return self.token

def refresh(self, network_session: Optional[NetworkSession] = None) -> str:
def refresh_token(
self,
network_session: Optional[NetworkSession] = None,
refresh_token: Optional[str] = None,
) -> AccessToken:
"""
Refresh outdated access token with refresh token
:param network_session: An object to keep network session state
Expand All @@ -143,11 +150,11 @@ def refresh(self, network_session: Optional[NetworkSession] = None) -> str:
grant_type=TokenRequestGrantType.REFRESH_TOKEN,
client_id=self.config.client_id,
client_secret=self.config.client_secret,
refresh_token=self.token.refresh_token,
refresh_token=refresh_token or self.token.refresh_token,
)

self.token = self._send_token_request(request_body, network_session)
return self.token.access_token
return self.token

@staticmethod
def _send_token_request(
Expand Down
6 changes: 4 additions & 2 deletions test/auth.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from box_sdk_gen.schemas import AccessToken

from box_sdk_gen.utils import decode_base_64

from box_sdk_gen.utils import get_env_var
Expand Down Expand Up @@ -65,8 +67,8 @@ def test_developer_token_auth():
)
auth: JWTAuth = JWTAuth(config=jwt_config)
auth.as_user(user_id)
token: str = auth.retrieve_token()
dev_auth: DeveloperTokenAuth = DeveloperTokenAuth(token=token)
token: AccessToken = auth.retrieve_token()
dev_auth: DeveloperTokenAuth = DeveloperTokenAuth(token=token.access_token)
client: Client = Client(auth=dev_auth)
current_user: UserFull = client.users.get_user_me()
assert current_user.id == user_id
Expand Down

0 comments on commit f838e43

Please sign in to comment.