diff --git a/plextraktsync/commands/sync.py b/plextraktsync/commands/sync.py index 786e537cf1..46fbf866ba 100644 --- a/plextraktsync/commands/sync.py +++ b/plextraktsync/commands/sync.py @@ -69,4 +69,4 @@ async def sync( w.print_plan(print=logger.info) if dry_run: logger.info("Enabled dry-run mode: not making actual changes") - runner.sync(walker=w, dry_run=config.dry_run) + await runner.sync(walker=w, dry_run=config.dry_run) diff --git a/plextraktsync/media/MediaFactory.py b/plextraktsync/media/MediaFactory.py index 7c1851488d..644b7f8157 100644 --- a/plextraktsync/media/MediaFactory.py +++ b/plextraktsync/media/MediaFactory.py @@ -27,7 +27,7 @@ def __init__(self, plex: PlexApi, trakt: TraktApi): self.plex = plex self.trakt = trakt - def resolve_any(self, pm: PlexLibraryItem, show: Media = None) -> Media | None: + async def resolve_any(self, pm: PlexLibraryItem, show: Media = None) -> Media | None: try: guids = pm.guids except (PlexApiException, RequestException) as e: diff --git a/plextraktsync/plan/Walker.py b/plextraktsync/plan/Walker.py index d05d988ac6..075dbff536 100644 --- a/plextraktsync/plan/Walker.py +++ b/plextraktsync/plan/Walker.py @@ -69,7 +69,7 @@ def print_plan(self, print): if self.plan.episodes: print(f"Sync Episodes: {[x.title for x in self.plan.episodes]}") - def get_plex_movies(self) -> Generator[PlexLibraryItem, Any, None]: + async def get_plex_movies(self) -> Generator[PlexLibraryItem, Any, None]: """ Iterate over movie sections unless specific movie is requested """ @@ -80,16 +80,17 @@ def get_plex_movies(self) -> Generator[PlexLibraryItem, Any, None]: else: return - yield from movies + async for m in movies: + yield m - def find_movies(self) -> Generator[Media, Any, None]: - for plex in self.get_plex_movies(): - movie = self.mf.resolve_any(plex) + async def find_movies(self) -> Generator[Media, Any, None]: + async for plex in self.get_plex_movies(): + movie = await self.mf.resolve_any(plex) if not movie: continue yield movie - def get_plex_shows(self) -> Generator[PlexLibraryItem, Any, None]: + async def get_plex_shows(self) -> Generator[PlexLibraryItem, Any, None]: if self.plan.shows: shows = self.media_from_items("show", self.plan.shows) elif self.plan.show_sections: @@ -97,11 +98,13 @@ def get_plex_shows(self) -> Generator[PlexLibraryItem, Any, None]: else: return - yield from shows + async for m in shows: + yield m - def find_episodes(self): + async def find_episodes(self): if self.plan.episodes: - yield from self.get_plex_episodes(self.plan.episodes) + async for m in self.get_plex_episodes(self.plan.episodes): + yield m # Preload plex shows plex_shows = {} @@ -124,14 +127,15 @@ def find_episodes(self): show_cache[show_id] = m.show yield m - def walk_shows(self, shows: set[Media], title="Processing Shows"): + async def walk_shows(self, shows: set[Media], title="Processing Shows"): if not shows: return - yield from self.progressbar(shows, desc=title) + async for show in self.progressbar(shows, desc=title): + yield show - def get_plex_episodes(self, episodes: list[Episode]) -> Generator[Media, Any, None]: + async def get_plex_episodes(self, episodes: list[Episode]) -> Generator[Media, Any, None]: it = self.progressbar(episodes, desc="Processing episodes") - for pe in it: + async for pe in it: guid = PlexGuid(pe.grandparentGuid, "show") show = self.mf.resolve_guid(guid) if not show: @@ -143,7 +147,7 @@ def get_plex_episodes(self, episodes: list[Episode]) -> Generator[Media, Any, No me.show = show yield me - def media_from_sections(self, sections: list[PlexLibrarySection]) -> Generator[PlexLibraryItem, Any, None]: + async def media_from_sections(self, sections: list[PlexLibrarySection]) -> Generator[PlexLibraryItem, Any, None]: for section in sections: with measure_time(f"{section.title_link} processed", extra={"markup": True}): self.set_window_title(f"Processing {section.title}") @@ -161,42 +165,45 @@ def episodes_from_sections(self, sections: list[PlexLibrarySection]) -> Generato section.pager("episode"), desc=f"Processing {section.title_link}", ) - yield from it + async for m in it: + yield m - def media_from_items(self, libtype: str, items: list) -> Generator[PlexLibraryItem, Any, None]: + async def media_from_items(self, libtype: str, items: list) -> Generator[PlexLibraryItem, Any, None]: it = self.progressbar(items, desc=f"Processing {libtype}s") - for m in it: + async for m in it: yield PlexLibraryItem(m, plex=self.plex) - def episode_from_show(self, show: Media) -> Generator[Media, Any, None]: + async def episode_from_show(self, show: Media) -> Generator[Media, Any, None]: for pe in show.plex.episodes(): - me = self.mf.resolve_any(pe, show) + me = await self.mf.resolve_any(pe, show) if not me: continue me.show = show yield me - def progressbar(self, iterable: Iterable, **kwargs): + async def progressbar(self, iterable: Iterable, **kwargs): if self._progressbar: pb = self._progressbar(iterable, **kwargs) with pb as it: - yield from it + async for m in it: + yield m else: - yield from iterable + async for m in iterable: + yield m - def media_from_traktlist(self, items: Iterable, title="Trakt watchlist") -> Generator[Media, Any, None]: + async def media_from_traktlist(self, items: Iterable, title="Trakt watchlist") -> Generator[Media, Any, None]: it = self.progressbar(items, desc=f"Processing {title}") - for media in it: - tm = TraktItem(media) + async for media in it: + tm = TraktItem(media, trakt=self.trakt) m = self.mf.resolve_trakt(tm) yield m - def media_from_plexlist(self, items: Iterable) -> Generator[Media, Any, None]: + async def media_from_plexlist(self, items: Iterable) -> Generator[Media, Any, None]: it = self.progressbar(items, desc="Processing Plex watchlist") - for media in it: + async for media in it: pm = PlexLibraryItem(media, plex=self.plex) - m = self.mf.resolve_any(pm) + m = await self.mf.resolve_any(pm) if not m: continue yield m diff --git a/plextraktsync/sync.py b/plextraktsync/sync.py new file mode 100644 index 0000000000..39ab558c4c --- /dev/null +++ b/plextraktsync/sync.py @@ -0,0 +1,262 @@ +from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING + +from plextraktsync.decorators.measure_time import measure_time +from plextraktsync.factory import logger +from plextraktsync.trakt.types import TraktMedia +from plextraktsync.trakt_list_util import TraktListUtil + +if TYPE_CHECKING: + from typing import Iterable + + from plextraktsync.config.SyncConfig import SyncConfig + from plextraktsync.media import Media + from plextraktsync.plan.Walker import Walker + from plextraktsync.plex.PlexApi import PlexApi + from plextraktsync.trakt.TraktApi import TraktApi + + +class Sync: + def __init__(self, config: SyncConfig, plex: PlexApi, trakt: TraktApi): + self.config = config + self.plex = plex + self.trakt = trakt + + @cached_property + def plex_wl(self): + from plextraktsync.plex.PlexWatchList import PlexWatchList + + return PlexWatchList(self.plex.watchlist()) + + @cached_property + def sync_wl(self): + return self.config.sync_wl and len(self.plex_wl) > 0 + + @cached_property + def trakt_wl(self): + from plextraktsync.trakt.TraktWatchlist import TraktWatchList + + return TraktWatchList(self.trakt.watchlist_movies + self.trakt.watchlist_shows) + + async def sync(self, walker: Walker, dry_run=False): + listutil = TraktListUtil() + is_partial = walker.is_partial + + if is_partial and self.config.clear_collected: + logger.warning("Running partial library sync. Clear collected will be disabled.") + + if self.config.update_plex_wl_as_pl: + if is_partial: + logger.warning("Running partial library sync. Watchlist as playlist won't update because it needs full library sync.") + else: + listutil.addList(None, "Trakt Watchlist", trakt_list=self.trakt.watchlist_movies) + + if self.config.sync_liked_lists: + if is_partial: + logger.warning("Partial walk, disabling liked lists updating. Liked lists won't update because it needs full library sync.") + else: + for lst in self.trakt.liked_lists: + listutil.addList(lst["listid"], lst["listname"]) + + if self.config.need_library_walk: + movie_trakt_ids = set() + async for movie in walker.find_movies(): + await self.sync_collection(movie, dry_run=dry_run) + self.sync_ratings(movie, dry_run=dry_run) + self.sync_watched(movie, dry_run=dry_run) + if not is_partial: + listutil.addPlexItemToLists(movie) + if self.config.clear_collected: + movie_trakt_ids.add(movie.trakt_id) + + if movie_trakt_ids: + self.clear_collected(self.trakt.movie_collection, movie_trakt_ids) + + shows = set() + episode_trakt_ids = set() + async for episode in walker.find_episodes(): + await self.sync_collection(episode, dry_run=dry_run) + self.sync_ratings(episode, dry_run=dry_run) + self.sync_watched(episode, dry_run=dry_run) + if not is_partial: + listutil.addPlexItemToLists(episode) + if self.config.clear_collected: + episode_trakt_ids.add(episode.trakt_id) + + if self.config.sync_ratings: + # collect shows for later ratings sync + shows.add(episode.show) + + if episode_trakt_ids: + self.clear_collected(self.trakt.episodes_collection, episode_trakt_ids) + + async for show in walker.walk_shows(shows, title="Syncing show ratings"): + self.sync_ratings(show, dry_run=dry_run) + + if self.config.update_plex_wl_as_pl or self.config.sync_liked_lists: + if is_partial: + logger.warning("Running partial library sync. Liked lists won't update because it needs full library sync.") + else: + with measure_time("Updated liked list"): + self.update_playlists(listutil, dry_run=dry_run) + + if walker.config.walk_watchlist and self.sync_wl: + with measure_time("Updated watchlist"): + await self.sync_watchlist(walker, dry_run=dry_run) + + def update_playlists(self, listutil: TraktListUtil, dry_run=False): + if dry_run: + return + + for tl in listutil.lists: + logger.debug(f"Updating Plex list '{tl.name}' ({len(tl.plex_items)} items)") + updated = self.plex.update_playlist(tl.name, tl.plex_items_sorted, description=tl.description) + logger.info(f"Plex list '{tl.name}' ({len(tl.plex_items)} items) {'updated' if updated else 'nothing to update'}") + + async def sync_collection(self, m: Media, dry_run=False): + if not self.config.plex_to_trakt["collection"]: + return + + if m.is_collected: + return + + logger.info(f"Adding to collection: {m.title_link}", extra={"markup": True}) + + if not dry_run: + m.add_to_collection() + + def sync_ratings(self, m: Media, dry_run=False): + if not self.config.sync_ratings: + return + + if m.plex_rating is m.trakt_rating: + return + + rating_priority = self.config["rating_priority"] + plex_to_trakt = self.config.plex_to_trakt["ratings"] + trakt_to_plex = self.config.trakt_to_plex["ratings"] + has_trakt = m.trakt_rating is not None + has_plex = m.plex_rating is not None + rate = None + + if rating_priority == "none": + # Only rate items with missing rating + if plex_to_trakt and has_plex and not has_trakt: + rate = "trakt" + elif trakt_to_plex and has_trakt and not has_plex: + rate = "plex" + + elif rating_priority == "trakt": + # If two-way rating sync, Trakt rating takes precedence over Plex rating + if trakt_to_plex and has_trakt: + rate = "plex" + elif plex_to_trakt and has_plex: + rate = "trakt" + + elif rating_priority == "plex": + # If two-way rating sync, Plex rating takes precedence over Trakt rating + if plex_to_trakt and has_plex: + rate = "trakt" + elif trakt_to_plex and has_trakt: + rate = "plex" + + if rate == "trakt": + logger.info(f"Rating {m.title_link} with {m.plex_rating} on Trakt (was {m.trakt_rating})", extra={"markup": True}) + if not dry_run: + m.trakt_rate() + + elif rate == "plex": + logger.info(f"Rating {m.title_link} with {m.trakt_rating} on Plex (was {m.plex_rating})", extra={"markup": True}) + if not dry_run: + m.plex_rate() + + def sync_watched(self, m: Media, dry_run=False): + if not self.config.sync_watched_status: + return + + if m.watched_on_plex is m.watched_on_trakt: + return + + if m.watched_on_plex: + if not self.config.plex_to_trakt["watched_status"]: + return + + if m.is_episode and m.watched_before_reset: + show = m.plex.item.show() + logger.info(f"Show '{show.title}' has been reset in trakt at {m.show_reset_at}.") + logger.info(f"Marking '{show.title}' as unwatched in Plex.") + if not dry_run: + m.reset_show() + else: + logger.info(f"Marking as watched in Trakt: {m.title_link}", extra={"markup": True}) + if not dry_run: + m.mark_watched_trakt() + elif m.watched_on_trakt: + if not self.config.trakt_to_plex["watched_status"]: + return + logger.info(f"Marking as watched in Plex: {m.title_link}", extra={"markup": True}) + if not dry_run: + m.mark_watched_plex() + + def watchlist_sync_item(self, m: Media, dry_run=False): + if m.plex is None: + if self.config.update_plex_wl: + logger.info(f"Skipping {m.title_link} from Trakt watchlist because not found in Plex Discover", extra={"markup": True}) + elif self.config.update_trakt_wl: + logger.info(f"Removing {m.title_link} from Trakt watchlist", extra={"markup": True}) + if not dry_run: + m.remove_from_trakt_watchlist() + return + + if m in self.plex_wl: + if m not in self.trakt_wl: + if self.config.update_trakt_wl: + logger.info(f"Adding {m.title_link} to Trakt watchlist", extra={"markup": True}) + if not dry_run: + m.add_to_trakt_watchlist() + else: + logger.info(f"Removing {m.title_link} from Plex watchlist", extra={"markup": True}) + if not dry_run: + m.remove_from_plex_watchlist() + else: + # Plex Online search is inaccurate, and it doesn't offer search by id. + # Remove known match from trakt watchlist, so that the search would not be attempted. + # Example, trakt id 187634 where title mismatches: + # - "The Vortex": https://trakt.tv/movies/the-vortex-2012 + # - "Big Bad Bugs": https://app.plex.tv/desktop/#!/provider/tv.plex.provider.vod/details?key=%2Flibrary%2Fmetadata%2F60185c5891c237002b37653d + del self.trakt_wl[m] + elif m in self.trakt_wl: + if self.config.update_plex_wl: + logger.info(f"Adding {m.title_link} to Plex watchlist", extra={"markup": True}) + if not dry_run: + m.add_to_plex_watchlist() + else: + logger.info(f"Removing {m.title_link} from Trakt watchlist", extra={"markup": True}) + if not dry_run: + m.remove_from_trakt_watchlist() + + async def sync_watchlist(self, walker: Walker, dry_run=False): + # NOTE: Plex watchlist sync removes matching items from trakt lists + # See the comment above around "del self.trakt_wl[m]" + async for m in walker.media_from_plexlist(self.plex_wl): + self.watchlist_sync_item(m, dry_run) + + # Because Plex syncing might have emptied the watchlists, skip printing the 0/0 progress + if len(self.trakt_wl): + async for m in walker.media_from_traktlist(self.trakt_wl): + self.watchlist_sync_item(m, dry_run) + + def clear_collected(self, existing_items: Iterable[TraktMedia], keep_ids: set[int], dry_run=False): + from plextraktsync.trakt.trakt_set import trakt_set + + existing_ids = trakt_set(existing_items) + delete_ids = existing_ids - keep_ids + delete_items = (tm for tm in existing_items if tm.trakt in delete_ids) + + n = len(delete_ids) + for i, tm in enumerate(delete_items, start=1): + logger.info(f"Remove from Trakt collection ({i}/{n}): {tm}") + if not dry_run: + self.trakt.remove_from_collection(tm) diff --git a/tests/test_async.py b/tests/test_async.py index 81fc090b1b..e98ab317cf 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -8,6 +8,17 @@ from tests.conftest import factory +async def items(max_items: int): + for item in range(max_items): + yield item + + +async def test_async_generator(): + async for item in items(3): + print(item) + # print(c) + + @pytest.mark.asyncio async def test_plex_sync(): walker: Walker = factory.walker @@ -17,3 +28,4 @@ async def test_plex_sync(): if __name__ == '__main__': asyncio.run(test_plex_sync()) + # asyncio.run(test_async_generator())