From c8451a347004621c92efa4a6a59f010493221630 Mon Sep 17 00:00:00 2001 From: Martin Metzker <60664558+16Martin@users.noreply.github.com> Date: Sat, 30 Nov 2024 23:40:06 +0100 Subject: [PATCH] fixup(findsources): flake8 and codespell --- capycli/bom/findsources.py | 56 ++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/capycli/bom/findsources.py b/capycli/bom/findsources.py index 3d0f407..6b56ecb 100644 --- a/capycli/bom/findsources.py +++ b/capycli/bom/findsources.py @@ -56,35 +56,33 @@ def _validate_key(self, key: Tuple[str, str]) -> Tuple[str, str]: """Ensure our keys are hashable.""" if len(key) != 2 or key != (str(key[0]), str(key[1])): raise KeyError(f'{self.__class__.__name__} key must consist of' - 'a project name and a version string') + 'a project name and a version string') return key - def add(self, project: str, version:str , tag: str) -> None: + def add(self, project: str, version: str, tag: str) -> None: """Cache a tag for a specific project and version.""" key = self._validate_key((project, version)) tags = self.data.setdefault(key, set()) tags.add(tag) - def filter(self, project: str, version: str, data: Any) ->List[str]: + def filter(self, project: str, version: str, data: Any) -> List[str]: """Remove all cached entries from @data.""" if isinstance(data, str): data = [data] elif not isinstance(data, Iterable): - raise ValueError('Expecting an interable of tags!') + raise ValueError('Expecting an iterable of tags!') key = self._validate_key((project, version)) return [item for item in data - if item not in self.data.get(key,[]) + if item not in self.data.get(key, []) and len(item) > 0] - def filter_and_cache(self, project: str, version: str, data: Any - ) ->List[str]: + def filter_and_cache(self, project: str, version: str, data: Any) -> List[str]: """Convenience method to to filtering and adding in one run.""" candidates = set(self.filter(project, version, data)) for tag in candidates: self.add(project, version, tag) return list(candidates) - def __init__(self) -> None: self.verbose: bool = False self.version_regex = re.compile(r"(\d+[._])+\d+") @@ -123,18 +121,19 @@ def is_sourcefile_accessible(self, sourcefile_url: str) -> bool: def github_request(url: str, username: str = "", token: str = "", return_response: bool = False, allow_redirects: bool = True, # default in requests - ) -> Any: + ) -> Any: try: headers = {} if token: headers["Authorization"] = "token " + token if username: headers["Username"] = username - response = requests.get(url, headers=headers, allow_redirects=allow_redirects) + response = requests.get(url, headers=headers, + allow_redirects=allow_redirects) if not response.ok: if response.status_code == 429 \ - or 'rate limit exceeded' in response.reason \ - or 'API rate limit exceeded' in response.json().get('message', ''): + or 'rate limit exceeded' in response.reason \ + or 'API rate limit exceeded' in response.json().get('message', ''): print( Fore.LIGHTYELLOW_EX + " Github API rate limit exceeded - wait 60s and retry ... " + @@ -222,7 +221,7 @@ def _get_github_repo(self, github_ref: str) -> Dict[str, Any]: """ url = 'api.github.com/repos/' gh_ref = urlparse(github_ref, scheme='no_scheme') - if gh_ref.scheme == 'no_scheme': # interprete @github_ref as OWNER/REPO + if gh_ref.scheme == 'no_scheme': # interpret @github_ref as OWNER/REPO url += gh_ref.path elif not gh_ref.netloc.endswith('github.com'): raise ValueError(f'{github_ref} is not an expected @github_ref!') @@ -241,7 +240,7 @@ def _get_github_repo(self, github_ref: str) -> Dict[str, Any]: raise ValueError(f"Unable to make @github_ref {github_ref} work!") return repo - def _get_link_page(self, res: requests.Response, which: str='next') -> int: + def _get_link_page(self, res: requests.Response, which: str = 'next') -> int: """Fetch only page number from link-header.""" try: url = urlparse(res.links[which]['url']) @@ -250,8 +249,8 @@ def _get_link_page(self, res: requests.Response, which: str='next') -> int: return 1 def get_matching_source_url(self, version: Any, github_ref: str, - version_prefix: Any=None - ) -> str: + version_prefix: Any = None + ) -> str: """Find a URL to download source code from GitHub. We are looking for the source code in @github_ref at @version. @@ -271,23 +270,23 @@ def get_matching_source_url(self, version: Any, github_ref: str, print_yellow(" " + str(err)) return "" - tags_url = repo['tags_url'] +'?per_page=100' + tags_url = repo['tags_url'] + '?per_page=100' git_refs_url_tpl = repo['git_refs_url'].replace('{/sha}', '{sha}', 1) res = self.github_request(tags_url, self.github_name, - self.github_token, return_response=True) + self.github_token, return_response=True) pages = self._get_link_page(res, 'last') for _ in range(pages): # we prefer this over "while True" # note: in res.json() we already have the first results page try: tags = [tag for tag in res.json() - if version_prefix is None \ + if version_prefix is None or tag['name'].startswith(version_prefix)] source_url = self.get_matching_tag(tags, version, tags_url) if len(source_url) > 0: # we found what we believe is return source_url # the correct source_url - except (TypeError, KeyError, AttributeError) as err: + except (TypeError, KeyError, AttributeError): # res.json() did not give us an iterable of things where # 'name' is a viable index, for instance an error message tags = [] @@ -307,14 +306,14 @@ def get_matching_source_url(self, version: Any, github_ref: str, # ORDER BY tag-name-length DESC by_size = sorted([(len(tag['ref']), tag) for tag in w_prefix], - key=lambda x:x[0]) + key=lambda x: x[0]) w_prefix = [itm[1] for itm in reversed(by_size)] - transformed_for_get_matching_tags = [{ - 'name': tag['ref'].replace('refs/tags/', '', 1), - 'zipball_url': tag['url'].replace( + transformed_for_get_matching_tags = [ + {'name': tag['ref'].replace('refs/tags/', '', 1), + 'zipball_url': tag['url'].replace( '/git/refs/tags/', '/zipball/refs/tags/', 1), - } for tag in w_prefix] + } for tag in w_prefix] source_url = self.get_matching_tag( transformed_for_get_matching_tags, version, tags_url) if len(source_url) > 0: # we found what we believe is @@ -322,8 +321,8 @@ def get_matching_source_url(self, version: Any, github_ref: str, try: url = res.links['next']['url'] res = self.github_request(url, self.github_name, - self.github_token, return_response=True) - except KeyError as err: # no more result pages + self.github_token, return_response=True) + except KeyError: # no more result pages break print_yellow(" No matching tag for version " + version + " found") return "" @@ -364,7 +363,6 @@ def find_github_url(self, component: Component, use_language: bool = True) -> st name_match = [r for r in repositories.get("items") if component_name in r.get("name", "")] if len(name_match): for match in name_match: - tag_info = self.github_request(match["tags_url"], self.github_name, self.github_token) source_url = self.get_matching_source_url(component.version, match["tags_url"]) if len(name_match) == 1: return source_url @@ -457,7 +455,7 @@ def get_github_source_url(self, github_url: str, version: str) -> str: def check_for_github_error(self, tag_info: get_github_info_type) -> List[Dict[str, Any]]: """This method was introduced to check the output of get_github_info() for errors. - Removed, becasue get_github_info was removed. + Removed, because get_github_info was removed. """ raise NotImplementedError( "Removed with introduction of get_matchting_source_tag!")