From 7c17421df855764a051f7ba49ccc20823a351fed Mon Sep 17 00:00:00 2001 From: Daniel Braun Date: Fri, 14 Apr 2023 00:35:46 +0300 Subject: [PATCH] feat: multiple binary names (#40) --- nanolayer/__main__.py | 4 +- nanolayer/cli/install.py | 8 +- .../gh_release/gh_release_installer.py | 247 ++++++++++-------- 3 files changed, 141 insertions(+), 118 deletions(-) diff --git a/nanolayer/__main__.py b/nanolayer/__main__.py index 2d4ed1ef..5e1b831f 100644 --- a/nanolayer/__main__.py +++ b/nanolayer/__main__.py @@ -1,5 +1,7 @@ -import typer import os + +import typer + from nanolayer.cli.install import app as install_app from nanolayer.utils.analytics import setup_analytics from nanolayer.utils.settings import NanolayerSettings diff --git a/nanolayer/cli/install.py b/nanolayer/cli/install.py index 8b580dc2..32c19d4a 100644 --- a/nanolayer/cli/install.py +++ b/nanolayer/cli/install.py @@ -140,7 +140,7 @@ def install_aptitude_packages( @app.command("gh-release") def install_gh_release_binary( repo: str, - binary_name: str, + binary_names: str = typer.Argument(None, help="comma separated list of binary names"), version: str = "latest", lib_name: Optional[str] = None, asset_regex: Optional[str] = None, @@ -149,12 +149,12 @@ def install_gh_release_binary( force: bool = False, arch: Optional[str] = None, ) -> None: - if binary_name == "": - raise typer.BadParameter("target cannot be empty string") + if binary_names == "": + raise typer.BadParameter("binary names cannot be empty string") GHReleaseInstaller.install( repo=repo, - binary_name=binary_name, + binary_names=binary_names.split(","), lib_name=lib_name, arch=arch, bin_location=bin_location, diff --git a/nanolayer/installers/gh_release/gh_release_installer.py b/nanolayer/installers/gh_release/gh_release_installer.py index ec743e5d..a001e503 100644 --- a/nanolayer/installers/gh_release/gh_release_installer.py +++ b/nanolayer/installers/gh_release/gh_release_installer.py @@ -4,25 +4,23 @@ import platform import re import shutil -import tarfile import tempfile import urllib +from abc import ABC, abstractmethod from copy import deepcopy from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Union +from tarfile import TarFile, is_tarfile +from typing import Any, Dict, List, Optional, Union +from zipfile import ZipFile, is_zipfile import invoke import semver from pydantic import BaseModel, Extra -from nanolayer.utils.invoker import Invoker from nanolayer.utils.linux_information_desk import LinuxInformationDesk logger = logging.getLogger(__name__) -from abc import ABC, abstractmethod -from tarfile import TarFile, is_tarfile -from zipfile import ZipFile, is_zipfile class AbstractExtendedArchive(ABC): @@ -214,42 +212,21 @@ class Config: GIT_VERSION_TAG_REGEX = "(?:tags\/)(v)?([0-9]+)\.([0-9]+)\.([0-9]+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+[0-9A-Za-z-]+)?$" - class TooManyAssetsFound(Exception): - pass - - class NoAssetsFound(Exception): + class ResolveAssetsError(Exception): pass class BadArchitecture(Exception): pass - class NoPremissions(PermissionError): - pass - - class BadPlatform(PermissionError): - pass - class ReleaseVersionNotFound(Exception): pass - class MoBinaryMatchesFound(Exception): - pass - - class MultipleBinaryMatchesFound(Exception): + class ResolveBinariesError(Exception): pass class TargetExists(Exception): pass - class AddPPAsFailed(Invoker.InvokerException): - pass - - class RemovePPAsFailed(Invoker.InvokerException): - pass - - class CleanUpFailed(Invoker.InvokerException): - pass - @classmethod def get_version_tags(cls, repo: str) -> List[str]: response = invoke.run( @@ -360,7 +337,7 @@ def resolve_asset( cls, repo: str, tag: str, - target_name: str, + binary_names: List[str], asset_regex: Optional[str] = None, arch: Optional[LinuxInformationDesk.Architecture] = None, ) -> "GHReleaseInstaller.ReleaseAsset": @@ -382,7 +359,7 @@ def resolve_asset( return assets[0] if len(assets) == 0: - raise cls.NoAssetsFound( + raise cls.ResolveAssetsError( f"no matches found for asset regex: {asset_regex}" ) @@ -435,12 +412,19 @@ def resolve_asset( return assets[0] elif len(assets) == 0: - raise cls.NoAssetsFound("No matches found") + raise cls.ResolveAssetsError("No matches found") # positive filters are being run one by one, because we want to discard those # who filter out all of the remaining. positive_filters = [ + cls.FindAllRegexFilter( + name=f"contains binary name: {binary_name} ", + regex=f".*{binary_name}.*", + negative=False, + ) + for binary_name in binary_names + ] + [ cls.FindAllRegexFilter( name=arch.value, regex=ARCH_REGEX_MAP[arch], negative=False ), @@ -449,14 +433,9 @@ def resolve_asset( regex=PLATFORM_REGEX_MAP[PlatformType.LINUX], negative=False, ), - cls.FindAllRegexFilter( - name="contains target name", - regex=f".*{target_name}.*", - negative=False, - ), cls.FindAllRegexFilter( name="prefer musl", # musl is compatible across more distros - regex=f".*musl.*", + regex=".*musl.*", negative=False, ), cls.FindAllRegexFilter( @@ -501,7 +480,7 @@ def resolve_asset( assets = filtered_assets if len(assets) > 1: - raise cls.TooManyAssetsFound( + raise cls.ResolveAssetsError( f"Too many matches found: {str([asset.name for asset in assets])}" ) @@ -519,7 +498,9 @@ def resolve_and_validate_dir( if isinstance(dir_location, str): dir_location = Path(dir_location) - assert not dir_location.is_file(), f"{dir_location} should be a folder - got file" + assert ( + not dir_location.is_file() + ), f"{dir_location} should be a folder - got file" dir_location.mkdir(parents=True, exist_ok=True) @@ -563,11 +544,41 @@ def _recursive_chmod(cls, dir_location: str, permissions: str) -> None: for d in dirs: os.chmod(os.path.join(root, d), octal_permissions) + @classmethod + def _find_binary_members( + cls, archive_path: str, binary_names: List[str] + ) -> List[str]: + binary_members = [] + with Archive(archive_path) as archive_file: + # resolve target member name + if len(archive_file.get_file_members()) == 1: + if len(binary_names) > 1: + raise cls.ResolveBinariesError( + f"multiple binary names given, but only one member in archive: {archive_file.get_file_members()[0]}" + ) + + # In case of a single member, use it no matter how its named + binary_members.append(archive_file.get_file_members()[0]) + else: + for binary_name in binary_names: + target_member_names = archive_file.names_by_filename(binary_name) + if len(target_member_names) > 1: + raise cls.ResolveBinariesError( + f"multiple binary matches were found in archive: {target_member_names}" + ) + if len(target_member_names) == 0: + raise cls.ResolveBinariesError( + f"no binary named {binary_name} found in archive" + ) + binary_members.append(target_member_names[0]) + + return binary_members + @classmethod def install( cls, repo: str, - binary_name: str, + binary_names: List[str], lib_name: Optional[str] = None, bin_location: Optional[Union[str, Path]] = None, lib_location: Optional[Union[str, Path]] = None, @@ -577,16 +588,17 @@ def install( arch: Optional[str] = None, ) -> None: if lib_name is None or lib_name == "": - lib_name = binary_name - + if len(binary_names) > 1: + raise ValueError( + "If multiple binary names given, lib name has to be given as well" + ) + lib_name = binary_names[0] + if "linux" not in platform.system().lower(): - raise cls.BadPlatform( + raise ValueError( f"Currently only the Linux platform is supported (got {platform.system().lower()})" ) - if not LinuxInformationDesk.has_root_privileges(): - raise cls.NoPremissions("Please run as root or with sudo") - # will raise an exception if arch is invalid arch = cls.resolve_and_validate_architecture(arch) @@ -598,9 +610,12 @@ def install( lib_location, cls.DEFAULT_LIB_LOCATION ) - final_binary_location = bin_location.joinpath(binary_name) - if final_binary_location.exists() and not force: - raise cls.TargetExists(f"target {final_binary_location} already exists") + final_binary_locations = [] + for binary_name in binary_names: + final_binary_location = bin_location.joinpath(binary_name) + if final_binary_location.exists() and not force: + raise cls.TargetExists(f"target {final_binary_location} already exists") + final_binary_locations.append(final_binary_location) # Will raise an exception if release for the requested version does not exists version = cls.resolve_release_version(asked_version=version, repo=repo) @@ -611,7 +626,7 @@ def install( tag=version, asset_regex=asset_regex, arch=arch, - target_name=binary_name, + binary_names=binary_names, ) logger.warning("resolved asset: %s", resolved_asset.name) @@ -624,51 +639,36 @@ def install( cls.download_asset( url=resolved_asset.browser_download_url, target=temp_asset_path ) + if not Archive.is_archive(temp_asset_path): + logger.warning("asset recognized as a binary") - if Archive.is_archive(temp_asset_path): - with Archive(temp_asset_path) as archive_file: - logger.warning("asset recognized as an archive file") + if len(binary_names) > 1: + raise cls.ResolveBinariesError( + "multiple binary names given but the resolved asset is a single binary file" + ) - # resolve target member name - if len(archive_file.get_file_members()) == 1: - # In case of a single member, use it no matter how its named - target_member_name = archive_file.get_file_members()[0] - else: - target_member_names = archive_file.names_by_filename( - binary_name - ) - if len(target_member_names) > 1: - raise cls.MultipleBinaryMatchesFound( - f"multiple binary matches were found in archive {resolved_asset.name}: {target_member_names}" - ) - if len(target_member_names) == 0: - raise cls.MoBinaryMatchesFound( - f"no binary named {binary_name} found in archive {resolved_asset.name}" - ) - target_member_name = target_member_names[0] + # assumes regular binary + shutil.copyfile(temp_asset_path, final_binary_locations[0]) + cls._recursive_chmod(final_binary_locations[0], cls.BIN_PERMISSIONS) - logger.warning( - "target binary found in tar as member: %s", target_member_name - ) + else: + logger.warning("asset recognized as an archive file") - same_dir_members = archive_file.get_names_by_prefix( - os.path.dirname(target_member_name) - ) + archive_member_names = cls._find_binary_members( + temp_asset_path, binary_names + ) + assert len(archive_member_names) == len( + binary_names + ), "amount of resolved archive members does not match the amount of binary names gived" + logger.warning( + "binary members found in archive are: %s", str(archive_member_names) + ) - if len(same_dir_members) == 1: - # In case of a single file, copy it into bin location and rename it as the target name - archive_file.extract(target_member_name, temp_extraction_path) - if target_member_name != binary_name: - logger.warning( - "renaming %s to %s", target_member_name, binary_name - ) - shutil.copyfile( - temp_extraction_path.joinpath(target_member_name), - final_binary_location, + with Archive(temp_asset_path) as archive_file: + if len(archive_file.get_file_members()) > len(binary_names): + logger.warning( + "archive recognized as library (contains additional files outside of requested binaries)" ) - cls._recursive_chmod(final_binary_location, cls.BIN_PERMISSIONS) - - else: # In case other files in same dir, assume lib dir. # extracting to lib location and soft link the target into bin location target_lib_location = lib_location.joinpath(lib_name) @@ -697,29 +697,50 @@ def install( f"{target_lib_location} already exists" ) from exc - lib_binary_location = target_lib_location.joinpath( - target_member_name - ) - # execute permissions cls._recursive_chmod(target_lib_location, cls.BIN_PERMISSIONS) - - logger.warning( - "linking %s to %s", - lib_binary_location, + for ( + binary_name, final_binary_location, - ) - try: - os.symlink(lib_binary_location, final_binary_location) - except FileExistsError as exc: - os.remove(final_binary_location) - os.symlink(lib_binary_location, final_binary_location) - - else: - # assumes regular binary - shutil.copyfile(temp_asset_path, final_binary_location) - cls._recursive_chmod(final_binary_location, cls.BIN_PERMISSIONS) - - # execute permissions - # st = os.stat(final_binary_location) - # os.chmod(final_binary_location, st.st_mode | stat.S_IEXEC) + archive_member_name, + ) in zip( + binary_names, final_binary_locations, archive_member_names + ): + lib_binary_location = target_lib_location.joinpath( + archive_member_name + ) + logger.warning( + "linking %s to %s", + lib_binary_location, + final_binary_location, + ) + try: + os.symlink(lib_binary_location, final_binary_location) + except FileExistsError: + os.remove(final_binary_location) + os.symlink(lib_binary_location, final_binary_location) + else: + for ( + binary_name, + final_binary_location, + archive_member_name, + ) in zip( + binary_names, final_binary_locations, archive_member_names + ): + # In case of a single file, copy it into bin location and rename it as the target name + archive_file.extract( + archive_member_name, temp_extraction_path + ) + if archive_member_name != binary_name: + logger.warning( + "renaming %s to %s", + archive_member_name, + binary_name, + ) + shutil.copyfile( + temp_extraction_path.joinpath(archive_member_name), + final_binary_location, + ) + cls._recursive_chmod( + final_binary_location, cls.BIN_PERMISSIONS + )