Skip to content

Commit

Permalink
repo: key management refactor with key ID handling changes (#3359)
Browse files Browse the repository at this point in the history
- Decouple key management from _deb.Ubuntu, introducing a new
  apt_key_manager.AptKeyManager class.

- Improve unit test coverage of key management.

- Pivot to enforcing the 40-character key ID. Update the
  schema and package repository validation checks to
  enforce this.  Update the spread tests to comply.

- Update the key asset search to require matching keys to
  be found in snap/keys/12345678.asc, where 12345678 is
  the first eight characters of the key ID.  Update the
  schema and package repository validation checks to
  enforce this.  Update the spread tests to comply.

This functionality should finalize key management for the
stabilization of package-repositories, with the exception of
a related change to ensure all key assets are used.

Signed-off-by: Chris Patterson <[email protected]>
  • Loading branch information
Chris Patterson authored Jan 15, 2021
1 parent 8f418e3 commit 8272e54
Show file tree
Hide file tree
Showing 14 changed files with 649 additions and 346 deletions.
2 changes: 1 addition & 1 deletion schema/snapcraft.json
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
"key-id": {
"type": "string",
"description": "GPG key identifier / fingerprint. May be used to identify key file in <project>/snap/keys/<key-id>.asc",
"pattern": "^[a-zA-Z0-9-_]*$"
"pattern": "^[A-Z0-9]{40}$"
},
"key-server": {
"type": "string",
Expand Down
21 changes: 6 additions & 15 deletions snapcraft/internal/meta/package_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import logging
import re
from copy import deepcopy
from pathlib import Path
from typing import Any, Dict, List, Optional

from snapcraft.internal import repo
Expand All @@ -30,7 +29,7 @@

class PackageRepository(abc.ABC):
@abc.abstractmethod
def install(self, *, keys_path: Path) -> bool:
def install(self) -> bool:
...

@abc.abstractmethod
Expand Down Expand Up @@ -79,8 +78,8 @@ def __init__(self, *, ppa: str) -> None:

self.validate()

def install(self, *, keys_path: Path) -> bool:
return repo.Ubuntu.install_ppa(keys_path=keys_path, ppa=self.ppa)
def install(self) -> bool:
return repo.Ubuntu.install_ppa(ppa=self.ppa)

def marshal(self) -> Dict[str, Any]:
data = dict(type="apt")
Expand Down Expand Up @@ -171,7 +170,7 @@ def __init__(

self.validate()

def install(self, keys_path: Path) -> bool:
def install(self) -> bool:
"""Install repository configuration.
1) First check to see if package repo is implied path,
Expand Down Expand Up @@ -201,13 +200,7 @@ def install(self, keys_path: Path) -> bool:
else:
raise RuntimeError("no suites or path")

# First install associated GPG key.
new_key: bool = repo.Ubuntu.install_gpg_key_id(
keys_path=keys_path, key_id=self.key_id, key_server=self.key_server
)

# Now install sources file.
new_sources: bool = repo.Ubuntu.install_sources(
return repo.Ubuntu.install_sources(
architectures=self.architectures,
components=self.components,
formats=self.formats,
Expand All @@ -216,8 +209,6 @@ def install(self, keys_path: Path) -> bool:
url=self.url,
)

return new_key or new_sources

def marshal(self) -> Dict[str, Any]:
data: Dict[str, Any] = {"type": "apt"}

Expand Down Expand Up @@ -258,7 +249,7 @@ def validate(self) -> None: # noqa: C901
resolution="Verify the repository configuration and ensure that 'formats' is correctly specified.",
)

if not self.key_id:
if not self.key_id or not re.match(r"^[0-9A-F]{40}$", self.key_id):
raise errors.PackageRepositoryValidationError(
url=self.url,
brief=f"Invalid key identifier {self.key_id!r}.",
Expand Down
17 changes: 12 additions & 5 deletions snapcraft/internal/project_loader/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from snapcraft.internal.pluginhandler._part_environment import (
get_snapcraft_global_environment,
)
from snapcraft.internal.repo import apt_key_manager
from snapcraft.project._schema import Validator

from . import errors, grammar_processing, replace_attr
Expand Down Expand Up @@ -262,11 +263,17 @@ def install_package_repositories(self) -> None:
# Install pre-requisite packages for apt-key, if not installed.
repo.Repo.install_build_packages(package_names=["gnupg", "dirmngr"])

keys_path = self.project._get_keys_path()
changes = [
package_repo.install(keys_path=keys_path) for package_repo in package_repos
]
if any(changes):
key_assets = self.project._get_keys_path()
key_manager = apt_key_manager.AptKeyManager(key_assets=key_assets)

refresh_required = False
for package_repo in self._get_required_package_repositories():
refresh_required |= key_manager.install_package_repository_key(
package_repo=package_repo
)
refresh_required |= package_repo.install()

if refresh_required:
repo.Repo.refresh_build_packages()

def get_build_packages(self) -> Set[str]:
Expand Down
169 changes: 1 addition & 168 deletions snapcraft/internal/repo/_deb.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import tempfile
from typing import Dict, List, Optional, Set, Tuple # noqa: F401

import gnupg
from typing_extensions import Final
from xdg import BaseDirectory

Expand Down Expand Up @@ -255,9 +254,6 @@ def _sudo_write_file(*, dst_path: pathlib.Path, content: bytes) -> None:


class Ubuntu(BaseRepo):
_SNAPCRAFT_INSTALLED_GPG_KEYRING: Final[
str
] = "/etc/apt/trusted.gpg.d/snapcraft.gpg"
_SNAPCRAFT_INSTALLED_SOURCES_LIST: Final[
str
] = "/etc/apt/sources.list.d/snapcraft.list"
Expand Down Expand Up @@ -482,175 +478,12 @@ def get_installed_packages(cls) -> List[str]:
]

@classmethod
def _get_key_fingerprints(cls, key: str) -> List[str]:
with tempfile.NamedTemporaryFile(suffix="keyring") as temp_file:
return (
gnupg.GPG(keyring=temp_file.name).import_keys(key_data=key).fingerprints
)

@classmethod
def _is_key_id_installed(cls, key_id: str) -> bool:
# Check if key is installed by attempting to export the key.
# Unfortunately, apt-key does not exit with error, and
# we have to do our best to parse the output.
try:
proc = subprocess.run(
["sudo", "apt-key", "export", key_id],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True,
)
except subprocess.CalledProcessError as error:
# Export shouldn't exit with failure based on testing,
# but assume the key is not installed and log a warning.
logger.warning(f"Unexpected apt-key failure: {error.output}")
return False

apt_key_output = proc.stdout.decode()

if "BEGIN PGP PUBLIC KEY BLOCK" in apt_key_output:
return True

if "nothing exported" in apt_key_output:
return False

# The two strings above have worked in testing, but if neither is
# present for whatever reason, assume the key is not installed
# and log a warning.
logger.warning(f"Unexpected apt-key output: {apt_key_output}")
return False

@classmethod
def _install_gpg_key(cls, *, key_id: str, key: str) -> None:
cmd = [
"sudo",
"apt-key",
"--keyring",
cls._SNAPCRAFT_INSTALLED_GPG_KEYRING,
"add",
"-",
]
try:
logger.debug(f"Executing: {cmd!r}")
env = os.environ.copy()
env["LANG"] = "C.UTF-8"
subprocess.run(
cmd,
input=key.encode(),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True,
env=env,
)
except subprocess.CalledProcessError as error:
raise errors.AptGPGKeyInstallError(output=error.output.decode(), key=key)

logger.debug(f"Installed apt repository key:\n{key}")

@classmethod
def install_gpg_key(cls, *, key_id: str, key: str) -> bool:
if cls._is_key_id_installed(key_id):
# Already installed, nothing to do.
return False

cls._install_gpg_key(key_id=key_id, key=key)
return True

@classmethod
def _install_gpg_key_id_from_keyserver(
cls, *, key_id: str, key_server: Optional[str] = None
) -> None:
# Default to keyserver.ubuntu.com.
if key_server is None:
key_server = "keyserver.ubuntu.com"

env = os.environ.copy()
env["LANG"] = "C.UTF-8"

cmd = [
"sudo",
"apt-key",
"--keyring",
cls._SNAPCRAFT_INSTALLED_GPG_KEYRING,
"adv",
"--keyserver",
key_server,
"--recv-keys",
key_id,
]

try:
logger.debug(f"Executing: {cmd!r}")
subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True,
env=env,
)
except subprocess.CalledProcessError as error:
raise errors.AptGPGKeyInstallError(
output=error.output.decode(), key_id=key_id, key_server=key_server
)

@classmethod
def _find_asset_with_key_id(
cls, *, key_id: str, keys_path: pathlib.Path
) -> Tuple[str, Optional[pathlib.Path]]:
# First look for any key asset that matches the key_id fingerprint.
for key_path in keys_path.glob(pattern="*.asc"):
key = key_path.read_text()
if key_id in cls._get_key_fingerprints(key=key):
return key_id, key_path

# Handle case where user uses 'key_id' as the name of the key asset.
# In this case we translate the key name to a proper key ID.
key_path = keys_path / f"{key_id}.asc"
if key_path.exists():
fingerprints = cls._get_key_fingerprints(key=key_path.read_text())
if len(fingerprints) == 0:
logger.warning(f"Error reading key file: {key_path}")
return key_id, None
elif len(fingerprints) > 1:
logger.warning(f"Found multiple key fingerprints in: {key_path}")
return fingerprints[0], key_path

return key_id, None

@classmethod
def install_gpg_key_id(
cls, *, key_id: str, keys_path: pathlib.Path, key_server: Optional[str] = None
) -> bool:
# If key_id references a local asset, we search and replace the local
# key_id reference with the actual fingerprint suitable for checking
# if it is installed.
key_id, key_path = cls._find_asset_with_key_id(
key_id=key_id, keys_path=keys_path
)

# If key is already installed, nothing to do.
if cls._is_key_id_installed(key_id):
return False

# Install key if it is available as a local asset.
if key_path is not None:
cls._install_gpg_key(key_id=key_id, key=key_path.read_text())
return True

# Finally attempt to install from keyserver.
cls._install_gpg_key_id_from_keyserver(key_id=key_id, key_server=key_server)
logger.debug(f"Installed apt repository key ID: {key_id}")
return True

@classmethod
def install_ppa(cls, *, keys_path: pathlib.Path, ppa: str) -> bool:
def install_ppa(cls, ppa: str) -> bool:
owner, name = apt_ppa.split_ppa_parts(ppa=ppa)
key_id = apt_ppa.get_launchpad_ppa_key_id(ppa=ppa)
codename = os_release.OsRelease().version_codename()

return any(
[
cls.install_gpg_key_id(keys_path=keys_path, key_id=key_id),
cls.install_sources(
components=["main"],
formats=["deb"],
Expand Down
Loading

0 comments on commit 8272e54

Please sign in to comment.