Report the size of served artifacts

closes #4602

lubosmj committed Sep 9, 2024
1 parent 91d1587 commit f26482c
Showing 5 changed files with 124 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGES/4602.feature
@@ -0,0 +1 @@
Added metrics reporting the size of served artifacts.
3 changes: 2 additions & 1 deletion docs/admin/learn/architecture.md
@@ -179,7 +179,8 @@ Collector here](https://opentelemetry.io/docs/collector/).

- Access to every API endpoint (an HTTP method, target URL, status code, and user agent).
- Access to every requested package (an HTTP method, target URL, status code, and user agent).
-- Disk usage within a specific domain (total used disk space and the reference to the domain).
+- Disk usage within a specific domain (total used disk space and the reference to a domain). Currently disabled.
+- The size of served artifacts (total count of served data and the reference to a domain).

The information above is sent to the collector in the form of spans and metrics. Thus, the data is
emitted either based on the user interaction with the system or on a regular basis. Consult
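
For reference, these metrics are produced through the OpenTelemetry Python API. Below is a minimal sketch of the served-artifacts counter introduced by this commit, assuming the opentelemetry-sdk package is installed; the console exporter, export interval, and attribute values are illustrative stand-ins for Pulp's real OTLP collector setup, while the instrument name, unit, and attribute keys are taken from the diff further down.

```python
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader

# Export to stdout every 5 s for demonstration; in Pulp the interval is governed by
# OTEL_METRIC_EXPORT_INTERVAL (60 s by default) and the data goes to an OTLP collector.
reader = PeriodicExportingMetricReader(ConsoleMetricExporter(), export_interval_millis=5000)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

meter = metrics.get_meter("artifacts.size.meter")
counter = meter.create_counter(
    "artifacts.size.counter",
    unit="Bytes",
    description="Counts the size of served artifacts",
)

# One data point per served artifact; the attribute values here are made up.
counter.add(4096, {"domain_name": "default", "content_app_name": "1234@pulp-content-1"})
```
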
15 changes: 0 additions & 15 deletions pulpcore/app/models/domain.py
@@ -1,5 +1,3 @@
-from opentelemetry.metrics import Observation
-
from django.core.files.storage import default_storage
from django.db import models
from django_lifecycle import hook, BEFORE_DELETE, BEFORE_UPDATE
@@ -86,16 +84,3 @@ class Meta:
permissions = [
("manage_roles_domain", "Can manage role assignments on domain"),
]


-def disk_usage_callback(domain):
-from pulpcore.app.models import Artifact
-from pulpcore.app.util import get_url
-
-options = yield # noqa
-while True:
-artifacts = Artifact.objects.filter(pulp_domain=domain).only("size")
-total_size = artifacts.aggregate(size=models.Sum("size", default=0))["size"]
-options = yield [ # noqa
-Observation(total_size, {"pulp_href": get_url(domain), "domain_name": domain.name})
-]
94 changes: 49 additions & 45 deletions pulpcore/app/util.py
@@ -516,53 +516,18 @@ def cache_key(base_path):
return base_path


-class DomainMetricsEmitterBuilder:
-"""A builder class that initializes an emitter for recording domain's metrics.
+class MetricsEmitter:
+"""
+A builder class that initializes an emitter.
If Open Telemetry is enabled, the builder configures a real emitter capable of sending data to
-the collector. Otherwise, a no-op emitter is initialized. The real emitter utilizes the global
-settings to send metrics.
+the collector. Otherwise, a no-op emitter is initialized. The real emitter may utilize the
+global settings to send metrics.
By default, the emitter sends data to the collector every 60 seconds. Adjust the environment
variable OTEL_METRIC_EXPORT_INTERVAL accordingly if needed.
"""

-class _DomainMetricsEmitter:
-def __init__(self, domain):
-self.domain = domain
-self.meter = metrics.get_meter(f"domain.{domain.name}.meter")
-self.instrument = self._init_emitting_total_size()
-
-def _init_emitting_total_size(self):
-return self.meter.create_observable_gauge(
-name="disk_usage",
-description="The total disk size by domain.",
-callbacks=[self._disk_usage_callback()],
-unit="Bytes",
-)
-
-def _disk_usage_callback(self):
-try:
-with PGAdvisoryLock(STORAGE_METRICS_LOCK):
-from pulpcore.app.models import Artifact
-
-options = yield # noqa
-
-while True:
-artifacts = Artifact.objects.filter(pulp_domain=self.domain).only("size")
-total_size = artifacts.aggregate(size=Sum("size", default=0))["size"]
-options = yield [ # noqa
-metrics.Observation(
-total_size,
-{
-"pulp_href": get_url(self.domain),
-"domain_name": self.domain.name,
-},
-)
-]
-except AdvisoryLockError:
-yield

class _NoopEmitter:
def __call__(self, *args, **kwargs):
return self
@@ -571,19 +536,58 @@ def __getattr__(self, *args, **kwargs):
return self

@classmethod
-def build(cls, domain):
-otel_enabled = os.getenv("PULP_OTEL_ENABLED")
-if otel_enabled == "true" and settings.DOMAIN_ENABLED:
-return cls._DomainMetricsEmitter(domain)
+def build(cls, *args, **kwargs):
+otel_enabled = os.getenv("PULP_OTEL_ENABLED", "").lower() == "true"
+if otel_enabled and settings.DOMAIN_ENABLED:
+return cls(*args, **kwargs)
else:
return cls._NoopEmitter()
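
The build() classmethod is what lets callers record metrics unconditionally: it returns either a real emitter or a no-op object that silently absorbs every attribute access and call. A standalone re-sketch of that pattern follows; ExampleEmitter and its add() method are illustrative only, and the settings.DOMAIN_ENABLED check from the real build() is omitted.

```python
import os


class MetricsEmitter:
    """Stripped-down copy of the builder pattern: a real emitter or a silent stand-in."""

    class _NoopEmitter:
        def __call__(self, *args, **kwargs):
            return self

        def __getattr__(self, *args, **kwargs):
            return self

    @classmethod
    def build(cls, *args, **kwargs):
        # The real build() also requires settings.DOMAIN_ENABLED; omitted for brevity.
        if os.getenv("PULP_OTEL_ENABLED", "").lower() == "true":
            return cls(*args, **kwargs)
        return cls._NoopEmitter()


class ExampleEmitter(MetricsEmitter):
    def __init__(self, name):
        self.name = name

    def add(self, amount):
        print(f"{self.name} += {amount}")


emitter = ExampleEmitter.build("served.bytes")
emitter.add(1024)  # prints when PULP_OTEL_ENABLED=true, otherwise a harmless no-op
```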


class DomainMetricsEmitter(MetricsEmitter):
"""A builder class that initializes an emitter for recording domain's metrics."""

def __init__(self, domain):
self.domain = domain
self.meter = metrics.get_meter(f"domain.{domain.name}.meter")
self.instrument = self._init_emitting_total_size()

def _init_emitting_total_size(self):
return self.meter.create_observable_gauge(
name="disk_usage",
description="The total disk size by domain.",
callbacks=[self._disk_usage_callback()],
unit="Bytes",
)

def _disk_usage_callback(self):
try:
with PGAdvisoryLock(STORAGE_METRICS_LOCK):
from pulpcore.app.models import Artifact

options = yield # noqa

while True:
artifacts = Artifact.objects.filter(pulp_domain=self.domain).only("size")
total_size = artifacts.aggregate(size=Sum("size", default=0))["size"]
options = yield [ # noqa
metrics.Observation(
total_size,
{
"pulp_href": get_url(self.domain),
"domain_name": self.domain.name,
},
)
]
except AdvisoryLockError:
yield
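
The _disk_usage_callback above uses OpenTelemetry's generator-style callbacks: the first yield hands control to the SDK, and each subsequent yield returns the Observations for one collection cycle. Keeping the generator alive is also what lets the real code hold PGAdvisoryLock across cycles, so only one process reports the gauge. A minimal sketch, with a made-up number in place of the Artifact aggregation and assuming a meter provider is configured as in the earlier sketch:

```python
from opentelemetry import metrics

meter = metrics.get_meter("example.domain.meter")


def disk_usage_callback():
    # The SDK sends CallbackOptions into the generator once per collection cycle.
    options = yield  # noqa: F841
    total_size = 0
    while True:
        total_size += 4096  # stand-in for the Artifact size aggregation
        options = yield [  # noqa: F841
            metrics.Observation(total_size, {"domain_name": "example"})
        ]


meter.create_observable_gauge(
    name="disk_usage",
    description="The total disk size by domain.",
    callbacks=[disk_usage_callback()],
    unit="Bytes",
)
```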


def init_domain_metrics_exporter():
from pulpcore.app.models.domain import Domain

for domain in Domain.objects.all():
-DomainMetricsEmitterBuilder.build(domain)
+DomainMetricsEmitter.build(domain)


class PGAdvisoryLock:
94 changes: 72 additions & 22 deletions pulpcore/content/handler.py
@@ -3,7 +3,9 @@
from multidict import CIMultiDict
import os
import re
import socket
from gettext import gettext as _
from functools import lru_cache

from aiohttp.client_exceptions import ClientResponseError
from aiohttp.web import FileResponse, StreamResponse, HTTPOk
@@ -21,6 +23,8 @@

import django

from opentelemetry import metrics

from pulpcore.constants import STORAGE_RESPONSE_MAP
from pulpcore.responses import ArtifactResponse

@@ -49,7 +53,11 @@
RemoteArtifact,
)
from pulpcore.app import mime_types # noqa: E402: module level not at top of file
-from pulpcore.app.util import get_domain, cache_key # noqa: E402: module level not at top of file
+from pulpcore.app.util import ( # noqa: E402: module level not at top of file
+MetricsEmitter,
+get_domain,
+cache_key,
+)

from pulpcore.exceptions import UnsupportedDigestValidationError # noqa: E402

@@ -59,6 +67,11 @@
log = logging.getLogger(__name__)


@lru_cache(maxsize=1)
def _get_content_app_name():
return f"{os.getpid()}@{socket.gethostname()}"


class PathNotResolved(HTTPNotFound):
"""
The path could not be resolved to a published file.
@@ -154,6 +167,20 @@ class Handler:

distribution_model = None

class ArtifactsSizeCounter(MetricsEmitter):
def __init__(self):
self.meter = metrics.get_meter("artifacts.size.meter")
self.counter = self.meter.create_counter(
"artifacts.size.counter",
unit="Bytes",
description="Counts the size of served artifacts",
)

def add(self, amount, attributes):
self.counter.add(amount, attributes)

artifacts_size_counter = ArtifactsSizeCounter.build()

@staticmethod
def _reset_db_connection():
"""
@@ -960,13 +987,37 @@ def _set_params_from_headers(hdrs, storage_domain):
params[STORAGE_RESPONSE_MAP[storage_domain][a_key]] = hdrs[a_key]
return params

def _build_url(**kwargs):
filename = os.path.basename(content_artifact.relative_path)
content_disposition = f"attachment;filename={filename}"

headers["Content-Disposition"] = content_disposition
parameters = _set_params_from_headers(headers, domain.storage_class)
storage_url = storage.url(artifact_name, parameters=parameters, **kwargs)

return URL(storage_url, encoded=True)

artifact_file = content_artifact.artifact.file
artifact_name = artifact_file.name
-filename = os.path.basename(content_artifact.relative_path)
-content_disposition = f"attachment;filename={filename}"
domain = get_domain()
storage = domain.get_storage()

content_length = artifact_file.size

try:
range_start, range_stop = request.http_range.start, request.http_range.stop
if range_start or range_stop:
if range_stop and artifact_file.size and range_stop > artifact_file.size:
start = 0 if range_start is None else range_start
content_length = artifact_file.size - start
elif range_stop:
content_length = range_stop - range_start
except ValueError:
size = artifact_file.size or "*"
raise HTTPRequestRangeNotSatisfiable(headers={"Content-Range": f"bytes */{size}"})

self._report_served_artifact_size(content_length)

if domain.storage_class == "pulpcore.app.models.storage.FileSystem":
path = storage.path(artifact_name)
if not os.path.exists(path):
@@ -975,25 +1026,12 @@ def _set_params_from_headers(hdrs, storage_domain):
elif not domain.redirect_to_object_storage:
return ArtifactResponse(content_artifact.artifact, headers=headers)
elif domain.storage_class == "storages.backends.s3boto3.S3Boto3Storage":
-headers["Content-Disposition"] = content_disposition
-parameters = _set_params_from_headers(headers, domain.storage_class)
-url = URL(
-artifact_file.storage.url(
-artifact_name, parameters=parameters, http_method=request.method
-),
-encoded=True,
-)
-raise HTTPFound(url)
-elif domain.storage_class == "storages.backends.azure_storage.AzureStorage":
-headers["Content-Disposition"] = content_disposition
-parameters = _set_params_from_headers(headers, domain.storage_class)
-url = URL(artifact_file.storage.url(artifact_name, parameters=parameters), encoded=True)
-raise HTTPFound(url)
-elif domain.storage_class == "storages.backends.gcloud.GoogleCloudStorage":
-headers["Content-Disposition"] = content_disposition
-parameters = _set_params_from_headers(headers, domain.storage_class)
-url = URL(artifact_file.storage.url(artifact_name, parameters=parameters), encoded=True)
-raise HTTPFound(url)
+raise HTTPFound(_build_url(http_method=request.method))
+elif domain.storage_class in (
+"storages.backends.azure_storage.AzureStorage",
+"storages.backends.gcloud.GoogleCloudStorage",
+):
+raise HTTPFound(_build_url())
else:
raise NotImplementedError()
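
Before redirecting the client to object storage, the handler derives the size to report from the request's Range header. A small worked example of that arithmetic, assuming aiohttp's convention that http_range.stop is exclusive; served_length() below merely mirrors the branch logic shown above and is not pulpcore code.

```python
def served_length(file_size, range_start, range_stop):
    """Mirror of the Content-Length computation above (range_stop is exclusive)."""
    content_length = file_size
    if range_start or range_stop:
        if range_stop and file_size and range_stop > file_size:
            start = 0 if range_start is None else range_start
            content_length = file_size - start
        elif range_stop:
            content_length = range_stop - range_start
    return content_length


# For a 1000-byte artifact:
assert served_length(1000, None, None) == 1000  # no Range header
assert served_length(1000, 100, 500) == 400     # "Range: bytes=100-499"
assert served_length(1000, 100, 2000) == 900    # range past EOF, clamped to what exists
assert served_length(1000, 100, None) == 1000   # open-ended "bytes=100-" keeps the full size
```

The _build_url() helper introduced in the same hunk consolidates the former S3, Azure, and Google Cloud branches, so the Content-Disposition header and signed-URL construction now live in one place.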

@@ -1111,6 +1149,11 @@ async def finalize():
downloader.finalize = finalize
download_result = await downloader.run()

if content_length := response.headers.get("Content-Length"):
self._report_served_artifact_size(int(content_length))
else:
self._report_served_artifact_size(size)

if save_artifact and remote.policy != Remote.STREAMED:
await asyncio.shield(
sync_to_async(self._save_artifact)(download_result, remote_artifact, request)
@@ -1120,3 +1163,10 @@ async def finalize():
if response.status == 404:
raise HTTPNotFound()
return response

def _report_served_artifact_size(self, size):
attributes = {
"domain_name": get_domain().name,
"content_app_name": _get_content_app_name(),
}
self.artifacts_size_counter.add(size, attributes)
