From dd3c564351f0c7c12a89f4a433a03a99275781f9 Mon Sep 17 00:00:00 2001 From: Matthias Dellweg Date: Wed, 14 Jun 2023 10:27:59 +0200 Subject: [PATCH] WIP Implement compact_index and add digest to gem fixes #96 --- CHANGES/96.feature | 1 + CHANGES/96.removal | 3 + ...te_excludes_gemremote_includes_and_more.py | 58 +++++++++ pulp_gem/app/models.py | 47 ++++++- pulp_gem/app/serializers.py | 30 +++-- pulp_gem/app/tasks/publishing.py | 48 ++++++- pulp_gem/app/tasks/synchronizing.py | 121 +++++++----------- pulp_gem/specs.py | 109 +++++++++++++++- 8 files changed, 330 insertions(+), 87 deletions(-) create mode 100644 CHANGES/96.feature create mode 100644 CHANGES/96.removal create mode 100644 pulp_gem/app/migrations/0006_gemremote_excludes_gemremote_includes_and_more.py diff --git a/CHANGES/96.feature b/CHANGES/96.feature new file mode 100644 index 0000000..243411c --- /dev/null +++ b/CHANGES/96.feature @@ -0,0 +1 @@ +Implement new compact_index format. Add checksum and dependency information to gem content. diff --git a/CHANGES/96.removal b/CHANGES/96.removal new file mode 100644 index 0000000..b7cb3fd --- /dev/null +++ b/CHANGES/96.removal @@ -0,0 +1,3 @@ +Disable synching without compact index format. Existing on-demand content will be broken after this release. + +TODO: Provide a data repair command. 
diff --git a/pulp_gem/app/migrations/0006_gemremote_excludes_gemremote_includes_and_more.py b/pulp_gem/app/migrations/0006_gemremote_excludes_gemremote_includes_and_more.py new file mode 100644 index 0000000..9201486 --- /dev/null +++ b/pulp_gem/app/migrations/0006_gemremote_excludes_gemremote_includes_and_more.py @@ -0,0 +1,58 @@ +# Generated by Django 4.2.1 on 2023-06-14 14:53 + +import django.contrib.postgres.fields.hstore +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ("core", "0107_distribution_hidden"), + ("gem", "0005_rename_gemcontent_shallowgemcontent"), + ] + + operations = [ + migrations.AddField( + model_name="gemremote", + name="excludes", + field=django.contrib.postgres.fields.hstore.HStoreField(null=True), + ), + migrations.AddField( + model_name="gemremote", + name="includes", + field=django.contrib.postgres.fields.hstore.HStoreField(null=True), + ), + migrations.AddField( + model_name="gemremote", + name="prereleases", + field=models.BooleanField(default=False), + ), + migrations.CreateModel( + name="GemContent", + fields=[ + ( + "content_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="core.content", + ), + ), + ("name", models.TextField()), + ("version", models.TextField()), + ("checksum", models.CharField(db_index=True, max_length=64)), + ("dependencies", django.contrib.postgres.fields.hstore.HStoreField(default=dict)), + ("required_ruby_version", models.TextField(null=True)), + ("required_rubygems_version", models.TextField(null=True)), + ("prerelease", models.BooleanField(default=False)), + ], + options={ + "default_related_name": "%(app_label)s_%(model_name)s", + "unique_together": {("name", "version", "checksum")}, + }, + bases=("core.content",), + ), + ] diff --git a/pulp_gem/app/models.py b/pulp_gem/app/models.py index 5f2762f..cb04ed6 
100644 --- a/pulp_gem/app/models.py +++ b/pulp_gem/app/models.py @@ -1,5 +1,6 @@ from logging import getLogger +from django.contrib.postgres.fields import HStoreField from django.db import models from pulpcore.plugin.models import ( @@ -46,7 +47,45 @@ class Meta: default_related_name = "%(app_label)s_%(model_name)s" unique_together = ("name", "version") -GemContent = ShallowGemContent + +class GemContent(Content): + """ + The "gem" content type. + + Content of this type represents a ruby gem file + with its spec data. + + Fields: + name (str): The name of the gem. + version (str): The version of the gem. + + """ + + TYPE = "gem" + repo_key_fields = ("name", "version") + + name = models.TextField(blank=False, null=False) + version = models.TextField(blank=False, null=False) + checksum = models.CharField(max_length=64, null=False, db_index=True) + prerelease = models.BooleanField(default=False) + dependencies = HStoreField(default=dict) + required_ruby_version = models.TextField(null=True) + required_rubygems_version = models.TextField(null=True) + + @property + def relative_path(self): + """The relative path this gem is stored under for the content app.""" + return f"gems/{self.name}-{self.version}.gem" + + @property + def gemspec_path(self): + """The path for this gem's gemspec for the content app.""" + return f"quick/Marshal.4.8/{self.name}-{self.version}.gemspec.rz" + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + unique_together = ("name", "version", "checksum") + class GemDistribution(Distribution): """ @@ -77,6 +116,10 @@ class GemRemote(Remote): TYPE = "gem" + prereleases = models.BooleanField(default=False) + includes = HStoreField(null=True) + excludes = HStoreField(null=True) + class Meta: default_related_name = "%(app_label)s_%(model_name)s" @@ -87,7 +130,7 @@ class GemRepository(Repository): """ TYPE = "gem" - CONTENT_TYPES = [ShallowGemContent] + CONTENT_TYPES = [GemContent, ShallowGemContent] REMOTE_TYPES = [GemRemote] class 
Meta: diff --git a/pulp_gem/app/serializers.py b/pulp_gem/app/serializers.py index 83eb978..5be6cb8 100644 --- a/pulp_gem/app/serializers.py +++ b/pulp_gem/app/serializers.py @@ -3,9 +3,11 @@ import os from rest_framework.serializers import ( + BooleanField, CharField, ChoiceField, FileField, + HStoreField, HyperlinkedRelatedField, ValidationError, ) @@ -72,6 +74,10 @@ class GemContentSerializer(MultipleArtifactContentSerializer): ) name = CharField(help_text=_("Name of the gem"), read_only=True) version = CharField(help_text=_("Version of the gem"), read_only=True) + prerelease = BooleanField(help_text=_("Whether the gem is a prerelease"), read_only=True) + dependencies = HStoreField(read_only=True) + required_ruby_version = CharField(help_text=_("Required ruby version of the gem"), read_only=True) + required_rubygems_version = CharField(help_text=_("Required rubygems version of the gem"), read_only=True) def __init__(self, *args, **kwargs): """Initializer for GemContentSerializer.""" @@ -98,23 +104,25 @@ def deferred_validate(self, data): """Validate the GemContent data (deferred).""" artifact = data.pop("artifact") - name, version, spec_data = analyse_gem(artifact.file) - relative_path = os.path.join("gems", name + "-" + version + ".gem") + gem_info, spec_data = analyse_gem(artifact.file) + relative_path = os.path.join("gems", gem_info["name"] + "-" + gem_info["version"] + ".gem") spec_artifact = _artifact_from_data(spec_data) - spec_relative_path = os.path.join("quick/Marshal.4.8", name + "-" + version + ".gemspec.rz") + spec_relative_path = os.path.join( + "quick/Marshal.4.8", gem_info["name"] + "-" + gem_info["version"] + ".gemspec.rz" + ) - data["name"] = name - data["version"] = version + data.update(gem_info) data["artifacts"] = {relative_path: artifact, spec_relative_path: spec_artifact} + data["checksum"] = artifact.sha256 # Validate uniqueness - content = GemContent.objects.filter(name=name, version=version) + content = 
GemContent.objects.filter(checksum=data["checksum"]) if content.exists(): raise ValidationError( - _( - "There is already a gem content with name '{name}' and version '{version}'." - ).format(name=name, version=version) + _("There is already a gem content with that artifact.").format( + name=name, version=version + ) ) return data @@ -142,6 +150,10 @@ class Meta: "repository", "name", "version", + "prerelease", + "dependencies", + "required_ruby_version", + "required_rubygems_version", ) model = GemContent diff --git a/pulp_gem/app/tasks/publishing.py b/pulp_gem/app/tasks/publishing.py index f1878d5..48c9144 100644 --- a/pulp_gem/app/tasks/publishing.py +++ b/pulp_gem/app/tasks/publishing.py @@ -1,6 +1,8 @@ +import datetime import logging import re import gzip +import os import shutil from gettext import gettext as _ @@ -53,6 +55,21 @@ def _publish_specs(specs, relative_path, publication): specs_metadata_gz.save() +def _publish_compact_index(lines, relative_path, publication, timestamp=False): + with open(relative_path, "w") as fp: + if timestamp: + timestamp = datetime.datetime.utcnow().isoformat(timespec="seconds") + fp.write(f"created_at: {timestamp}Z\n") + fp.write("---\n") + for line in lines: + fp.write(line + "\n") + metadata = PublishedMetadata.create_from_file( + publication=publication, file=File(open(relative_path, "rb")) + ) + metadata.save() + return metadata + + def _create_index(publication, path="", links=None): links = links or [] links = (li if li.endswith("/") else str(Path(li).relative_to(path)) for li in links) @@ -110,6 +127,30 @@ def publish(repository_version_pk): _publish_specs(specs, "specs.4.8", publication) _publish_specs(latest_specs, "latest_specs.4.8", publication) _publish_specs(prerelease_specs, "prerelease_specs.4.8", publication) + + # compact_index + gems_qs = GemContent.objects.filter(pk__in=publication.repository_version.content) + names_qs = gems_qs.order_by("name").values_list("name", flat=True).distinct() + 
_publish_compact_index(names_qs, "names", publication) + + versions_lines = [] + os.mkdir("info") + for name in names_qs: + lines = [] + for gem in gems_qs.filter(name=name): + deps = ",".join((f"{key}: {value}" for key, value in gem.dependencies.items())) + line = f"{gem.version} {deps}|checksum:{gem.checksum}" + if gem.required_ruby_version: + line += f",ruby:{gem.required_ruby_version}" + if gem.required_rubygems_version: + line += f",rubygems:{gem.required_rubygems_version}" + lines.append(line) + info_metadata = _publish_compact_index(lines, f"info/{name}", publication) + versions = ",".join(gems_qs.filter(name=name).values_list("version", flat=True)) + md5_sum = info_metadata._artifacts.first().md5 + versions_lines.append(f"{name} {versions} {md5_sum}") + _publish_compact_index(versions_lines, "versions", publication, timestamp=True) + _create_index( publication, path="", @@ -119,10 +160,15 @@ def publish(repository_version_pk): "specs.4.8", "latest_specs.4.8", "prerelease_specs.4.8", + "names", + "versions", + "info/", ], ) _create_index(publication, path="gems/", links=gems) - _create_index(publication, path="quick/", links=[]) + _create_index(publication, path="quick/", links=["quick/Marshal.4.8/"]) _create_index(publication, path="quick/Marshal.4.8/", links=gemspecs) + _create_index(publication, path="info/", links=(f"info/{name}" for name in names_qs)) + log.info(_("Publication: {publication} created").format(publication=publication.pk)) diff --git a/pulp_gem/app/tasks/synchronizing.py b/pulp_gem/app/tasks/synchronizing.py index f0017bb..12f2820 100644 --- a/pulp_gem/app/tasks/synchronizing.py +++ b/pulp_gem/app/tasks/synchronizing.py @@ -2,9 +2,10 @@ import os from gettext import gettext as _ -from urllib.parse import urlparse, urlunparse +from urllib.parse import urlparse, urlunparse, urljoin from asgiref.sync import sync_to_async +from django.conf import settings from pulpcore.plugin.models import Artifact, ProgressReport, Remote, Repository from 
pulpcore.plugin.stages import ( @@ -20,7 +21,7 @@ ) from pulp_gem.app.models import GemContent, GemRemote -from pulp_gem.specs import read_specs +from pulp_gem.specs import read_specs, read_versions, read_info log = logging.getLogger(__name__) @@ -76,7 +77,7 @@ def synchronize(remote_pk, repository_pk, mirror=False): raise ValueError(_("A remote must have a url specified to synchronize.")) first_stage = GemFirstStage(remote) - dv = GemDeclarativeVersion(first_stage, repository, mirror=mirror) + dv = DeclarativeVersion(first_stage, repository, mirror=mirror) dv.create() @@ -102,72 +103,48 @@ async def run(self): # Interpret policy to download Artifacts or not deferred_download = self.remote.policy != Remote.IMMEDIATE - async with ProgressReport(message="Downloading Metadata") as progress: - parsed_url = urlparse(self.remote.url) - root_dir = parsed_url.path - specs_path = os.path.join(root_dir, "specs.4.8.gz") - specs_url = urlunparse(parsed_url._replace(path=specs_path)) - downloader = self.remote.get_downloader(url=specs_url) - result = await downloader.run() - await progress.aincrement() - - async with ProgressReport(message="Parsing Metadata") as progress: - for key in read_specs(result.path): - relative_path = os.path.join("gems", key.name + "-" + key.version + ".gem") - path = os.path.join(root_dir, relative_path) - url = urlunparse(parsed_url._replace(path=path)) - - spec_relative_path = os.path.join( - "quick/Marshal.4.8", key.name + "-" + key.version + ".gemspec.rz" - ) - spec_path = os.path.join(root_dir, spec_relative_path) - spec_url = urlunparse(parsed_url._replace(path=spec_path)) - gem = GemContent(name=key.name, version=key.version) - da_gem = DeclarativeArtifact( - artifact=Artifact(), - url=url, - relative_path=relative_path, - remote=self.remote, - deferred_download=deferred_download, - ) - da_spec = DeclarativeArtifact( - artifact=Artifact(), - url=spec_url, - relative_path=spec_relative_path, - remote=self.remote, - 
deferred_download=deferred_download, - ) - dc = DeclarativeContent(content=gem, d_artifacts=[da_gem, da_spec]) - await progress.aincrement() - await self.put(dc) - - -class GemDeclarativeVersion(DeclarativeVersion): - """ - Custom implementation of Declarative version. - """ - - def pipeline_stages(self, new_version): - """ - Build the list of pipeline stages feeding into the ContentUnitAssociation stage. - - This is overwritten to create a custom pipeline. - - Args: - new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The - new repository version that is going to be built. - - Returns: - list: List of :class:`~pulpcore.plugin.stages.Stage` instances - - """ - pipeline = [ - self.first_stage, - QueryExistingContents(), - UpdateExistingContentArtifacts(), - ArtifactDownloader(), - ArtifactSaver(), - ContentSaver(), - RemoteArtifactSaver(), - ] - return pipeline + async with ProgressReport( + message="Downloading versions list", total=1 + ) as pr_download_versions: + versions_url = urljoin(self.remote.url, "versions") + versions_downloader = self.remote.get_downloader(url=versions_url) + versions_result = await versions_downloader.run() + await pr_download_versions.aincrement() + + async with ProgressReport(message="Parsing versions list") as pr_parse_versions: + async with ProgressReport(message="Parsing versions info") as pr_parse_info: + async for name, versions, md5_sum in read_versions(versions_result.path): + info_url = urljoin(urljoin(self.remote.url, "info/"), name) + if "md5" in settings.ALLOWED_CONTENT_CHECKSUMS: + extra_kwargs = {"expected_digests": {"md5": md5_sum}} + else: + extra_kwargs = {} + log.warn("Checksum of info file for '{}' could not be validated.", name) + info_downloader = self.remote.get_downloader(url=info_url, **extra_kwargs) + info_result = await info_downloader.run() + async for gem_info in read_info(info_result.path): + gem_info["name"] = name + gem = GemContent(**gem_info) + gem_path = gem.relative_path + gem_url = 
urljoin(self.remote.url, gem_path) + gemspec_path = gem.gemspec_path + gemspec_url = urljoin(self.remote.url, gemspec_path) + + da_gem = DeclarativeArtifact( + artifact=Artifact(sha256=gem_info["checksum"]), + url=gem_url, + relative_path=gem_path, + remote=self.remote, + deferred_download=deferred_download, + ) + da_gemspec = DeclarativeArtifact( + artifact=Artifact(), + url=gemspec_url, + relative_path=gemspec_path, + remote=self.remote, + deferred_download=deferred_download, + ) + dc = DeclarativeContent(content=gem, d_artifacts=[da_gem, da_gemspec]) + await pr_parse_info.aincrement() + await self.put(dc) + await pr_parse_versions.aincrement() diff --git a/pulp_gem/specs.py b/pulp_gem/specs.py index aa3c00d..28041f5 100644 --- a/pulp_gem/specs.py +++ b/pulp_gem/specs.py @@ -1,7 +1,9 @@ from collections import namedtuple +import aiofiles import zlib import gzip +import re import yaml from tarfile import TarFile @@ -10,10 +12,81 @@ import rubymarshal.reader +NAME_REGEX = re.compile(r"[\w\.-]+") +VERSION_REGEX = re.compile(r"\d+(?:\.\d+)*") +PRERELEASE_VERSION_REGEX = NAME_REGEX + # Natural key. 
Key = namedtuple("Key", ("name", "version")) +async def read_versions(relative_path): + # File starts with: + # created_at: + # --- + async with aiofiles.open(relative_path, mode="r") as fp: + results = {} + preamble = True + async for line in fp: + line = line.strip() + if line == "---": + preamble = False + continue + if preamble: + continue + name, versions, md5_sum = line.split(" ", maxsplit=2) + versions = versions.split(",") + entry = results.get(name) or ([], "") + results[name] = (entry[0] + versions, md5_sum) + for name, (versions, md5_sum) in results.items(): + # Sanitize name + if not NAME_REGEX.match(name): + raise ValueError(f"Invalid gem name: {name}") + yield name, versions, md5_sum + + +async def read_info(relative_path): + # File starts with: + # --- + async with aiofiles.open(relative_path, mode="r") as fp: + results = {} + preamble = True + async for line in fp: + line = line.strip() + if line == "---": + preamble = False + continue + if preamble: + continue + gem_info = {} + front, back = line.split("|") + version, dependencies = front.split(" ", maxsplit=1) + # Sanitize version + if VERSION_REGEX.match(version): + gem_info["prerelease"] = False + elif PRERELEASE_VERSION_REGEX.match(version): + gem_info["prerelease"] = True + else: + raise ValueError(f"Invalid version string: {version}") + gem_info["version"] = version + dependencies = dependencies.strip() + if dependencies: + gem_info["dependencies"] = dict( + (item.split(":", maxsplit=1) for item in dependencies.split(",")) + ) + for stmt in back.split(","): + key, value = stmt.split(":") + if key == "checksum": + gem_info["checksum"] = value + elif key == "ruby": + gem_info["required_ruby_version"] = value + elif key == "rubygems": + gem_info["required_rubygems_version"] = value + else: + raise ValueError(f"Invalid requirement: {stmt}") + yield gem_info + + def read_specs(relative_path): """ Read rubygem specs from file. 
@@ -46,12 +119,20 @@ def write_specs(keys, relative_path): rubymarshal.writer.write(fd, specs) +class RubyMarshalYamlLoader(yaml.SafeLoader): + pass + + def _yaml_ruby_constructor(loader, suffix, node): value = loader.construct_mapping(node) return rubymarshal.classes.UsrMarshal(suffix, value) -yaml.add_multi_constructor("!ruby/object:", _yaml_ruby_constructor, Loader=yaml.SafeLoader) +yaml.add_multi_constructor("!ruby/object:", _yaml_ruby_constructor, Loader=RubyMarshalYamlLoader) + + +def _collapse_requirement(data): + return "&".join([f"{req[0]} {req[1].values['version']}" for req in data.values["requirements"]]) def analyse_gem(file_obj): @@ -60,8 +141,30 @@ def analyse_gem(file_obj): """ with TarFile(fileobj=file_obj) as archive: with archive.extractfile("metadata.gz") as md_file: - data = yaml.safe_load(gzip.decompress(md_file.read())) + data = yaml.load(gzip.decompress(md_file.read()), Loader=RubyMarshalYamlLoader) + gem_info = { + "name": data.values["name"], + "version": data.values["version"].values["version"], + } + # Sanitize name + if not NAME_REGEX.match(gem_info["name"]): + raise ValueError(f"Invalid gem name: {name}") + # Sanitize version + if VERSION_REGEX.match(gem_info["version"]): + gem_info["prerelease"] = False + elif PRERELEASE_VERSION_REGEX.match(gem_info["version"]): + gem_info["prerelease"] = True + else: + raise ValueError(f"Invalid version string: {version}") + for key in ("required_ruby_version", "required_rubygems_version"): + if (requirement := data.values.get(key)) is not None: + gem_info[key] = _collapse_requirement(requirement) + if (dependencies := data.values.get("dependencies")) is not None: + gem_info["dependencies"] = { + dep.values["name"]: _collapse_requirement(dep.values["requirement"]) + for dep in dependencies + } # Workaroud del data.values["date"] zdata = zlib.compress(rubymarshal.writer.writes(data)) - return data.values["name"], data.values["version"].values["version"], zdata + return gem_info, zdata