Skip to content

Commit

Permalink
Add initial support for USM alpha chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
donmai-me committed Apr 7, 2022
1 parent 1d4929b commit c7bae1b
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 337 deletions.
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
## [0.2.5] - 2022-04-08
### Added
- New operation in command-line called `encryptusm` which encrypts an existing USM file.
- Support for new USM Chunk `@ALP` which is used for alpha transparency videos. Currently, supports probe and extraction operations only. Thank you to [EmirioBomb](https://github.com/EmirioBomb) for bringing this to my attention and providing sample files.

### Changed
- Renamed Usm constructor parameters.

### Removed
- Check for chunk header in `chunk_size_and_padding` function
Expand Down Expand Up @@ -49,7 +53,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed bug in extractusm that causes it to fail when output directory doesn't exist.
- Fixed bug where program fails when directory exists.

[Unreleased]: https://github.com/donmai-me/WannaCRI/compare/0.2.4...HEAD
[Unreleased]: https://github.com/donmai-me/WannaCRI/compare/0.2.5...HEAD
[0.2.5]: https://github.com/donmai-me/WannaCRI/compare/0.2.4...0.2.5
[0.2.4]: https://github.com/donmai-me/WannaCRI/compare/0.2.3...0.2.4
[0.2.3]: https://github.com/donmai-me/WannaCRI/compare/0.2.2...0.2.3
[0.2.2]: https://github.com/donmai-me/WannaCRI/compare/0.2.1...0.2.2
Expand Down
261 changes: 18 additions & 243 deletions wannacri/usm/media/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,21 @@ def metadata_pages(self, pages: Optional[List[UsmPage]]):
def channel_number(self) -> int:
return self._channel_number

@channel_number.setter
def channel_number(self, new_channel_number: int):
if new_channel_number < 0:
raise ValueError(f"Given negative channel number: {new_channel_number}")

self._channel_number = new_channel_number

@property
def header_page(self) -> UsmPage:
return self._header_page

@header_page.setter
def header_page(self, new_header_page: UsmPage):
self._header_page = new_header_page

@property
def filename(self) -> str:
"""A slugified filename, which is stored inside the crid_page.
Expand All @@ -68,6 +79,12 @@ def filename(self) -> str:
result: str = filename.val.split("/")[-1]
return slugify(result, allow_unicode=True)

@filename.setter
def filename(self, new_filename: str):
new_filename = slugify(new_filename, allow_unicode=True)
self.crid_page["filename"].type = ElementType.STRING
self.crid_page["filename"].val = new_filename

def __len__(self) -> int:
"""The number of packets a Usm videos or audios has."""
return self._length
Expand All @@ -89,6 +106,7 @@ class UsmVideo(UsmMedia, Protocol):
# Classes that explicitly inherit UsmVideo should have this attribute
# to use the default stream and chunks methods.
_stream: Generator[Tuple[bytes, bool], None, None]
is_alpha: bool

def stream(
self, mode: OpMode = OpMode.NONE, key: Optional[bytes] = None
Expand Down Expand Up @@ -317,246 +335,3 @@ def chunks(
channel_number=self.channel_number,
),
]


# TODO: Delete the comments when done rewriting

# class VP9:
# def __init__(self, filepath: str):
# self.filename = os.path.basename(filepath)
# self.filesize = os.path.getsize(filepath)
# self.info = ffmpeg.probe(filepath, show_entries="packet=dts,pts_time,pos,flags")

# if len(self.info.get("streams")) == 0:
# raise Exception("No streams found")
# elif len(self.info.get("streams")) > 1:
# raise NotImplementedError("Can only accept one stream")
# elif self.info.get("format").get("format_name") != "ivf":
# raise Exception("Not an ivf file")
# elif self.info.get("streams")[0].get("codec_name") != "vp9":
# raise Exception("Not a VP9 videos")

# video_stream = self.info.get("streams")[0]
# self.bitrate = int(self.info.get("format").get("bit_rate"))
# self.width = int(video_stream.get("width"))
# self.height = int(video_stream.get("height"))
# self.framerate_n = int(video_stream.get("r_frame_rate").split("/")[0])
# self.framerate_d = int(video_stream.get("r_frame_rate").split("/")[1])

# self.frames = self.info.get("packets")
# self.keyframes = [frame for frame in self.frames if frame.get("flags") == "K_"]
# self.total_frames = len(self.frames)
# self.file = open(filepath, "rb")

# def export(
# self, encrypt: bool, key: Optional[int] = None, encoding: str = "UTF-8"
# ) -> bytes:
# if encrypt:
# video_key1, video_key2, _ = generate_keys(key)

# debug = open("FRAME_TIME_DEBUG", "w+")
# stream_chunks = bytearray()
# keyframe_usm_offsets = []
# max_frame_size = 0
# max_packed_frame_size = 0
# max_keyframe_to_keyframe_size = 0
# current_keyframe_to_keyframe_size = 0
# self.file.seek(0, 0)
# for i, frame in enumerate(self.frames):
# # frame_time formula is based on existing usm files.
# # TODO: Does this hold up for videos that's not 30fps?
# debug.write("Frame {}: time = {}".format(i, frame.get("pts_time")))
# # frame_time = int(99.86891 * i)
# frame_time = int(i * 99.9)
# if frame in self.keyframes:
# keyframe_usm_offsets.append(len(stream_chunks))

# if i == len(self.frames) - 1:
# # Last frame
# frame_size = self.filesize - int(frame.get("pos"))
# else:
# frame_size = int(self.frames[i + 1].get("pos")) - int(frame.get("pos"))

# if i == 0:
# # Include 32 byte header for first frame
# max_frame_size = frame_size + 32
# packet = self.file.read(frame_size + 32)
# else:
# if frame_size > max_frame_size:
# max_frame_size = frame_size

# packet = self.file.read(frame_size)

# if frame.get("flags") != "K_":
# current_keyframe_to_keyframe_size += frame_size
# elif current_keyframe_to_keyframe_size > max_keyframe_to_keyframe_size:
# max_keyframe_to_keyframe_size = current_keyframe_to_keyframe_size
# current_keyframe_to_keyframe_size = frame_size

# if encrypt:
# packet = encrypt_video_packet(packet, video_key1, video_key2)

# padding_size = 0x20 - (len(packet) % 0x20) if len(packet) % 0x20 != 0 else 0
# packed_frame_chunk = generate_packed_chunk(
# "@SFV",
# 0,
# int(100 * self.framerate_n / self.framerate_d),
# frame_time,
# packet,
# padding_size,
# )
# if len(packed_frame_chunk) > max_packed_frame_size:
# max_packed_frame_size = len(packed_frame_chunk)

# stream_chunks += packed_frame_chunk

# stream_chunks += generate_packed_chunk(
# "@SFV",
# 2,
# int(self.framerate_n / self.framerate_d),
# 0,
# bytes("#CONTENTS END ===============", "UTF-8") + bytes(1),
# 0,
# )

# seek_chunks = bytearray()
# keyframe_pages = []
# video_header_end_offset = get_video_header_end_offset(len(self.keyframes))
# # Offset of videos header end offset plus length of videos header end
# stream_offset = video_header_end_offset + 0x40
# for i, keyframe in enumerate(self.keyframes):
# keyframe_usm_offset = keyframe_usm_offsets[i]
# keyframe_offset = stream_offset + keyframe_usm_offset
# keyframe_page = UsmPage("VIDEO_SEEKINFO", i)
# keyframe_page.add("ofs_byte", ElementType.clonglong, keyframe_offset)
# keyframe_page.add("ofs_frmid", ElementType.cuint, keyframe.get("dts"))
# keyframe_page.add("num_skip", ElementType.cushort, 0)
# keyframe_page.add("resv", ElementType.cushort, 0)
# keyframe_pages.append(keyframe_page)

# seek_chunks_payload = pack_pages(keyframe_pages, string_padding=1)
# # 0x20 bytes for chunk header.
# seek_chunks_padding = (
# video_header_end_offset - 0xA40 - 0x20 - len(seek_chunks_payload)
# )
# seek_chunks += generate_packed_chunk(
# "@SFV",
# 3,
# int(self.framerate_n / self.framerate_d),
# 0,
# seek_chunks_payload,
# seek_chunks_padding,
# )
# metadata_size = len(seek_chunks)
# seek_chunks += generate_packed_chunk(
# "@SFV",
# 2,
# int(self.framerate_n / self.framerate_d),
# 0,
# bytes("#METADATA END ===============", "UTF-8") + bytes(1),
# 0,
# )

# header_page = UsmPage("VIDEO_HDRINFO", 0)
# header_page.add("width", ElementType.cint, self.width)
# header_page.add("height", ElementType.cint, self.height)
# header_page.add("mat_width", ElementType.cint, self.width)
# header_page.add("mat_height", ElementType.cint, self.height)
# header_page.add("disp_width", ElementType.cint, self.width)
# header_page.add("disp_height", ElementType.cint, self.height)
# header_page.add("scrn_width", ElementType.cint, 0)
# header_page.add("mpeg_dcprec", ElementType.cchar, 0)
# header_page.add("mpeg_codec", ElementType.cchar, 9)
# # TODO: Check if videos has transparency
# header_page.add("alpha_type", ElementType.cint, 0)
# header_page.add("total_frames", ElementType.cint, self.total_frames)

# framerate_n = self.framerate_n
# framerate_d = self.framerate_d

# if framerate_d < 1000 and framerate_d != 1000:
# framerate_d *= 1000
# framerate_n *= 1000

# header_page.add("framerate_n", ElementType.cint, framerate_n)
# header_page.add("framerate_d", ElementType.cint, framerate_d)
# header_page.add("metadata_count", ElementType.cint, 1)
# header_page.add("metadata_size", ElementType.cint, metadata_size)
# header_page.add("ixsize", ElementType.cint, max_packed_frame_size)
# header_page.add("pre_padding", ElementType.cint, 0)
# header_page.add("max_picture_size", ElementType.cint, 0)
# header_page.add("color_space", ElementType.cint, 0)
# header_page.add("picture_type", ElementType.cint, 0)

# header_chunk_payload = pack_pages([header_page])
# header_chunk_padding = 0xA00 - 0x800 - 0x20 - len(header_chunk_payload)
# header_chunk = bytearray()
# header_chunk += generate_packed_chunk(
# "@SFV",
# 1,
# int(self.framerate_n / self.framerate_d),
# 0,
# header_chunk_payload,
# header_chunk_padding,
# )
# header_chunk += generate_packed_chunk(
# "@SFV",
# 2,
# int(self.framerate_n / self.framerate_d),
# 0,
# bytes("#HEADER END ===============", "UTF-8") + bytes(1),
# 0,
# )

# directory_pages = []
# for i in range(0, 2):
# if i == 0:
# filename = os.path.splitext(self.filename)[0] + ".usm"
# # filename = r"I:\000125 千本桜\000125.usm"
# filesize = (
# 0x800 + len(header_chunk) + len(seek_chunks) + len(stream_chunks)
# )
# stmid = 0
# chno = -1
# minchk = 1
# # TODO: Find formula for minbuf for usm
# minbuf = round(1.98746 * max_frame_size)
# minbuf += 0x10 - (minbuf % 0x10) if minbuf % 0x10 != 0 else 0
# else:
# filename = self.filename
# # filename = r"I:\000125 千本桜\000125.ivf"

# filesize = self.filesize
# stmid = 1079199318 # @SFV
# chno = 0
# minchk = 3
# minbuf = max_frame_size

# directory_part = UsmPage("CRIUSF_DIR_STREAM", 0)
# directory_part.add("fmtver", ElementType.cint, 16777984)
# directory_part.add("filename", ElementType.cstring, filename)
# directory_part.add("filesize", ElementType.cint, filesize)
# directory_part.add("datasize", ElementType.cint, 0)
# directory_part.add("stmid", ElementType.cint, stmid)
# directory_part.add("chno", ElementType.cshort, chno)
# directory_part.add("minchk", ElementType.cshort, minchk)
# directory_part.add("minbuf", ElementType.cint, minbuf)
# directory_part.add("avbps", ElementType.cint, self.bitrate)
# directory_pages.append(directory_part)

# directory_chunk_payload = pack_pages(directory_pages, encoding, 5)
# directory_chunk_padding = 0x800 - 0x20 - len(directory_chunk_payload)
# directory_chunk = bytearray()
# directory_chunk += generate_packed_chunk(
# "CRID",
# 1,
# int(self.framerate_n / self.framerate_d),
# 0,
# directory_chunk_payload,
# directory_chunk_padding,
# )
# result = directory_chunk + header_chunk + seek_chunks + stream_chunks
# return bytes(result)

# def close(self):
# self.file.close()
2 changes: 2 additions & 0 deletions wannacri/usm/media/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ def __init__(
length: int,
channel_number: int = 0,
metadata_pages: Optional[List[UsmPage]] = None,
is_alpha: bool = False,
):
self._stream = stream
self._crid_page = crid_page
self._header_page = header_page
self._length = length
self._channel_number = channel_number
self._metadata_pages = metadata_pages
self.is_alpha = is_alpha


class Vp9(UsmVideo):
Expand Down
1 change: 1 addition & 0 deletions wannacri/usm/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def is_valid_chunk(signature: bytes) -> bool:
bytes("CRID", "UTF-8"), # CRI USF DIR STREAM
bytes("@SFV", "UTF-8"), # Video
bytes("@SFA", "UTF-8"), # Audio
bytes("@ALP", "UTF-8")
]
return signature[:4] in valid_signatures

Expand Down
6 changes: 5 additions & 1 deletion wannacri/usm/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class ChunkType(Enum):
INFO = bytearray("CRID", "UTF-8")
VIDEO = bytearray("@SFV", "UTF-8")
AUDIO = bytearray("@SFA", "UTF-8")
ALPHA = bytearray("@ALP", "UTF-8")

@staticmethod
def from_bytes(data: bytes) -> ChunkType:
Expand All @@ -19,6 +20,9 @@ def from_bytes(data: bytes) -> ChunkType:

raise ValueError(f"Unknown chunk signature: {bytes_to_hex(data[:4])}")

def __str__(self):
return str(self.value, "UTF-8")


class PayloadType(Enum):
STREAM = 0
Expand Down Expand Up @@ -61,7 +65,7 @@ class ElementType(Enum):
ULONGLONG = 0x17 # 8 bytes
FLOAT = 0x18 # 4 bytes
# TODO: Confirm DOUBLE's existence
# DOUBLE = 0x19 # 8 bytes
# DOUBLE = 0x19 # 8 bytes
STRING = 0x1A # Null byte terminated
BYTES = 0x1B # Bytes

Expand Down
Loading

0 comments on commit c7bae1b

Please sign in to comment.