From c5ac237ebe081c90d85175487fb2a63bd705c07c Mon Sep 17 00:00:00 2001 From: Avi Date: Sun, 25 Aug 2024 01:00:29 +0300 Subject: [PATCH] zing init --- .../server/providers/zing/__init__.py | 638 ++++++++++++++++++ .../server/providers/zing/helpers.py | 370 ++++++++++ .../server/providers/zing/icon.svg | 1 + .../server/providers/zing/manifest.json | 10 + 4 files changed, 1019 insertions(+) create mode 100644 music_assistant/server/providers/zing/__init__.py create mode 100644 music_assistant/server/providers/zing/helpers.py create mode 100644 music_assistant/server/providers/zing/icon.svg create mode 100644 music_assistant/server/providers/zing/manifest.json diff --git a/music_assistant/server/providers/zing/__init__.py b/music_assistant/server/providers/zing/__init__.py new file mode 100644 index 000000000..f9bc72638 --- /dev/null +++ b/music_assistant/server/providers/zing/__init__.py @@ -0,0 +1,638 @@ +from music_assistant.server.models.music_provider import MusicProvider +from music_assistant.common.models.media_items import ( + Album, + Artist, + Track, + SearchResults, + ProviderMapping, + MediaItemImage, + ImageType, + AudioFormat, + ItemMapping, +) +from music_assistant.common.models.enums import ( + MediaType, + ContentType, + StreamType, + ProviderFeature, +) +from music_assistant.common.models.streamdetails import StreamDetails +from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType +from collections.abc import AsyncGenerator + +from music_assistant.common.models.enums import ( + ConfigEntryType, + ProviderFeature, + StreamType, +) +from gql import gql, Client +from gql.transport.requests import RequestsHTTPTransport + + +# Define the async function to setup the provider instance +async def setup(mass, manifest, config) -> MusicProvider: + """Initialize provider(instance) with given configuration.""" + return JewishMusicProvider(mass, manifest, config) + + +# Define the async function to get config entries for the provider +async def get_config_entries( + mass, + instance_id: str | None = None, + action: str | None = None, + values: dict[str, ConfigValueType] | None = None, +) -> tuple[ConfigEntry, ...]: + """ + Return Config entries to setup this provider. + + instance_id: id of an existing provider instance (None if new instance setup). + action: [optional] action key called from config entries UI. + values: the (intermediate) raw values for config entries sent with the action. + """ + return ( + ConfigEntry( + key="api_key", + type=ConfigEntryType.STRING, + label="API Key", + required=True, + description="Enter your API key for the Jewish Music provider.", + ), + ) + + +# Define the JewishMusicProvider class +class JewishMusicProvider(MusicProvider): + def __init__(self, mass, manifest, config): + super().__init__(mass, manifest, config) + self.api_url = "http://jewishmusic.fm:4000/graphql" + self.client = Client( + transport=RequestsHTTPTransport(url=self.api_url, verify=True, retries=3), + fetch_schema_from_transport=True, + ) + + @property + def supported_features(self) -> tuple: + """Return the features supported by this Provider.""" + return ( + ProviderFeature.SEARCH, + ProviderFeature.BROWSE, + ProviderFeature.LIBRARY_ALBUMS, + ProviderFeature.LIBRARY_ARTISTS, + ProviderFeature.LIBRARY_TRACKS, + ProviderFeature.ARTIST_ALBUMS, + ) + + @property + def is_streaming_provider(self) -> bool: + """Return True if the provider is a streaming provider.""" + return True + + async def search( + self, + search_query: str, + media_types: list[MediaType], + limit: int = 5, + ) -> SearchResults: + """Perform search on the music provider.""" + query = gql( + """ + query SearchByName($term: String!, $skip: Int!, $take: Int!) { + artists( + where: { + OR: [ + { enName: { contains: $term } }, + { heName: { contains: $term } } + ] + }, + skip: $skip, + take: $take + ) { + id + enName + heName + images { + large + medium + small + } + } + + albums( + where: { + OR: [ + { enName: { contains: $term } }, + { heName: { contains: $term } } + ] + }, + skip: $skip, + take: $take + ) { + id + enName + heName + releasedAt + artists { + id + enName + heName + } + images { + large + medium + small + } + } + + tracks( + where: { + OR: [ + { enName: { contains: $term } }, + { heName: { contains: $term } } + ] + }, + skip: $skip, + take: $take + ) { + id + trackNumber + enName + heName + duration + file + album { + id + enName + heName + } + artists { + id + enName + heName + } + images + } + } + + + + + """ + ) + + params = {"term": search_query, "take": limit, "skip": 0} + response = self.client.execute(query, variable_values=params) + + tracks = [] + for track in response["tracks"]: + parsed_track = self._parse_track(track) + tracks.append(parsed_track) + + albums = [] + for album in response["albums"]: + parsed_album = self._parse_album(album) + albums.append(parsed_album) + + artists = [] + for artist in response["artists"]: + parsed_artist = self._parse_album(artist) + artists.append(parsed_artist) + + return SearchResults(tracks=tracks, albums=albums, artists=artists) + + async def get_library_artists(self) -> list[Artist]: + query = gql( + """ + query GetCatArtists( + $term: String! + $skip: Int! + $count: Int! + $category: String! + ) { + __typename + artists( + take: $count + skip: $skip + orderBy: { heName: asc } + where: { + OR: [ + { enName: { contains: $term, mode: insensitive } } + { heName: { contains: $term, mode: insensitive } } + ] + categories: { + some: { enName: { contains: $category, mode: insensitive } } + } + } + ) { + __typename + id + enName + heName + images { + __typename + small + medium + large + } + } + } + """ + ) + params = {"term": "", "skip": 0, "count": 10, "category": "popular artist"} + response = self.client.execute(query, variable_values=params) + + artists_obj = response["artists"] + for artist in artists_obj: + self._parse_artist(artist) + + async def get_library_albums(self) -> list[Album]: + """Get full artist details by id.""" + query = gql( + """ + query GetAllAlbums($skip: Int!, $take: Int!) { + albums( + orderBy: { releasedAt: desc }, + skip: $skip, + take: $take + ) { + id + enName + heName + artists { + id + enName + heName + } + images { + large + small + medium + } + } + } + """ + ) + params = {"skip": 0, "take": 10} + response = self.client.execute(query, variable_values=params) + + albums_obj = response["albums"] + albums_list = [] + + for album in albums_obj: + parsed_album = self._parse_album(album) + albums_list.append(parsed_album) + + return albums_list + + async def get_library_tracks(self) -> list[Track]: + query = gql( + """ + query GetPopularTracks($count: Int!) { + tracks(take: $count) { + id + enName + heName + file + duration + album { + id + enName + heName + artists { + id + heName + enName + images { + large + medium + small + } + } + images { + small + medium + large + } + } + } + } + """ + ) + params = {"count": 10} + response = self.client.execute(query, variable_values=params) + + tracks_obj = response["tracks"] + for track in tracks_obj: + self._parse_track(track) + + tracks_list = [] + + for track in tracks_obj: + parsed_track = self._parse_track(track) + tracks_list.append(parsed_track) + + return tracks_list + + async def get_artist_albums(self, prov_artist_id) -> list[Album]: + """Get a list of all albums for the given artist.""" + query = gql( + """ + query GetAlbumsByArtist($artistId: Int!, $orderBy: [AlbumOrderByWithRelationInput!], $take: Int, $skip: Int) { + albums(where: { artists: { some: { id: { equals: $artistId } } } }, orderBy: $orderBy, take: $take, skip: $skip) { + id + enName + heName + artists { + id + enName + heName + } + images { + large + small + medium + } + } + } + + """ + ) + params = {"term": "", "skip": 0, "count": 50, "artistId": prov_artist_id} + response = self.client.execute(query, variable_values=params) + + albums_obj = response["albums"] + + albums_list = [] + + for album in albums_obj: + parsed_album = self._parse_album(album) + albums_list.append(parsed_album) + + return albums_list + + async def get_album(self, prov_album_id) -> Album: + """Get full album details by id.""" + query = gql( + """ + query GetAlbumById($albumId: Int!) { + album(where: { id: $albumId }) { + id + enName + heName + enDesc + heDesc + artists { + id + enName + heName + } + images { + large + medium + small + } + } + } + + """ + ) + params = {"albumId": prov_album_id} + response = self.client.execute(query, variable_values=params) + + album_data = response["album"] + self._parse_album(album_data) + + async def get_album_tracks(self, prov_album_id: str) -> list[Track]: + query = gql( + """ + query GetAlbumTracks($albumId: Int!) { + album(where: { id: $albumId }) { + id + enName + heName + tracks { + id + trackNumber + enName + heName + duration + file + artists { + id + enName + heName + } + } + } + } + + """ + ) + params = {"albumId": prov_album_id} + response = self.client.execute(query, variable_values=params) + + tracks_obj = response["album"]["tracks"] + tracks = [] + for track in tracks_obj: + parsed_track = self._parse_track(track) + tracks.append(parsed_track) + return tracks + + async def get_track(self, prov_track_id) -> Track: + """Get full track details by id.""" + query = gql( + """ + query GetTrackById($trackId: Int!) { + track(where: { id: $trackId }) { + id + trackNumber + enName + heName + file + duration + album { + id + enName + heName + } + artists { + id + enName + heName + } + genres { + id + enName + heName + } + images + } + } + + """ + ) + params = {"trackId": int(prov_track_id)} + response = self.client.execute(query, variable_values=params) + track_data = response["track"] + res = self._parse_track(track_data) + return res + + async def get_artist(self, prov_artist_id) -> Artist: + query = gql( + """ + query GetArtistById($artistId: Int!) { + artist(where: { id: $artistId }) { + id + enName + heName + images { + large + medium + small + } + } + } + """ + ) + params = {"artistId": prov_artist_id} + response = self.client.execute(query, variable_values=params) + + artist_obj = response["artist"] + self._parse_artist(artist_obj) + + def _parse_artist(self, artist_obj: dict) -> Artist: + """Parse a YT Artist response to Artist model object.""" + + artist = Artist( + item_id=artist_obj["id"], + name=artist_obj["heName"] or artist_obj["enName"], + provider=self.domain, + provider_mappings={ + ProviderMapping( + item_id=artist_obj["id"], + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + + if artist_obj.get("images"): + artist.metadata.images = self._parse_thumbnails(artist_obj["images"]) + return artist + + def _parse_album(self, album_obj) -> Album: + album = Album( + item_id=album_obj["id"], + name=album_obj["heName"] or album_obj["enName"], + provider=self.domain, + provider_mappings={ + ProviderMapping( + item_id=album_obj["id"], + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + ) + if album_obj.get("artists"): + album.artists = [ + self._get_artist_item_mapping(artist) for artist in album_obj["artists"] + ] + + if album_obj.get("images"): + album.metadata.images = self._parse_thumbnails(album_obj["images"]) + + return album + + def _parse_track(self, track_obj: dict) -> Track: + base_url = "https://jewishmusic.fm/wp-content/uploads/secretmusicfolder1/" + track = Track( + item_id=track_obj["id"], + provider=self.domain, + name=track_obj["heName"] or track_obj["enName"], + provider_mappings={ + ProviderMapping( + item_id=track_obj["id"], + provider_domain=self.domain, + provider_instance=self.instance_id, + available=True, + url=track_obj["file"], + audio_format=AudioFormat( + content_type=ContentType.MP3, + ), + ) + }, + disc_number=0, # not supported on YTM? + track_number=track_obj.get("trackNumber", 0), + ) + + if track_obj.get("artists"): + track.artists = [ + self._get_artist_item_mapping(artist) for artist in track_obj["artists"] + ] + + if ( + track_obj.get("album") + and isinstance(track_obj.get("album"), dict) + and track_obj["album"].get("id") + ): + album = track_obj["album"] + track.album = self._get_item_mapping( + MediaType.ALBUM, album["id"], album["heName"] or album["enName"] + ) + + if "duration" in track_obj and str(track_obj["duration"]).isdigit(): + track.duration = int(track_obj["duration"]) + + return track + + def _parse_thumbnails(self, thumbnails_obj: dict) -> list[MediaItemImage]: + """Parse and YTM thumbnails to MediaItemImage.""" + result: list[MediaItemImage] = [] + processed_images = set() + + # Assuming thumbnails_obj contains keys like 'small', 'medium', 'large', etc. + for size_key, url in thumbnails_obj.items(): + # Dummy values for width and height based on the size_key. + if size_key == "small": + width, height = 150, 150 + elif size_key == "medium": + width, height = 300, 300 + else: # assuming "large" or any other size + width, height = 600, 600 + + image_ratio: float = width / height + image_type = ImageType.LANDSCAPE if image_ratio > 2.0 else ImageType.THUMB + + # Base URL + url_base = url + + if (url_base, image_type) in processed_images: + continue + + processed_images.add((url_base, image_type)) + result.append( + MediaItemImage( + type=image_type, + path=url, + provider=self.lookup_key, + remotely_accessible=True, + ) + ) + + return result + + def _get_item_mapping( + self, media_type: MediaType, key: str, name: str + ) -> ItemMapping: + return ItemMapping( + media_type=media_type, + item_id=key, + provider=self.instance_id, + name=name, + ) + + def _get_artist_item_mapping(self, artist_obj: dict) -> ItemMapping: + return self._get_item_mapping( + MediaType.ARTIST, + artist_obj["id"], + artist_obj["heName"] or artist_obj["enName"], + ) diff --git a/music_assistant/server/providers/zing/helpers.py b/music_assistant/server/providers/zing/helpers.py new file mode 100644 index 000000000..6af6dc7d6 --- /dev/null +++ b/music_assistant/server/providers/zing/helpers.py @@ -0,0 +1,370 @@ +"""Helper module for parsing the Youtube Music API. + +This helpers file is an async wrapper around the excellent ytmusicapi package. +While the ytmusicapi package does an excellent job at parsing the Youtube Music results, +it is unfortunately not async, which is required for Music Assistant to run smoothly. +This also nicely separates the parsing logic from the Youtube Music provider logic. +""" + +import asyncio +from time import time + +import ytmusicapi +from aiohttp import ClientSession +from ytmusicapi.constants import ( + OAUTH_CLIENT_ID, + OAUTH_CLIENT_SECRET, + OAUTH_CODE_URL, + OAUTH_SCOPE, + OAUTH_TOKEN_URL, + OAUTH_USER_AGENT, +) + +from music_assistant.server.helpers.auth import AuthenticationHelper + + +async def get_artist( + prov_artist_id: str, headers: dict[str, str], language: str = "en" +) -> dict[str, str]: + """Async wrapper around the ytmusicapi get_artist function.""" + + def _get_artist(): + ytm = ytmusicapi.YTMusic(auth=headers, language=language) + try: + artist = ytm.get_artist(channelId=prov_artist_id) + # ChannelId can sometimes be different and original ID is not part of the response + artist["channelId"] = prov_artist_id + except KeyError: + try: + user = ytm.get_user(channelId=prov_artist_id) + artist = {"channelId": prov_artist_id, "name": user["name"]} + except KeyError: + artist = {"channelId": prov_artist_id, "name": "Unknown"} + return artist + + return await asyncio.to_thread(_get_artist) + + +async def get_album(prov_album_id: str, language: str = "en") -> dict[str, str]: + """Async wrapper around the ytmusicapi get_album function.""" + + def _get_album(): + ytm = ytmusicapi.YTMusic(language=language) + return ytm.get_album(browseId=prov_album_id) + + return await asyncio.to_thread(_get_album) + + +async def get_playlist( + prov_playlist_id: str, headers: dict[str, str], language: str = "en" +) -> dict[str, str]: + """Async wrapper around the ytmusicapi get_playlist function.""" + + def _get_playlist(): + ytm = ytmusicapi.YTMusic(auth=headers, language=language) + playlist = ytm.get_playlist(playlistId=prov_playlist_id, limit=None) + playlist["checksum"] = get_playlist_checksum(playlist) + # Fix missing playlist id in some edge cases + playlist["id"] = prov_playlist_id if not playlist.get("id") else playlist["id"] + return playlist + + return await asyncio.to_thread(_get_playlist) + + +async def get_track( + prov_track_id: str, headers: dict[str, str], language: str = "en" +) -> dict[str, str] | None: + """Async wrapper around the ytmusicapi get_playlist function.""" + + def _get_song(): + ytm = ytmusicapi.YTMusic(auth=headers, language=language) + track_obj = ytm.get_song(videoId=prov_track_id) + track = {} + if "videoDetails" not in track_obj: + # video that no longer exists + return None + track["videoId"] = track_obj["videoDetails"]["videoId"] + track["title"] = track_obj["videoDetails"]["title"] + track["artists"] = [ + { + "channelId": track_obj["videoDetails"]["channelId"], + "name": track_obj["videoDetails"]["author"], + } + ] + track["duration"] = track_obj["videoDetails"]["lengthSeconds"] + track["thumbnails"] = track_obj["microformat"]["microformatDataRenderer"]["thumbnail"][ + "thumbnails" + ] + track["isAvailable"] = track_obj["playabilityStatus"]["status"] == "OK" + return track + + return await asyncio.to_thread(_get_song) + + +async def get_library_artists(headers: dict[str, str], language: str = "en") -> dict[str, str]: + """Async wrapper around the ytmusicapi get_library_artists function.""" + + def _get_library_artists(): + ytm = ytmusicapi.YTMusic(auth=headers, language=language) + artists = ytm.get_library_subscriptions(limit=9999) + # Sync properties with uniformal artist object + for artist in artists: + artist["id"] = artist["browseId"] + artist["name"] = artist["artist"] + del artist["browseId"] + del artist["artist"] + return artists + + return await asyncio.to_thread(_get_library_artists) + + +async def get_library_albums(headers: dict[str, str], language: str = "en") -> dict[str, str]: + """Async wrapper around the ytmusicapi get_library_albums function.""" + + def _get_library_albums(): + ytm = ytmusicapi.YTMusic(auth=headers, language=language) + return ytm.get_library_albums(limit=9999) + + return await asyncio.to_thread(_get_library_albums) + + +async def get_library_playlists(headers: dict[str, str], language: str = "en") -> dict[str, str]: + """Async wrapper around the ytmusicapi get_library_playlists function.""" + + def _get_library_playlists(): + ytm = ytmusicapi.YTMusic(auth=headers, language=language) + playlists = ytm.get_library_playlists(limit=9999) + # Sync properties with uniformal playlist object + for playlist in playlists: + playlist["id"] = playlist["playlistId"] + del playlist["playlistId"] + playlist["checksum"] = get_playlist_checksum(playlist) + return playlists + + return await asyncio.to_thread(_get_library_playlists) + + +async def get_library_tracks(headers: dict[str, str], language: str = "en") -> dict[str, str]: + """Async wrapper around the ytmusicapi get_library_tracks function.""" + + def _get_library_tracks(): + ytm = ytmusicapi.YTMusic(auth=headers, language=language) + return ytm.get_library_songs(limit=9999) + + return await asyncio.to_thread(_get_library_tracks) + + +async def library_add_remove_artist( + headers: dict[str, str], prov_artist_id: str, add: bool = True +) -> bool: + """Add or remove an artist to the user's library.""" + + def _library_add_remove_artist(): + ytm = ytmusicapi.YTMusic(auth=headers) + if add: + return "actions" in ytm.subscribe_artists(channelIds=[prov_artist_id]) + if not add: + return "actions" in ytm.unsubscribe_artists(channelIds=[prov_artist_id]) + return None + + return await asyncio.to_thread(_library_add_remove_artist) + + +async def library_add_remove_album( + headers: dict[str, str], prov_item_id: str, add: bool = True +) -> bool: + """Add or remove an album or playlist to the user's library.""" + album = await get_album(prov_album_id=prov_item_id) + + def _library_add_remove_album(): + ytm = ytmusicapi.YTMusic(auth=headers) + playlist_id = album["audioPlaylistId"] + if add: + return ytm.rate_playlist(playlist_id, "LIKE") + if not add: + return ytm.rate_playlist(playlist_id, "INDIFFERENT") + return None + + return await asyncio.to_thread(_library_add_remove_album) + + +async def library_add_remove_playlist( + headers: dict[str, str], prov_item_id: str, add: bool = True +) -> bool: + """Add or remove an album or playlist to the user's library.""" + + def _library_add_remove_playlist(): + ytm = ytmusicapi.YTMusic(auth=headers) + if add: + return "actions" in ytm.rate_playlist(prov_item_id, "LIKE") + if not add: + return "actions" in ytm.rate_playlist(prov_item_id, "INDIFFERENT") + return None + + return await asyncio.to_thread(_library_add_remove_playlist) + + +async def add_remove_playlist_tracks( + headers: dict[str, str], prov_playlist_id: str, prov_track_ids: list[str], add: bool +) -> bool: + """Async wrapper around adding/removing tracks to a playlist.""" + + def _add_playlist_tracks(): + ytm = ytmusicapi.YTMusic(auth=headers) + if add: + return ytm.add_playlist_items(playlistId=prov_playlist_id, videoIds=prov_track_ids) + if not add: + return ytm.remove_playlist_items(playlistId=prov_playlist_id, videos=prov_track_ids) + return None + + return await asyncio.to_thread(_add_playlist_tracks) + + +async def get_song_radio_tracks( + headers: dict[str, str], prov_item_id: str, limit=25 +) -> dict[str, str]: + """Async wrapper around the ytmusicapi radio function.""" + + def _get_song_radio_tracks(): + ytm = ytmusicapi.YTMusic(auth=headers) + playlist_id = f"RDAMVM{prov_item_id}" + result = ytm.get_watch_playlist( + videoId=prov_item_id, playlistId=playlist_id, limit=limit, radio=True + ) + # Replace inconsistensies for easier parsing + for track in result["tracks"]: + if track.get("thumbnail"): + track["thumbnails"] = track["thumbnail"] + del track["thumbnail"] + if track.get("length"): + track["duration"] = get_sec(track["length"]) + return result + + return await asyncio.to_thread(_get_song_radio_tracks) + + +async def search( + query: str, ytm_filter: str | None = None, limit: int = 20, language: str = "en" +) -> list[dict]: + """Async wrapper around the ytmusicapi search function.""" + + def _search(): + ytm = ytmusicapi.YTMusic(language=language) + results = ytm.search(query=query, filter=ytm_filter, limit=limit) + # Sync result properties with uniformal objects + for result in results: + if result["resultType"] == "artist": + if "artists" in result and len(result["artists"]) > 0: + result["id"] = result["artists"][0]["id"] + result["name"] = result["artists"][0]["name"] + del result["artists"] + else: + result["id"] = result["browseId"] + result["name"] = result["artist"] + del result["browseId"] + del result["artist"] + elif result["resultType"] == "playlist": + if "playlistId" in result: + result["id"] = result["playlistId"] + del result["playlistId"] + elif "browseId" in result: + result["id"] = result["browseId"] + del result["browseId"] + return results[:limit] + + return await asyncio.to_thread(_search) + + +def get_playlist_checksum(playlist_obj: dict) -> str: + """Try to calculate a checksum so we can detect changes in a playlist.""" + for key in ("duration_seconds", "trackCount", "count"): + if key in playlist_obj: + return playlist_obj[key] + return str(int(time())) + + +def is_brand_account(username: str) -> bool: + """Check if the provided username is a brand-account.""" + return len(username) == 21 and username.isdigit() + + +def get_sec(time_str): + """Get seconds from time.""" + parts = time_str.split(":") + if len(parts) == 3: + return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2]) + if len(parts) == 2: + return int(parts[0]) * 60 + int(parts[1]) + return 0 + + +async def login_oauth(auth_helper: AuthenticationHelper): + """Use device login to get a token.""" + http_session = auth_helper.mass.http_session + code = await get_oauth_code(http_session) + return await visit_oauth_auth_url(auth_helper, code) + + +def _get_data_and_headers(data: dict): + """Prepare headers for OAuth requests.""" + data.update({"client_id": OAUTH_CLIENT_ID}) + headers = {"User-Agent": OAUTH_USER_AGENT} + return data, headers + + +async def get_oauth_code(session: ClientSession): + """Get the OAuth code from the server.""" + data, headers = _get_data_and_headers({"scope": OAUTH_SCOPE}) + async with session.post(OAUTH_CODE_URL, json=data, headers=headers) as code_response: + return await code_response.json() + + +async def visit_oauth_auth_url(auth_helper: AuthenticationHelper, code: dict[str, str]): + """Redirect the user to the OAuth login page and wait for the token.""" + auth_url = f"{code['verification_url']}?user_code={code['user_code']}" + auth_helper.send_url(auth_url=auth_url) + device_code = code["device_code"] + expiry = code["expires_in"] + interval = code["interval"] + while expiry > 0: + token = await get_oauth_token_from_code(auth_helper.mass.http_session, device_code) + if token.get("access_token"): + return token + await asyncio.sleep(interval) + expiry -= interval + msg = "You took too long to log in" + raise TimeoutError(msg) + + +async def get_oauth_token_from_code(session: ClientSession, device_code: str): + """Check if the OAuth token is ready yet.""" + data, headers = _get_data_and_headers( + data={ + "client_secret": OAUTH_CLIENT_SECRET, + "grant_type": "http://oauth.net/grant_type/device/1.0", + "code": device_code, + } + ) + async with session.post( + OAUTH_TOKEN_URL, + json=data, + headers=headers, + ) as token_response: + return await token_response.json() + + +async def refresh_oauth_token(session: ClientSession, refresh_token: str): + """Refresh an expired OAuth token.""" + data, headers = _get_data_and_headers( + { + "client_secret": OAUTH_CLIENT_SECRET, + "grant_type": "refresh_token", + "refresh_token": refresh_token, + } + ) + async with session.post( + OAUTH_TOKEN_URL, + json=data, + headers=headers, + ) as response: + return await response.json() diff --git a/music_assistant/server/providers/zing/icon.svg b/music_assistant/server/providers/zing/icon.svg new file mode 100644 index 000000000..4f033d9e2 --- /dev/null +++ b/music_assistant/server/providers/zing/icon.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/music_assistant/server/providers/zing/manifest.json b/music_assistant/server/providers/zing/manifest.json new file mode 100644 index 000000000..fc2af6197 --- /dev/null +++ b/music_assistant/server/providers/zing/manifest.json @@ -0,0 +1,10 @@ +{ + "type": "music", + "domain": "zing", + "name": "Zing Music", + "description": "Support for the Zing Music streaming provider in Music Assistant.", + "codeowners": ["@MarvinSchenkel"], + "requirements": ["gql==3.0.0a5", "requests_toolbelt"], + "documentation": "https://music-assistant.io/music-providers/youtube-music/", + "multi_instance": true +}