diff --git a/waterbutler/core/provider.py b/waterbutler/core/provider.py index 84e44d8dd..6d8ef2a2a 100644 --- a/waterbutler/core/provider.py +++ b/waterbutler/core/provider.py @@ -1,6 +1,12 @@ import abc import time -import typing +from typing import ( + List, + Set, + Tuple, + Callable, + Union +) import asyncio import logging import weakref @@ -8,17 +14,28 @@ import itertools from urllib import parse -import furl import aiohttp +import furl -from waterbutler.core import streams from waterbutler.core import exceptions -from waterbutler.core import path as wb_path -from waterbutler import settings as wb_settings +from waterbutler.core.metadata import ( + BaseMetadata, + BaseFileMetadata, + BaseFileRevisionMetadata, + BaseFolderMetadata +) from waterbutler.core.metrics import MetricsRecord -from waterbutler.core import metadata as wb_metadata -from waterbutler.core.utils import ZipStreamGenerator -from waterbutler.core.utils import RequestHandlerContext +from waterbutler.core.path import WaterButlerPath +from waterbutler.core.streams import ( + BaseStream, + ResponseStreamReader, + ZipStreamReader +) +from waterbutler.core.utils import ( + RequestHandlerContext, + ZipStreamGenerator +) +from waterbutler.settings import OP_CONCURRENCY logger = logging.getLogger(__name__) @@ -84,12 +101,14 @@ class BaseProvider(metaclass=abc.ABCMeta): BASE_URL = None - def __init__(self, auth: dict, - credentials: dict, - settings: dict, - retry_on: typing.Set[int]={408, 502, 503, 504}) -> None: - """ - :param auth: ( :class:`dict` ) Information about the user this provider will act on the behalf of + def __init__( + self, + auth: dict, + credentials: dict, + settings: dict, + retry_on: Set[int]={408, 502, 503, 504} + ) -> None: + """:param auth: ( :class:`dict` ) Information about the user this provider will act on the behalf of :param credentials: ( :class:`dict` ) The credentials used to authenticate with the provider, ofter an OAuth 2 token :param settings: ( :class:`dict` ) 
Configuration settings for this provider, @@ -197,13 +216,15 @@ async def make_request(self, method: str, url: str, *args, **kwargs) -> aiohttp. def request(self, *args, **kwargs): return RequestHandlerContext(self.make_request(*args, **kwargs)) - async def move(self, - dest_provider: 'BaseProvider', - src_path: wb_path.WaterButlerPath, - dest_path: wb_path.WaterButlerPath, - rename: str=None, - conflict: str='replace', - handle_naming: bool=True) -> typing.Tuple[wb_metadata.BaseMetadata, bool]: + async def move( + self, + dest_provider: 'BaseProvider', + src_path: WaterButlerPath, + dest_path: WaterButlerPath, + rename: str=None, + conflict: str='replace', + handle_naming: bool=True + ) -> Tuple[BaseMetadata, bool]: """Moves a file or folder from the current provider to the specified one Performs a copy and then a delete. Calls :func:`BaseProvider.intra_move` if possible. @@ -255,13 +276,27 @@ async def move(self, return meta_data, created - async def copy(self, - dest_provider: 'BaseProvider', - src_path: wb_path.WaterButlerPath, - dest_path: wb_path.WaterButlerPath, - rename: str=None, conflict: str='replace', - handle_naming: bool=True) \ - -> typing.Tuple[wb_metadata.BaseMetadata, bool]: + async def copy( + self, + dest_provider: 'BaseProvider', + src_path: WaterButlerPath, + dest_path: WaterButlerPath, + rename: str=None, + conflict: str='replace', + handle_naming: bool=True + ) -> Tuple[BaseMetadata, bool]: + """Copies a file or folder from the current provider to the specified one + Performs a download and then an upload. + May use :func:`BaseProvider.intra_copy` instead, if possible. 
+ + :param dest_provider: ( :class:`.BaseProvider` ) The provider to copy to + :param src_path: ( :class:`.WaterButlerPath` ) Path to where the resource can be found + :param dest_path: ( :class:`.WaterButlerPath` ) Path to where the resource will be copied + :param rename: ( :class:`str` ) The desired name of the resulting path, may be incremented + :param conflict: ( :class:`str` ) What to do in the event of a name conflict, ``replace`` or ``keep`` + :param handle_naming: ( :class:`bool` ) If a naming conflict is detected, should it be automatically handled? + """ + args = (dest_provider, src_path, dest_path) kwargs = {'rename': rename, 'conflict': conflict, 'handle_naming': handle_naming} @@ -282,8 +317,8 @@ async def copy(self, # files and folders shouldn't overwrite themselves if ( - self.shares_storage_root(dest_provider) and - src_path.materialized_path == dest_path.materialized_path + self.shares_storage_root(dest_provider) and + src_path.materialized_path == dest_path.materialized_path ): raise exceptions.OverwriteSelfError(src_path) @@ -302,12 +337,13 @@ async def copy(self, return await dest_provider.upload(download_stream, dest_path) - async def _folder_file_op(self, - func: typing.Callable, - dest_provider: 'BaseProvider', - src_path: wb_path.WaterButlerPath, - dest_path: wb_path.WaterButlerPath, - **kwargs) -> typing.Tuple[wb_metadata.BaseFolderMetadata, bool]: + async def _folder_file_op( + self, + func: Callable, + dest_provider: 'BaseProvider', + src_path: WaterButlerPath, + dest_path: WaterButlerPath, + ) -> Tuple[BaseFolderMetadata, bool]: """Recursively apply func to src/dest path. Called from: func: copy and move if src_path.is_dir. 
@@ -343,10 +379,10 @@ async def _folder_file_op(self, # Metadata returns a union, which confuses mypy self.provider_metrics.append('_folder_file_ops.item_counts', len(items)) # type: ignore - for i in range(0, len(items), wb_settings.OP_CONCURRENCY): # type: ignore + for i in range(0, len(items), OP_CONCURRENCY): # type: ignore futures = [] - for item in items[i:i + wb_settings.OP_CONCURRENCY]: # type: ignore - futures.append(asyncio.ensure_future( + for item in items[i:i + OP_CONCURRENCY]: # type: ignore + future = asyncio.ensure_future( func( dest_provider, # TODO figure out a way to cut down on all the requests made here @@ -354,26 +390,28 @@ async def _folder_file_op(self, (await dest_provider.revalidate_path(dest_path, item.name, folder=item.is_folder)), handle_naming=False, ) - )) - + ) if item.is_folder: - await futures[-1] + await future + futures.append(future) if not futures: continue done, _ = await asyncio.wait(futures, return_when=asyncio.FIRST_EXCEPTION) - for fut in done: - folder.children.append(fut.result()[0]) + for future in done: + folder.children.append(future.result()[0]) return folder, created - async def handle_naming(self, - src_path: wb_path.WaterButlerPath, - dest_path: wb_path.WaterButlerPath, - rename: str=None, - conflict: str='replace') -> wb_path.WaterButlerPath: + async def handle_naming( + self, + src_path: WaterButlerPath, + dest_path: WaterButlerPath, + rename: str=None, + conflict: str='replace' + ) -> WaterButlerPath: """Given a :class:`.WaterButlerPath` and the desired name, handle any potential naming issues. i.e.: @@ -411,9 +449,7 @@ async def handle_naming(self, return dest_path - def can_intra_copy(self, - other: 'BaseProvider', - path: wb_path.WaterButlerPath=None) -> bool: + def can_intra_copy(self, other: 'BaseProvider', path: WaterButlerPath=None) -> bool: """Indicates if a quick copy can be performed between the current provider and `other`. .. 
note:: @@ -425,9 +461,7 @@ def can_intra_copy(self, """ return False - def can_intra_move(self, - other: 'BaseProvider', - path: wb_path.WaterButlerPath=None) -> bool: + def can_intra_move(self, other: 'BaseProvider', path: WaterButlerPath=None) -> bool: """Indicates if a quick move can be performed between the current provider and `other`. .. note:: @@ -439,10 +473,12 @@ def can_intra_move(self, """ return False - async def intra_copy(self, - dest_provider: 'BaseProvider', - source_path: wb_path.WaterButlerPath, - dest_path: wb_path.WaterButlerPath) -> typing.Tuple[wb_metadata.BaseFileMetadata, bool]: + async def intra_copy( + self, + dest_provider: 'BaseProvider', + source_path: WaterButlerPath, + dest_path: WaterButlerPath + ) -> Tuple[BaseFileMetadata, bool]: """If the provider supports copying files and/or folders within itself by some means other than download/upload, then ``can_intra_copy`` should return ``True``. This method will implement the copy. It accepts the destination provider, a source path, and the @@ -457,10 +493,12 @@ async def intra_copy(self, """ raise NotImplementedError - async def intra_move(self, - dest_provider: 'BaseProvider', - src_path: wb_path.WaterButlerPath, - dest_path: wb_path.WaterButlerPath) -> typing.Tuple[wb_metadata.BaseFileMetadata, bool]: + async def intra_move( + self, + dest_provider: 'BaseProvider', + src_path: WaterButlerPath, + dest_path: WaterButlerPath + ) -> Tuple[BaseFileMetadata, bool]: """If the provider supports moving files and/or folders within itself by some means other than download/upload/delete, then ``can_intra_move`` should return ``True``. This method will implement the move. 
It accepts the destination provider, a source path, and the @@ -477,8 +515,11 @@ async def intra_move(self, await self.delete(src_path) return data, created - async def exists(self, path: wb_path.WaterButlerPath, **kwargs) \ - -> typing.Union[bool, wb_metadata.BaseMetadata, typing.List[wb_metadata.BaseMetadata]]: + async def exists( + self, + path: WaterButlerPath, + **kwargs + ) -> Union[bool, BaseMetadata, List[BaseMetadata]]: """Check for existence of WaterButlerPath Attempt to retrieve provider metadata to determine existence of a WaterButlerPath. If @@ -497,11 +538,14 @@ async def exists(self, path: wb_path.WaterButlerPath, **kwargs) \ raise return False - async def handle_name_conflict(self, - path: wb_path.WaterButlerPath, - conflict: str='replace', - **kwargs) -> typing.Tuple[wb_path.WaterButlerPath, bool]: - """Check WaterButlerPath and resolve conflicts + async def handle_name_conflict( + self, + path: WaterButlerPath, + conflict: str='replace', + **kwargs + ) -> Tuple[WaterButlerPath, bool]: + """ + Check WaterButlerPath and resolve conflicts Given a WaterButlerPath and a conflict resolution pattern determine the correct file path to upload to and indicate if that file exists or not @@ -531,12 +575,15 @@ async def handle_name_conflict(self, return path, False - async def revalidate_path(self, - base: wb_path.WaterButlerPath, - path: str, - folder: bool=False) -> wb_path.WaterButlerPath: - """Take a path and a base path and build a WaterButlerPath representing `/base/path`. For - id-based providers, this will need to lookup the id of the new child object. + async def revalidate_path( + self, + base: WaterButlerPath, + path: str, + folder: bool=False + ) -> WaterButlerPath: + """Take a path and a base path and build a WaterButlerPath representing `/base/path`. + + For id-based providers, this will need to lookup the id of the new child object. 
:param base: ( :class:`.WaterButlerPath` ) The base folder to look under :param path: ( :class:`str`) the path of a child of `base`, relative to `base` @@ -545,18 +592,17 @@ async def revalidate_path(self, """ return base.child(path, folder=folder) - async def zip(self, path: wb_path.WaterButlerPath, **kwargs) -> asyncio.StreamReader: + async def zip(self, path: WaterButlerPath, **kwargs) -> asyncio.StreamReader: """Streams a Zip archive of the given folder :param path: ( :class:`.WaterButlerPath` ) The folder to compress """ - meta_data = await self.metadata(path) # type: ignore if path.is_file: meta_data = [meta_data] # type: ignore path = path.parent - return streams.ZipStreamReader(ZipStreamGenerator(self, path, *meta_data)) # type: ignore + return ZipStreamReader(ZipStreamGenerator(self, path, *meta_data)) # type: ignore def shares_storage_root(self, other: 'BaseProvider') -> bool: """Returns True if ``self`` and ``other`` both point to the same storage root. Used to @@ -575,7 +621,7 @@ def can_duplicate_names(self) -> bool: raise NotImplementedError @abc.abstractmethod - async def download(self, src_path: wb_path.WaterButlerPath, **kwargs) -> streams.ResponseStreamReader: + async def download(self, src_path: WaterButlerPath, **kwargs) -> ResponseStreamReader: """Download a file from this provider. :param src_path: ( :class:`.WaterButlerPath` ) Path to the file to be downloaded @@ -586,8 +632,13 @@ async def download(self, src_path: wb_path.WaterButlerPath, **kwargs) -> streams raise NotImplementedError @abc.abstractmethod - async def upload(self, stream: streams.BaseStream, path: wb_path.WaterButlerPath, *args, **kwargs) \ - -> typing.Tuple[wb_metadata.BaseFileMetadata, bool]: + async def upload( + self, + stream: BaseStream, + path: WaterButlerPath, + *args, + **kwargs + ) -> Tuple[BaseFileMetadata, bool]: """Uploads the given stream to the provider. 
Returns the metadata for the newly created file and a boolean indicating whether the file is completely new (``True``) or overwrote a previously-existing file (``False``) @@ -601,8 +652,9 @@ async def upload(self, stream: streams.BaseStream, path: wb_path.WaterButlerPath raise NotImplementedError @abc.abstractmethod - async def delete(self, src_path: wb_path.WaterButlerPath, **kwargs) -> None: - """ + async def delete(self, src_path: WaterButlerPath, **kwargs) -> None: + """Delete a file or folder. + :param src_path: ( :class:`.WaterButlerPath` ) Path to be deleted :param \*\*kwargs: ( :class:`dict` ) Arguments to be parsed by child classes :rtype: :class:`None` @@ -611,8 +663,7 @@ async def delete(self, src_path: wb_path.WaterButlerPath, **kwargs) -> None: raise NotImplementedError @abc.abstractmethod - async def metadata(self, path: wb_path.WaterButlerPath, **kwargs) \ - -> typing.Union[wb_metadata.BaseMetadata, typing.List[wb_metadata.BaseMetadata]]: + async def metadata(self, path: WaterButlerPath, **kwargs) -> Union[BaseMetadata, List[BaseMetadata]]: """Get metadata about the specified resource from this provider. Will be a :class:`list` if the resource is a directory otherwise an instance of :class:`.BaseFileMetadata` @@ -630,7 +681,7 @@ async def metadata(self, path: wb_path.WaterButlerPath, **kwargs) \ raise NotImplementedError @abc.abstractmethod - async def validate_v1_path(self, path: str, **kwargs) -> wb_path.WaterButlerPath: + async def validate_v1_path(self, path: str, **kwargs) -> WaterButlerPath: """API v1 requires that requests against folder endpoints always end with a slash, and requests against files never end with a slash. 
This method checks the provider's metadata for the given id and throws a 404 Not Found if the implicit and explicit types don't @@ -651,7 +702,7 @@ async def validate_v1_path(self, path: str, **kwargs) -> wb_path.WaterButlerPath raise NotImplementedError @abc.abstractmethod - async def validate_path(self, path: str, **kwargs) -> wb_path.WaterButlerPath: + async def validate_path(self, path: str, **kwargs) -> WaterButlerPath: """Validates paths passed in via the v0 API. v0 paths are much less strict than v1 paths. They may represent things that exist or something that should be created. As such, the goal of ``validate_path`` is to split the path into its component parts and attempt to determine @@ -671,20 +722,25 @@ async def validate_path(self, path: str, **kwargs) -> wb_path.WaterButlerPath: """ raise NotImplementedError - def path_from_metadata(self, - parent_path: wb_path.WaterButlerPath, - meta_data: wb_metadata.BaseMetadata) -> wb_path.WaterButlerPath: - return parent_path.child(meta_data.name, _id=meta_data.path.strip('/'), - folder=meta_data.is_folder) + def path_from_metadata(self, parent_path: WaterButlerPath, meta_data: BaseMetadata) -> WaterButlerPath: + """Returns a WaterButlerPath derived from the provided metadata. + + :param parent_path: ( :class:`.WaterButlerPath` ) Path under which the child path is built + :rtype: :class:`.WaterButlerPath` + """ + return parent_path.child( + meta_data.name, + _id=meta_data.path.strip('/'), + folder=meta_data.is_folder + ) - async def revisions(self, path: wb_path.WaterButlerPath, **kwargs): + async def revisions(self, path: WaterButlerPath, **kwargs) -> List[BaseFileRevisionMetadata]: """Return a list of :class:`.BaseFileRevisionMetadata` objects representing the revisions available for the file at ``path``. 
""" return [] # TODO Raise 405 by default h/t @rliebz - async def create_folder(self, path: wb_path.WaterButlerPath, - **kwargs) -> wb_metadata.BaseFolderMetadata: + async def create_folder(self, path: WaterButlerPath, **kwargs) -> BaseFolderMetadata: """Create a folder in the current provider at `path`. Returns a `BaseFolderMetadata` object if successful. May throw a 409 Conflict if a directory with the same name already exists. @@ -694,7 +750,7 @@ async def create_folder(self, path: wb_path.WaterButlerPath, """ raise exceptions.ProviderError({'message': 'Folder creation not supported.'}, code=405) - def _build_range_header(self, slice_tup: typing.Tuple[int, int]) -> str: + def _build_range_header(self, slice_tup: Tuple[int, int]) -> str: start, end = slice_tup return 'bytes={}-{}'.format( '' if start is None else start, diff --git a/waterbutler/providers/googledrive/provider.py b/waterbutler/providers/googledrive/provider.py index d254bfd75..d699a16d6 100644 --- a/waterbutler/providers/googledrive/provider.py +++ b/waterbutler/providers/googledrive/provider.py @@ -647,7 +647,7 @@ async def _file_metadata(self, ) as resp: try: data = await resp.json() - except: # some 404s return a string instead of json + except Exception: # some 404s return a string instead of json data = await resp.read() if resp.status != 200: