diff --git a/mutagen/aiff.py b/mutagen/aiff.py index a4a0eb39..8d797ed8 100644 --- a/mutagen/aiff.py +++ b/mutagen/aiff.py @@ -19,8 +19,14 @@ from mutagen.id3 import ID3 from mutagen.id3._util import ID3NoHeaderError, error as ID3Error -from mutagen._util import resize_bytes, delete_bytes, MutagenError, loadfile, \ - convert_error +from mutagen._util import ( + MutagenError, + convert_error, + delete_bytes, + insert_bytes, + loadfile, + resize_bytes, +) __all__ = ["AIFF", "Open", "delete"] @@ -74,33 +80,44 @@ class IFFChunk(object): # Chunk headers are 8 bytes long (4 for ID and 4 for the size) HEADER_SIZE = 8 - def __init__(self, fileobj, parent_chunk=None): - self.__fileobj = fileobj - self.parent_chunk = parent_chunk - self.offset = fileobj.tell() + @classmethod + def parse(cls, fileobj, parent_chunk=None): + header = fileobj.read(cls.HEADER_SIZE) + if len(header) < cls.HEADER_SIZE: + raise InvalidChunk('Header size < %i' % cls.HEADER_SIZE) - header = fileobj.read(self.HEADER_SIZE) - if len(header) < self.HEADER_SIZE: - raise InvalidChunk() + id, data_size = struct.unpack('>4sI', header) + try: + id = id.decode('ascii').rstrip() + except UnicodeDecodeError as e: + raise InvalidChunk(e) - self.id, self.data_size = struct.unpack('>4si', header) - self.data_offset = fileobj.tell() + if not is_valid_chunk_id(id): + raise InvalidChunk('Invalid chunk ID %s' % id) - try: - self.id = self.id.decode('ascii') - except UnicodeDecodeError: - raise InvalidChunk() + return cls.get_class(id)(fileobj, id, data_size, parent_chunk) - if not is_valid_chunk_id(self.id): - raise InvalidChunk() + @classmethod + def get_class(cls, id): + if id == 'FORM': + return FormIFFChunk + else: + return cls + def __init__(self, fileobj, id, data_size, parent_chunk): + self._fileobj = fileobj + self.id = id + self.data_size = data_size + self.parent_chunk = parent_chunk + self.data_offset = fileobj.tell() + self.offset = self.data_offset - self.HEADER_SIZE self._calculate_size() def read(self): """Read the chunks data""" - self.__fileobj.seek(self.data_offset) - return self.__fileobj.read(self.data_size) + self._fileobj.seek(self.data_offset) + return self._fileobj.read(self.data_size) def write(self, data): """Write the chunk data""" @@ -108,35 +125,34 @@ def write(self, data): if len(data) > self.data_size: raise ValueError - self.__fileobj.seek(self.data_offset) - self.__fileobj.write(data) + self._fileobj.seek(self.data_offset) + self._fileobj.write(data) # Write the padding bytes padding = self.padding() if padding: - self.__fileobj.seek(self.data_offset + self.data_size + 1) - self.__fileobj.write(b'\x00' * padding) + self._fileobj.seek(self.data_offset + self.data_size + 1) + self._fileobj.write(b'\x00' * padding) def delete(self): """Removes the chunk from the file""" - delete_bytes(self.__fileobj, self.size, self.offset) + delete_bytes(self._fileobj, self.size, self.offset) if self.parent_chunk is not None: - self.parent_chunk._update_size( - self.parent_chunk.data_size - self.size) + self.parent_chunk._remove_subchunk(self) - def _update_size(self, data_size): + def _update_size(self, size_diff, changed_subchunk=None): """Update the size of the chunk""" - self.__fileobj.seek(self.offset + 4) - self.__fileobj.write(pack('>I', data_size)) - if self.parent_chunk is not None: - new_padding = data_size % 2 - size_diff = (self.data_size + self.padding()) \ - - (data_size + new_padding) - self.parent_chunk._update_size( - self.parent_chunk.data_size - size_diff) - self.data_size = data_size + old_size = self.size + self.data_size += size_diff + self._fileobj.seek(self.offset + 4) + self._fileobj.write(pack('>I', self.data_size)) self._calculate_size() + if self.parent_chunk is not None: + self.parent_chunk._update_size(self.size - old_size, self) + if changed_subchunk: + self._update_sibling_offsets( + changed_subchunk, old_size - self.size) def _calculate_size(self): self.size = self.HEADER_SIZE + self.data_size + self.padding() @@ -146,9 +162,10 @@ def resize(self, new_data_size): """Resize the file and update the chunk sizes""" padding = new_data_size % 2 - resize_bytes(self.__fileobj, self.data_size + self.padding(), + resize_bytes(self._fileobj, self.data_size + self.padding(), new_data_size + padding, self.data_offset) - self._update_size(new_data_size) + size_diff = new_data_size - self.data_size + self._update_size(size_diff) def padding(self): """Returns the number of padding bytes (0 or 1). @@ -158,77 +175,147 @@ def padding(self): return self.data_size % 2 +class FormIFFChunk(IFFChunk): + """A IFF chunk containing other chunks. + This is either a 'LIST' or 'RIFF' + """ + + MIN_DATA_SIZE = 4 + + def __init__(self, fileobj, id, data_size, parent_chunk): + if id != u'FORM': + raise InvalidChunk('Expected FORM chunk, got %s' % id) + + IFFChunk.__init__(self, fileobj, id, data_size, parent_chunk) + + # Lists always store an addtional identifier as 4 bytes + if data_size < self.MIN_DATA_SIZE: + raise InvalidChunk('FORM data size < %i' % self.MIN_DATA_SIZE) + + # Read the FORM id (usually AIFF) + try: + self.name = fileobj.read(4).decode('ascii') + except UnicodeDecodeError as e: + raise error(e) + + # Load all IFF subchunks + self.__subchunks = [] + + def subchunks(self): + """Returns a list of all subchunks. + The list is lazily loaded on first access. + """ + if not self.__subchunks: + next_offset = self.data_offset + 4 + while next_offset < self.offset + self.size: + self._fileobj.seek(next_offset) + try: + chunk = IFFChunk.parse(self._fileobj, self) + except InvalidChunk: + break + self.__subchunks.append(chunk) + + # Calculate the location of the next chunk + next_offset = chunk.offset + chunk.size + return self.__subchunks + + def insert_chunk(self, id_, data=None): + """Insert a new chunk at the end of the FORM chunk""" + + assert isinstance(id_, text_type) + + if not is_valid_chunk_id(id_): + raise KeyError("Invalid IFF key.") + + next_offset = self.offset + self.size + size = self.HEADER_SIZE + data_size = 0 + if data: + data_size = len(data) + padding = data_size % 2 + size += data_size + padding + insert_bytes(self._fileobj, size, next_offset) + self._fileobj.seek(next_offset) + self._fileobj.write( + pack('>4si', id_.ljust(4).encode('ascii'), data_size)) + self._fileobj.seek(next_offset) + chunk = IFFChunk.parse(self._fileobj, self) + self._update_size(chunk.size) + if data: + chunk.write(data) + self.subchunks().append(chunk) + return chunk + + def _remove_subchunk(self, chunk): + assert chunk in self.__subchunks + self._update_size(-chunk.size, chunk) + self.__subchunks.remove(chunk) + + def _update_sibling_offsets(self, changed_subchunk, size_diff): + """Update the offsets of subchunks after `changed_subchunk`. + """ + index = self.__subchunks.index(changed_subchunk) + sibling_chunks = self.__subchunks[index + 1:len(self.__subchunks)] + for sibling in sibling_chunks: + sibling.offset -= size_diff + sibling.data_offset -= size_diff + + class IFFFile(object): """Representation of a IFF file""" def __init__(self, fileobj): - self.__fileobj = fileobj - self.__chunks = {} - # AIFF Files always start with the FORM chunk which contains a 4 byte # ID before the start of other chunks fileobj.seek(0) - self.__chunks[u'FORM'] = IFFChunk(fileobj) - - # Skip past the 4 byte FORM id - fileobj.seek(IFFChunk.HEADER_SIZE + 4) + self.root = IFFChunk.parse(fileobj) - # Where the next chunk can be located. We need to keep track of this - # since the size indicated in the FORM header may not match up with the - # offset determined from the size of the last chunk in the file - self.__next_offset = fileobj.tell() - - # Load all of the chunks - while True: - try: - chunk = IFFChunk(fileobj, self[u'FORM']) - except InvalidChunk: - break - self.__chunks[chunk.id.strip()] = chunk - - # Calculate the location of the next chunk, - # considering the pad byte - self.__next_offset = chunk.offset + chunk.size - fileobj.seek(self.__next_offset) + if self.root.id != u'FORM': + raise InvalidChunk("Root chunk must be a RIFF chunk, got %s" + % self.root.id) def __contains__(self, id_): """Check if the IFF file contains a specific chunk""" assert_valid_chunk_id(id_) - - return id_ in self.__chunks + try: + self[id_] + return True + except KeyError: + return False def __getitem__(self, id_): """Get a chunk from the IFF file""" assert_valid_chunk_id(id_) - - try: - return self.__chunks[id_] - except KeyError: - raise KeyError( - "%r has no %r chunk" % (self.__fileobj, id_)) + if id_ == 'FORM': # For backwards compatibility + return self.root + found_chunk = None + for chunk in self.root.subchunks(): + if chunk.id == id_: + found_chunk = chunk + break + else: + raise KeyError("No %r chunk found" % id_) + return found_chunk def __delitem__(self, id_): """Remove a chunk from the IFF file""" assert_valid_chunk_id(id_) + self.delete_chunk(id_) - self.__chunks.pop(id_).delete() - - def insert_chunk(self, id_): - """Insert a new chunk at the end of the IFF file""" + def delete_chunk(self, id_): + """Remove a chunk from the RIFF file""" assert_valid_chunk_id(id_) + self[id_].delete() - self.__fileobj.seek(self.__next_offset) - self.__fileobj.write(pack('>4si', id_.ljust(4).encode('ascii'), 0)) - self.__fileobj.seek(self.__next_offset) - chunk = IFFChunk(self.__fileobj, self[u'FORM']) - self[u'FORM']._update_size(self[u'FORM'].data_size + chunk.size) + def insert_chunk(self, id_, data=None): + """Insert a new chunk at the end of the IFF file""" - self.__chunks[id_] = chunk - self.__next_offset = chunk.offset + chunk.size + assert_valid_chunk_id(id_) + return self.root.insert_chunk(id_, data) class AIFFInfo(StreamInfo): diff --git a/tests/test_aiff.py b/tests/test_aiff.py index 17f9c101..c5382406 100644 --- a/tests/test_aiff.py +++ b/tests/test_aiff.py @@ -233,8 +233,8 @@ def test_FORM_chunk_resize(self): self.iff_1_tmp[u'FORM'].resize(17000) self.failUnlessEqual( IFFFile(self.file_1_tmp)[u'FORM'].data_size, 17000) - self.iff_2_tmp[u'FORM'].resize(0) - self.failUnlessEqual(IFFFile(self.file_2_tmp)[u'FORM'].data_size, 0) + self.iff_2_tmp[u'FORM'].resize(4) + self.failUnlessEqual(IFFFile(self.file_2_tmp)[u'FORM'].data_size, 4) def test_child_chunk_resize(self): self.iff_1_tmp[u'ID3'].resize(128) @@ -277,27 +277,25 @@ def test_insert_padded_chunks(self): self.failUnlessEqual(0, self.iff_2_tmp[u'TST2'].padding()) def test_delete_padded_chunks(self): - self.iff_2_tmp.insert_chunk(u'TST1') + iff_file = self.iff_2_tmp + iff_file.insert_chunk(u'TST1') # Resize to odd length, should insert 1 padding byte - self.iff_2_tmp[u'TST1'].resize(3) + iff_file[u'TST1'].resize(3) # Insert another chunk after the first one - new_iff = IFFFile(self.file_2_tmp) - new_iff.insert_chunk(u'TST2') - new_iff[u'TST2'].resize(2) - new_iff = IFFFile(self.file_2_tmp) - self.failUnlessEqual(new_iff[u'FORM'].size, 16076) - self.failUnlessEqual(new_iff[u'FORM'].data_size, 16068) - self.failUnlessEqual(new_iff[u'TST1'].size, 12) - self.failUnlessEqual(new_iff[u'TST1'].data_size, 3) - self.failUnlessEqual(new_iff[u'TST1'].data_offset, 16062) - self.failUnlessEqual(new_iff[u'TST2'].size, 10) - self.failUnlessEqual(new_iff[u'TST2'].data_size, 2) - self.failUnlessEqual(new_iff[u'TST2'].data_offset, 16074) + iff_file.insert_chunk(u'TST2') + iff_file[u'TST2'].resize(2) + self.failUnlessEqual(iff_file[u'FORM'].size, 16076) + self.failUnlessEqual(iff_file[u'FORM'].data_size, 16068) + self.failUnlessEqual(iff_file[u'TST1'].size, 12) + self.failUnlessEqual(iff_file[u'TST1'].data_size, 3) + self.failUnlessEqual(iff_file[u'TST1'].data_offset, 16062) + self.failUnlessEqual(iff_file[u'TST2'].size, 10) + self.failUnlessEqual(iff_file[u'TST2'].data_size, 2) + self.failUnlessEqual(iff_file[u'TST2'].data_offset, 16074) # Delete the odd chunk - del new_iff[u'TST1'] - new_iff = IFFFile(self.file_2_tmp) - self.failUnlessEqual(new_iff[u'FORM'].size, 16064) - self.failUnlessEqual(new_iff[u'FORM'].data_size, 16056) - self.failUnlessEqual(new_iff[u'TST2'].size, 10) - self.failUnlessEqual(new_iff[u'TST2'].data_size, 2) - self.failUnlessEqual(new_iff[u'TST2'].data_offset, 16062) + iff_file.delete_chunk(u'TST1') + self.failUnlessEqual(iff_file[u'FORM'].size, 16064) + self.failUnlessEqual(iff_file[u'FORM'].data_size, 16056) + self.failUnlessEqual(iff_file[u'TST2'].size, 10) + self.failUnlessEqual(iff_file[u'TST2'].data_size, 2) + self.failUnlessEqual(iff_file[u'TST2'].data_offset, 16062)