Skip to content

Commit

Permalink
feat: Add continuous mode with --sleep-seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
janw committed Oct 26, 2024
1 parent 43b2c95 commit 92eaa42
Show file tree
Hide file tree
Showing 14 changed files with 320 additions and 141 deletions.
1 change: 1 addition & 0 deletions cspell.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ words:
- PYTHONUNBUFFERED
- pyyaml
- rprint
- signum
- subdirs
- tini
- tmpl
Expand Down
14 changes: 11 additions & 3 deletions podcast_archiver/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import signal
import sys
import xml.etree.ElementTree as etree
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from podcast_archiver.logging import logger, rprint
from podcast_archiver.processor import FeedProcessor
Expand Down Expand Up @@ -31,9 +33,15 @@ def __init__(self, settings: Settings):
self.add_from_opml(opml)

def register_cleanup(self, ctx: click.RichContext) -> None:
@ctx.call_on_close
def _cleanup() -> None:
def _cleanup(signum: int, *args: Any) -> None:
logger.debug("Signal %s received", signum)
rprint("[error]Terminating.[/]")

Check warning on line 38 in podcast_archiver/base.py

View check run for this annotation

Codecov / codecov/patch

podcast_archiver/base.py#L37-L38

Added lines #L37 - L38 were not covered by tests
self.processor.shutdown()
ctx.close()
sys.exit(0)

Check warning on line 41 in podcast_archiver/base.py

View check run for this annotation

Codecov / codecov/patch

podcast_archiver/base.py#L40-L41

Added lines #L40 - L41 were not covered by tests

signal.signal(signal.SIGINT, _cleanup)
signal.signal(signal.SIGTERM, _cleanup)

def add_feed(self, feed: Path | str) -> None:
new_feeds = [feed] if isinstance(feed, str) else feed.read_text().strip().splitlines()
Expand Down
19 changes: 15 additions & 4 deletions podcast_archiver/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
import os
import pathlib
import stat
import time
from os import getenv
from typing import TYPE_CHECKING, Any

import rich_click as click
from rich import get_console

from podcast_archiver import __version__ as version
from podcast_archiver import constants
from podcast_archiver.base import PodcastArchiver
from podcast_archiver.config import Settings, in_ci
from podcast_archiver.exceptions import InvalidSettings
from podcast_archiver.logging import configure_logging
from podcast_archiver.logging import configure_logging, rprint

if TYPE_CHECKING:
from click.shell_completion import CompletionItem
Expand Down Expand Up @@ -49,6 +49,7 @@
"--update",
"--max-episodes",
"--ignore-database",
"--sleep",
],
},
]
Expand Down Expand Up @@ -281,10 +282,16 @@ def generate_default_config(ctx: click.Context, param: click.Parameter, value: b
show_envvar=True,
help=Settings.model_fields["ignore_database"].description,
)
@click.option(
"--sleep-seconds",
type=int,
default=0,
show_envvar=True,
help=Settings.model_fields["sleep_seconds"].description,
)
@click.pass_context
def main(ctx: click.RichContext, /, **kwargs: Any) -> int:
get_console().quiet = kwargs["quiet"]
configure_logging(kwargs["verbose"])
configure_logging(kwargs["verbose"], kwargs["quiet"])
try:
settings = Settings.load_from_dict(kwargs)

Expand All @@ -296,6 +303,10 @@ def main(ctx: click.RichContext, /, **kwargs: Any) -> int:
pa = PodcastArchiver(settings=settings)
pa.register_cleanup(ctx)
pa.run()
while settings.sleep_seconds > 0:
rprint(f"Sleeping for {settings.sleep_seconds} seconds.")
time.sleep(settings.sleep_seconds)
pa.run()

Check warning on line 309 in podcast_archiver/cli.py

View check run for this annotation

Codecov / codecov/patch

podcast_archiver/cli.py#L307-L309

Added lines #L307 - L309 were not covered by tests
except InvalidSettings as exc:
raise click.BadParameter(f"Invalid settings: {exc}") from exc
except KeyboardInterrupt as exc: # pragma: no cover
Expand Down
8 changes: 8 additions & 0 deletions podcast_archiver/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ class Settings(BaseModel):
),
)

sleep_seconds: int = Field(
default=0,
description=(
f"Run {constants.PROG_NAME} continuously. Set to a non-zero number of seconds to sleep after all available "
"episodes have been downloaded. Otherwise the application exits after all downloads have been completed."
),
)

config: FilePath | None = Field(
default=None,
exclude=True,
Expand Down
93 changes: 35 additions & 58 deletions podcast_archiver/download.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from __future__ import annotations

from contextlib import nullcontext
from contextlib import contextmanager
from threading import Event
from typing import IO, TYPE_CHECKING, NoReturn

from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from typing import IO, TYPE_CHECKING, Generator

from podcast_archiver import constants
from podcast_archiver.enums import DownloadResult
from podcast_archiver.logging import logger
from podcast_archiver.exceptions import NotCompleted
from podcast_archiver.logging import logger, wrapped_tqdm
from podcast_archiver.session import session
from podcast_archiver.types import EpisodeResult
from podcast_archiver.utils import atomic_write
Expand All @@ -28,36 +26,29 @@ class DownloadJob:
target: Path
stop_event: Event

_debug_partial: bool
_max_download_bytes: int | None = None
_write_info_json: bool
_no_progress: bool

def __init__(
self,
episode: Episode,
*,
target: Path,
debug_partial: bool = False,
max_download_bytes: int | None = None,
write_info_json: bool = False,
no_progress: bool = False,
stop_event: Event | None = None,
) -> None:
self.episode = episode
self.target = target
self._debug_partial = debug_partial
self._max_download_bytes = max_download_bytes
self._write_info_json = write_info_json
self._no_progress = no_progress
self.stop_event = stop_event or Event()

    def __repr__(self) -> str:
        """Return a debug representation wrapping the episode's string form."""
        return f"EpisodeDownload({self})"

    def __str__(self) -> str:
        """Return the human-readable form of the underlying episode."""
        return str(self.episode)

def __call__(self) -> EpisodeResult:
try:
return self.run()
except NotCompleted:
return EpisodeResult(self.episode, DownloadResult.ABORTED)
except Exception as exc:
logger.error(f"Download failed: {exc}")
logger.debug("Exception while downloading", exc_info=exc)
Expand All @@ -68,58 +59,44 @@ def run(self) -> EpisodeResult:
return EpisodeResult(self.episode, DownloadResult.ALREADY_EXISTS)

self.target.parent.mkdir(parents=True, exist_ok=True)
self.write_info_json()

response = session.get(
self.episode.enclosure.href,
stream=True,
allow_redirects=True,
)
response.raise_for_status()
total_size = int(response.headers.get("content-length", "0"))
with (
logging_redirect_tqdm() if not self._no_progress else nullcontext(),
tqdm(
desc=f"{self.episode.title} ({self.episode.published_time:%Y-%m-%d})",
total=total_size,
unit_scale=True,
unit="B",
disable=self._no_progress,
) as progresser,
):
with atomic_write(self.target, mode="wb") as fp:
receive_complete = self.receive_data(fp, response, progresser=progresser)

if not receive_complete:
self.target.unlink(missing_ok=True)
return EpisodeResult(self.episode, DownloadResult.ABORTED)
response = session.get_and_raise(self.episode.enclosure.href, stream=True)
with self.write_info_json(), atomic_write(self.target, mode="wb") as fp:
self.receive_data(fp, response)

logger.info("Completed download of %s", self.target)
logger.info("Completed download of %s", self.target)
return EpisodeResult(self.episode, DownloadResult.COMPLETED_SUCCESSFULLY)

@property
def infojsonfile(self) -> Path:
return self.target.with_suffix(".info.json")

def receive_data(self, fp: IO[str], response: Response, progresser: tqdm[NoReturn]) -> bool:
def receive_data(self, fp: IO[bytes], response: Response) -> None:
total_size = int(response.headers.get("content-length", "0"))
total_written = 0
for chunk in response.iter_content(chunk_size=constants.DOWNLOAD_CHUNK_SIZE):
written = fp.write(chunk)
total_written += written
progresser.update(written)

if self._debug_partial and total_written >= constants.DEBUG_PARTIAL_SIZE:
logger.debug("Partial download completed.")
return True
if self.stop_event.is_set():
logger.debug("Stop event is set, bailing.")
return False
max_bytes = self._max_download_bytes
for chunk in wrapped_tqdm(
response.iter_content(chunk_size=constants.DOWNLOAD_CHUNK_SIZE),
desc=f"{self.episode.title} ({self.episode.published_time:%Y-%m-%d})",
total=total_size,
):
total_written += fp.write(chunk)

return True
if max_bytes and total_written >= max_bytes:
fp.truncate(max_bytes)
logger.debug("Partial download of first %s bytes completed.", max_bytes)
return

if self.stop_event.is_set():
logger.debug("Stop event is set, bailing on %s.", self.episode)
raise NotCompleted

def write_info_json(self) -> None:
@contextmanager
def write_info_json(self) -> Generator[None, None, None]:
if not self._write_info_json:
yield
return
logger.info("Writing episode metadata to %s", self.infojsonfile.name)
with atomic_write(self.infojsonfile) as fp:
fp.write(self.episode.model_dump_json(indent=2) + "\n")
yield
logger.info("Wrote episode metadata to %s", self.infojsonfile.name)
22 changes: 20 additions & 2 deletions podcast_archiver/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from typing import Any
from __future__ import annotations

import pydantic_core
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
import pydantic_core

from podcast_archiver.models import FeedInfo


class PodcastArchiverException(Exception):
Expand All @@ -27,3 +32,16 @@ def __str__(self) -> str:

class MissingDownloadUrl(ValueError):
pass


class NotCompleted(RuntimeError):
    """Raised when a download is aborted before all data has been received.

    Used by the download job to bail out of the receive loop (e.g. when the
    stop event is set); callers map it to an ABORTED result.
    """


class NotModified(PodcastArchiverException):
    """Signals that a feed's contents are unchanged since the previous fetch.

    Carries the feed metadata so callers can keep using the cached info.
    """

    # Metadata of the feed that reported no changes.
    info: FeedInfo
    # Last-Modified value reported for the feed, if any — presumably an HTTP
    # header value; TODO confirm against the fetching code.
    last_modified: str | None = None

    def __init__(self, info: FeedInfo, *args: object) -> None:
        self.info = info
        super().__init__(*args)
53 changes: 47 additions & 6 deletions podcast_archiver/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,70 @@

import logging
import logging.config
from typing import Any
from typing import Any, Generator, Iterable

from rich import get_console
from rich import print as _print
from rich.logging import RichHandler
from rich.text import Text
from tqdm import tqdm

logger = logging.getLogger("podcast_archiver")


def rprint(*objects: Any, sep: str = " ", end: str = "\n", **kwargs: Any) -> None:
if logger.level == logging.NOTSET or logger.level >= logging.WARNING:
_print(*objects, sep=sep, end=end, **kwargs)
_REDIRECT_VIA_TQDM: bool = False
_REDIRECT_VIA_LOGGING: bool = False


def rprint(msg: str, **kwargs: Any) -> None:
if not _REDIRECT_VIA_TQDM and not _REDIRECT_VIA_LOGGING:
_print(msg, **kwargs)
return

text = Text.from_markup(msg).plain.strip()

Check warning on line 25 in podcast_archiver/logging.py

View check run for this annotation

Codecov / codecov/patch

podcast_archiver/logging.py#L25

Added line #L25 was not covered by tests
if _REDIRECT_VIA_TQDM:
tqdm.write(text)
return

Check warning on line 28 in podcast_archiver/logging.py

View check run for this annotation

Codecov / codecov/patch

podcast_archiver/logging.py#L27-L28

Added lines #L27 - L28 were not covered by tests

logger.info(text)

Check warning on line 30 in podcast_archiver/logging.py

View check run for this annotation

Codecov / codecov/patch

podcast_archiver/logging.py#L30

Added line #L30 was not covered by tests


def wrapped_tqdm(iterable: Iterable[bytes], desc: str, total: int) -> Generator[bytes, None, None]:
if _REDIRECT_VIA_LOGGING:
yield from iterable

Check warning on line 35 in podcast_archiver/logging.py

View check run for this annotation

Codecov / codecov/patch

podcast_archiver/logging.py#L35

Added line #L35 was not covered by tests
return
logger.info(objects[0].strip(), *objects[1:])

with tqdm(desc=desc, total=total, unit_scale=True, unit="B") as progress:
global _REDIRECT_VIA_TQDM
_REDIRECT_VIA_TQDM = True
try:
for chunk in iterable:
progress.update(len(chunk))
yield chunk
finally:
_REDIRECT_VIA_TQDM = False


def configure_logging(verbosity: int) -> None:
def _tone_down_logging() -> None:
for name in [
"urllib3",
]:
logging.getLogger(name).setLevel(logging.INFO)


def configure_logging(verbosity: int, quiet: bool) -> None:
get_console().quiet = quiet
if verbosity > 1:
level = logging.DEBUG
elif verbosity == 1:
level = logging.WARNING
else:
level = logging.ERROR

if verbosity > 1 or quiet:
global _REDIRECT_VIA_LOGGING
_REDIRECT_VIA_LOGGING = True

Check warning on line 67 in podcast_archiver/logging.py

View check run for this annotation

Codecov / codecov/patch

podcast_archiver/logging.py#L67

Added line #L67 was not covered by tests

logging.basicConfig(
level=level,
format="%(message)s",
Expand All @@ -44,3 +84,4 @@ def configure_logging(verbosity: int) -> None:
)
logger.setLevel(level)
logger.debug("Running in debug mode.")
_tone_down_logging()
Loading

0 comments on commit 92eaa42

Please sign in to comment.