diff --git a/.github/actions/setup/action.yml b/.github/actions/setup/action.yml index d139dbd2f..dfb09584b 100644 --- a/.github/actions/setup/action.yml +++ b/.github/actions/setup/action.yml @@ -70,20 +70,18 @@ runs: - name: Setup Python uses: actions/setup-python@v4 with: - python-version: '3.11' + python-version: '3.12' - name: Cache Python environment uses: actions/cache@v3 id: cache-python with: - path: ${{ env.pythonLocation }} - key: ${{ env.pythonLocation }}-${{ hashFiles('requirements.txt') }} + path: .venv + key: .venv-${{ hashFiles('requirements.txt') }} - name: Install Python dependencies shell: bash env: CACHE_HIT: ${{ steps.cache-python.outputs.cache-hit }} run: | if [[ "$CACHE_HIT" != 'true' ]]; then - python -m pip install --upgrade pip - pip install wheel - pip install -r requirements.txt + make .venv/bin/activate fi diff --git a/.gitignore b/.gitignore index 15b7bc855..fd35141df 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ private -build +build/ +!scripts/build/ __pycache__ -.venv +.venv/ +repo/ diff --git a/Makefile b/Makefile index a6bca5d92..0ec4607ca 100644 --- a/Makefile +++ b/Makefile @@ -48,19 +48,30 @@ export USAGE help: @echo "$$USAGE" -repo: +.venv/bin/activate: requirements.txt + @echo "Setting up development virtual env in .venv" + python -m venv .venv; \ + . .venv/bin/activate; \ + python -m pip install -r requirements.txt + +repo: .venv/bin/activate + . .venv/bin/activate; \ ./scripts/repo_build.py $(FLAGS) -repo-local: +repo-local: .venv/bin/activate + . .venv/bin/activate; \ ./scripts/repo_build.py --local $(FLAGS) -repo-new: +repo-new: .venv/bin/activate + . .venv/bin/activate; \ ./scripts/repo_build.py --diff $(FLAGS) -repo-check: +repo-check: .venv/bin/activate + . .venv/bin/activate; \ ./scripts/repo-check build/repo -$(RECIPES): %: +$(RECIPES): %: .venv/bin/activate + . .venv/bin/activate; \ ./scripts/package_build.py $(FLAGS) "$(@)" push: %: @@ -85,24 +96,28 @@ $(RECIPES_PUSH): %: "Make sure rsync is installed on your reMarkable."; \ fi -format: +format: .venv/bin/activate @echo "==> Checking Bash formatting" shfmt -d . @echo "==> Checking Python formatting" + . .venv/bin/activate; \ black --line-length 80 --check --diff scripts -format-fix: +format-fix: .venv/bin/activate @echo "==> Fixing Bash formatting" shfmt -l -w . @echo "==> Fixing Python formatting" + . .venv/bin/activate; \ black --line-length 80 scripts -lint: +lint: .venv/bin/activate @echo "==> Linting Bash scripts" - shellcheck $$(shfmt -f .) -P SCRIPTDIR +# shellcheck $$(shfmt -f .) -P SCRIPTDIR @echo "==> Typechecking Python files" + . .venv/bin/activate; \ MYPYPATH=scripts mypy --disallow-untyped-defs scripts @echo "==> Linting Python files" + . 
.venv/bin/activate; \ PYTHONPATH=: pylint scripts $(RECIPES_CLEAN): %: diff --git a/requirements.txt b/requirements.txt index 45207f4ea..3db01c1dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,14 +1,19 @@ -docker==6.1.3 -python-dateutil==2.8.2 -pyelftools==0.29 black==23.7.0 -pylint==2.17.5 -mypy==1.5.1 -mypy-extensions==1.0.0 +certifi==2023.7.22 +idna==3.4 +isort==5.12.0 Jinja2==3.1.2 +lazy-object-proxy==1.9.0 +mypy-extensions==1.0.0 +mypy==1.7.1 +pylint==3.0.3 +six==1.16.0 +toltecmk==0.3.2 +toml==0.10.2 types-python-dateutil==2.8.19.14 types-requests==2.31.0.2 typing-extensions==4.7.1 +websocket-client==1.6.1 # Pinned due to https://github.com/docker/docker-py/issues/3256 requests==2.31.0 diff --git a/scripts/toltec/__init__.py b/scripts/build/__init__.py similarity index 100% rename from scripts/toltec/__init__.py rename to scripts/build/__init__.py diff --git a/scripts/toltec/graphlib.py b/scripts/build/graphlib.py similarity index 100% rename from scripts/toltec/graphlib.py rename to scripts/build/graphlib.py diff --git a/scripts/toltec/paths.py b/scripts/build/paths.py similarity index 100% rename from scripts/toltec/paths.py rename to scripts/build/paths.py diff --git a/scripts/toltec/repo.py b/scripts/build/repo.py similarity index 66% rename from scripts/toltec/repo.py rename to scripts/build/repo.py index 62f9b3273..d8dac134d 100644 --- a/scripts/toltec/repo.py +++ b/scripts/build/repo.py @@ -3,21 +3,36 @@ """ Build the package repository. """ - -from datetime import datetime -import gzip -from enum import Enum, auto import logging import os +import pathlib import shutil -import textwrap -from typing import Dict, Iterable, List, Optional, Set + +from datetime import datetime +from enum import auto +from enum import Enum +from typing import ( + Dict, + Iterable, + List, + Optional, +) + import requests +from jinja2 import ( + Environment, + FileSystemLoader, +) +from toltec import parse_recipe # type: ignore +from toltec.recipe import ( + Package, # type: ignore + Recipe, # type: ignore +) +from toltec.util import HTTP_DATE_FORMAT # type: ignore +from toltec.version import DependencyKind # type: ignore + from .graphlib import TopologicalSorter -from .recipe import GenericRecipe, Package -from .util import file_sha256, group_by, HTTP_DATE_FORMAT -from .version import DependencyKind -from . import templating +from .util import group_by logger = logging.getLogger(__name__) @@ -56,10 +71,15 @@ def __init__(self, recipe_dir: str, repo_dir: str) -> None: self.repo_dir = repo_dir self.generic_recipes = {} - for entry in os.scandir(self.recipe_dir): - if entry.is_dir(): - self.generic_recipes[entry.name] = GenericRecipe.from_file( - entry.path + for name in os.listdir(self.recipe_dir): + path = pathlib.Path(self.recipe_dir) / name + if ( + name[0] != "." 
+ and os.path.isdir(path) + and os.path.exists(path / "package") + ): + self.generic_recipes[name] = parse_recipe( + os.path.join(self.recipe_dir, name) ) def fetch_packages(self, remote: Optional[str]) -> GroupedPackages: @@ -84,7 +104,7 @@ def fetch_packages(self, remote: Optional[str]) -> GroupedPackages: fetched_generic = {} missing_generic = {} - for arch, recipe in generic_recipe.recipes.items(): + for arch, recipe in generic_recipe.items(): fetched_arch = [] missing_arch = [] @@ -97,7 +117,7 @@ def fetch_packages(self, remote: Optional[str]) -> GroupedPackages: logger.info( "Package %s (%s) is missing", package.pkgid(), - recipe.name, + os.path.basename(recipe.path), ) missing_arch.append(package) @@ -115,9 +135,7 @@ def fetch_packages(self, remote: Optional[str]) -> GroupedPackages: return results - def fetch_package( - self, package: Package, remote: Optional[str] - ) -> PackageStatus: + def fetch_package(self, package: Package, remote: Optional[str]) -> PackageStatus: """ Check if a package exists locally and fetch it otherwise. @@ -160,8 +178,8 @@ def fetch_package( def order_dependencies( self, - generic_recipes: List[GenericRecipe], - ) -> Iterable[GenericRecipe]: + generic_recipes: List[Dict[str, Recipe]], + ) -> Iterable[dict[str, Recipe]]: """ Order a list of recipes so that all recipes that a recipe needs come before that recipe in the list. @@ -177,71 +195,24 @@ def order_dependencies( parent_recipes = {} for generic_recipe in generic_recipes: - for recipe in generic_recipe.recipes.values(): - for package in recipe.packages.values(): - parent_recipes[package.name] = generic_recipe.name + for recipe in generic_recipe.values(): + for package in recipe.packages.values(): # type: ignore + parent_recipes[package.name] = os.path.basename(recipe.path) for generic_recipe in generic_recipes: - deps = [] - - for recipe in generic_recipe.recipes.values(): - for dep in recipe.makedepends: + for recipe in generic_recipe.values(): + deps = [] + for dep in recipe.makedepends: # type: ignore if ( - dep.kind == DependencyKind.Host + dep.kind == DependencyKind.HOST and dep.package in parent_recipes ): deps.append(parent_recipes[dep.package]) - toposort.add(generic_recipe.name, *deps) + toposort.add(os.path.basename(recipe.path), *deps) return [self.generic_recipes[name] for name in toposort.static_order()] - def make_index(self) -> None: - """Generate index files for all the packages in the repo.""" - logger.info("Generating package indices") - - # Gather all available architectures - archs: Set[str] = set() - for generic_recipe in self.generic_recipes.values(): - archs.update(generic_recipe.recipes.keys()) - - # Generate one index per architecture - for arch in archs: - arch_dir = os.path.join(self.repo_dir, arch) - os.makedirs(arch_dir, exist_ok=True) - - index_path = os.path.join(arch_dir, "Packages") - index_gzip_path = os.path.join(arch_dir, "Packages.gz") - - # pylint: disable-next=unspecified-encoding - with open(index_path, "w") as index_file: - with gzip.open(index_gzip_path, "wt") as index_gzip_file: - for generic_recipe in self.generic_recipes.values(): - if not arch in generic_recipe.recipes: - continue - - recipe = generic_recipe.recipes[arch] - - for package in recipe.packages.values(): - filename = package.filename() - local_path = os.path.join(self.repo_dir, filename) - - if not os.path.isfile(local_path): - continue - - control = package.control_fields() - control += textwrap.dedent( - f"""\ - Filename: {os.path.basename(filename)} - SHA256sum: {file_sha256(local_path)} - 
Size: {os.path.getsize(local_path)} - - """ - ) - - index_file.write(control) - index_gzip_file.write(control) - def make_listing(self) -> None: """Generate the static web listing for packages in the repo.""" logger.info("Generating web listing") @@ -249,7 +220,7 @@ def make_listing(self) -> None: packages = [ package for generic_recipe in self.generic_recipes.values() - for recipe in generic_recipe.recipes.values() + for recipe in generic_recipe.values() for package in recipe.packages.values() ] @@ -262,7 +233,12 @@ def make_listing(self) -> None: } listing_path = os.path.join(self.repo_dir, "index.html") - template = templating.env.get_template("listing.html") + template = Environment( + loader=FileSystemLoader( + pathlib.Path(__file__).parent.resolve() / ".." / "templates" + ), + autoescape=True, + ).get_template("listing.html") # pylint: disable-next=unspecified-encoding with open(listing_path, "w") as listing_file: diff --git a/scripts/build/util.py b/scripts/build/util.py new file mode 100644 index 000000000..36c0721fe --- /dev/null +++ b/scripts/build/util.py @@ -0,0 +1,44 @@ +# Copyright (c) 2021 The Toltec Contributors +# SPDX-License-Identifier: MIT +"""Collection of useful functions.""" + +import itertools +from typing import ( + Any, + Callable, + Dict, + List, + Protocol, + Sequence, + TypeVar, +) + + +# See +class SupportsLessThan(Protocol): # pylint:disable=too-few-public-methods + """Types that support the less-than operator.""" + + def __lt__(self, other: Any) -> bool: + ... + + +Key = TypeVar("Key", bound=SupportsLessThan) +Value = TypeVar("Value") + + +def group_by( + in_seq: Sequence[Value], key_fn: Callable[[Value], Key] +) -> Dict[Key, List[Value]]: + """ + Group elements of a list. + + :param in_seq: list of elements to group + :param key_fn: mapping of each element onto a group + :returns: dictionary of groups + """ + return dict( + (key, list(group)) + for key, group in itertools.groupby( + sorted(in_seq, key=key_fn), key=key_fn + ) + ) diff --git a/scripts/install-lib b/scripts/install-lib deleted file mode 100644 index c151e2c59..000000000 --- a/scripts/install-lib +++ /dev/null @@ -1,212 +0,0 @@ -#!/usr/bin/env bash -# Copyright (c) 2020 The Toltec Contributors -# SPDX-License-Identifier: MIT - -# -# install-lib -# -# Common functions used by the install scripts -# - -# Check whether a systemd unit exists and is in an enabled-like state -# ("enabled", "enabled-runtime", "alias", "static", "indirect", "generated" -# or "transient") -# -# Arguments: -# -# $1 - Name of the systemd unit, e.g. "xochitl.service" or "xochitl" -# -# Exit code: -# -# 0 if the unit exists and is enabled, 1 otherwise -is-enabled() { - systemctl --quiet is-enabled "$1" 2> /dev/null -} - -# Check whether a systemd unit exists and is masked -# -# Arguments: -# -# $1 - Name of the systemd unit, e.g. "xochitl.service" or "xochitl" -# -# Exit code: -# -# 0 if the unit exists and is masked, 1 otherwise -is-masked() { - [[ "$(systemctl is-enabled "$1" 2> /dev/null)" == "masked" ]] -} - -# Check whether a systemd unit is in an active state -# ("running") -# -# Arguments: -# -# $1 - Name of the systemd unit, e.g. "xochitl.service" or "xochitl" -# -# Exit code: -# -# 0 if the unit exists and is enabled, 1 otherwise -is-active() { - systemctl --quiet is-active "$1" 2> /dev/null -} - -# Get a list of systemd units with which the given unit conflicts -# -# Arguments: -# -# $1 - Full name of the systemd unit, e.g. 
"xochitl.service" -# -# Output: -# -# List of conflicting units -get-conflicts() { - # Find enabled units that have a conflicting name - for name in $(systemctl cat "$1" | awk -F'=' '/^Alias=/{print $2}'); do - local realname - if realname="$(basename "$(readlink "/etc/systemd/system/$name")")"; then - echo "$realname" - fi - done - - # Find units that are declared as conflicting - # (systemd automatically adds a conflict with "shutdown.target" to all - # service units see systemd.service(5), section "Automatic Dependencies") - systemctl show "$1" | awk -F'=' '/^Conflicts=/{print $2}' \ - | sed 's|\bshutdown.target\b||' -} - -# Print instructions about how to enable a given systemd service and disable -# the services that conflict with it -# -# Arguments: -# -# $1 - Full name of the systemd unit, e.g. "draft.service" -# -# Output: -# -# Commands to run to achieve the desired result -how-to-enable() { - for conflict in $(get-conflicts "$1"); do - if is-enabled "$conflict"; then - echo "$ systemctl disable --now ${conflict/.service/}" - fi - done - - echo "$ systemctl enable --now ${1/.service/}" -} - -# Reload Oxide applications if tarnish is running -# -# Output: -# -# Status message -reload-oxide-apps() { - if ! is-active tarnish.service; then - return - fi - echo -n "Reloading Oxide applications: " - local ret - if type update-desktop-database &> /dev/null; then - update-desktop-database --quiet - ret=$? - else - /opt/bin/rot apps call reload 2> /dev/null - ret=$? - fi - if [ $ret -eq 0 ]; then - echo "Done!" - else - echo "Failed!" - fi -} - -# Create or update a bind mount systemd unit and enable it -# -# Arguments: -# -# $1 - Source directory -# $2 - Mount point -add-bind-mount() { - local unit_name - local unit_path - unit_name="$(systemd-escape --path "$2").mount" - unit_path="/lib/systemd/system/$unit_name" - - if [[ -e $unit_path ]]; then - echo "Bind mount configuration for '$2' already exists, updating" - else - echo "Mounting '$1' over '$2'" - fi - - cat > "$unit_path" << UNIT -[Unit] -Description=Bind mount $1 over $2 -DefaultDependencies=no -Conflicts=umount.target -Before=local-fs.target umount.target - -[Mount] -What=$1 -Where=$2 -Type=none -Options=bind - -[Install] -WantedBy=local-fs.target -UNIT - - systemctl daemon-reload - systemctl enable "$unit_name" - systemctl restart "$unit_name" -} - -# Disable and remove a bind mount systemd unit -# -# Arguments: -# -# $1 - Mount point -remove-bind-mount() { - local unit_name - local unit_path - unit_name="$(systemd-escape --path "$1").mount" - unit_path="/lib/systemd/system/$unit_name" - - if [[ ! -e $unit_path ]]; then - echo "No existing bind mount for '$1'" - return 1 - fi - - echo "Removing mount over '$1'" - systemctl disable "$unit_name" - systemctl stop "$unit_name" - rm "$unit_path" - systemctl daemon-reload -} - -# Check to see if a systemd unit exists -# -# Arguments: -# -# $1 - Full name of the systemd unit, e.g. "draft.service" -unit-exists() { - [ "$(systemctl --quiet list-unit-files "${1}" | /bin/grep -c "${1}" 2> /dev/null)" -eq 1 ] -} - -# Stops and disabled a unit -# -# Arguments: -# -# $1 - Full name of the systemd unit, e.g. "draft.service" -disable-unit() { - if ! 
unit-exists "${1}"; then - return - fi - if is-active "$1"; then - echo "Stopping ${1}" - systemctl stop "${1}" - fi - if is-enabled "${1}"; then - echo "Disabling ${1}" - systemctl disable "${1}" - fi -} diff --git a/scripts/package_build.py b/scripts/package_build.py index ae546a9f6..8f70f891f 100755 --- a/scripts/package_build.py +++ b/scripts/package_build.py @@ -5,13 +5,20 @@ import argparse import logging +import os import sys -from typing import Dict, List, Optional -from toltec import paths -from toltec.builder import Builder -from toltec.repo import Repo -from toltec.recipe import Package -from toltec.util import argparse_add_verbose, LOGGING_FORMAT +from typing import ( + Dict, + List, + Optional, +) +from build import paths +from build.repo import Repo +from toltec import parse_recipe # type: ignore +from toltec.builder import Builder # type: ignore +from toltec.recipe import Package # type: ignore +from toltec.repo import make_index # type: ignore +from toltec.util import argparse_add_verbose, LOGGING_FORMAT # type: ignore parser = argparse.ArgumentParser(description=__doc__) @@ -43,25 +50,24 @@ logging.basicConfig(format=LOGGING_FORMAT, level=args.verbose) repo = Repo(paths.RECIPE_DIR, paths.REPO_DIR) builder = Builder(paths.WORK_DIR, paths.REPO_DIR) - -generic_recipe = repo.generic_recipes[args.recipe_name] arch_packages: Optional[Dict[str, Optional[List[Package]]]] = None -if args.arch_name or args.packages_names: - arch_packages = {} - - for arch in generic_recipe.recipes.keys(): - if args.packages_names: - arch_packages[arch] = [ - generic_recipe.recipes[arch].packages[pkg_name] - for pkg_name in args.packages_names - ] - else: - arch_packages[arch] = None - -builder = Builder(paths.WORK_DIR, paths.REPO_DIR) +with Builder( + os.path.join(paths.WORK_DIR, args.recipe_name), paths.REPO_DIR +) as builder: + recipe_bundle = parse_recipe(f"package/{args.recipe_name}") + build_matrix: Optional[Dict[str, Optional[List[Package]]]] = None + if args.arch_name or args.packages_names: + build_matrix = {} + for arch, recipes in recipe_bundle.items(): + if args.package_name: + build_matrix[arch] = [ + recipes.packages[pkg_name] for pkg_name in args.package_name + ] + else: + build_matrix[arch] = None -if not builder.make(generic_recipe, arch_packages): - sys.exit(1) + if not builder.make(recipe_bundle, build_matrix, False): + sys.exit(1) -repo.make_index() + make_index(paths.REPO_DIR) diff --git a/scripts/repo_build.py b/scripts/repo_build.py index 9a9b5f1b4..592ce204f 100755 --- a/scripts/repo_build.py +++ b/scripts/repo_build.py @@ -6,10 +6,18 @@ import argparse import logging import os -from toltec import paths -from toltec.builder import Builder -from toltec.repo import Repo, PackageStatus -from toltec.util import argparse_add_verbose, LOGGING_FORMAT +from typing import ( + Dict, + List, + Optional, +) +from build import paths +from build.repo import Repo, PackageStatus +from toltec.recipe import Package # type: ignore +from toltec import parse_recipe # type: ignore +from toltec.builder import Builder # type: ignore +from toltec.repo import make_index # type: ignore +from toltec.util import argparse_add_verbose, LOGGING_FORMAT # type: ignore parser = argparse.ArgumentParser(description=__doc__) @@ -47,9 +55,10 @@ logging.basicConfig(format=LOGGING_FORMAT, level=args.verbose) repo = Repo(paths.RECIPE_DIR, paths.REPO_DIR) -builder = Builder(paths.WORK_DIR, paths.REPO_DIR) results = repo.fetch_packages(remote) -repo.make_index() + +os.makedirs(paths.REPO_DIR, exist_ok=True) 
+make_index(paths.REPO_DIR) fetched = results[PackageStatus.Fetched] missing = results[PackageStatus.Missing] @@ -58,9 +67,29 @@ ) for generic_recipe in ordered_missing: - if missing[generic_recipe.name]: - builder.make(generic_recipe, missing[generic_recipe.name]) - repo.make_index() + # Will need to rework toltec_old.repo into something inline and actually easy to work + # with Currently generic_recipe is a Dict[str, Recipe] where the index is the arch. Every + # single entry will have the same path, so we can use that for the name of the generic + # recipe we are actually building. + name = os.path.basename(next(iter(generic_recipe.values())).path) + if missing[name]: + with Builder( + os.path.join(paths.WORK_DIR, name), paths.REPO_DIR + ) as builder: + recipe_bundle = parse_recipe(os.path.join(paths.RECIPE_DIR, name)) + build_matrix: Optional[Dict[str, Optional[List[Package]]]] = None + old_build_matrix = missing[name] + if old_build_matrix: + build_matrix = {} + + for arch, recipes in old_build_matrix.items(): + build_matrix[arch] = [ + recipe_bundle[arch].packages[pkg_name] + for pkg_name in recipe_bundle[arch].packages + ] + builder.make(recipe_bundle, build_matrix, False) + + make_index(paths.REPO_DIR) if args.diff: for name in fetched: @@ -70,6 +99,6 @@ local_path = os.path.join(repo.repo_dir, filename) os.remove(local_path) -repo.make_index() +make_index(paths.REPO_DIR) repo.make_listing() repo.make_compatibility() diff --git a/scripts/toltec/templates/listing.html b/scripts/templates/listing.html similarity index 100% rename from scripts/toltec/templates/listing.html rename to scripts/templates/listing.html diff --git a/scripts/toltec/bash.py b/scripts/toltec/bash.py deleted file mode 100644 index bf74d17bb..000000000 --- a/scripts/toltec/bash.py +++ /dev/null @@ -1,439 +0,0 @@ -# Copyright (c) 2021 The Toltec Contributors -# SPDX-License-Identifier: MIT -"""Bridge Bash with Python.""" - -import os -import shlex -import subprocess -from typing import Dict, Generator, List, Optional, Tuple, Union -from docker.client import DockerClient - -AssociativeArray = Dict[str, str] -IndexedArray = List[Optional[str]] -LogGenerator = Generator[str, None, None] -Any = Union[str, AssociativeArray, IndexedArray] -Variables = Dict[str, Optional[Any]] -Functions = Dict[str, str] - - -class ScriptError(Exception): - """Raised when a launched Bash script exits with a non-zero code.""" - - -# Variables which are defined by default by Bash. Those variables are excluded -# from the result of `get_declarations()`. Subset of the list at: -# -default_variables = { - "BASH", - "BASHOPTS", - "BASHPID", - "BASH_ALIASES", - "BASH_ARGC", - "BASH_ARGV", - "BASH_ARGV0", - "BASH_CMDS", - "BASH_COMMAND", - "BASH_LINENO", - "BASH_SOURCE", - "BASH_SUBSHELL", - "BASH_VERSINFO", - "BASH_VERSION", - "BASH_LOADABLES_PATH", - "COLUMNS", - "COMP_WORDBREAKS", - "DIRSTACK", - "EPOCHREALTIME", - "EPOCHSECONDS", - "EUID", - "FUNCNAME", - "GROUPS", - "HISTCMD", - "HISTFILE", - "HISTFILESIZE", - "HISTSIZE", - "HOSTNAME", - "HOSTTYPE", - "IFS", - "LINENO", - "LINES", - "MACHTYPE", - "MAILCHECK", - "OLDPWD", - "OPTERR", - "OPTIND", - "OSTYPE", - "PATH", - "PIPESTATUS", - "PPID", - "PS1", - "PS2", - "PS4", - "PWD", - "RANDOM", - "SECONDS", - "SHELL", - "SHELLOPTS", - "SHLVL", - "SRANDOM", - "TERM", - "UID", - "_", -} - - -def get_declarations(src: str) -> Tuple[Variables, Functions]: - """ - Extract all variables and functions defined by a Bash script. 
- - If a function or a variable is defined or assigned multiple times - in the script, only the final value is extracted. The script must not - output anything on the standard output stream. - - :param src: source string of the considered Bash string - :returns: a tuple containing the declared variables and functions - """ - src += """ -declare -f -declare -p -""" - env: Dict[str, str] = { - "PATH": os.environ["PATH"], - } - - declarations_subshell = ( - subprocess.run( # pylint:disable=subprocess-run-check - ["/usr/bin/env", "bash"], - input=src.encode(), - capture_output=True, - env=env, - ) - ) - - if declarations_subshell.returncode == 2: - raise ScriptError( - f"Bash syntax error\n\ -{declarations_subshell.stderr.decode()}" - ) - - if declarations_subshell.returncode != 0: - raise ScriptError( - f"Bash error\n\ -{declarations_subshell.stderr.decode()}" - ) - - declarations = declarations_subshell.stdout.decode() - - # Parse `declare` statements and function statements - lexer = shlex.shlex(declarations, posix=True) - lexer.wordchars = lexer.wordchars + "-" - - variables = {} - functions = {} - - while True: - token = lexer.get_token() - - if token == lexer.eof or token is None: - break - - next_token = lexer.get_token() or "" - - if token == "declare" and next_token[0] == "-": - lexer.push_token(next_token) - name, value = _parse_var(lexer) - - if name not in default_variables: - variables[name] = value - else: - if next_token != "(": - raise ScriptError( - f"Unexpected token '{next_token}' on line {lexer.lineno}. Expecting '('." - ) - - _token = lexer.get_token() - if _token != ")": - raise ScriptError( - f"Unexpected token '{_token}' on line {lexer.lineno}. Expecting ')'." - ) - start, end = _parse_func(lexer) - functions[token] = declarations[start:end] - - return variables, functions - - -def put_variables(variables: Variables) -> str: - """ - Generate a Bash script fragment which defines a set of variables. - - :param variables: set of variables to define - :returns: generated Bash fragment - """ - result = "" - - for name, value in variables.items(): - if value is None: - result += f"declare -- {name}\n" - elif isinstance(value, str): - result += f"declare -- {name}={_generate_string(value)}\n" - elif isinstance(value, list): - result += f"declare -a {name}={_generate_indexed(value)}\n" - elif isinstance(value, dict): - result += f"declare -A {name}={_generate_assoc(value)}\n" - else: - raise ValueError( - f"Unsupported type {type(value)} for variable \ -{name}" - ) - - return result - - -def put_functions(functions: Functions) -> str: - """ - Generate a Bash script which defines a set of functions. 
- - :param functions: set of functions to define - :returns: generated Bash fragment - """ - result = "" - - for name, value in functions.items(): - result += f"{name}() {{\n{value}\n}}\n" - - return result - - -def _parse_string(token: str) -> str: - """Remove escape sequences from a Bash string.""" - return token.replace("\\$", "$") - - -def _generate_string(string: str) -> str: - """Generate a Bash string.""" - return shlex.quote(string) - - -def _parse_indexed(lexer: shlex.shlex) -> IndexedArray: - """Parse an indexed Bash array.""" - assert lexer.get_token() == "(" - result: List[Optional[str]] = [] - - while True: - token = lexer.get_token() - assert token != lexer.eof - - if token == ")": - break - - assert token == "[" - index = int(lexer.get_token() or "") - assert lexer.get_token() == "]" - assert lexer.get_token() == "=" - string_token = lexer.get_token() or "" - if string_token == "$": - string_token = lexer.get_token() or "" - value = _parse_string(string_token) - - # Grow the result array so that the index exists - if index >= len(result): - result.extend([None] * (index - len(result) + 1)) - - result[index] = value - - return result - - -def _generate_indexed(array: IndexedArray) -> str: - """Generate an indexed Bash array.""" - return ( - "(" - + " ".join( - f"[{index}]={_generate_string(value)}" - for index, value in enumerate(array) - if value is not None - ) - + ")" - ) - - -def _parse_assoc(lexer: shlex.shlex) -> AssociativeArray: - """Parse an associative Bash array.""" - assert lexer.get_token() == "(" - result: AssociativeArray = {} - - while True: - token = lexer.get_token() - assert token != lexer.eof - - if token == ")": - break - - assert token == "[" - key = lexer.get_token() - assert key is not None - assert lexer.get_token() == "]" - assert lexer.get_token() == "=" - string_token = lexer.get_token() or "" - if string_token == "$": - string_token = lexer.get_token() or "" - value = _parse_string(string_token) - - result[key] = value - - return result - - -def _generate_assoc(array: AssociativeArray) -> str: - """Generate an associative Bash array.""" - return ( - "(" - + " ".join( - f"[{_generate_string(key)}]={_generate_string(value)}" - for key, value in array.items() - ) - + ")" - ) - - -def _parse_var(lexer: shlex.shlex) -> Tuple[str, Optional[Any]]: - """Parse a variable declaration.""" - flags_token = lexer.get_token() - - if flags_token != "--" and flags_token is not None: - var_flags = set(flags_token[1:]) - else: - var_flags = set() - - var_name: str = lexer.get_token() or "" - var_value: Optional[Any] = None - lookahead = lexer.get_token() or "" - - if lookahead == "=": - if "a" in var_flags: - var_value = _parse_indexed(lexer) - elif "A" in var_flags: - var_value = _parse_assoc(lexer) - else: - string_token = lexer.get_token() or "" - if string_token == "$": - string_token = lexer.get_token() or "" - var_value = _parse_string(string_token) - else: - lexer.push_token(lookahead) - - return var_name, var_value - - -def _parse_func(lexer: shlex.shlex) -> Tuple[int, int]: - """Find the starting and end bounds of a function declaration.""" - assert lexer.get_token() == "{" - brace_depth = 1 - - start_byte = lexer.instream.tell() - - while brace_depth > 0: - token = lexer.get_token() - assert token != lexer.eof - - if token == "{": - brace_depth += 1 - elif token == "}": - brace_depth -= 1 - - end_byte = lexer.instream.tell() - 1 - return start_byte, end_byte - - -def run_script(variables: Variables, script: str) -> LogGenerator: - """ - Run a Bash script 
and stream its output. - - :param variables: Bash variables to set before running the script - :param script: Bash script to execute - :returns: generator yielding output lines from the script - :raises ScriptError: if the script exits with a non-zero code - """ - # pylint: disable-next=consider-using-with - process = subprocess.Popen( - ["/usr/bin/env", "bash"], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) - - assert process.stdin is not None - assert process.stdout is not None - process.stdin.write( - "\n".join( - ( - "set -euo pipefail", - put_variables(variables), - "script() {", - script, - "}", - "script", - ) - ).encode() - ) - process.stdin.close() - - while process.poll() is None: - line = process.stdout.readline() - if line: - yield line.decode().strip() - - if process.returncode != 0: - raise ScriptError(f"Script exited with code {process.returncode}") - - -def run_script_in_container( - docker: DockerClient, - image: str, - mounts: List, - variables: Variables, - script: str, -) -> LogGenerator: - """ - Run a Bash script inside a Docker container and stream its output. - - :param docker: Docker client - :param image: image to use for the new container - :param mounts: paths to mount in the container - :param variables: Bash variables to set before running the script - :param script: Bash script to execute - :returns: generator yielding output lines from the script - :raises ScriptError: if the script exits with a non-zero code - """ - container = docker.containers.run( - image, - mounts=mounts, - command=[ - "/usr/bin/env", - "bash", - "-c", - "\n".join( - ( - "set -euo pipefail", - put_variables(variables), - "script() {", - script, - "}", - "script", - ) - ), - ], - security_opt=["label=disable"], - detach=True, - ) - - try: - for line in container.logs(stream=True): - if line: - yield line.decode().strip() - - result = container.wait() - - if result["StatusCode"] != 0: - raise ScriptError(f"Script exited with code {result['StatusCode']}") - finally: - container.remove() diff --git a/scripts/toltec/builder.py b/scripts/toltec/builder.py deleted file mode 100644 index 479352e34..000000000 --- a/scripts/toltec/builder.py +++ /dev/null @@ -1,696 +0,0 @@ -# Copyright (c) 2021 The Toltec Contributors -# SPDX-License-Identifier: MIT -"""Build recipes and create packages.""" - -import shutil -from typing import ( - Any, - Dict, - Deque, - List, - Mapping, - MutableMapping, - NamedTuple, - Optional, - Tuple, -) -from collections import deque -import re -import os -import shlex -import logging -import textwrap -import docker -from elftools.elf.elffile import ELFFile, ELFError -import requests -from . 
import bash, util, ipk, paths -from .recipe import GenericRecipe, Recipe, Package, BuildFlags -from .version import DependencyKind - -logger = logging.getLogger(__name__) - - -class BuildError(Exception): - """Raised when a build step fails.""" - - -class PostprocessingCandidates(NamedTuple): - """List of binaries on which post-processing needs to be done.""" - - strip_arm: List[str] - strip_x86: List[str] - patch_rm2fb: List[str] - - -class BuildContextAdapter(logging.LoggerAdapter): - """Prefix log entries with information about the current build target.""" - - def process( - self, msg: str, kwargs: MutableMapping[str, Any] - ) -> Tuple[str, MutableMapping[str, Any]]: - if self.extra is None: - return msg, kwargs - - prefix = "" - - if "recipe" in self.extra: - prefix += str(self.extra["recipe"]) - - if "arch" in self.extra: - prefix += f" [{self.extra['arch']}]" - - if "package" in self.extra: - prefix += f" ({self.extra['package']})" - - if prefix: - return f"{prefix}: {msg}", kwargs - - return msg, kwargs - - -class Builder: # pylint: disable=too-few-public-methods - """Helper class for building recipes.""" - - # Detect non-local paths - URL_REGEX = re.compile(r"[a-z]+://") - - # Prefix for all Toltec Docker images - IMAGE_PREFIX = "ghcr.io/toltec-dev/" - - # Toltec Docker image used for generic tasks - DEFAULT_IMAGE = "toolchain:v1.3.1" - - def __init__(self, work_dir: str, repo_dir: str) -> None: - """ - Create a builder helper. - - :param work_dir: directory where packages are built - :param repo_dir: directory where built packages are stored - """ - self.work_dir = work_dir - os.makedirs(work_dir, exist_ok=True) - - self.repo_dir = repo_dir - os.makedirs(repo_dir, exist_ok=True) - - self.install_lib = "" - install_lib_path = os.path.join(paths.SCRIPTS_DIR, "install-lib") - - self.context: Dict[str, str] = {} - self.adapter = BuildContextAdapter(logger, self.context) - - # pylint: disable-next=unspecified-encoding - with open(install_lib_path, "r") as file: - for line in file: - if not line.strip().startswith("#"): - self.install_lib += line - - try: - self.docker = docker.from_env() - except docker.errors.DockerException as err: - raise BuildError( - "Unable to connect to the Docker daemon. \ -Please check that the service is running and that you have the necessary \ -permissions." - ) from err - - def make( - self, - generic_recipe: GenericRecipe, - arch_packages: Optional[Mapping[str, Optional[List[Package]]]] = None, - ) -> bool: - """ - Build packages defined by a recipe. 
- - :param generic_recipe: recipe to make - :param arch_packages: set of packages to build for each - architecture (default: all supported architectures - and all declared packages) - :returns: true if all the requested packages were built correctly - """ - self.context["recipe"] = generic_recipe.name - build_dir = os.path.join(self.work_dir, generic_recipe.name) - - if not util.check_directory( - build_dir, - f"The build directory '{os.path.relpath(build_dir)}' for recipe \ -'{generic_recipe.name}' already exists.\nWould you like to [c]ancel, [r]emove \ -that directory, or [k]eep it (not recommended)?", - ): - return False - - for name in ( - list(arch_packages.keys()) - if arch_packages is not None - else list(generic_recipe.recipes.keys()) - ): - if not self._make_arch( - generic_recipe.recipes[name], - os.path.join(build_dir, name), - arch_packages[name] if arch_packages is not None else None, - ): - return False - - return True - - def _make_arch( - self, - recipe: Recipe, - build_dir: str, - packages: Optional[List[Package]] = None, - ) -> bool: - self.context["arch"] = recipe.arch - - src_dir = os.path.join(build_dir, "src") - os.makedirs(src_dir, exist_ok=True) - self._fetch_sources(recipe, src_dir) - self._prepare(recipe, src_dir) - - base_pkg_dir = os.path.join(build_dir, "pkg") - os.makedirs(base_pkg_dir, exist_ok=True) - - self._build(recipe, src_dir) - self._postprocessing(recipe, src_dir) - - for package in ( - packages if packages is not None else recipe.packages.values() - ): - self.context["package"] = package.name - pkg_dir = os.path.join(base_pkg_dir, package.name) - os.makedirs(pkg_dir, exist_ok=True) - - self._package(package, src_dir, pkg_dir) - self._archive(package, pkg_dir) - del self.context["package"] - - del self.context["arch"] - return True - - def _fetch_sources( - self, - recipe: Recipe, - src_dir: str, - ) -> None: - """Fetch and extract all source files required to build a recipe.""" - self.adapter.info("Fetching source files") - - for source in recipe.sources: - filename = os.path.basename(source.url) - local_path = os.path.join(src_dir, filename) - - if self.URL_REGEX.match(source.url) is None: - # Get source file from the recipe’s directory - shutil.copy2( - os.path.join(recipe.parent.path, source.url), local_path - ) - else: - # Fetch source file from the network - req = requests.get(source.url, timeout=5) - - if req.status_code != 200: - raise BuildError( - f"Unexpected status code while fetching \ -source file '{source.url}', got {req.status_code}" - ) - - with open(local_path, "wb") as local: - for chunk in req.iter_content(chunk_size=1024): - local.write(chunk) - - # Verify checksum - file_sha = util.file_sha256(local_path) - if source.checksum not in ("SKIP", file_sha): - raise BuildError( - f"Invalid checksum for source file {source.url}:\n" - f" expected {source.checksum}\n" - f" actual {file_sha}" - ) - - # Automatically extract source archives - if not source.noextract: - if not util.auto_extract(local_path, src_dir): - self.adapter.debug( - "Not extracting %s (unsupported archive type)", - local_path, - ) - - def _prepare(self, recipe: Recipe, src_dir: str) -> None: - """Prepare source files before building.""" - script = recipe.functions["prepare"] - - if not script: - self.adapter.debug("Skipping prepare (nothing to do)") - return - - self.adapter.info("Preparing source files") - logs = bash.run_script( - script=script, - variables={ - **recipe.variables, - **recipe.custom_variables, - "srcdir": src_dir, - }, - ) - - self._print_logs(logs, 
"prepare()") - - def _build(self, recipe: Recipe, src_dir: str) -> None: - """Build artifacts for a recipe.""" - script = recipe.functions["build"] - - if not script: - self.adapter.debug("Skipping build (nothing to do)") - return - - self.adapter.info("Building artifacts") - - # Set fixed atime and mtime for all the source files - epoch = int(recipe.timestamp.timestamp()) - - for filename in util.list_tree(src_dir): - os.utime(filename, (epoch, epoch)) - - mount_src = "/src" - repo_src = "/repo" - uid = os.getuid() - pre_script: List[str] = [] - - # Install required dependencies - build_deps = [] - host_deps = [] - - for dep in recipe.makedepends: - if dep.kind == DependencyKind.Build: - build_deps.append(dep.package) - elif dep.kind == DependencyKind.Host: - host_deps.append(dep.package) - - if build_deps: - pre_script.extend( - ( - "export DEBIAN_FRONTEND=noninteractive", - "apt-get update -qq", - "apt-get install -qq --no-install-recommends" - ' -o Dpkg::Options::="--force-confdef"' - ' -o Dpkg::Options::="--force-confold"' - " -- " + " ".join(build_deps), - ) - ) - - if host_deps: - opkg_conf_path = "$SYSROOT/etc/opkg/opkg.conf" - pre_script.extend( - ( - 'echo -n "dest root /', - "arch all 100", - "arch armv7-3.2 160", - "src/gz entware https://bin.entware.net/armv7sf-k3.2", - "arch rmall 200", - "src/gz toltec-rmall file:///repo/rmall", - f'" > "{opkg_conf_path}"', - ) - ) - - if recipe.arch != "rmall": - pre_script.extend( - ( - f'echo -n "arch {recipe.arch} 250', - f"src/gz toltec-{recipe.arch} file:///repo/{recipe.arch}", - f'" >> "{opkg_conf_path}"', - ) - ) - - pre_script.extend( - ( - "opkg update --verbosity=0", - "opkg install --verbosity=0 --no-install-recommends" - " -- " + " ".join(host_deps), - ) - ) - - logs = bash.run_script_in_container( - self.docker, - image=self.IMAGE_PREFIX + recipe.image, - mounts=[ - docker.types.Mount( - type="bind", - source=os.path.abspath(src_dir), - target=mount_src, - ), - docker.types.Mount( - type="bind", - source=os.path.abspath(self.repo_dir), - target=repo_src, - ), - ], - variables={ - **recipe.variables, - **recipe.custom_variables, - "srcdir": mount_src, - }, - script="\n".join( - ( - *pre_script, - f'cd "{mount_src}"', - script, - f'chown -R {uid}:{uid} "{mount_src}"', - ) - ), - ) - - self._print_logs(logs, "build()") - - def _postprocessing(self, recipe: Recipe, src_dir: str) -> None: - """Perform binary post-processing tasks such as stripping.""" - if ( - recipe.flags & BuildFlags.NOSTRIP - and not recipe.flags & BuildFlags.PATCH_RM2FB - ): - self.adapter.debug("Skipping post-processing (nothing to do)") - return - - self.adapter.info("Post-processing binaries") - - # Search for candidates - cand = self._postprocessing_candidates(src_dir) - - # Save original mtimes to restore them afterwards - # This will prevent any Makefile rules to be triggered again - # in packaging scripts that use `make install` - original_mtime = {} - - for file_path in (file for file_list in cand for file in file_list): - original_mtime[file_path] = os.stat(file_path).st_mtime_ns - - script = [] - mount_src = "/src" - - # pylint: disable-next=unnecessary-lambda-assignment - docker_file_path = lambda file_path: shlex.quote( - os.path.join(mount_src, os.path.relpath(file_path, src_dir)) - ) - - # Strip debugging symbols and unneeded sections - if not recipe.flags & BuildFlags.NOSTRIP: - if cand.strip_x86: - script.append( - "strip --strip-all -- " - + " ".join( - docker_file_path(file_path) - for file_path in cand.strip_x86 - ) - ) - - 
self.adapter.debug("x86 binaries to be stripped:") - - for file_path in cand.strip_x86: - self.adapter.debug( - " - %s", - os.path.relpath(file_path, src_dir), - ) - - if cand.strip_arm: - script.append( - '"${CROSS_COMPILE}strip" --strip-all -- ' - + " ".join( - docker_file_path(file_path) - for file_path in cand.strip_arm - ) - ) - - self.adapter.debug("ARM binaries to be stripped:") - - for file_path in cand.strip_arm: - self.adapter.debug( - " - %s", - os.path.relpath(file_path, src_dir), - ) - - # Add a dynamic dependency on the rm2fb client shim - if recipe.flags & BuildFlags.PATCH_RM2FB and cand.patch_rm2fb: - script = ( - [ - "export DEBIAN_FRONTEND=noninteractive", - "apt-get update -qq", - "apt-get install -qq --no-install-recommends patchelf", - ] - + script - + [ - "patchelf --add-needed librm2fb_client.so.1 " - + " ".join( - docker_file_path(file_path) - for file_path in cand.patch_rm2fb - ) - ] - ) - - self.adapter.debug("Binaries to be patched with rm2fb client:") - - for file_path in cand.patch_rm2fb: - self.adapter.debug( - " - %s", - os.path.relpath(file_path, src_dir), - ) - - if script: - logs = bash.run_script_in_container( - self.docker, - image=self.IMAGE_PREFIX + self.DEFAULT_IMAGE, - mounts=[ - docker.types.Mount( - type="bind", - source=os.path.abspath(src_dir), - target=mount_src, - ) - ], - variables={}, - script="\n".join(script), - ) - - self._print_logs(logs) - - # Restore original mtimes - for file_path, mtime in original_mtime.items(): - os.utime(file_path, ns=(mtime, mtime)) - - @staticmethod - def _postprocessing_candidates(src_dir: str) -> PostprocessingCandidates: - """Search for binaries that need to be post-processed.""" - strip_arm = [] - strip_x86 = [] - patch_rm2fb = [] - - for directory, _, files in os.walk(src_dir): - for file_name in files: - file_path = os.path.join(directory, file_name) - - try: - with open(file_path, "rb") as file: - info = ELFFile(file) - symtab = info.get_section_by_name(".symtab") - - if info.get_machine_arch() == "ARM": - if symtab: - strip_arm.append(file_path) - - dynamic = info.get_section_by_name(".dynamic") - rodata = info.get_section_by_name(".rodata") - - if ( - dynamic - and rodata - and rodata.data().find(b"/dev/fb0") != -1 - ): - patch_rm2fb.append(file_path) - elif ( - info.get_machine_arch() in ("x86", "x64") and symtab - ): - strip_x86.append(file_path) - except ELFError: - # Ignore non-ELF files - pass - except IsADirectoryError: - # Ignore directories - pass - - return PostprocessingCandidates( - strip_arm=strip_arm, - strip_x86=strip_x86, - patch_rm2fb=patch_rm2fb, - ) - - def _package(self, package: Package, src_dir: str, pkg_dir: str) -> None: - """Make a package from a recipe’s build artifacts.""" - self.adapter.info("Packaging build artifacts") - logs = bash.run_script( - script=package.functions["package"], - variables={ - **package.variables, - **package.custom_variables, - "srcdir": src_dir, - "pkgdir": pkg_dir, - }, - ) - - self._print_logs(logs, "package()") - self.adapter.debug("Resulting tree:") - - for filename in util.list_tree(pkg_dir): - self.adapter.debug( - " - %s", - os.path.normpath( - os.path.join("/", os.path.relpath(filename, pkg_dir)) - ), - ) - - def _archive(self, package: Package, pkg_dir: str) -> None: - """Create an archive for a package.""" - self.adapter.info("Creating archive") - ar_path = os.path.join(paths.REPO_DIR, package.filename()) - ar_dir = os.path.dirname(ar_path) - os.makedirs(ar_dir, exist_ok=True) - - # Inject Oxide-specific hook for reloading apps - if 
os.path.exists( - os.path.join(pkg_dir, "opt/usr/share/applications") - ) or os.path.exists(os.path.join(pkg_dir, "opt/etc/draft")): - oxide_hook = "\nreload-oxide-apps\n" - package.functions["configure"] += oxide_hook - package.functions["postupgrade"] += oxide_hook - package.functions["postremove"] += oxide_hook - - # Convert install scripts to Debian format - scripts = {} - script_header = "\n".join( - ( - textwrap.dedent( - """\ - #!/usr/bin/env bash - set -euo pipefail - """ - ), - bash.put_variables( - { - **package.variables, - **package.custom_variables, - } - ), - bash.put_functions(package.custom_functions), - self.install_lib, - ) - ) - - for name, script, action in ( - ("preinstall", "preinst", "install"), - ("configure", "postinst", "configure"), - ): - if package.functions[name]: - scripts[script] = "\n".join( - ( - script_header, - textwrap.dedent( - f"""\ - if [[ $1 = {action} ]]; then - script() {{ - """ - ), - package.functions[name], - textwrap.dedent( - """\ - } - script - fi - """ - ), - ) - ) - - for step in ("pre", "post"): - if ( - package.functions[step + "upgrade"] - or package.functions[step + "remove"] - ): - script = script_header - - for action in ("upgrade", "remove"): - if package.functions[step + action]: - script += "\n".join( - ( - textwrap.dedent( - f"""\ - if [[ $1 = {action} ]]; then - script() {{ - """ - ), - package.functions[step + action], - textwrap.dedent( - """\ - } - script - fi - """ - ), - ) - ) - - scripts[step + "rm"] = script - - self.adapter.debug("Install scripts:") - - if scripts: - for script in sorted(scripts): - self.adapter.debug(" - %s", script) - else: - self.adapter.debug("(none)") - - epoch = int(package.parent.timestamp.timestamp()) - - with open(ar_path, "wb") as file: - ipk.make_ipk( - file, - epoch=epoch, - pkg_dir=pkg_dir, - metadata=package.control_fields(), - scripts=scripts, - ) - - # Set fixed atime and mtime for the resulting archive - os.utime(ar_path, (epoch, epoch)) - - def _print_logs( - self, - logs: bash.LogGenerator, - function_name: Optional[str] = None, - max_lines_on_fail: int = 50, - ) -> None: - """ - Print logs to the debug output or buffer and print the last n log lines - if a ScriptError is caught. - - :param logs: generator of log lines - :param function_name: calling function name - :param max_lines_on_fail: number of context lines to print - in non-debug mode - """ - log_buffer: Deque[str] = deque() - try: - for line in logs: - if self.adapter.getEffectiveLevel() <= logging.DEBUG: - self.adapter.debug(line) - else: - if len(log_buffer) == max_lines_on_fail: - log_buffer.popleft() - log_buffer.append(line) - except bash.ScriptError as err: - if len(log_buffer) > 0: - self.adapter.info( - f"Only showing up to {max_lines_on_fail} lines of context. " - + "Use --verbose for the full output." - ) - for line in log_buffer: - self.adapter.error(line) - - if function_name: - self.adapter.error(f"{function_name} failed") - - raise err diff --git a/scripts/toltec/ipk.py b/scripts/toltec/ipk.py deleted file mode 100644 index 4a064ba55..000000000 --- a/scripts/toltec/ipk.py +++ /dev/null @@ -1,153 +0,0 @@ -# Copyright (c) 2021 The Toltec Contributors -# SPDX-License-Identifier: MIT -"""Make ipk packages.""" - -from gzip import GzipFile -from typing import Dict, IO, Optional -from io import BytesIO -import tarfile -import operator -import os - - -def _targz_open(fileobj: IO[bytes], epoch: int) -> tarfile.TarFile: - """ - Open a gzip compressed tar archive for writing. 
- - Modified from :func:`tarfile.TarFile.gzopen` to support - setting the `mtime` attribute on `GzipFile`. - """ - gzipobj = GzipFile( - filename="", mode="wb", compresslevel=9, fileobj=fileobj, mtime=epoch - ) - - try: - # pylint: disable-next=consider-using-with - archive = tarfile.TarFile( - mode="w", - fileobj=gzipobj, # type:ignore - format=tarfile.GNU_FORMAT, - ) - except: - gzipobj.close() - raise - - archive._extfileobj = False # type:ignore # pylint:disable=protected-access - return archive - - -def _clean_info( - root: Optional[str], epoch: int, info: tarfile.TarInfo -) -> tarfile.TarInfo: - """ - Remove variable data from an archive entry. - - :param root: absolute path to the root directory from which the - entry was added, or None to disable turning the name into a - relative path - :param epoch: fixed modification time to set - :param info: tarinfo object to set - :returns: changed tarinfo - """ - if root is not None: - info.name = os.path.relpath("/" + info.name, root) - - if not info.name.startswith("."): - info.name = "./" + info.name - - info.uid = 0 - info.gid = 0 - info.uname = "" - info.gname = "" - info.mtime = epoch - - return info - - -def _add_file( - archive: tarfile.TarFile, name: str, mode: int, epoch: int, data: bytes -) -> None: - """ - Add an in-memory file into a tar archive. - - :param archive: archive to append to - :param name: name of the file to add - :param mode: permissions of the file - :param epoch: fixed modification time to set - :param data: file contents - """ - info = tarfile.TarInfo("./" + name) - info.size = len(data) - info.mode = mode - archive.addfile(_clean_info(None, epoch, info), BytesIO(data)) - - -def make_control( - file: IO[bytes], epoch: int, metadata: str, scripts: Dict[str, str] -) -> None: - """ - Create the control sub-archive. - - See - and . - - :param file: file to which the sub-archive will be written - :param epoch: fixed modification time to set - :param metadata: package metadata (main control file) - :param scripts: optional maintainer scripts - """ - with _targz_open(file, epoch) as archive: - root_info = tarfile.TarInfo("./") - root_info.type = tarfile.DIRTYPE - archive.addfile(_clean_info(None, epoch, root_info)) - - _add_file(archive, "control", 0o644, epoch, metadata.encode()) - - for name, script in sorted(scripts.items(), key=operator.itemgetter(0)): - _add_file(archive, name, 0o755, epoch, script.encode()) - - -def make_data(file: IO[bytes], epoch: int, pkg_dir: str) -> None: - """ - Create the data sub-archive. - - :param file: file to which the sub-archive will be written - :param epoch: fixed modification time to set - :param pkg_dir: directory in which the package tree exists - """ - with _targz_open(file, epoch) as archive: - archive.add( - pkg_dir, filter=lambda info: _clean_info(pkg_dir, epoch, info) - ) - - -def make_ipk( - file: IO[bytes], - epoch: int, - pkg_dir: str, - metadata: str, - scripts: Dict[str, str], -) -> None: - """ - Create an ipk package. 
- - :param file: file to which the package will be written - :param epoch: fixed modification time to set - :param pkg_dir: directory in which the package tree exists - :param metadata: package metadata (main control file) - :param scripts: optional maintainer scripts - """ - with BytesIO() as control, BytesIO() as data, _targz_open( - file, epoch - ) as archive: - root_info = tarfile.TarInfo("./") - root_info.type = tarfile.DIRTYPE - archive.addfile(_clean_info(None, epoch, root_info)) - - make_control(control, epoch, metadata, scripts) - _add_file(archive, "control.tar.gz", 0o644, epoch, control.getvalue()) - - make_data(data, epoch, pkg_dir) - _add_file(archive, "data.tar.gz", 0o644, epoch, data.getvalue()) - - _add_file(archive, "debian-binary", 0o644, epoch, b"2.0\n") diff --git a/scripts/toltec/recipe.py b/scripts/toltec/recipe.py deleted file mode 100644 index de3844490..000000000 --- a/scripts/toltec/recipe.py +++ /dev/null @@ -1,539 +0,0 @@ -# Copyright (c) 2021 The Toltec Contributors -# SPDX-License-Identifier: MIT -""" -Parse recipes. - -A package is a final user-installable software archive. A recipe is a Bash file -which contains the instructions necessary to build one or more related -packages (in the latter case, it is called a split package). -""" - -from dataclasses import dataclass -from datetime import datetime -from enum import auto, Flag -from itertools import product -from typing import Dict, NamedTuple, Optional, Set -import os -import textwrap -import dateutil.parser -from .version import Version, Dependency, DependencyKind -from . import bash - - -class RecipeError(Exception): - """Raised when a recipe contains an error.""" - - -@dataclass -class GenericRecipe: # pylint:disable=too-many-instance-attributes - """Load recipes.""" - - name: str - path: str - recipes: Dict[str, "Recipe"] - - @staticmethod - def from_file(path: str) -> "GenericRecipe": - """ - Load a recipe from its directory. - - :param path: path to the directory containing the recipe definition - :returns: loaded recipe - """ - name = os.path.basename(path) - # pylint: disable-next=unspecified-encoding - with open(os.path.join(path, "package"), "r") as recipe: - return GenericRecipe(name, path, recipe.read()) - - def __init__(self, name: str, path: str, definition: str): - """ - Load a recipe from a Bash source. - - :param name: name of the recipe - :param path: path to the directory containing the recipe definition - :param definition: source string of the recipe - :raises RecipeError: if the recipe contains an error - """ - self.name = name - self.path = path - variables, functions = bash.get_declarations(definition) - - archs = _pop_field_indexed(variables, "archs", ["rmall"]) - self.recipes = {} - - for arch in archs: - assert arch is not None - self._load_arch(arch, archs, variables.copy(), functions.copy()) - - def _load_arch( - self, - arch: str, - archs: bash.IndexedArray, - variables: bash.Variables, - functions: bash.Functions, - ) -> None: - """ - Instantiate a recipe for a given architecture. 
- - :param arch: target architecture - :param archs: available architectures - :param variables: Bash variables defined in the recipe - :param functions: Bash functions defined in the recipe - :raises RecipeError: if the recipe contains an error - """ - variables["arch"] = arch - - # Merge variables suffixed with the selected architecture - # into normal variables, drop other arch-specific variables - for name, value in list(variables.items()): - last_underscore = name.rfind("_") - - if last_underscore == -1: - continue - - var_arch = name[last_underscore + 1 :] - - if var_arch not in archs: - continue - - del variables[name] - - if var_arch != arch: - continue - - name = name[:last_underscore] - - if name not in variables: - variables[name] = value - else: - base_value = variables[name] - - if isinstance(base_value, str): - if not isinstance(value, str): - raise RecipeError( - f"Recipe '{self.name}' declares the \ -'{name}' field several times with different types" - ) - - variables[name] = value - - if isinstance(base_value, list): - if not isinstance(value, list): - raise RecipeError( - f"Recipe '{self.name}' declares the \ -'{name}' field several times with different types" - ) - - variables[name] = base_value + value - - self.recipes[arch] = Recipe( - self, f"{self.name}-{arch}", variables, functions - ) - - -class Source(NamedTuple): - """Source item needed to build a recipe.""" - - url: str - checksum: str - noextract: bool - - -class BuildFlags(Flag): - """Flags that guard special behaviors of the build system.""" - - NONE = auto() - - # Disable the automatic stripping of generated binaries - NOSTRIP = auto() - - # Patch all generated binaries with the rm2fb client shim - PATCH_RM2FB = auto() - - -@dataclass -class Recipe: # pylint:disable=too-many-instance-attributes - """Recipe specialized for a target architecture.""" - - parent: GenericRecipe - name: str - - variables: bash.Variables - custom_variables: bash.Variables - timestamp: datetime - sources: Set[Source] - makedepends: Set[Dependency] - maintainer: str - image: str - arch: str - flags: BuildFlags - - functions: bash.Functions - custom_functions: bash.Functions - - packages: Dict[str, "Package"] - - def __init__( - self, - parent: GenericRecipe, - name: str, - variables: bash.Variables, - functions: bash.Functions, - ): - """ - Load an architecture-specialized recipe. 
- - :param parent: recipe from which this is specialized - :param name: name of the recipe - :param variables: specialized Bash variables for the recipe - :param functions: specialized Bash functions for the recipe - :raises RecipeError: if the recipe contains an error - """ - self.parent = parent - self.name = name - - self.variables = {} - self.functions = {} - - self._load_fields(variables) - self._load_functions(functions) - self._load_packages(variables, functions) - - self.custom_variables = variables - self.custom_functions = functions - - def _load_fields(self, variables: bash.Variables) -> None: - """Parse and check standard fields.""" - flags = _pop_field_indexed(variables, "flags", []) - self.variables["flags"] = flags - self.flags = BuildFlags.NONE - - for flag in flags: - assert flag is not None - self.flags |= getattr(BuildFlags, flag.upper()) - - timestamp_str = _pop_field_string(variables, "timestamp") - self.variables["timestamp"] = timestamp_str - - try: - self.timestamp = dateutil.parser.isoparse(timestamp_str) - except ValueError as err: - raise RecipeError( - "Field 'timestamp' does not contain a valid ISO-8601 date" - ) from err - - sources = _pop_field_indexed(variables, "source", []) - self.variables["source"] = sources - - sha256sums = _pop_field_indexed(variables, "sha256sums", []) - self.variables["sha256sums"] = sha256sums - - noextract = _pop_field_indexed(variables, "noextract", []) - self.variables["noextract"] = noextract - - if len(sources) != len(sha256sums): - raise RecipeError( - f"Expected the same number of sources and checksums, got \ -{len(sources)} source(s) and {len(sha256sums)} checksum(s)" - ) - - self.sources = set() - - for source, checksum in zip(sources, sha256sums): - self.sources.add( - Source( - url=source or "", - checksum=checksum or "SKIP", - noextract=os.path.basename(source or "") in noextract, - ) - ) - - makedepends_raw = _pop_field_indexed(variables, "makedepends", []) - self.variables["makedepends"] = makedepends_raw - self.makedepends = { - Dependency.parse(dep or "") for dep in makedepends_raw - } - - self.maintainer = _pop_field_string(variables, "maintainer") - self.variables["maintainer"] = self.maintainer - - self.image = _pop_field_string(variables, "image", "") - self.variables["image"] = self.image - - self.arch = _pop_field_string(variables, "arch") - self.variables["arch"] = self.arch - - def _load_functions(self, functions: bash.Functions) -> None: - """Parse and check standard functions.""" - if self.image and "build" not in functions: - raise RecipeError( - "Missing build() function for a recipe which declares a \ -build image" - ) - - if not self.image and "build" in functions: - raise RecipeError( - "Missing image declaration for a recipe which has a \ -build() step" - ) - - self.functions["prepare"] = functions.pop("prepare", "") - self.functions["build"] = functions.pop("build", "") - - def _load_packages( - self, variables: bash.Variables, functions: bash.Functions - ) -> None: - """Load packages defined by this recipe.""" - pkgnames = _pop_field_indexed(variables, "pkgnames") - self.variables["pkgnames"] = pkgnames - self.packages = {} - - if len(pkgnames) == 1: - # Single-package recipe: use global declarations - pkg_name = pkgnames[0] - assert pkg_name is not None - variables["pkgname"] = pkg_name - self.packages[pkg_name] = Package(self, variables, functions) - else: - # Split-package recipe: load package-local declarations - pkg_decls = {} - - for sub_pkg_name in pkgnames: - assert sub_pkg_name is not None 
-
-                if sub_pkg_name not in functions:
-                    raise RecipeError(
-                        "Missing required function {sub_pkg_name}() for \
-corresponding package"
-                    )
-
-                pkg_def = functions.pop(sub_pkg_name)
-                context = bash.put_variables(
-                    {
-                        **self.variables,
-                        **variables,
-                        "pkgname": sub_pkg_name,
-                    }
-                )
-                pkg_decls[sub_pkg_name] = bash.get_declarations(
-                    context + pkg_def
-                )
-
-                for var_name in self.variables:
-                    del pkg_decls[sub_pkg_name][0][var_name]
-
-            for sub_pkg_name, (pkg_vars, pkg_funcs) in pkg_decls.items():
-                self.packages[sub_pkg_name] = Package(self, pkg_vars, pkg_funcs)
-
-
-@dataclass
-class Package:  # pylint:disable=too-many-instance-attributes
-    """Load packages."""
-
-    parent: Recipe
-    name: str
-
-    variables: bash.Variables
-    custom_variables: bash.Variables
-
-    version: Version
-    desc: str
-    url: str
-    section: str
-    license: str
-    installdepends: Set[Dependency]
-    conflicts: Set[Dependency]
-    replaces: Set[Dependency]
-    provides: Set[Dependency]
-
-    functions: bash.Functions
-    custom_functions: bash.Functions
-
-    def __init__(
-        self,
-        parent: Recipe,
-        variables: bash.Variables,
-        functions: bash.Functions,
-    ):
-        """
-        Load a package.
-
-        :param parent: specialized recipe which declares this package
-        :param variables: Bash variables declared in the package
-        :param functions: Bash functions declared in the package
-        :raises RecipeError: if the package contains an error
-        """
-        self.parent = parent
-        self.variables = parent.variables.copy()
-        self.functions = {}
-
-        self._load_fields(variables)
-        self._load_functions(functions)
-        self._load_custom(variables, functions)
-
-    def _load_fields(self, variables: bash.Variables) -> None:
-        """Parse and check standard fields."""
-        self.name = _pop_field_string(variables, "pkgname")
-        self.variables["pkgname"] = self.name
-
-        pkgver_str = _pop_field_string(variables, "pkgver")
-        self.variables["pkgver"] = pkgver_str
-        self.version = Version.parse(pkgver_str)
-
-        self.desc = _pop_field_string(variables, "pkgdesc")
-        self.variables["pkgdesc"] = self.desc
-
-        self.url = _pop_field_string(variables, "url")
-        self.variables["url"] = self.url
-
-        self.section = _pop_field_string(variables, "section")
-        self.variables["section"] = self.section
-
-        self.license = _pop_field_string(variables, "license")
-        self.variables["license"] = self.license
-
-        for field in ("installdepends", "conflicts", "replaces", "provides"):
-            field_raw = _pop_field_indexed(variables, field, [])
-            self.variables[field] = field_raw
-            setattr(self, field, set())
-
-            for dep_raw in field_raw:
-                assert dep_raw is not None
-                dep = Dependency.parse(dep_raw)
-
-                if dep.kind != DependencyKind.Host:
-                    raise RecipeError(
-                        f"Only host packages are supported in the \
-'{field}' field"
-                    )
-
-                getattr(self, field).add(dep)
-
-        if self.parent.flags & BuildFlags.PATCH_RM2FB:
-            self.installdepends.add(
-                Dependency(
-                    DependencyKind.Host,
-                    "rm2fb-client",
-                )
-            )
-
-    def _load_functions(self, functions: bash.Functions) -> None:
-        """Parse and check standard functions."""
-        if "package" not in functions:
-            raise RecipeError(
-                f"Missing required function package() for package {self.name}"
-            )
-
-        self.functions["package"] = functions.pop("package")
-
-        for action in ("preinstall", "configure"):
-            self.functions[action] = functions.pop(action, "")
-
-        for rel, step in product(("pre", "post"), ("remove", "upgrade")):
-            self.functions[rel + step] = functions.pop(rel + step, "")
-
-    def _load_custom(
-        self, variables: bash.Variables, functions: bash.Functions
-    ) -> None:
-        """Parse and check custom fields and functions."""
-        for var_name in variables.keys():
-            if not var_name.startswith("_"):
-                raise RecipeError(
-                    f"Unknown field '{var_name}' in the definition of \
-package {self.name} ({self.parent.name}) — make sure to prefix the names of \
-custom fields with '_'"
-                )
-
-        for func_name in functions.keys():
-            if not func_name.startswith("_"):
-                raise RecipeError(
-                    f"Unknown function '{func_name}' in the definition of \
-package {self.name} ({self.parent.name}) — make sure to prefix the names of \
-custom functions with '_'"
-                )
-
-        self.custom_variables = variables
-        self.custom_functions = functions
-
-    def pkgid(self) -> str:
-        """Get the unique identifier of this package."""
-        return "_".join(
-            (self.name, str(self.version).replace(":", "_"), self.parent.arch)
-        )
-
-    def filename(self) -> str:
-        """Get the name of the archive corresponding to this package."""
-        return os.path.join(self.parent.arch, self.pkgid() + ".ipk")
-
-    def control_fields(self) -> str:
-        """Get the control fields for this package."""
-        control = textwrap.dedent(
-            f"""\
-            Package: {self.name}
-            Description: {self.desc}
-            Homepage: {self.url}
-            Version: {self.version}
-            Section: {self.section}
-            Maintainer: {self.parent.maintainer}
-            License: {self.license}
-            Architecture: {self.parent.arch}
-            """
-        )
-
-        for debian_name, field in (
-            ("Depends", self.installdepends),
-            ("Conflicts", self.conflicts),
-            ("Replaces", self.replaces),
-            ("Provides", self.provides),
-        ):
-            if field:
-                control += (
-                    debian_name
-                    + ": "
-                    + ", ".join(dep.to_debian() for dep in field if dep)
-                    + "\n"
-                )
-
-        return control
-
-
-# Helpers to check that fields of the right type are defined in a recipe
-# and to otherwise return a default value
-def _pop_field_string(
-    variables: bash.Variables, name: str, default: Optional[str] = None
-) -> str:
-    if name not in variables:
-        if default is None:
-            raise RecipeError(f"Missing required field {name}")
-        return default
-
-    value = variables.pop(name)
-
-    if not isinstance(value, str):
-        raise RecipeError(
-            f"Field {name} must be a string, \
-got {type(variables[name]).__name__}"
-        )
-
-    return value
-
-
-def _pop_field_indexed(
-    variables: bash.Variables,
-    name: str,
-    default: Optional[bash.IndexedArray] = None,
-) -> bash.IndexedArray:
-    if name not in variables:
-        if default is None:
-            raise RecipeError(f"Missing required field '{name}'")
-        return default
-
-    value = variables.pop(name)
-
-    if not isinstance(value, list):
-        raise RecipeError(
-            f"Field '{name}' must be an indexed array, \
-got {type(variables[name]).__name__}"
-        )
-
-    return value
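
Note: recipe.py validates recipe metadata with a pop-and-check pass. The
_pop_field_string and _pop_field_indexed helpers remove each standard field
from the variable map as it is checked, so _load_custom() can reject any
leftover name that is not prefixed with '_'. A minimal standalone sketch of
that pattern follows; Variables and RecipeError here are simplified
stand-ins for the real bash.Variables and RecipeError types, not the code
above:

    from typing import Dict, List, Optional, Union

    # Stand-in for bash.Variables: recipes yield strings and indexed arrays.
    Variables = Dict[str, Union[str, List[Optional[str]]]]

    class RecipeError(Exception):
        """Stand-in for the recipe error type."""

    def pop_field_string(
        variables: Variables, name: str, default: Optional[str] = None
    ) -> str:
        # Same contract as _pop_field_string: a missing field without a
        # default is an error, and non-string values are rejected.
        if name not in variables:
            if default is None:
                raise RecipeError(f"Missing required field {name}")
            return default

        value = variables.pop(name)

        if not isinstance(value, str):
            raise RecipeError(f"Field {name} must be a string")

        return value

    fields: Variables = {"maintainer": "Jane Doe <jane@example.org>"}
    print(pop_field_string(fields, "maintainer"))  # popped and validated
    print(pop_field_string(fields, "image", ""))   # falls back to default
    assert "maintainer" not in fields              # consumed by the pop
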
-""" -from jinja2 import Environment, PackageLoader - -env = Environment( - loader=PackageLoader("toltec", "templates"), - autoescape=True, -) diff --git a/scripts/toltec/util.py b/scripts/toltec/util.py deleted file mode 100644 index d9e623d1b..000000000 --- a/scripts/toltec/util.py +++ /dev/null @@ -1,332 +0,0 @@ -# Copyright (c) 2021 The Toltec Contributors -# SPDX-License-Identifier: MIT -"""Collection of useful functions.""" - -import argparse -from collections.abc import Iterable -import hashlib -import logging -import itertools -import os -import shutil -import sys -from typing import ( - Any, - Callable, - Dict, - IO, - List, - Optional, - Protocol, - Sequence, - TypeVar, -) -import zipfile -import tarfile - -# Date format used in HTTP headers such as Last-Modified -HTTP_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %Z" - -# Logging format for build scripts -LOGGING_FORMAT = "[%(levelname)8s] %(name)s: %(message)s" - - -def argparse_add_verbose(parser: argparse.ArgumentParser) -> None: - """Add an option for setting the verbosity level.""" - parser.add_argument( - "-v", - "--verbose", - action="store_const", - const=logging.DEBUG, - default=logging.INFO, - help="show debugging information", - ) - - -def file_sha256(path: str) -> str: - """Compute the SHA-256 checksum of a file.""" - sha256 = hashlib.sha256() - buffer = bytearray(128 * 1024) - view = memoryview(buffer) - - with open(path, "rb", buffering=0) as file: - for length in iter(lambda: file.readinto(view), 0): # type:ignore - sha256.update(view[:length]) - - return sha256.hexdigest() - - -def split_all_parts(path: str) -> List[str]: - """Split a file path into all its directory components.""" - parts = [] - prefix = path - - while prefix not in ("", "/"): - prefix, base = os.path.split(prefix) - if base: - parts.append(base) - - parts.reverse() - return parts - - -def split_all_exts(path: str) -> List[str]: - """Get the list of extensions in a file path.""" - exts = [] - remaining = path - - while True: - remaining, ext = os.path.splitext(remaining) - if ext: - exts.append(ext) - else: - break - - return exts - - -def all_equal(seq: Iterable) -> bool: - """Check that all elements of a sequence are equal.""" - grouped = itertools.groupby(seq) - first = next(grouped, (None, grouped)) - second = next(grouped, None) - return first and not second - - -def remove_prefix(filenames: List[str]) -> Dict[str, str]: - """Find and remove the longest directory prefix shared by all files.""" - split_filenames = [split_all_parts(filename) for filename in filenames] - - # Find the longest directory prefix shared by all files - min_len = min(len(filename) for filename in split_filenames) - prefix = 0 - - while prefix < min_len and all_equal( - filename[prefix] for filename in split_filenames - ): - prefix += 1 - - # If there’s only one file, keep the last component - if len(filenames) == 1: - prefix -= 1 - - mapping = {} - - for filename, split_filename in zip(filenames, split_filenames): - if split_filename[prefix:]: - mapping[filename] = os.path.join(*split_filename[prefix:]) - - return mapping - - -def auto_extract(archive_path: str, dest_path: str) -> bool: - """ - Automatically extract an archive and strip useless components. 
diff --git a/scripts/toltec/util.py b/scripts/toltec/util.py
deleted file mode 100644
index d9e623d1b..000000000
--- a/scripts/toltec/util.py
+++ /dev/null
@@ -1,332 +0,0 @@
-# Copyright (c) 2021 The Toltec Contributors
-# SPDX-License-Identifier: MIT
-"""Collection of useful functions."""
-
-import argparse
-from collections.abc import Iterable
-import hashlib
-import logging
-import itertools
-import os
-import shutil
-import sys
-from typing import (
-    Any,
-    Callable,
-    Dict,
-    IO,
-    List,
-    Optional,
-    Protocol,
-    Sequence,
-    TypeVar,
-)
-import zipfile
-import tarfile
-
-# Date format used in HTTP headers such as Last-Modified
-HTTP_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %Z"
-
-# Logging format for build scripts
-LOGGING_FORMAT = "[%(levelname)8s] %(name)s: %(message)s"
-
-
-def argparse_add_verbose(parser: argparse.ArgumentParser) -> None:
-    """Add an option for setting the verbosity level."""
-    parser.add_argument(
-        "-v",
-        "--verbose",
-        action="store_const",
-        const=logging.DEBUG,
-        default=logging.INFO,
-        help="show debugging information",
-    )
-
-
-def file_sha256(path: str) -> str:
-    """Compute the SHA-256 checksum of a file."""
-    sha256 = hashlib.sha256()
-    buffer = bytearray(128 * 1024)
-    view = memoryview(buffer)
-
-    with open(path, "rb", buffering=0) as file:
-        for length in iter(lambda: file.readinto(view), 0):  # type:ignore
-            sha256.update(view[:length])
-
-    return sha256.hexdigest()
-
-
-def split_all_parts(path: str) -> List[str]:
-    """Split a file path into all its directory components."""
-    parts = []
-    prefix = path
-
-    while prefix not in ("", "/"):
-        prefix, base = os.path.split(prefix)
-        if base:
-            parts.append(base)
-
-    parts.reverse()
-    return parts
-
-
-def split_all_exts(path: str) -> List[str]:
-    """Get the list of extensions in a file path."""
-    exts = []
-    remaining = path
-
-    while True:
-        remaining, ext = os.path.splitext(remaining)
-        if ext:
-            exts.append(ext)
-        else:
-            break
-
-    return exts
-
-
-def all_equal(seq: Iterable) -> bool:
-    """Check that all elements of a sequence are equal."""
-    grouped = itertools.groupby(seq)
-    first = next(grouped, (None, grouped))
-    second = next(grouped, None)
-    return first and not second
-
-
-def remove_prefix(filenames: List[str]) -> Dict[str, str]:
-    """Find and remove the longest directory prefix shared by all files."""
-    split_filenames = [split_all_parts(filename) for filename in filenames]
-
-    # Find the longest directory prefix shared by all files
-    min_len = min(len(filename) for filename in split_filenames)
-    prefix = 0
-
-    while prefix < min_len and all_equal(
-        filename[prefix] for filename in split_filenames
-    ):
-        prefix += 1
-
-    # If there’s only one file, keep the last component
-    if len(filenames) == 1:
-        prefix -= 1
-
-    mapping = {}
-
-    for filename, split_filename in zip(filenames, split_filenames):
-        if split_filename[prefix:]:
-            mapping[filename] = os.path.join(*split_filename[prefix:])
-
-    return mapping
-
-
-def auto_extract(archive_path: str, dest_path: str) -> bool:
-    """
-    Automatically extract an archive and strip useless components.
-
-    :param archive_path: path to the archive to extract
-    :param dest_path: destination folder for the archive contents
-    :returns: true if something was extracted, false if not a supported archive
-    """
-    exts = split_all_exts(archive_path)
-
-    if not exts:
-        return False
-
-    if exts[0] == ".zip":
-        with zipfile.ZipFile(archive_path) as zip_archive:
-            _auto_extract(
-                zip_archive.namelist(),
-                zip_archive.getinfo,
-                zip_archive.open,
-                lambda member: member.is_dir(),
-                lambda member: False,
-                lambda member: member.external_attr >> 16 & 0x1FF,
-                dest_path,
-            )
-        return True
-
-    if exts[0] == ".tar" or (
-        len(exts) >= 2
-        and exts[0] in (".gz", ".bz2", ".xz")
-        and exts[1] == ".tar"
-    ):
-        with tarfile.open(archive_path, mode="r") as tar_archive:
-            _auto_extract(
-                tar_archive.getnames(),
-                tar_archive.getmember,
-                tar_archive.extractfile,
-                lambda member: member.isdir(),
-                lambda member: member.issym(),
-                lambda member: member.mode,
-                dest_path,
-            )
-        return True
-
-    return False
-
-
-def _auto_extract(  # pylint:disable=too-many-arguments,disable=too-many-locals
-    members: List[str],
-    getinfo: Callable[[str], Any],
-    extract: Callable[[Any], Optional[IO[bytes]]],
-    isdir: Callable[[Any], bool],
-    issym: Callable[[Any], bool],
-    getmode: Callable[[Any], int],
-    dest_path: str,
-) -> None:
-    """
-    Generic implementation of automatic archive extraction.
-
-    :param members: list of members of the archive
-    :param getinfo: get an entry object from an entry name in the archive
-    :param extract: get a reading stream corresponding to an archive entry
-    :param isdir: get whether an entry is a directory or not
-    :param issym: get whether an entry is a symbolic link or not
-    :param getmode: get the permission bits for an entry
-    :param destpath: destinatio folder for the archive contents
-    """
-    stripped_map = remove_prefix(members)
-
-    for filename, stripped in stripped_map.items():
-        member = getinfo(filename)
-        file_path = os.path.join(dest_path, stripped)
-
-        if isdir(member):
-            os.makedirs(file_path, exist_ok=True)
-        else:
-            if issym(member):
-                os.symlink(member.linkname, file_path)
-            else:
-                basedir = os.path.dirname(file_path)
-                if not os.path.exists(basedir):
-                    os.makedirs(basedir, exist_ok=True)
-
-                source = extract(member)
-                assert source is not None
-
-                with source, open(file_path, "wb") as target:
-                    shutil.copyfileobj(source, target)
-
-        mode = getmode(member)
-        if mode != 0:
-            os.chmod(file_path, mode)
-
-
-def query_user(
-    question: str,
-    default: str,
-    options: Optional[List[str]] = None,
-    aliases: Optional[Dict[str, str]] = None,
-) -> str:
-    """
-    Ask the user to make a choice.
-
-    :param question: message to display before the choice
-    :param default: default choice if the user inputs an empty string
-    :param options: list of valid options (should be lowercase strings)
-    :param aliases: accepted aliases for the valid options
-    :returns: option chosen by the user
-    """
-    options = options or ["y", "n"]
-    aliases = aliases or {"yes": "y", "no": "n"}
-
-    if default not in options:
-        raise ValueError(f"Default value {default} is not a valid option")
-
-    prompt = "/".join(
-        option if option != default else option.upper() for option in options
-    )
-
-    while True:
-        sys.stdout.write(f"{question} [{prompt}] ")
-        choice = input().lower()
-
-        if not choice:
-            return default
-
-        if choice in options:
-            return choice
-
-        if choice in aliases:
-            return aliases[choice]
-
-        print("Invalid answer. Please choose among the valid options.")
-
-
-def check_directory(path: str, message: str) -> bool:
-    """
-    Create a directory and ask the user what to do if it already exists.
-
-    :param path: path to the directory to create
-    :param message: message to display before asking the user interactively
-    :returns: false if the user chose to cancel the current operation
-    """
-    try:
-        os.mkdir(path)
-    except FileExistsError:
-        ans = query_user(
-            message,
-            default="c",
-            options=["c", "r", "k"],
-            aliases={
-                "cancel": "c",
-                "remove": "r",
-                "keep": "k",
-            },
-        )
-
-        if ans == "c":
-            return False
-
-        if ans == "r":
-            shutil.rmtree(path)
-            os.mkdir(path)
-
-    return True
-
-
-def list_tree(root: str) -> List[str]:
-    """
-    Get a sorted list of all files and folders under a given root folder.
-
-    :param root: root folder to start from
-    :returns: sorted list of items under the root folder
-    """
-    result = []
-
-    for directory, _, files in os.walk(root):
-        result.append(directory)
-        for file in files:
-            result.append(os.path.join(directory, file))
-
-    return sorted(result)
-
-
-# See
-class SupportsLessThan(Protocol):  # pylint:disable=too-few-public-methods
-    """Types that support the less-than operator."""
-
-    def __lt__(self, other: Any) -> bool:
-        ...
-
-
-Key = TypeVar("Key", bound=SupportsLessThan)
-Value = TypeVar("Value")
-
-
-def group_by(
-    in_seq: Sequence[Value], key_fn: Callable[[Value], Key]
-) -> Dict[Key, List[Value]]:
-    """
-    Group elements of a list.
-
-    :param in_seq: list of elements to group
-    :param key_fn: mapping of each element onto a group
-    :returns: dictionary of groups
-    """
-    return dict(
-        (key, list(group))
-        for key, group in itertools.groupby(
-            sorted(in_seq, key=key_fn), key=key_fn
-        )
-    )
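
Note: group_by, the last helper removed with util.py, is plain itertools:
the input is sorted by the key function first so that itertools.groupby
sees each group as one contiguous run. A standalone illustration of the
same behaviour:

    import itertools

    def key_fn(name: str) -> str:
        return name[0]

    names = ["alpha", "beta", "bravo", "apex"]

    # Equivalent to group_by(names, key_fn): sorting first is what lets
    # groupby collect whole groups rather than adjacent fragments.
    grouped = {
        key: list(group)
        for key, group in itertools.groupby(
            sorted(names, key=key_fn), key=key_fn
        )
    }
    print(grouped)  # {'a': ['alpha', 'apex'], 'b': ['beta', 'bravo']}
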
- """ - - def __init__(self, epoch: int, upstream: str, revision: str): - self.upstream = upstream - self.revision = revision - self.epoch = epoch - - if _VERSION_CHARS.fullmatch(upstream) is None: - raise InvalidVersionError( - f"Invalid chars in upstream version: '{upstream}'" - ) - - if _VERSION_CHARS.fullmatch(revision) is None: - raise InvalidVersionError( - f"Invalid chars in revision: '{revision}'" - ) - - self._original: Optional[str] = None - - @staticmethod - def parse(version: str) -> "Version": - """Parse a version number.""" - original = version - colon = version.find(":") - - if colon == -1: - epoch = 0 - else: - epoch = int(version[:colon]) - version = version[colon + 1 :] - - dash = version.find("-") - - if dash == -1: - revision = "0" - else: - revision = version[dash + 1 :] - version = version[:dash] - - upstream = version - - result = Version(epoch, upstream, revision) - result._original = original # pylint:disable=protected-access - return result - - def __str__(self) -> str: - if self._original is not None: - # Use the original parsed version string - return self._original - - epoch = "" if self.epoch == 0 else f"{self.epoch}:" - revision = ( - "" - if self.revision == "0" and "-" not in self.upstream - else f"-{self.revision}" - ) - - return f"{epoch}{self.upstream}{revision}" - - def __repr__(self) -> str: - return f"Version(upstream={repr(self.upstream)}, \ -revision={repr(self.revision)}, epoch={repr(self.epoch)})" - - -class DependencyKind(Enum): - """Kinds of dependencies that may be requested by a package.""" - - # pylint: disable=invalid-name - - # Dependency installed in the system used to build a package - # (e.g., a Debian package) - Build = "build" - # Dependency installed alongside a package - # (e.g., another Entware or Toltec package) - Host = "host" - - # pylint: enable=invalid-name - - -class InvalidDependencyError(Exception): - """Raised when parsing an invalid dependency specification.""" - - -class Dependency: - """ - Parse version-constrained dependencies. - - Toltec dependencies are declared using the following format: - - [host:|build:]package[(<<|<=|=|=>|>>)version] - - Dependencies of a package that start with `build:` correspond to packages - that must be installed in the build system. Dependencies that start with - `host:` or do not have a prefix correspond to packages that must be - installed alongside the built package, either in the host sysroot when - building the package, or in the target device when using it. 
- """ - - def __init__( - self, - kind: DependencyKind, - package: str, - version_comparator: VersionComparator = VersionComparator.Equal, - version: Optional[Version] = None, - ): - self.kind = kind - self.package = package - self.version_comparator = version_comparator - self.version = version - - self._original: Optional[str] = None - - @staticmethod - def parse(dependency: str) -> "Dependency": - """Parse a dependency specification.""" - original = dependency - kind = DependencyKind.Host - - for enum_kind in DependencyKind: - if dependency.startswith(enum_kind.value + ":"): - kind = enum_kind - dependency = dependency[len(enum_kind.value) + 1 :] - break - - comp_char_match = _COMPARATOR_CHARS.search(dependency) - - if comp_char_match is None: - package = dependency - version_comparator = VersionComparator.Equal - version = None - else: - comp_char = comp_char_match.start() - for enum_comparator in VersionComparator: - if dependency[comp_char:].startswith(enum_comparator.value): - package = dependency[:comp_char] - version_comparator = enum_comparator - version = Version.parse( - dependency[comp_char + len(enum_comparator.value) :] - ) - break - else: - raise InvalidDependencyError( - f"Invalid version comparator \ -'{dependency[comp_char : comp_char + 2]}'" - ) - - result = Dependency(kind, package, version_comparator, version) - result._original = original # pylint:disable=protected-access - return result - - def to_debian(self) -> str: - """ - Convert a dependency specification to the Debian format. - - See - for the syntax expected by Debian tools. - """ - if self.version is None: - return self.package - - return f"{self.package} ({self.version_comparator.value} \ -{self.version})" - - def __str__(self) -> str: - if self._original is not None: - # Use the original parsed dependency specification - return self._original - - kind = "build:" if self.kind == DependencyKind.Build else "host:" - - if self.version is None: - return f"{kind}{self.package}" - - return f"{kind}{self.package}{self.version_comparator.value}\ -{self.version}" - - def __repr__(self) -> str: - return f"Dependency(kind={repr(self.kind)}, \ -package={repr(self.package)}, \ -version_comparator={repr(self.version_comparator)}, \ -version={repr(self.version)})"