class error(MutagenError):
    """Base class for errors raised while handling RIFF data."""


class InvalidChunk(error):
    """Raised when a chunk header is truncated or has an invalid ID."""


def is_valid_chunk_id(id):
    """Check whether *id* is a valid FOURCC chunk identifier.

    Arguments:
        id (text): candidate chunk ID
    Returns:
        bool: True if *id* consists of exactly four characters in the
        printable ASCII range (``u' '`` up to and including ``u'~'``);
        otherwise False.
    """

    assert isinstance(id, text_type)

    if len(id) != 4:
        return False

    # All four characters must be inspected; iterating over range(0, 3)
    # would leave the last character unchecked.
    return all(u' ' <= char <= u'~' for char in id)


def assert_valid_chunk_id(id):
    """Raise ValueError unless *id* is a valid FOURCC chunk identifier."""

    if not is_valid_chunk_id(id):
        raise ValueError("RIFF-chunk-ID must be four ASCII characters.")


class _ChunkHeader(object):
    """Common behaviour of RIFF chunk headers.

    A header is eight bytes: a four byte chunk ID followed by a four byte
    data size. Subclasses supply the `_struct` format used to parse it.

    Attributes:
        id (text): the chunk ID
        offset (int): absolute file offset of the chunk header
        data_offset (int): absolute file offset of the chunk data
        data_size (int): size of the chunk data in bytes
        size (int): `data_size` plus the size of the header
        parent_chunk: the enclosing chunk, or None
    """

    # Chunk headers are 8 bytes long (4 for ID and 4 for the size)
    HEADER_SIZE = 8

    @property
    @abstractmethod
    def _struct(self):
        """`struct` format of the header; meant to be overridden.

        The first character has to be the byte order marker, it is reused
        when the size field is written back.
        """
        return 'xxxx'

    def __init__(self, fileobj, parent_chunk):
        """Parse the chunk header found at the current position of fileobj.

        Raises:
            InvalidChunk: if fewer than `HEADER_SIZE` bytes are available
                or the chunk ID is not a valid FOURCC.
        """

        self.__fileobj = fileobj
        self.parent_chunk = parent_chunk
        self.offset = fileobj.tell()

        header = fileobj.read(self.HEADER_SIZE)
        if len(header) < self.HEADER_SIZE:
            raise InvalidChunk()

        self.id, self.data_size = struct.unpack(self._struct, header)

        try:
            self.id = self.id.decode('ascii')
        except UnicodeDecodeError:
            raise InvalidChunk()

        if not is_valid_chunk_id(self.id):
            raise InvalidChunk()

        self.size = self.HEADER_SIZE + self.data_size
        self.data_offset = fileobj.tell()

    def read(self):
        """Read the chunks data"""

        self.__fileobj.seek(self.data_offset)
        return self.__fileobj.read(self.data_size)

    def write(self, data):
        """Write the chunk data

        Raises:
            ValueError: if data is larger than the current chunk data size;
                call `resize` first to make room.
        """

        if len(data) > self.data_size:
            raise ValueError(
                "data (%d bytes) does not fit into the chunk (%d bytes)" % (
                    len(data), self.data_size))

        self.__fileobj.seek(self.data_offset)
        self.__fileobj.write(data)

    def delete(self):
        """Removes the chunk from the file

        The header and the data are cut out of the file and the data size
        of the parent chunk (if any) is reduced accordingly.
        """

        delete_bytes(self.__fileobj, self.size, self.offset)
        if self.parent_chunk is not None:
            self.parent_chunk._update_size(
                self.parent_chunk.data_size - self.size)

    def _update_size(self, data_size):
        """Update the size of the chunk

        Rewrites the size field of the header in the file and propagates
        the size difference to the parent chunk(s).
        """

        # The size field follows the four byte chunk ID. It must be written
        # with the same byte order that was used to parse the header.
        self.__fileobj.seek(self.offset + 4)
        self.__fileobj.write(pack(self._struct[0] + 'I', data_size))
        if self.parent_chunk is not None:
            size_diff = self.data_size - data_size
            self.parent_chunk._update_size(
                self.parent_chunk.data_size - size_diff)
        self.data_size = data_size
        self.size = data_size + self.HEADER_SIZE

    def resize(self, new_data_size):
        """Resize the file and update the chunk sizes"""

        resize_bytes(
            self.__fileobj, self.data_size, new_data_size, self.data_offset)
        self._update_size(new_data_size)


class RiffChunkHeader(_ChunkHeader):
    """Representation of the RIFF chunk header"""

    @property
    def _struct(self):
        # RIFF stores the chunk size little-endian, the root chunk included
        return '<4sI'

    def __init__(self, fileobj, parent_chunk=None):
        _ChunkHeader.__init__(self, fileobj, parent_chunk)


class RiffSubchunk(_ChunkHeader):
    """Representation of a RIFF Subchunk"""

    @property
    def _struct(self):
        return '<4sI'  # Size in Little-Endian

    def __init__(self, fileobj, parent_chunk=None):
        _ChunkHeader.__init__(self, fileobj, parent_chunk)


class RiffFile(object):
    """Representation of a RIFF file

    Ref: http://www.johnloomis.org/cpe102/asgn/asgn1/riff.html

    Attributes:
        fileType (text): the four character form type following the RIFF
            header, e.g. ``u'WAVE'``
    """

    def __init__(self, fileobj):
        """Parse the RIFF header and index all subchunks of fileobj.

        Raises:
            InvalidChunk: if the root chunk header can not be parsed
            KeyError: if the root chunk is not a RIFF chunk
        """

        self._fileobj = fileobj
        self.__subchunks = {}

        # Reset read pointer to beginning of RIFF file
        fileobj.seek(0)

        # RIFF Files always start with the RIFF chunk
        self._riffChunk = RiffChunkHeader(fileobj)

        if self._riffChunk.id != 'RIFF':
            raise KeyError("Root chunk should be a RIFF chunk.")

        # Read the RIFF file Type
        self.fileType = fileobj.read(4).decode('ascii')

        # Without any subchunk a new chunk goes right behind the file type
        self.__next_offset = fileobj.tell()

        # Load all RIFF subchunks; the first unparsable header ends the scan
        while True:
            try:
                chunk = RiffSubchunk(fileobj, self._riffChunk)
            except InvalidChunk:
                break
            # Normalize ID3v2-tag-chunk to lowercase
            if chunk.id == 'ID3 ':
                chunk.id = 'id3 '
            self.__subchunks[chunk.id] = chunk

            # Calculate the location of the next chunk,
            # considering the pad byte
            self.__next_offset = chunk.offset + chunk.size
            self.__next_offset += self.__next_offset % 2
            fileobj.seek(self.__next_offset)

    def __contains__(self, id_):
        """Check if the RIFF file contains a specific chunk"""

        assert_valid_chunk_id(id_)

        return id_ in self.__subchunks

    def __getitem__(self, id_):
        """Get a chunk from the RIFF file

        Raises:
            ValueError: if id_ is not a valid chunk ID
            KeyError: if the file has no such chunk
        """

        assert_valid_chunk_id(id_)

        try:
            return self.__subchunks[id_]
        except KeyError:
            raise KeyError(
                "%r has no %r chunk" % (self._fileobj, id_))

    def __delitem__(self, id_):
        """Remove a chunk from the RIFF file

        Raises:
            ValueError: if id_ is not a valid chunk ID
            KeyError: if the file has no such chunk
        """

        assert_valid_chunk_id(id_)

        self.__subchunks.pop(id_).delete()

    def insert_chunk(self, id_):
        """Insert a new, empty chunk at the end of the RIFF file

        Raises:
            KeyError: if id_ is not a valid chunk ID
        """

        assert isinstance(id_, text_type)

        if not is_valid_chunk_id(id_):
            raise KeyError("RIFF key must be four ASCII characters.")

        fileobj = self._fileobj

        # Write an empty subchunk header (little-endian, zero data size) ...
        fileobj.seek(self.__next_offset)
        fileobj.write(pack('<4sI', id_.ljust(4).encode('ascii'), 0))

        # ... and parse it back as a child of the root RIFF chunk. The root
        # chunk is kept in self._riffChunk; it is not one of the subchunks.
        fileobj.seek(self.__next_offset)
        chunk = RiffSubchunk(fileobj, self._riffChunk)
        self._riffChunk._update_size(self._riffChunk.data_size + chunk.size)

        self.__subchunks[id_] = chunk
        self.__next_offset = chunk.offset + chunk.size
class _WaveID3(ID3):
    """A Wave file with ID3v2 tags

    The ID3v2 tag is stored in the 'id3 ' chunk of the RIFF/WAVE file.
    """

    def _pre_load_header(self, fileobj):
        """Position fileobj at the start of the data of the 'id3 ' chunk.

        Raises:
            ID3NoHeaderError: if the file has no 'id3 ' chunk
        """

        try:
            fileobj.seek(WaveFile(fileobj)[u'id3 '].data_offset)
        except (InvalidChunk, KeyError):
            raise ID3NoHeaderError("No ID3 chunk")

    @convert_error(IOError, error)
    @loadfile(writable=True)
    def save(self, filething, v1=1, v2_version=4, v23_sep='/', padding=None):
        """Save ID3v2 data to the Wave/RIFF file

        An 'id3 ' chunk is appended to the file when it has none yet, then
        the chunk is resized to the rendered tag and overwritten with it.
        """

        fileobj = filething.fileobj

        wave_file = WaveFile(fileobj)

        # Chunk IDs are checked to be text, hence the unicode literals
        if u'id3 ' not in wave_file:
            wave_file.insert_chunk(u'id3 ')

        chunk = wave_file[u'id3 ']

        try:
            data = self._prepare_data(
                fileobj, chunk.data_offset, chunk.data_size, v2_version,
                v23_sep, padding)
        except ID3Error as e:
            reraise(error, e, sys.exc_info()[2])

        chunk.resize(len(data))
        chunk.write(data)

    @loadfile(writable=True)
    def delete(self, filething):
        """Completely removes the ID3 chunk from the RIFF/WAVE file"""

        fileobj = filething.fileobj

        wave_file = WaveFile(fileobj)

        if u'id3 ' in wave_file:
            try:
                wave_file[u'id3 '].delete()
            except ValueError:
                # NOTE(review): best-effort removal kept from the original;
                # presumably guards against inconsistent chunk sizes
                # reported by the byte-removal helper -- confirm.
                pass

        self.clear()
error("an ID3 tag already exists") + + @convert_error(IOError, error) + @loadfile() + def load(self, filething, **kwargs): + """Load stream and tag information from a file.""" + + fileobj = filething.fileobj + + try: + self.info = WaveStreamInfo(fileobj) + except ValueError as e: + raise error(e) + + fileobj.seek(0, 0) + + try: + self.tags = _WaveID3(fileobj, **kwargs) + except ID3NoHeaderError: + self.tags = None + except ID3Error as e: + raise error(e) + else: + self.tags.filename = self.filename + + +Open = WAVE diff --git a/tests/data/silence-2s-PCM-16000-08-ID3v23.wav b/tests/data/silence-2s-PCM-16000-08-ID3v23.wav new file mode 100644 index 00000000..095c93fb Binary files /dev/null and b/tests/data/silence-2s-PCM-16000-08-ID3v23.wav differ diff --git a/tests/data/silence-2s-PCM-16000-08-notags.wav b/tests/data/silence-2s-PCM-16000-08-notags.wav new file mode 100644 index 00000000..722b4e29 Binary files /dev/null and b/tests/data/silence-2s-PCM-16000-08-notags.wav differ diff --git a/tests/data/silence-2s-PCM-44100-16-ID3v23.wav b/tests/data/silence-2s-PCM-44100-16-ID3v23.wav new file mode 100644 index 00000000..e75cc8c8 Binary files /dev/null and b/tests/data/silence-2s-PCM-44100-16-ID3v23.wav differ diff --git a/tests/test___init__.py b/tests/test___init__.py index 0ac6528d..7c4a6b1a 100644 --- a/tests/test___init__.py +++ b/tests/test___init__.py @@ -35,6 +35,7 @@ from mutagen.smf import SMF from mutagen.tak import TAK from mutagen.dsf import DSF +from mutagen.wave import WAVE from os import devnull @@ -534,6 +535,11 @@ def test_dict(self): os.path.join(DATA_DIR, '5644800-2ch-s01-silence.dsf'), os.path.join(DATA_DIR, 'with-id3.dsf'), os.path.join(DATA_DIR, 'without-id3.dsf'), + ], + WAVE: [ + os.path.join(DATA_DIR, 'silence-2s-PCM-16000-08-ID3v23.wav'), + os.path.join(DATA_DIR, 'silence-2s-PCM-16000-08-ID3v23.wav'), + os.path.join(DATA_DIR, 'silence-2s-PCM-16000-08-notags.wav'), ] } diff --git a/tests/test_wave.py b/tests/test_wave.py new file mode 100644 
class TWave(TestCase):
    """Tests for reading, tagging and saving RIFF/WAVE files."""

    def setUp(self):
        fn_wav_pcm_2s_16000_08_ID3v23 = \
            os.path.join(DATA_DIR, "silence-2s-PCM-16000-08-ID3v23.wav")
        self.wav_pcm_2s_16000_08_ID3v23 = \
            WAVE(fn_wav_pcm_2s_16000_08_ID3v23)

        self.tmp_fn_pcm_2s_16000_08_ID3v23 = \
            get_temp_copy(fn_wav_pcm_2s_16000_08_ID3v23)
        self.tmp_wav_pcm_2s_16000_08_ID3v23 = \
            WAVE(self.tmp_fn_pcm_2s_16000_08_ID3v23)

        fn_wav_pcm_2s_16000_08_notags = \
            os.path.join(DATA_DIR, "silence-2s-PCM-16000-08-notags.wav")
        self.wav_pcm_2s_16000_08_notags = \
            WAVE(fn_wav_pcm_2s_16000_08_notags)

        self.tmp_fn_pcm_2s_16000_08_notag = \
            get_temp_copy(fn_wav_pcm_2s_16000_08_notags)
        self.tmp_wav_pcm_2s_16000_08_notag = \
            WAVE(self.tmp_fn_pcm_2s_16000_08_notag)

        fn_wav_pcm_2s_44100_16_ID3v23 = \
            os.path.join(DATA_DIR, "silence-2s-PCM-44100-16-ID3v23.wav")
        self.wav_pcm_2s_44100_16_ID3v23 = WAVE(fn_wav_pcm_2s_44100_16_ID3v23)

    def tearDown(self):
        # Remove the writable copies made in setUp so test runs do not
        # leave files behind.
        # NOTE(review): assumes get_temp_copy() returns a file path.
        os.unlink(self.tmp_fn_pcm_2s_16000_08_ID3v23)
        os.unlink(self.tmp_fn_pcm_2s_16000_08_notag)

    def test_channels(self):
        self.failUnlessEqual(self.wav_pcm_2s_16000_08_ID3v23.info.channels, 2)
        self.failUnlessEqual(self.wav_pcm_2s_44100_16_ID3v23.info.channels, 2)

    def test_sample_rate(self):
        self.failUnlessEqual(self.wav_pcm_2s_16000_08_ID3v23.info.sample_rate,
                             16000)
        self.failUnlessEqual(self.wav_pcm_2s_44100_16_ID3v23.info.sample_rate,
                             44100)

    def test_number_of_samples(self):
        self.failUnlessEqual(self.wav_pcm_2s_16000_08_ID3v23.
                             info.number_of_samples, 32000)
        self.failUnlessEqual(self.wav_pcm_2s_44100_16_ID3v23.
                             info.number_of_samples, 88200)

    def test_length(self):
        self.failUnlessAlmostEqual(self.wav_pcm_2s_16000_08_ID3v23.info.length,
                                   2.0, 2)
        self.failUnlessAlmostEqual(self.wav_pcm_2s_44100_16_ID3v23.info.length,
                                   2.0, 2)

    def test_not_my_file(self):
        self.failUnlessRaises(
            KeyError, WAVE, os.path.join(DATA_DIR, "empty.ogg"))

    def test_pprint(self):
        self.wav_pcm_2s_44100_16_ID3v23.pprint()

    def test_mime(self):
        self.failUnless("audio/wav" in self.wav_pcm_2s_44100_16_ID3v23.mime)
        self.failUnless("audio/wave" in self.wav_pcm_2s_44100_16_ID3v23.mime)

    def test_ID3_tags(self):
        id3 = self.wav_pcm_2s_44100_16_ID3v23.tags
        self.assertEqual(id3["TALB"], "Quod Libet Test Data")
        self.assertEqual(id3["TCON"], "Silence")
        self.assertEqual(id3["TIT2"], "Silence")
        self.assertEqual(id3["TPE1"], ["piman / jzig"])  # ToDo: split on '/'?

    def test_delete(self):
        self.tmp_wav_pcm_2s_16000_08_ID3v23.delete()

        self.failIf(self.tmp_wav_pcm_2s_16000_08_ID3v23.tags)
        self.failUnless(WAVE(self.tmp_fn_pcm_2s_16000_08_ID3v23).tags is None)

    def test_save_no_tags(self):
        self.tmp_wav_pcm_2s_16000_08_ID3v23.tags = None
        self.tmp_wav_pcm_2s_16000_08_ID3v23.save()
        self.assertTrue(self.tmp_wav_pcm_2s_16000_08_ID3v23.tags is None)

    def test_add_tags_already_there(self):
        self.failUnless(self.tmp_wav_pcm_2s_16000_08_ID3v23.tags)
        self.failUnlessRaises(Exception,
                              self.tmp_wav_pcm_2s_16000_08_ID3v23.add_tags)

    def test_roundtrip(self):
        self.failUnlessEqual(self.tmp_wav_pcm_2s_16000_08_ID3v23["TIT2"],
                             ["Silence"])
        self.tmp_wav_pcm_2s_16000_08_ID3v23.save()
        new = WAVE(self.tmp_wav_pcm_2s_16000_08_ID3v23.filename)
        self.failUnlessEqual(new["TIT2"], ["Silence"])

    def test_save_tags(self):
        from mutagen.id3 import TIT1
        tags = self.tmp_wav_pcm_2s_16000_08_ID3v23.tags
        tags.add(TIT1(encoding=3, text="foobar"))
        tags.save()

        new = WAVE(self.tmp_wav_pcm_2s_16000_08_ID3v23.filename)
        self.failUnlessEqual(new["TIT1"], ["foobar"])

    def test_save_without_ID3_chunk(self):
        from mutagen.id3 import TIT1
        self.tmp_wav_pcm_2s_16000_08_notag["TIT1"] = TIT1(encoding=3,
                                                          text="foobar")
        self.tmp_wav_pcm_2s_16000_08_notag.save()
        self.failUnless(WAVE(self.tmp_fn_pcm_2s_16000_08_notag)["TIT1"]
                        == "foobar")