From 01903453d9e16c7716d91e7aacf165857c982dbf Mon Sep 17 00:00:00 2001 From: Kevin Barnard Date: Tue, 16 Jul 2024 11:38:11 -0700 Subject: [PATCH] chore: apply pre-commit hook on all files --- docs/source/conf.py | 15 +- fathomnet/api/__init__.py | 52 +++-- fathomnet/api/activity.py | 76 ++++--- fathomnet/api/boundingboxes.py | 140 +++++++----- fathomnet/api/comments.py | 46 ++-- fathomnet/api/darwincore.py | 6 +- fathomnet/api/firebase.py | 7 +- fathomnet/api/geoimages.py | 31 +-- fathomnet/api/images.py | 61 +++--- fathomnet/api/imagesetuploads.py | 25 ++- fathomnet/api/regions.py | 16 +- fathomnet/api/stats.py | 4 +- fathomnet/api/tags.py | 20 +- fathomnet/api/taxa.py | 16 +- fathomnet/api/topics.py | 30 ++- fathomnet/api/users.py | 107 ++++++--- fathomnet/api/worms.py | 46 ++-- fathomnet/api/xapikey.py | 10 +- fathomnet/dto.py | 109 ++++++---- fathomnet/models/__init__.py | 8 +- fathomnet/models/bases.py | 5 +- fathomnet/models/yolov5.py | 12 +- fathomnet/scripts/fathomnet_generate.py | 277 ++++++++++++------------ fathomnet/util.py | 14 +- test/__init__.py | 8 +- test/test_activity.py | 9 +- test/test_boundingboxes.py | 45 ++-- test/test_comments.py | 19 +- test/test_darwincore.py | 1 - test/test_firebase.py | 9 +- test/test_geoimages.py | 8 +- test/test_images.py | 55 ++--- test/test_imagesetuploads.py | 12 +- test/test_regions.py | 7 +- test/test_tags.py | 19 +- test/test_taxa.py | 14 +- test/test_topics.py | 15 +- test/test_users.py | 31 +-- test/test_worms.py | 78 ++++--- test/test_xapikey.py | 7 +- 40 files changed, 839 insertions(+), 631 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index a28cd19..d1b103f 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -12,14 +12,15 @@ # import os import sys -sys.path.insert(0, os.path.abspath('..')) + +sys.path.insert(0, os.path.abspath("..")) # -- Project information ----------------------------------------------------- -project = 'fathomnet-py' -copyright = '2022-2024, Monterey Bay Aquarium Research Institute' -author = 'Kevin Barnard' +project = "fathomnet-py" +copyright = "2022-2024, Monterey Bay Aquarium Research Institute" +author = "Kevin Barnard" # -- General configuration --------------------------------------------------- @@ -27,9 +28,7 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = [ - 'sphinx.ext.autodoc' -] +extensions = ["sphinx.ext.autodoc"] # Add any paths that contain templates here, relative to this directory. templates_path = [] @@ -45,7 +44,7 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = 'sphinx_rtd_theme' +html_theme = "sphinx_rtd_theme" # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, diff --git a/fathomnet/api/__init__.py b/fathomnet/api/__init__.py index 5bae67d..995fd75 100644 --- a/fathomnet/api/__init__.py +++ b/fathomnet/api/__init__.py @@ -9,42 +9,62 @@ class EndpointManager: - ROOT = 'http://fathomnet.org:8080' + ROOT = "http://fathomnet.org:8080" PATH = None @classmethod def url(cls, endpoint: str) -> str: if cls.PATH is None: raise NotImplementedError - return '/'.join([cls.ROOT, cls.PATH, endpoint]) + return "/".join([cls.ROOT, cls.PATH, endpoint]) @classmethod - def request(cls, method: str, endpoint: str, parse_json: bool = True, **kwargs) -> Union[requests.Response, dict, list]: + def request( + cls, method: str, endpoint: str, parse_json: bool = True, **kwargs + ) -> Union[requests.Response, dict, list]: url = cls.url(endpoint) res = SESSION.request(method, url, **kwargs) if res.ok: # Status code < 400 return res.json() if parse_json else res elif res.status_code == 401: # Not authorized, need to authenticate - raise ValueError('Unauthorized: please authenticate first.') - elif res.status_code == 403: # Forbidden, can't access this endpoint with given authentication - raise ValueError('Forbidden: you cannot access this resource.') + raise ValueError("Unauthorized: please authenticate first.") + elif ( + res.status_code == 403 + ): # Forbidden, can't access this endpoint with given authentication + raise ValueError("Forbidden: you cannot access this resource.") elif res.status_code < 500: # User error - raise ValueError('Client error ({}), please check your usage (docs: https://fathomnet-py.rtfd.io/) or open an issue at https://github.com/fathomnet/fathomnet-py/issues/new with the details below.\n{}'.format(res.status_code, debug_format_response(res))) + raise ValueError( + "Client error ({}), please check your usage (docs: https://fathomnet-py.rtfd.io/) or open an issue at https://github.com/fathomnet/fathomnet-py/issues/new with the details below.\n{}".format( + res.status_code, debug_format_response(res) + ) + ) else: # Server error, debug the response - raise ValueError('Server error ({}), please contact the FathomNet administrators with the details below.\n\n{}'.format(res.status_code, debug_format_response(res))) + raise ValueError( + "Server error ({}), please contact the FathomNet administrators with the details below.\n\n{}".format( + res.status_code, debug_format_response(res) + ) + ) @classmethod - def get(cls, endpoint: str, parse_json: bool = True, **kwargs) -> Union[requests.Response, dict, list]: - return cls.request('GET', endpoint, parse_json=parse_json, **kwargs) + def get( + cls, endpoint: str, parse_json: bool = True, **kwargs + ) -> Union[requests.Response, dict, list]: + return cls.request("GET", endpoint, parse_json=parse_json, **kwargs) @classmethod - def put(cls, endpoint: str, parse_json: bool = True, **kwargs) -> Union[requests.Response, dict, list]: - return cls.request('PUT', endpoint, parse_json=parse_json, **kwargs) + def put( + cls, endpoint: str, parse_json: bool = True, **kwargs + ) -> Union[requests.Response, dict, list]: + return cls.request("PUT", endpoint, parse_json=parse_json, **kwargs) @classmethod - def post(cls, endpoint: str, parse_json: bool = True, **kwargs) -> Union[requests.Response, dict, list]: - return cls.request('POST', endpoint, parse_json=parse_json, **kwargs) + def post( + cls, endpoint: str, parse_json: bool = True, **kwargs + ) -> Union[requests.Response, dict, list]: + return cls.request("POST", endpoint, parse_json=parse_json, **kwargs) @classmethod - def delete(cls, endpoint: str, parse_json: bool = True, **kwargs) -> Union[requests.Response, dict, list]: - return cls.request('DELETE', endpoint, parse_json=parse_json, **kwargs) + def delete( + cls, endpoint: str, parse_json: bool = True, **kwargs + ) -> Union[requests.Response, dict, list]: + return cls.request("DELETE", endpoint, parse_json=parse_json, **kwargs) diff --git a/fathomnet/api/activity.py b/fathomnet/api/activity.py index 52d33ad..d355f5b 100644 --- a/fathomnet/api/activity.py +++ b/fathomnet/api/activity.py @@ -2,65 +2,81 @@ from typing import List, Optional -from fathomnet.api import EndpointManager from fathomnet import dto +from fathomnet.api import EndpointManager class Activity(EndpointManager): - PATH = 'activity' + PATH = "activity" -def find_all(auth_header: Optional[dto.AuthHeader] = None, - start_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None, - limit: Optional[int] = None, offset: Optional[int] = None, - include_self: Optional[bool] = None) -> List[dto.Activity]: +def find_all( + auth_header: Optional[dto.AuthHeader] = None, + start_timestamp: Optional[str] = None, + end_timestamp: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + include_self: Optional[bool] = None, +) -> List[dto.Activity]: """Get a list of all activity.""" params = {} if start_timestamp: - params['startTimestamp'] = start_timestamp + params["startTimestamp"] = start_timestamp if end_timestamp: - params['endTimestamp'] = end_timestamp + params["endTimestamp"] = end_timestamp if limit: - params['limit'] = limit + params["limit"] = limit if offset: - params['offset'] = offset + params["offset"] = offset if include_self: - params['includeSelf'] = include_self - res_json = Activity.get('', params=params, auth=auth_header) + params["includeSelf"] = include_self + res_json = Activity.get("", params=params, auth=auth_header) return list(map(dto.Activity.from_dict, res_json)) -def find_by_email(email: str, - auth_header: Optional[dto.AuthHeader] = None, - start_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None, - limit: Optional[int] = None, offset: Optional[int] = None) -> List[dto.Activity]: +def find_by_email( + email: str, + auth_header: Optional[dto.AuthHeader] = None, + start_timestamp: Optional[str] = None, + end_timestamp: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, +) -> List[dto.Activity]: """Get a list of activity by email.""" params = {} if start_timestamp: - params['startTimestamp'] = start_timestamp + params["startTimestamp"] = start_timestamp if end_timestamp: - params['endTimestamp'] = end_timestamp + params["endTimestamp"] = end_timestamp if limit: - params['limit'] = limit + params["limit"] = limit if offset: - params['offset'] = offset - res_json = Activity.get('query/email/{}'.format(email), params=params, auth=auth_header) + params["offset"] = offset + res_json = Activity.get( + "query/email/{}".format(email), params=params, auth=auth_header + ) return list(map(dto.Activity.from_dict, res_json)) -def find_by_email_admin(email: str, - auth_header: Optional[dto.AuthHeader] = None, - start_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None, - limit: Optional[int] = None, offset: Optional[int] = None) -> List[dto.Activity]: +def find_by_email_admin( + email: str, + auth_header: Optional[dto.AuthHeader] = None, + start_timestamp: Optional[str] = None, + end_timestamp: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, +) -> List[dto.Activity]: """(Admin) Get a list of activity by email. Used to support notification applications.""" params = {} if start_timestamp: - params['startTimestamp'] = start_timestamp + params["startTimestamp"] = start_timestamp if end_timestamp: - params['endTimestamp'] = end_timestamp + params["endTimestamp"] = end_timestamp if limit: - params['limit'] = limit + params["limit"] = limit if offset: - params['offset'] = offset - res_json = Activity.get('admin/query/email/{}'.format(email), params=params, auth=auth_header) + params["offset"] = offset + res_json = Activity.get( + "admin/query/email/{}".format(email), params=params, auth=auth_header + ) return list(map(dto.Activity.from_dict, res_json)) diff --git a/fathomnet/api/boundingboxes.py b/fathomnet/api/boundingboxes.py index d1bce1e..8c30a12 100644 --- a/fathomnet/api/boundingboxes.py +++ b/fathomnet/api/boundingboxes.py @@ -1,5 +1,5 @@ # boundingboxes.py (fathomnet-py) -from typing import List, BinaryIO, Optional +from typing import BinaryIO, List, Optional from urllib.parse import quote from fathomnet import dto @@ -7,64 +7,64 @@ class BoundingBoxes(EndpointManager): - PATH = 'boundingboxes' + PATH = "boundingboxes" -def create_with_dto(bounding_box: dto.BoundingBoxDTO, auth_header: Optional[dto.AuthHeader] = None) -> dto.BoundingBoxDTO: +def create_with_dto( + bounding_box: dto.BoundingBoxDTO, auth_header: Optional[dto.AuthHeader] = None +) -> dto.BoundingBoxDTO: """Create a bounding box.""" - res_json = BoundingBoxes.post('', - json=bounding_box.to_dict(), - auth=auth_header) + res_json = BoundingBoxes.post("", json=bounding_box.to_dict(), auth=auth_header) return dto.BoundingBoxDTO.from_dict(res_json) def count_all() -> dto.Count: """Get a count of all bounding boxes.""" - res_json = BoundingBoxes.get('count') + res_json = BoundingBoxes.get("count") return dto.Count.from_dict(res_json) def find_concepts() -> List[str]: """Get a list of all concepts.""" - res_json = BoundingBoxes.get('list/concepts') + res_json = BoundingBoxes.get("list/concepts") return res_json def count_total_by_concept() -> List[dto.ByConceptCount]: """Get a count of bounding boxes for each concept.""" - res_json = BoundingBoxes.get('list/counts') + res_json = BoundingBoxes.get("list/counts") return list(map(dto.ByConceptCount.from_dict, res_json)) def find_observers() -> List[str]: """Get a list of all observers.""" - res_json = BoundingBoxes.get('list/observers') + res_json = BoundingBoxes.get("list/observers") return res_json def count_by_concept(concept: str) -> dto.ByConceptCount: """Get a count of bounding boxes for a concept.""" - res_json = BoundingBoxes.get('query/count/{}'.format(concept)) + res_json = BoundingBoxes.get("query/count/{}".format(concept)) return dto.ByConceptCount.from_dict(res_json) def find_by_user_defined_key(user_defined_key: str) -> List[dto.BoundingBoxDTO]: """Get a list of bounding boxes by a user-defined key.""" - res_json = BoundingBoxes.get('query/userdefinedkey/{}'.format(user_defined_key)) + res_json = BoundingBoxes.get("query/userdefinedkey/{}".format(user_defined_key)) return list(map(dto.BoundingBoxDTO.from_dict, res_json)) def find_all_user_defined_keys() -> List[str]: """Get a list of all user-defined keys.""" - res_json = BoundingBoxes.get('query/userdefinedkeys') + res_json = BoundingBoxes.get("query/userdefinedkeys") return res_json -def upload_csv(csv_fp: BinaryIO, auth_header: Optional[dto.AuthHeader] = None) -> dto.Message: +def upload_csv( + csv_fp: BinaryIO, auth_header: Optional[dto.AuthHeader] = None +) -> dto.Message: """Upload a CSV of bounding boxes.""" - res_json = BoundingBoxes.post('upload/csv', - files={'csv': csv_fp}, - auth=auth_header) + res_json = BoundingBoxes.post("upload/csv", files={"csv": csv_fp}, auth=auth_header) return dto.Message.from_dict(res_json) @@ -74,11 +74,13 @@ def find_by_uuid(uuid: str) -> dto.BoundingBoxDTO: return dto.BoundingBoxDTO.from_dict(res_json) -def update(uuid: str, bounding_box: dto.ABoundingBoxDTO, auth_header: Optional[dto.AuthHeader] = None) -> dto.BoundingBoxDTO: +def update( + uuid: str, + bounding_box: dto.ABoundingBoxDTO, + auth_header: Optional[dto.AuthHeader] = None, +) -> dto.BoundingBoxDTO: """Update a bounding box.""" - res_json = BoundingBoxes.put(uuid, - json=bounding_box.to_dict(), - auth=auth_header) + res_json = BoundingBoxes.put(uuid, json=bounding_box.to_dict(), auth=auth_header) return dto.BoundingBoxDTO.from_dict(res_json) @@ -89,80 +91,106 @@ def delete(uuid: str, auth_header: Optional[dto.AuthHeader] = None): def audit_by_uuid(uuid: str) -> List[dto.BoundingBoxDTO]: """Get an audit of a bounding box by UUID.""" - res_json = BoundingBoxes.get('audit/uuid/{}'.format(uuid)) + res_json = BoundingBoxes.get("audit/uuid/{}".format(uuid)) return list(map(dto.BoundingBoxDTO.from_dict, res_json)) def audit_by_user_defined_key(user_defined_key: str) -> List[dto.BoundingBoxDTO]: """Get an audit of a bounding box by user-defined key.""" - res_json = BoundingBoxes.get('audit/userdefinedkey/{}'.format(user_defined_key)) + res_json = BoundingBoxes.get("audit/userdefinedkey/{}".format(user_defined_key)) return list(map(dto.BoundingBoxDTO.from_dict, res_json)) def find_searchable_concepts() -> List[str]: """Get a list of searchable concepts.""" - res_json = BoundingBoxes.get('list/searchable') + res_json = BoundingBoxes.get("list/searchable") return res_json -def find_by_observer_uuid(uuid: str, pageable: Optional[dto.Pageable] = None) -> List[dto.BoundingBoxDTO]: +def find_by_observer_uuid( + uuid: str, pageable: Optional[dto.Pageable] = None +) -> List[dto.BoundingBoxDTO]: """Get a list of bounding boxes by observer UUID.""" - res_json = BoundingBoxes.get('query/observer/{}'.format(uuid), params=pageable.to_params() if pageable else None) - return list(map(dto.BoundingBoxDTO.from_dict, res_json.get('content', []))) + res_json = BoundingBoxes.get( + "query/observer/{}".format(uuid), + params=pageable.to_params() if pageable else None, + ) + return list(map(dto.BoundingBoxDTO.from_dict, res_json.get("content", []))) -def find_by_verifier_uuid(uuid: str, pageable: Optional[dto.Pageable] = None) -> List[dto.BoundingBoxDTO]: +def find_by_verifier_uuid( + uuid: str, pageable: Optional[dto.Pageable] = None +) -> List[dto.BoundingBoxDTO]: """Get a list of bounding boxes by verifier UUID.""" - res_json = BoundingBoxes.get('query/verifier/{}'.format(uuid), params=pageable.to_params() if pageable else None) - return list(map(dto.BoundingBoxDTO.from_dict, res_json.get('content', []))) - - -def audit_by_concepts(concepts: List[str], - start_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None, - limit: Optional[int] = None, offset: Optional[int] = None) -> List[dto.BoundingBoxDTO]: + res_json = BoundingBoxes.get( + "query/verifier/{}".format(uuid), + params=pageable.to_params() if pageable else None, + ) + return list(map(dto.BoundingBoxDTO.from_dict, res_json.get("content", []))) + + +def audit_by_concepts( + concepts: List[str], + start_timestamp: Optional[str] = None, + end_timestamp: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, +) -> List[dto.BoundingBoxDTO]: """Get an audit of bounding boxes by concepts.""" params = {} if start_timestamp: - params['startTimestamp'] = start_timestamp + params["startTimestamp"] = start_timestamp if end_timestamp: - params['endTimestamp'] = end_timestamp + params["endTimestamp"] = end_timestamp if limit: - params['limit'] = limit + params["limit"] = limit if offset: - params['offset'] = offset - res_json = BoundingBoxes.get('audit/concepts/{}'.format(quote(','.join(concepts))), params=params) + params["offset"] = offset + res_json = BoundingBoxes.get( + "audit/concepts/{}".format(quote(",".join(concepts))), params=params + ) return list(map(dto.BoundingBoxDTO.from_dict, res_json)) -def audit_by_verifier(uuid: str, - start_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None, - limit: Optional[int] = None, offset: Optional[int] = None) -> List[dto.BoundingBoxDTO]: +def audit_by_verifier( + uuid: str, + start_timestamp: Optional[str] = None, + end_timestamp: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, +) -> List[dto.BoundingBoxDTO]: """Get an audit of bounding boxes by verifier UUID.""" params = {} if start_timestamp: - params['startTimestamp'] = start_timestamp + params["startTimestamp"] = start_timestamp if end_timestamp: - params['endTimestamp'] = end_timestamp + params["endTimestamp"] = end_timestamp if limit: - params['limit'] = limit + params["limit"] = limit if offset: - params['offset'] = offset - res_json = BoundingBoxes.get('audit/verifier/{}'.format(uuid), params=params) + params["offset"] = offset + res_json = BoundingBoxes.get("audit/verifier/{}".format(uuid), params=params) return list(map(dto.BoundingBoxDTO.from_dict, res_json)) -def audit_by_observer(observer: str, - start_timestamp: Optional[str] = None, end_timestamp: Optional[str] = None, - limit: Optional[int] = None, offset: Optional[int] = None) -> List[dto.BoundingBoxDTO]: +def audit_by_observer( + observer: str, + start_timestamp: Optional[str] = None, + end_timestamp: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, +) -> List[dto.BoundingBoxDTO]: """Get an audit of bounding boxes by observer.""" params = {} if start_timestamp: - params['startTimestamp'] = start_timestamp + params["startTimestamp"] = start_timestamp if end_timestamp: - params['endTimestamp'] = end_timestamp + params["endTimestamp"] = end_timestamp if limit: - params['limit'] = limit + params["limit"] = limit if offset: - params['offset'] = offset - res_json = BoundingBoxes.get('audit/observer/{}'.format(quote(observer)), params=params) + params["offset"] = offset + res_json = BoundingBoxes.get( + "audit/observer/{}".format(quote(observer)), params=params + ) return list(map(dto.BoundingBoxDTO.from_dict, res_json)) diff --git a/fathomnet/api/comments.py b/fathomnet/api/comments.py index 3f4f937..93c5742 100644 --- a/fathomnet/api/comments.py +++ b/fathomnet/api/comments.py @@ -2,17 +2,23 @@ from typing import List, Optional -from fathomnet.api import EndpointManager from fathomnet import dto +from fathomnet.api import EndpointManager class Comments(EndpointManager): - PATH = 'comments' + PATH = "comments" -def create(uuid: str, comment_content: dto.BoundingBoxCommentContent, auth_header: Optional[dto.AuthHeader] = None) -> dto.BoundingBoxComment: +def create( + uuid: str, + comment_content: dto.BoundingBoxCommentContent, + auth_header: Optional[dto.AuthHeader] = None, +) -> dto.BoundingBoxComment: """Create a comment.""" - res_json = Comments.post('boundingbox/{}'.format(uuid), json=comment_content.to_dict(), auth=auth_header) + res_json = Comments.post( + "boundingbox/{}".format(uuid), json=comment_content.to_dict(), auth=auth_header + ) return dto.BoundingBoxComment.from_dict(res_json) @@ -22,7 +28,11 @@ def find_by_uuid(uuid: str) -> dto.BoundingBoxComment: return dto.BoundingBoxComment.from_dict(res_json) -def update(uuid: str, comment_content: dto.BoundingBoxCommentContent, auth_header: Optional[dto.AuthHeader] = None) -> dto.BoundingBoxComment: +def update( + uuid: str, + comment_content: dto.BoundingBoxCommentContent, + auth_header: Optional[dto.AuthHeader] = None, +) -> dto.BoundingBoxComment: """Update a comment.""" res_json = Comments.put(uuid, json=comment_content.to_dict(), auth=auth_header) return dto.BoundingBoxComment.from_dict(res_json) @@ -34,22 +44,32 @@ def delete(uuid: str, auth_header: Optional[dto.AuthHeader] = None) -> None: return res.status_code == 200 -def find_by_bounding_box_uuid(uuid: str, auth_header: Optional[dto.AuthHeader] = None) -> List[dto.BoundingBoxComment]: +def find_by_bounding_box_uuid( + uuid: str, auth_header: Optional[dto.AuthHeader] = None +) -> List[dto.BoundingBoxComment]: """Get a list of comments by bounding box uuid.""" - res_json = Comments.get('boundingbox/{}'.format(uuid), auth=auth_header) + res_json = Comments.get("boundingbox/{}".format(uuid), auth=auth_header) return list(map(dto.BoundingBoxComment.from_dict, res_json)) -def find_by_email(email: str, pageable: Optional[dto.Pageable] = None, auth_header: Optional[dto.AuthHeader] = None) -> List[dto.BoundingBoxComment]: +def find_by_email( + email: str, + pageable: Optional[dto.Pageable] = None, + auth_header: Optional[dto.AuthHeader] = None, +) -> List[dto.BoundingBoxComment]: """Get a list of comments by email.""" - params = {'email': email} + params = {"email": email} if pageable: params.update(pageable.to_dict()) - res_json = Comments.get('query/email', params=params, auth=auth_header) - return list(map(dto.BoundingBoxComment.from_dict, res_json.get('content', []))) + res_json = Comments.get("query/email", params=params, auth=auth_header) + return list(map(dto.BoundingBoxComment.from_dict, res_json.get("content", []))) -def flag(uuid: str, value: bool, auth_header: Optional[dto.AuthHeader] = None) -> dto.BoundingBoxComment: +def flag( + uuid: str, value: bool, auth_header: Optional[dto.AuthHeader] = None +) -> dto.BoundingBoxComment: """Flag a comment.""" - res_json = Comments.post('flag/{}'.format(uuid), params={'flag': value}, auth=auth_header) + res_json = Comments.post( + "flag/{}".format(uuid), params={"flag": value}, auth=auth_header + ) return dto.BoundingBoxComment.from_dict(res_json) diff --git a/fathomnet/api/darwincore.py b/fathomnet/api/darwincore.py index c15c553..5eb6647 100644 --- a/fathomnet/api/darwincore.py +++ b/fathomnet/api/darwincore.py @@ -5,16 +5,16 @@ class DarwinCore(EndpointManager): - PATH = 'darwincore' + PATH = "darwincore" def index() -> str: """Get the darwin core index page.""" - res = DarwinCore.get('', parse_json=False) + res = DarwinCore.get("", parse_json=False) return res.text def find_owner_institution_codes() -> List[str]: """Get a list of owner institutions.""" - res_json = DarwinCore.get('list/ownerinstitutions') + res_json = DarwinCore.get("list/ownerinstitutions") return res_json diff --git a/fathomnet/api/firebase.py b/fathomnet/api/firebase.py index adf4345..f043819 100644 --- a/fathomnet/api/firebase.py +++ b/fathomnet/api/firebase.py @@ -6,19 +6,18 @@ class Firebase(EndpointManager): - PATH = 'firebase' + PATH = "firebase" def auth() -> dto.AuthHeader: """Authenticate via firebase and get a JWT.""" raise NotImplementedError # TODO figure out firebase authentication - res_json = Firebase.post('auth', - json={}) + res_json = Firebase.post("auth", json={}) return dto.AuthHeader.from_dict(res_json) def test(auth_header: Optional[dto.AuthHeader] = None) -> dto.Message: """Test an authorization token.""" - res_json = Firebase.get('test', auth=auth_header) + res_json = Firebase.get("test", auth=auth_header) return dto.Message.from_dict(res_json) diff --git a/fathomnet/api/geoimages.py b/fathomnet/api/geoimages.py index 9961043..225c91c 100644 --- a/fathomnet/api/geoimages.py +++ b/fathomnet/api/geoimages.py @@ -6,35 +6,38 @@ class GeoImages(EndpointManager): - PATH = 'geoimages' + PATH = "geoimages" def find_all(pageable: Optional[dto.Pageable] = None) -> List[dto.GeoImage]: """Get a paged list of all geo images.""" - res_json = GeoImages.get('', - params=pageable.to_params() if pageable else None) + res_json = GeoImages.get("", params=pageable.to_params() if pageable else None) # Note: schema inconsistent with response, need to grab the 'content' object - return list(map(dto.GeoImage.from_dict, res_json.get('content', []))) + return list(map(dto.GeoImage.from_dict, res_json.get("content", []))) -def count(geo_image_constraints: dto.GeoImageConstraints) -> dto.GeoImageConstraintsCount: +def count( + geo_image_constraints: dto.GeoImageConstraints, +) -> dto.GeoImageConstraintsCount: """Get a constrained count of geo images.""" - res_json = GeoImages.post('count', - json=geo_image_constraints.to_dict()) + res_json = GeoImages.post("count", json=geo_image_constraints.to_dict()) return dto.GeoImageConstraintsCount.from_dict(res_json) def find(geo_image_constraints: dto.GeoImageConstraints) -> List[dto.GeoImage]: """Get a constrained list of geo images.""" - res_json = GeoImages.post('query', - json=geo_image_constraints.to_dict()) + res_json = GeoImages.post("query", json=geo_image_constraints.to_dict()) return list(map(dto.GeoImage.from_dict, res_json)) -def find_by_image_set_upload_uuid(image_set_upload_uuid: str, - limit: Optional[int] = None, - offset: Optional[int] = None) -> List[dto.GeoImage]: +def find_by_image_set_upload_uuid( + image_set_upload_uuid: str, + limit: Optional[int] = None, + offset: Optional[int] = None, +) -> List[dto.GeoImage]: """Get a list of geo images corresponding to an image set upload UUID.""" - res_json = GeoImages.get('query/imagesetupload/{}'.format(image_set_upload_uuid), - params={'limit': limit, 'offset': offset}) + res_json = GeoImages.get( + "query/imagesetupload/{}".format(image_set_upload_uuid), + params={"limit": limit, "offset": offset}, + ) return list(map(dto.GeoImage.from_dict, res_json)) diff --git a/fathomnet/api/images.py b/fathomnet/api/images.py index 5a57916..987859f 100644 --- a/fathomnet/api/images.py +++ b/fathomnet/api/images.py @@ -7,105 +7,104 @@ class Images(EndpointManager): - PATH = 'images' + PATH = "images" def find_all_alt(pageable: Optional[dto.Pageable] = None) -> List[dto.AImageDTO]: """Get a paged list of all images. (alternative endpoint)""" - res_json = Images.get('', - params=pageable.to_params() if pageable else None) - return list(map(dto.AImageDTO.from_dict, res_json.get('content', []))) + res_json = Images.get("", params=pageable.to_params() if pageable else None) + return list(map(dto.AImageDTO.from_dict, res_json.get("content", []))) -def create_if_not_exists(images: List[dto.Image], auth_header: Optional[dto.AuthHeader] = None) -> List[dto.AImageDTO]: +def create_if_not_exists( + images: List[dto.Image], auth_header: Optional[dto.AuthHeader] = None +) -> List[dto.AImageDTO]: """Create an image if it doesn't exist.""" - res_json = Images.post('', - json=list(map(dto.Image.to_dict, images)), - auth=auth_header) + res_json = Images.post( + "", json=list(map(dto.Image.to_dict, images)), auth=auth_header + ) return list(map(dto.Image.from_dict, res_json)) def count_all() -> dto.Count: """Get a count of all images.""" - res_json = Images.get('count') + res_json = Images.get("count") return dto.Count.from_dict(res_json) def find_all(pageable: Optional[dto.Pageable] = None) -> List[dto.AImageDTO]: """Get a paged list of all images.""" - res_json = Images.get('list/all', - params=pageable.to_params() if pageable else None) + res_json = Images.get("list/all", params=pageable.to_params() if pageable else None) # Note: schema inconsistent with response, need to grab the 'content' object - return list(map(dto.AImageDTO.from_dict, res_json.get('content', []))) + return list(map(dto.AImageDTO.from_dict, res_json.get("content", []))) def find_distinct_submitter() -> List[str]: """Get a list of all submitters.""" - res_json = Images.get('list/contributors') + res_json = Images.get("list/contributors") return res_json def list_imaging_types() -> List[str]: """Get a list of all imaging types.""" - res_json = Images.get('list/imagingtypes') + res_json = Images.get("list/imagingtypes") return res_json def find(geo_image_constraints: dto.GeoImageConstraints) -> List[dto.AImageDTO]: """Get a constrained list of images.""" - res_json = Images.post('query', - json=geo_image_constraints.to_dict()) + res_json = Images.post("query", json=geo_image_constraints.to_dict()) return list(map(dto.AImageDTO.from_dict, res_json)) def find_by_concept(concept: str, taxa: Optional[str] = None) -> List[dto.AImageDTO]: """Get a list of images by concept (and optionally taxa provider).""" - res_json = Images.get('query/concept/{}'.format(quote(concept)), - params={'taxa': taxa} if taxa else None) + res_json = Images.get( + "query/concept/{}".format(quote(concept)), + params={"taxa": taxa} if taxa else None, + ) return list(map(dto.AImageDTO.from_dict, res_json)) def find_by_contributors_email(contributors_email: str) -> List[dto.AImageDTO]: """Get a list of images by contributor.""" - res_json = Images.get('query/contributor/{}'.format(contributors_email)) + res_json = Images.get("query/contributor/{}".format(contributors_email)) return list(map(dto.AImageDTO.from_dict, res_json)) def count_by_submitter(contributors_email: str) -> dto.ByContributorCount: """Get a count of images by contributor.""" - res_json = Images.get('query/count/contributor/{}'.format(contributors_email)) + res_json = Images.get("query/count/contributor/{}".format(contributors_email)) return dto.ByContributorCount.from_dict(res_json) def find_by_observer(observer: str) -> List[dto.AImageDTO]: """Get a list of images by observer.""" - res_json = Images.get('query/observer/{}'.format(observer)) + res_json = Images.get("query/observer/{}".format(observer)) return list(map(dto.AImageDTO.from_dict, res_json)) def find_by_sha256(sha256: str) -> List[dto.AImageDTO]: """Get a list of images by SHA256 hash.""" - res_json = Images.get('query/sha256/{}'.format(sha256)) + res_json = Images.get("query/sha256/{}".format(sha256)) return list(map(dto.AImageDTO.from_dict, res_json)) def find_by_tag_key(key: str, value: str) -> List[dto.AImageDTO]: """Get a list of images by a specified tag key-value pair.""" - res_json = Images.get('query/tags', - params={'key': key, 'value': value}) + res_json = Images.get("query/tags", params={"key": key, "value": value}) return list(map(dto.AImageDTO.from_dict, res_json)) def find_by_url(url: str) -> dto.AImageDTO: """Get an image by URL.""" - res_json = Images.get('query/url/{}'.format(quote_plus(url))) + res_json = Images.get("query/url/{}".format(quote_plus(url))) return dto.AImageDTO.from_dict(res_json) def find_by_uuid_in_list(uuids: List[str]) -> List[dto.AImageDTO]: """Get a list of images corresponding to a specified list of UUIDs.""" - res_json = Images.post('query/uuids', - json=uuids) + res_json = Images.post("query/uuids", json=uuids) return list(map(dto.AImageDTO.from_dict, res_json)) @@ -115,11 +114,11 @@ def find_by_uuid(uuid: str) -> dto.AImageDTO: return dto.AImageDTO.from_dict(res_json) -def update(uuid: str, image: dto.AImageDTO, auth_header: Optional[dto.AuthHeader] = None) -> dto.AImageDTO: +def update( + uuid: str, image: dto.AImageDTO, auth_header: Optional[dto.AuthHeader] = None +) -> dto.AImageDTO: """Update an image.""" - res_json = Images.put(uuid, - json=image.to_dict(), - auth=auth_header) + res_json = Images.put(uuid, json=image.to_dict(), auth=auth_header) return dto.AImageDTO.from_dict(res_json) diff --git a/fathomnet/api/imagesetuploads.py b/fathomnet/api/imagesetuploads.py index 6a1a08e..495e911 100644 --- a/fathomnet/api/imagesetuploads.py +++ b/fathomnet/api/imagesetuploads.py @@ -6,50 +6,53 @@ class ImageSetUploads(EndpointManager): - PATH = 'imagesetuploads' + PATH = "imagesetuploads" def count_all() -> dto.Count: """Count all image set uploads.""" - res_json = ImageSetUploads.get('count') + res_json = ImageSetUploads.get("count") return dto.Count.from_dict(res_json) -def find_collections(pageable: Optional[dto.Pageable] = None) -> List[dto.BImageSetUploadDTO]: +def find_collections( + pageable: Optional[dto.Pageable] = None, +) -> List[dto.BImageSetUploadDTO]: """Get a paged list of all image set uploads.""" - res_json = ImageSetUploads.get('list/all', - params=pageable.to_params() if pageable else None) + res_json = ImageSetUploads.get( + "list/all", params=pageable.to_params() if pageable else None + ) # Note: schema inconsistent with response, need to grab the 'content' object - return list(map(dto.BImageSetUploadDTO.from_dict, res_json.get('content', []))) + return list(map(dto.BImageSetUploadDTO.from_dict, res_json.get("content", []))) def find_contributors() -> List[str]: """Get a list of all contributors.""" - res_json = ImageSetUploads.get('list/contributors') + res_json = ImageSetUploads.get("list/contributors") return res_json def find_rejection_reasons() -> List[str]: """Get a list of all rejection reasons.""" - res_json = ImageSetUploads.get('list/rejectionreasons') + res_json = ImageSetUploads.get("list/rejectionreasons") return res_json def find_by_contributor(contributors_email: str) -> List[dto.BImageSetUploadDTO]: """Get a list of image set uploads by contributor.""" - res_json = ImageSetUploads.get('query/contributor/{}'.format(contributors_email)) + res_json = ImageSetUploads.get("query/contributor/{}".format(contributors_email)) return list(map(dto.BImageSetUploadDTO.from_dict, res_json)) def find_by_image_uuid(image_uuid: str) -> List[dto.BImageSetUploadDTO]: """Get an image set upload by UUID.""" - res_json = ImageSetUploads.get('query/image/{}'.format(image_uuid)) + res_json = ImageSetUploads.get("query/image/{}".format(image_uuid)) return list(map(dto.BImageSetUploadDTO.from_dict, res_json)) def stats(image_set_upload_uuid: str) -> dto.ImageSetUploadStats: """Get image set upload statistics for a corresponding image set upload UUID.""" - res_json = ImageSetUploads.get('stats/{}'.format(image_set_upload_uuid)) + res_json = ImageSetUploads.get("stats/{}".format(image_set_upload_uuid)) return dto.ImageSetUploadStats.from_dict(res_json) diff --git a/fathomnet/api/regions.py b/fathomnet/api/regions.py index 12cc88e..cc7b70b 100644 --- a/fathomnet/api/regions.py +++ b/fathomnet/api/regions.py @@ -6,34 +6,36 @@ class Regions(EndpointManager): - PATH = 'regions' + PATH = "regions" def find_all() -> List[dto.MarineRegion]: """Get a list of all marine regions.""" - res_json = Regions.get('') + res_json = Regions.get("") return list(map(dto.MarineRegion.from_dict, res_json)) def count_all() -> int: """Get a count of all marine regions.""" - res = Regions.get('count', parse_json=False) + res = Regions.get("count", parse_json=False) return int(res.content) def find_all_paged(pageable: Optional[dto.Pageable]) -> List[dto.MarineRegion]: """Get a paged list of all marine regions.""" - res_json = Regions.get('list/all', params=pageable.to_params() if pageable else None) - return list(map(dto.MarineRegion.from_dict, res_json.get('content', []))) + res_json = Regions.get( + "list/all", params=pageable.to_params() if pageable else None + ) + return list(map(dto.MarineRegion.from_dict, res_json.get("content", []))) def sync(auth_header: Optional[dto.AuthHeader] = None) -> int: """Synchronize.""" - res = Regions.get('sync', parse_json=False, auth=auth_header) + res = Regions.get("sync", parse_json=False, auth=auth_header) return int(res.content) def find_at(latitude: float, longitude: float) -> List[dto.MarineRegion]: """Get the marine regions at the given latitude and longitude.""" - res_json = Regions.get('at', params={'latitude': latitude, 'longitude': longitude}) + res_json = Regions.get("at", params={"latitude": latitude, "longitude": longitude}) return list(map(dto.MarineRegion.from_dict, res_json)) diff --git a/fathomnet/api/stats.py b/fathomnet/api/stats.py index 065b2c4..6b4d1e6 100644 --- a/fathomnet/api/stats.py +++ b/fathomnet/api/stats.py @@ -5,10 +5,10 @@ class Stats(EndpointManager): - PATH = 'stats' + PATH = "stats" def most_popular_searches() -> List[str]: """Get a list of the most popular searches.""" - res_json = Stats.get('list/popular/searches') + res_json = Stats.get("list/popular/searches") return res_json diff --git a/fathomnet/api/tags.py b/fathomnet/api/tags.py index 58600a4..f8160aa 100644 --- a/fathomnet/api/tags.py +++ b/fathomnet/api/tags.py @@ -6,14 +6,14 @@ class Tags(EndpointManager): - PATH = 'tags' + PATH = "tags" -def create_with_dto(tag: dto.TagDTO, auth_header: Optional[dto.AuthHeader] = None) -> dto.TagDTO: +def create_with_dto( + tag: dto.TagDTO, auth_header: Optional[dto.AuthHeader] = None +) -> dto.TagDTO: """Create a tag.""" - res_json = Tags.post('', - json=tag.to_dict(), - auth=auth_header) + res_json = Tags.post("", json=tag.to_dict(), auth=auth_header) return dto.TagDTO.from_dict(res_json) @@ -25,15 +25,15 @@ def find_by_uuid(uuid: str) -> dto.TagDTO: def find_by_image_uuid_and_key(image_uuid: str, key: str) -> List[dto.TagDTO]: """Get a tag by image UUID and key.""" - res_json = Tags.get('query/bykey/{}/{}'.format(image_uuid, key)) + res_json = Tags.get("query/bykey/{}/{}".format(image_uuid, key)) return list(map(dto.TagDTO.from_dict, res_json)) -def update(uuid: str, tag: dto.TagDTO, auth_header: Optional[dto.AuthHeader] = None) -> dto.TagDTO: +def update( + uuid: str, tag: dto.TagDTO, auth_header: Optional[dto.AuthHeader] = None +) -> dto.TagDTO: """Update a tag.""" - res_json = Tags.put(uuid, - json=tag.to_dict(), - auth=auth_header) + res_json = Tags.put(uuid, json=tag.to_dict(), auth=auth_header) return dto.TagDTO.from_dict(res_json) diff --git a/fathomnet/api/taxa.py b/fathomnet/api/taxa.py index 341d5b4..67396ae 100644 --- a/fathomnet/api/taxa.py +++ b/fathomnet/api/taxa.py @@ -7,34 +7,38 @@ class Taxa(EndpointManager): - PATH = 'taxa' + PATH = "taxa" def index() -> str: """Get the taxa index page.""" - res = Taxa.get('', parse_json=False) + res = Taxa.get("", parse_json=False) return res.text def list_taxa_providers() -> List[str]: """Get a list of all taxa providers.""" - res_json = Taxa.get('list/providers') + res_json = Taxa.get("list/providers") return res_json def find_children(provider_name: str, concept: str) -> List[dto.Taxa]: """Find the taxonomic children for a concept according to a taxa provider.""" - res_json = Taxa.get('query/children/{}/{}'.format(quote(provider_name), quote(concept))) + res_json = Taxa.get( + "query/children/{}/{}".format(quote(provider_name), quote(concept)) + ) return list(map(dto.Taxa.from_dict, res_json)) def find_parent(provider_name: str, concept: str) -> dto.Taxa: """Find the taxonomic parent for a concept according to a taxa provider.""" - res_json = Taxa.get('query/parent/{}/{}'.format(quote(provider_name), quote(concept))) + res_json = Taxa.get( + "query/parent/{}/{}".format(quote(provider_name), quote(concept)) + ) return dto.Taxa.from_dict(res_json) def find_taxa(provider_name: str, concept: str) -> List[dto.Taxa]: """Get a list of all taxonomic descendants of a concept (including the concept itself) according to a taxa provider.""" - res_json = Taxa.get('query/{}/{}'.format(quote(provider_name), quote(concept))) + res_json = Taxa.get("query/{}/{}".format(quote(provider_name), quote(concept))) return list(map(dto.Taxa.from_dict, res_json)) diff --git a/fathomnet/api/topics.py b/fathomnet/api/topics.py index 165bfa6..25a6efd 100644 --- a/fathomnet/api/topics.py +++ b/fathomnet/api/topics.py @@ -2,33 +2,41 @@ from typing import List, Optional -from fathomnet.api import EndpointManager from fathomnet import dto +from fathomnet.api import EndpointManager class Topics(EndpointManager): - PATH = 'topics' + PATH = "topics" -def create(topic: dto.Topic, auth_header: Optional[dto.AuthHeader] = None) -> dto.FollowedTopic: +def create( + topic: dto.Topic, auth_header: Optional[dto.AuthHeader] = None +) -> dto.FollowedTopic: """Follow a topic.""" - res_json = Topics.post('', json=topic.to_dict(), auth=auth_header) + res_json = Topics.post("", json=topic.to_dict(), auth=auth_header) return dto.FollowedTopic.from_dict(res_json) -def find_by_uuid(uuid: str, auth_header: Optional[dto.AuthHeader] = None) -> dto.FollowedTopic: +def find_by_uuid( + uuid: str, auth_header: Optional[dto.AuthHeader] = None +) -> dto.FollowedTopic: """Get a followed topic by uuid.""" res_json = Topics.get(uuid, auth=auth_header) return dto.FollowedTopic.from_dict(res_json) -def update(uuid: str, topic: dto.Topic, auth_header: Optional[dto.AuthHeader] = None) -> dto.FollowedTopic: +def update( + uuid: str, topic: dto.Topic, auth_header: Optional[dto.AuthHeader] = None +) -> dto.FollowedTopic: """Update a followed topic.""" res_json = Topics.put(uuid, json=topic.to_dict(), auth=auth_header) return dto.FollowedTopic.from_dict(res_json) -def delete(uuid: str, auth_header: Optional[dto.AuthHeader] = None) -> dto.FollowedTopic: +def delete( + uuid: str, auth_header: Optional[dto.AuthHeader] = None +) -> dto.FollowedTopic: """Unfollow a topic.""" res_json = Topics.delete(uuid, auth=auth_header) return dto.FollowedTopic.from_dict(res_json) @@ -36,11 +44,13 @@ def delete(uuid: str, auth_header: Optional[dto.AuthHeader] = None) -> dto.Follo def find(auth_header: Optional[dto.AuthHeader] = None) -> List[dto.FollowedTopic]: """Get a list of followed topics.""" - res_json = Topics.get('', auth=auth_header) + res_json = Topics.get("", auth=auth_header) return list(map(dto.FollowedTopic.from_dict, res_json)) -def find_by_email(email: str, auth_header: Optional[dto.AuthHeader] = None) -> List[dto.FollowedTopic]: +def find_by_email( + email: str, auth_header: Optional[dto.AuthHeader] = None +) -> List[dto.FollowedTopic]: """(Admin) Get a list of followed topics by email.""" - res_json = Topics.get('query/email/{}'.format(email), auth=auth_header) + res_json = Topics.get("query/email/{}".format(email), auth=auth_header) return list(map(dto.FollowedTopic.from_dict, res_json)) diff --git a/fathomnet/api/users.py b/fathomnet/api/users.py index 4237be8..643d82f 100644 --- a/fathomnet/api/users.py +++ b/fathomnet/api/users.py @@ -7,124 +7,163 @@ class Users(EndpointManager): - PATH = 'users' + PATH = "users" -def find_all(pageable: Optional[dto.Pageable] = None, auth_header: Optional[dto.AuthHeader] = None) -> List[dto.FathomnetIdentity]: +def find_all( + pageable: Optional[dto.Pageable] = None, + auth_header: Optional[dto.AuthHeader] = None, +) -> List[dto.FathomnetIdentity]: """Get a paged list of all users.""" - res_json = Users.get('list/users', params=pageable.to_params() if pageable else None, auth=auth_header) - return list(map(dto.FathomnetIdentity.from_dict, res_json.get('content', []))) - - -def find_all_admin(pageable: Optional[dto.Pageable] = None, auth_header: Optional[dto.AuthHeader] = None) -> List[dto.FathomnetIdentity]: + res_json = Users.get( + "list/users", + params=pageable.to_params() if pageable else None, + auth=auth_header, + ) + return list(map(dto.FathomnetIdentity.from_dict, res_json.get("content", []))) + + +def find_all_admin( + pageable: Optional[dto.Pageable] = None, + auth_header: Optional[dto.AuthHeader] = None, +) -> List[dto.FathomnetIdentity]: """(Admin) Get a paged list of all users.""" - res_json = Users.get('', params=pageable.to_params() if pageable else None, auth=auth_header) - return list(map(dto.FathomnetIdentity.from_dict, res_json.get('content', []))) + res_json = Users.get( + "", params=pageable.to_params() if pageable else None, auth=auth_header + ) + return list(map(dto.FathomnetIdentity.from_dict, res_json.get("content", []))) -def update_user_data(fathomnet_id_mutation: dto.FathomnetIdMutation, auth_header: Optional[dto.AuthHeader] = None) -> dto.FathomnetIdentity: +def update_user_data( + fathomnet_id_mutation: dto.FathomnetIdMutation, + auth_header: Optional[dto.AuthHeader] = None, +) -> dto.FathomnetIdentity: """Update a user's account data.""" - res_json = Users.put('', json=fathomnet_id_mutation.to_dict(), auth=auth_header) + res_json = Users.put("", json=fathomnet_id_mutation.to_dict(), auth=auth_header) return dto.FathomnetIdentity.from_dict(res_json) -def update_user_data_admin(uuid: str, fathomnet_id_admin_mutation: dto.FathomnetIdAdminMutation, auth_header: Optional[dto.AuthHeader] = None) -> dto.FathomnetIdentity: +def update_user_data_admin( + uuid: str, + fathomnet_id_admin_mutation: dto.FathomnetIdAdminMutation, + auth_header: Optional[dto.AuthHeader] = None, +) -> dto.FathomnetIdentity: """(Admin) Update a user's account data.""" - res_json = Users.put('admin/{}'.format(uuid), json=fathomnet_id_admin_mutation.to_dict(), auth=auth_header) + res_json = Users.put( + "admin/{}".format(uuid), + json=fathomnet_id_admin_mutation.to_dict(), + auth=auth_header, + ) return dto.FathomnetIdentity.from_dict(res_json) def get_api_key(auth_header: Optional[dto.AuthHeader] = None) -> dto.ApiKey: """Get a user's API key.""" - res_json = Users.get('apikey', auth=auth_header) + res_json = Users.get("apikey", auth=auth_header) return dto.ApiKey.from_dict(res_json) def create_new_api_key(auth_header: Optional[dto.AuthHeader] = None) -> dto.ApiKey: """Create a new API key for a user.""" - res_json = Users.post('apikey', json=None, auth=auth_header) + res_json = Users.post("apikey", json=None, auth=auth_header) return dto.ApiKey.from_dict(res_json) def delete_api_key(auth_header: Optional[dto.AuthHeader] = None): """Delete a user's API key.""" - Users.delete('apikey', auth=auth_header) + Users.delete("apikey", auth=auth_header) def count_all() -> dto.Count: """Get a count of all users.""" - res_json = Users.get('count') + res_json = Users.get("count") return dto.Count.from_dict(res_json) def disable_by_uuid(uuid: str, auth_header: Optional[dto.AuthHeader] = None): """(Admin) Disable an account by its UUID.""" - res_json = Users.put('disable/{}'.format(uuid), auth=auth_header) + res_json = Users.put("disable/{}".format(uuid), auth=auth_header) return dto.FathomnetIdentity.from_dict(res_json) def find_expertise() -> List[str]: """Get a list of all expertise levels.""" - res_json = Users.get('list/expertise') + res_json = Users.get("list/expertise") return res_json def find_contributors_names() -> List[str]: """Get a list of all contributor names.""" - res_json = Users.get('list/names') + res_json = Users.get("list/names") return res_json def find_roles() -> List[str]: """Get a list of all user roles.""" - res_json = Users.get('list/roles') + res_json = Users.get("list/roles") return res_json -def find_by_authentication(auth_header: Optional[dto.AuthHeader] = None) -> dto.FathomnetIdentity: +def find_by_authentication( + auth_header: Optional[dto.AuthHeader] = None, +) -> dto.FathomnetIdentity: """Find a user by authentication.""" - res_json = Users.get('query', auth=auth_header) + res_json = Users.get("query", auth=auth_header) return dto.FathomnetIdentity.from_dict(res_json) -def find_by_email(email: str, auth_header: Optional[dto.AuthHeader] = None) -> dto.FathomnetIdentity: +def find_by_email( + email: str, auth_header: Optional[dto.AuthHeader] = None +) -> dto.FathomnetIdentity: """Find a user by email.""" - res_json = Users.get('query/email/{}'.format(email), auth=auth_header) + res_json = Users.get("query/email/{}".format(email), auth=auth_header) return dto.FathomnetIdentity.from_dict(res_json) -def find_by_firebase_uid(uid: str, auth_header: Optional[dto.AuthHeader] = None) -> dto.FathomnetIdentity: +def find_by_firebase_uid( + uid: str, auth_header: Optional[dto.AuthHeader] = None +) -> dto.FathomnetIdentity: """Find a user by Firebase UID.""" - res_json = Users.get('query/uid/{}'.format(uid), auth=auth_header) + res_json = Users.get("query/uid/{}".format(uid), auth=auth_header) return dto.FathomnetIdentity.from_dict(res_json) def verify(auth_header: Optional[dto.AuthHeader] = None) -> dto.Authentication: """Get the contents of an authorization token.""" - res_json = Users.get('verification', auth=auth_header) + res_json = Users.get("verification", auth=auth_header) return dto.Authentication.from_dict(res_json) -def find_by_display_name(display_name: str, pageable: Optional[dto.Pageable] = None) -> List[dto.FathomnetIdentity]: +def find_by_display_name( + display_name: str, pageable: Optional[dto.Pageable] = None +) -> List[dto.FathomnetIdentity]: """Find a user by display name.""" - res_json = Users.get('query/name/{}'.format(quote(display_name)), params=pageable.to_params() if pageable else None) + res_json = Users.get( + "query/name/{}".format(quote(display_name)), + params=pageable.to_params() if pageable else None, + ) print(res_json) return list(map(dto.FathomnetIdentity.from_dict, res_json)) -def find_by_organization(organization: str, pageable: Optional[dto.Pageable] = None) -> List[dto.FathomnetIdentity]: +def find_by_organization( + organization: str, pageable: Optional[dto.Pageable] = None +) -> List[dto.FathomnetIdentity]: """Find a user by organization.""" - res_json = Users.get('query/organization/{}'.format(quote(organization)), params=pageable.to_params() if pageable else None) + res_json = Users.get( + "query/organization/{}".format(quote(organization)), + params=pageable.to_params() if pageable else None, + ) return list(map(dto.FathomnetIdentity.from_dict, res_json)) def find_by_uuid(uuid: str) -> dto.FathomnetIdentity: """Find a user by UUID.""" - res_json = Users.get('query/uuid/{}'.format(uuid)) + res_json = Users.get("query/uuid/{}".format(uuid)) return dto.FathomnetIdentity.from_dict(res_json) def find_badges_by_uuid(uuid: str) -> List[dto.Badge]: """Find a user's badges by UUID.""" - res_json = Users.get('badges/{}'.format(uuid)) + res_json = Users.get("badges/{}".format(uuid)) return list(map(dto.Badge.from_dict, res_json)) diff --git a/fathomnet/api/worms.py b/fathomnet/api/worms.py index db08684..7cdacf9 100644 --- a/fathomnet/api/worms.py +++ b/fathomnet/api/worms.py @@ -5,99 +5,101 @@ class Worms(EndpointManager): - ROOT = 'https://fathomnet.org' - PATH = 'worms' + ROOT = "https://fathomnet.org" + PATH = "worms" def count_names() -> int: """Get the total number of names available.""" - return int(Worms.get('names/count')) + return int(Worms.get("names/count")) def get_all_names(limit: int = 100, offset: int = 0) -> List[str]: """Get all names.""" - res_json = Worms.get('names', params={'limit': limit, 'offset': offset}) - return res_json['items'] + res_json = Worms.get("names", params={"limit": limit, "offset": offset}) + return res_json["items"] def get_names_by_aphia_id(aphia_id: int) -> WormsNames: """Get the names data for a given Aphia ID.""" - res_json = Worms.get(f'names/aphiaid/{aphia_id}') + res_json = Worms.get(f"names/aphiaid/{aphia_id}") return WormsNames.from_dict(res_json) def get_ancestors_names(name: str) -> List[str]: """Get all ancestors' names of a given name.""" - return Worms.get(f'ancestors/{name}') + return Worms.get(f"ancestors/{name}") def get_children_names(name: str) -> List[str]: """Get all children's names of a given name.""" - return Worms.get(f'children/{name}') + return Worms.get(f"children/{name}") def get_descendants_names(name: str, accepted: bool = False) -> List[str]: """Get all descendants' names of a given name.""" - return Worms.get(f'descendants/{name}', params={'accepted': accepted}) + return Worms.get(f"descendants/{name}", params={"accepted": accepted}) def get_parent_name(name: str) -> str: """Get the parent's name of a given name.""" - return Worms.get(f'parent/{name}') + return Worms.get(f"parent/{name}") def find_names_containing(fragment: str) -> List[str]: """Get all names that contain a fragment.""" - return Worms.get(f'query/contains/{fragment}') + return Worms.get(f"query/contains/{fragment}") def find_names_by_prefix(prefix: str) -> List[str]: """Get all names that start with a prefix.""" - return Worms.get(f'query/startswith/{prefix}') + return Worms.get(f"query/startswith/{prefix}") def get_synonyms_for_name(name: str) -> List[str]: """Get all synonyms for a name.""" - return Worms.get(f'synonyms/{name}') + return Worms.get(f"synonyms/{name}") def get_ancestors(name: str) -> WormsNode: """Get a taxa tree from the root node to the node for the given name.""" - res_json = Worms.get(f'taxa/ancestors/{name}') + res_json = Worms.get(f"taxa/ancestors/{name}") return WormsNode.from_dict(res_json) def get_children(name: str) -> List[WormsNode]: """Get the child taxa nodes of a given name.""" - res_json = Worms.get(f'taxa/children/{name}') + res_json = Worms.get(f"taxa/children/{name}") return [WormsNode.from_dict(item) for item in res_json] def get_descendants(name: str) -> WormsNode: """Get a taxa tree from the given name to the leaves.""" - res_json = Worms.get(f'taxa/descendants/{name}') + res_json = Worms.get(f"taxa/descendants/{name}") return WormsNode.from_dict(res_json) def get_parent(name: str) -> WormsNode: """Get the parent taxa node of a given name.""" - res_json = Worms.get(f'taxa/parent/{name}') + res_json = Worms.get(f"taxa/parent/{name}") return WormsNode.from_dict(res_json) def get_info(name: str) -> WormsNode: """Get a taxa node for a given name.""" - res_json = Worms.get(f'taxa/info/{name}') + res_json = Worms.get(f"taxa/info/{name}") return WormsNode.from_dict(res_json) -def find_taxa_by_prefix(prefix: str, rank: str = None, parent: str = None) -> List[WormsNode]: +def find_taxa_by_prefix( + prefix: str, rank: str = None, parent: str = None +) -> List[WormsNode]: """Get all taxa nodes that start with a prefix.""" params = {} if rank is not None: - params['rank'] = rank + params["rank"] = rank if parent is not None: - params['parent'] = parent + params["parent"] = parent - res_json = Worms.get(f'taxa/query/startswith/{prefix}', params=params) + res_json = Worms.get(f"taxa/query/startswith/{prefix}", params=params) return [WormsNode.from_dict(item) for item in res_json] diff --git a/fathomnet/api/xapikey.py b/fathomnet/api/xapikey.py index 2d0f084..d9e010c 100644 --- a/fathomnet/api/xapikey.py +++ b/fathomnet/api/xapikey.py @@ -2,16 +2,18 @@ from typing import Optional from fathomnet import dto -from fathomnet.api import EndpointManager, SESSION +from fathomnet.api import SESSION, EndpointManager class XApiKey(EndpointManager): - PATH = 'xapikey' + PATH = "xapikey" def auth(x_api_key_token: str) -> dto.AuthHeader: """Exchange an X-API-key token for a JWT.""" - res_json = XApiKey.post('auth', headers={'X-API-Key': x_api_key_token}) # TODO figure out request body + res_json = XApiKey.post( + "auth", headers={"X-API-Key": x_api_key_token} + ) # TODO figure out request body auth_header = dto.AuthHeader.from_dict(res_json) SESSION.auth = auth_header # Update session auth return auth_header @@ -19,5 +21,5 @@ def auth(x_api_key_token: str) -> dto.AuthHeader: def index(auth_header: Optional[dto.AuthHeader] = None): """Test a JWT to ensure it's valid.""" - res_json = XApiKey.get('test', auth=auth_header) + res_json = XApiKey.get("test", auth=auth_header) return dto.Message.from_dict(res_json) diff --git a/fathomnet/dto.py b/fathomnet/dto.py index 4502ad7..dec09e1 100644 --- a/fathomnet/dto.py +++ b/fathomnet/dto.py @@ -1,12 +1,13 @@ # dto.py (fathomnet-py) +import os from dataclasses import dataclass -from dataclasses_json import dataclass_json from enum import Enum from typing import List, Optional -from requests.auth import AuthBase + +from dataclasses_json import dataclass_json from lxml import etree from lxml.builder import E -import os +from requests.auth import AuthBase @dataclass_json @@ -31,14 +32,16 @@ class AImageDTO: modified: Optional[str] = None sha256: Optional[str] = None contributorsEmail: Optional[str] = None - tags: Optional[List['ATagDTO']] = None + tags: Optional[List["ATagDTO"]] = None timestamp: Optional[str] = None width: Optional[int] = None - boundingBoxes: Optional[List['ABoundingBoxDTO']] = None + boundingBoxes: Optional[List["ABoundingBoxDTO"]] = None createdTimestamp: Optional[str] = None lastUpdatedTimestamp: Optional[str] = None - def to_pascal_voc(self, path: Optional[str] = None, pretty_print: bool = False) -> str: + def to_pascal_voc( + self, path: Optional[str] = None, pretty_print: bool = False + ) -> str: """Convert to a Pascal VOC. :param path: Path to the image file, defaults to using the image URL if available @@ -51,7 +54,9 @@ def to_pascal_voc(self, path: Optional[str] = None, pretty_print: bool = False) """ if path is None: # If no path provided, use URL if self.url is None: - raise ValueError('Either the path argument or the image URL must be specified.') + raise ValueError( + "Either the path argument or the image URL must be specified." + ) path = self.url # Parse the folder name and file name @@ -62,17 +67,26 @@ def to_pascal_voc(self, path: Optional[str] = None, pretty_print: bool = False) boxes = self.boundingBoxes or [] objects = [ E.object( - E.name(box.concept + (' {}'.format(box.altConcept) if box.altConcept is not None else '')), - E.pose('Unspecified'), - E.truncated(str(int(box.truncated) if box.truncated is not None else 0)), - E.difficult('0'), + E.name( + box.concept + + ( + " {}".format(box.altConcept) + if box.altConcept is not None + else "" + ) + ), + E.pose("Unspecified"), + E.truncated( + str(int(box.truncated) if box.truncated is not None else 0) + ), + E.difficult("0"), E.occluded(str(int(box.occluded) if box.occluded is not None else 0)), E.bndbox( E.xmin(str(box.x)), E.xmax(str(box.x + box.width)), E.ymin(str(box.y)), - E.ymax(str(box.y + box.height)) - ) + E.ymax(str(box.y + box.height)), + ), ) for box in boxes ] @@ -82,15 +96,9 @@ def to_pascal_voc(self, path: Optional[str] = None, pretty_print: bool = False) E.folder(folder_name), E.filename(base_name), E.path(path), - E.source( - E.database('FathomNet') - ), - E.size( - E.width(str(self.width)), - E.height(str(self.height)), - E.depth('3') - ), - E.segmented('0'), + E.source(E.database("FathomNet")), + E.size(E.width(str(self.width)), E.height(str(self.height)), E.depth("3")), + E.segmented("0"), *objects ) @@ -156,7 +164,7 @@ class AuthHeader(AuthBase): @property def auth_dict(self): - return {'Authorization': '{} {}'.format(self.type, self.token)} + return {"Authorization": "{} {}".format(self.type, self.token)} def __call__(self, r): r.headers.update(self.auth_dict) @@ -183,7 +191,7 @@ class BoundingBox: userDefinedKey: Optional[str] = None concept: Optional[str] = None altConcept: Optional[str] = None - image: Optional['Image'] = None + image: Optional["Image"] = None groupOf: Optional[bool] = None height: Optional[int] = None occluded: Optional[bool] = None @@ -245,9 +253,9 @@ class BImageSetUploadDTO: localPath: Optional[str] = None remoteUri: Optional[str] = None sha256: Optional[str] = None - format: Optional['ImageSetUpload.UploadFormat'] = None + format: Optional["ImageSetUpload.UploadFormat"] = None contributorsEmail: Optional[str] = None - status: Optional['ImageSetUpload.Status'] = None + status: Optional["ImageSetUpload.Status"] = None statusUpdaterEmail: Optional[str] = None statusUpdateTimestamp: Optional[str] = None rejectionReason: Optional[str] = None @@ -288,7 +296,7 @@ class DarwinCore: institutionID: Optional[str] = None recordReferences: Optional[str] = None rightsHolder: Optional[str] = None - imageSetUpload: Optional['ImageSetUpload'] = None + imageSetUpload: Optional["ImageSetUpload"] = None @dataclass_json @@ -327,7 +335,7 @@ class FathomnetIdentity: disabled: Optional[bool] = None expertiseRank: Optional[str] = None displayName: Optional[str] = None - roles: Optional[List['Roles']] = None + roles: Optional[List["Roles"]] = None orcid: Optional[str] = None notificationFrequency: Optional[str] = None @@ -399,24 +407,24 @@ class Image: contributorsEmail: Optional[str] = None timestamp: Optional[str] = None width: Optional[int] = None - tags: Optional[List['Tag']] = None + tags: Optional[List["Tag"]] = None boundingBoxes: Optional[List[BoundingBox]] = None createdTimestamp: Optional[str] = None lastUpdatedTimestamp: Optional[str] = None - imageSetUploads: Optional[List['ImageSetUpload']] = None + imageSetUploads: Optional[List["ImageSetUpload"]] = None @dataclass_json @dataclass class ImageSetUpload: class Status(Enum): - PENDING = 'PENDING' - ACCEPTED = 'ACCEPTED' - REJECTED = 'REJECTED' + PENDING = "PENDING" + ACCEPTED = "ACCEPTED" + REJECTED = "REJECTED" class UploadFormat(Enum): - CSV = 'CSV' - UNSUPPORTED = 'UNSUPPORTED' + CSV = "CSV" + UNSUPPORTED = "UNSUPPORTED" id: Optional[int] = None uuid: Optional[str] = None @@ -472,8 +480,8 @@ class Sort: @dataclass class Order: class Direction(Enum): - ASC = 'ASC' - DESC = 'DESC' + ASC = "ASC" + DESC = "DESC" ignoreCase: Optional[bool] = None direction: Optional[Direction] = None @@ -497,16 +505,21 @@ def to_params(self) -> List[tuple]: """Make a list of paging parameters to be passed into a request.""" params = [] if self.size is not None: - params.append(('size', self.size)) + params.append(("size", self.size)) if self.number is not None: - params.append(('page', self.number)) + params.append(("page", self.number)) if self.sort is not None: for order in self.sort.orderBy: - params.append(('sort', order.property)) + params.append(("sort", order.property)) return params @classmethod - def from_params(cls, size: Optional[int] = None, page: Optional[int] = None, sort_keys: Optional[List[str]] = None): + def from_params( + cls, + size: Optional[int] = None, + page: Optional[int] = None, + sort_keys: Optional[List[str]] = None, + ): """Make a Pageable instance from paging parameters.""" pageable = cls() pageable.size = size @@ -519,12 +532,12 @@ def from_params(cls, size: Optional[int] = None, page: Optional[int] = None, sor class Roles(Enum): - ADMIN = 'ADMIN' - MODERATOR = 'MODERATOR' - READ = 'READ' - UNKNOWN = 'UNKNOWN' - UPDATE = 'UPDATE' - WRITE = 'WRITE' + ADMIN = "ADMIN" + MODERATOR = "MODERATOR" + READ = "READ" + UNKNOWN = "UNKNOWN" + UPDATE = "UPDATE" + WRITE = "WRITE" @dataclass_json @@ -610,7 +623,7 @@ class WormsNode: aphiaId: Optional[int] = None acceptedAphiaId: Optional[int] = None alternateNames: Optional[List[str]] = None - children: Optional[List['WormsNode']] = None + children: Optional[List["WormsNode"]] = None @dataclass_json diff --git a/fathomnet/models/__init__.py b/fathomnet/models/__init__.py index b5b32bf..f613068 100644 --- a/fathomnet/models/__init__.py +++ b/fathomnet/models/__init__.py @@ -1,8 +1,12 @@ # __init__.py (fathomnet-py) try: - import torch + import torch # noqa: F401 except ImportError: - raise ImportError("You must install the 'models' extra to use the models subpackage: pip install fathomnet[models]") + raise ImportError( + "You must install the 'models' extra to use the models subpackage: pip install fathomnet[models]" + ) from fathomnet.models.yolov5 import MBARIMBBenthicModel + +__all__ = ["MBARIMBBenthicModel"] diff --git a/fathomnet/models/bases.py b/fathomnet/models/bases.py index 3498d7e..fb5f31f 100644 --- a/fathomnet/models/bases.py +++ b/fathomnet/models/bases.py @@ -2,13 +2,11 @@ from abc import ABC, abstractmethod from pathlib import Path -from typing import Any, Iterable, TypeVar, Union +from typing import TypeVar, Union import cv2 import numpy as np -from fathomnet.dto import BoundingBox - T = TypeVar("T") @@ -16,6 +14,7 @@ class ImageModel(ABC): """ Abstract base class for image models. Defines the interface for models that can be used to predict on images. """ + @abstractmethod def _predict(self, image: Union[np.ndarray, Path]) -> T: raise NotImplementedError diff --git a/fathomnet/models/yolov5.py b/fathomnet/models/yolov5.py index 4a075b5..e3e1dbb 100644 --- a/fathomnet/models/yolov5.py +++ b/fathomnet/models/yolov5.py @@ -6,8 +6,8 @@ import numpy as np import requests -from torch.hub import load from appdirs import user_cache_dir +from torch.hub import load from fathomnet.dto import BoundingBox from fathomnet.models.bases import ImageModel @@ -17,6 +17,7 @@ class YOLOv5Model(ImageModel): """ YOLOv5 object detection model. Uses a .pt file and performs object detection on images. """ + def __init__(self, weights: Path) -> None: super().__init__() @@ -32,7 +33,7 @@ def _predict(self, image: np.ndarray) -> Iterable[BoundingBox]: x=int(x1), y=int(y1), width=int(x2 - x1), - height=int(y2 - y1) + height=int(y2 - y1), ) @@ -40,6 +41,7 @@ class MBARIMBBenthicModel(YOLOv5Model): """ MBARI Monterey Bay Benthic Object Detector. """ + WEIGHTS_URL = "https://zenodo.org/record/5539915/files/mbari-mb-benthic-33k.pt" def __init__(self) -> None: @@ -47,7 +49,11 @@ def __init__(self) -> None: mbari_mb_benthic_dir = cache_dir / "mbari-mb-benthic" weights = mbari_mb_benthic_dir / "mbari-mb-benthic-33k.pt" if not weights.exists(): - print("Downloading MBARI Monterey Bay Benthic Object Detector weights to {}".format(weights)) + print( + "Downloading MBARI Monterey Bay Benthic Object Detector weights to {}".format( + weights + ) + ) weights.parent.mkdir(parents=True, exist_ok=True) with requests.get(self.WEIGHTS_URL, stream=True) as r: r.raise_for_status() diff --git a/fathomnet/scripts/fathomnet_generate.py b/fathomnet/scripts/fathomnet_generate.py index b980d1e..4febbc0 100644 --- a/fathomnet/scripts/fathomnet_generate.py +++ b/fathomnet/scripts/fathomnet_generate.py @@ -3,24 +3,26 @@ """ import argparse -import os -import logging import datetime -from typing import Iterable, List, Optional +import logging +import os from dataclasses import dataclass, replace -import requests from shutil import copyfileobj -import progressbar +from typing import Iterable, List, Optional -from coco_lib.common import Info as COCOInfo, Image as COCOImage, License as COCOLicense +import progressbar +import requests +import yaml +from coco_lib.common import Image as COCOImage +from coco_lib.common import Info as COCOInfo +from coco_lib.common import License as COCOLicense from coco_lib.objectdetection import ( ObjectDetectionAnnotation, ObjectDetectionCategory, ObjectDetectionDataset, ) -import yaml -from ..api import images, taxa, darwincore, worms +from ..api import darwincore, images, taxa, worms from ..dto import AImageDTO, GeoImageConstraints @@ -38,7 +40,7 @@ class Arguments: def comma_list(s: str) -> List[str]: """Parse a comma-separated list of strings""" - return s.split(',') + return s.split(",") def lowercase_str(s: str) -> str: @@ -73,7 +75,7 @@ def generate_constraints( def write_voc(image: AImageDTO, filename: str): """Write a single image to a file""" - with open(filename, 'w') as f: + with open(filename, "w") as f: f.write(image.to_pascal_voc(pretty_print=True)) @@ -89,7 +91,7 @@ def download_imgs(args: Arguments, ims: List[AImageDTO]): if not os.path.exists(file_name): resp = requests.get(image.url, stream=True) resp.raw.decode_content = True - with open(file_name, 'wb') as f: + with open(file_name, "wb") as f: copyfileobj(resp.raw, f) flag += 1 @@ -104,16 +106,16 @@ def get_images(args: Arguments) -> Optional[List[AImageDTO]]: image_uuid_dict = {} if args.concepts: # Concepts specified, generate constraints for each # Print concepts specified - logging.info('Concept(s) specified:') + logging.info("Concept(s) specified:") for concept in args.concepts: - logging.info('- {}'.format(concept)) + logging.info("- {}".format(concept)) # Get the image data logging.info( - 'Fetching image records for {} concept(s)...'.format(len(args.concepts)) + "Fetching image records for {} concept(s)...".format(len(args.concepts)) ) for constraints in generate_constraints(args.concepts, args.base_constraints): - logging.debug('Constraints: {}'.format(constraints.to_json(indent=2))) + logging.debug("Constraints: {}".format(constraints.to_json(indent=2))) concept_images = find_images_paged(constraints) for image in concept_images: image_uuid_dict[image.uuid] = image @@ -128,7 +130,7 @@ def get_images(args: Arguments) -> Optional[List[AImageDTO]]: if box.concept in args.concepts ] else: # No concepts specified, use the base constraints - logging.info('Fetching image records...') + logging.info("Fetching image records...") noconcept_images = find_images_paged(args.base_constraints) for image in noconcept_images: image_uuid_dict[image.uuid] = image @@ -139,7 +141,7 @@ def get_images(args: Arguments) -> Optional[List[AImageDTO]]: } logging.info( - 'Found {} unique images with bounding boxes'.format(len(image_uuid_dict)) + "Found {} unique images with bounding boxes".format(len(image_uuid_dict)) ) # Compute the number of bounding boxes per concept @@ -151,21 +153,21 @@ def get_images(args: Arguments) -> Optional[List[AImageDTO]]: # Print table of bounding box counts for each concept if counting: if not concept_counts: - print('No bounding boxes found') + print("No bounding boxes found") else: longest_concept = max(concept_counts.keys(), key=len) - concept_header = 'concept' + concept_header = "concept" concept_len = len(longest_concept) + 1 concept_len = max(concept_len, len(concept_header) + 1) - count_header = '# boxes' + count_header = "# boxes" count_len = 9 count_len = max(count_len, len(count_header) + 1) - format_str = '{:<' + str(concept_len) + '}|{:>' + str(count_len) + '}' + format_str = "{:<" + str(concept_len) + "}|{:>" + str(count_len) + "}" print(format_str.format(concept_header, count_header)) - print(format_str.format('-' * concept_len, '-' * 9)) + print(format_str.format("-" * concept_len, "-" * 9)) for concept in sorted(concept_counts.keys()): count = concept_counts[concept] print(format_str.format(concept, count)) @@ -181,15 +183,15 @@ def generate_voc_dataset(ims: List[AImageDTO], output_dir: str) -> bool: # Write images to output directory for image in ims: - filename = '{}.{}'.format(image.uuid, 'xml') + filename = "{}.{}".format(image.uuid, "xml") filename = os.path.join(output_dir, filename) - logging.debug('Writing VOC {}'.format(filename)) + logging.debug("Writing VOC {}".format(filename)) try: write_voc(image, filename) except OSError as e: - logging.error('Error writing {}: {}'.format(filename, e)) + logging.error("Error writing {}: {}".format(filename, e)) error_flag = True - logging.info('Wrote {} VOC files to {}'.format(len(ims), output_dir)) + logging.info("Wrote {} VOC files to {}".format(len(ims), output_dir)) return error_flag @@ -198,22 +200,22 @@ def generate_coco_dataset(ims: List[AImageDTO], output_dir: str) -> bool: # Describe the dataset coco_info = COCOInfo( year=datetime.datetime.now().year, - version='0', - description='Generated by FathomNet', - contributor='FathomNet', - url='https://fathomnet.org', + version="0", + description="Generated by FathomNet", + contributor="FathomNet", + url="https://fathomnet.org", date_created=datetime.datetime.now(), ) # Set the FathomNet license fathomnet_license = COCOLicense( - id=0, name='FathomNet', url='http://fathomnet.org/fathomnet/#/license' + id=0, name="FathomNet", url="http://fathomnet.org/fathomnet/#/license" ) # Encode categories in sorted order concepts = sorted(set(box.concept for image in ims for box in image.boundingBoxes)) coco_categories = [ - ObjectDetectionCategory(id=idx, name=concept, supercategory='') + ObjectDetectionCategory(id=idx, name=concept, supercategory="") for idx, concept in enumerate(concepts, start=1) ] @@ -230,7 +232,7 @@ def generate_coco_dataset(ims: List[AImageDTO], output_dir: str) -> bool: license=fathomnet_license.id, flickr_url=image.url, coco_url=image.url, - date_captured=datetime.datetime.fromisoformat(image.timestamp.rstrip('Z')) + date_captured=datetime.datetime.fromisoformat(image.timestamp.rstrip("Z")) if image.timestamp is not None else None, ) @@ -259,12 +261,12 @@ def generate_coco_dataset(ims: List[AImageDTO], output_dir: str) -> bool: ) # Write - output_path = os.path.join(output_dir, 'dataset.json') + output_path = os.path.join(output_dir, "dataset.json") try: coco_dataset.save(output_path, indent=2) - logging.info('Wrote COCO dataset to {}'.format(output_path)) + logging.info("Wrote COCO dataset to {}".format(output_path)) except OSError as e: - logging.error('Error writing {}: {}'.format(output_path, e)) + logging.error("Error writing {}: {}".format(output_path, e)) return True return False @@ -277,16 +279,16 @@ def generate_yolo_dataset(ims: List[AImageDTO], output_dir: str) -> bool: # Create the concept -> index mapping concepts = sorted(set(box.concept for image in ims for box in image.boundingBoxes)) concept_to_index = {concept: idx for idx, concept in enumerate(concepts)} - + labels_dir = os.path.join(output_dir, "labels") if not os.path.exists(labels_dir): os.makedirs(labels_dir) - + # Assume images are in a directory called "images" images_dir = os.path.join(output_dir, "images") if not os.path.exists(images_dir): os.makedirs(images_dir) - + # Write annotation YAML file to output directory yaml_data = { "path": output_dir, @@ -299,17 +301,17 @@ def generate_yolo_dataset(ims: List[AImageDTO], output_dir: str) -> bool: with open(yaml_path, "w") as f: f.write(yaml.dump(yaml_data, indent=2, sort_keys=False)) except OSError as e: - logging.error('Error writing {}: {}'.format(yaml_path, e)) + logging.error("Error writing {}: {}".format(yaml_path, e)) return False - logging.info('Wrote dataset YAML to {}'.format(yaml_path)) + logging.info("Wrote dataset YAML to {}".format(yaml_path)) # Write annotation files to output directory for image in ims: - filename = '{}.{}'.format(image.uuid, 'txt') + filename = "{}.{}".format(image.uuid, "txt") filename = os.path.join(labels_dir, filename) - logging.debug('Writing YOLO {}'.format(filename)) + logging.debug("Writing YOLO {}".format(filename)) try: - with open(filename, 'w') as f: + with open(filename, "w") as f: for box in image.boundingBoxes: x_center = box.x + box.width / 2 y_center = box.y + box.height / 2 @@ -318,14 +320,18 @@ def generate_yolo_dataset(ims: List[AImageDTO], output_dir: str) -> bool: width_norm = box.width / image.width height_norm = box.height / image.height f.write( - '{} {} {} {} {}\n'.format( - concept_to_index[box.concept], x_center_norm, y_center_norm, width_norm, height_norm + "{} {} {} {} {}\n".format( + concept_to_index[box.concept], + x_center_norm, + y_center_norm, + width_norm, + height_norm, ) ) except OSError as e: - logging.error('Error writing {}: {}'.format(filename, e)) + logging.error("Error writing {}: {}".format(filename, e)) error_flag = True - logging.info('Wrote {} YOLO files to {}'.format(len(ims), labels_dir)) + logging.info("Wrote {} YOLO files to {}".format(len(ims), labels_dir)) return error_flag @@ -333,9 +339,9 @@ def generate_yolo_dataset(ims: List[AImageDTO], output_dir: str) -> bool: def generate_dataset(args: Arguments, ims: List[AImageDTO]) -> bool: """Call the specified dataset generation function according to the format specified""" dataset_func = { - 'voc': generate_voc_dataset, - 'coco': generate_coco_dataset, - 'yolo': generate_yolo_dataset, + "voc": generate_voc_dataset, + "coco": generate_coco_dataset, + "yolo": generate_yolo_dataset, } return dataset_func[args.format](ims, args.output) @@ -353,128 +359,127 @@ def parse_args() -> Arguments: valid_taxa_providers = taxa.list_taxa_providers() valid_contributor_emails = images.find_distinct_submitter() valid_owner_institution_codes = darwincore.find_owner_institution_codes() - valid_dataset_formats = ['voc', 'coco', 'yolo'] + valid_dataset_formats = ["voc", "coco", "yolo"] - parser.add_argument('-v', action='count', default=0, help='Increase verbosity') + parser.add_argument("-v", action="count", default=0, help="Increase verbosity") parser.add_argument( - '-t', - '--taxa', - dest='taxa', + "-t", + "--taxa", + dest="taxa", type=str, - help='Taxonomy provider (to include descendants). Options: {}'.format( - ', '.join(valid_taxa_providers) + help="Taxonomy provider (to include descendants). Options: {}".format( + ", ".join(valid_taxa_providers) ), ) parser.add_argument( - '--contributor-email', - dest='contributor_email', + "--contributor-email", + dest="contributor_email", type=str, - help='Contributor email', + help="Contributor email", ) parser.add_argument( - '--start', - dest='start_timestamp', + "--start", + dest="start_timestamp", type=datetime.datetime.fromisoformat, - help='Start timestamp (formatted as ISO-8601)', + help="Start timestamp (formatted as ISO-8601)", ) parser.add_argument( - '--end', - dest='end_timestamp', + "--end", + dest="end_timestamp", type=datetime.datetime.fromisoformat, - help='End timestamp (formatted as ISO-8601)', + help="End timestamp (formatted as ISO-8601)", ) parser.add_argument( - '--imaging-types', - dest='imaging_types', + "--imaging-types", + dest="imaging_types", type=comma_list, - help='Comma-separated list of imaging types to include. Options: {}'.format( - ', '.join(valid_imaging_types) + help="Comma-separated list of imaging types to include. Options: {}".format( + ", ".join(valid_imaging_types) ), ) parser.add_argument( - '--exclude-unverified', - dest='include_unverified', - action='store_false', - help='Flag to exclude unverified images', + "--exclude-unverified", + dest="include_unverified", + action="store_false", + help="Flag to exclude unverified images", ) parser.add_argument( - '--exclude-verified', - dest='include_verified', - action='store_false', - help='Flag to exclude verified images', + "--exclude-verified", + dest="include_verified", + action="store_false", + help="Flag to exclude verified images", ) parser.add_argument( - '--min-longitude', dest='min_longitude', type=float, help='Minimum longitude' + "--min-longitude", dest="min_longitude", type=float, help="Minimum longitude" ) parser.add_argument( - '--max-longitude', dest='max_longitude', type=float, help='Maximum longitude' + "--max-longitude", dest="max_longitude", type=float, help="Maximum longitude" ) parser.add_argument( - '--min-latitude', dest='min_latitude', type=float, help='Minimum latitude' + "--min-latitude", dest="min_latitude", type=float, help="Minimum latitude" ) parser.add_argument( - '--max-latitude', dest='max_latitude', type=float, help='Maximum latitude' + "--max-latitude", dest="max_latitude", type=float, help="Maximum latitude" ) parser.add_argument( - '--min-depth', dest='min_depth', type=float, help='Minimum depth' + "--min-depth", dest="min_depth", type=float, help="Minimum depth" ) parser.add_argument( - '--max-depth', dest='max_depth', type=float, help='Maximum depth' + "--max-depth", dest="max_depth", type=float, help="Maximum depth" ) parser.add_argument( - '--institutions', - dest='owner_institution_codes', + "--institutions", + dest="owner_institution_codes", type=comma_list, - help='Comma-separated list of owner institution codes to include', + help="Comma-separated list of owner institution codes to include", ) parser.add_argument( - '-a', - '--all', - dest='all', - action='store_true', - help='Flag to include all bounding boxes of other concepts in specified images', + "-a", + "--all", + dest="all", + action="store_true", + help="Flag to include all bounding boxes of other concepts in specified images", ) parser.add_argument( - '-f' - '--format', - dest='format', + "-f" "--format", + dest="format", type=lowercase_str, - default='voc', + default="voc", choices=valid_dataset_formats, - help='Dataset format. Options: {}'.format(', '.join(valid_dataset_formats)), + help="Dataset format. Options: {}".format(", ".join(valid_dataset_formats)), ) parser.add_argument( - '--img-download', - dest='img_dir', + "--img-download", + dest="img_dir", default=None, type=str, - help='Local directory to download images', + help="Local directory to download images", ) list_or_file = parser.add_mutually_exclusive_group(required=False) list_or_file.add_argument( - '-c', - '--concepts', - dest='concepts', + "-c", + "--concepts", + dest="concepts", type=comma_list, - help='Comma-separated list of concepts to include', + help="Comma-separated list of concepts to include", ) list_or_file.add_argument( - '--concepts-file', - dest='concepts_file', + "--concepts-file", + dest="concepts_file", type=str, - help='File containing newline-delimited list of concepts to include', + help="File containing newline-delimited list of concepts to include", ) count_or_output = parser.add_mutually_exclusive_group(required=True) count_or_output.add_argument( - '--count', - dest='count', - action='store_true', - help='Count images and bounding boxes instead of generating a dataset', + "--count", + dest="count", + action="store_true", + help="Count images and bounding boxes instead of generating a dataset", ) count_or_output.add_argument( - '-o', '--output', dest='output', type=str, help='Output directory' + "-o", "--output", dest="output", type=str, help="Output directory" ) # Parse arguments @@ -495,7 +500,7 @@ def parse_args() -> Arguments: concepts = args.concepts elif args.concepts_file: if os.path.isfile(args.concepts_file): - with open(args.concepts_file, 'r') as f: + with open(args.concepts_file, "r") as f: concepts = f.read().splitlines() concepts = [line.strip() for line in concepts] # remove string format @@ -508,15 +513,17 @@ def parse_args() -> Arguments: taxa_provider_lower = taxa_provider.lower() idx = [p.lower() for p in valid_taxa_providers].index(taxa_provider_lower) if idx < 0: - parser.error('Invalid taxonomy provider: {}'.format(taxa_provider)) + parser.error("Invalid taxonomy provider: {}".format(taxa_provider)) else: taxa_provider = valid_taxa_providers[idx] # Update concepts with all descendants new_concepts = [] for concept in concepts: - logging.debug('Finding taxa for {}'.format(concept)) - if taxa_provider == "fathomnet": # SPECIAL CASE: due to a bug in Micronaut, the fast worms provider is used directly for now + logging.debug("Finding taxa for {}".format(concept)) + if ( + taxa_provider == "fathomnet" + ): # SPECIAL CASE: due to a bug in Micronaut, the fast worms provider is used directly for now descendants_names = worms.get_descendants_names(concept) for new_concept in descendants_names: if new_concept not in new_concepts: @@ -530,38 +537,38 @@ def parse_args() -> Arguments: if new_concept not in new_concepts: new_concepts.append(new_concept) - logging.debug('Old concepts: {}'.format(concepts)) - logging.debug('New concepts: {}'.format(new_concepts)) + logging.debug("Old concepts: {}".format(concepts)) + logging.debug("New concepts: {}".format(new_concepts)) concepts = new_concepts # Parse contributor email contributor_email = args.contributor_email if contributor_email is not None: if contributor_email not in valid_contributor_emails: - parser.error('Invalid contributor email: {}'.format(contributor_email)) + parser.error("Invalid contributor email: {}".format(contributor_email)) # Parse start timestamp start_timestamp = args.start_timestamp start_timestamp_str = None if start_timestamp is not None: if start_timestamp > datetime.datetime.now(): - parser.error('Start timestamp cannot be in the future') - start_timestamp_str = start_timestamp.isoformat(timespec='milliseconds') + 'Z' + parser.error("Start timestamp cannot be in the future") + start_timestamp_str = start_timestamp.isoformat(timespec="milliseconds") + "Z" # Parse end timestamp end_timestamp = args.end_timestamp end_timestamp_str = None if end_timestamp is not None: if end_timestamp > datetime.datetime.now(): - parser.error('End timestamp cannot be in the future') - end_timestamp_str = end_timestamp.isoformat(timespec='milliseconds') + 'Z' + parser.error("End timestamp cannot be in the future") + end_timestamp_str = end_timestamp.isoformat(timespec="milliseconds") + "Z" # Parse imaging types imaging_types = args.imaging_types if imaging_types: for imaging_type in imaging_types: if imaging_type not in valid_imaging_types: - parser.error('Invalid imaging type: {}'.format(imaging_type)) + parser.error("Invalid imaging type: {}".format(imaging_type)) # Parse unverified/verified flags include_unverified = args.include_unverified @@ -581,7 +588,7 @@ def parse_args() -> Arguments: for owner_institution_code in owner_institution_codes: if owner_institution_code not in valid_owner_institution_codes: parser.error( - 'Invalid owner institution code: {}'.format(owner_institution_code) + "Invalid owner institution code: {}".format(owner_institution_code) ) # Pack specified constraints into base constraint instance @@ -605,27 +612,27 @@ def parse_args() -> Arguments: output = args.output # None if --count flag is set if output is not None: if not os.path.exists(output): - logging.info('Creating output directory {}'.format(output)) + logging.info("Creating output directory {}".format(output)) os.makedirs(output) elif not os.path.isdir(output): parser.error( - 'Output directory {} exists and is not a directory'.format(output) + "Output directory {} exists and is not a directory".format(output) ) # Create image output directory (if it doesn't exist) img_dir = args.img_dir # None if --img_download flag is not called if img_dir is not None: if not os.path.exists(img_dir): - logging.info('Creating output directory {}'.format(img_dir)) + logging.info("Creating output directory {}".format(img_dir)) os.makedirs(img_dir) elif not os.path.isdir(img_dir): parser.error( - 'Image download output directory {} exists and is not a directory'.format( + "Image download output directory {} exists and is not a directory".format( img_dir ) ) - logging.info('Successfully parsed flags') + logging.info("Successfully parsed flags") # Pack everything into an arguments instance return Arguments( @@ -644,7 +651,7 @@ def main(): try: dataset_images = get_images(args) # Get the images except KeyboardInterrupt: - logging.info('Image querying interrupted by user') + logging.info("Image querying interrupted by user") exit(0) if not dataset_images: # Ensure there are images to use @@ -653,7 +660,7 @@ def main(): try: error = generate_dataset(args, dataset_images) # Generate the dataset except KeyboardInterrupt: - logging.info('Dataset generation interrupted by user') + logging.info("Dataset generation interrupted by user") exit(0) if args.img_dir: @@ -662,11 +669,11 @@ def main(): args, dataset_images ) # download the images to specified output directory except KeyboardInterrupt: - logging.info('Image download interrupted by user') + logging.info("Image download interrupted by user") exit(0) else: exit(0) if error: - logging.error('Error generating dataset.') + logging.error("Error generating dataset.") exit(1) diff --git a/fathomnet/util.py b/fathomnet/util.py index 999cf72..1d808ba 100644 --- a/fathomnet/util.py +++ b/fathomnet/util.py @@ -16,7 +16,7 @@ def debug_format_response(response: requests.Response) -> str: A string representation of the response object. """ request = response.request - formatted_str = '''REQUEST: + formatted_str = """REQUEST: Method: {} URL: {} Headers: @@ -31,14 +31,18 @@ def debug_format_response(response: requests.Response) -> str: {} Content: {} - '''.format( + """.format( request.method, request.url, - '\n\t'.join(['{}: {}'.format(key, val) for key, val in request.headers.items()]), + "\n\t".join( + ["{}: {}".format(key, val) for key, val in request.headers.items()] + ), request.body, response.status_code, - '\n\t'.join(['{}: {}'.format(key, val) for key, val in response.headers.items()]), - response.content + "\n\t".join( + ["{}: {}".format(key, val) for key, val in response.headers.items()] + ), + response.content, ) return formatted_str diff --git a/test/__init__.py b/test/__init__.py index 6431a77..5211b97 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -6,9 +6,13 @@ TEST_X_API_KEY = None -if TEST_X_API_KEY is not None: # If test X-API-Key is provided, authenticate session to enable cases +if ( + TEST_X_API_KEY is not None +): # If test X-API-Key is provided, authenticate session to enable cases xapikey.auth(TEST_X_API_KEY) def skipIfNoAuth(f): # Decorator to skip test case if it needs auth - return skipIf(TEST_X_API_KEY is None, 'Test X-API-Key not specified. (see test/__init__.py)')(f) + return skipIf( + TEST_X_API_KEY is None, "Test X-API-Key not specified. (see test/__init__.py)" + )(f) diff --git a/test/test_activity.py b/test/test_activity.py index dd50005..8ea0029 100644 --- a/test/test_activity.py +++ b/test/test_activity.py @@ -1,8 +1,9 @@ -from unittest import TestCase, SkipTest +from unittest import SkipTest, TestCase -from . import skipIfNoAuth from fathomnet.api import activity +from . import skipIfNoAuth + class TestActivityAPI(TestCase): @skipIfNoAuth @@ -12,9 +13,9 @@ def test_find_all(self): @skipIfNoAuth def test_find_by_email(self): - activities = activity.find_by_email('kbarnard@mbari.org') + activities = activity.find_by_email("kbarnard@mbari.org") self.assertIsNotNone(activities) @skipIfNoAuth def test_find_by_email_admin(self): - raise SkipTest('Not implemented') + raise SkipTest("Not implemented") diff --git a/test/test_boundingboxes.py b/test/test_boundingboxes.py index 88108c5..9b9aaa6 100644 --- a/test/test_boundingboxes.py +++ b/test/test_boundingboxes.py @@ -1,30 +1,31 @@ -from unittest import TestCase, SkipTest +from unittest import SkipTest, TestCase -from . import skipIfNoAuth from fathomnet.api import boundingboxes +from . import skipIfNoAuth + class TestBoundingBoxesAPI(TestCase): @skipIfNoAuth def test_create_with_dto(self): - raise SkipTest('Write tests not yet implemented') # TODO create_with_dto test + raise SkipTest("Write tests not yet implemented") # TODO create_with_dto test def test_count_all(self): count = boundingboxes.count_all() self.assertIsNotNone(count) - self.assertEqual(count.objectType, 'BoundingBoxEntity') + self.assertEqual(count.objectType, "BoundingBoxEntity") self.assertGreater(count.count, 0) def test_find_concepts(self): concepts = boundingboxes.find_concepts() self.assertIsNotNone(concepts) - self.assertIn('Bathochordaeus', concepts) + self.assertIn("Bathochordaeus", concepts) def test_count_total_by_concept(self): concept_counts = boundingboxes.count_total_by_concept() self.assertIsNotNone(concept_counts) for concept_count in concept_counts: - if concept_count.concept == 'Bathochordaeus': + if concept_count.concept == "Bathochordaeus": self.assertGreater(concept_count.count, 0) break else: @@ -33,16 +34,16 @@ def test_count_total_by_concept(self): def test_find_observers(self): observers = boundingboxes.find_observers() self.assertIsNotNone(observers) - self.assertIn('kakani', observers) + self.assertIn("kakani", observers) def test_count_by_concept(self): - concept_count = boundingboxes.count_by_concept('Bathochordaeus') + concept_count = boundingboxes.count_by_concept("Bathochordaeus") self.assertIsNotNone(concept_count) - self.assertEqual(concept_count.concept, 'Bathochordaeus') + self.assertEqual(concept_count.concept, "Bathochordaeus") self.assertGreater(concept_count.count, 0) def test_find_by_user_defined_key(self): - user_defined_key = '00005716-ef67-44b9-0967-27ced2aab21e' + user_defined_key = "00005716-ef67-44b9-0967-27ced2aab21e" boxes = boundingboxes.find_by_user_defined_key(user_defined_key) self.assertIsNotNone(boxes) self.assertEqual(boxes[0].userDefinedKey, user_defined_key) @@ -53,30 +54,30 @@ def test_find_all_user_defined_keys(self): @skipIfNoAuth def test_upload_csv(self): - raise SkipTest('Write tests not yet implemented') # TODO upload_csv test + raise SkipTest("Write tests not yet implemented") # TODO upload_csv test def test_find_by_uuid(self): - uuid = 'eb05c713-9cd9-4cd9-bcaa-71f8e500825d' + uuid = "eb05c713-9cd9-4cd9-bcaa-71f8e500825d" box = boundingboxes.find_by_uuid(uuid) self.assertIsNotNone(box) self.assertEqual(box.uuid, uuid) @skipIfNoAuth def test_update(self): - raise SkipTest('Write tests not yet implemented') # TODO update test + raise SkipTest("Write tests not yet implemented") # TODO update test @skipIfNoAuth def test_delete(self): - raise SkipTest('Write tests not yet implemented') # TODO delete test + raise SkipTest("Write tests not yet implemented") # TODO delete test def test_audit_by_uuid(self): - uuid = '9f31b626-b118-4819-860c-3c1cfc04be3f' + uuid = "9f31b626-b118-4819-860c-3c1cfc04be3f" boxes = boundingboxes.audit_by_uuid(uuid) self.assertIsNotNone(boxes) self.assertEqual(boxes[0].uuid, uuid) def test_audit_by_user_defined_key(self): - user_defined_key = '285aa889-f771-46c0-6763-c0398712ba1e' + user_defined_key = "285aa889-f771-46c0-6763-c0398712ba1e" boxes = boundingboxes.audit_by_user_defined_key(user_defined_key) self.assertIsNotNone(boxes) self.assertEqual(boxes[0].userDefinedKey, user_defined_key) @@ -84,29 +85,29 @@ def test_audit_by_user_defined_key(self): def test_find_searchable_concepts(self): searchable_concepts = boundingboxes.find_searchable_concepts() self.assertIsNotNone(searchable_concepts) - self.assertIn('Bathochordaeus', searchable_concepts) + self.assertIn("Bathochordaeus", searchable_concepts) def test_find_by_observer_uuid(self): - observer_uuid = '9dba65e1-5974-46df-9276-98c461beba9f' + observer_uuid = "9dba65e1-5974-46df-9276-98c461beba9f" boxes = boundingboxes.find_by_observer_uuid(observer_uuid) self.assertIsNotNone(boxes) def test_find_by_verifier_uuid(self): - verifier_uuid = '9dba65e1-5974-46df-9276-98c461beba9f' + verifier_uuid = "9dba65e1-5974-46df-9276-98c461beba9f" boxes = boundingboxes.find_by_verifier_uuid(verifier_uuid) self.assertIsNotNone(boxes) def test_audit_by_concepts(self): - concepts = ['Bathochordaeus', "a'a", 'Abraliopsis (Boreabraliopsis) felis'] + concepts = ["Bathochordaeus", "a'a", "Abraliopsis (Boreabraliopsis) felis"] boxes = boundingboxes.audit_by_concepts(concepts) self.assertIsNotNone(boxes) def test_audit_by_verifier(self): - verifier = 'brian@mbari.org' + verifier = "brian@mbari.org" boxes = boundingboxes.audit_by_verifier(verifier) self.assertIsNotNone(boxes) def test_audit_by_observer(self): - observer = 'brian@mbari.org' + observer = "brian@mbari.org" boxes = boundingboxes.audit_by_observer(observer) self.assertIsNotNone(boxes) diff --git a/test/test_comments.py b/test/test_comments.py index 93a00fe..5bbaa16 100644 --- a/test/test_comments.py +++ b/test/test_comments.py @@ -1,39 +1,40 @@ -from unittest import TestCase, SkipTest +from unittest import SkipTest, TestCase -from . import skipIfNoAuth from fathomnet.api import comments +from . import skipIfNoAuth + class TestCommentsAPI(TestCase): @skipIfNoAuth def test_create(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") def test_find_by_uuid(self): - uuid = 'c3e98572-89ab-40ac-8ec1-2cc388b129dc' + uuid = "c3e98572-89ab-40ac-8ec1-2cc388b129dc" comment = comments.find_by_uuid(uuid) self.assertIsNotNone(comment) @skipIfNoAuth def test_update(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") @skipIfNoAuth def test_delete(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") @skipIfNoAuth def find_by_bounding_box_uuid(self): - bounding_box_uuid = 'c4822967-13b7-435d-9cba-5a7f52f7457f' + bounding_box_uuid = "c4822967-13b7-435d-9cba-5a7f52f7457f" res_comments = comments.find_by_bounding_box_uuid(bounding_box_uuid) self.assertIsNotNone(res_comments) @skipIfNoAuth def test_find_by_email(self): - email = 'erm.butler@gmail.com' + email = "erm.butler@gmail.com" res_comments = comments.find_by_email(email) self.assertIsNotNone(res_comments) @skipIfNoAuth def test_flag(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") diff --git a/test/test_darwincore.py b/test/test_darwincore.py index f71f0f3..694f1de 100644 --- a/test/test_darwincore.py +++ b/test/test_darwincore.py @@ -4,7 +4,6 @@ class TestDarwinCoreAPI(TestCase): - def test_find_owner_institution_codes(self): owner_institution_codes = darwincore.find_owner_institution_codes() self.assertIsNotNone(owner_institution_codes) diff --git a/test/test_firebase.py b/test/test_firebase.py index 580555c..f7f66f0 100644 --- a/test/test_firebase.py +++ b/test/test_firebase.py @@ -1,16 +1,17 @@ -from unittest import TestCase, SkipTest +from unittest import SkipTest, TestCase -from . import skipIfNoAuth from fathomnet.api import firebase +from . import skipIfNoAuth + class TestFirebaseAPI(TestCase): def test_auth(self): - raise SkipTest('Firebase authentication not yet implemented') + raise SkipTest("Firebase authentication not yet implemented") auth_header = firebase.auth() self.assertIsNotNone(auth_header) - self.assertEqual(auth_header.type, 'Bearer') + self.assertEqual(auth_header.type, "Bearer") @skipIfNoAuth def test_test(self): diff --git a/test/test_geoimages.py b/test/test_geoimages.py index 7389c15..b528233 100644 --- a/test/test_geoimages.py +++ b/test/test_geoimages.py @@ -14,8 +14,7 @@ def test_find_all(self): def test_count(self): geo_image_constraints = dto.GeoImageConstraints( - concept='Bathochordaeus', - limit=10 + concept="Bathochordaeus", limit=10 ) count = geoimages.count(geo_image_constraints) self.assertIsNotNone(count) @@ -23,15 +22,14 @@ def test_count(self): def test_find(self): geo_image_constraints = dto.GeoImageConstraints( - concept='Bathochordaeus', - limit=10 + concept="Bathochordaeus", limit=10 ) results = geoimages.find(geo_image_constraints) self.assertIsNotNone(results) self.assertGreater(len(results), 0) def test_find_by_image_set_upload_uuid(self): - image_set_upload_uuid = '9c891f7a-976b-4376-acf9-31681e1b3a15' + image_set_upload_uuid = "9c891f7a-976b-4376-acf9-31681e1b3a15" results = geoimages.find_by_image_set_upload_uuid(image_set_upload_uuid) self.assertIsNotNone(results) self.assertGreater(len(results), 0) diff --git a/test/test_images.py b/test/test_images.py index 58a589b..0ee1dfb 100644 --- a/test/test_images.py +++ b/test/test_images.py @@ -1,9 +1,10 @@ -from unittest import TestCase, SkipTest +from unittest import SkipTest, TestCase -from . import skipIfNoAuth from fathomnet import dto from fathomnet.api import images +from . import skipIfNoAuth + class TestImagesAPI(TestCase): def test_find_all_alt(self): @@ -15,12 +16,14 @@ def test_find_all_alt(self): @skipIfNoAuth def test_create_if_not_exists(self): - raise SkipTest('Write tests not yet implemented') # TODO create_if_not_exists test + raise SkipTest( + "Write tests not yet implemented" + ) # TODO create_if_not_exists test def test_count_all(self): count = images.count_all() self.assertIsNotNone(count) - self.assertEqual(count.objectType, 'ImageEntity') + self.assertEqual(count.objectType, "ImageEntity") self.assertGreater(count.count, 0) def test_find_all(self): @@ -33,18 +36,16 @@ def test_find_all(self): def test_find_distinct_submitter(self): submitters = images.find_distinct_submitter() self.assertIsNotNone(submitters) - self.assertIn('brian@mbari.org', submitters) + self.assertIn("brian@mbari.org", submitters) def test_list_imaging_types(self): imaging_types = images.list_imaging_types() self.assertIsNotNone(imaging_types) - self.assertIn('ROV', imaging_types) + self.assertIn("ROV", imaging_types) def test_find(self): geo_image_constraints = dto.GeoImageConstraints( - concept='Bathochordaeus charon', - taxaProviderName='mbari', - limit=10 + concept="Bathochordaeus charon", taxaProviderName="mbari", limit=10 ) results = images.find(geo_image_constraints) self.assertIsNotNone(results) @@ -56,7 +57,11 @@ def test_find(self): self.fail() def test_find_by_concept(self): - for concept in ('Bathochordaeus', "a'a", 'Abraliopsis (Boreabrealiopsis) felis'): + for concept in ( + "Bathochordaeus", + "a'a", + "Abraliopsis (Boreabrealiopsis) felis", + ): results = images.find_by_concept(concept) self.assertIsNotNone(results) for image in results: @@ -67,21 +72,21 @@ def test_find_by_concept(self): self.fail() def test_find_by_contributors_email(self): - contributors_email = 'kbarnard@mbari.org' + contributors_email = "kbarnard@mbari.org" results = images.find_by_contributors_email(contributors_email) self.assertIsNotNone(results) for image in results: self.assertEqual(image.contributorsEmail, contributors_email) def test_count_by_submitter(self): - contributors_email = 'brian@mbari.org' + contributors_email = "brian@mbari.org" count = images.count_by_submitter(contributors_email) self.assertIsNotNone(count) self.assertEqual(count.contributorsEmail, contributors_email) self.assertGreater(count.count, 0) def test_find_by_observer(self): - observer = 'kakani' + observer = "kakani" results = images.find_by_observer(observer) self.assertIsNotNone(results) for image in results: @@ -92,15 +97,15 @@ def test_find_by_observer(self): self.fail() def test_find_by_sha256(self): - sha256 = 'b572f8ca40b5af19972d8c63ac5fa4e33df215132c4c16c47b6b483ac9d07299' + sha256 = "b572f8ca40b5af19972d8c63ac5fa4e33df215132c4c16c47b6b483ac9d07299" results = images.find_by_sha256(sha256) self.assertIsNotNone(results) for image in results: self.assertEqual(image.sha256, sha256) def test_find_by_tag_key(self): - key = 'source' - value = 'TEST' + key = "source" + value = "TEST" results = images.find_by_tag_key(key, value) self.assertIsNotNone(results) for image in results: @@ -111,33 +116,33 @@ def test_find_by_tag_key(self): self.fail() def test_find_by_url(self): - url = 'https://fathomnet.org/static/m3/framegrabs/Ventana/images/3069/00_34_35_02.png' + url = "https://fathomnet.org/static/m3/framegrabs/Ventana/images/3069/00_34_35_02.png" image = images.find_by_url(url) self.assertIsNotNone(image) self.assertEqual(image.url, url) def test_find_by_uuid_in_list(self): uuids = [ - 'b7736c31-0b78-4761-840c-e3781d6845be', - '9b0bc09b-85b2-4b72-99db-bf91b36a9f89', - '8bf45f3c-4d11-418c-b384-2dfdc2e6c01c', - 'bfa62293-1723-4643-8954-a60786f10ad5', - 'a1b4d4ff-a22c-417b-921f-a1dd98c21f7a' + "b7736c31-0b78-4761-840c-e3781d6845be", + "9b0bc09b-85b2-4b72-99db-bf91b36a9f89", + "8bf45f3c-4d11-418c-b384-2dfdc2e6c01c", + "bfa62293-1723-4643-8954-a60786f10ad5", + "a1b4d4ff-a22c-417b-921f-a1dd98c21f7a", ] results = images.find_by_uuid_in_list(uuids) self.assertIsNotNone(results) self.assertSetEqual(set(image.uuid for image in results), set(uuids)) def test_find_by_uuid(self): - uuid = 'b7736c31-0b78-4761-840c-e3781d6845be' + uuid = "b7736c31-0b78-4761-840c-e3781d6845be" image = images.find_by_uuid(uuid) self.assertIsNotNone(image) self.assertEqual(image.uuid, uuid) @skipIfNoAuth def test_update(self): - raise SkipTest('Write tests not yet implemented') # TODO update test + raise SkipTest("Write tests not yet implemented") # TODO update test @skipIfNoAuth def test_delete(self): - raise SkipTest('Write tests not yet implemented') # TODO delete test + raise SkipTest("Write tests not yet implemented") # TODO delete test diff --git a/test/test_imagesetuploads.py b/test/test_imagesetuploads.py index 4b3f7d9..992d51e 100644 --- a/test/test_imagesetuploads.py +++ b/test/test_imagesetuploads.py @@ -8,7 +8,7 @@ class TestImageSetUploadsAPI(TestCase): def test_count_all(self): count = imagesetuploads.count_all() self.assertIsNotNone(count) - self.assertEqual(count.objectType, 'ImageSetUploadEntity') + self.assertEqual(count.objectType, "ImageSetUploadEntity") self.assertGreater(count.count, 0) def test_find_collections(self): @@ -21,7 +21,7 @@ def test_find_collections(self): def test_find_contributors(self): contributors = imagesetuploads.find_contributors() self.assertIsNotNone(contributors) - self.assertIn('brian@mbari.org', contributors) + self.assertIn("brian@mbari.org", contributors) def test_find_rejection_reasons(self): rejection_reasons = imagesetuploads.find_rejection_reasons() @@ -29,7 +29,7 @@ def test_find_rejection_reasons(self): self.assertGreater(len(rejection_reasons), 0) def test_find_by_contributor(self): - contributors_email = 'brian@mbari.org' + contributors_email = "brian@mbari.org" results = imagesetuploads.find_by_contributor(contributors_email) self.assertIsNotNone(results) self.assertGreater(len(results), 0) @@ -37,18 +37,18 @@ def test_find_by_contributor(self): self.assertEqual(image_set.contributorsEmail, contributors_email) def test_find_by_image_uuid(self): - image_uuid = '4f5265f7-31cd-490d-a807-bc350356435d' + image_uuid = "4f5265f7-31cd-490d-a807-bc350356435d" results = imagesetuploads.find_by_image_uuid(image_uuid) self.assertIsNotNone(results) def test_stats(self): - image_set_upload_uuid = '9da52a10-f7db-4897-a886-2e3fbf6b9d36' + image_set_upload_uuid = "9da52a10-f7db-4897-a886-2e3fbf6b9d36" stats = imagesetuploads.stats(image_set_upload_uuid) self.assertIsNotNone(stats) self.assertEqual(stats.imageSetUploadUuid, image_set_upload_uuid) def test_find_by_uuid(self): - uuid = '9c891f7a-976b-4376-acf9-31681e1b3a15' + uuid = "9c891f7a-976b-4376-acf9-31681e1b3a15" image_set = imagesetuploads.find_by_uuid(uuid) self.assertIsNotNone(image_set) self.assertEqual(image_set.uuid, uuid) diff --git a/test/test_regions.py b/test/test_regions.py index 2f8470d..6f0278e 100644 --- a/test/test_regions.py +++ b/test/test_regions.py @@ -1,9 +1,10 @@ -from unittest import TestCase, SkipTest +from unittest import SkipTest, TestCase -from . import skipIfNoAuth from fathomnet import dto from fathomnet.api import regions +from . import skipIfNoAuth + class TestRegionsAPI(TestCase): def test_find_all(self): @@ -23,7 +24,7 @@ def test_find_all_paged(self): @skipIfNoAuth def test_sync(self): - raise SkipTest('Sync endpoint not yet implemented') # TODO sync test + raise SkipTest("Sync endpoint not yet implemented") # TODO sync test def test_find_at(self): latitude = 35.6 diff --git a/test/test_tags.py b/test/test_tags.py index 5feec59..ce820e0 100644 --- a/test/test_tags.py +++ b/test/test_tags.py @@ -1,34 +1,35 @@ -from unittest import TestCase, SkipTest +from unittest import SkipTest, TestCase -from . import skipIfNoAuth from fathomnet.api import tags +from . import skipIfNoAuth + class TestTagsAPI(TestCase): @skipIfNoAuth def test_create_with_dto(self): - raise SkipTest('Write tests not yet implemented') # TODO create_with_dto test + raise SkipTest("Write tests not yet implemented") # TODO create_with_dto test def test_find_by_uuid(self): - uuid = '4c7f468c-ab41-4003-a048-d194a5f4ff4a' + uuid = "4c7f468c-ab41-4003-a048-d194a5f4ff4a" tag = tags.find_by_uuid(uuid) self.assertIsNotNone(tag) self.assertEqual(tag.uuid, uuid) def test_find_by_image_uuid_and_key(self): - image_uuid = '70df75b3-02ad-4a33-a5c6-6ed90313f752' - key = 'source' + image_uuid = "70df75b3-02ad-4a33-a5c6-6ed90313f752" + key = "source" tag_list = tags.find_by_image_uuid_and_key(image_uuid, key) self.assertIsNotNone(tag_list) tag = tag_list[0] self.assertEqual(tag.key, key) - self.assertEqual(tag.value, 'MBARI/VARS') + self.assertEqual(tag.value, "MBARI/VARS") self.assertEqual(tag.imageUuid, image_uuid) @skipIfNoAuth def test_update(self): - raise SkipTest('Write tests not yet implemented') # TODO update test + raise SkipTest("Write tests not yet implemented") # TODO update test @skipIfNoAuth def test_delete(self): - raise SkipTest('Write tests not yet implemented') # TODO delete test + raise SkipTest("Write tests not yet implemented") # TODO delete test diff --git a/test/test_taxa.py b/test/test_taxa.py index 5b8d35d..1093a29 100644 --- a/test/test_taxa.py +++ b/test/test_taxa.py @@ -14,19 +14,19 @@ def test_list_taxa_providers(self): self.assertGreater(len(taxa_providers), 0) def test_find_children(self): - children = taxa.find_children('fathomnet', 'Bathochordaeus') + children = taxa.find_children("fathomnet", "Bathochordaeus") self.assertIsNotNone(children) - self.assertIn('Bathochordaeus mcnutti', set(child.name for child in children)) + self.assertIn("Bathochordaeus mcnutti", set(child.name for child in children)) def test_find_parent(self): - parent = taxa.find_parent('fathomnet', 'Bathochordaeus mcnutti') + parent = taxa.find_parent("fathomnet", "Bathochordaeus mcnutti") self.assertIsNotNone(parent) - self.assertEqual(parent.name, 'Bathochordaeus') + self.assertEqual(parent.name, "Bathochordaeus") def test_find_taxa(self): - concept = 'Bathochordaeus' - rank = 'Genus' - results = taxa.find_taxa('fathomnet', concept) + concept = "Bathochordaeus" + rank = "Genus" + results = taxa.find_taxa("fathomnet", concept) self.assertIsNotNone(results) self.assertGreater(len(results), 0) for taxa_item in results: diff --git a/test/test_topics.py b/test/test_topics.py index 4f62f0f..019f9ff 100644 --- a/test/test_topics.py +++ b/test/test_topics.py @@ -1,26 +1,27 @@ -from unittest import TestCase, SkipTest +from unittest import SkipTest, TestCase -from . import skipIfNoAuth from fathomnet.api import topics +from . import skipIfNoAuth + class TestTopicsAPI(TestCase): @skipIfNoAuth def test_create(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") def test_find_by_uuid(self): - uuid = '411fa644-4f24-49c3-bdc1-b40f91aecaab' + uuid = "411fa644-4f24-49c3-bdc1-b40f91aecaab" topic = topics.find_by_uuid(uuid) self.assertIsNotNone(topic) @skipIfNoAuth def test_update(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") @skipIfNoAuth def test_delete(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") @skipIfNoAuth def test_find(self): @@ -29,4 +30,4 @@ def test_find(self): @skipIfNoAuth def test_find_by_email(self): - raise SkipTest('Not implemented') + raise SkipTest("Not implemented") diff --git a/test/test_users.py b/test/test_users.py index 26b04d0..cdefdd9 100644 --- a/test/test_users.py +++ b/test/test_users.py @@ -1,14 +1,15 @@ -from unittest import TestCase, SkipTest +from unittest import SkipTest, TestCase -from . import skipIfNoAuth from fathomnet.api import users from fathomnet.dto import Pageable +from . import skipIfNoAuth + class TestUsersAPI(TestCase): @skipIfNoAuth def test_create_with_dto(self): - raise SkipTest('Write tests not yet implemented') # TODO create_with_dto test + raise SkipTest("Write tests not yet implemented") # TODO create_with_dto test @skipIfNoAuth def test_find_all(self): @@ -18,15 +19,15 @@ def test_find_all(self): @skipIfNoAuth def test_find_all_admin(self): - raise SkipTest('Not implemented') + raise SkipTest("Not implemented") @skipIfNoAuth def test_update_user_data(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") @skipIfNoAuth def test_update_user_data_admin(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") @skipIfNoAuth def test_get_api_key(self): @@ -35,21 +36,21 @@ def test_get_api_key(self): @skipIfNoAuth def test_create_new_api_key(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") @skipIfNoAuth def test_delete_api_key(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") def test_count_all(self): count = users.count_all() self.assertIsNotNone(count) - self.assertEqual(count.objectType, 'FathomnetIdentityEntity') + self.assertEqual(count.objectType, "FathomnetIdentityEntity") self.assertGreater(count.count, 0) @skipIfNoAuth def test_disable_by_uuid(self): - raise SkipTest('Write tests not yet implemented') + raise SkipTest("Write tests not yet implemented") def test_find_expertise(self): expertise = users.find_expertise() @@ -69,7 +70,7 @@ def test_find_by_authentication(self): self.assertIsNotNone(user) def test_find_by_firebase_uid(self): - raise SkipTest('Not yet implemented') + raise SkipTest("Not yet implemented") @skipIfNoAuth def test_verify(self): @@ -77,18 +78,18 @@ def test_verify(self): self.assertIsNotNone(auth) def test_find_by_display_name(self): - res_users = users.find_by_display_name('Brian') + res_users = users.find_by_display_name("Brian") self.assertIsNotNone(res_users) @skipIfNoAuth def test_find_by_organization(self): - res_users = users.find_by_organization('mbari') + res_users = users.find_by_organization("mbari") self.assertIsNotNone(res_users) def test_find_by_uuid(self): - res_user = users.find_by_uuid('9dba65e1-5974-46df-9276-98c461beba9f') + res_user = users.find_by_uuid("9dba65e1-5974-46df-9276-98c461beba9f") self.assertIsNotNone(res_user) def test_find_badges_by_uuid(self): - res_badges = users.find_badges_by_uuid('9dba65e1-5974-46df-9276-98c461beba9f') + res_badges = users.find_badges_by_uuid("9dba65e1-5974-46df-9276-98c461beba9f") self.assertIsNotNone(res_badges) diff --git a/test/test_worms.py b/test/test_worms.py index 1ffc78a..02e7336 100644 --- a/test/test_worms.py +++ b/test/test_worms.py @@ -6,6 +6,7 @@ def check_for_name(node: WormsNode, name: str) -> bool: """Recursively check if a node or one of its descendants has a given name.""" + def recurse_check(node: WormsNode) -> bool: print(node.name, name) if node.name == name: @@ -13,11 +14,11 @@ def recurse_check(node: WormsNode) -> bool: if not node.children: return False return any(recurse_check(child) for child in node.children) + return recurse_check(node) class TestWormsAPI(TestCase): - def test_count_names(self): count = worms.count_names() self.assertIsNotNone(count) @@ -33,84 +34,97 @@ def test_get_names_by_aphia_id(self): self.assertEqual(2, names_obj.aphiaId) def test_get_ancestors_names(self): - ancestors = worms.get_ancestors_names('Animalia') + ancestors = worms.get_ancestors_names("Animalia") self.assertIsNotNone(ancestors) - self.assertIn('object', ancestors) + self.assertIn("object", ancestors) def test_get_children_names(self): - children = worms.get_children_names('Bathochordaeus') + children = worms.get_children_names("Bathochordaeus") self.assertIsNotNone(children) - self.assertIn('Bathochordaeus charon', children) + self.assertIn("Bathochordaeus charon", children) def test_get_descendants_names(self): - descendants = worms.get_descendants_names('Bathochordaeus') + descendants = worms.get_descendants_names("Bathochordaeus") self.assertIsNotNone(descendants) - self.assertIn('Bathochordaeus charon', descendants) - - siph_all_descendants = worms.get_descendants_names('Siphonophorae', accepted=False) - siph_accepted_descendants = worms.get_descendants_names('Siphonophorae', accepted=True) + self.assertIn("Bathochordaeus charon", descendants) + + siph_all_descendants = worms.get_descendants_names( + "Siphonophorae", accepted=False + ) + siph_accepted_descendants = worms.get_descendants_names( + "Siphonophorae", accepted=True + ) self.assertIsNotNone(siph_all_descendants) self.assertIsNotNone(siph_accepted_descendants) self.assertGreater(len(siph_all_descendants), len(siph_accepted_descendants)) - siph_all_descendants = worms.get_descendants_names('Siphonophorae', accepted=False) - siph_accepted_descendants = worms.get_descendants_names('Siphonophorae', accepted=True) + siph_all_descendants = worms.get_descendants_names( + "Siphonophorae", accepted=False + ) + siph_accepted_descendants = worms.get_descendants_names( + "Siphonophorae", accepted=True + ) self.assertIsNotNone(siph_all_descendants) self.assertIsNotNone(siph_accepted_descendants) self.assertGreater(len(siph_all_descendants), len(siph_accepted_descendants)) def test_get_parent_name(self): - parent = worms.get_parent_name('Bathochordaeus charon') + parent = worms.get_parent_name("Bathochordaeus charon") self.assertIsNotNone(parent) - self.assertEqual('Bathochordaeus', parent) + self.assertEqual("Bathochordaeus", parent) def test_find_names_containing(self): - names = worms.find_names_containing('pendicula') + names = worms.find_names_containing("pendicula") self.assertIsNotNone(names) - self.assertIn('Appendicularia', names) + self.assertIn("Appendicularia", names) def test_find_names_by_prefix(self): - names = worms.find_names_by_prefix('Appendicula') + names = worms.find_names_by_prefix("Appendicula") self.assertIsNotNone(names) - self.assertIn('Appendicularia', names) + self.assertIn("Appendicularia", names) def test_get_synonyms_for_name(self): - synonyms = worms.get_synonyms_for_name('Appendicularia') + synonyms = worms.get_synonyms_for_name("Appendicularia") self.assertIsNotNone(synonyms) - self.assertIn('larvaceans', synonyms) + self.assertIn("larvaceans", synonyms) def test_get_ancestors(self): - root_node = worms.get_ancestors('Animalia') + root_node = worms.get_ancestors("Animalia") self.assertIsNotNone(root_node) - self.assertTrue(check_for_name(root_node, 'object'), 'No "object" node found in ancestors') + self.assertTrue( + check_for_name(root_node, "object"), 'No "object" node found in ancestors' + ) def test_get_children(self): - children = worms.get_children('Bathochordaeus') + children = worms.get_children("Bathochordaeus") self.assertIsNotNone(children) for child in children: - if child.name == 'Bathochordaeus charon': + if child.name == "Bathochordaeus charon": return self.fail('No "Bathochordaeus charon" child found') def test_get_descendants(self): - taxa_node = worms.get_descendants('Bathochordaeus') + taxa_node = worms.get_descendants("Bathochordaeus") self.assertIsNotNone(taxa_node) - self.assertTrue(check_for_name(taxa_node, 'Bathochordaeus charon'), 'No "Bathochordaeus charon" descendant found') + self.assertTrue( + check_for_name(taxa_node, "Bathochordaeus charon"), + 'No "Bathochordaeus charon" descendant found', + ) def test_get_parent(self): - parent = worms.get_parent('Bathochordaeus charon') + parent = worms.get_parent("Bathochordaeus charon") self.assertIsNotNone(parent) - self.assertEqual('Bathochordaeus', parent.name) + self.assertEqual("Bathochordaeus", parent.name) def test_get_info(self): - info = worms.get_info('Bathochordaeus charon') + info = worms.get_info("Bathochordaeus charon") self.assertIsNotNone(info) - self.assertEqual('Bathochordaeus charon', info.name) + self.assertEqual("Bathochordaeus charon", info.name) def test_find_taxa_by_prefix(self): - taxa = worms.find_taxa_by_prefix('Appendicula') + taxa = worms.find_taxa_by_prefix("Appendicula") self.assertIsNotNone(taxa) for taxon in taxa: - if taxon.name == 'Appendicularia': + if taxon.name == "Appendicularia": return self.fail('No "Appendicularia" taxon found') diff --git a/test/test_xapikey.py b/test/test_xapikey.py index f641280..938d370 100644 --- a/test/test_xapikey.py +++ b/test/test_xapikey.py @@ -1,16 +1,17 @@ from unittest import TestCase -from . import skipIfNoAuth, TEST_X_API_KEY from fathomnet.api import xapikey +from . import TEST_X_API_KEY, skipIfNoAuth + class TestXAPIKeyAPI(TestCase): @skipIfNoAuth def test_auth(self): auth_header = xapikey.auth(TEST_X_API_KEY) self.assertIsNotNone(auth_header) - self.assertEqual(auth_header.type, 'Bearer') + self.assertEqual(auth_header.type, "Bearer") message = xapikey.index(auth_header) self.assertIsNotNone(message) - self.assertEqual(message.message, 'Authentication successful') + self.assertEqual(message.message, "Authentication successful")