Skip to content

Commit

Permalink
Fix replicate for a distribution with publication
Browse files Browse the repository at this point in the history
If a distribution that is subject to replication refers to a publication
(because it was created by someone else), we need to unset the publication
field for the replicate task to succeed.

fixes #4637
  • Loading branch information
mdellweg committed Mar 22, 2024
1 parent 1dd9850 commit a1dd579
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 100 deletions.
1 change: 1 addition & 0 deletions CHANGES/4637.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed an issue in replicate, where an existing distribution had a conflicting publication set.
19 changes: 17 additions & 2 deletions pulp_file/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,19 +308,34 @@ def _file_remote_client_cert_req_factory(*, manifest_path, policy, **kwargs):


@pytest.fixture(scope="class")
def file_repository_factory(file_bindings, gen_object_with_cleanup):
    """A factory to generate a File Repository with auto-deletion after the test run."""

    def _file_repository_factory(pulp_domain=None, **body):
        # Forward the domain only when explicitly requested; otherwise the
        # bindings operate on the default domain.
        kwargs = {}
        if pulp_domain:
            kwargs["pulp_domain"] = pulp_domain
        # Guarantee a unique repository name unless the caller supplied one.
        body.setdefault("name", str(uuid.uuid4()))
        return gen_object_with_cleanup(file_bindings.RepositoriesFileApi, body, **kwargs)

    return _file_repository_factory


@pytest.fixture(scope="class")
def file_publication_factory(file_bindings, gen_object_with_cleanup):
    """A factory to generate a File Publication with auto-deletion after the test run."""

    def _file_publication_factory(**kwargs):
        domain = kwargs.pop("pulp_domain", None)
        extra_args = {"pulp_domain": domain} if domain else {}
        # Exactly one of "repository" / "repository_version" must be provided.
        assert ("repository" in kwargs) != ("repository_version" in kwargs)
        return gen_object_with_cleanup(file_bindings.PublicationsFileApi, kwargs, **extra_args)

    return _file_publication_factory


@pytest.fixture
def gen_bad_response_fixture_server(gen_threaded_aiohttp_server):
"""
Expand Down
116 changes: 70 additions & 46 deletions pulpcore/app/replica.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
from django.conf import settings
from django.db.models import Model
import logging

from pulp_glue.common.context import PulpContext
from pulpcore.tasking.tasks import dispatch
from pulpcore.app.tasks.base import general_update, general_create, general_delete
from pulpcore.app.tasks.base import (
general_update,
general_create,
general_multi_delete,
)
from pulpcore.plugin.util import get_url, get_domain

_logger = logging.getLogger(__name__)

class ReplicaContext(PulpContext):
def prompt(self, *args, **kwargs):
pass

def echo(self, *args, **kwargs):
pass
class ReplicaContext(PulpContext):
    """PulpContext variant that buffers pulp-glue CLI output and routes it to the task log."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Line buffers for output that would normally go to a terminal.
        self.out_buf = ""
        self.err_buf = ""

    def echo(self, message: str, nl: bool = True, err: bool = False) -> None:
        """Accumulate *message* in the matching buffer; flush it to the logger on newline."""
        if err:
            self.err_buf += message
            if nl:
                # Use lazy %-style args; the previous "{}" str.format placeholder
                # logged the literal string "{}" and dropped the message, and
                # Logger.warn() is deprecated in favor of Logger.warning().
                _logger.warning("%s", self.err_buf)
                self.err_buf = ""
        else:
            self.out_buf += message
            if nl:
                _logger.info("%s", self.out_buf)
                self.out_buf = ""


class Replicator:
Expand All @@ -39,6 +57,7 @@ def __init__(self, pulp_ctx, task_group, tls_settings):
self.tls_settings = tls_settings
self.domain = get_domain()
uri = "/api/v3/distributions/"
# TODO check and compare this to distribution locking on the distribution viewset.
if settings.DOMAIN_ENABLED:
uri = f"/{self.domain.name}{uri}"
self.distros_uri = uri
Expand Down Expand Up @@ -133,49 +152,47 @@ def create_or_update_repository(self, remote):
repository.save()
return repository

def distribution_data(self, repository, upstream_distribution):
    """
    Return the fields that need to be updated/cleared on distributions for idempotence.
    """
    data = {"base_path": upstream_distribution["base_path"]}
    # Point the distribution at the replicated repository and clear any
    # conflicting publication that may have been set by another actor.
    data["repository"] = get_url(repository)
    data["publication"] = None
    return data

def create_or_update_distribution(self, repository, upstream_distribution):
    """
    Create the local distribution for *upstream_distribution*, or update it in place.

    Dispatches a general_update / general_create task as appropriate; the update
    payload comes from distribution_data() so a conflicting publication is cleared.
    """
    distribution_data = self.distribution_data(repository, upstream_distribution)
    try:
        distro = self.distribution_model_cls.objects.get(
            name=upstream_distribution["name"], pulp_domain=self.domain
        )
        # Check that the distribution has the right repository associated
        needs_update = self.needs_update(distribution_data, distro)
        if needs_update:
            # Update the distribution
            dispatch(
                general_update,
                task_group=self.task_group,
                shared_resources=[repository],
                exclusive_resources=[self.distros_uri],
                args=(distro.pk, self.app_label, self.distribution_serializer_name),
                kwargs={
                    "data": distribution_data,
                    "partial": True,
                },
            )
    except self.distribution_model_cls.DoesNotExist:
        # Dispatch a task to create the distribution
        distribution_data["name"] = upstream_distribution["name"]
        dispatch(
            general_create,
            task_group=self.task_group,
            shared_resources=[repository],
            exclusive_resources=[self.distros_uri],
            args=(self.app_label, self.distribution_serializer_name),
            kwargs={"data": distribution_data},
        )

def sync_params(self, repository, remote):
Expand All @@ -193,35 +210,42 @@ def sync(self, repository, remote):

def remove_missing(self, names):
    """
    Delete all local distributions, repositories and remotes whose names are
    not in *names*, via general_multi_delete tasks.
    """
    # Remove all distributions with names not present in the list of names.
    # Perform this in an extra task, because we hold a big lock here.
    distribution_ids = [
        (distribution.pk, self.app_label, self.distribution_serializer_name)
        for distribution in self.distribution_model_cls.objects.filter(
            pulp_domain=self.domain
        ).exclude(name__in=names)
    ]
    if distribution_ids:
        dispatch(
            general_multi_delete,
            task_group=self.task_group,
            exclusive_resources=[self.distros_uri],
            args=(distribution_ids,),
        )

    # Remove all the repositories and remotes of the missing distributions
    repositories = list(
        self.repository_model_cls.objects.filter(pulp_domain=self.domain).exclude(
            name__in=names
        )
    )
    repository_ids = [
        (repo.pk, self.app_label, self.repository_serializer_name) for repo in repositories
    ]

    remotes = list(
        self.remote_model_cls.objects.filter(pulp_domain=self.domain).exclude(name__in=names)
    )
    remote_ids = [
        (remote.pk, self.app_label, self.remote_serializer_name) for remote in remotes
    ]

    if repository_ids or remote_ids:
        dispatch(
            general_multi_delete,
            task_group=self.task_group,
            exclusive_resources=repositories + remotes,
            args=(repository_ids + remote_ids,),
        )
17 changes: 3 additions & 14 deletions pulpcore/app/tasks/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app.models import UpstreamPulp, TaskGroup
from pulpcore.app.replica import ReplicaContext
from pulpcore.app.util import get_domain

from pulp_glue.common import __version__ as pulp_glue_version

Expand All @@ -24,7 +23,6 @@ def user_agent():


def replicate_distributions(server_pk):
domain = get_domain()
server = UpstreamPulp.objects.get(pk=server_pk)

# Write out temporary files related to SSL
Expand Down Expand Up @@ -80,18 +78,9 @@ def replicate_distributions(server_pk):
# Create remote
remote = replicator.create_or_update_remote(upstream_distribution=distro)
if not remote:
# The upstream distribution is not serving any content, cleanup an existing local
# distribution
try:
local_distro = replicator.distribution_model.objects.get(
name=distro["name"], pulp_domain=domain
)
local_distro.repository = None
local_distro.publication = None
local_distro.save()
continue
except replicator.distribution_model.DoesNotExist:
continue
# The upstream distribution is not serving any content,
# let it fall through the cracks and be cleaned up below.
continue
# Check if there is already a repository
repository = replicator.create_or_update_repository(remote=remote)

Expand Down
76 changes: 38 additions & 38 deletions pulpcore/tests/functional/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,44 @@ def random_artifact(random_artifact_factory):
return random_artifact_factory()


@pytest.fixture()
def domain_factory(pulpcore_bindings, pulp_settings, gen_object_with_cleanup):
    """A factory to create a domain configured like the current default storage backend."""

    def _domain_factory():
        if not pulp_settings.DOMAIN_ENABLED:
            pytest.skip("Domains not enabled")
        # Settings that must be copied over, per supported storage backend.
        backend_keys = {
            "pulpcore.app.models.storage.FileSystem": ["MEDIA_ROOT"],
            "storages.backends.s3boto3.S3Boto3Storage": [
                "AWS_ACCESS_KEY_ID",
                "AWS_SECRET_ACCESS_KEY",
                "AWS_S3_ENDPOINT_URL",
                "AWS_S3_ADDRESSING_STYLE",
                "AWS_S3_SIGNATURE_VERSION",
                "AWS_S3_REGION_NAME",
                "AWS_STORAGE_BUCKET_NAME",
            ],
            "storages.backends.azure_storage.AzureStorage": [
                "AZURE_ACCOUNT_NAME",
                "AZURE_CONTAINER",
                "AZURE_ACCOUNT_KEY",
                "AZURE_URL_EXPIRATION_SECS",
                "AZURE_OVERWRITE_FILES",
                "AZURE_LOCATION",
                "AZURE_CONNECTION_STRING",
            ],
        }
        storage_class = pulp_settings.DEFAULT_FILE_STORAGE
        storage_settings = {
            key: getattr(pulp_settings, key, None) for key in backend_keys[storage_class]
        }
        body = {
            "name": str(uuid.uuid4()),
            "storage_class": storage_class,
            "storage_settings": storage_settings,
        }
        return gen_object_with_cleanup(pulpcore_bindings.DomainsApi, body)

    return _domain_factory


# Random other fixtures


Expand Down Expand Up @@ -1263,41 +1301,3 @@ def _wget_recursive_download_on_host(url, destination):
)

return _wget_recursive_download_on_host


@pytest.fixture()
def domain_factory(domains_api_client, pulp_settings, gen_object_with_cleanup):
    """A factory to create a domain mirroring the current default storage settings."""

    def _domain_factory():
        if not pulp_settings.DOMAIN_ENABLED:
            pytest.skip("Domains not enabled")
        # Per-backend list of settings that have to travel with the domain.
        keys = {
            "pulpcore.app.models.storage.FileSystem": ["MEDIA_ROOT"],
            "storages.backends.s3boto3.S3Boto3Storage": [
                "AWS_ACCESS_KEY_ID",
                "AWS_SECRET_ACCESS_KEY",
                "AWS_S3_ENDPOINT_URL",
                "AWS_S3_ADDRESSING_STYLE",
                "AWS_S3_SIGNATURE_VERSION",
                "AWS_S3_REGION_NAME",
                "AWS_STORAGE_BUCKET_NAME",
            ],
            "storages.backends.azure_storage.AzureStorage": [
                "AZURE_ACCOUNT_NAME",
                "AZURE_CONTAINER",
                "AZURE_ACCOUNT_KEY",
                "AZURE_URL_EXPIRATION_SECS",
                "AZURE_OVERWRITE_FILES",
                "AZURE_LOCATION",
                "AZURE_CONNECTION_STRING",
            ],
        }
        settings = {
            key: getattr(pulp_settings, key, None)
            for key in keys[pulp_settings.DEFAULT_FILE_STORAGE]
        }
        return gen_object_with_cleanup(
            domains_api_client,
            {
                "name": str(uuid.uuid4()),
                "storage_class": pulp_settings.DEFAULT_FILE_STORAGE,
                "storage_settings": settings,
            },
        )

    return _domain_factory
Loading

0 comments on commit a1dd579

Please sign in to comment.