Skip to content

Commit

Permalink
Work on a different implementation idea.
Browse files Browse the repository at this point in the history
Closes pulp#5762
  • Loading branch information
decko committed Sep 16, 2024
1 parent 6a3b3d4 commit 43cbbd7
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 127 deletions.
11 changes: 1 addition & 10 deletions pulpcore/app/models/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from django.forms.models import model_to_dict
from django.utils.timezone import now
from django_guid import get_guid
from django_lifecycle import BEFORE_UPDATE, BEFORE_SAVE, AFTER_SAVE, AFTER_DELETE, hook
from django_lifecycle import BEFORE_UPDATE, BEFORE_SAVE, hook

from pulpcore.constants import ALL_KNOWN_CONTENT_CHECKSUMS
from pulpcore.app import pulp_hashlib
Expand All @@ -34,7 +34,6 @@
UnsupportedDigestValidationError,
)


# All available digest fields ordered by algorithm strength.
_DIGEST_FIELDS = []
for alg in ("sha512", "sha384", "sha256", "sha224", "sha1", "md5"):
Expand Down Expand Up @@ -391,14 +390,6 @@ def touch(self):
"""Update timestamp_of_interest."""
self.save(update_fields=["timestamp_of_interest"])

@hook(AFTER_SAVE)
@hook(AFTER_DELETE)
def emmit_telemetry(self):
from pulpcore.plugin.tasking import dispatch
from pulpcore.app.tasks import telemetry

dispatch(telemetry.emmit_disk_space_usage_telemetry, args=(self.pulp_domain.pk,))


class PulpTemporaryFile(HandleTempFilesMixin, BaseModel):
"""
Expand Down
2 changes: 1 addition & 1 deletion pulpcore/app/models/domain.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from django.core.files.storage import default_storage
from django.db import models
from django_lifecycle import hook, BEFORE_DELETE, BEFORE_UPDATE, AFTER_CREATE
from django_lifecycle import hook, BEFORE_DELETE, BEFORE_UPDATE

from pulpcore.app.models import BaseModel, AutoAddObjPermsMixin
from pulpcore.exceptions import DomainProtectedError
Expand Down
31 changes: 0 additions & 31 deletions pulpcore/app/tasks/telemetry.py

This file was deleted.

84 changes: 1 addition & 83 deletions pulpcore/app/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@

from django.conf import settings
from django.db import connection
from django.db.models import Model, Sum
from django.db.models import Model
from django.urls import Resolver404, resolve, reverse

from opentelemetry import metrics

from rest_framework.serializers import ValidationError

from pulpcore.app.loggers import deprecation_logger
from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app import models
from pulpcore.constants import STORAGE_METRICS_LOCK
from pulpcore.exceptions import AdvisoryLockError
from pulpcore.exceptions.validation import InvalidSignatureError

Expand Down Expand Up @@ -516,85 +513,6 @@ def cache_key(base_path):
return base_path


class MetricsEmitter:
"""
A builder class that initializes an emitter.
If Open Telemetry is enabled, the builder configures a real emitter capable of sending data to
the collector. Otherwise, a no-op emitter is initialized. The real emitter may utilize the
global settings to send metrics.
By default, the emitter sends data to the collector every 60 seconds. Adjust the environment
variable OTEL_METRIC_EXPORT_INTERVAL accordingly if needed.
"""

class _NoopEmitter:
def __call__(self, *args, **kwargs):
return self

def __getattr__(self, *args, **kwargs):
return self

@classmethod
def build(cls, *args, **kwargs):
otel_enabled = os.getenv("PULP_OTEL_ENABLED", "").lower() == "true"
if otel_enabled and settings.DOMAIN_ENABLED:
return cls(*args, **kwargs)
else:
return cls._NoopEmitter()


class DomainMetricsEmitter(MetricsEmitter):
"""A builder class that initializes an emitter for recording domain's metrics."""

def __init__(self, domain):
self.domain = domain
self.meter = metrics.get_meter(f"domain.{domain.name}.meter")
self.instrument = self._init_emitting_total_size()

def _init_emitting_total_size(self):
return self.meter.create_observable_gauge(
name="disk_usage",
description="The total disk size by domain.",
callbacks=[self._disk_usage_callback()],
unit="Bytes",
)

def _disk_usage_callback(self):
try:
with PGAdvisoryLock(STORAGE_METRICS_LOCK):
from pulpcore.app.models import Artifact

options = yield # noqa

while True:
artifacts = Artifact.objects.filter(pulp_domain=self.domain).only("size")
total_size = artifacts.aggregate(size=Sum("size", default=0))["size"]
options = yield [ # noqa
metrics.Observation(
total_size,
{
"pulp_href": get_url(self.domain),
"domain_name": self.domain.name,
},
)
]
except AdvisoryLockError:
yield


def init_domain_metrics_exporter():
PULP_OTEL_SPACE_USAGE_TELEMETRY = (
os.getenv("PULP_OTEL_SPACE_USAGE_TELEMETRY", "").lower() == "true"
)

if PULP_OTEL_SPACE_USAGE_TELEMETRY:
from pulpcore.app.models.domain import Domain

for domain in Domain.objects.all():
DomainMetricsEmitter.build(domain)


class PGAdvisoryLock:
"""
A context manager that will hold a postgres advisory lock non-blocking.
Expand Down
37 changes: 35 additions & 2 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from django.conf import settings
from django.db import connection
from django.db.models import Case, Count, F, Max, Value, When
from django.db.models import Case, Count, F, Max, Value, When, Sum
from django.utils import timezone

from pulpcore.constants import (
Expand All @@ -24,9 +24,10 @@
TASK_SCHEDULING_LOCK,
TASK_UNBLOCKING_LOCK,
TASK_METRICS_HEARTBEAT_LOCK,
STORAGE_METRICS_LOCK,
)
from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus
from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus, Artifact
from pulpcore.app.util import PGAdvisoryLock, get_domain
from pulpcore.exceptions import AdvisoryLockError

Expand Down Expand Up @@ -74,6 +75,8 @@ def __init__(self):
WORKER_CLEANUP_INTERVAL / 10, WORKER_CLEANUP_INTERVAL
)

self.last_space_usage_metric_heartbeat = timezone.now()

meter = get_meter(__name__)
self.tasks_unblocked_queue_meter = meter.create_gauge(
name="tasks_unblocked_queue",
Expand All @@ -87,6 +90,12 @@ def __init__(self):
unit="seconds",
)

self.disk_usage_meter = meter.create_gauge(
name="disk_usage",
description="The total disk size per domain.",
unit="bytes",
)

# Add a file descriptor to trigger select on signals
self.sentinel, sentinel_w = os.pipe()
os.set_blocking(self.sentinel, False)
Expand Down Expand Up @@ -176,6 +185,7 @@ def beat(self):
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_SCHEDULING_LOCK):
dispatch_scheduled_tasks()
self.record_unblocked_waiting_tasks_metric()
self.record_domain_space_usage()

def notify_workers(self):
self.cursor.execute("NOTIFY pulp_worker_wakeup")
Expand Down Expand Up @@ -456,6 +466,27 @@ def handle_available_tasks(self):
keep_looping = True
self.supervise_task(task)

def record_domain_space_usage(self):
if os.getenv("PULP_OTEL_ENABLED", "").lower() != "true" and not settings.DOMAIN_ENABLED:
return

with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(STORAGE_METRICS_LOCK):
now = timezone.now()
if now > self.last_space_usage_metric_heartbeat + self.heartbeat_period:
space_utilization_per_domain = Artifact.objects.values(
"pulp_domain__name"
).annotate(total_size=Sum("size", default=0))

# We're using the same gauge with different attributes/label for each domain space usage
for domain in space_utilization_per_domain:
self.disk_usage_meter.set(
domain["total_size"], {"domain_name": domain["pulp_domain__name"]}
)

self.cursor.execute(
f"NOTIFY pulp_worker_space_utilization_metrics_heartbeat, '{str(now)}'"
)

def record_unblocked_waiting_tasks_metric(self):
if os.getenv("PULP_OTEL_ENABLED", "").lower() != "true":
return
Expand Down Expand Up @@ -504,6 +535,7 @@ def run(self, burst=False):
connection.connection.add_notify_handler(self._pg_notify_handler)
self.cursor.execute("LISTEN pulp_worker_cancel")
self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat")
self.cursor.execute("LISTEN pulp_worker_space_utilization_metrics_heartbeat")
if burst:
self.handle_available_tasks()
else:
Expand All @@ -516,6 +548,7 @@ def run(self, burst=False):
break
self.sleep()
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
self.cursor.execute("UNLISTEN pulp_worker_space_utilization_metrics_heartbeat")
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
self.cursor.execute("UNLISTEN pulp_worker_cancel")
self.shutdown()

0 comments on commit 43cbbd7

Please sign in to comment.