diff --git a/pulpcore/app/models/content.py b/pulpcore/app/models/content.py index 8465b2e175..5205715cab 100644 --- a/pulpcore/app/models/content.py +++ b/pulpcore/app/models/content.py @@ -21,7 +21,7 @@ from django.forms.models import model_to_dict from django.utils.timezone import now from django_guid import get_guid -from django_lifecycle import BEFORE_UPDATE, BEFORE_SAVE, AFTER_SAVE, AFTER_DELETE, hook +from django_lifecycle import BEFORE_UPDATE, BEFORE_SAVE, hook from pulpcore.constants import ALL_KNOWN_CONTENT_CHECKSUMS from pulpcore.app import pulp_hashlib @@ -34,7 +34,6 @@ UnsupportedDigestValidationError, ) - # All available digest fields ordered by algorithm strength. _DIGEST_FIELDS = [] for alg in ("sha512", "sha384", "sha256", "sha224", "sha1", "md5"): @@ -391,14 +390,6 @@ def touch(self): """Update timestamp_of_interest.""" self.save(update_fields=["timestamp_of_interest"]) - @hook(AFTER_SAVE) - @hook(AFTER_DELETE) - def emmit_telemetry(self): - from pulpcore.plugin.tasking import dispatch - from pulpcore.app.tasks import telemetry - - dispatch(telemetry.emmit_disk_space_usage_telemetry, args=(self.pulp_domain.pk,)) - class PulpTemporaryFile(HandleTempFilesMixin, BaseModel): """ diff --git a/pulpcore/app/models/domain.py b/pulpcore/app/models/domain.py index 6d2386fe15..aee7ba81d0 100644 --- a/pulpcore/app/models/domain.py +++ b/pulpcore/app/models/domain.py @@ -1,6 +1,6 @@ from django.core.files.storage import default_storage from django.db import models -from django_lifecycle import hook, BEFORE_DELETE, BEFORE_UPDATE, AFTER_CREATE +from django_lifecycle import hook, BEFORE_DELETE, BEFORE_UPDATE from pulpcore.app.models import BaseModel, AutoAddObjPermsMixin from pulpcore.exceptions import DomainProtectedError diff --git a/pulpcore/app/tasks/telemetry.py b/pulpcore/app/tasks/telemetry.py deleted file mode 100644 index 74bc9f1c3a..0000000000 --- a/pulpcore/app/tasks/telemetry.py +++ /dev/null @@ -1,31 +0,0 @@ -from logging import getLogger - -from opentelemetry import metrics - -from django.db.models import Sum - -_logger = getLogger(__name__) - - -def emmit_disk_space_usage_telemetry(domain_pk=None, *args, **kwargs): - from pulpcore.app.models import Artifact, Domain - - artifacts = Artifact.objects.values("pulp_domain__name").annotate( - total_size=Sum("size", default=0) - ) - - if domain_pk: - domain = Domain.objects.get(pk=domain_pk) - meter = metrics.get_meter(f"domain.{domain.name}.disk_usage.meter") - else: - _logger.info("Ready to calculate the space usage for all domains") - meter = metrics.get_meter("disk_usage.meter") - - gauge = meter.create_gauge( - name="disk_usage", - description="The total disk size per domain.", - unit="bytes", - ) - - for domain in artifacts: - gauge.set(domain["total_size"], {"domain_name": domain["pulp_domain__name"]}) diff --git a/pulpcore/app/util.py b/pulpcore/app/util.py index 0b6b40be82..9da3c11e8c 100644 --- a/pulpcore/app/util.py +++ b/pulpcore/app/util.py @@ -13,17 +13,14 @@ from django.conf import settings from django.db import connection -from django.db.models import Model, Sum +from django.db.models import Model from django.urls import Resolver404, resolve, reverse -from opentelemetry import metrics - from rest_framework.serializers import ValidationError from pulpcore.app.loggers import deprecation_logger from pulpcore.app.apps import pulp_plugin_configs from pulpcore.app import models -from pulpcore.constants import STORAGE_METRICS_LOCK from pulpcore.exceptions import AdvisoryLockError from pulpcore.exceptions.validation import InvalidSignatureError @@ -544,57 +541,6 @@ def build(cls, *args, **kwargs): return cls._NoopEmitter() -class DomainMetricsEmitter(MetricsEmitter): - """A builder class that initializes an emitter for recording domain's metrics.""" - - def __init__(self, domain): - self.domain = domain - self.meter = metrics.get_meter(f"domain.{domain.name}.meter") - self.instrument = self._init_emitting_total_size() - - def _init_emitting_total_size(self): - return self.meter.create_observable_gauge( - name="disk_usage", - description="The total disk size by domain.", - callbacks=[self._disk_usage_callback()], - unit="Bytes", - ) - - def _disk_usage_callback(self): - try: - with PGAdvisoryLock(STORAGE_METRICS_LOCK): - from pulpcore.app.models import Artifact - - options = yield # noqa - - while True: - artifacts = Artifact.objects.filter(pulp_domain=self.domain).only("size") - total_size = artifacts.aggregate(size=Sum("size", default=0))["size"] - options = yield [ # noqa - metrics.Observation( - total_size, - { - "pulp_href": get_url(self.domain), - "domain_name": self.domain.name, - }, - ) - ] - except AdvisoryLockError: - yield - - -def init_domain_metrics_exporter(): - PULP_OTEL_SPACE_USAGE_TELEMETRY = ( - os.getenv("PULP_OTEL_SPACE_USAGE_TELEMETRY", "").lower() == "true" - ) - - if PULP_OTEL_SPACE_USAGE_TELEMETRY: - from pulpcore.app.models.domain import Domain - - for domain in Domain.objects.all(): - DomainMetricsEmitter.build(domain) - - class PGAdvisoryLock: """ A context manager that will hold a postgres advisory lock non-blocking. diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index df7322f44d..608048ad99 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -15,7 +15,7 @@ from django.conf import settings from django.db import connection -from django.db.models import Case, Count, F, Max, Value, When +from django.db.models import Case, Count, F, Max, Value, When, Sum from django.utils import timezone from pulpcore.constants import ( @@ -24,9 +24,10 @@ TASK_SCHEDULING_LOCK, TASK_UNBLOCKING_LOCK, TASK_METRICS_HEARTBEAT_LOCK, + STORAGE_METRICS_LOCK, ) from pulpcore.app.apps import pulp_plugin_configs -from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus +from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus, Artifact from pulpcore.app.util import PGAdvisoryLock, get_domain from pulpcore.exceptions import AdvisoryLockError @@ -74,6 +75,8 @@ def __init__(self): WORKER_CLEANUP_INTERVAL / 10, WORKER_CLEANUP_INTERVAL ) + self.last_space_usage_metric_heartbeat = timezone.now() + meter = get_meter(__name__) self.tasks_unblocked_queue_meter = meter.create_gauge( name="tasks_unblocked_queue", @@ -87,6 +90,12 @@ def __init__(self): unit="seconds", ) + self.disk_usage_meter = meter.create_gauge( + name="disk_usage", + description="The total disk size per domain.", + unit="bytes", + ) + # Add a file descriptor to trigger select on signals self.sentinel, sentinel_w = os.pipe() os.set_blocking(self.sentinel, False) @@ -176,6 +185,7 @@ def beat(self): with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_SCHEDULING_LOCK): dispatch_scheduled_tasks() self.record_unblocked_waiting_tasks_metric() + self.record_domain_space_usage() def notify_workers(self): self.cursor.execute("NOTIFY pulp_worker_wakeup") @@ -456,6 +466,27 @@ def handle_available_tasks(self): keep_looping = True self.supervise_task(task) + def record_domain_space_usage(self): + if os.getenv("PULP_OTEL_ENABLED", "").lower() != "true" and not settings.DOMAIN_ENABLED: + return + + with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(STORAGE_METRICS_LOCK): + now = timezone.now() + if now > self.last_space_usage_metric_heartbeat + self.heartbeat_period: + space_utilization_per_domain = Artifact.objects.values( + "pulp_domain__name" + ).annotate(total_size=Sum("size", default=0)) + + # We're using the same gauge with different attributes for each domain space usage + for domain in space_utilization_per_domain: + self.disk_usage_meter.set( + domain["total_size"], {"domain_name": domain["pulp_domain__name"]} + ) + + self.cursor.execute( + f"NOTIFY pulp_worker_space_utilization_metrics_heartbeat, '{str(now)}'" + ) + def record_unblocked_waiting_tasks_metric(self): if os.getenv("PULP_OTEL_ENABLED", "").lower() != "true": return @@ -504,6 +535,7 @@ def run(self, burst=False): connection.connection.add_notify_handler(self._pg_notify_handler) self.cursor.execute("LISTEN pulp_worker_cancel") self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat") + self.cursor.execute("LISTEN pulp_worker_space_utilization_metrics_heartbeat") if burst: self.handle_available_tasks() else: @@ -516,6 +548,7 @@ def run(self, burst=False): break self.sleep() self.cursor.execute("UNLISTEN pulp_worker_wakeup") + self.cursor.execute("UNLISTEN pulp_worker_space_utilization_metrics_heartbeat") self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat") self.cursor.execute("UNLISTEN pulp_worker_cancel") self.shutdown()