Skip to content

Commit

Permalink
Merge pull request #409 from phw/fix/aiff-padding
Browse files Browse the repository at this point in the history
AIFF: Fix handling of padding bytes, safe chunk manipulation
  • Loading branch information
lazka authored Nov 17, 2019
2 parents 1c668ac + 715dc6f commit 3252c26
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 87 deletions.
274 changes: 189 additions & 85 deletions mutagen/aiff.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Evan Purkhiser
# 2014 Ben Ockmore
# 2019 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -18,8 +19,14 @@

from mutagen.id3 import ID3
from mutagen.id3._util import ID3NoHeaderError, error as ID3Error
from mutagen._util import resize_bytes, delete_bytes, MutagenError, loadfile, \
convert_error
from mutagen._util import (
MutagenError,
convert_error,
delete_bytes,
insert_bytes,
loadfile,
resize_bytes,
)

__all__ = ["AIFF", "Open", "delete"]

Expand Down Expand Up @@ -73,143 +80,245 @@ class IFFChunk(object):
# Chunk headers are 8 bytes long (4 for ID and 4 for the size)
HEADER_SIZE = 8

def __init__(self, fileobj, parent_chunk=None):
self.__fileobj = fileobj
self.parent_chunk = parent_chunk
self.offset = fileobj.tell()
@classmethod
def parse(cls, fileobj, parent_chunk=None):
header = fileobj.read(cls.HEADER_SIZE)
if len(header) < cls.HEADER_SIZE:
raise InvalidChunk('Header size < %i' % cls.HEADER_SIZE)

header = fileobj.read(self.HEADER_SIZE)
if len(header) < self.HEADER_SIZE:
raise InvalidChunk()
id, data_size = struct.unpack('>4sI', header)
try:
id = id.decode('ascii').rstrip()
except UnicodeDecodeError as e:
raise InvalidChunk(e)

self.id, self.data_size = struct.unpack('>4si', header)
if not is_valid_chunk_id(id):
raise InvalidChunk('Invalid chunk ID %s' % id)

try:
self.id = self.id.decode('ascii')
except UnicodeDecodeError:
raise InvalidChunk()
return cls.get_class(id)(fileobj, id, data_size, parent_chunk)

if not is_valid_chunk_id(self.id):
raise InvalidChunk()
@classmethod
def get_class(cls, id):
if id == 'FORM':
return FormIFFChunk
else:
return cls

self.size = self.HEADER_SIZE + self.data_size
def __init__(self, fileobj, id, data_size, parent_chunk):
self._fileobj = fileobj
self.id = id
self.data_size = data_size
self.parent_chunk = parent_chunk
self.data_offset = fileobj.tell()
self.offset = self.data_offset - self.HEADER_SIZE
self._calculate_size()

def read(self):
"""Read the chunks data"""

self.__fileobj.seek(self.data_offset)
return self.__fileobj.read(self.data_size)
self._fileobj.seek(self.data_offset)
return self._fileobj.read(self.data_size)

def write(self, data):
"""Write the chunk data"""

if len(data) > self.data_size:
raise ValueError

self.__fileobj.seek(self.data_offset)
self.__fileobj.write(data)
self._fileobj.seek(self.data_offset)
self._fileobj.write(data)
# Write the padding bytes
padding = self.padding()
if padding:
self._fileobj.seek(self.data_offset + self.data_size)
self._fileobj.write(b'\x00' * padding)

def delete(self):
"""Removes the chunk from the file"""

delete_bytes(self.__fileobj, self.size, self.offset)
delete_bytes(self._fileobj, self.size, self.offset)
if self.parent_chunk is not None:
self.parent_chunk._update_size(
self.parent_chunk.data_size - self.size)
self.parent_chunk._remove_subchunk(self)
self._fileobj.flush()

def _update_size(self, data_size):
def _update_size(self, size_diff, changed_subchunk=None):
"""Update the size of the chunk"""

self.__fileobj.seek(self.offset + 4)
self.__fileobj.write(pack('>I', data_size))
old_size = self.size
self.data_size += size_diff
self._fileobj.seek(self.offset + 4)
self._fileobj.write(pack('>I', self.data_size))
self._calculate_size()
if self.parent_chunk is not None:
size_diff = self.data_size - data_size
self.parent_chunk._update_size(
self.parent_chunk.data_size - size_diff)
self.data_size = data_size
self.size = data_size + self.HEADER_SIZE
self.parent_chunk._update_size(self.size - old_size, self)
if changed_subchunk:
self._update_sibling_offsets(
changed_subchunk, old_size - self.size)

def _calculate_size(self):
self.size = self.HEADER_SIZE + self.data_size + self.padding()
assert self.size % 2 == 0

def resize(self, new_data_size):
"""Resize the file and update the chunk sizes"""

resize_bytes(
self.__fileobj, self.data_size, new_data_size, self.data_offset)
self._update_size(new_data_size)
padding = new_data_size % 2
resize_bytes(self._fileobj, self.data_size + self.padding(),
new_data_size + padding, self.data_offset)
size_diff = new_data_size - self.data_size
self._update_size(size_diff)
self._fileobj.flush()

def padding(self):
"""Returns the number of padding bytes (0 or 1).
IFF chunks are required to be a even number in total length. If
data_size is odd a padding byte will be added at the end.
"""
return self.data_size % 2


class FormIFFChunk(IFFChunk):
"""A IFF chunk containing other chunks.
This is either a 'LIST' or 'RIFF'
"""

MIN_DATA_SIZE = 4

def __init__(self, fileobj, id, data_size, parent_chunk):
if id != u'FORM':
raise InvalidChunk('Expected FORM chunk, got %s' % id)

IFFChunk.__init__(self, fileobj, id, data_size, parent_chunk)

# Lists always store an addtional identifier as 4 bytes
if data_size < self.MIN_DATA_SIZE:
raise InvalidChunk('FORM data size < %i' % self.MIN_DATA_SIZE)

# Read the FORM id (usually AIFF)
try:
self.name = fileobj.read(4).decode('ascii')
except UnicodeDecodeError as e:
raise error(e)

# Load all IFF subchunks
self.__subchunks = []

def subchunks(self):
"""Returns a list of all subchunks.
The list is lazily loaded on first access.
"""
if not self.__subchunks:
next_offset = self.data_offset + 4
while next_offset < self.offset + self.size:
self._fileobj.seek(next_offset)
try:
chunk = IFFChunk.parse(self._fileobj, self)
except InvalidChunk:
break
self.__subchunks.append(chunk)

# Calculate the location of the next chunk
next_offset = chunk.offset + chunk.size
return self.__subchunks

def insert_chunk(self, id_, data=None):
"""Insert a new chunk at the end of the FORM chunk"""

assert isinstance(id_, text_type)

if not is_valid_chunk_id(id_):
raise KeyError("Invalid IFF key.")

next_offset = self.offset + self.size
size = self.HEADER_SIZE
data_size = 0
if data:
data_size = len(data)
padding = data_size % 2
size += data_size + padding
insert_bytes(self._fileobj, size, next_offset)
self._fileobj.seek(next_offset)
self._fileobj.write(
pack('>4si', id_.ljust(4).encode('ascii'), data_size))
self._fileobj.seek(next_offset)
chunk = IFFChunk.parse(self._fileobj, self)
self._update_size(chunk.size)
if data:
chunk.write(data)
self.subchunks().append(chunk)
self._fileobj.flush()
return chunk

def _remove_subchunk(self, chunk):
assert chunk in self.__subchunks
self._update_size(-chunk.size, chunk)
self.__subchunks.remove(chunk)

def _update_sibling_offsets(self, changed_subchunk, size_diff):
"""Update the offsets of subchunks after `changed_subchunk`.
"""
index = self.__subchunks.index(changed_subchunk)
sibling_chunks = self.__subchunks[index + 1:len(self.__subchunks)]
for sibling in sibling_chunks:
sibling.offset -= size_diff
sibling.data_offset -= size_diff


class IFFFile(object):
"""Representation of a IFF file"""

def __init__(self, fileobj):
self.__fileobj = fileobj
self.__chunks = {}

# AIFF Files always start with the FORM chunk which contains a 4 byte
# ID before the start of other chunks
fileobj.seek(0)
self.__chunks[u'FORM'] = IFFChunk(fileobj)

# Skip past the 4 byte FORM id
fileobj.seek(IFFChunk.HEADER_SIZE + 4)
self.root = IFFChunk.parse(fileobj)

# Where the next chunk can be located. We need to keep track of this
# since the size indicated in the FORM header may not match up with the
# offset determined from the size of the last chunk in the file
self.__next_offset = fileobj.tell()

# Load all of the chunks
while True:
try:
chunk = IFFChunk(fileobj, self[u'FORM'])
except InvalidChunk:
break
self.__chunks[chunk.id.strip()] = chunk

# Calculate the location of the next chunk,
# considering the pad byte
self.__next_offset = chunk.offset + chunk.size
self.__next_offset += self.__next_offset % 2
fileobj.seek(self.__next_offset)
if self.root.id != u'FORM':
raise InvalidChunk("Root chunk must be a RIFF chunk, got %s"
% self.root.id)

def __contains__(self, id_):
"""Check if the IFF file contains a specific chunk"""

assert_valid_chunk_id(id_)

return id_ in self.__chunks
try:
self[id_]
return True
except KeyError:
return False

def __getitem__(self, id_):
"""Get a chunk from the IFF file"""

assert_valid_chunk_id(id_)

try:
return self.__chunks[id_]
except KeyError:
raise KeyError(
"%r has no %r chunk" % (self.__fileobj, id_))
if id_ == 'FORM': # For backwards compatibility
return self.root
found_chunk = None
for chunk in self.root.subchunks():
if chunk.id == id_:
found_chunk = chunk
break
else:
raise KeyError("No %r chunk found" % id_)
return found_chunk

def __delitem__(self, id_):
"""Remove a chunk from the IFF file"""

assert_valid_chunk_id(id_)
self.delete_chunk(id_)

self.__chunks.pop(id_).delete()

def insert_chunk(self, id_):
"""Insert a new chunk at the end of the IFF file"""
def delete_chunk(self, id_):
"""Remove a chunk from the RIFF file"""

assert_valid_chunk_id(id_)
self[id_].delete()

self.__fileobj.seek(self.__next_offset)
self.__fileobj.write(pack('>4si', id_.ljust(4).encode('ascii'), 0))
self.__fileobj.seek(self.__next_offset)
chunk = IFFChunk(self.__fileobj, self[u'FORM'])
self[u'FORM']._update_size(self[u'FORM'].data_size + chunk.size)
def insert_chunk(self, id_, data=None):
"""Insert a new chunk at the end of the IFF file"""

self.__chunks[id_] = chunk
self.__next_offset = chunk.offset + chunk.size
assert_valid_chunk_id(id_)
return self.root.insert_chunk(id_, data)


class AIFFInfo(StreamInfo):
Expand Down Expand Up @@ -291,12 +400,7 @@ def save(self, filething=None, v2_version=4, v23_sep='/', padding=None):
except ID3Error as e:
reraise(error, e, sys.exc_info()[2])

new_size = len(data)
new_size += new_size % 2 # pad byte
assert new_size % 2 == 0
chunk.resize(new_size)
data += (new_size - len(data)) * b'\x00'
assert new_size == len(data)
chunk.resize(len(data))
chunk.write(data)

@loadfile(writable=True)
Expand Down
Loading

0 comments on commit 3252c26

Please sign in to comment.