Add script for pruning stale source archives #967

Merged

Changes from all commits
2 changes: 1 addition & 1 deletion cachito/web/api_v1.py
@@ -978,7 +978,7 @@ def get_request_metrics():
"id": state.request_id,
"final_state": RequestStateMapping(state.state).name,
"final_state_reason": state.state_reason,
"finished": state.updated.isoformat(),
"finished": state.updated.isoformat(timespec="microseconds"),
"duration": state.duration,
"time_in_queue": state.time_in_queue,
}
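Why the timespec matters here: datetime.isoformat() omits the fractional-seconds field entirely whenever microsecond == 0, so any consumer parsing with a fixed "%f" directive would fail on those timestamps. An illustrative snippet (not part of the diff):

    from datetime import datetime

    dt = datetime(2024, 1, 1, 12, 0, 0)       # microsecond == 0
    dt.isoformat()                            # '2024-01-01T12:00:00' -- no fractional part
    dt.isoformat(timespec="microseconds")     # '2024-01-01T12:00:00.000000' -- fixed width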
10 changes: 7 additions & 3 deletions cachito/web/models.py
@@ -406,12 +406,16 @@ def to_json(self, verbose=True):
if self.user:
user = self.user.username

+ created = None
+ if self.created is not None:
+     created = self.created.isoformat(timespec="microseconds")
+
env_vars_json = OrderedDict()
for env_var in self.environment_variables:
env_vars_json[env_var.name] = env_var.value
rv = {
"id": self.id,
"created": None if self.created is None else self.created.isoformat(),
"created": created,
"repo": self.repo,
"ref": self.ref,
"pkg_managers": pkg_managers,
@@ -428,7 +432,7 @@ def _state_to_json(state):
return {
"state": RequestStateMapping(state.state).name,
"state_reason": state.state_reason,
"updated": state.updated.isoformat(),
"updated": state.updated.isoformat(timespec="microseconds"),
}

def _error_to_json(error):
@@ -776,7 +780,7 @@ def to_json(self):
"origin": self.origin,
"error_type": self.error_type,
"message": self.message,
"occurred": self.occurred.isoformat(),
"occurred": self.occurred.isoformat(timespec="microseconds"),
}

@classmethod
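These to_json() timestamps are exactly what the new prune script parses back, so the fixed-width format keeps the round trip safe. A minimal sketch with a hypothetical timestamp:

    from datetime import datetime, timezone

    # Hypothetical DB timestamp (naive, but UTC by convention -- see the review
    # discussion in prune_archives.py below)
    created = datetime(2022, 3, 14, 9, 26, 53)
    serialized = created.isoformat(timespec="microseconds")  # '2022-03-14T09:26:53.000000'

    # prune_archives parses it back with a fixed format and re-attaches UTC
    parsed = datetime.strptime(serialized, "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=timezone.utc)
    assert parsed.replace(tzinfo=None) == created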
2 changes: 2 additions & 0 deletions cachito/workers/config.py
@@ -25,6 +25,8 @@ class Config(object):
broker_transport_options = {"max_retries": 10}
# Refer to README.md for information on all the Cachito configuration options
cachito_api_timeout = 60
+ cachito_archives_default_age_days = 730
+ cachito_archives_minimum_age_days = 365
cachito_auth_type: Optional[str] = None
cachito_default_environment_variables = {
"gomod": {"GOSUMDB": {"value": "off", "kind": "literal"}},
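The two settings act as a default cutoff and a safety floor for pruning; the new script converts them to datetimes once at import time. The arithmetic, roughly:

    from datetime import datetime, timedelta, timezone

    now = datetime.now(timezone.utc)
    default_cutoff = now - timedelta(days=730)  # used when --older-than is omitted
    minimum_cutoff = now - timedelta(days=365)  # --older-than may not be more recent than this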
233 changes: 233 additions & 0 deletions cachito/workers/prune_archives.py
@@ -0,0 +1,233 @@
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from itertools import islice
from pathlib import Path
from typing import Annotated, Any, Generator, NamedTuple, Optional

import requests
import typer
from ratelimit import limits, sleep_and_retry

from cachito.errors import NetworkError
from cachito.workers.config import get_worker_config
from cachito.workers.requests import get_requests_session

app = typer.Typer()
config = get_worker_config()
log = logging.getLogger(__name__)
session = get_requests_session()

ARCHIVE_DIR = Path(config.cachito_sources_dir)
ARCHIVE_PATTERN = re.compile(r"^[a-f0-9]{40}(-with-submodules)?\.tar\.gz$")
DEFAULT_AGE_DATETIME = datetime.now(timezone.utc) - timedelta(
days=config.cachito_archives_default_age_days
)
MINIMUM_AGE_DATETIME = datetime.now(timezone.utc) - timedelta(
days=config.cachito_archives_minimum_age_days
)
LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"


@dataclass(frozen=True)
class _ParsedArchive:
"""A source archive parsed from the filesystem."""

path: Path
repo_name: str
ref: str

@classmethod
def from_path(cls, path: Path) -> "_ParsedArchive":
repo_name = path.parent.relative_to(ARCHIVE_DIR).as_posix()
ref = path.name[:40]
return cls(path, repo_name, ref)


class _ResolvedArchive(NamedTuple):
"""A source archive matched to the most recent request for it."""

path: Path
created: datetime
latest_request_id: int


@app.callback()
def configure_logging(verbose: bool = False):
"""Configure logging for the app."""
log_level = logging.DEBUG if verbose else logging.INFO
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(LOG_FORMAT))
log.setLevel(log_level)
log.addHandler(handler)


def _get_latest_request(archive: _ParsedArchive) -> Optional[dict[str, Any]]:
"""
Find the latest request matching the _ParsedArchive via the API.

Return None if no matching request is found.
"""
url = f"{config.cachito_api_url.rstrip('/')}/requests/latest"
params = {
"repo_name": archive.repo_name,
"ref": archive.ref,
}

try:
response = session.get(url, params=params, timeout=config.cachito_api_timeout)
response.raise_for_status()
except requests.HTTPError:
if response.status_code == 404:
return None
log.error(
"The request to %s failed with the status code %d and the following text: %s",
url,
response.status_code,
response.text,
)
raise NetworkError("Failed to query the cachito API")
except requests.RequestException:
msg = f"The connection failed when querying {url}"
log.exception(msg)
raise NetworkError(msg)

return response.json()


def _get_parsed_source_archives(archive_dir: Path) -> Generator[_ParsedArchive, None, None]:
"""Return a _ParsedArchive for each source archive in ARCHIVE_DIR."""

def is_valid_archive_filename(filename: str) -> bool:
"""Archive filename should match <sha1 hash>-<(optional)with-submodules>.tar.gz."""
return re.match(ARCHIVE_PATTERN, filename) is not None

for path in archive_dir.rglob("*.tar.gz"):
if path.is_file() and is_valid_archive_filename(path.name):
yield _ParsedArchive.from_path(path)
else:
log.debug("%s does not appear to be a source archive.", path)


def _resolve_source_archive(parsed_archive: _ParsedArchive) -> Optional[_ResolvedArchive]:
"""Return a _ResolvedArchive if a matching request is found via the API."""
latest_request = _get_latest_request(parsed_archive)
if latest_request is None:
log.debug("Archive %s could not be resolved via the API.", parsed_archive.path)
return None

return _ResolvedArchive(
parsed_archive.path,
        datetime.strptime(latest_request["created"], "%Y-%m-%dT%H:%M:%S.%f").replace(
            tzinfo=timezone.utc
        ),
        latest_request["id"],
    )

Member: Are the request timestamps already UTC adjusted, or do we need the same treatment here as for user CLI input? I'm just curious, since I don't see a 'Z' in the unit-test timestamps and I'm too lazy to go check the deployment for real data :). It's still a nitpick, though; even if they weren't, I doubt the time shift would make a significant difference in terms of archive cleanup anyway.

Contributor (author): Yeah, the request timestamps returned from the DB should all be UTC already. They're the values formatted by datetime.isoformat() in the first commit here.

Member: @taylormadore, can you specifically request review from me so that my approval counts?


def _get_stale_archives(
older_than: datetime, api_calls_per_second: int
) -> Generator[_ResolvedArchive, None, None]:
"""
    Yield _ResolvedArchives for source archives that are stale.

    The API requests are rate-limited so that this background maintenance task
    does not overwhelm the API.
"""

@sleep_and_retry
@limits(calls=api_calls_per_second, period=1)
def resolve_source_archive_ratelimited(archive: _ParsedArchive) -> Optional[_ResolvedArchive]:
return _resolve_source_archive(archive)

for parsed_archive in _get_parsed_source_archives(ARCHIVE_DIR):
resolved_archive = resolve_source_archive_ratelimited(parsed_archive)
if resolved_archive and resolved_archive.created < older_than:
yield resolved_archive


def _process_stale_archives(
older_than: datetime,
api_calls_per_second: int,
delete: bool = False,
limit: Optional[int] = None,
) -> None:
"""List stale source archives up to the limit, optionally deleting them."""
for archive in islice(_get_stale_archives(older_than, api_calls_per_second), limit):
log.info(
f"Archive {archive.path} is stale. The most recent request_id="
f"{archive.latest_request_id} at {archive.created}"
)
if delete:
log.info(f"Deleting {archive.path}")
archive.path.unlink()


def _validate_older_than(older_than: Optional[datetime]) -> datetime:
"""Ensure that the value of the --older-than CLI option is not more recent than the minimum."""
older_than_utc = (
DEFAULT_AGE_DATETIME if older_than is None else older_than.astimezone(timezone.utc)
)
if older_than_utc > MINIMUM_AGE_DATETIME:
raise typer.BadParameter(f"cannot be more recent than {MINIMUM_AGE_DATETIME}")
return older_than_utc


@app.command("delete")
def delete_archives(
older_than: Annotated[
Optional[datetime],
typer.Option(
callback=_validate_older_than,
formats=["%Y-%m-%d"],
help="Deletes archives that are older than the specified date. YYYY-MM-DD",
),
] = None,
api_calls_per_second: Annotated[
int, typer.Option(min=1, max=5, help="The API requests-per-second limit.")
] = 2,
limit: Annotated[
Optional[int], typer.Option(min=1, help="The maximum number of stale archives to process.")
] = None,
execute: Annotated[bool, typer.Option(help="Actual deletion will only occur if True.")] = False,
):
"""
List and delete stale source archives.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make more sense for delete() to call list(), rather than duplicating the code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reworked this a bit in the final. Let me know if the changes work for you 👍


Actual deletion will not occur unless the --execute option is included.
"""
# Needed to keep mypy happy. See the _validate_older_than callback
if older_than is None:
raise typer.BadParameter("--older-than cannot be None")

_process_stale_archives(older_than, api_calls_per_second, delete=execute, limit=limit)


@app.command("list")
def list_archives(
older_than: Annotated[
Optional[datetime],
typer.Option(
callback=_validate_older_than,
formats=["%Y-%m-%d"],
help="Lists archives that are older than the specified date. YYYY-MM-DD",
),
] = None,
api_calls_per_second: Annotated[
int, typer.Option(min=1, max=5, help="The API requests-per-second limit.")
] = 2,
limit: Annotated[
Optional[int], typer.Option(min=1, help="The maximum number of stale archives to process.")
] = None,
):
"""List stale source archives."""
# Needed to keep mypy happy. See the _validate_older_than callback
if older_than is None:
raise typer.BadParameter("--older-than cannot be None")

_process_stale_archives(older_than, api_calls_per_second, delete=False, limit=limit)


if __name__ == "__main__":
app()
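For reference, the two ratelimit decorators used in _get_stale_archives throttle a callable to at most `calls` invocations per `period` seconds, sleeping through the remainder of the window instead of raising. A standalone sketch with illustrative numbers:

    import time

    from ratelimit import limits, sleep_and_retry

    @sleep_and_retry              # sleep until the window resets instead of raising
    @limits(calls=2, period=1)    # allow at most 2 calls per second
    def ping(i: int) -> None:
        print(i, round(time.monotonic(), 2))

    for i in range(6):
        ping(i)  # output arrives in bursts of two, roughly one second apart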
2 changes: 2 additions & 0 deletions requirements.in
@@ -14,6 +14,7 @@ packaging
pyarn
pydantic<2
pyyaml
+ ratelimit
requests_kerberos>=0.13.0
requests
semver
@@ -24,3 +25,4 @@ opentelemetry-instrumentation-requests
opentelemetry-instrumentation-sqlalchemy
opentelemetry-exporter-jaeger
opentelemetry-exporter-otlp-proto-http
+ typer
9 changes: 9 additions & 0 deletions requirements.txt
@@ -286,6 +286,7 @@ click==8.1.7 \
# click-plugins
# click-repl
# commoncode
+ # typer
click-didyoumean==0.3.0 \
--hash=sha256:a0713dc7a1de3f06bc0df5a9567ad19ead2d3d5689b434768a6145bff77c0667 \
--hash=sha256:f184f0d851d96b6d29297354ed981b7dd71df7ff500d82fa6d11f0856bee8035
@@ -829,6 +830,9 @@ pyyaml==6.0.1 \
# via
# -r requirements.in
# saneyaml
+ ratelimit==2.2.1 \
+     --hash=sha256:af8a9b64b821529aca09ebaf6d8d279100d766f19e90b5059ac6a718ca6dee42
+     # via -r requirements.in
requests==2.31.0 \
--hash=sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f \
--hash=sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1
@@ -870,12 +874,17 @@ text-unidecode==1.3 \
thrift==0.16.0 \
--hash=sha256:2b5b6488fcded21f9d312aa23c9ff6a0195d0f6ae26ddbd5ad9e3e25dfc14408
# via opentelemetry-exporter-jaeger-thrift
+ typer==0.9.0 \
+     --hash=sha256:50922fd79aea2f4751a8e0408ff10d2662bd0c8bbfa84755a699f3bada2978b2 \
+     --hash=sha256:5d96d986a21493606a358cae4461bd8cdf83cbf33a5aa950ae629ca3b51467ee
+     # via -r requirements.in
typing-extensions==4.9.0 \
--hash=sha256:23478f88c37f27d76ac8aee6c905017a143b0b1b886c3c9f66bc2fd94f9f5783 \
--hash=sha256:af72aea155e91adfc61c3ae9e0e342dbc0cba726d6cba4b6c72c1f34e47291cd
# via
# opentelemetry-sdk
# pydantic
+ # typer
tzdata==2023.3 \
--hash=sha256:11ef1e08e54acb0d4f95bdb1be05da659673de4acbd21bf9c69e94cc5e907a3a \
--hash=sha256:7e65763eef3120314099b6939b5546db7adce1e7d6f2e179e3df563c70511eda
1 change: 1 addition & 0 deletions setup.py
@@ -44,6 +44,7 @@
"console_scripts": [
"cachito=cachito.web.manage:cli",
"cachito-cleanup=cachito.workers.cleanup_job:main",
"cachito-prune-archives=cachito.workers.prune_archives:app",
"cachito-update-nexus-scripts=cachito.workers.nexus:create_or_update_scripts",
]
},
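With the entry point installed, typical invocations would look like this (dates illustrative; deletion requires the explicit --execute flag):

    # Dry run: list archives whose most recent request predates the cutoff
    cachito-prune-archives list --older-than 2022-01-01 --limit 50

    # Same selection, but actually delete, throttled to 1 API call per second
    cachito-prune-archives delete --older-than 2022-01-01 --api-calls-per-second 1 --execute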