Skip to content

Commit

Permalink
riff: Update in-memory offsets when modifying chunks
Browse files Browse the repository at this point in the history
This makes it safe to perform multiple chunk insert / delete / resize operations on a loaded RIFF file.

See #392
  • Loading branch information
phw committed Nov 15, 2019
1 parent 569b4d0 commit fe67c09
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 114 deletions.
252 changes: 163 additions & 89 deletions mutagen/_riff.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@

from ._compat import text_type

from mutagen._util import resize_bytes, delete_bytes, MutagenError
from mutagen._util import (
MutagenError,
delete_bytes,
insert_bytes,
resize_bytes,
)


class error(MutagenError):
Expand Down Expand Up @@ -55,173 +60,242 @@ def assert_valid_chunk_id(id):
raise ValueError("Invalid RIFF-chunk-ID.")


class RiffChunk(object):
    """Generic RIFF chunk.

    A chunk is an 8 byte header (4 byte ASCII ID followed by a
    little-endian unsigned 32 bit data size), the data itself and a single
    padding byte if the data size is odd.

    The in-memory attributes `offset`, `data_offset`, `data_size` and
    `size` are kept in sync with the file when the chunk is modified
    through `resize`, `delete` or `_update_size`.
    """

    # Chunk headers are 8 bytes long (4 for ID and 4 for the size)
    HEADER_SIZE = 8

    @classmethod
    def parse(cls, fileobj, parent_chunk=None):
        """Parse the chunk header at the current position of `fileobj`.

        Returns an instance of the class chosen by `get_class` for the
        parsed ID.

        Raises InvalidChunk if the header is truncated, the ID is not
        ASCII, or the ID is not a valid chunk ID.
        """
        header = fileobj.read(cls.HEADER_SIZE)
        if len(header) < cls.HEADER_SIZE:
            raise InvalidChunk('Header size < %i' % cls.HEADER_SIZE)

        raw_id, data_size = struct.unpack('<4sI', header)
        try:
            chunk_id = raw_id.decode('ascii').rstrip()
        except UnicodeDecodeError as e:
            raise InvalidChunk(e)

        if not is_valid_chunk_id(chunk_id):
            raise InvalidChunk('Invalid chunk ID %s' % chunk_id)

        chunk_cls = cls.get_class(chunk_id)
        return chunk_cls(fileobj, chunk_id, data_size, parent_chunk)

    @classmethod
    def get_class(cls, id):
        """Return the class used to represent a chunk with the given ID."""
        if id in (u'LIST', u'RIFF'):
            return ListRiffChunk
        return cls

    def __init__(self, fileobj, id, data_size, parent_chunk):
        """Create a chunk whose header has just been read from `fileobj`.

        `fileobj` must be positioned directly after the chunk header, i.e.
        at the first data byte; all offsets are derived from that position.
        """
        self._fileobj = fileobj
        self.id = id
        self.data_size = data_size
        self.parent_chunk = parent_chunk
        self.data_offset = fileobj.tell()
        self.offset = self.data_offset - self.HEADER_SIZE
        self._calculate_size()

    def read(self):
        """Read the chunk's data"""

        self._fileobj.seek(self.data_offset)
        return self._fileobj.read(self.data_size)

    def write(self, data):
        """Write the chunk data.

        Raises ValueError if `data` is longer than the chunk's data size;
        use `resize` first to grow the chunk.
        """

        if len(data) > self.data_size:
            raise ValueError(
                'data (%i bytes) does not fit into chunk data size %i'
                % (len(data), self.data_size))

        self._fileobj.seek(self.data_offset)
        self._fileobj.write(data)
        # Write the padding byte. It directly follows the last data byte;
        # seeking one byte further would leave the pad byte untouched and
        # clobber the first byte of whatever follows this chunk.
        padding = self.padding()
        if padding:
            self._fileobj.seek(self.data_offset + self.data_size)
            self._fileobj.write(b'\x00' * padding)

    def delete(self):
        """Removes the chunk from the file"""

        delete_bytes(self._fileobj, self.size, self.offset)
        if self.parent_chunk is not None:
            # The parent shrinks itself and fixes up sibling offsets
            self.parent_chunk._remove_subchunk(self)

    def _update_size(self, size_diff, changed_subchunk=None):
        """Update the size of the chunk by `size_diff` bytes.

        Rewrites the size field in the file, recomputes the in-memory size
        and propagates the change to the parent chunk. If the change was
        caused by `changed_subchunk`, the offsets of the subchunks
        following it are adjusted as well.
        """

        old_size = self.size
        self.data_size += size_diff
        # The size field sits right after the 4 byte chunk ID
        self._fileobj.seek(self.offset + 4)
        self._fileobj.write(pack('<I', self.data_size))
        self._calculate_size()
        if self.parent_chunk is not None:
            self.parent_chunk._update_size(self.size - old_size, self)
        if changed_subchunk is not None:
            self._update_sibling_offsets(
                changed_subchunk, old_size - self.size)

    def _calculate_size(self):
        """Recompute `size` (header + data + padding) from `data_size`."""
        # Consider the padding byte for the total size of this chunk
        self.size = self.HEADER_SIZE + self.data_size + self.padding()
        assert self.size % 2 == 0

    def resize(self, new_data_size):
        """Resize the file and update the chunk sizes"""

        padding = new_data_size % 2
        resize_bytes(self._fileobj, self.data_size + self.padding(),
                     new_data_size + padding, self.data_offset)
        size_diff = new_data_size - self.data_size
        self._update_size(size_diff)

    def padding(self):
        """Returns the number of padding bytes (0 or 1).

        RIFF chunks are required to be an even number in total length. If
        data_size is odd a padding byte will be added at the end.
        """
        return self.data_size % 2


class ListRiffChunk(RiffChunk):
    """A RIFF chunk containing other chunks.

    This is either a 'LIST' or 'RIFF' chunk. Its data starts with a 4 byte
    name (e.g. WAVE or INFO) followed by the subchunks.
    """

    MIN_DATA_SIZE = 4

    def __init__(self, fileobj, id, data_size, parent_chunk):
        """Create a list chunk; `fileobj` must be positioned after the header.

        Raises InvalidChunk if `id` is neither RIFF nor LIST or if the data
        is too short to hold the list name, and `error` if the list name is
        not ASCII.
        """
        if id not in (u'RIFF', u'LIST'):
            raise InvalidChunk('Expected RIFF or LIST chunk, got %s' % id)

        RiffChunk.__init__(self, fileobj, id, data_size, parent_chunk)

        # Lists always store an additional identifier as 4 bytes
        if data_size < self.MIN_DATA_SIZE:
            raise InvalidChunk('List data size < %i' % self.MIN_DATA_SIZE)

        # Read the list name (e.g. WAVE for RIFF chunks, or INFO for LIST)
        try:
            self.name = fileobj.read(4).decode('ascii')
        except UnicodeDecodeError as e:
            raise error(e)

        # None means "not parsed yet". An empty list cannot serve as that
        # marker because a list chunk may legitimately have no subchunks.
        self.__subchunks = None

    def subchunks(self):
        """Returns a list of all subchunks.

        The list is lazily loaded on first access. Parsing stops silently
        at the first invalid subchunk header.
        """
        if self.__subchunks is None:
            loaded = []
            end_offset = self.offset + self.size
            # Skip the 4 byte list name at the start of the data
            next_offset = self.data_offset + 4
            while next_offset < end_offset:
                self._fileobj.seek(next_offset)
                try:
                    chunk = RiffChunk.parse(self._fileobj, self)
                except InvalidChunk:
                    break
                loaded.append(chunk)

                # Calculate the location of the next chunk
                next_offset = chunk.offset + chunk.size
            self.__subchunks = loaded
        return self.__subchunks

    def insert_chunk(self, id_, data=None):
        """Insert a new chunk at the end of the RIFF or LIST.

        Returns the newly created chunk. Raises KeyError if `id_` is not a
        valid chunk ID.
        """

        assert isinstance(id_, text_type)

        if not is_valid_chunk_id(id_):
            raise KeyError("Invalid RIFF key.")

        # Load the existing subchunks *before* touching the file. Loading
        # them afterwards would already pick up the new chunk from disk and
        # the append below would then register it a second time.
        subchunks = self.subchunks()

        next_offset = self.offset + self.size
        data_size = len(data) if data else 0
        size = self.HEADER_SIZE + data_size + data_size % 2
        insert_bytes(self._fileobj, size, next_offset)
        self._fileobj.seek(next_offset)
        # The size field is unsigned, matching the '<4sI' used for parsing
        self._fileobj.write(
            pack('<4sI', id_.ljust(4).encode('ascii'), data_size))
        self._fileobj.seek(next_offset)
        chunk = RiffChunk.parse(self._fileobj, self)
        self._update_size(chunk.size)
        if data:
            chunk.write(data)
        subchunks.append(chunk)
        return chunk

    def _remove_subchunk(self, chunk):
        """Shrink this list by `chunk` and forget about the subchunk.

        The bytes of `chunk` must already have been removed from the file.
        """
        subchunks = self.subchunks()
        assert chunk in subchunks
        # Must run before the removal: the sibling update looks up `chunk`
        self._update_size(-chunk.size, chunk)
        subchunks.remove(chunk)

    def _update_sibling_offsets(self, changed_subchunk, size_diff):
        """Update the offsets of subchunks after `changed_subchunk`.

        `size_diff` is the number of bytes the file shrank by; following
        chunks move towards the start of the file by that amount.
        """
        if self.__subchunks is None:
            # Nothing loaded, hence no in-memory offsets to fix
            return
        index = self.__subchunks.index(changed_subchunk)
        for sibling in self.__subchunks[index + 1:]:
            self._shift_chunk(sibling, -size_diff)

    @staticmethod
    def _shift_chunk(chunk, delta):
        """Move `chunk` and its already loaded subchunks by `delta` bytes."""
        chunk.offset += delta
        chunk.data_offset += delta
        # A moved list chunk takes its loaded subchunks along; otherwise
        # their offsets would still point at the old file positions.
        if isinstance(chunk, ListRiffChunk) \
                and chunk.__subchunks is not None:
            for subchunk in chunk.__subchunks:
                ListRiffChunk._shift_chunk(subchunk, delta)


class RiffFile(object):
    """Representation of a RIFF file

    Ref: http://www.johnloomis.org/cpe102/asgn/asgn1/riff.html

    The file is represented by its root RIFF chunk (`root`); chunk lookups,
    deletions and insertions operate on the direct subchunks of the root.
    """

    def __init__(self, fileobj):
        """Parse the root chunk of `fileobj`.

        Raises InvalidChunk if the file does not start with a valid RIFF
        chunk.
        """
        self._fileobj = fileobj

        # Reset read pointer to beginning of RIFF file
        fileobj.seek(0)

        # RIFF Files always start with the RIFF chunk
        self.root = RiffChunk.parse(fileobj)

        if self.root.id != u'RIFF':
            raise InvalidChunk("Root chunk must be a RIFF chunk, got %s"
                               % self.root.id)

        # The RIFF file type (e.g. WAVE) is the name of the root list chunk
        self.file_type = self.root.name

    def __contains__(self, id_):
        """Check if the RIFF file contains a specific chunk"""

        assert_valid_chunk_id(id_)
        try:
            self[id_]
        except KeyError:
            return False
        return True

    def __getitem__(self, id_):
        """Get a chunk from the RIFF file.

        Returns the first direct subchunk of the root with the given ID and
        raises KeyError if there is none.
        """

        assert_valid_chunk_id(id_)

        for chunk in self.root.subchunks():
            if chunk.id == id_:
                return chunk
        raise KeyError("No %r chunk found" % id_)

    def delete_chunk(self, id_):
        """Remove a chunk from the RIFF file"""

        assert_valid_chunk_id(id_)
        self[id_].delete()

    def insert_chunk(self, id_, data=None):
        """Insert a new chunk at the end of the RIFF file.

        Returns the new chunk. Validation of `id_` is left to the root
        chunk, which performs the same checks before modifying the file.
        """

        return self.root.insert_chunk(id_, data)
5 changes: 5 additions & 0 deletions mutagen/wave.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2017 Borewit
# Copyright (C) 2019 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -36,6 +37,10 @@ def __init__(self, fileobj):
if self.file_type != u'WAVE':
raise error("Expected RIFF/WAVE.")

# Normalize ID3v2-tag-chunk to lowercase
if u'ID3' in self:
self[u'ID3'].id = u'id3'


class WaveStreamInfo(StreamInfo):
"""WaveStreamInfo()
Expand Down
Binary file modified tests/data/silence-2s-PCM-16000-08-ID3v23.wav
Binary file not shown.
Loading

0 comments on commit fe67c09

Please sign in to comment.