Skip to content

Commit

Permalink
Add a lock to prevent multiple workers from emitting metrics at the same time.
Browse files Browse the repository at this point in the history
Closes #5442
  • Loading branch information
decko authored and dkliban committed Jun 6, 2024
1 parent c7079c8 commit 02bb455
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGES/5442.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a lock to prevent multiple workers from sending metrics at the same time.
72 changes: 55 additions & 17 deletions pulpcore/app/util.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
import hashlib
import zlib
from functools import lru_cache
from gettext import gettext as _
import os
import tempfile
import gnupg

from functools import lru_cache
from gettext import gettext as _
from urllib.parse import urlparse

from contextlib import ExitStack
from contextvars import ContextVar
from datetime import timedelta
import gnupg

from django.conf import settings
from django.db import connection
from django.db.models import Model, Sum
from django.urls import Resolver404, resolve, reverse

from opentelemetry import metrics

from rest_framework.serializers import ValidationError

from pulpcore.app.loggers import deprecation_logger
from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app import models
from pulpcore.constants import STORAGE_METRICS_LOCK
from pulpcore.exceptions import AdvisoryLockError
from pulpcore.exceptions.validation import InvalidSignatureError


Expand Down Expand Up @@ -539,19 +542,26 @@ def _init_emitting_total_size(self):
)

def _disk_usage_callback(self):
    """Observable-gauge generator reporting total Artifact disk usage for this domain.

    Follows the OpenTelemetry observable-callback protocol: the first bare
    ``yield`` receives the initial CallbackOptions, and every later ``yield``
    must produce an iterable of ``metrics.Observation`` objects.

    A non-blocking postgres advisory lock (STORAGE_METRICS_LOCK) ensures only
    one worker emits this metric at a time; the lock is intentionally held for
    the whole lifetime of the generator.
    """
    try:
        with PGAdvisoryLock(STORAGE_METRICS_LOCK):
            # Imported lazily to avoid app-registry import-order issues at module load.
            from pulpcore.app.models import Artifact

            options = yield  # noqa
            while True:
                artifacts = Artifact.objects.filter(pulp_domain=self.domain).distinct()
                total_size = artifacts.aggregate(size=Sum("size", default=0))["size"]
                options = yield [  # noqa
                    metrics.Observation(
                        total_size,
                        {
                            "pulp_href": get_url(self.domain),
                            "domain_name": self.domain.name,
                        },
                    )
                ]
    except AdvisoryLockError:
        # Another worker already emits this metric. A single bare `yield` would
        # exhaust the generator after the first collection cycle and make every
        # later send() raise StopIteration, so instead keep the generator alive
        # forever, reporting an empty observation list each cycle.
        while True:
            yield []

class _NoopEmitter:
def __call__(self, *args, **kwargs):
Expand All @@ -574,3 +584,31 @@ def init_domain_metrics_exporter():

for domain in Domain.objects.all():
DomainMetricsEmitterBuilder.build(domain)


class PGAdvisoryLock:
    """
    A context manager that will hold a postgres advisory lock non-blocking.
    The locks can be chosen from a lock group to avoid collisions. They will never collide with the
    locks used for tasks.
    """

    def __init__(self, lock, lock_group=0):
        # The (lock_group, lock) pair forms the two-key variant of the
        # postgres advisory lock, keeping these locks apart from task locks.
        self.lock_group = lock_group
        self.lock = lock

    def __enter__(self):
        # Try to take the session-level lock without blocking.
        with connection.cursor() as cursor:
            cursor.execute("SELECT pg_try_advisory_lock(%s, %s)", [self.lock_group, self.lock])
            (acquired,) = cursor.fetchone()
        if not acquired:
            raise AdvisoryLockError("Could not acquire lock.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Release the lock; postgres reports False if this session never held it.
        with connection.cursor() as cursor:
            cursor.execute("SELECT pg_advisory_unlock(%s, %s)", [self.lock_group, self.lock])
            (released,) = cursor.fetchone()
        if not released:
            raise RuntimeError("Lock not held.")
1 change: 1 addition & 0 deletions pulpcore/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
TASK_SCHEDULING_LOCK = 42
TASK_UNBLOCKING_LOCK = 84
TASK_METRICS_HEARTBEAT_LOCK = 74
STORAGE_METRICS_LOCK = 72


#: All valid task states.
Expand Down
29 changes: 0 additions & 29 deletions pulpcore/tasking/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,40 +24,11 @@
configure_cleanup,
)
from pulpcore.constants import TASK_FINAL_STATES, TASK_STATES, VAR_TMP_PULP
from pulpcore.exceptions import AdvisoryLockError
from pulpcore.tasking.tasks import dispatch, execute_task

_logger = logging.getLogger(__name__)


class PGAdvisoryLock:
    """
    A context manager that will hold a postgres advisory lock non-blocking.
    The locks can be chosen from a lock group to avoid collisions. They will never collide with the
    locks used for tasks.
    """

    def __init__(self, lock, lock_group=0):
        # Two-key advisory lock: (lock_group, lock) keeps these locks in a
        # separate namespace from the single-key locks used elsewhere.
        self.lock_group = lock_group
        self.lock = lock

    def __enter__(self):
        # pg_try_advisory_lock returns immediately (non-blocking); True means acquired.
        with connection.cursor() as cursor:
            cursor.execute("SELECT pg_try_advisory_lock(%s, %s)", [self.lock_group, self.lock])
            acquired = cursor.fetchone()[0]
        if not acquired:
            raise AdvisoryLockError("Could not acquire lock.")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # pg_advisory_unlock returns False when this session did not hold the lock.
        with connection.cursor() as cursor:
            cursor.execute("SELECT pg_advisory_unlock(%s, %s)", [self.lock_group, self.lock])
            released = cursor.fetchone()[0]
        if not released:
            raise RuntimeError("Lock not held.")


def startup_hook():
configure_analytics()
configure_cleanup()
Expand Down
4 changes: 2 additions & 2 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
TASK_UNBLOCKING_LOCK,
TASK_METRICS_HEARTBEAT_LOCK,
)
from pulpcore.exceptions import AdvisoryLockError
from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus
from pulpcore.app.util import PGAdvisoryLock
from pulpcore.exceptions import AdvisoryLockError

from pulpcore.tasking.storage import WorkerDirectory
from pulpcore.tasking._util import (
delete_incomplete_resources,
dispatch_scheduled_tasks,
perform_task,
startup_hook,
PGAdvisoryLock,
)

_logger = logging.getLogger(__name__)
Expand Down

0 comments on commit 02bb455

Please sign in to comment.