From c7bae1bac7d9510e048b420f6692faeca9d2130c Mon Sep 17 00:00:00 2001 From: donmai-me <71143298+donmai-me@users.noreply.github.com> Date: Fri, 8 Apr 2022 02:52:30 +0800 Subject: [PATCH] Add initial support for USM alpha chunks --- CHANGELOG.md | 9 +- wannacri/usm/media/protocols.py | 261 +++----------------------------- wannacri/usm/media/video.py | 2 + wannacri/usm/tools.py | 1 + wannacri/usm/types.py | 6 +- wannacri/usm/usm.py | 199 +++++++++++++----------- 6 files changed, 141 insertions(+), 337 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c32bb18..f43047d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,9 +4,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## [0.2.5] - 2022-04-08 ### Added - New operation in command-line called `encryptusm` which encrypts an existing USM file. +- Support for new USM Chunk `@ALP` which is used for alpha transparency videos. Currently, supports probe and extraction operations only. Thank you to [EmirioBomb](https://github.com/EmirioBomb) for bringing this to my attention and providing sample files. + +### Changed +- Renamed Usm constructor parameters. ### Removed - Check for chunk header in `chunk_size_and_padding` function @@ -49,7 +53,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fixed bug in extractusm that causes it to fail when output directory doesn't exist. - Fixed bug where program fails when directory exists. -[Unreleased]: https://github.com/donmai-me/WannaCRI/compare/0.2.4...HEAD +[Unreleased]: https://github.com/donmai-me/WannaCRI/compare/0.2.5...HEAD +[0.2.5]: https://github.com/donmai-me/WannaCRI/compare/0.2.4...0.2.5 [0.2.4]: https://github.com/donmai-me/WannaCRI/compare/0.2.3...0.2.4 [0.2.3]: https://github.com/donmai-me/WannaCRI/compare/0.2.2...0.2.3 [0.2.2]: https://github.com/donmai-me/WannaCRI/compare/0.2.1...0.2.2 diff --git a/wannacri/usm/media/protocols.py b/wannacri/usm/media/protocols.py index e8c7f3e..5773c5b 100755 --- a/wannacri/usm/media/protocols.py +++ b/wannacri/usm/media/protocols.py @@ -52,10 +52,21 @@ def metadata_pages(self, pages: Optional[List[UsmPage]]): def channel_number(self) -> int: return self._channel_number + @channel_number.setter + def channel_number(self, new_channel_number: int): + if new_channel_number < 0: + raise ValueError(f"Given negative channel number: {new_channel_number}") + + self._channel_number = new_channel_number + @property def header_page(self) -> UsmPage: return self._header_page + @header_page.setter + def header_page(self, new_header_page: UsmPage): + self._header_page = new_header_page + @property def filename(self) -> str: """A slugified filename, which is stored inside the crid_page. @@ -68,6 +79,12 @@ def filename(self) -> str: result: str = filename.val.split("/")[-1] return slugify(result, allow_unicode=True) + @filename.setter + def filename(self, new_filename: str): + new_filename = slugify(new_filename, allow_unicode=True) + self.crid_page["filename"].type = ElementType.STRING + self.crid_page["filename"].val = new_filename + def __len__(self) -> int: """The number of packets a Usm videos or audios has.""" return self._length @@ -89,6 +106,7 @@ class UsmVideo(UsmMedia, Protocol): # Classes that explicitly inherit UsmVideo should have this attribute # to use the default stream and chunks methods. _stream: Generator[Tuple[bytes, bool], None, None] + is_alpha: bool def stream( self, mode: OpMode = OpMode.NONE, key: Optional[bytes] = None @@ -317,246 +335,3 @@ def chunks( channel_number=self.channel_number, ), ] - - -# TODO: Delete the comments when done rewriting - -# class VP9: -# def __init__(self, filepath: str): -# self.filename = os.path.basename(filepath) -# self.filesize = os.path.getsize(filepath) -# self.info = ffmpeg.probe(filepath, show_entries="packet=dts,pts_time,pos,flags") - -# if len(self.info.get("streams")) == 0: -# raise Exception("No streams found") -# elif len(self.info.get("streams")) > 1: -# raise NotImplementedError("Can only accept one stream") -# elif self.info.get("format").get("format_name") != "ivf": -# raise Exception("Not an ivf file") -# elif self.info.get("streams")[0].get("codec_name") != "vp9": -# raise Exception("Not a VP9 videos") - -# video_stream = self.info.get("streams")[0] -# self.bitrate = int(self.info.get("format").get("bit_rate")) -# self.width = int(video_stream.get("width")) -# self.height = int(video_stream.get("height")) -# self.framerate_n = int(video_stream.get("r_frame_rate").split("/")[0]) -# self.framerate_d = int(video_stream.get("r_frame_rate").split("/")[1]) - -# self.frames = self.info.get("packets") -# self.keyframes = [frame for frame in self.frames if frame.get("flags") == "K_"] -# self.total_frames = len(self.frames) -# self.file = open(filepath, "rb") - -# def export( -# self, encrypt: bool, key: Optional[int] = None, encoding: str = "UTF-8" -# ) -> bytes: -# if encrypt: -# video_key1, video_key2, _ = generate_keys(key) - -# debug = open("FRAME_TIME_DEBUG", "w+") -# stream_chunks = bytearray() -# keyframe_usm_offsets = [] -# max_frame_size = 0 -# max_packed_frame_size = 0 -# max_keyframe_to_keyframe_size = 0 -# current_keyframe_to_keyframe_size = 0 -# self.file.seek(0, 0) -# for i, frame in enumerate(self.frames): -# # frame_time formula is based on existing usm files. -# # TODO: Does this hold up for videos that's not 30fps? -# debug.write("Frame {}: time = {}".format(i, frame.get("pts_time"))) -# # frame_time = int(99.86891 * i) -# frame_time = int(i * 99.9) -# if frame in self.keyframes: -# keyframe_usm_offsets.append(len(stream_chunks)) - -# if i == len(self.frames) - 1: -# # Last frame -# frame_size = self.filesize - int(frame.get("pos")) -# else: -# frame_size = int(self.frames[i + 1].get("pos")) - int(frame.get("pos")) - -# if i == 0: -# # Include 32 byte header for first frame -# max_frame_size = frame_size + 32 -# packet = self.file.read(frame_size + 32) -# else: -# if frame_size > max_frame_size: -# max_frame_size = frame_size - -# packet = self.file.read(frame_size) - -# if frame.get("flags") != "K_": -# current_keyframe_to_keyframe_size += frame_size -# elif current_keyframe_to_keyframe_size > max_keyframe_to_keyframe_size: -# max_keyframe_to_keyframe_size = current_keyframe_to_keyframe_size -# current_keyframe_to_keyframe_size = frame_size - -# if encrypt: -# packet = encrypt_video_packet(packet, video_key1, video_key2) - -# padding_size = 0x20 - (len(packet) % 0x20) if len(packet) % 0x20 != 0 else 0 -# packed_frame_chunk = generate_packed_chunk( -# "@SFV", -# 0, -# int(100 * self.framerate_n / self.framerate_d), -# frame_time, -# packet, -# padding_size, -# ) -# if len(packed_frame_chunk) > max_packed_frame_size: -# max_packed_frame_size = len(packed_frame_chunk) - -# stream_chunks += packed_frame_chunk - -# stream_chunks += generate_packed_chunk( -# "@SFV", -# 2, -# int(self.framerate_n / self.framerate_d), -# 0, -# bytes("#CONTENTS END ===============", "UTF-8") + bytes(1), -# 0, -# ) - -# seek_chunks = bytearray() -# keyframe_pages = [] -# video_header_end_offset = get_video_header_end_offset(len(self.keyframes)) -# # Offset of videos header end offset plus length of videos header end -# stream_offset = video_header_end_offset + 0x40 -# for i, keyframe in enumerate(self.keyframes): -# keyframe_usm_offset = keyframe_usm_offsets[i] -# keyframe_offset = stream_offset + keyframe_usm_offset -# keyframe_page = UsmPage("VIDEO_SEEKINFO", i) -# keyframe_page.add("ofs_byte", ElementType.clonglong, keyframe_offset) -# keyframe_page.add("ofs_frmid", ElementType.cuint, keyframe.get("dts")) -# keyframe_page.add("num_skip", ElementType.cushort, 0) -# keyframe_page.add("resv", ElementType.cushort, 0) -# keyframe_pages.append(keyframe_page) - -# seek_chunks_payload = pack_pages(keyframe_pages, string_padding=1) -# # 0x20 bytes for chunk header. -# seek_chunks_padding = ( -# video_header_end_offset - 0xA40 - 0x20 - len(seek_chunks_payload) -# ) -# seek_chunks += generate_packed_chunk( -# "@SFV", -# 3, -# int(self.framerate_n / self.framerate_d), -# 0, -# seek_chunks_payload, -# seek_chunks_padding, -# ) -# metadata_size = len(seek_chunks) -# seek_chunks += generate_packed_chunk( -# "@SFV", -# 2, -# int(self.framerate_n / self.framerate_d), -# 0, -# bytes("#METADATA END ===============", "UTF-8") + bytes(1), -# 0, -# ) - -# header_page = UsmPage("VIDEO_HDRINFO", 0) -# header_page.add("width", ElementType.cint, self.width) -# header_page.add("height", ElementType.cint, self.height) -# header_page.add("mat_width", ElementType.cint, self.width) -# header_page.add("mat_height", ElementType.cint, self.height) -# header_page.add("disp_width", ElementType.cint, self.width) -# header_page.add("disp_height", ElementType.cint, self.height) -# header_page.add("scrn_width", ElementType.cint, 0) -# header_page.add("mpeg_dcprec", ElementType.cchar, 0) -# header_page.add("mpeg_codec", ElementType.cchar, 9) -# # TODO: Check if videos has transparency -# header_page.add("alpha_type", ElementType.cint, 0) -# header_page.add("total_frames", ElementType.cint, self.total_frames) - -# framerate_n = self.framerate_n -# framerate_d = self.framerate_d - -# if framerate_d < 1000 and framerate_d != 1000: -# framerate_d *= 1000 -# framerate_n *= 1000 - -# header_page.add("framerate_n", ElementType.cint, framerate_n) -# header_page.add("framerate_d", ElementType.cint, framerate_d) -# header_page.add("metadata_count", ElementType.cint, 1) -# header_page.add("metadata_size", ElementType.cint, metadata_size) -# header_page.add("ixsize", ElementType.cint, max_packed_frame_size) -# header_page.add("pre_padding", ElementType.cint, 0) -# header_page.add("max_picture_size", ElementType.cint, 0) -# header_page.add("color_space", ElementType.cint, 0) -# header_page.add("picture_type", ElementType.cint, 0) - -# header_chunk_payload = pack_pages([header_page]) -# header_chunk_padding = 0xA00 - 0x800 - 0x20 - len(header_chunk_payload) -# header_chunk = bytearray() -# header_chunk += generate_packed_chunk( -# "@SFV", -# 1, -# int(self.framerate_n / self.framerate_d), -# 0, -# header_chunk_payload, -# header_chunk_padding, -# ) -# header_chunk += generate_packed_chunk( -# "@SFV", -# 2, -# int(self.framerate_n / self.framerate_d), -# 0, -# bytes("#HEADER END ===============", "UTF-8") + bytes(1), -# 0, -# ) - -# directory_pages = [] -# for i in range(0, 2): -# if i == 0: -# filename = os.path.splitext(self.filename)[0] + ".usm" -# # filename = r"I:\000125 千本桜\000125.usm" -# filesize = ( -# 0x800 + len(header_chunk) + len(seek_chunks) + len(stream_chunks) -# ) -# stmid = 0 -# chno = -1 -# minchk = 1 -# # TODO: Find formula for minbuf for usm -# minbuf = round(1.98746 * max_frame_size) -# minbuf += 0x10 - (minbuf % 0x10) if minbuf % 0x10 != 0 else 0 -# else: -# filename = self.filename -# # filename = r"I:\000125 千本桜\000125.ivf" - -# filesize = self.filesize -# stmid = 1079199318 # @SFV -# chno = 0 -# minchk = 3 -# minbuf = max_frame_size - -# directory_part = UsmPage("CRIUSF_DIR_STREAM", 0) -# directory_part.add("fmtver", ElementType.cint, 16777984) -# directory_part.add("filename", ElementType.cstring, filename) -# directory_part.add("filesize", ElementType.cint, filesize) -# directory_part.add("datasize", ElementType.cint, 0) -# directory_part.add("stmid", ElementType.cint, stmid) -# directory_part.add("chno", ElementType.cshort, chno) -# directory_part.add("minchk", ElementType.cshort, minchk) -# directory_part.add("minbuf", ElementType.cint, minbuf) -# directory_part.add("avbps", ElementType.cint, self.bitrate) -# directory_pages.append(directory_part) - -# directory_chunk_payload = pack_pages(directory_pages, encoding, 5) -# directory_chunk_padding = 0x800 - 0x20 - len(directory_chunk_payload) -# directory_chunk = bytearray() -# directory_chunk += generate_packed_chunk( -# "CRID", -# 1, -# int(self.framerate_n / self.framerate_d), -# 0, -# directory_chunk_payload, -# directory_chunk_padding, -# ) -# result = directory_chunk + header_chunk + seek_chunks + stream_chunks -# return bytes(result) - -# def close(self): -# self.file.close() diff --git a/wannacri/usm/media/video.py b/wannacri/usm/media/video.py index a445c57..6dfe9df 100644 --- a/wannacri/usm/media/video.py +++ b/wannacri/usm/media/video.py @@ -21,6 +21,7 @@ def __init__( length: int, channel_number: int = 0, metadata_pages: Optional[List[UsmPage]] = None, + is_alpha: bool = False, ): self._stream = stream self._crid_page = crid_page @@ -28,6 +29,7 @@ def __init__( self._length = length self._channel_number = channel_number self._metadata_pages = metadata_pages + self.is_alpha = is_alpha class Vp9(UsmVideo): diff --git a/wannacri/usm/tools.py b/wannacri/usm/tools.py index 5778812..526700e 100755 --- a/wannacri/usm/tools.py +++ b/wannacri/usm/tools.py @@ -39,6 +39,7 @@ def is_valid_chunk(signature: bytes) -> bool: bytes("CRID", "UTF-8"), # CRI USF DIR STREAM bytes("@SFV", "UTF-8"), # Video bytes("@SFA", "UTF-8"), # Audio + bytes("@ALP", "UTF-8") ] return signature[:4] in valid_signatures diff --git a/wannacri/usm/types.py b/wannacri/usm/types.py index a790497..9f67bf9 100755 --- a/wannacri/usm/types.py +++ b/wannacri/usm/types.py @@ -8,6 +8,7 @@ class ChunkType(Enum): INFO = bytearray("CRID", "UTF-8") VIDEO = bytearray("@SFV", "UTF-8") AUDIO = bytearray("@SFA", "UTF-8") + ALPHA = bytearray("@ALP", "UTF-8") @staticmethod def from_bytes(data: bytes) -> ChunkType: @@ -19,6 +20,9 @@ def from_bytes(data: bytes) -> ChunkType: raise ValueError(f"Unknown chunk signature: {bytes_to_hex(data[:4])}") + def __str__(self): + return str(self.value, "UTF-8") + class PayloadType(Enum): STREAM = 0 @@ -61,7 +65,7 @@ class ElementType(Enum): ULONGLONG = 0x17 # 8 bytes FLOAT = 0x18 # 4 bytes # TODO: Confirm DOUBLE's existence - # DOUBLE = 0x19 # 8 bytes + # DOUBLE = 0x19 # 8 bytes STRING = 0x1A # Null byte terminated BYTES = 0x1B # Bytes diff --git a/wannacri/usm/usm.py b/wannacri/usm/usm.py index 1b42784..f1d1818 100755 --- a/wannacri/usm/usm.py +++ b/wannacri/usm/usm.py @@ -39,23 +39,30 @@ class UsmChannel: class Usm: def __init__( self, - video: List[UsmVideo], - audio: Optional[List[UsmAudio]] = None, + videos: List[UsmVideo], + audios: Optional[List[UsmAudio]] = None, + alphas: Optional[List[UsmVideo]] = None, key: Optional[int] = None, usm_crid: Optional[UsmPage] = None, version: int = 16777984, ) -> None: - if len(video) == 0: + if len(videos) == 0: raise ValueError("No video given.") - if audio is None: + if audios is None: self.audios = [] else: - self.audios = audio + self.audios = audios self.audios.sort() + if alphas is None: + self.alphas = [] + else: + self.alphas = alphas + self.alphas.sort() + self.version = version - self.videos = video + self.videos = videos self.videos.sort() self._usm_crid = usm_crid @@ -65,10 +72,11 @@ def __init__( "Initialising USM", extra={ "version": self.version, - "is_key_given": False if key is None else True, - "is_usm_crid_given": False if usm_crid is None else True, + "is_key_given": key is not None, + "is_usm_crid_given": usm_crid is not None, "num_videos": len(self.videos), "num_audios": len(self.audios), + "num_alphas": len(self.alphas), }, ) @@ -146,7 +154,7 @@ def open( "usm_name": filename, "size": filesize, "encoding": encoding, - "is_key_given": key is None, + "is_key_given": key is not None, }, ) @@ -155,7 +163,7 @@ def open( if not is_usm(signature): raise ValueError(f"Invalid file signature: {bytes_to_hex(signature)}") - crids, video_channels, audio_channels = _process_chunks( + crids, video_channels, audio_channels, alpha_channels = _process_chunks( usmfile, filesize, encoding ) @@ -163,6 +171,7 @@ def open( usmmutex = threading.Lock() videos = [] audios = [] + alphas = [] version: Optional[int] = None for channel_number, video_channel in video_channels.items(): crid = [ @@ -213,6 +222,28 @@ def open( ) ) + for channel_number, alpha_channel in alpha_channels.items(): + crid = [page for page in crids if page.get("chno").val == channel_number and page.get("stmid").val == 0x40414C50] + + if len(crid) == 0: + raise ValueError(f"No crid page found for video ch {channel_number}") + + alphas.append( + GenericVideo( + video_sink( + usmfile, + usmmutex, + alpha_channel.stream, + keyframes_from_seek_pages(alpha_channel.metadata) + ), + crid[0], + alpha_channel.header, + len(alpha_channel.stream), + channel_number=channel_number, + is_alpha=True, + ) + ) + usm_crid = [page for page in crids if page.get("chno").val == -1] if len(usm_crid) == 0: raise ValueError("No usm crid page found.") @@ -220,7 +251,7 @@ def open( raise ValueError("Format version not found.") return cls( - version=version, video=videos, audio=audios, key=key, usm_crid=usm_crid[0] + version=version, videos=videos, audios=audios, alphas=alphas, key=key, usm_crid=usm_crid[0] ) def demux( @@ -228,10 +259,11 @@ def demux( path: str, save_video: bool = True, save_audio: bool = True, + save_alpha: bool = True, save_pages: bool = False, folder_name: Optional[str] = None, ) -> Tuple[List[str], List[str]]: - """Saves all videos, audios, pages (depending on configuration) of a Usm.""" + """Saves all videos, audios, alpha videos, pages (depending on configuration) of a Usm.""" if folder_name is None: folder_name = self.filename @@ -244,36 +276,34 @@ def demux( videos = [] audios = [] + alphas = [] - if save_video: - logging.info("Saving videos") - mode = OpMode.NONE if self.video_key is None else OpMode.DECRYPT - vid_output = os.path.join(output, "videos") - if not os.path.exists(vid_output): - os.mkdir(vid_output) + def save(usm_array, output_array, name, key): + if len(usm_array) == 0: + return - for vid in self.videos: - filename = os.path.join(vid_output, vid.filename) - with open(filename, "wb+") as f: - for packet, _ in vid.stream(mode, self.video_key): - f.write(packet) + logging.info(f"Saving {name}") + mode = OpMode.NONE if key is None else OpMode.DECRYPT + sub_output = os.path.join(output, name) + if not os.path.exists(sub_output): + os.mkdir(sub_output) - videos.append(filename) + for item in usm_array: + filename = os.path.join(sub_output, item.filename) + with open(filename, "wb") as f: + for packet in item.stream(mode, key): + f.write(packet if type(packet) is not tuple else packet[0]) - if save_audio: - logging.info("Saving audios") - mode = OpMode.NONE if self.audio_key is None else OpMode.DECRYPT - aud_output = os.path.join(output, "audios") - if not os.path.exists(aud_output): - os.mkdir(aud_output) + output_array.append(filename) - for aud in self.audios: - filename = os.path.join(aud_output, aud.filename) - with open(filename, "wb+") as f: - for packet in aud.stream(mode, self.audio_key): - f.write(packet) + if save_video: + save(self.videos, videos, "videos", self.video_key) - audios.append(filename) + if save_audio: + save(self.audios, audios, "audios", self.audio_key) + + if save_alpha: + save(self.alphas, alphas, "alphas", self.video_key) if save_pages: logging.info("Saving pages") @@ -379,11 +409,39 @@ def stream( yield stream_file.read(0x800) +def _chunk_helper(default_dict_ch: Dict[int, UsmChannel], chunk: UsmChunk, offset: int): + """Helper function for _process_chunks. Fills default_dict_ch with information about + the passed chunk and offset.""" + if chunk.payload_type == PayloadType.STREAM: + default_dict_ch[chunk.channel_number].stream.append( + (offset + chunk.payload_offset, len(chunk.payload)) + ) + elif chunk.payload_type == PayloadType.SECTION_END: + logging.debug( + f"{chunk.chunk_type} section end", + extra={ + "payload": bytes_to_hex(chunk.payload) if isinstance(chunk.payload, bytes) else chunk.payload, + "offset": offset, + }, + ) + elif chunk.payload_type == PayloadType.HEADER: + default_dict_ch[chunk.channel_number].header = chunk.payload[0] + elif chunk.payload_type == PayloadType.METADATA: + default_dict_ch[chunk.channel_number].metadata = chunk.payload + else: + raise ValueError(f"Unknown payload type: {chunk.payload_type}") + + def _process_chunks( - usmfile: IO, - filesize: int, - encoding: str, -) -> Tuple[List[UsmPage], Dict[int, UsmChannel], Dict[int, UsmChannel]]: + usmfile: IO, + filesize: int, + encoding: str, +) -> Tuple[List[UsmPage], Dict[int, UsmChannel], Dict[int, UsmChannel], Dict[int, UsmChannel]]: + """Helper function that reads all the chunks in a USM file and returns a tuple of + 1. A list of USM pages about the contents of the USM file. + 2. A dictionary of USM video channels. + 3. A dictionary of USM audio channels. + 4. A dictionary of USM alpha video channels.""" crids: List[UsmPage] = [] video_ch: Dict[int, UsmChannel] = defaultdict( lambda: UsmChannel(stream=[], header=UsmPage("")) @@ -391,6 +449,7 @@ def _process_chunks( audio_ch: Dict[int, UsmChannel] = defaultdict( lambda: UsmChannel(stream=[], header=UsmPage("")) ) + alpha_ch: Dict[int, UsmChannel] = defaultdict(lambda: UsmChannel(stream=[], header=UsmPage(""))) usmfile.seek(0, 0) while filesize > usmfile.tell(): @@ -423,61 +482,19 @@ def _process_chunks( "Received info chunk payload that's not a list", extra={"payload": chunk.payload}, ) + # Video chunk elif chunk.chunk_type is ChunkType.VIDEO: - if chunk.payload_type == PayloadType.STREAM: - video_ch[chunk.channel_number].stream.append( - (offset + chunk.payload_offset, len(chunk.payload)) - ) - elif chunk.payload_type == PayloadType.SECTION_END: - if isinstance(chunk.payload, bytes): - logging.debug( - "@SFV section end", - extra={ - "payload": bytes_to_hex(chunk.payload), - "offset": offset, - }, - ) - else: - logging.debug( - "@SFV section end", - extra={ - "payload": chunk.payload, - "offset": offset, - }, - ) - elif chunk.payload_type == PayloadType.HEADER: - video_ch[chunk.channel_number].header = chunk.payload[0] - elif chunk.payload_type == PayloadType.METADATA: - video_ch[chunk.channel_number].metadata = chunk.payload + _chunk_helper(video_ch, chunk, offset) + # Alpha chunk + elif chunk.chunk_type is ChunkType.ALPHA: + _chunk_helper(alpha_ch, chunk, offset) + + # Audio chunk elif chunk.chunk_type is ChunkType.AUDIO: - if chunk.payload_type == PayloadType.STREAM: - audio_ch[chunk.channel_number].stream.append( - (offset + chunk.payload_offset, len(chunk.payload)) - ) - elif chunk.payload_type == PayloadType.SECTION_END: - if isinstance(chunk.payload, bytes): - logging.debug( - "@SFA section end", - extra={ - "payload": bytes_to_hex(chunk.payload), - "offset": offset, - }, - ) - else: - logging.debug( - "@SFA section end", - extra={ - "payload": chunk.payload, - "offset": offset, - }, - ) - elif chunk.payload_type == PayloadType.HEADER: - audio_ch[chunk.channel_number].header = chunk.payload[0] - elif chunk.payload_type == PayloadType.METADATA: - audio_ch[chunk.channel_number].metadata = chunk.payload + _chunk_helper(audio_ch, chunk, offset) - return crids, video_ch, audio_ch + return crids, video_ch, audio_ch, alpha_ch def _generate_header_metadata_chunks(