diff --git a/box_sdk_gen/auth_schemas.py b/box_sdk_gen/auth_schemas.py deleted file mode 100644 index bfc428c..0000000 --- a/box_sdk_gen/auth_schemas.py +++ /dev/null @@ -1,74 +0,0 @@ -from enum import Enum -from typing import Union - -from .base_object import BaseObject - - -class TokenRequestGrantType(str, Enum): - AUTHORIZATION_CODE = 'authorization_code' - REFRESH_TOKEN = 'refresh_token' - CLIENT_CREDENTIALS = 'client_credentials' - URN_IETF_PARAMS_OAUTH_GRANT_TYPE_JWT_BEARER = ( - 'urn:ietf:params:oauth:grant-type:jwt-bearer' - ) - URN_IETF_PARAMS_OAUTH_GRANT_TYPE_TOKEN_EXCHANGE = ( - 'urn:ietf:params:oauth:grant-type:token-exchange' - ) - - -class TokenRequestBoxSubjectType(str, Enum): - ENTERPRISE = 'enterprise' - USER = 'user' - - -class TokenRequest(BaseObject): - def __init__( - self, - grant_type: TokenRequestGrantType, - client_id: Union[None, str] = None, - client_secret: Union[None, str] = None, - code: Union[None, str] = None, - refresh_token: Union[None, str] = None, - assertion: Union[None, str] = None, - subject_token: Union[None, str] = None, - subject_token_type: Union[None, str] = None, - actor_token: Union[None, str] = None, - actor_token_type: Union[None, str] = None, - scope: Union[None, str] = None, - resource: Union[None, str] = None, - box_subject_type: Union[None, TokenRequestBoxSubjectType] = None, - box_subject_id: Union[None, str] = None, - box_shared_link: Union[None, str] = None, - **kwargs - ): - super().__init__(**kwargs) - self.grant_type = grant_type - self.client_id = client_id - self.client_secret = client_secret - self.code = code - self.refresh_token = refresh_token - self.assertion = assertion - self.subject_token = subject_token - self.subject_token_type = subject_token_type - self.actor_token = actor_token - self.actor_token_type = actor_token_type - self.scope = scope - self.resource = resource - self.box_subject_type = box_subject_type - self.box_subject_id = box_subject_id - self.box_shared_link = box_shared_link - - -class FileScope(str, Enum): - ANNOTATION_EDIT = 'annotation_edit' - ANNOTATION_VIEW_ALL = 'annotation_view_all' - ANNOTATION_VIEW_SELF = 'annotation_view_self' - BASE_EXPLORER = 'base_explorer' - BASE_PICKER = 'base_picker' - BASE_PREVIEW = 'base_preview' - BASE_UPLOAD = 'base_upload' - ITEM_DELETE = 'item_delete' - ITEM_DOWNLOAD = 'item_download' - ITEM_PREVIEW = 'item_preview' - ITEM_RENAME = 'item_rename' - ITEM_SHARE = 'item_share' diff --git a/box_sdk_gen/ccg_auth.py b/box_sdk_gen/ccg_auth.py index 840b1d5..701778f 100644 --- a/box_sdk_gen/ccg_auth.py +++ b/box_sdk_gen/ccg_auth.py @@ -1,15 +1,24 @@ -from typing import Union, Optional +from typing import Optional -from .auth import Authentication -from .token_storage import TokenStorage, InMemoryTokenStorage -from .auth_schemas import ( - TokenRequestBoxSubjectType, - TokenRequest, - TokenRequestGrantType, -) -from .fetch import fetch, FetchResponse, FetchOptions -from .network import NetworkSession -from .schemas import AccessToken +from typing import List + +from box_sdk_gen.managers.authorization import RequestAccessTokenGrantType + +from box_sdk_gen.managers.authorization import RequestAccessTokenSubjectTokenType + +from box_sdk_gen.schemas import AccessToken + +from box_sdk_gen.schemas import PostOAuth2TokenBoxSubjectTypeField + +from box_sdk_gen.auth import Authentication + +from box_sdk_gen.network import NetworkSession + +from box_sdk_gen.token_storage import TokenStorage + +from box_sdk_gen.token_storage import InMemoryTokenStorage + +from box_sdk_gen.managers.authorization import AuthorizationManager class CCGConfig: @@ -17,43 +26,24 @@ def __init__( self, client_id: str, client_secret: str, - enterprise_id: Union[None, str] = None, - user_id: Union[None, str] = None, + enterprise_id: Optional[str] = None, + user_id: Optional[str] = None, token_storage: TokenStorage = None, ): """ - :param client_id: - Box API key used for identifying the application the user is authenticating with. - :param client_secret: - Box API secret used for making auth requests. - :param enterprise_id: - The ID of the Box Developer Edition enterprise. - - May be `None`, if the caller knows that it will not be - authenticating as an enterprise instance / service account. - - If `user_id` is passed, this value is not used, unless - `authenticate_enterprise()` is called to authenticate as the enterprise instance. - :param user_id: - The user id to authenticate. This value is not required. But if it is provided, then the user - will be auto-authenticated at the time of the first API call. - - Should be `None` if the intention is to authenticate as the - enterprise instance / service account. If both `enterprise_id` and - `user_id` are non-`None`, the `user` takes precedense when `refresh()` - is called. - - - - :param token_storage: - Object responsible for storing token. If no custom implementation provided, - the token will be stored in memory. + :param client_id: Box API key used for identifying the application the user is authenticating with + :type client_id: str + :param client_secret: Box API secret used for making auth requests. + :type client_secret: str + :param enterprise_id: The ID of the Box Developer Edition enterprise. + :type enterprise_id: Optional[str], optional + :param user_id: The user id to authenticate. This value is not required. But if it is provided, then the user will be auto-authenticated at the time of the first API call. + :type user_id: Optional[str], optional + :param token_storage: Object responsible for storing token. If no custom implementation provided,the token will be stored in memory. + :type token_storage: TokenStorage, optional """ if token_storage is None: token_storage = InMemoryTokenStorage() - if not enterprise_id and not user_id: - raise Exception("Enterprise ID or User ID is needed") - self.client_id = client_id self.client_secret = client_secret self.enterprise_id = enterprise_id @@ -62,89 +52,161 @@ def __init__( class BoxCCGAuth(Authentication): - def __init__(self, config: CCGConfig): + def __init__(self, config: CCGConfig, **kwargs): """ - :param config: - Configuration object of Client Credentials Grant auth. + :param config: Configuration object of Client Credentials Grant auth. + :type config: CCGConfig """ + super().__init__(**kwargs) self.config = config - self.token_storage = config.token_storage - - if config.user_id: - self.subject_id = self.config.user_id - self.subject_type = TokenRequestBoxSubjectType.USER - else: - self.subject_type = TokenRequestBoxSubjectType.ENTERPRISE - self.subject_id = self.config.enterprise_id + self.token_storage = self.config.token_storage + self.subject_id = ( + self.config.user_id + if not self.config.user_id == None + else self.config.enterprise_id + ) + self.subject_type = 'user' if not self.config.user_id == None else 'enterprise' - def retrieve_token( + def refresh_token( self, network_session: Optional[NetworkSession] = None ) -> AccessToken: """ - Return a current token or get a new one when not available. + Get a new access token using CCG auth :param network_session: An object to keep network session state - :return: Access token + :type network_session: Optional[NetworkSession], optional """ - token = self.token_storage.get() - if token is None: - return self.refresh_token(network_session=network_session) + auth_manager: AuthorizationManager = ( + AuthorizationManager(network_session=network_session) + if not network_session == None + else AuthorizationManager() + ) + token: AccessToken = auth_manager.request_access_token( + grant_type=RequestAccessTokenGrantType.CLIENT_CREDENTIALS.value, + client_id=self.config.client_id, + client_secret=self.config.client_secret, + box_subject_type=self.subject_type, + box_subject_id=self.subject_id, + ) + self.token_storage.store(token) return token - def refresh_token( + def retrieve_token( self, network_session: Optional[NetworkSession] = None ) -> AccessToken: """ Return a current token or get a new one when not available. :param network_session: An object to keep network session state - :return: Access token + :type network_session: Optional[NetworkSession], optional """ - request_body = TokenRequest( - grant_type=TokenRequestGrantType.CLIENT_CREDENTIALS, - client_id=self.config.client_id, - client_secret=self.config.client_secret, - box_subject_id=self.subject_id, - box_subject_type=self.subject_type, - ) + old_token = self.token_storage.get() + if old_token == None: + new_token: AccessToken = self.refresh_token(network_session) + return new_token + return old_token - response: FetchResponse = fetch( - 'https://api.box.com/oauth2/token', - FetchOptions( - method='POST', - data=request_body.to_dict(), - content_type='application/x-www-form-urlencoded', - network_session=network_session, - ), - ) - - new_token = AccessToken.from_dict(response.data) - self.token_storage.store(new_token) - return new_token - - def as_user(self, user_id: str): + def as_user(self, user_id: str, token_storage: TokenStorage = None) -> 'BoxCCGAuth': """ - Set authentication as user. The new token will be automatically fetched with a next API call. + Create a new BoxCCGAuth instance that uses the provided user ID as the subject ID. + + May be one of this application's created App User. Depending on the configured User Access Level, may also be any other App User or Managed User in the enterprise. - May be one of this application's created App User. Depending on the - configured User Access Level, may also be any other App User or Managed - User in the enterprise. + + - :param user_id: - The id of the user to authenticate. + :param user_id: The id of the user to authenticate + :type user_id: str + :param token_storage: Object responsible for storing token in newly created BoxCCGAuth. If no custom implementation provided, the token will be stored in memory. + :type token_storage: TokenStorage, optional + """ + if token_storage is None: + token_storage = InMemoryTokenStorage() + new_config: CCGConfig = CCGConfig( + client_id=self.config.client_id, + client_secret=self.config.client_secret, + enterprise_id=self.config.enterprise_id, + user_id=user_id, + token_storage=token_storage, + ) + return BoxCCGAuth(config=new_config) + + def as_enterprise( + self, enterprise_id: str, token_storage: TokenStorage = None + ) -> 'BoxCCGAuth': + """ + Create a new BoxCCGAuth instance that uses the provided enterprise ID as the subject ID. + :param enterprise_id: The id of the enterprise to authenticate + :type enterprise_id: str + :param token_storage: Object responsible for storing token in newly created BoxCCGAuth. If no custom implementation provided, the token will be stored in memory. + :type token_storage: TokenStorage, optional """ - self.subject_id = user_id - self.subject_type = TokenRequestBoxSubjectType.USER - self.token_storage.clear() + if token_storage is None: + token_storage = InMemoryTokenStorage() + new_config: CCGConfig = CCGConfig( + client_id=self.config.client_id, + client_secret=self.config.client_secret, + enterprise_id=enterprise_id, + user_id=None, + token_storage=token_storage, + ) + return BoxCCGAuth(config=new_config) - def as_enterprise(self, enterprise_id: str): + def downscope_token( + self, + scopes: List[str], + resource: Optional[str] = None, + shared_link: Optional[str] = None, + network_session: Optional[NetworkSession] = None, + ) -> AccessToken: + """ + Downscope access token to the provided scopes. Returning a new access token with the provided scopes, with the original access token unchanged. + :param scopes: The scope(s) to apply to the resulting token. + :type scopes: List[str] + :param resource: The file or folder to get a downscoped token for. If None and shared_link None, the resulting token will not be scoped down to just a single item. The resource should be a full URL to an item, e.g. https://api.box.com/2.0/files/123456. + :type resource: Optional[str], optional + :param shared_link: The shared link to get a downscoped token for. If None and item None, the resulting token will not be scoped down to just a single item. + :type shared_link: Optional[str], optional + :param network_session: An object to keep network session state + :type network_session: Optional[NetworkSession], optional """ - Set authentication as enterprise. The new token will be automatically fetched with a next API call. + token: Optional[AccessToken] = self.token_storage.get() + if token == None: + raise Exception( + 'No access token is available. Make an API call to retrieve a token' + ' before calling this method.' + ) + auth_manager: AuthorizationManager = ( + AuthorizationManager(network_session=network_session) + if not network_session == None + else AuthorizationManager() + ) + downscoped_token: AccessToken = auth_manager.request_access_token( + grant_type=RequestAccessTokenGrantType.URN_IETF_PARAMS_OAUTH_GRANT_TYPE_TOKEN_EXCHANGE.value, + subject_token=token.access_token, + subject_token_type=RequestAccessTokenSubjectTokenType.URN_IETF_PARAMS_OAUTH_TOKEN_TYPE_ACCESS_TOKEN.value, + resource=resource, + scope=' '.join(scopes), + box_shared_link=shared_link, + ) + return downscoped_token - :param enterprise_id: - The ID of the Box Developer Edition enterprise. + def revoke_token(self, network_session: Optional[NetworkSession] = None) -> None: """ - self.subject_id = enterprise_id - self.subject_type = TokenRequestBoxSubjectType.ENTERPRISE - self.token_storage.clear() + Revoke the current access token and remove it from token storage. + :param network_session: An object to keep network session state + :type network_session: Optional[NetworkSession], optional + """ + old_token: Optional[AccessToken] = self.token_storage.get() + if old_token == None: + return None + auth_manager: AuthorizationManager = ( + AuthorizationManager(network_session=network_session) + if not network_session == None + else AuthorizationManager() + ) + auth_manager.revoke_access_token( + self.config.client_id, self.config.client_secret, old_token.access_token + ) + return self.token_storage.clear() diff --git a/box_sdk_gen/developer_token_auth.py b/box_sdk_gen/developer_token_auth.py index 684ac62..d34c60c 100644 --- a/box_sdk_gen/developer_token_auth.py +++ b/box_sdk_gen/developer_token_auth.py @@ -1,13 +1,16 @@ from typing import Optional -from .auth import Authentication -from .network import NetworkSession -from .schemas import AccessToken +from box_sdk_gen.schemas import AccessToken + +from box_sdk_gen.auth import Authentication + +from box_sdk_gen.network import NetworkSession class BoxDeveloperTokenAuth(Authentication): - def __init__(self, token: str): - self.token: AccessToken = AccessToken(access_token=token) + def __init__(self, token: str, **kwargs): + super().__init__(**kwargs) + self.token = token def retrieve_token( self, network_session: Optional[NetworkSession] = None @@ -15,13 +18,16 @@ def retrieve_token( """ Retrieves stored developer token :param network_session: An object to keep network session state - :return: Return a current token + :type network_session: Optional[NetworkSession], optional """ - return self.token + return AccessToken(access_token=self.token) - def refresh_token(self, network_session: Optional[NetworkSession] = None): + def refresh_token( + self, network_session: Optional[NetworkSession] = None + ) -> AccessToken: """ Developer token cannot be refreshed :param network_session: An object to keep network session state + :type network_session: Optional[NetworkSession], optional """ - raise Exception("Developer token has expired. Please provide a new one.") + raise Exception('Developer token has expired. Please provide a new one.') diff --git a/box_sdk_gen/jwt_auth.py b/box_sdk_gen/jwt_auth.py index 8f70c6b..8d68940 100644 --- a/box_sdk_gen/jwt_auth.py +++ b/box_sdk_gen/jwt_auth.py @@ -8,8 +8,6 @@ from typing import List -from box_sdk_gen.utils import to_string - from box_sdk_gen.managers.authorization import RequestAccessTokenGrantType from box_sdk_gen.managers.authorization import RequestAccessTokenSubjectTokenType @@ -50,10 +48,6 @@ from box_sdk_gen.managers.authorization import AuthorizationManager -from box_sdk_gen.utils import to_string - -from box_sdk_gen.json_data import sd_to_json - box_jwt_audience: str = 'https://api.box.com/oauth2/token' @@ -331,7 +325,7 @@ def retrieve_token( return new_token return old_token - def as_user(self, user_id: str) -> 'BoxJWTAuth': + def as_user(self, user_id: str, token_storage: TokenStorage = None) -> 'BoxJWTAuth': """ Create a new BoxJWTAuth instance that uses the provided user ID as the subject of the JWT assertion. @@ -345,7 +339,11 @@ def as_user(self, user_id: str) -> 'BoxJWTAuth': :param user_id: The id of the user to authenticate :type user_id: str + :param token_storage: Object responsible for storing token in newly created BoxJWTAuth. If no custom implementation provided, the token will be stored in memory. + :type token_storage: TokenStorage, optional """ + if token_storage is None: + token_storage = InMemoryTokenStorage() new_config: JWTConfig = JWTConfig( client_id=self.config.client_id, client_secret=self.config.client_secret, @@ -354,18 +352,23 @@ def as_user(self, user_id: str) -> 'BoxJWTAuth': jwt_key_id=self.config.jwt_key_id, private_key=self.config.private_key, private_key_passphrase=self.config.private_key_passphrase, - token_storage=self.token_storage, + token_storage=token_storage, ) new_auth: 'BoxJWTAuth' = BoxJWTAuth(config=new_config) - self.token_storage.clear() return new_auth - def as_enterprise(self, user_id: str) -> 'BoxJWTAuth': + def as_enterprise( + self, user_id: str, token_storage: TokenStorage = None + ) -> 'BoxJWTAuth': """ Create a new BoxJWTAuth instance that uses the provided enterprise ID as the subject of the JWT assertion. :param user_id: The id of the enterprise to authenticate :type user_id: str + :param token_storage: Object responsible for storing token in newly created BoxJWTAuth. If no custom implementation provided, the token will be stored in memory. + :type token_storage: TokenStorage, optional """ + if token_storage is None: + token_storage = InMemoryTokenStorage() new_config: JWTConfig = JWTConfig( client_id=self.config.client_id, client_secret=self.config.client_secret, @@ -374,10 +377,9 @@ def as_enterprise(self, user_id: str) -> 'BoxJWTAuth': jwt_key_id=self.config.jwt_key_id, private_key=self.config.private_key, private_key_passphrase=self.config.private_key_passphrase, - token_storage=self.token_storage, + token_storage=token_storage, ) new_auth: 'BoxJWTAuth' = BoxJWTAuth(config=new_config) - self.token_storage.clear() return new_auth def downscope_token( @@ -391,7 +393,7 @@ def downscope_token( Downscope access token to the provided scopes. Returning a new access token with the provided scopes, with the original access token unchanged. :param scopes: The scope(s) to apply to the resulting token. :type scopes: List[str] - :param resource: The file or folder to get a downscoped token for. If None and shared_link None, the resulting token will not be scoped down to just a single item. + :param resource: The file or folder to get a downscoped token for. If None and shared_link None, the resulting token will not be scoped down to just a single item. The resource should be a full URL to an item, e.g. https://api.box.com/2.0/files/123456. :type resource: Optional[str], optional :param shared_link: The shared link to get a downscoped token for. If None and item None, the resulting token will not be scoped down to just a single item. :type shared_link: Optional[str], optional @@ -414,14 +416,17 @@ def downscope_token( subject_token=token.access_token, subject_token_type=RequestAccessTokenSubjectTokenType.URN_IETF_PARAMS_OAUTH_TOKEN_TYPE_ACCESS_TOKEN.value, resource=resource, - scope=to_string(scopes), + scope=' '.join(scopes), box_shared_link=shared_link, ) return downscoped_token + def add_space(self, text: str) -> str: + return ''.join([]) + def revoke_token(self, network_session: Optional[NetworkSession] = None) -> None: """ - Revoke the current access token and remove from token storage. + Revoke the current access token and remove it from token storage. :param network_session: An object to keep network session state :type network_session: Optional[NetworkSession], optional """ diff --git a/box_sdk_gen/oauth.py b/box_sdk_gen/oauth.py index 709429f..a04d42b 100644 --- a/box_sdk_gen/oauth.py +++ b/box_sdk_gen/oauth.py @@ -1,29 +1,40 @@ -from urllib.parse import urlencode, urlunsplit from typing import Optional -from .auth import Authentication -from .token_storage import TokenStorage, InMemoryTokenStorage -from .auth_schemas import TokenRequest, TokenRequestGrantType -from .fetch import fetch, FetchResponse, FetchOptions -from .network import NetworkSession -from .schemas import AccessToken -from .json_data import json_to_serialized_data +from typing import Dict + +from typing import List + +from box_sdk_gen.managers.authorization import RequestAccessTokenGrantType + +from box_sdk_gen.managers.authorization import RequestAccessTokenSubjectTokenType + +from box_sdk_gen.auth import Authentication + +from box_sdk_gen.network import NetworkSession + +from box_sdk_gen.schemas import AccessToken + +from box_sdk_gen.schemas import PostOAuth2Token + +from box_sdk_gen.schemas import PostOAuth2Revoke + +from box_sdk_gen.managers.authorization import AuthorizationManager + +from box_sdk_gen.token_storage import TokenStorage + +from box_sdk_gen.token_storage import InMemoryTokenStorage + +from box_sdk_gen.json_data import sd_to_url_params + +from box_sdk_gen.utils import prepare_params + +box_oauth_2_auth_url: str = 'https://account.box.com/api/oauth2/authorize' class OAuthConfig: def __init__( self, client_id: str, client_secret: str, token_storage: TokenStorage = None ): - """ - :param client_id: - Box API key used for identifying the application the user is authenticating with. - :param client_secret: - Box API secret used for making auth requests. - :param token_storage: - Object responsible for storing token. If no custom implementation provided, - the token will be stored in memory. - """ - if token_storage is None: token_storage = InMemoryTokenStorage() self.client_id = client_id @@ -41,13 +52,16 @@ def __init__( scope: Optional[str] = None, ): """ - :param client_id: The Client ID of the application that is requesting to authenticate the user. - :param redirect_uri: The URI to which Box redirects the browser after the user has granted or - denied the application permission. - :param response_type: The type of response we'd like to receive. Must be 'code'. - :param state: A custom string of your choice. Box will pass the same string to the redirect - URL when authentication is complete. - :param scope: A comma-separated list of application scopes you'd like to authenticate the user for. + :param client_id: Box API key used for identifying the application the user is authenticating with + :type client_id: Optional[str], optional + :param redirect_uri: The URI to which Box redirects the browser after the user has granted or denied the application permission. This URI match one of the redirect URIs in the configuration of your application. + :type redirect_uri: Optional[str], optional + :param response_type: The type of response we would like to receive. + :type response_type: Optional[str], optional + :param state: A custom string of your choice. Box will pass the same string to the redirect URL when authentication is complete. This parameter can be used to identify a user on redirect, as well as protect against hijacked sessions and other exploits. + :type state: Optional[str], optional + :param scope: A space-separated list of application scopes you'd like to authenticate the user for. This defaults to all the scopes configured for the application in its configuration page. + :type scope: Optional[str], optional """ self.client_id = client_id self.redirect_uri = redirect_uri @@ -57,74 +71,57 @@ def __init__( class BoxOAuth(Authentication): - OAUTH2_AUTHORIZE_URL = 'https://account.box.com/api/oauth2/authorize' - - def __init__(self, config: OAuthConfig): + def __init__(self, config: OAuthConfig, **kwargs): """ - :param config: - Configuration object of OAuth. + :param config: Configuration object of OAuth. + :type config: OAuthConfig """ + super().__init__(**kwargs) self.config = config - self.token_storage = config.token_storage + self.token_storage = self.config.token_storage - def get_authorize_url( - self, options: Optional[GetAuthorizeUrlOptions] = None - ) -> str: + def get_authorize_url(self, options: GetAuthorizeUrlOptions = None) -> str: """ Get the authorization URL for the app user. - :param options: Options class for getting authorization url - :return: Authorization url """ if options is None: options = GetAuthorizeUrlOptions() - - params = [ - ( - 'client_id', - ( - options.client_id - if options.client_id is not None - else self.config.client_id - ), + params: Dict[str, str] = prepare_params({ + 'client_id': ( + options.client_id + if not options.client_id == None + else self.config.client_id ), - ( - 'response_type', - options.response_type if options.response_type is not None else 'code', + 'response_type': ( + options.response_type if not options.response_type == None else 'code' ), - ] - - if options.redirect_uri is not None: - params.append(('redirect_uri', options.redirect_uri)) - - if options.state is not None: - params.append(('state', options.state)) - - if options.scope is not None: - params.append(('scope', options.scope)) - - params = [ - (key.encode('utf-8'), value.encode('utf-8')) for (key, value) in params - ] - query_string = urlencode(params) - return urlunsplit(('', '', self.OAUTH2_AUTHORIZE_URL, query_string, '')) + 'redirect_uri': options.redirect_uri, + 'state': options.state, + 'scope': options.scope, + }) + return ''.join([box_oauth_2_auth_url, '?', sd_to_url_params(params)]) def get_tokens_authorization_code_grant( self, authorization_code: str, network_session: Optional[NetworkSession] = None ) -> AccessToken: """ - Send token request and return the access_token - :param authorization_code: Short-lived authorization code + Acquires token info using an authorization code. + :param authorization_code: The authorization code to use to get tokens. + :type authorization_code: str :param network_session: An object to keep network session state - :return: Access token + :type network_session: Optional[NetworkSession], optional """ - request_body = TokenRequest( - grant_type=TokenRequestGrantType.AUTHORIZATION_CODE, + auth_manager: AuthorizationManager = ( + AuthorizationManager(network_session=network_session) + if not network_session == None + else AuthorizationManager() + ) + token: AccessToken = auth_manager.request_access_token( + grant_type=RequestAccessTokenGrantType.AUTHORIZATION_CODE.value, + code=authorization_code, client_id=self.config.client_id, client_secret=self.config.client_secret, - code=authorization_code, ) - - token: AccessToken = self._send_token_request(request_body, network_session) self.token_storage.store(token) return token @@ -132,15 +129,15 @@ def retrieve_token( self, network_session: Optional[NetworkSession] = None ) -> AccessToken: """ - Return a current token or get a new one when not available. + Get the current access token. If the current access token is expired or not found, this method will attempt to refresh the token. :param network_session: An object to keep network session state - :return: Valid access token + :type network_session: Optional[NetworkSession], optional """ token = self.token_storage.get() - if token is None: + if token == None: raise Exception( - "Access and refresh tokens not available. Authenticate before making" - " any API call first." + 'Access and refresh tokens not available. Authenticate before making' + ' any API call first.' ) return token @@ -150,42 +147,84 @@ def refresh_token( refresh_token: Optional[str] = None, ) -> AccessToken: """ - Refresh outdated access token with refresh token + Get a new access token for the app user. :param network_session: An object to keep network session state - :param refresh_token: Refresh token, which can be used to obtain a new access token - :return: Valid access token + :type network_session: Optional[NetworkSession], optional + :param refresh_token: A refresh token to use + :type refresh_token: Optional[str], optional """ old_token: Optional[AccessToken] = self.token_storage.get() token_used_for_refresh = ( - refresh_token or old_token.refresh_token if old_token else None + refresh_token + if not refresh_token == None + else old_token.refresh_token if not old_token == None else None ) - - if token_used_for_refresh is None: - raise Exception("No refresh_token is available.") - - request_body = TokenRequest( - grant_type=TokenRequestGrantType.REFRESH_TOKEN, + auth_manager: AuthorizationManager = ( + AuthorizationManager(network_session=network_session) + if not network_session == None + else AuthorizationManager() + ) + token: AccessToken = auth_manager.request_access_token( + grant_type=RequestAccessTokenGrantType.REFRESH_TOKEN.value, client_id=self.config.client_id, client_secret=self.config.client_secret, - refresh_token=refresh_token or old_token.refresh_token, + refresh_token=token_used_for_refresh, ) + self.token_storage.store(token) + return token - new_token = self._send_token_request(request_body, network_session) - self.token_storage.store(new_token) - return new_token + def revoke_token(self, network_session: Optional[NetworkSession] = None) -> None: + """ + Revoke an active Access Token, effectively logging a user out that has been previously authenticated. + :param network_session: An object to keep network session state + :type network_session: Optional[NetworkSession], optional + """ + token: Optional[AccessToken] = self.token_storage.get() + if token == None: + return None + auth_manager: AuthorizationManager = ( + AuthorizationManager(network_session=network_session) + if not network_session == None + else AuthorizationManager() + ) + auth_manager.revoke_access_token( + self.config.client_id, self.config.client_secret, token.access_token + ) + self.token_storage.clear() + return None - @staticmethod - def _send_token_request( - request_body: TokenRequest, network_session: Optional[NetworkSession] = None + def downscope_token( + self, + scopes: List[str], + resource: Optional[str] = None, + shared_link: Optional[str] = None, + network_session: Optional[NetworkSession] = None, ) -> AccessToken: - response: FetchResponse = fetch( - 'https://api.box.com/oauth2/token', - FetchOptions( - method='POST', - data=request_body.to_dict(), - content_type='application/x-www-form-urlencoded', - network_session=network_session, - ), + """ + Downscope access token to the provided scopes. Returning a new access token with the provided scopes, with the original access token unchanged. + :param scopes: The scope(s) to apply to the resulting token. + :type scopes: List[str] + :param resource: The file or folder to get a downscoped token for. If None and shared_link None, the resulting token will not be scoped down to just a single item. The resource should be a full URL to an item, e.g. https://api.box.com/2.0/files/123456. + :type resource: Optional[str], optional + :param shared_link: The shared link to get a downscoped token for. If None and item None, the resulting token will not be scoped down to just a single item. + :type shared_link: Optional[str], optional + :param network_session: An object to keep network session state + :type network_session: Optional[NetworkSession], optional + """ + token: Optional[AccessToken] = self.token_storage.get() + if token == None or token.access_token == None: + raise Exception('No access token is available.') + auth_manager: AuthorizationManager = ( + AuthorizationManager(network_session=network_session) + if not network_session == None + else AuthorizationManager() ) - - return AccessToken.from_dict(response.data) + downscoped_token: AccessToken = auth_manager.request_access_token( + grant_type=RequestAccessTokenGrantType.URN_IETF_PARAMS_OAUTH_GRANT_TYPE_TOKEN_EXCHANGE.value, + subject_token=token.access_token, + subject_token_type=RequestAccessTokenSubjectTokenType.URN_IETF_PARAMS_OAUTH_TOKEN_TYPE_ACCESS_TOKEN.value, + resource=resource, + scope=' '.join(scopes), + box_shared_link=shared_link, + ) + return downscoped_token diff --git a/box_sdk_gen/schemas.py b/box_sdk_gen/schemas.py index 8976fb1..4bb5c4e 100644 --- a/box_sdk_gen/schemas.py +++ b/box_sdk_gen/schemas.py @@ -2868,23 +2868,22 @@ class StoragePolicyMiniTypeField(str, Enum): class StoragePolicyMini(BaseObject): - def __init__( - self, - id: Optional[str] = None, - type: Optional[StoragePolicyMiniTypeField] = None, - **kwargs - ): + def __init__(self, id: str, type: StoragePolicyMiniTypeField, **kwargs): """ :param id: The unique identifier for this storage policy - :type id: Optional[str], optional + :type id: str :param type: `storage_policy` - :type type: Optional[StoragePolicyMiniTypeField], optional + :type type: StoragePolicyMiniTypeField """ super().__init__(**kwargs) self.id = id self.type = type +class StoragePolicyAssignmentTypeField(str, Enum): + STORAGE_POLICY_ASSIGNMENT = 'storage_policy_assignment' + + class StoragePolicyAssignmentAssignedToField(BaseObject): def __init__(self, id: Optional[str] = None, type: Optional[str] = None, **kwargs): """ @@ -2901,11 +2900,21 @@ def __init__(self, id: Optional[str] = None, type: Optional[str] = None, **kwarg class StoragePolicyAssignment(BaseObject): def __init__( self, + id: str, + type: StoragePolicyAssignmentTypeField, storage_policy: Optional[StoragePolicyMini] = None, assigned_to: Optional[StoragePolicyAssignmentAssignedToField] = None, **kwargs ): + """ + :param id: The unique identifier for a storage policy assignment. + :type id: str + :param type: `storage_policy_assignment` + :type type: StoragePolicyAssignmentTypeField + """ super().__init__(**kwargs) + self.id = id + self.type = type self.storage_policy = storage_policy self.assigned_to = assigned_to @@ -2941,18 +2950,18 @@ def __init__( class StoragePolicy(StoragePolicyMini): def __init__( self, + id: str, + type: StoragePolicyMiniTypeField, name: Optional[str] = None, - id: Optional[str] = None, - type: Optional[StoragePolicyMiniTypeField] = None, **kwargs ): """ - :param name: A descriptive name of the region - :type name: Optional[str], optional :param id: The unique identifier for this storage policy - :type id: Optional[str], optional + :type id: str :param type: `storage_policy` - :type type: Optional[StoragePolicyMiniTypeField], optional + :type type: StoragePolicyMiniTypeField + :param name: A descriptive name of the region + :type name: Optional[str], optional """ super().__init__(id=id, type=type, **kwargs) self.name = name diff --git a/docs/authentication.md b/docs/authentication.md index 14c986b..4de384c 100644 --- a/docs/authentication.md +++ b/docs/authentication.md @@ -229,19 +229,22 @@ client = BoxClient(auth=auth) ### Switching between Service Account and User -In order to switch between being authenticated as Service Account and a User you can call: +You can easily switch to be authenticated as a Service Account or as a User. +To create a new auth object authenticated as Service Account you can call: ```python -auth.as_enterprise(enterprise_id='YOUR_ENTERPRISE_ID') +enterprise_auth = auth.as_enterprise(enterprise_id='YOUR_ENTERPRISE_ID') +enterprise_client = BoxClient(auth=enterprise_auth) ``` -to authenticate as enterprise or +To authenticate as user with provided User ID call: ```python -auth.as_user(user_id='YOUR_USER_ID') +user_auth = auth.as_user(user_id='YOUR_USER_ID') +user_client = BoxClient(auth=user_auth) ``` -to authenticate as User with provided ID. The new token will be automatically fetched with a next API call. +The new token will be automatically fetched with a next API call. [ccg_guide]: https://developer.box.com/guides/authentication/client-credentials/client-credentials-setup/ @@ -291,7 +294,7 @@ list names of all items in a root folder. from flask import Flask, request, redirect from box_sdk_gen.client import BoxClient -from box_sdk_gen.oauth import BoxOAuth, OAuthConfig +from box_sdk_gen.oauth import BoxOAuth, OAuthConfig, GetAuthorizeUrlOptions app = Flask(__name__) @@ -302,7 +305,7 @@ AUTH = BoxOAuth( @app.route("/") def get_auth(): - auth_url = AUTH.get_authorize_url() + auth_url = AUTH.get_authorize_url(GetAuthorizeUrlOptions(redirect_uri='YOUR_REDIRECT_URL')) return redirect(auth_url, code=302) @@ -321,9 +324,10 @@ if __name__ == '__main__': # Revoke token -Access tokens for a client can be revoked when needed. This call invalidates old token, but you can still -reuse the `auth` object to retrieve a new token. If you make any new call after revoking the token, -a new token will be automatically retrieved. This method is currently only available for JWT authentication. +Access tokens for a client can be revoked when needed. This call invalidates old token. +For CCGAuth and JWTAuth you can still reuse the `auth` object to retrieve a new token. If you make any new call after revoking the token, +a new token will be automatically retrieved. +For OAuth it would be necessary to manually go through the authentication process again. To revoke current client's tokens in the storage use the following code: @@ -337,7 +341,7 @@ client.auth.revoke_token() You can exchange a client's access token for one with a lower scope, in order to restrict the permissions for a child client or to pass to a less secure -location (e.g. a browser-based app). This method is currently only available for JWT authentication. +location (e.g. a browser-based app). A downscoped token does not include a refresh token. In such a scenario, to obtain a new downscoped token, refresh the original token diff --git a/test/auth.py b/test/auth.py index 80a5f4b..8ebf47f 100644 --- a/test/auth.py +++ b/test/auth.py @@ -12,6 +12,10 @@ from box_sdk_gen.schemas import AccessToken +from box_sdk_gen.schemas import FolderFull + +from box_sdk_gen.managers.folders import CreateFolderParent + from box_sdk_gen.utils import decode_base_64 from box_sdk_gen.utils import get_env_var @@ -54,7 +58,6 @@ def test_jwt_auth(): decode_base_64(get_env_var('JWT_CONFIG_BASE_64')) ) auth: BoxJWTAuth = BoxJWTAuth(config=jwt_config) - client: BoxClient = BoxClient(auth=auth) user_auth: BoxJWTAuth = auth.as_user(user_id) user_client: BoxClient = BoxClient(auth=user_auth) current_user: UserFull = user_client.users.get_user_me() @@ -81,7 +84,9 @@ def test_jwt_auth_downscope(): ) file: FileFull = uploaded_files.entries[0] resource_path: str = ''.join(['https://api.box.com/2.0/files/', file.id]) - downscoped_token: AccessToken = auth.downscope_token(['item_rename'], resource_path) + downscoped_token: AccessToken = auth.downscope_token( + ['item_rename', 'item_preview'], resource_path + ) assert not downscoped_token.access_token == None downscoped_client: BoxClient = BoxClient( auth=BoxDeveloperTokenAuth(token=downscoped_token.access_token) @@ -129,17 +134,58 @@ def test_ccg_auth(): user_id=user_id, ) auth: BoxCCGAuth = BoxCCGAuth(config=ccg_config) - client: BoxClient = BoxClient(auth=auth) - auth.as_user(user_id) - current_user: UserFull = client.users.get_user_me() + user_auth: BoxCCGAuth = auth.as_user(user_id) + user_client: BoxClient = BoxClient(auth=user_auth) + current_user: UserFull = user_client.users.get_user_me() assert current_user.id == user_id - auth.as_enterprise(enterprise_id) - new_user: UserFull = client.users.get_user_me(fields=['enterprise']) + enterprise_auth: BoxCCGAuth = auth.as_enterprise(enterprise_id) + enterprise_client: BoxClient = BoxClient(auth=enterprise_auth) + new_user: UserFull = enterprise_client.users.get_user_me(fields=['enterprise']) assert not new_user.enterprise == None assert new_user.enterprise.id == enterprise_id assert not new_user.id == user_id +def test_ccg_auth_downscope(): + ccg_config: CCGConfig = CCGConfig( + client_id=get_env_var('CLIENT_ID'), + client_secret=get_env_var('CLIENT_SECRET'), + user_id=get_env_var('USER_ID'), + ) + auth: BoxCCGAuth = BoxCCGAuth(config=ccg_config) + parent_client: BoxClient = BoxClient(auth=auth) + folder: FolderFull = parent_client.folders.create_folder( + name=get_uuid(), parent=CreateFolderParent(id='0') + ) + resource_path: str = ''.join(['https://api.box.com/2.0/folders/', folder.id]) + downscoped_token: AccessToken = auth.downscope_token( + ['item_rename', 'item_preview'], resource_path + ) + assert not downscoped_token.access_token == None + downscoped_client: BoxClient = BoxClient( + auth=BoxDeveloperTokenAuth(token=downscoped_token.access_token) + ) + downscoped_client.folders.update_folder_by_id(folder_id=folder.id, name=get_uuid()) + with pytest.raises(Exception): + downscoped_client.folders.delete_folder_by_id(folder_id=folder.id) + parent_client.folders.delete_folder_by_id(folder_id=folder.id) + + +def test_ccg_auth_revoke(): + ccg_config: CCGConfig = CCGConfig( + client_id=get_env_var('CLIENT_ID'), + client_secret=get_env_var('CLIENT_SECRET'), + user_id=get_env_var('USER_ID'), + ) + auth: BoxCCGAuth = BoxCCGAuth(config=ccg_config) + auth.retrieve_token() + token_from_storage_before_revoke: Optional[AccessToken] = auth.token_storage.get() + auth.revoke_token() + token_from_storage_after_revoke: Optional[AccessToken] = auth.token_storage.get() + assert not token_from_storage_before_revoke == None + assert token_from_storage_after_revoke == None + + def get_access_token() -> AccessToken: user_id: str = get_env_var('USER_ID') enterprise_id: str = get_env_var('ENTERPRISE_ID') @@ -162,3 +208,46 @@ def test_developer_token_auth(): client: BoxClient = BoxClient(auth=dev_auth) current_user: UserFull = client.users.get_user_me() assert current_user.id == user_id + + +def test_oauth_auth_revoke(): + config: OAuthConfig = OAuthConfig( + client_id=get_env_var('CLIENT_ID'), client_secret=get_env_var('CLIENT_SECRET') + ) + auth: BoxOAuth = BoxOAuth(config=config) + token: AccessToken = get_access_token() + auth.token_storage.store(token) + token_before_revoke: Optional[AccessToken] = auth.token_storage.get() + auth.revoke_token() + token_after_revoke: Optional[AccessToken] = auth.token_storage.get() + assert not token_before_revoke == None + assert token_after_revoke == None + + +def test_oauth_auth_downscope(): + config: OAuthConfig = OAuthConfig( + client_id=get_env_var('CLIENT_ID'), client_secret=get_env_var('CLIENT_SECRET') + ) + auth: BoxOAuth = BoxOAuth(config=config) + token: AccessToken = get_access_token() + auth.token_storage.store(token) + parent_client: BoxClient = BoxClient(auth=auth) + uploaded_files: Files = parent_client.uploads.upload_file( + attributes=UploadFileAttributes( + name=get_uuid(), parent=UploadFileAttributesParentField(id='0') + ), + file=generate_byte_stream(1024 * 1024), + ) + file: FileFull = uploaded_files.entries[0] + resource_path: str = ''.join(['https://api.box.com/2.0/files/', file.id]) + downscoped_token: AccessToken = auth.downscope_token( + ['item_rename', 'item_preview'], resource_path + ) + assert not downscoped_token.access_token == None + downscoped_client: BoxClient = BoxClient( + auth=BoxDeveloperTokenAuth(token=downscoped_token.access_token) + ) + downscoped_client.files.update_file_by_id(file_id=file.id, name=get_uuid()) + with pytest.raises(Exception): + downscoped_client.files.delete_file_by_id(file_id=file.id) + parent_client.files.delete_file_by_id(file_id=file.id)