AIFF: Fix handling of padding bytes, safe chunk manipulation #409

Merged 4 commits on Nov 17, 2019
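This PR reworks mutagen's AIFF chunk handling around the IFF padding rule: every chunk must occupy an even number of bytes in the file, so a chunk whose data size is odd is followed by a single zero pad byte that is not counted in the stored data size. A minimal sketch of that size calculation (illustrative only, not code from this diff):

    def padded_chunk_size(data_size, header_size=8):
        # An odd data size gets one trailing b'\x00' pad byte; the pad byte is
        # not included in the size field written to the chunk header.
        return header_size + data_size + (data_size % 2)

    assert padded_chunk_size(7) == 16  # 8 header + 7 data + 1 pad
    assert padded_chunk_size(8) == 16  # 8 header + 8 data, no pad

The rewritten IFFChunk below applies the same computation in _calculate_size() and padding(), and makes write(), resize() and insert_chunk() keep that pad byte consistent.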
274 changes: 189 additions & 85 deletions mutagen/aiff.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Evan Purkhiser
# 2014 Ben Ockmore
# 2019 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -18,8 +19,14 @@

from mutagen.id3 import ID3
from mutagen.id3._util import ID3NoHeaderError, error as ID3Error
from mutagen._util import resize_bytes, delete_bytes, MutagenError, loadfile, \
convert_error
from mutagen._util import (
MutagenError,
convert_error,
delete_bytes,
insert_bytes,
loadfile,
resize_bytes,
)

__all__ = ["AIFF", "Open", "delete"]

@@ -73,143 +80,245 @@ class IFFChunk(object):
# Chunk headers are 8 bytes long (4 for ID and 4 for the size)
HEADER_SIZE = 8

def __init__(self, fileobj, parent_chunk=None):
self.__fileobj = fileobj
self.parent_chunk = parent_chunk
self.offset = fileobj.tell()
@classmethod
def parse(cls, fileobj, parent_chunk=None):
header = fileobj.read(cls.HEADER_SIZE)
if len(header) < cls.HEADER_SIZE:
raise InvalidChunk('Header size < %i' % cls.HEADER_SIZE)

header = fileobj.read(self.HEADER_SIZE)
if len(header) < self.HEADER_SIZE:
raise InvalidChunk()
id, data_size = struct.unpack('>4sI', header)
try:
id = id.decode('ascii').rstrip()
except UnicodeDecodeError as e:
raise InvalidChunk(e)

self.id, self.data_size = struct.unpack('>4si', header)
if not is_valid_chunk_id(id):
raise InvalidChunk('Invalid chunk ID %s' % id)

try:
self.id = self.id.decode('ascii')
except UnicodeDecodeError:
raise InvalidChunk()
return cls.get_class(id)(fileobj, id, data_size, parent_chunk)

if not is_valid_chunk_id(self.id):
raise InvalidChunk()
@classmethod
def get_class(cls, id):
if id == 'FORM':
return FormIFFChunk
else:
return cls

self.size = self.HEADER_SIZE + self.data_size
def __init__(self, fileobj, id, data_size, parent_chunk):
self._fileobj = fileobj
self.id = id
self.data_size = data_size
self.parent_chunk = parent_chunk
self.data_offset = fileobj.tell()
self.offset = self.data_offset - self.HEADER_SIZE
self._calculate_size()

def read(self):
"""Read the chunks data"""

self.__fileobj.seek(self.data_offset)
return self.__fileobj.read(self.data_size)
self._fileobj.seek(self.data_offset)
return self._fileobj.read(self.data_size)

def write(self, data):
"""Write the chunk data"""

if len(data) > self.data_size:
raise ValueError

self.__fileobj.seek(self.data_offset)
self.__fileobj.write(data)
self._fileobj.seek(self.data_offset)
self._fileobj.write(data)
# Write the padding bytes
padding = self.padding()
if padding:
self._fileobj.seek(self.data_offset + self.data_size)
self._fileobj.write(b'\x00' * padding)

def delete(self):
"""Removes the chunk from the file"""

delete_bytes(self.__fileobj, self.size, self.offset)
delete_bytes(self._fileobj, self.size, self.offset)
if self.parent_chunk is not None:
self.parent_chunk._update_size(
self.parent_chunk.data_size - self.size)
self.parent_chunk._remove_subchunk(self)
self._fileobj.flush()

def _update_size(self, data_size):
def _update_size(self, size_diff, changed_subchunk=None):
"""Update the size of the chunk"""

self.__fileobj.seek(self.offset + 4)
self.__fileobj.write(pack('>I', data_size))
old_size = self.size
self.data_size += size_diff
self._fileobj.seek(self.offset + 4)
self._fileobj.write(pack('>I', self.data_size))
self._calculate_size()
if self.parent_chunk is not None:
size_diff = self.data_size - data_size
self.parent_chunk._update_size(
self.parent_chunk.data_size - size_diff)
self.data_size = data_size
self.size = data_size + self.HEADER_SIZE
self.parent_chunk._update_size(self.size - old_size, self)
if changed_subchunk:
self._update_sibling_offsets(
changed_subchunk, old_size - self.size)

def _calculate_size(self):
self.size = self.HEADER_SIZE + self.data_size + self.padding()
assert self.size % 2 == 0

def resize(self, new_data_size):
"""Resize the file and update the chunk sizes"""

resize_bytes(
self.__fileobj, self.data_size, new_data_size, self.data_offset)
self._update_size(new_data_size)
padding = new_data_size % 2
resize_bytes(self._fileobj, self.data_size + self.padding(),
new_data_size + padding, self.data_offset)
size_diff = new_data_size - self.data_size
self._update_size(size_diff)
self._fileobj.flush()

def padding(self):
"""Returns the number of padding bytes (0 or 1).
IFF chunks are required to be an even number of bytes in total length. If
data_size is odd, a padding byte will be added at the end.
"""
return self.data_size % 2


class FormIFFChunk(IFFChunk):
"""A IFF chunk containing other chunks.
This is either a 'LIST' or 'RIFF'
"""

MIN_DATA_SIZE = 4

def __init__(self, fileobj, id, data_size, parent_chunk):
if id != u'FORM':
raise InvalidChunk('Expected FORM chunk, got %s' % id)

IFFChunk.__init__(self, fileobj, id, data_size, parent_chunk)

# Lists always store an additional identifier as 4 bytes
if data_size < self.MIN_DATA_SIZE:
raise InvalidChunk('FORM data size < %i' % self.MIN_DATA_SIZE)

# Read the FORM id (usually AIFF)
try:
self.name = fileobj.read(4).decode('ascii')
except UnicodeDecodeError as e:
raise error(e)

# Load all IFF subchunks
self.__subchunks = []

def subchunks(self):
"""Returns a list of all subchunks.
The list is lazily loaded on first access.
"""
if not self.__subchunks:
next_offset = self.data_offset + 4
while next_offset < self.offset + self.size:
self._fileobj.seek(next_offset)
try:
chunk = IFFChunk.parse(self._fileobj, self)
except InvalidChunk:
break
self.__subchunks.append(chunk)

# Calculate the location of the next chunk
next_offset = chunk.offset + chunk.size
return self.__subchunks

def insert_chunk(self, id_, data=None):
"""Insert a new chunk at the end of the FORM chunk"""

assert isinstance(id_, text_type)

if not is_valid_chunk_id(id_):
raise KeyError("Invalid IFF key.")

next_offset = self.offset + self.size
size = self.HEADER_SIZE
data_size = 0
if data:
data_size = len(data)
padding = data_size % 2
size += data_size + padding
insert_bytes(self._fileobj, size, next_offset)
self._fileobj.seek(next_offset)
self._fileobj.write(
pack('>4si', id_.ljust(4).encode('ascii'), data_size))
self._fileobj.seek(next_offset)
chunk = IFFChunk.parse(self._fileobj, self)
self._update_size(chunk.size)
if data:
chunk.write(data)
self.subchunks().append(chunk)
self._fileobj.flush()
return chunk

def _remove_subchunk(self, chunk):
assert chunk in self.__subchunks
self._update_size(-chunk.size, chunk)
self.__subchunks.remove(chunk)

def _update_sibling_offsets(self, changed_subchunk, size_diff):
"""Update the offsets of subchunks after `changed_subchunk`.
"""
index = self.__subchunks.index(changed_subchunk)
sibling_chunks = self.__subchunks[index + 1:len(self.__subchunks)]
for sibling in sibling_chunks:
sibling.offset -= size_diff
sibling.data_offset -= size_diff


class IFFFile(object):
"""Representation of a IFF file"""

def __init__(self, fileobj):
self.__fileobj = fileobj
self.__chunks = {}

# AIFF Files always start with the FORM chunk which contains a 4 byte
# ID before the start of other chunks
fileobj.seek(0)
self.__chunks[u'FORM'] = IFFChunk(fileobj)

# Skip past the 4 byte FORM id
fileobj.seek(IFFChunk.HEADER_SIZE + 4)
self.root = IFFChunk.parse(fileobj)

# Where the next chunk can be located. We need to keep track of this
# since the size indicated in the FORM header may not match up with the
# offset determined from the size of the last chunk in the file
self.__next_offset = fileobj.tell()

# Load all of the chunks
while True:
try:
chunk = IFFChunk(fileobj, self[u'FORM'])
except InvalidChunk:
break
self.__chunks[chunk.id.strip()] = chunk

# Calculate the location of the next chunk,
# considering the pad byte
self.__next_offset = chunk.offset + chunk.size
self.__next_offset += self.__next_offset % 2
fileobj.seek(self.__next_offset)
if self.root.id != u'FORM':
raise InvalidChunk("Root chunk must be a RIFF chunk, got %s"
% self.root.id)

def __contains__(self, id_):
"""Check if the IFF file contains a specific chunk"""

assert_valid_chunk_id(id_)

return id_ in self.__chunks
try:
self[id_]
return True
except KeyError:
return False

def __getitem__(self, id_):
"""Get a chunk from the IFF file"""

assert_valid_chunk_id(id_)

try:
return self.__chunks[id_]
except KeyError:
raise KeyError(
"%r has no %r chunk" % (self.__fileobj, id_))
if id_ == 'FORM': # For backwards compatibility
return self.root
found_chunk = None
for chunk in self.root.subchunks():
if chunk.id == id_:
found_chunk = chunk
break
else:
raise KeyError("No %r chunk found" % id_)
return found_chunk

def __delitem__(self, id_):
"""Remove a chunk from the IFF file"""

assert_valid_chunk_id(id_)
self.delete_chunk(id_)

self.__chunks.pop(id_).delete()

def insert_chunk(self, id_):
"""Insert a new chunk at the end of the IFF file"""
def delete_chunk(self, id_):
"""Remove a chunk from the RIFF file"""

assert_valid_chunk_id(id_)
self[id_].delete()

self.__fileobj.seek(self.__next_offset)
self.__fileobj.write(pack('>4si', id_.ljust(4).encode('ascii'), 0))
self.__fileobj.seek(self.__next_offset)
chunk = IFFChunk(self.__fileobj, self[u'FORM'])
self[u'FORM']._update_size(self[u'FORM'].data_size + chunk.size)
def insert_chunk(self, id_, data=None):
"""Insert a new chunk at the end of the IFF file"""

self.__chunks[id_] = chunk
self.__next_offset = chunk.offset + chunk.size
assert_valid_chunk_id(id_)
return self.root.insert_chunk(id_, data)


class AIFFInfo(StreamInfo):
@@ -291,12 +400,7 @@ def save(self, filething=None, v2_version=4, v23_sep='/', padding=None):
except ID3Error as e:
reraise(error, e, sys.exc_info()[2])

new_size = len(data)
new_size += new_size % 2 # pad byte
assert new_size % 2 == 0
chunk.resize(new_size)
data += (new_size - len(data)) * b'\x00'
assert new_size == len(data)
chunk.resize(len(data))
chunk.write(data)

@loadfile(writable=True)
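For context, the usual way this code path is exercised is saving ID3 tags to an AIFF file. A usage sketch (the file name is hypothetical):

    from mutagen.aiff import AIFF
    from mutagen.id3 import TIT2

    audio = AIFF("example.aiff")
    if audio.tags is None:
        audio.add_tags()
    audio.tags.add(TIT2(encoding=3, text=["Song Title"]))
    # save() re-encodes the ID3 frames and resizes the ID3 chunk; with this
    # change the chunk data is padded to an even length automatically.
    audio.save()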