Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Active ongoing episode releasing on the time of airing #942

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
13 changes: 9 additions & 4 deletions src/program/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
"""Program main module"""

from program.media.item import MediaItem # noqa: F401
from program.program import Event, Program # noqa: F401
"""Program module."""

from loguru import logger

from program.media.item import MediaItem # noqa: F401
from program.program import Event, Program # noqa: F401

# Add custom log levels
logger.level("RELEASE", no=35, color="<magenta>")
Comment on lines +8 to +9
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for logger configuration

The logger configuration should be wrapped in a try-catch block to handle potential initialization failures gracefully.

 # Add custom log levels
-logger.level("RELEASE", no=35, color="<magenta>")
+try:
+    logger.level("RELEASE", no=35, color="<magenta>")
+except Exception as e:
+    logger.error(f"Failed to configure RELEASE log level: {e}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Add custom log levels
logger.level("RELEASE", no=35, color="<magenta>")
# Add custom log levels
try:
logger.level("RELEASE", no=35, color="<magenta>")
except Exception as e:
logger.error(f"Failed to configure RELEASE log level: {e}")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this belongs in logging.py, you dont need to change this file :)

11 changes: 8 additions & 3 deletions src/program/apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@
from .overseerr_api import OverseerrAPI, OverseerrAPIError
from .plex_api import PlexAPI, PlexAPIError
from .trakt_api import TraktAPI, TraktAPIError

from program.apis.tvmaze_api import TVMazeAPI
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from .tvmaze_api import TVMazeAPI

Also be sure to make a custom exception for TVMaze as well. Follow the same concepts as the other api modules do in this api's dir


def bootstrap_apis():
__setup_trakt()
__setup_plex()
__setup_mdblist()
__setup_overseerr()
__setup_listrr()
__setup_tvmaze()

def __setup_trakt():
traktApi = TraktAPI(settings_manager.settings.content.trakt)
di[TraktAPI] = traktApi
"""Setup Trakt API."""
di[TraktAPI] = TraktAPI(settings_manager.settings.content.trakt)

def __setup_plex():
if not settings_manager.settings.updaters.plex.enabled:
Expand All @@ -43,3 +44,7 @@ def __setup_listrr():
return
listrrApi = ListrrAPI(settings_manager.settings.content.listrr.api_key)
di[ListrrAPI] = listrrApi

def __setup_tvmaze():
"""Setup TVMaze API."""
di[TVMazeAPI] = TVMazeAPI()
dreulavelle marked this conversation as resolved.
Show resolved Hide resolved
53 changes: 48 additions & 5 deletions src/program/apis/trakt_api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import re
from datetime import datetime
from datetime import datetime, timedelta
from types import SimpleNamespace
from typing import List, Optional, Union
from urllib.parse import urlencode
from zoneinfo import ZoneInfo # Import ZoneInfo

from requests import RequestException, Session

Expand Down Expand Up @@ -358,9 +359,51 @@ def _get_imdb_id_from_list(self, namespaces: List[SimpleNamespace], id_type: str
return None

def _get_formatted_date(self, data, item_type: str) -> Optional[datetime]:
"""Get the formatted aired date from the data."""
"""Get the formatted aired date from the data.
Trakt API provides all dates in UTC/GMT format (ISO 8601).
"""
if item_type in ["show", "season", "episode"] and (first_aired := getattr(data, "first_aired", None)):
return datetime.strptime(first_aired, "%Y-%m-%dT%H:%M:%S.%fZ")
if item_type == "movie" and (released := getattr(data, "released", None)):
return datetime.strptime(released, "%Y-%m-%d")
try:
# Parse the UTC time directly from Trakt's first_aired field
utc_time = datetime.fromisoformat(first_aired.replace('Z', '+00:00'))

# Apply release delay if configured
if self.settings.release_delay_minutes > 0:
utc_time = utc_time + timedelta(minutes=self.settings.release_delay_minutes)
logger.debug(f" Adding {self.settings.release_delay_minutes} minute delay to release time")

logger.debug(f"Time conversion for {getattr(data, 'title', 'Unknown')}:")
logger.debug(f" 1. Raw time from Trakt (UTC): {first_aired}")
logger.debug(f" 2. Parsed UTC time: {utc_time}")

# Convert to local time for display
local_time = utc_time.astimezone()
logger.debug(f" 3. Your local time will be: {local_time}")

# Check if we have timezone information from Trakt
airs = getattr(data, "airs", None)
tz = getattr(airs, "timezone", None) if airs else None
if tz:
logger.debug(f" 4. Show timezone from Trakt: {tz}")
try:
# Convert UTC time to show's timezone for reference
show_time = utc_time.astimezone(ZoneInfo(tz))
logger.debug(f" 5. Time in show's timezone: {show_time}")
except Exception as e:
logger.error(f"Failed to convert to show timezone: {e}")

return utc_time
except (ValueError, TypeError) as e:
logger.error(f"Failed to parse airtime: {e}")
return None

elif item_type == "movie" and (released := getattr(data, "released", None)):
try:
# Movies just have dates, set to midnight UTC
utc_time = datetime.strptime(released, "%Y-%m-%d").replace(hour=0, minute=0, second=0, tzinfo=ZoneInfo("UTC"))
logger.debug(f"Parsed movie release date: {released} -> {utc_time} UTC")
return utc_time
except ValueError:
logger.error(f"Failed to parse release date: {released}")
return None
return None
184 changes: 184 additions & 0 deletions src/program/apis/tvmaze_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
"""TVMaze API client for fetching show information."""
from datetime import datetime
from typing import Optional
from zoneinfo import ZoneInfo

from loguru import logger
from program.utils.request import (
BaseRequestHandler,
HttpMethod,
ResponseType,
create_service_session,
get_cache_params,
get_rate_limit_params,
)
from requests.exceptions import HTTPError

class TVMazeAPI:
"""Handles TVMaze API communication."""

BASE_URL = "https://api.tvmaze.com"

def __init__(self):
rate_limit_params = get_rate_limit_params(max_calls=20, period=10) # TVMaze allows 20 requests per 10 seconds
tvmaze_cache = get_cache_params("tvmaze", 86400) # Cache for 24 hours
session = create_service_session(rate_limit_params=rate_limit_params, use_cache=True, cache_params=tvmaze_cache)
self.request_handler = BaseRequestHandler(session, response_type=ResponseType.SIMPLE_NAMESPACE)

def get_show_by_imdb(self, imdb_id: str, show_name: Optional[str] = None, season_number: Optional[int] = None, episode_number: Optional[int] = None) -> Optional[datetime]:
"""Get show information from TVMaze using IMDB ID.

Args:
imdb_id: IMDB ID of the show or episode (with or without 'tt' prefix)
show_name: Optional show name to use for search if IMDB lookup fails
season_number: Optional season number to find specific episode
episode_number: Optional episode number to find specific episode

Returns:
Next episode airtime in local time if available, None otherwise
"""
try:
# Add 'tt' prefix if not present
if not imdb_id.startswith('tt'):
imdb_id = f'tt{imdb_id}'

show = None

# Try singlesearch by show name first if provided, since episode IDs won't work with lookup
if show_name:
logger.debug(f"Trying singlesearch by name: {show_name}")
try:
response = self.request_handler._request(HttpMethod.GET, f"{self.BASE_URL}/singlesearch/shows", params={'q': show_name})
show = response.data if response.is_ok else None
except HTTPError as e:
if e.response.status_code == 404:
show = None
else:
raise

# If show name search fails or wasn't provided, try direct lookup
# This will only work for show-level IMDB IDs, not episode IDs
if not show:
try:
response = self.request_handler._request(HttpMethod.GET, f"{self.BASE_URL}/lookup/shows", params={'imdb': imdb_id})
show = response.data if response.is_ok else None
except HTTPError as e:
if e.response.status_code == 404:
show = None
else:
raise

# If that fails too, try regular search
if not show and show_name:
logger.debug(f"Singlesearch failed for {show_name}, trying regular search")
try:
response = self.request_handler._request(HttpMethod.GET, f"{self.BASE_URL}/search/shows", params={'q': show_name})
if response.is_ok and response.data:
# Take the first result with highest score
show = response.data[0].show if response.data else None
except HTTPError as e:
if e.response.status_code == 404:
show = None
else:
raise

if not show:
logger.debug(f"Could not find show for {imdb_id} / {show_name}")
return None

# Get next episode
try:
response = self.request_handler._request(HttpMethod.GET, f"{self.BASE_URL}/shows/{show.id}/episodes")
episodes = response.data if response.is_ok else None
except HTTPError as e:
if e.response.status_code == 404:
episodes = None
else:
raise

if not episodes:
return None

# Find the next episode that hasn't aired yet
current_time = datetime.now()
next_episode = None
target_episode_time = None

for episode in episodes:
try:
if not episode.airstamp:
continue

# First try to get air time using network timezone
air_time = None
if (hasattr(show, 'network') and show.network and
hasattr(show.network, 'country') and show.network.country and
hasattr(show.network.country, 'timezone') and show.network.country.timezone and
episode.airdate and episode.airtime):

# Combine airdate and airtime in network timezone
network_tz = ZoneInfo(show.network.country.timezone)
air_datetime = f"{episode.airdate}T{episode.airtime}"
try:
# Parse the time in network timezone
air_time = datetime.fromisoformat(air_datetime).replace(tzinfo=network_tz)
# Only log network time for the target episode
if (season_number is not None and episode_number is not None and
hasattr(episode, 'number') and hasattr(episode, 'season') and
episode.season == season_number and episode.number == episode_number):
logger.debug(f"Network airs show at {air_time} ({show.network.country.timezone})")
except Exception as e:
logger.error(f"Failed to parse network air time: {e}")
air_time = None

# Fallback to airstamp if needed
if not air_time and episode.airstamp:
try:
air_time = datetime.fromisoformat(episode.airstamp.replace('Z', '+00:00'))
if (season_number is not None and episode_number is not None and
hasattr(episode, 'number') and hasattr(episode, 'season') and
episode.season == season_number and episode.number == episode_number):
logger.debug(f"Using UTC airstamp: {air_time}")
except Exception as e:
logger.error(f"Failed to parse airstamp: {e}")
continue

if not air_time:
continue

# Convert to local time
air_time = air_time.astimezone(current_time.tzinfo)

# Check if this is the specific episode we want
if season_number is not None and episode_number is not None:
if hasattr(episode, 'number') and hasattr(episode, 'season'):
if episode.season == season_number and episode.number == episode_number:
dreulavelle marked this conversation as resolved.
Show resolved Hide resolved
# Found our target episode
if hasattr(episode, 'name'):
logger.debug(f"Found S{season_number}E{episode_number} '{episode.name}' airing at {air_time}")
else:
logger.debug(f"Found S{season_number}E{episode_number} airing at {air_time}")
target_episode_time = air_time
break # No need to continue looking

# If we're looking for next episode and this one is in the future
elif air_time > current_time:
# If we haven't found any future episode yet, or this one airs sooner
if not next_episode or air_time < next_episode:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Combine nested conditions into a single if statement

The nested if statements at lines 165-167 can be merged for clearer logic flow.

Apply this diff:

- elif air_time > current_time:
-     if not next_episode or air_time < next_episode:
-         next_episode = air_time
+ elif air_time > current_time and (not next_episode or air_time < next_episode):
+     next_episode = air_time
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
elif air_time > current_time:
# If we haven't found any future episode yet, or this one airs sooner
if not next_episode or air_time < next_episode:
elif air_time > current_time and (not next_episode or air_time < next_episode):
next_episode = air_time
🧰 Tools
🪛 Ruff (0.8.2)

165-167: Use a single if statement instead of nested if statements

(SIM102)

next_episode = air_time

except Exception as e:
logger.error(f"Failed to process episode {getattr(episode, 'number', '?')}: {e}")
continue

# Return target episode time if we found one, otherwise return next episode
if target_episode_time is not None:
return target_episode_time

if next_episode:
logger.debug(f"Next episode airs at {next_episode}")
return next_episode

except Exception as e:
logger.error(f"Error fetching TVMaze data for {imdb_id}: {e}")
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider breaking down the complex method into smaller, focused functions.

The get_show_by_imdb method handles multiple responsibilities including IMDB ID formatting, show lookup via different methods, and episode airtime processing. Consider extracting these into separate methods for better maintainability:

  1. _format_imdb_id(imdb_id: str) -> str
  2. _lookup_show_by_name(show_name: str) -> Optional[Show]
  3. _lookup_show_by_imdb(imdb_id: str) -> Optional[Show]
  4. _get_show_episodes(show_id: int) -> Optional[List[Episode]]

Additionally, consider consolidating the error handling pattern to reduce code duplication:

def _make_api_request(self, endpoint: str, params: dict) -> Optional[Any]:
    try:
        response = self.request_handler._request(HttpMethod.GET, f"{self.BASE_URL}/{endpoint}", params=params)
        return response.data if response.is_ok else None
    except HTTPError as e:
        if e.response.status_code == 404:
            return None
        raise
🧰 Tools
🪛 Ruff (0.8.2)

154-155: Use a single if statement instead of nested if statements

(SIM102)


165-167: Use a single if statement instead of nested if statements

(SIM102)

Loading