From 9fd4814e1fcca5dce00a5e89fbaf3369311b2b8e Mon Sep 17 00:00:00 2001 From: 7x11x13 <32209764+7x11x13@users.noreply.github.com> Date: Tue, 9 Jul 2024 10:51:23 -0400 Subject: [PATCH] Typecheck with mypy, lint and format with ruff (#499) * Lint and format with ruff, check types with mypy * Fix mypy checks * Update CI * Remove shebangs * Ignore EXE rules * Fix EOL * Fix mypy for 3.7 * Fix CI * Fix CI * Ensure filelock path exists * ci: various ci fixes (#500) --------- Co-authored-by: Arsenii es3n1n --- .github/workflows/pypi.yml | 35 ++ .gitignore | 3 +- mypy.ini | 3 + requirements.dev.txt | 6 +- ruff.toml | 13 + scdl/__init__.py | 2 +- scdl/metadata_assembler.py | 102 ++-- scdl/scdl.py | 949 ++++++++++++++++++++++--------------- scdl/utils.py | 64 +-- setup.py | 19 +- tests/__init__.py | 0 tests/test_playlist.py | 32 +- tests/test_track.py | 67 +-- tests/test_user.py | 29 +- tests/utils.py | 26 +- 15 files changed, 801 insertions(+), 549 deletions(-) create mode 100644 mypy.ini create mode 100644 ruff.toml create mode 100644 tests/__init__.py diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml index 6514d806..e69abe95 100644 --- a/.github/workflows/pypi.yml +++ b/.github/workflows/pypi.yml @@ -1,10 +1,45 @@ name: pypi-publish on: push: + branches: [ master ] tags: - 'v*' + pull_request: + branches: [ master ] jobs: + test: + strategy: + matrix: + version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12'] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install ffmpeg + run: | + sudo apt update + sudo apt install -yq --no-install-recommends ffmpeg + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.version }} + - name: Install dependencies + run: | + pip install -e .[dev] + - name: Lint + run: ruff check + - name: Format check + run: ruff format --check + - name: Type check + run: mypy + - name: Test + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + AUTH_TOKEN: ${{ secrets.AUTH_TOKEN }} + run: | + pytest --exitfirst publish: + needs: test + if: startsWith(github.ref, 'refs/tags/v') runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 diff --git a/.gitignore b/.gitignore index e36fdd9b..9e7a77ec 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ __pycache__/ .venv .env .coverage* -.idea \ No newline at end of file +.idea +.python-version \ No newline at end of file diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000..4c663af9 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,3 @@ +[mypy] +packages = scdl, tests +check_untyped_defs = true \ No newline at end of file diff --git a/requirements.dev.txt b/requirements.dev.txt index 7bcb30c8..35c96cc0 100644 --- a/requirements.dev.txt +++ b/requirements.dev.txt @@ -1,2 +1,6 @@ pytest -music-tag \ No newline at end of file +music-tag +ruff +mypy +types-requests +types-tqdm \ No newline at end of file diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 00000000..782d5b92 --- /dev/null +++ b/ruff.toml @@ -0,0 +1,13 @@ +target-version = "py37" +line-length = 100 + +[lint] +select = ["ALL"] +ignore = [ + "C90", "D", + "S", "BLE", "FBT", "A", "EM", "FA", "G", "SLF", "PTH", + "PLR", "TRY", + "PLW2901", "ANN204", + "COM812", "ISC001", + "EXE" +] \ No newline at end of file diff --git a/scdl/__init__.py b/scdl/__init__.py index c9040c1a..caadcfa1 100644 --- a/scdl/__init__.py +++ b/scdl/__init__.py @@ -1,3 +1,3 @@ -# -*- encoding: utf-8 -*- """Python Soundcloud Music Downloader.""" + __version__ = "v2.10.0" diff --git a/scdl/metadata_assembler.py b/scdl/metadata_assembler.py index 7701a159..a9c5198f 100644 --- a/scdl/metadata_assembler.py +++ b/scdl/metadata_assembler.py @@ -1,12 +1,11 @@ from base64 import b64encode from dataclasses import dataclass -from typing import Optional, Type, TypeVar, Union, Callable -from types import MappingProxyType +from functools import singledispatch +from typing import Optional, Union -from mutagen import FileType, flac, oggopus, id3, wave, mp3, mp4 +from mutagen import FileType, flac, id3, mp3, mp4, oggopus, wave - -JPEG_MIME_TYPE: str = 'image/jpeg' +JPEG_MIME_TYPE: str = "image/jpeg" @dataclass(frozen=True) @@ -26,6 +25,11 @@ class MetadataInfo: album_track_num: Optional[int] +@singledispatch +def assemble_metadata(file: FileType, meta: MetadataInfo) -> None: # noqa: ARG001 + raise NotImplementedError + + def _get_flac_pic(jpeg_data: bytes) -> flac.Picture: pic = flac.Picture() pic.data = jpeg_data @@ -39,119 +43,113 @@ def _get_apic(jpeg_data: bytes) -> id3.APIC: encoding=3, mime=JPEG_MIME_TYPE, type=3, - desc='Cover', + desc="Cover", data=jpeg_data, ) def _assemble_common(file: FileType, meta: MetadataInfo) -> None: - file['artist'] = meta.artist - file['title'] = meta.title + file["artist"] = meta.artist + file["title"] = meta.title if meta.genre: - file['genre'] = meta.genre + file["genre"] = meta.genre if meta.link: - file['website'] = meta.link + file["website"] = meta.link if meta.date: - file['date'] = meta.date + file["date"] = meta.date if meta.album_title: - file['album'] = meta.album_title + file["album"] = meta.album_title if meta.album_author: - file['albumartist'] = meta.album_author + file["albumartist"] = meta.album_author if meta.album_track_num is not None: - file['tracknumber'] = str(meta.album_track_num) + file["tracknumber"] = str(meta.album_track_num) -def _assemble_flac(file: flac.FLAC, meta: MetadataInfo) -> None: +@assemble_metadata.register(flac.FLAC) +def _(file: flac.FLAC, meta: MetadataInfo) -> None: _assemble_common(file, meta) if meta.description: - file['description'] = meta.description + file["description"] = meta.description if meta.artwork_jpeg: file.add_picture(_get_flac_pic(meta.artwork_jpeg)) -def _assemble_opus(file: oggopus.OggOpus, meta: MetadataInfo) -> None: +@assemble_metadata.register(oggopus.OggOpus) +def _(file: oggopus.OggOpus, meta: MetadataInfo) -> None: _assemble_common(file, meta) if meta.description: - file['comment'] = meta.description + file["comment"] = meta.description if meta.artwork_jpeg: pic = _get_flac_pic(meta.artwork_jpeg).write() - file['metadata_block_picture'] = b64encode(pic).decode() + file["metadata_block_picture"] = b64encode(pic).decode() -def _assemble_wav_or_mp3(file: Union[wave.WAVE, mp3.MP3], meta: MetadataInfo) -> None: - file['TIT2'] = id3.TIT2(encoding=3, text=meta.title) - file['TPE1'] = id3.TPE1(encoding=3, text=meta.artist) +@assemble_metadata.register(mp3.MP3) +@assemble_metadata.register(wave.WAVE) +def _(file: Union[wave.WAVE, mp3.MP3], meta: MetadataInfo) -> None: + file["TIT2"] = id3.TIT2(encoding=3, text=meta.title) + file["TPE1"] = id3.TPE1(encoding=3, text=meta.artist) if meta.description: - file['COMM'] = id3.COMM(encoding=3, lang='ENG', text=meta.description) + file["COMM"] = id3.COMM(encoding=3, lang="ENG", text=meta.description) if meta.genre: - file['TCON'] = id3.TCON(encoding=3, text=meta.genre) + file["TCON"] = id3.TCON(encoding=3, text=meta.genre) if meta.link: - file['WOAS'] = id3.WOAS(url=meta.link) + file["WOAS"] = id3.WOAS(url=meta.link) if meta.date: - file['TDAT'] = id3.TDAT(encoding=3, text=meta.date) + file["TDAT"] = id3.TDAT(encoding=3, text=meta.date) if meta.album_title: - file['TALB'] = id3.TALB(encoding=3, text=meta.album_title) + file["TALB"] = id3.TALB(encoding=3, text=meta.album_title) if meta.album_author: - file['TPE2'] = id3.TPE2(encoding=3, text=meta.album_author) + file["TPE2"] = id3.TPE2(encoding=3, text=meta.album_author) if meta.album_track_num is not None: - file['TRCK'] = id3.TRCK(encoding=3, text=str(meta.album_track_num)) + file["TRCK"] = id3.TRCK(encoding=3, text=str(meta.album_track_num)) if meta.artwork_jpeg: - file['APIC'] = _get_apic(meta.artwork_jpeg) + file["APIC"] = _get_apic(meta.artwork_jpeg) -def _assemble_mp4(file: mp4.MP4, meta: MetadataInfo) -> None: - file['\251ART'] = meta.artist - file['\251nam'] = meta.title +@assemble_metadata.register(mp4.MP4) +def _(file: mp4.MP4, meta: MetadataInfo) -> None: + file["\251ART"] = meta.artist + file["\251nam"] = meta.title if meta.genre: - file['\251gen'] = meta.genre + file["\251gen"] = meta.genre if meta.link: - file['\251cmt'] = meta.link + file["\251cmt"] = meta.link if meta.date: - file['\251day'] = meta.date + file["\251day"] = meta.date if meta.album_title: - file['\251alb'] = meta.album_title + file["\251alb"] = meta.album_title if meta.album_author: - file['aART'] = meta.album_author + file["aART"] = meta.album_author if meta.album_track_num is not None: - file['trkn'] = str(meta.album_track_num) + file["trkn"] = str(meta.album_track_num) if meta.description: - file['desc'] = meta.description + file["desc"] = meta.description if meta.artwork_jpeg: - file['covr'] = [mp4.MP4Cover(meta.artwork_jpeg)] - - -T = TypeVar('T') -METADATA_ASSEMBLERS: MappingProxyType[Type[T], Callable[[T, MetadataInfo], None]] = MappingProxyType({ - flac.FLAC: _assemble_flac, - oggopus.OggOpus: _assemble_opus, - wave.WAVE: _assemble_wav_or_mp3, - mp3.MP3: _assemble_wav_or_mp3, - mp4.MP4: _assemble_mp4, -}) - + file["covr"] = [mp4.MP4Cover(meta.artwork_jpeg)] diff --git a/scdl/scdl.py b/scdl/scdl.py index 168b25ae..4f93a576 100644 --- a/scdl/scdl.py +++ b/scdl/scdl.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python3 -# -*- encoding: utf-8 -*- - """scdl allows you to download music from Soundcloud Usage: @@ -21,7 +18,8 @@ -h --help Show this screen --version Show version -l [url] URL can be track/playlist/user - -n [maxtracks] Download the n last tracks of a playlist according to the creation date + -n [maxtracks] Download the n last tracks of a playlist according to the + creation date -a Download all tracks of user (including reposts) -t Download all uploads of a user (no reposts) -f Download all favorites (likes) of a user @@ -30,7 +28,8 @@ -r Download all reposts of user -c Continue if a downloaded file already exists --force-metadata This will set metadata on already downloaded track - -o [offset] Start downloading a playlist from the [offset]th track (starting with 1) + -o [offset] Start downloading a playlist from the [offset]th track + Indexing starts with 1. --addtimestamp Add track creation timestamp to filename, which allows for chronological sorting (Deprecated. Use --name-format instead.) @@ -49,16 +48,21 @@ --onlymp3 Download only mp3 files --path [path] Use a custom path for downloaded files --remove Remove any files not downloaded from execution - --sync [file] Compares an archive file to a playlist and downloads/removes any changed tracks - --flac Convert original files to .flac. Only works if the original file is lossless quality - --no-album-tag On some player track get the same cover art if from the same album, this prevent it + --sync [file] Compares an archive file to a playlist and downloads/removes + any changed tracks + --flac Convert original files to .flac. Only works if the original + file is lossless quality + --no-album-tag On some player track get the same cover art if from the same + album, this prevent it --original-art Download original cover art, not just 500x500 JPEG --original-name Do not change name of original file downloads --original-metadata Do not change metadata of original file downloads --no-original Do not download original file; only mp3, m4a, or opus --only-original Only download songs with original file available - --name-format [format] Specify the downloaded file name format. Use "-" to download to stdout - --playlist-name-format [format] Specify the downloaded file name format, if it is being downloaded as part of a playlist + --name-format [format] Specify the downloaded file name format. Use "-" to download + to stdout + --playlist-name-format [format] Specify the downloaded file name format, if it is being + downloaded as part of a playlist --client-id [id] Specify the client_id to use --auth-token [token] Specify the auth token to use --overwrite Overwrite file if it already exists @@ -75,39 +79,61 @@ import logging import math import mimetypes -import threading -import tempfile -from typing import List, Optional, TypedDict, Tuple, IO, Union -import secrets - -mimetypes.init() - import os import pathlib +import secrets import shutil import subprocess import sys +import tempfile +import threading import time import traceback +import typing import urllib.parse import warnings from dataclasses import asdict +from types import TracebackType +from typing import IO, Generator, List, NoReturn, Optional, Tuple, Type, Union -import filelock -import mutagen -from mutagen.easymp4 import EasyMP4 +from tqdm import tqdm -EasyMP4.RegisterTextKey("website", "purl") +if sys.version_info < (3, 8): + from typing_extensions import TypedDict +else: + from typing import TypedDict +if sys.version_info < (3, 11): + from typing_extensions import NotRequired +else: + from typing import NotRequired + +import filelock +import mutagen import requests from docopt import docopt from pathvalidate import sanitize_filename -from soundcloud import (BasicAlbumPlaylist, BasicTrack, MiniTrack, SoundCloud, - Transcoding) -from tqdm import tqdm +from soundcloud import ( + AlbumPlaylist, + BasicAlbumPlaylist, + BasicTrack, + MiniTrack, + PlaylistLike, + PlaylistStreamItem, + PlaylistStreamRepostItem, + SoundCloud, + Track, + TrackLike, + TrackStreamItem, + TrackStreamRepostItem, + Transcoding, + User, +) from scdl import __version__, utils -from scdl.metadata_assembler import METADATA_ASSEMBLERS, MetadataInfo +from scdl.metadata_assembler import MetadataInfo, assemble_metadata + +mimetypes.init() logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -115,53 +141,131 @@ FFMPEG_PIPE_CHUNK_SIZE = 1024 * 1024 # 1 mb -fileToKeep = [] +files_to_keep = [] + + +class SCDLArgs(TypedDict): + C: bool + a: bool + addtimestamp: bool + addtofile: bool + auth_token: Optional[str] + c: bool + client_id: Optional[str] + debug: bool + download_archive: Optional[str] + error: bool + extract_artist: bool + f: bool + flac: bool + force_metadata: bool + hide_progress: bool + hidewarnings: bool + l: str # noqa: E741 + max_size: Optional[int] + me: bool + min_size: Optional[int] + n: Optional[str] + name_format: str + no_album_tag: bool + no_original: bool + no_playlist: bool + no_playlist_folder: bool + o: Optional[int] + offset: NotRequired[int] + only_original: bool + onlymp3: bool + opus: bool + original_art: bool + original_metadata: bool + original_name: bool + overwrite: bool + p: bool + path: Optional[str] + playlist_name_format: str + playlist_offset: NotRequired[int] + r: bool + remove: bool + strict_playlist: bool + sync: Optional[str] + t: bool + + +class PlaylistInfo(TypedDict): + author: str + id: int + title: str + tracknumber_int: int + tracknumber: str + -class SoundCloudException(Exception): +class SoundCloudException(Exception): # noqa: N818 pass -def handle_exception(exc_type, exc_value, exc_traceback): + +class MissingFilenameError(SoundCloudException): + def __init__(self, content_disp_header: Optional[str]): + super().__init__( + f"Could not get filename from content-disposition header: {content_disp_header}", + ) + + +class InvalidFilesizeError(SoundCloudException): + def __init__(self, min_size: float, max_size: float, size: float): + super().__init__( + f"File size: {size} not within --min-size={min_size} and --max-size={max_size}", + ) + + +class RegionBlockError(SoundCloudException): + def __init__(self): + super().__init__("Track is not available in your location. Try using a VPN") + + +class FFmpegError(SoundCloudException): + def __init__(self, return_code: int, errors: str): + super().__init__(f"FFmpeg error ({return_code}): {errors}") + + +def handle_exception( + exc_type: Type[BaseException], + exc_value: BaseException, + exc_traceback: Optional[TracebackType], +) -> NoReturn: if issubclass(exc_type, KeyboardInterrupt): logger.error("\nGoodbye!") else: logger.error("".join(traceback.format_exception(exc_type, exc_value, exc_traceback))) sys.exit(1) + sys.excepthook = handle_exception -class PlaylistInfo(TypedDict): - author: str - id: int - title: str - file_lock_dirs: List[pathlib.Path] = [] -def clean_up_locks(): - for dir in file_lock_dirs: - for lock in dir.glob("*.scdl.lock"): - try: - lock.unlink(True) - except Exception: - pass +def clean_up_locks() -> None: + with contextlib.suppress(OSError): + for dir in file_lock_dirs: + for lock in dir.glob("*.scdl.lock"): + lock.unlink() atexit.register(clean_up_locks) -def get_filelock(path: pathlib.Path, timeout: int = 10): +def get_filelock(path: Union[pathlib.Path, str], timeout: int = 10) -> filelock.FileLock: path = pathlib.Path(path) + path.parent.mkdir(parents=True, exist_ok=True) path = path.resolve() file_lock_dirs.append(path.parent) lock_path = str(path) + ".scdl.lock" return filelock.FileLock(lock_path, timeout=timeout) -def main(): - """ - Main function, parses the URL from command line arguments - """ +def main() -> None: + """Main function, parses the URL from command line arguments""" logger.addHandler(logging.StreamHandler()) # exit if ffmpeg not installed @@ -195,11 +299,16 @@ def main(): if not client.is_client_id_valid(): if arguments["--client-id"]: - logger.warning(f"Invalid client_id specified by --client-id argument. Using a dynamically generated client_id...") + logger.warning( + "Invalid client_id specified by --client-id argument. " + "Using a dynamically generated client_id...", + ) elif config["scdl"]["client_id"]: - logger.warning(f"Invalid client_id in {config_file}. Using a dynamically generated client_id...") + logger.warning( + f"Invalid client_id in {config_file}. Using a dynamically generated client_id...", + ) else: - logger.info(f"Generating dynamic client_id") + logger.info("Generating dynamic client_id") client = SoundCloud(None, token if token else None) if not client.is_client_id_valid(): logger.error("Dynamically generated client_id is not valid") @@ -207,13 +316,12 @@ def main(): config["scdl"]["client_id"] = client.client_id # save client_id config_file.parent.mkdir(parents=True, exist_ok=True) - with get_filelock(config_file): - with open(config_file, "w", encoding="UTF-8") as f: - config.write(f) + with get_filelock(config_file), open(config_file, "w", encoding="UTF-8") as f: + config.write(f) if (token or arguments["me"]) and not client.is_auth_token_valid(): if arguments["--auth-token"]: - logger.error(f"Invalid auth_token specified by --auth-token argument") + logger.error("Invalid auth_token specified by --auth-token argument") else: logger.error(f"Invalid auth_token in {config_file}") sys.exit(1) @@ -222,7 +330,7 @@ def main(): try: arguments["--offset"] = int(arguments["-o"]) - 1 if arguments["--offset"] < 0: - raise ValueError() + raise ValueError except Exception: logger.error("Offset should be a positive integer...") sys.exit(1) @@ -232,9 +340,7 @@ def main(): try: arguments["--min-size"] = utils.size_in_bytes(arguments["--min-size"]) except Exception: - logger.exception( - "Min size should be an integer with a possible unit suffix" - ) + logger.exception("Min size should be an integer with a possible unit suffix") sys.exit(1) logger.debug("min-size: %d", arguments["--min-size"]) @@ -257,7 +363,9 @@ def main(): if arguments["me"]: # set url to profile associated with auth token - arguments["-l"] = client.get_me().permalink_url + me = client.get_me() + assert me is not None + arguments["-l"] = me.permalink_url arguments["-l"] = validate_url(client, arguments["-l"]) @@ -266,9 +374,7 @@ def main(): path = pathlib.Path(arguments["--download-archive"]).resolve() arguments["--download-archive"] = path except Exception: - logger.error( - f"Invalid download archive file {arguments['--download-archive']}" - ) + logger.error(f"Invalid download archive file {arguments['--download-archive']}") sys.exit(1) if arguments["--sync"]: @@ -287,67 +393,68 @@ def main(): python_args[key] = value # change download path - path = arguments["--path"] or config["scdl"]["path"] - if os.path.exists(path): - os.chdir(path) + dl_path: str = arguments["--path"] or config["scdl"]["path"] + if os.path.exists(dl_path): + os.chdir(dl_path) else: if arguments["--path"]: - logger.error(f"Invalid download path '{path}' specified by --path argument") + logger.error(f"Invalid download path '{dl_path}' specified by --path argument") else: - logger.error(f"Invalid download path '{path}' in {config_file}") + logger.error(f"Invalid download path '{dl_path}' in {config_file}") sys.exit(1) logger.debug("Downloading to " + os.getcwd() + "...") - download_url(client, **python_args) + download_url(client, typing.cast(SCDLArgs, python_args)) if arguments["--remove"]: remove_files() -def validate_url(client: SoundCloud, url: str): - """ - If url is a valid soundcloud.com url, return it. +def validate_url(client: SoundCloud, url: str) -> str: + """If url is a valid soundcloud.com url, return it. Otherwise, try to fix the url so that it is valid. If it cannot be fixed, exit the program. """ - if url.startswith("https://m.soundcloud.com") or url.startswith("http://m.soundcloud.com") or url.startswith("m.soundcloud.com"): + if url.startswith(("https://m.soundcloud.com", "http://m.soundcloud.com", "m.soundcloud.com")): url = url.replace("m.", "", 1) - if url.startswith("https://www.soundcloud.com") or url.startswith("http://www.soundcloud.com") or url.startswith("www.soundcloud.com"): + if url.startswith( + ("https://www.soundcloud.com", "http://www.soundcloud.com", "www.soundcloud.com"), + ): url = url.replace("www.", "", 1) if url.startswith("soundcloud.com"): url = "https://" + url - if url.startswith("https://soundcloud.com") or url.startswith("http://soundcloud.com"): - url = urllib.parse.urljoin(url, urllib.parse.urlparse(url).path) - return url - + if url.startswith(("https://soundcloud.com", "http://soundcloud.com")): + return urllib.parse.urljoin(url, urllib.parse.urlparse(url).path) + # see if link redirects to soundcloud.com try: resp = requests.get(url) - if url.startswith("https://soundcloud.com") or url.startswith("http://soundcloud.com"): + if url.startswith(("https://soundcloud.com", "http://soundcloud.com")): return urllib.parse.urljoin(resp.url, urllib.parse.urlparse(resp.url).path) except Exception: # see if given a username instead of url if client.resolve(f"https://soundcloud.com/{url}"): return f"https://soundcloud.com/{url}" - + logger.error("URL is not valid") sys.exit(1) + def get_config(config_file: pathlib.Path) -> configparser.ConfigParser: - """ - Gets config from scdl.cfg - """ + """Gets config from scdl.cfg""" config = configparser.ConfigParser() default_config_file = pathlib.Path(__file__).with_name("scdl.cfg") with get_filelock(config_file): # load default config first - config.read_file(open(default_config_file, encoding="UTF-8")) + with open(default_config_file, encoding="UTF-8") as f: + config.read_file(f) # load config file if it exists if config_file.exists(): - config.read_file(open(config_file, encoding="UTF-8")) + with open(config_file, encoding="UTF-8") as f: + config.read_file(f) # save config to disk config_file.parent.mkdir(parents=True, exist_ok=True) @@ -358,9 +465,7 @@ def get_config(config_file: pathlib.Path) -> configparser.ConfigParser: def truncate_str(s: str, length: int) -> str: - """ - Truncate string to a certain number of bytes using the file system encoding - """ + """Truncate string to a certain number of bytes using the file system encoding""" encoding = sys.getfilesystemencoding() bytes = s.encode(encoding) bytes = bytes[:length] @@ -372,53 +477,60 @@ def sanitize_str( ext: str = "", replacement_char: str = "�", max_length: int = 255, -): - """ - Sanitizes a string for use as a filename. Does not allow the file to be hidden - """ +) -> str: + """Sanitizes a string for use as a filename. Does not allow the file to be hidden""" if filename.startswith("."): filename = "_" + filename if filename.endswith(".") and not ext: filename = filename + "_" max_filename_length = max_length - len(ext) sanitized = sanitize_filename( - filename, replacement_text=replacement_char, max_len=max_filename_length + filename, + replacement_text=replacement_char, + max_len=max_filename_length, ) # sanitize_filename truncates incorrectly, use our own method sanitized = truncate_str(sanitized, max_filename_length) return sanitized + ext -def download_url(client: SoundCloud, **kwargs): - """ - Detects if a URL is a track or a playlist, and parses the track(s) +def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: + """Detects if a URL is a track or a playlist, and parses the track(s) to the track downloader """ - url = kwargs.get("l") + url = kwargs["l"] item = client.resolve(url) logger.debug(item) offset = kwargs.get("offset", 0) - if not item: + if item is None: logger.error("URL is not valid") sys.exit(1) - elif item.kind == "track": + elif isinstance(item, Track): logger.info("Found a track") - download_track(client, item, **kwargs) - elif item.kind == "playlist": + download_track(client, item, kwargs) + elif isinstance(item, AlbumPlaylist): logger.info("Found a playlist") - download_playlist(client, item, playlist_offset=offset, **kwargs) - elif item.kind == "user": + kwargs["playlist_offset"] = offset + download_playlist(client, item, kwargs) + elif isinstance(item, User): user = item logger.info("Found a user profile") if kwargs.get("f"): logger.info(f"Retrieving all likes of user {user.username}...") - resources = client.get_user_likes(user.id, limit=1000) - for i, like in itertools.islice(enumerate(resources, 1), offset, None): + likes = client.get_user_likes(user.id, limit=1000) + for i, like in itertools.islice(enumerate(likes, 1), offset, None): logger.info(f"like n°{i} of {user.likes_count}") - if hasattr(like, "track"): - download_track(client, like.track, exit_on_fail=kwargs.get("strict_playlist"), **kwargs) - elif hasattr(like, "playlist"): - download_playlist(client, client.get_playlist(like.playlist.id), **kwargs) + if isinstance(like, TrackLike): + download_track( + client, + like.track, + kwargs, + exit_on_fail=kwargs["strict_playlist"], + ) + elif isinstance(like, PlaylistLike): + playlist = client.get_playlist(like.playlist.id) + assert playlist is not None + download_playlist(client, playlist, kwargs) else: logger.error(f"Unknown like type {like}") if kwargs.get("strict_playlist"): @@ -426,50 +538,70 @@ def download_url(client: SoundCloud, **kwargs): logger.info(f"Downloaded all likes of user {user.username}!") elif kwargs.get("C"): logger.info(f"Retrieving all commented tracks of user {user.username}...") - resources = client.get_user_comments(user.id, limit=1000) - for i, comment in itertools.islice(enumerate(resources, 1), offset, None): + comments = client.get_user_comments(user.id, limit=1000) + for i, comment in itertools.islice(enumerate(comments, 1), offset, None): logger.info(f"comment n°{i} of {user.comments_count}") - download_track(client, client.get_track(comment.track.id), exit_on_fail=kwargs.get("strict_playlist"), **kwargs) + track = client.get_track(comment.track.id) + assert track is not None + download_track( + client, + track, + kwargs, + exit_on_fail=kwargs["strict_playlist"], + ) logger.info(f"Downloaded all commented tracks of user {user.username}!") elif kwargs.get("t"): logger.info(f"Retrieving all tracks of user {user.username}...") - resources = client.get_user_tracks(user.id, limit=1000) - for i, track in itertools.islice(enumerate(resources, 1), offset, None): + tracks = client.get_user_tracks(user.id, limit=1000) + for i, track in itertools.islice(enumerate(tracks, 1), offset, None): logger.info(f"track n°{i} of {user.track_count}") - download_track(client, track, exit_on_fail=kwargs.get("strict_playlist"), **kwargs) + download_track(client, track, kwargs, exit_on_fail=kwargs["strict_playlist"]) logger.info(f"Downloaded all tracks of user {user.username}!") elif kwargs.get("a"): logger.info(f"Retrieving all tracks & reposts of user {user.username}...") - resources = client.get_user_stream(user.id, limit=1000) - for i, item in itertools.islice(enumerate(resources, 1), offset, None): - logger.info(f"item n°{i} of {user.track_count + user.reposts_count if user.reposts_count else '?'}") - if item.type in ("track", "track-repost"): - download_track(client, item.track, exit_on_fail=kwargs.get("strict_playlist"), **kwargs) - elif item.type in ("playlist", "playlist-repost"): - download_playlist(client, item.playlist, **kwargs) + items = client.get_user_stream(user.id, limit=1000) + for i, stream_item in itertools.islice(enumerate(items, 1), offset, None): + logger.info( + f"item n°{i} of " + f"{user.track_count + user.reposts_count if user.reposts_count else '?'}", + ) + if isinstance(stream_item, (TrackStreamItem, TrackStreamRepostItem)): + download_track( + client, + stream_item.track, + kwargs, + exit_on_fail=kwargs["strict_playlist"], + ) + elif isinstance(stream_item, (PlaylistStreamItem, PlaylistStreamRepostItem)): + download_playlist(client, stream_item.playlist, kwargs) else: - logger.error(f"Unknown item type {item.type}") + logger.error(f"Unknown item type {stream_item.type}") if kwargs.get("strict_playlist"): sys.exit(1) logger.info(f"Downloaded all tracks & reposts of user {user.username}!") elif kwargs.get("p"): logger.info(f"Retrieving all playlists of user {user.username}...") - resources = client.get_user_playlists(user.id, limit=1000) - for i, playlist in itertools.islice(enumerate(resources, 1), offset, None): + playlists = client.get_user_playlists(user.id, limit=1000) + for i, playlist in itertools.islice(enumerate(playlists, 1), offset, None): logger.info(f"playlist n°{i} of {user.playlist_count}") - download_playlist(client, playlist, **kwargs) + download_playlist(client, playlist, kwargs) logger.info(f"Downloaded all playlists of user {user.username}!") elif kwargs.get("r"): logger.info(f"Retrieving all reposts of user {user.username}...") - resources = client.get_user_reposts(user.id, limit=1000) - for i, item in itertools.islice(enumerate(resources, 1), offset, None): + reposts = client.get_user_reposts(user.id, limit=1000) + for i, repost in itertools.islice(enumerate(reposts, 1), offset, None): logger.info(f"item n°{i} of {user.reposts_count or '?'}") - if item.type == "track-repost": - download_track(client, item.track, exit_on_fail=kwargs.get("strict_playlist"), **kwargs) - elif item.type == "playlist-repost": - download_playlist(client, item.playlist, **kwargs) + if isinstance(repost, TrackStreamRepostItem): + download_track( + client, + repost.track, + kwargs, + exit_on_fail=kwargs["strict_playlist"], + ) + elif isinstance(repost, PlaylistStreamRepostItem): + download_playlist(client, repost.playlist, kwargs) else: - logger.error(f"Unknown item type {item.type}") + logger.error(f"Unknown item type {repost.type}") if kwargs.get("strict_playlist"): sys.exit(1) logger.info(f"Downloaded all reposts of user {user.username}!") @@ -480,40 +612,36 @@ def download_url(client: SoundCloud, **kwargs): logger.error(f"Unknown item type {item.kind}") sys.exit(1) -def remove_files(): - """ - Removes any pre-existing tracks that were not just downloaded - """ + +def remove_files() -> None: + """Removes any pre-existing tracks that were not just downloaded""" logger.info("Removing local track files that were not downloaded...") files = [f for f in os.listdir(".") if os.path.isfile(f)] for f in files: - if f not in fileToKeep: + if f not in files_to_keep: os.remove(f) def sync( client: SoundCloud, - playlist: BasicAlbumPlaylist, + playlist: Union[AlbumPlaylist, BasicAlbumPlaylist], playlist_info: PlaylistInfo, - **kwargs, -): - """ - Downloads/Removes tracks that have been changed on playlist since last archive file - """ + kwargs: SCDLArgs, +) -> Tuple[Union[BasicTrack, MiniTrack], ...]: + """Downloads/Removes tracks that have been changed on playlist since last archive file""" logger.info("Comparing tracks...") archive = kwargs.get("sync") + assert archive is not None with get_filelock(archive): with open(archive) as f: try: old = [int(i) for i in "".join(f.readlines()).strip().split("\n")] - except IOError as ioe: + except OSError as ioe: logger.error(f"Error trying to read download archive {archive}") logger.debug(ioe) sys.exit(1) except ValueError as verr: - logger.error( - f"Error trying to convert track ids. Verify archive file is not empty." - ) + logger.error("Error trying to convert track ids. Verify archive file is not empty.") logger.debug(verr) sys.exit(1) @@ -528,12 +656,16 @@ def sync( if rem: for track_id in rem: removed = False + track = client.get_track(track_id) + if track is None: + logger.warning(f"Could not find track with id: {track_id}. Skipping removal") + continue for ext in (".mp3", ".m4a", ".opus", ".flac", ".wav"): filename = get_filename( - client.get_track(track_id), + track, + kwargs, ext, playlist_info=playlist_info, - **kwargs, ) if filename in os.listdir("."): removed = True @@ -549,26 +681,28 @@ def sync( logger.info("No tracks to remove.") if add: - return [track for track in playlist.tracks if track.id in add] - else: - logger.info("No tracks to download. Exiting...") - sys.exit(0) + return tuple(track for track in playlist.tracks if track.id in add) + logger.info("No tracks to download. Exiting...") + sys.exit(0) -def download_playlist(client: SoundCloud, playlist: BasicAlbumPlaylist, **kwargs): - """ - Downloads a playlist - """ +def download_playlist( + client: SoundCloud, + playlist: Union[AlbumPlaylist, BasicAlbumPlaylist], + kwargs: SCDLArgs, +) -> None: + """Downloads a playlist""" if kwargs.get("no_playlist"): logger.info("Skipping playlist...") return - playlist_name = playlist.title.encode("utf-8", "ignore") - playlist_name = playlist_name.decode("utf-8") + playlist_name = playlist.title.encode("utf-8", "ignore").decode("utf-8") playlist_name = sanitize_str(playlist_name) - playlist_info = { - "author": playlist.user.username, - "id": playlist.id, - "title": playlist.title + playlist_info: PlaylistInfo = { + "author": playlist.user.username, + "id": playlist.id, + "title": playlist.title, + "tracknumber_int": 0, + "tracknumber": "0", } if not kwargs.get("no_playlist_folder"): @@ -577,77 +711,89 @@ def download_playlist(client: SoundCloud, playlist: BasicAlbumPlaylist, **kwargs os.chdir(playlist_name) try: - if kwargs.get("n"): # Order by creation date and get the n lasts tracks - playlist.tracks.sort( - key=lambda track: track.id, reverse=True + n = kwargs.get("n") + if n is not None: # Order by creation date and get the n lasts tracks + playlist.tracks = tuple( + sorted(playlist.tracks, key=lambda track: track.id, reverse=True)[: int(n)], ) - playlist.tracks = playlist.tracks[: int(kwargs.get("n"))] kwargs["playlist_offset"] = 0 - if kwargs.get("sync"): - if os.path.isfile(kwargs.get("sync")): - playlist.tracks = sync(client, playlist, playlist_info, **kwargs) + s = kwargs.get("sync") + if s: + if os.path.isfile(s): + playlist.tracks = sync(client, playlist, playlist_info, kwargs) else: logger.error(f'Invalid sync archive file {kwargs.get("sync")}') sys.exit(1) tracknumber_digits = len(str(len(playlist.tracks))) - for counter, track in itertools.islice(enumerate(playlist.tracks, 1), kwargs.get("playlist_offset", 0), None): + for counter, track in itertools.islice( + enumerate(playlist.tracks, 1), + kwargs.get("playlist_offset", 0), + None, + ): logger.debug(track) logger.info(f"Track n°{counter}") + playlist_info["tracknumber_int"] = counter playlist_info["tracknumber"] = str(counter).zfill(tracknumber_digits) if isinstance(track, MiniTrack): if playlist.secret_token: track = client.get_tracks([track.id], playlist.id, playlist.secret_token)[0] else: - track = client.get_track(track.id) - - download_track(client, track, playlist_info, kwargs.get("strict_playlist"), **kwargs) + track = client.get_track(track.id) # type: ignore[assignment] + assert isinstance(track, BasicTrack) + download_track( + client, + track, + kwargs, + playlist_info, + kwargs["strict_playlist"], + ) finally: if not kwargs.get("no_playlist_folder"): os.chdir("..") -def try_utime(path, filetime): +def try_utime(path: str, filetime: float) -> None: try: os.utime(path, (time.time(), filetime)) except Exception: logger.error("Cannot update utime of file") -def is_downloading_to_stdout(**kwargs) -> bool: - return kwargs.get('name_format') == '-' +def is_downloading_to_stdout(kwargs: SCDLArgs) -> bool: + return kwargs.get("name_format") == "-" -def get_stdout(): +@contextlib.contextmanager +def get_stdout() -> Generator[IO, None, None]: # Credits: https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/utils/_utils.py#L575 - if sys.platform == 'win32': + if sys.platform == "win32": import msvcrt # stdout may be any IO stream, e.g. when using contextlib.redirect_stdout with contextlib.suppress(io.UnsupportedOperation): msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) - return getattr(sys.stdout, 'buffer', sys.stdout) + yield getattr(sys.stdout, "buffer", sys.stdout) def get_filename( - track: BasicTrack, + track: Union[BasicTrack, Track], + kwargs: SCDLArgs, ext: Optional[str] = None, original_filename: Optional[str] = None, playlist_info: Optional[PlaylistInfo] = None, - **kwargs, -): +) -> str: # Force stdout name on tracks that are being downloaded to stdout - if is_downloading_to_stdout(**kwargs): - return 'stdout' + if is_downloading_to_stdout(kwargs): + return "stdout" username = track.user.username title = track.title.encode("utf-8", "ignore").decode("utf-8") - if kwargs.get("addtofile"): - if username not in title and "-" not in title: - title = "{0} - {1}".format(username, title) - logger.debug('Adding "{0}" to filename'.format(username)) + if kwargs.get("addtofile") and username not in title and "-" not in title: + title = f"{username} - {title}" + logger.debug(f'Adding "{username}" to filename') timestamp = str(int(track.created_at.timestamp())) if kwargs.get("addtimestamp"): @@ -655,26 +801,29 @@ def get_filename( if not kwargs.get("addtofile") and not kwargs.get("addtimestamp"): if playlist_info: - title = kwargs.get("playlist_name_format").format(**asdict(track), playlist=playlist_info, timestamp=timestamp) + title = kwargs["playlist_name_format"].format( + **asdict(track), + playlist=playlist_info, + timestamp=timestamp, + ) else: - title = kwargs.get("name_format").format(**asdict(track), timestamp=timestamp) + title = kwargs["name_format"].format(**asdict(track), timestamp=timestamp) if original_filename is not None: original_filename = original_filename.encode("utf-8", "ignore").decode("utf-8") ext = os.path.splitext(original_filename)[1] - filename = sanitize_str(title, ext) - return filename + return sanitize_str(title, ext or "") def download_original_file( client: SoundCloud, - track: BasicTrack, + track: Union[BasicTrack, Track], title: str, + kwargs: SCDLArgs, playlist_info: Optional[PlaylistInfo] = None, - **kwargs, ) -> Tuple[Optional[str], bool]: logger.info("Downloading the original file.") - to_stdout = is_downloading_to_stdout(**kwargs) + to_stdout = is_downloading_to_stdout(kwargs) # Get the requests stream url = client.get_track_original_download(track.id, track.secret_token) @@ -698,7 +847,7 @@ def download_original_file( if "filename" in params: filename = urllib.parse.unquote(params["filename"][-1], encoding="utf-8") else: - raise SoundCloudException(f"Could not get filename from content-disposition header: {header}") + raise MissingFilenameError(header) orig_filename = filename _, ext = os.path.splitext(filename) @@ -707,13 +856,18 @@ def download_original_file( orig_filename, ext = os.path.splitext(filename) # Find file extension - mime = r.headers.get("content-type") - ext = ext or mimetypes.guess_extension(mime) - ext = ext or ("." + r.headers.get("x-amz-meta-file-type")) + ext = ( + ext + or mimetypes.guess_extension(r.headers["content-type"]) + or ("." + r.headers["x-amz-meta-file-type"]) + ) orig_filename += ext filename = get_filename( - track, original_filename=orig_filename, playlist_info=playlist_info, **kwargs + track, + kwargs, + original_filename=orig_filename, + playlist_info=playlist_info, ) logger.debug(f"filename : {filename}") @@ -724,64 +878,88 @@ def download_original_file( # Skip if file ID or filename already exists # We are always re-downloading to stdout - if not to_stdout and already_downloaded(track, title, filename, **kwargs): + if not to_stdout and already_downloaded(track, title, filename, kwargs): return filename, True re_encode_to_out( track, r, - ext[1:] if not encoding_to_flac else 'flac', + ext[1:] if not encoding_to_flac else "flac", not encoding_to_flac, # copy the stream only if we aren't re-encoding to flac filename, + kwargs, skip_re_encoding=not encoding_to_flac, - **kwargs, ) return filename, False -def get_transcoding_m3u8(client: SoundCloud, transcoding: Transcoding, **kwargs): +def get_transcoding_m3u8( + client: SoundCloud, + transcoding: Transcoding, + kwargs: SCDLArgs, +) -> str: url = transcoding.url - bitrate_KBps = 256 / 8 if "aac" in transcoding.preset else 128 / 8 + bitrate_KBps = 256 / 8 if "aac" in transcoding.preset else 128 / 8 # noqa: N806 total_bytes = bitrate_KBps * transcoding.duration min_size = kwargs.get("min_size") or 0 - max_size = kwargs.get("max_size") or math.inf # max size of 0 treated as no max size + max_size = kwargs.get("max_size") or math.inf # max size of 0 treated as no max size if not min_size <= total_bytes <= max_size: - raise SoundCloudException("File not within --min-size and --max-size bounds") + raise InvalidFilesizeError(min_size, max_size, total_bytes) if url is not None: headers = client._get_default_headers() if client.auth_token: headers["Authorization"] = f"OAuth {client.auth_token}" - r = requests.get(url, params={"client_id": client.client_id}, headers=headers) + + params = { + "client_id": client.client_id, + } + + r: Optional[requests.Response] = None + delay: int = 0 + + # If we got ratelimited + while not r or r.status_code == 429: + if delay > 0: + logger.warning(f"Got rate-limited, delaying for {delay}sec") + time.sleep(delay) + + r = requests.get(url, headers=headers, params=params) + delay = (delay or 1) * 2 # exponential backoff, what could possibly go wrong + + if r.status_code != 200: + raise SoundCloudException(f"Unable to get transcoding m3u8({r.status_code}): {r.text}") + logger.debug(r.url) return r.json()["url"] + raise SoundCloudException(f"Transcoding does not contain URL: {transcoding}") def download_hls( client: SoundCloud, - track: BasicTrack, + track: Union[BasicTrack, Track], title: str, + kwargs: SCDLArgs, playlist_info: Optional[PlaylistInfo] = None, - **kwargs, -) -> Tuple[Optional[str], bool]: +) -> Tuple[str, bool]: if not track.media.transcodings: raise SoundCloudException(f"Track {track.permalink_url} has no transcodings available") logger.debug(f"Transcodings: {track.media.transcodings}") transcodings = [t for t in track.media.transcodings if t.format.protocol == "hls"] - to_stdout = is_downloading_to_stdout(**kwargs) + to_stdout = is_downloading_to_stdout(kwargs) # ordered in terms of preference best -> worst valid_presets = [("mp3", ".mp3")] if not kwargs.get("onlymp3"): if kwargs.get("opus"): - valid_presets = [("opus", ".opus")] + valid_presets - valid_presets = [("aac", ".m4a")] + valid_presets + valid_presets = [("opus", ".opus"), *valid_presets] + valid_presets = [("aac", ".m4a"), *valid_presets] transcoding = None ext = None @@ -794,27 +972,30 @@ def download_hls( break else: raise SoundCloudException( - f"Could not find valid transcoding. Available transcodings: {[t.preset for t in track.media.transcodings if t.format.protocol == 'hls']}" + "Could not find valid transcoding. Available transcodings: " + f"{[t.preset for t in track.media.transcodings if t.format.protocol == 'hls']}", ) - filename = get_filename(track, ext=ext, playlist_info=playlist_info, **kwargs) + filename = get_filename(track, kwargs, ext=ext, playlist_info=playlist_info) logger.debug(f"filename : {filename}") # Skip if file ID or filename already exists - if not to_stdout and already_downloaded(track, title, filename, **kwargs): + if not to_stdout and already_downloaded(track, title, filename, kwargs): return filename, True # Get the requests stream - url = get_transcoding_m3u8(client, transcoding, **kwargs) + url = get_transcoding_m3u8(client, transcoding, kwargs) _, ext = os.path.splitext(filename) re_encode_to_out( track, url, - preset_name if preset_name != 'aac' else 'ipod', # We are encoding aac files to m4a, so an ipod codec is used + preset_name + if preset_name != "aac" + else "ipod", # We are encoding aac files to m4a, so an ipod codec is used True, # no need to fully re-encode the whole hls stream filename, + kwargs, playlist_info, - **kwargs, ) return filename, False @@ -822,14 +1003,12 @@ def download_hls( def download_track( client: SoundCloud, - track: BasicTrack, + track: Union[BasicTrack, Track], + kwargs: SCDLArgs, playlist_info: Optional[PlaylistInfo] = None, - exit_on_fail=True, - **kwargs, -): - """ - Downloads a track - """ + exit_on_fail: bool = True, +) -> None: + """Downloads a track""" try: title = track.title title = title.encode("utf-8", "ignore").decode("utf-8") @@ -841,15 +1020,15 @@ def download_track( # Geoblocked track if track.policy == "BLOCK": - raise SoundCloudException(f"{title} is not available in your location...") + raise RegionBlockError # Get user_id from the client - client_user_id = client.get_me().id if client.auth_token else None + me = client.get_me() if kwargs["auth_token"] else None + client_user_id = me and me.id lock = get_filelock(pathlib.Path(f"./{track.id}"), 0) # Downloadable track - downloaded_original = False filename = None is_already_downloaded = False if ( @@ -861,49 +1040,60 @@ def download_track( try: with lock: filename, is_already_downloaded = download_original_file( - client, track, title, playlist_info, **kwargs + client, + track, + title, + kwargs, + playlist_info, ) - downloaded_original = True except filelock.Timeout: logger.debug(f"Could not acquire lock: {lock}. Skipping") return if filename is None: if kwargs.get("only_original"): - raise SoundCloudException(f'Track "{track.permalink_url}" does not have original file available. Not downloading...') + raise SoundCloudException( + f'Track "{track.permalink_url}" does not have original file ' + "available. Not downloading...", + ) try: with lock: filename, is_already_downloaded = download_hls( - client, track, title, playlist_info, **kwargs + client, + track, + title, + kwargs, + playlist_info, ) except filelock.Timeout: logger.debug(f"Could not acquire lock: {lock}. Skipping") return if kwargs.get("remove"): - fileToKeep.append(filename) + files_to_keep.append(filename) - record_download_archive(track, **kwargs) + record_download_archive(track, kwargs) - to_stdout = is_downloading_to_stdout(**kwargs) + to_stdout = is_downloading_to_stdout(kwargs) # Skip if file ID or filename already exists if is_already_downloaded and not kwargs.get("force_metadata"): raise SoundCloudException(f"{filename} already downloaded.") # If file does not exist an error occurred - # If we are downloading to stdout and reached this point, then most likely we downloaded the track + # If we are downloading to stdout and reached this point, then most likely + # we downloaded the track if not os.path.isfile(filename) and not to_stdout: raise SoundCloudException(f"An error occurred downloading {filename}.") # Add metadata to an already existing file if needed - if is_already_downloaded and kwargs.get('force_metadata'): - with open(filename, 'rb') as f: + if is_already_downloaded and kwargs.get("force_metadata"): + with open(filename, "rb") as f: file_data = io.BytesIO(f.read()) - _add_metadata_to_stream(track, file_data, playlist_info, **kwargs) + _add_metadata_to_stream(track, file_data, kwargs, playlist_info) - with open(filename, 'wb') as f: + with open(filename, "wb") as f: file_data.seek(0) f.write(file_data.getbuffer()) @@ -919,26 +1109,25 @@ def download_track( sys.exit(1) -def can_convert(filename): +def can_convert(filename: str) -> bool: ext = os.path.splitext(filename)[1] return "wav" in ext or "aif" in ext -def already_downloaded(track: BasicTrack, title: str, filename: str, **kwargs): - """ - Returns True if the file has already been downloaded - """ +def already_downloaded( + track: Union[BasicTrack, Track], + title: str, + filename: str, + kwargs: SCDLArgs, +) -> bool: + """Returns True if the file has already been downloaded""" already_downloaded = False if os.path.isfile(filename): already_downloaded = True - if ( - kwargs.get("flac") - and can_convert(filename) - and os.path.isfile(filename[:-4] + ".flac") - ): + if kwargs.get("flac") and can_convert(filename) and os.path.isfile(filename[:-4] + ".flac"): already_downloaded = True - if kwargs.get("download_archive") and in_download_archive(track, **kwargs): + if kwargs.get("download_archive") and in_download_archive(track, kwargs): already_downloaded = True if kwargs.get("flac") and can_convert(filename) and os.path.isfile(filename): @@ -950,54 +1139,47 @@ def already_downloaded(track: BasicTrack, title: str, filename: str, **kwargs): if already_downloaded: if kwargs.get("c") or kwargs.get("remove") or kwargs.get("force_metadata"): return True - else: - logger.error(f'Track "{title}" already exists!') - logger.error("Exiting... (run again with -c to continue)") - sys.exit(1) + logger.error(f'Track "{title}" already exists!') + logger.error("Exiting... (run again with -c to continue)") + sys.exit(1) return False -def in_download_archive(track: BasicTrack, **kwargs): - """ - Returns True if a track_id exists in the download archive - """ +def in_download_archive(track: Union[BasicTrack, Track], kwargs: SCDLArgs) -> bool: + """Returns True if a track_id exists in the download archive""" archive_filename = kwargs.get("download_archive") if not archive_filename: - return + return False try: - with get_filelock(archive_filename): - with open(archive_filename, "a+", encoding="utf-8") as file: - file.seek(0) - track_id = str(track.id) - for line in file: - if line.strip() == track_id: - return True - except IOError as ioe: + with get_filelock(archive_filename), open(archive_filename, "a+", encoding="utf-8") as file: + file.seek(0) + track_id = str(track.id) + for line in file: + if line.strip() == track_id: + return True + except OSError as ioe: logger.error("Error trying to read download archive...") logger.error(ioe) return False -def record_download_archive(track: BasicTrack, **kwargs): - """ - Write the track_id in the download archive - """ +def record_download_archive(track: Union[BasicTrack, Track], kwargs: SCDLArgs) -> None: + """Write the track_id in the download archive""" archive_filename = kwargs.get("download_archive") if not archive_filename: return try: - with get_filelock(archive_filename): - with open(archive_filename, "a", encoding="utf-8") as file: - file.write(f"{track.id}\n") - except IOError as ioe: + with get_filelock(archive_filename), open(archive_filename, "a", encoding="utf-8") as file: + file.write(f"{track.id}\n") + except OSError as ioe: logger.error("Error trying to write to download archive...") logger.error(ioe) -def _try_get_artwork(url: str, size: str = 'original') -> Optional[requests.Response]: +def _try_get_artwork(url: str, size: str = "original") -> Optional[requests.Response]: new_artwork_url = url.replace("large", size) try: @@ -1006,8 +1188,8 @@ def _try_get_artwork(url: str, size: str = 'original') -> Optional[requests.Resp if artwork_response.status_code != 200: return None - content_type = artwork_response.headers.get('Content-Type', '').lower() - if content_type not in ('image/png', 'image/jpeg', 'image/jpg'): + content_type = artwork_response.headers.get("Content-Type", "").lower() + if content_type not in ("image/png", "image/jpeg", "image/jpg"): return None return artwork_response @@ -1019,62 +1201,61 @@ def build_ffmpeg_encoding_args( input_file: str, output_file: str, out_codec: str, - *args, + *args: str, ) -> List[str]: return [ - 'ffmpeg', - + "ffmpeg", # Disable all the useless stuff - '-loglevel', 'error', - '-hide_banner', - + "-loglevel", + "error", + "-hide_banner", # Input stream - '-i', input_file, - + "-i", + input_file, # Encoding - '-f', out_codec, - + "-f", + out_codec, # Progress to stderr - '-progress', 'pipe:2', - '-stats_period', '0.1', - + "-progress", + "pipe:2", + "-stats_period", + "0.1", # User provided arguments *args, - # Output file - output_file + output_file, ] def _write_streaming_response_to_pipe( response: requests.Response, pipe: Union[IO[bytes], io.BytesIO], - **kwargs, + kwargs: SCDLArgs, ) -> None: - total_length = int(response.headers.get("content-length")) + total_length = int(response.headers["content-length"]) min_size = kwargs.get("min_size") or 0 max_size = kwargs.get("max_size") or math.inf # max size of 0 treated as no max size if not min_size <= total_length <= max_size: - raise SoundCloudException("File not within --min-size and --max-size bounds") + raise InvalidFilesizeError(min_size, max_size, total_length) - logger.info('Receiving the streaming response') + logger.info("Receiving the streaming response") received = 0 chunk_size = 8192 with memoryview(bytearray(chunk_size)) as buffer: for chunk in tqdm( - iter(lambda: response.raw.read(chunk_size), b''), + iter(lambda: response.raw.read(chunk_size), b""), total=(total_length / chunk_size) + 1, - disable=bool(kwargs.get('hide_progress')), - unit='Kb', + disable=bool(kwargs.get("hide_progress")), + unit="Kb", unit_scale=chunk_size / 1024, ): if not chunk: break - buffer_view = buffer[:len(chunk)] + buffer_view = buffer[: len(chunk)] buffer_view[:] = chunk received += len(chunk) @@ -1091,10 +1272,10 @@ def _write_streaming_response_to_pipe( def _add_metadata_to_stream( - track: BasicTrack, + track: Union[BasicTrack, Track], stream: io.BytesIO, + kwargs: SCDLArgs, playlist_info: Optional[PlaylistInfo] = None, - **kwargs, ) -> None: logger.info("Applying metadata...") @@ -1102,14 +1283,14 @@ def _add_metadata_to_stream( artwork_response = None if kwargs.get("original_art"): - artwork_response = _try_get_artwork(artwork_base_url, 'original') + artwork_response = _try_get_artwork(artwork_base_url, "original") if artwork_response is None: - artwork_response = _try_get_artwork(artwork_base_url, 't500x500') + artwork_response = _try_get_artwork(artwork_base_url, "t500x500") artist: str = track.user.username - if bool(kwargs.get('extract_artist')): - for dash in {" - ", " − ", " – ", " — ", " ― "}: + if bool(kwargs.get("extract_artist")): + for dash in (" - ", " − ", " – ", " — ", " ― "): # noqa: RUF001 if dash not in track.title: continue @@ -1118,7 +1299,7 @@ def _add_metadata_to_stream( track.title = artist_title[1].strip() break - album_available: bool = playlist_info and not kwargs.get("no_album_tag") + album_available: bool = (playlist_info is not None) and not kwargs.get("no_album_tag") metadata = MetadataInfo( artist=artist, @@ -1127,66 +1308,69 @@ def _add_metadata_to_stream( genre=track.genre, artwork_jpeg=artwork_response.content if artwork_response else None, link=track.permalink_url, - date=track.created_at.strftime('%Y-%m-%d %H:%M:%S'), - album_title=playlist_info["title"] if album_available else None, - album_author=playlist_info["author"] if album_available else None, - album_track_num=playlist_info["tracknumber"] if album_available else None, + date=track.created_at.strftime("%Y-%m-%d %H:%M:%S"), + album_title=playlist_info["title"] if album_available else None, # type: ignore[index] + album_author=playlist_info["author"] if album_available else None, # type: ignore[index] + album_track_num=playlist_info["tracknumber_int"] if album_available else None, # type: ignore[index] ) mutagen_file = mutagen.File(stream) - handler = METADATA_ASSEMBLERS.get(type(mutagen_file), None) - if handler is None: - logger.error('Metadata assembling for this track is unsupported.\n' - 'Please create an issue at https://github.com/flyingrub/scdl/issues and we will look into it') - - kwargs_no_sensitive = {k: v for k, v in kwargs.items() if k not in ('auth_token',)} - logger.error(f'Here is the information that you should attach to your issue:\n' - f'- Track: {track.permalink_url}\n' - f'- First 16 bytes: {stream.getvalue()[:16].hex()}\n' - f'- Identified as: {type(mutagen_file)}\n' - f'- Configuration: {kwargs_no_sensitive}') - return + try: + # Delete all the existing tags and write our own tags + if mutagen_file is not None: + stream.seek(0) + mutagen_file.delete(stream) + assemble_metadata(mutagen_file, metadata) + except NotImplementedError: + logger.error( + "Metadata assembling for this track is unsupported.\n" + "Please create an issue at https://github.com/flyingrub/scdl/issues " + "and we will look into it", + ) - # Delete all the existing tags and write our own tags - stream.seek(0) - mutagen_file.delete(stream) - handler(mutagen_file, metadata) + kwargs_no_sensitive = {k: v for k, v in kwargs.items() if k not in ("auth_token",)} + logger.error( + f"Here is the information that you should attach to your issue:\n" + f"- Track: {track.permalink_url}\n" + f"- First 16 bytes: {stream.getvalue()[:16].hex()}\n" + f"- Identified as: {type(mutagen_file)}\n" + f"- Configuration: {kwargs_no_sensitive}", + ) + return stream.seek(0) mutagen_file.save(stream) def re_encode_to_out( - track: BasicTrack, + track: Union[BasicTrack, Track], in_data: Union[requests.Response, str], out_codec: str, should_copy: bool, filename: str, + kwargs: SCDLArgs, playlist_info: Optional[PlaylistInfo] = None, skip_re_encoding: bool = False, - **kwargs, ) -> None: - to_stdout = is_downloading_to_stdout(**kwargs) + to_stdout = is_downloading_to_stdout(kwargs) encoded = re_encode_to_buffer( track, in_data, out_codec, should_copy, + kwargs, playlist_info, skip_re_encoding, - **kwargs, ) - out_handle = get_stdout() if to_stdout else open(filename, 'wb') - shutil.copyfileobj(encoded, out_handle) - - if not to_stdout: - out_handle.close() + # see https://github.com/python/mypy/issues/5512 + with get_stdout() if to_stdout else open(filename, "wb") as out_handle: # type: ignore[attr-defined] + shutil.copyfileobj(encoded, out_handle) -def _is_ffmpeg_progress_line(parameters: List[str]): +def _is_ffmpeg_progress_line(parameters: List[str]) -> bool: return len(parameters) == 2 and parameters[0] in ( "progress", "speed", @@ -1206,18 +1390,24 @@ def _get_ffmpeg_pipe( should_copy: bool, output_file: str, ) -> subprocess.Popen: - is_url: bool = isinstance(in_data, str) logger.info("Creating the ffmpeg pipe...") commands = build_ffmpeg_encoding_args( - in_data if is_url else '-', + in_data if isinstance(in_data, str) else "-", output_file, out_codec, - *(('-c', 'copy',) if should_copy else ()) + *( + ( + "-c", + "copy", + ) + if should_copy + else () + ), ) logger.debug(f"ffmpeg command: {' '.join(commands)}") - pipe = subprocess.Popen( + return subprocess.Popen( commands, stdin=subprocess.PIPE, stderr=subprocess.PIPE, @@ -1225,13 +1415,9 @@ def _get_ffmpeg_pipe( bufsize=FFMPEG_PIPE_CHUNK_SIZE, ) - # Wrap stderr with TextIOWrapper for automatic decoding - pipe.stderr = io.TextIOWrapper(pipe.stderr, encoding='utf-8', errors=None) - return pipe - def _is_unsupported_codec_for_streaming(codec: str) -> bool: - return codec in ('ipod',) + return codec in ("ipod",) def _re_encode_ffmpeg( @@ -1239,28 +1425,29 @@ def _re_encode_ffmpeg( out_codec: str, track_duration_ms: int, should_copy: bool, - **kwargs, + kwargs: SCDLArgs, ) -> io.BytesIO: streaming_supported = not _is_unsupported_codec_for_streaming(out_codec) - out_file_name = 'pipe:1' # stdout + out_file_name = "pipe:1" # stdout if not streaming_supported: out_file_name = str(pathlib.Path(tempfile.gettempdir()) / secrets.token_hex(8)) pipe = _get_ffmpeg_pipe(in_data, out_codec, should_copy, out_file_name) - logger.info('Encoding..') - errors_output = '' + logger.info("Encoding..") + errors_output = "" stdout = io.BytesIO() - # Sadly, we have to iterate both stdout and stderr at the same times in order for things to work. - # This is why we have 2 threads that are reading stderr, and writing stuff to stdin at the same time. - # I don't think there is any other way how to get this working and make it as fast as it is now. + # Sadly, we have to iterate both stdout and stderr at the same times in order for + # things to work. This is why we have 2 threads that are reading stderr, and + # writing stuff to stdin at the same time. I don't think there is any other way + # to get this working and make it as fast as it is now. # A function that reads encoded track to our `stdout` BytesIO object - def read_stdout(): - for chunk in iter(lambda: pipe.stdout.read(FFMPEG_PIPE_CHUNK_SIZE), b''): - stdout.write(chunk) + def read_stdout() -> None: + assert pipe.stdout is not None + shutil.copyfileobj(pipe.stdout, stdout, FFMPEG_PIPE_CHUNK_SIZE) pipe.stdout.close() stdout_thread = None @@ -1275,8 +1462,7 @@ def read_stdout(): assert pipe.stdin is not None stdin_thread = threading.Thread( target=_write_streaming_response_to_pipe, - args=(in_data, pipe.stdin,), - kwargs=kwargs, + args=(in_data, pipe.stdin, kwargs), daemon=True, ) @@ -1288,25 +1474,22 @@ def read_stdout(): # Read progress from stderr line by line total_sec = track_duration_ms / 1000 - with tqdm( - total=total_sec, - disable=bool(kwargs.get("hide_progress")), - unit="s" - ) as progress: - last_secs = 0 - for line in iter(pipe.stderr.readline, ''): - parameters = line.split('=', maxsplit=1) + with tqdm(total=total_sec, disable=bool(kwargs.get("hide_progress")), unit="s") as progress: + last_secs = 0.0 + assert pipe.stderr is not None + for line in io.TextIOWrapper(pipe.stderr, encoding="utf-8", errors=None): + parameters = line.split("=", maxsplit=1) if not _is_ffmpeg_progress_line(parameters): errors_output += line continue - if not line.startswith('out_time_ms'): + if not line.startswith("out_time_ms"): continue try: seconds = int(parameters[1]) / 1_000_000 except ValueError: - seconds = 0 + seconds = 0.0 seconds = min(seconds, total_sec) # clamp just to be sure changed = seconds - last_secs @@ -1322,11 +1505,11 @@ def read_stdout(): # Make sure that process has exited and get its exit code pipe.wait() if pipe.returncode != 0: - raise SoundCloudException(f'FFmpeg error({pipe.returncode}): {errors_output}') + raise FFmpegError(pipe.returncode, errors_output) # Read from the temp file, if needed if not streaming_supported: - with open(out_file_name, 'rb') as f: + with open(out_file_name, "rb") as f: shutil.copyfileobj(f, stdout) os.remove(out_file_name) @@ -1336,40 +1519,38 @@ def read_stdout(): def _copy_stream( in_data: requests.Response, # streaming response or url - **kwargs, + kwargs: SCDLArgs, ) -> io.BytesIO: result = io.BytesIO() - _write_streaming_response_to_pipe(in_data, result, **kwargs) + _write_streaming_response_to_pipe(in_data, result, kwargs) result.seek(0) return result def re_encode_to_buffer( - track: BasicTrack, + track: Union[BasicTrack, Track], in_data: Union[requests.Response, str], # streaming response or url out_codec: str, should_copy: bool, + kwargs: SCDLArgs, playlist_info: Optional[PlaylistInfo] = None, skip_re_encoding: bool = False, - **kwargs, ) -> io.BytesIO: if skip_re_encoding and isinstance(in_data, requests.Response): - encoded_data = _copy_stream(in_data, **kwargs) + encoded_data = _copy_stream(in_data, kwargs) else: - encoded_data = _re_encode_ffmpeg(in_data, out_codec, track.duration, should_copy, **kwargs) + encoded_data = _re_encode_ffmpeg(in_data, out_codec, track.duration, should_copy, kwargs) # Remove original metadata, add our own, and we are done if not kwargs.get("original_metadata"): - _add_metadata_to_stream(track, encoded_data, playlist_info, **kwargs) + _add_metadata_to_stream(track, encoded_data, kwargs, playlist_info) encoded_data.seek(0) return encoded_data -def is_ffmpeg_available(): - """ - Returns true if ffmpeg is available in the operating system - """ +def is_ffmpeg_available() -> bool: + """Returns true if ffmpeg is available in the operating system""" return shutil.which("ffmpeg") is not None diff --git a/scdl/utils.py b/scdl/utils.py index 36ae0c8b..61c5deca 100644 --- a/scdl/utils.py +++ b/scdl/utils.py @@ -1,38 +1,38 @@ -# -*- encoding: utf-8 -*- - -""" -Copied from +"""Copied from https://github.com/davidfischer-ch/pytoolbox/blob/master/pytoolbox/logging.py """ import email.message import logging import re +from types import MappingProxyType +from typing import Dict, Optional + from termcolor import colored -__all__ = ('ColorizeFilter', ) +__all__ = ("ColorizeFilter",) class ColorizeFilter(logging.Filter): - - color_by_level = { - logging.DEBUG: 'blue', - logging.WARNING: 'yellow', - logging.ERROR: 'red', - logging.INFO: 'white' - } - - def filter(self, record): + COLOR_BY_LEVEL = MappingProxyType( + { + logging.DEBUG: "blue", + logging.WARNING: "yellow", + logging.ERROR: "red", + logging.INFO: "white", + }, + ) + + def filter(self, record: logging.LogRecord) -> bool: record.raw_msg = record.msg - color = self.color_by_level.get(record.levelno) + color = self.COLOR_BY_LEVEL.get(record.levelno) if color: - record.msg = colored(record.msg, color) + record.msg = colored(record.msg, color) # type: ignore[arg-type] return True -def size_in_bytes(insize): - """ - Returns the size in bytes from strings such as '5 mb' into 5242880. +def size_in_bytes(insize: str) -> int: + """Return the size in bytes from strings such as '5 mb' into 5242880. >>> size_in_bytes('1m') 1048576 @@ -49,20 +49,20 @@ def size_in_bytes(insize): raise ValueError('no string specified') ValueError: no string specified """ - if insize is None or insize.strip() == '': - raise ValueError('no string specified') + if insize is None or insize.strip() == "": + raise ValueError("no string specified") units = { - 'k': 1024, - 'm': 1024 ** 2, - 'g': 1024 ** 3, - 't': 1024 ** 4, - 'p': 1024 ** 5, + "k": 1024, + "m": 1024**2, + "g": 1024**3, + "t": 1024**4, + "p": 1024**5, } - match = re.search(r'^\s*([0-9\.]+)\s*([kmgtp])?', insize, re.I) + match = re.search(r"^\s*([0-9\.]+)\s*([kmgtp])?", insize, re.IGNORECASE) if match is None: - raise ValueError('match not found') + raise ValueError("match not found") size, unit = match.groups() @@ -75,7 +75,9 @@ def size_in_bytes(insize): return int(size) -def parse_header(content_disposition): +def parse_header(content_disposition: Optional[str]) -> Dict[str, str]: + if not content_disposition: + return {} message = email.message.Message() - message['content-type'] = content_disposition - return dict(message.get_params()) + message["content-type"] = content_disposition + return dict(message.get_params({})) diff --git a/setup.py b/setup.py index bcf12bb9..97f890b0 100755 --- a/setup.py +++ b/setup.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python3 -# -*- encoding: utf-8 -*- - from os import path from setuptools import find_packages, setup @@ -28,10 +25,22 @@ "requests", "tqdm", "pathvalidate", - "soundcloud-v2>=1.3.10", + "soundcloud-v2>=1.5.2", "filelock>=3.0.0", + "typing_extensions; python_version < '3.11'", ], - extras_require={"test": ["pytest", "pytest-cov", "pytest-dotenv", "music-tag"]}, + extras_require={ + "dev": [ + "pytest", + "pytest-cov", + "pytest-dotenv", + "music-tag", + "ruff", + "mypy", + "types-requests", + "types-tqdm", + ], + }, url="https://github.com/flyingrub/scdl", classifiers=[ "Programming Language :: Python", diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_playlist.py b/tests/test_playlist.py index 6397685d..9f5858e9 100644 --- a/tests/test_playlist.py +++ b/tests/test_playlist.py @@ -5,8 +5,10 @@ def assert_track_playlist_1( - tmp_path: Path, playlist_folder: str = "test playlist", check_metadata: bool = True -): + tmp_path: Path, + playlist_folder: str = "test playlist", + check_metadata: bool = True, +) -> None: expected_name = "1_testing - test track.mp3" assert_track( tmp_path / playlist_folder, @@ -19,8 +21,10 @@ def assert_track_playlist_1( def assert_track_playlist_2( - tmp_path: Path, playlist_folder: str = "test playlist", check_metadata: bool = True -): + tmp_path: Path, + playlist_folder: str = "test playlist", + check_metadata: bool = True, +) -> None: expected_name = "2_test track 2.mp3" assert_track( tmp_path / playlist_folder, @@ -33,7 +37,7 @@ def assert_track_playlist_2( ) -def test_playlist(tmp_path: Path): +def test_playlist(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -47,7 +51,7 @@ def test_playlist(tmp_path: Path): assert_track_playlist_2(tmp_path) -def test_n(tmp_path: Path): +def test_n(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -63,7 +67,7 @@ def test_n(tmp_path: Path): assert_not_track(tmp_path / "test playlist", "2_testing - test track.mp3") -def test_offset(tmp_path: Path): +def test_offset(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -82,7 +86,7 @@ def test_offset(tmp_path: Path): assert_track_playlist_2(tmp_path) -def test_no_playlist_folder(tmp_path: Path): +def test_no_playlist_folder(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -99,7 +103,7 @@ def test_no_playlist_folder(tmp_path: Path): assert_not_track(tmp_path / "test playlist", "2_test track 2.mp3") -def test_no_strict_playlist(tmp_path: Path): +def test_no_strict_playlist(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -114,7 +118,7 @@ def test_no_strict_playlist(tmp_path: Path): assert_not_track(tmp_path / "test playlist", "2_test track 2.mp3") -def test_strict_playlist(tmp_path: Path): +def test_strict_playlist(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -130,7 +134,7 @@ def test_strict_playlist(tmp_path: Path): assert_not_track(tmp_path / "test playlist", "2_test track 2.mp3") -def test_sync(tmp_path: Path): +def test_sync(tmp_path: Path) -> None: os.chdir(tmp_path) os.makedirs("test playlist") r = call_scdl_with_auth( @@ -159,8 +163,6 @@ def test_sync(tmp_path: Path): "archive.txt", ) assert r.returncode == 0 - assert_not_track( - tmp_path / "test playlist", "Wan Bushi - Eurodance Vibes (part 1+2+3).mp3" - ) - with open("archive.txt", "r") as f: + assert_not_track(tmp_path / "test playlist", "Wan Bushi - Eurodance Vibes (part 1+2+3).mp3") + with open("archive.txt") as f: assert f.read().split() == ["1855267053", "1855318536"] diff --git a/tests/test_track.py b/tests/test_track.py index 25c5e77c..44ae1152 100644 --- a/tests/test_track.py +++ b/tests/test_track.py @@ -1,11 +1,13 @@ +import math import os -import pytest from pathlib import Path +import pytest + from tests.utils import assert_not_track, assert_track, call_scdl_with_auth -def test_original_download(tmp_path: Path): +def test_original_download(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -17,7 +19,7 @@ def test_original_download(tmp_path: Path): assert_track(tmp_path, "track.wav", "copy", "saves", None) -def test_original_to_stdout(tmp_path: Path): +def test_original_to_stdout(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -27,12 +29,13 @@ def test_original_to_stdout(tmp_path: Path): encoding=None, ) assert r.returncode == 0 - with open('track.wav', 'wb') as f: + with open("track.wav", "wb") as f: + assert isinstance(r.stdout, bytes) f.write(r.stdout) assert_track(tmp_path, "track.wav", "copy", "saves", None) -def test_mp3_to_stdout(tmp_path: Path): +def test_mp3_to_stdout(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -44,13 +47,14 @@ def test_mp3_to_stdout(tmp_path: Path): ) assert r.returncode == 0 - with open('track.mp3', 'wb') as f: + with open("track.mp3", "wb") as f: + assert isinstance(r.stdout, bytes) f.write(r.stdout) assert_track(tmp_path, "track.mp3") -def test_flac_to_stdout(tmp_path: Path): +def test_flac_to_stdout(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -61,14 +65,15 @@ def test_flac_to_stdout(tmp_path: Path): encoding=None, ) - with open('track.flac', 'wb') as f: + with open("track.flac", "wb") as f: + assert isinstance(r.stdout, bytes) f.write(r.stdout) assert r.returncode == 0 assert_track(tmp_path, "track.flac", "copy", "saves", None) -def test_flac(tmp_path: Path): +def test_flac(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -81,7 +86,7 @@ def test_flac(tmp_path: Path): assert_track(tmp_path, "track.flac", "copy", "saves", None) -def test_m4a(tmp_path: Path): +def test_m4a(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -92,8 +97,8 @@ def test_m4a(tmp_path: Path): "--opus", ) assert r.returncode == 0 - if (tmp_path / 'track.opus').exists(): - pytest.skip('No go+ subscription') + if (tmp_path / "track.opus").exists(): + pytest.skip("No go+ subscription") assert_track( tmp_path, "track.m4a", @@ -104,7 +109,7 @@ def test_m4a(tmp_path: Path): ) -def test_opus(tmp_path: Path): +def test_opus(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -118,7 +123,7 @@ def test_opus(tmp_path: Path): assert_track(tmp_path, "track.opus") -def test_mp3(tmp_path: Path): +def test_mp3(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -131,7 +136,7 @@ def test_mp3(tmp_path: Path): assert_track(tmp_path, "track.mp3") -def test_unlisted_track(tmp_path: Path): +def test_unlisted_track(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -144,7 +149,7 @@ def test_unlisted_track(tmp_path: Path): assert_track(tmp_path, "track.mp3", "test track 2") -def test_original_art(tmp_path: Path): +def test_original_art(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -158,7 +163,7 @@ def test_original_art(tmp_path: Path): assert_track(tmp_path, "track.mp3", expected_artwork_len=3409) -def test_original_name(tmp_path: Path): +def test_original_name(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -171,7 +176,7 @@ def test_original_name(tmp_path: Path): assert_track(tmp_path, "original.wav", check_metadata=False) -def test_original_metadata(tmp_path: Path): +def test_original_metadata(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -184,7 +189,7 @@ def test_original_metadata(tmp_path: Path): assert_track(tmp_path, "track.wav", "og title", "og artist", "og genre", 0) -def test_force_metadata(tmp_path: Path): +def test_force_metadata(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -207,7 +212,7 @@ def test_force_metadata(tmp_path: Path): assert_track(tmp_path, "track.wav", "copy", "saves", None) -def test_addtimestamp(tmp_path: Path): +def test_addtimestamp(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -219,7 +224,7 @@ def test_addtimestamp(tmp_path: Path): assert_track(tmp_path, "1719169486_testing - test track.mp3", check_metadata=False) -def test_addtofile(tmp_path: Path): +def test_addtofile(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -231,7 +236,7 @@ def test_addtofile(tmp_path: Path): assert_track(tmp_path, "7x11x13-testing - test track 2.mp3", check_metadata=False) -def test_extract_artist(tmp_path: Path): +def test_extract_artist(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -245,7 +250,7 @@ def test_extract_artist(tmp_path: Path): assert_track(tmp_path, "track.mp3", "test track", "testing") -def test_maxsize(tmp_path: Path): +def test_maxsize(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -254,10 +259,10 @@ def test_maxsize(tmp_path: Path): "--max-size=10kb", ) assert r.returncode == 1 - assert "not within --min-size and --max-size bounds" in r.stderr + assert "not within --min-size=0 and --max-size=10240" in r.stderr -def test_minsize(tmp_path: Path): +def test_minsize(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -266,10 +271,10 @@ def test_minsize(tmp_path: Path): "--min-size=1mb", ) assert r.returncode == 1 - assert "not within --min-size and --max-size bounds" in r.stderr + assert f"not within --min-size={1024**2} and --max-size={math.inf}" in r.stderr -def test_only_original(tmp_path: Path): +def test_only_original(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -280,7 +285,7 @@ def test_only_original(tmp_path: Path): assert "does not have original file available" in r.stderr -def test_overwrite(tmp_path: Path): +def test_overwrite(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -312,7 +317,7 @@ def test_overwrite(tmp_path: Path): assert r.returncode == 0 -def test_path(tmp_path: Path): +def test_path(tmp_path: Path) -> None: r = call_scdl_with_auth( "-l", "https://soundcloud.com/one-thousand-and-one/test-track", @@ -326,7 +331,7 @@ def test_path(tmp_path: Path): assert_track(tmp_path, "track.mp3", check_metadata=False) -def test_remove(tmp_path: Path): +def test_remove(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -350,7 +355,7 @@ def test_remove(tmp_path: Path): assert_not_track(tmp_path, "track.mp3") -def test_download_archive(tmp_path: Path): +def test_download_archive(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", diff --git a/tests/test_user.py b/tests/test_user.py index ebbb0739..e6e8b3b1 100644 --- a/tests/test_user.py +++ b/tests/test_user.py @@ -4,11 +4,11 @@ from tests.utils import assert_track, call_scdl_with_auth -def count_files(dir: Path): - return len(list(dir.rglob("*"))) +def count_files(folder: Path) -> int: + return len(list(folder.rglob("*"))) -def test_all(tmp_path: Path): +def test_all(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -22,7 +22,7 @@ def test_all(tmp_path: Path): assert count_files(tmp_path) == 3 -def test_tracks(tmp_path: Path): +def test_tracks(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -36,7 +36,7 @@ def test_tracks(tmp_path: Path): assert count_files(tmp_path) == 1 -def test_likes(tmp_path: Path): +def test_likes(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -46,13 +46,11 @@ def test_likes(tmp_path: Path): "--name-format={title}", ) assert r.returncode == 0 - assert_track( - tmp_path, "Wan Bushi - Eurodance Vibes (part 1+2+3).mp3", check_metadata=False - ) + assert_track(tmp_path, "Wan Bushi - Eurodance Vibes (part 1+2+3).mp3", check_metadata=False) assert count_files(tmp_path) == 1 -def test_commented(tmp_path: Path): +def test_commented(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -62,13 +60,11 @@ def test_commented(tmp_path: Path): "--name-format={title}", ) assert r.returncode == 0 - assert_track( - tmp_path, "Wan Bushi - Eurodance Vibes (part 1+2+3).mp3", check_metadata=False - ) + assert_track(tmp_path, "Wan Bushi - Eurodance Vibes (part 1+2+3).mp3", check_metadata=False) assert count_files(tmp_path) == 1 -def test_playlists(tmp_path: Path): +def test_playlists(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", @@ -80,16 +76,15 @@ def test_playlists(tmp_path: Path): assert count_files(tmp_path) == 3 -def test_reposts(tmp_path: Path): +def test_reposts(tmp_path: Path) -> None: os.chdir(tmp_path) r = call_scdl_with_auth( "-l", "https://soundcloud.com/one-thousand-and-one", "-r", "--name-format={title}", + "--onlymp3", ) assert r.returncode == 0 - assert_track( - tmp_path, "Wan Bushi - Eurodance Vibes (part 1+2+3).mp3", check_metadata=False - ) + assert_track(tmp_path, "Wan Bushi - Eurodance Vibes (part 1+2+3).mp3", check_metadata=False) assert count_files(tmp_path) == 1 diff --git a/tests/utils.py b/tests/utils.py index 5308461f..48fe3167 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -3,22 +3,26 @@ from pathlib import Path from typing import Optional -import music_tag +import music_tag # type: ignore[import] from soundcloud import SoundCloud client_id = SoundCloud().client_id -def call_scdl_with_auth(*args, encoding: Optional[str] = 'utf-8') -> subprocess.CompletedProcess[str]: +def call_scdl_with_auth( + *args: str, + encoding: Optional[str] = "utf-8", +) -> subprocess.CompletedProcess: auth_token = os.getenv("AUTH_TOKEN") assert auth_token - args = ( - ["scdl"] - + list(args) - + [f"--auth-token={auth_token}", f"--client-id={client_id}"] + args = ("scdl", *args, f"--auth-token={auth_token}", f"--client-id={client_id}") + return subprocess.run( + args, + capture_output=True, + encoding=encoding, + errors="ignore" if encoding is not None else None, + check=False, ) - return subprocess.run(args, capture_output=True, encoding=encoding, - errors='ignore' if encoding is not None else None) def assert_track( @@ -27,12 +31,12 @@ def assert_track( expected_title: str = "testing - test track", expected_artist: str = "7x11x13-testing", expected_genre: Optional[str] = "Testing", - expected_artwork_len: int = 16136, + expected_artwork_len: Optional[int] = 16136, expected_album: Optional[str] = None, expected_albumartist: Optional[str] = None, expected_tracknumber: Optional[int] = None, check_metadata: bool = True, -): +) -> None: file = tmp_path / expected_name assert file.exists() @@ -55,6 +59,6 @@ def assert_track( assert f["tracknumber"].value == expected_tracknumber -def assert_not_track(tmp_path: Path, expected_name: str): +def assert_not_track(tmp_path: Path, expected_name: str) -> None: file = tmp_path / expected_name assert not file.exists()