From 579044882be3cceb8031882cc1fbf448e10a41d0 Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Sun, 22 Nov 2020 02:56:01 +0900 Subject: [PATCH 01/17] Simplify the logic of finding a new page header --- src/reading.rs | 199 +++---------------------------------------------- 1 file changed, 12 insertions(+), 187 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index 53ddd7e..50bf8e7 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -12,7 +12,7 @@ Reading logic use std::error; use std::io; -use std::io::{Cursor, Read, Write, SeekFrom, Error, ErrorKind}; +use std::io::{Cursor, Write, SeekFrom, Error, ErrorKind}; use byteorder::{ReadBytesExt, LittleEndian}; use std::collections::HashMap; use std::collections::hash_map::Entry; @@ -20,7 +20,6 @@ use std::fmt::{Display, Formatter, Error as FmtError}; use std::mem::replace; use crc::vorbis_crc32_update; use Packet; -use std::io::Seek; /// Error that can be raised when decoding an Ogg transport. #[derive(Debug)] @@ -59,7 +58,7 @@ impl error::Error for OggReadError { fn cause(&self) -> Option<&dyn error::Error> { match *self { - OggReadError::ReadError(ref err) => Some(err as &error::Error), + OggReadError::ReadError(ref err) => Some(err as &dyn error::Error), _ => None } } @@ -528,180 +527,6 @@ impl BasePacketReader { } } -#[derive(Clone, Copy)] -enum UntilPageHeaderReaderMode { - Searching, - FoundWithNeeded(u8), - SeekNeeded(i32), - Found, -} - -enum UntilPageHeaderResult { - Eof, - Found, - ReadNeeded, - SeekNeeded, -} - -struct UntilPageHeaderReader { - mode :UntilPageHeaderReaderMode, - /// Capture pattern offset. Needed so that if we only partially - /// recognized the capture pattern, we later on only check the - /// remaining part. - cpt_of :u8, - /// The return buffer. 
- ret_buf :[u8; 27], - read_amount :usize, -} - -impl UntilPageHeaderReader { - pub fn new() -> Self { - UntilPageHeaderReader { - mode : UntilPageHeaderReaderMode::Searching, - cpt_of : 0, - ret_buf : [0; 27], - read_amount : 0, - } - } - /// Returns Some(off), where off is the offset of the last byte - /// of the capture pattern if it's found, None if the capture pattern - /// is not inside the passed slice. - /// - /// Changes the capture pattern offset accordingly - fn check_arr(&mut self, arr :&[u8]) -> Option { - for (i, ch) in arr.iter().enumerate() { - match *ch { - b'O' => self.cpt_of = 1, - b'g' if self.cpt_of == 1 || self.cpt_of == 2 => self.cpt_of += 1, - b'S' if self.cpt_of == 3 => return Some(i), - _ => self.cpt_of = 0, - } - } - return None; - } - /// Do one read exactly, and if it was successful, - /// return Ok(true) if the full header has been read and can be extracted with - /// - /// or return Ok(false) if the - pub fn do_read(&mut self, mut rdr :R) - -> Result { - use self::UntilPageHeaderReaderMode::*; - use self::UntilPageHeaderResult as Res; - // The array's size is freely choseable, but must be > 27, - // and must well fit into an i32 (needs to be stored in SeekNeeded) - let mut buf :[u8; 1024] = [0; 1024]; - - let rd_len = tri!(rdr.read(if self.read_amount < 27 { - // This is an optimisation for the most likely case: - // the next page directly follows the current read position. - // Then it would be a waste to read more than the needed amount. - &mut buf[0 .. 27 - self.read_amount] - } else { - match self.mode { - Searching => &mut buf, - FoundWithNeeded(amount) => &mut buf[0 .. amount as usize], - SeekNeeded(_) => return Ok(Res::SeekNeeded), - Found => return Ok(Res::Found), - } - })); - - if rd_len == 0 { - // Reached EOF. - if self.read_amount == 0 { - // If we have read nothing yet, there is no data - // but ogg data, meaning the stream ends legally - // and without corruption. 
- return Ok(Res::Eof); - } else { - // There is most likely a corruption here. - // I'm not sure, but the ogg spec doesn't say that - // random data past the last ogg page is allowed, - // so we just assume it's not allowed. - tri!(Err(OggReadError::NoCapturePatternFound)); - } - } - self.read_amount += rd_len; - - // 150 kb gives us a bit of safety: we can survive - // up to one page with a corrupted capture pattern - // after having seeked right after a capture pattern - // of an earlier page. - let read_amount_max = 150 * 1024; - if self.read_amount > read_amount_max { - // Exhaustive searching for the capture pattern - // has returned no ogg capture pattern. - tri!(Err(OggReadError::NoCapturePatternFound)); - } - - let rd_buf = &buf[0 .. rd_len]; - - use std::cmp::min; - let (off, needed) = match self.mode { - Searching => match self.check_arr(rd_buf) { - // Capture pattern found - Some(off) => { - self.ret_buf[0] = b'O'; - self.ret_buf[1] = b'g'; - self.ret_buf[2] = b'g'; - self.ret_buf[3] = b'S'; // (Not actually needed) - (off, 24) - }, - // Nothing found - None => return Ok(Res::ReadNeeded), - }, - FoundWithNeeded(needed) => { - (0, needed as usize) - }, - _ => unimplemented!(), - }; - - let fnd_buf = &rd_buf[off..]; - - let copy_amount = min(needed, fnd_buf.len()); - let start_fill = 27 - needed; - (&mut self.ret_buf[start_fill .. copy_amount + start_fill]) - .copy_from_slice(&fnd_buf[0 .. copy_amount]); - if fnd_buf.len() == needed { - // Capture pattern found! - self.mode = Found; - return Ok(Res::Found); - } else if fnd_buf.len() < needed { - // We still have to read some content. - let needed_new = needed - copy_amount; - self.mode = FoundWithNeeded(needed_new as u8); - return Ok(Res::ReadNeeded); - } else { - // We have read too much content (exceeding the header). - // Seek back so that we are at the position - // right after the header. 
- - self.mode = SeekNeeded(needed as i32 - fnd_buf.len() as i32); - return Ok(Res::SeekNeeded); - } - } - pub fn do_seek(&mut self, mut skr :S) - -> Result { - use self::UntilPageHeaderReaderMode::*; - use self::UntilPageHeaderResult as Res; - match self.mode { - Searching | FoundWithNeeded(_) => Ok(Res::ReadNeeded), - SeekNeeded(offs) => { - tri!(skr.seek(SeekFrom::Current(offs as i64))); - self.mode = Found; - Ok(Res::Found) - }, - Found => Ok(Res::Found), - } - } - pub fn into_header(self) -> [u8; 27] { - use self::UntilPageHeaderReaderMode::*; - match self.mode { - Found => self.ret_buf, - _ => panic!("wrong mode"), - } - } -} - /** Reader for packets from an Ogg stream. @@ -767,18 +592,18 @@ impl PacketReader { /// Ok(None) is returned if the stream has ended without an uncompleted page /// or non page data after the last page (if any) present. fn read_until_pg_header(&mut self) -> Result, OggReadError> { - let mut r = UntilPageHeaderReader::new(); - use self::UntilPageHeaderResult::*; - let mut res = tri!(r.do_read(&mut self.rdr)); - loop { - res = match res { - Eof => return Ok(None), - Found => break, - ReadNeeded => tri!(r.do_read(&mut self.rdr)), - SeekNeeded => tri!(r.do_seek(&mut self.rdr)) + let mut ret = [0u8; 27]; + ret[..4].copy_from_slice(b"OggS"); + let mut len = 0; + while len < 4 { + match (tri!(self.rdr.read_u8()), len) { + (b'O', _) => len = 1, + (b'g', 1) | (b'g', 2) | (b'S', 3) => len += 1, + _ => len = 0, } } - Ok(Some(r.into_header())) + tri!(self.rdr.read_exact(&mut ret[4..27])); + Ok(Some(ret)) } /// Parses and reads a new OGG page From 22c31379709866018729e29baed125bc07f057b2 Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Sun, 22 Nov 2020 04:45:59 +0900 Subject: [PATCH 02/17] Remove requirement for io::Seek for reader --- src/reading.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index 50bf8e7..5d22556 100644 --- 
a/src/reading.rs +++ b/src/reading.rs @@ -537,13 +537,13 @@ consistent when it encounters the `WouldBlock` error kind. If you desire async functionality, consider enabling the `async` feature and look into the async module. */ -pub struct PacketReader { +pub struct PacketReader { rdr :T, base_pck_rdr :BasePacketReader, } -impl PacketReader { +impl PacketReader { /// Constructs a new `PacketReader` with a given `Read`. pub fn new(rdr :T) -> PacketReader { PacketReader { rdr, base_pck_rdr : BasePacketReader::new() } @@ -628,7 +628,9 @@ impl PacketReader { Ok(Some(tri!(pg_prs.parse_packet_data(packet_data)))) } +} +impl PacketReader { /// Seeks the underlying reader /// /// Seeks the reader that this PacketReader bases on by the specified From a197365188047079738e33a41f33874a1798571e Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Sun, 22 Nov 2020 06:30:40 +0900 Subject: [PATCH 03/17] Add additional notes about the reader on docs --- src/reading.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/reading.rs b/src/reading.rs index 5d22556..b9554c5 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -536,6 +536,9 @@ This reader is not async ready. It does not keep its internal state consistent when it encounters the `WouldBlock` error kind. If you desire async functionality, consider enabling the `async` feature and look into the async module. + +The reader passed to this packet reader should be buffered properly, +since `Read::read` will be called many times. */ pub struct PacketReader { rdr :T, From 5f4137498c8fb2802956cbbacf6911406c3791df Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Mon, 23 Nov 2020 22:47:30 +0900 Subject: [PATCH 04/17] Return None if no new page header is fully available ...instead of returning an error. 
--- src/reading.rs | 45 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index b9554c5..6c233e0 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -20,6 +20,7 @@ use std::fmt::{Display, Formatter, Error as FmtError}; use std::mem::replace; use crc::vorbis_crc32_update; use Packet; +use std::io::Read; /// Error that can be raised when decoding an Ogg transport. #[derive(Debug)] @@ -527,6 +528,23 @@ impl BasePacketReader { } } +#[derive(Default)] +struct MagicFinder { + len: usize, +} +impl MagicFinder { + fn feed(&mut self, b: u8){ + match (b, self.len) { + (b'O', _) => self.len = 1, + (b'g', 1..=2) | (b'S', 3) => self.len += 1, + _ => self.len = 0, + } + } + fn found(&self) -> bool { + self.len == 4 + } +} + /** Reader for packets from an Ogg stream. @@ -597,16 +615,27 @@ impl PacketReader { fn read_until_pg_header(&mut self) -> Result, OggReadError> { let mut ret = [0u8; 27]; ret[..4].copy_from_slice(b"OggS"); - let mut len = 0; - while len < 4 { - match (tri!(self.rdr.read_u8()), len) { - (b'O', _) => len = 1, - (b'g', 1) | (b'g', 2) | (b'S', 3) => len += 1, - _ => len = 0, + let mut finder = MagicFinder::default(); + while !finder.found() { + let next = match self.rdr.by_ref().bytes().next() { + Some(b) => tri!(b), + None => return Ok(None), + }; + finder.feed(next); + } + let mut target = &mut ret[4..]; + loop { + let read = tri!(self.rdr.read(target)); + if read == 0 { + break; } + target = &mut target[read..]; + } + if target.is_empty() { + Ok(Some(ret)) + } else { + Ok(None) } - tri!(self.rdr.read_exact(&mut ret[4..27])); - Ok(Some(ret)) } /// Parses and reads a new OGG page From 8373a62ec8cab6d0f00745f81fb3c76e6908e963 Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Mon, 23 Nov 2020 22:54:07 +0900 Subject: [PATCH 05/17] Modify the code so that it compiles on 1.27.2 --- src/reading.rs | 8 ++++---- 1 file 
changed, 4 insertions(+), 4 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index 6c233e0..0f5fc2d 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -623,15 +623,15 @@ impl PacketReader { }; finder.feed(next); } - let mut target = &mut ret[4..]; + let mut pos = 4; loop { - let read = tri!(self.rdr.read(target)); + let read = tri!(self.rdr.read(&mut ret[pos..])); if read == 0 { break; } - target = &mut target[read..]; + pos += read; } - if target.is_empty() { + if pos == ret.len() { Ok(Some(ret)) } else { Ok(None) From 764570a4e2a69f3ddddc797a781088e99689b7a2 Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Tue, 24 Nov 2020 23:37:16 +0900 Subject: [PATCH 06/17] Add a logic to find a new page after seek --- src/crc.rs | 176 +++++++++++++++++++++++++++++++++++++++++++++++-- src/reading.rs | 82 ++++++++++++++++++++++- 2 files changed, 252 insertions(+), 6 deletions(-) diff --git a/src/crc.rs b/src/crc.rs index 3b39861..3206cdc 100644 --- a/src/crc.rs +++ b/src/crc.rs @@ -11,6 +11,8 @@ Implementation of the CRC algorithm with the vorbis specific parameters and setup */ +use std::ops::{Add, Mul}; + // Lookup table to enable bytewise CRC32 calculation // Created using the crc32-table-generate example. // @@ -105,17 +107,141 @@ const fn lookup_array() -> [u32; 0x100] { } */ +/// An instance of polynomial quotient ring, +/// F_2[x] / x^32 + x^26 + x^23 + x^22 + x^16 + x^12 + x^11 + x^10 + x^8 + x^7 + x^5 + x^4 + x^2 + x + 1 +/// represented as a 32-bit unsigned integer. +/// The i-th least significant bit corresponds to the coefficient of x^i. +/// +/// This struct is introduced for the sake of readability. 
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct Crc32(pub u32); + +impl From for Crc32 { + fn from(a: u32) -> Self { + Crc32(a) + } +} +impl From for u32 { + fn from(a: Crc32) -> Self { + a.0 + } +} + +impl Add for Crc32 where C: Into { + type Output = Crc32; + + fn add(self, rhs: C) -> Self { + Crc32(self.0 ^ rhs.into()) + } +} + +/// An array such that X_N[n] = x^n on Crc32. +const X_N: &[u32] = &[ + 0x00000001, 0x00000002, 0x00000004, 0x00000008, + 0x00000010, 0x00000020, 0x00000040, 0x00000080, + 0x00000100, 0x00000200, 0x00000400, 0x00000800, + 0x00001000, 0x00002000, 0x00004000, 0x00008000, + 0x00010000, 0x00020000, 0x00040000, 0x00080000, + 0x00100000, 0x00200000, 0x00400000, 0x00800000, + 0x01000000, 0x02000000, 0x04000000, 0x08000000, + 0x10000000, 0x20000000, 0x40000000, 0x80000000, + 0x04c11db7, 0x09823b6e, 0x130476dc, 0x2608edb8, + 0x4c11db70, 0x9823b6e0, 0x34867077, 0x690ce0ee, + 0xd219c1dc, 0xa0f29e0f, 0x452421a9, 0x8a484352, + 0x10519b13, 0x20a33626, 0x41466c4c, 0x828cd898, + 0x01d8ac87, 0x03b1590e, 0x0762b21c, 0x0ec56438, + 0x1d8ac870, 0x3b1590e0, 0x762b21c0, 0xec564380, + 0xdc6d9ab7, 0xbc1a28d9, 0x7cf54c05, 0xf9ea980a, + 0xf7142da3, 0xeae946f1, 0xd1139055, 0xa6e63d1d, +]; + +impl Mul for Crc32 where C: Into { + type Output = Crc32; + fn mul(self, rhs: C) -> Self { + // Very slow algorithm, so-called "grade-school multiplication". + // Will be refined later. + let mut ret = 0; + let mut i = 0; + let rhs = rhs.into(); + while i < 32 { + let mut j = 0; + while j < 32 { + if (self.0 & 1 << i) != 0 && (rhs & 1 << j) != 0 { + ret ^= X_N[i + j]; + } + j += 1; + } + i += 1; + } + ret.into() + } +} + +impl Crc32 { + /// Given a polynomial of degree 7 rhs, calculates self * x^8 + rhs * x^32. + pub fn push(&self, rhs: u8) -> Self { + let ret = (self.0 << 8) ^ CRC_LOOKUP_ARRAY[rhs as usize ^ (self.0 >> 24) as usize]; + ret.into() + } + + /// Calculates self * x. 
+ pub fn mul_x(&self) -> Self { + let (b, c) = self.0.overflowing_mul(2); + let ret = b ^ (0u32.wrapping_sub(c as u32) & 0x04c11db7); + ret.into() + } + + /// Calculates self * x^8. + pub fn mul_x8(&self) -> Self { + self.push(0) + } + + /// Given an integer n, calculates self * x^(8n) in a naive way. + /// The time complexity is O(n), and may be slow for large n. + pub fn mul_x8n(&self, mut n: usize) -> Self { + let mut ret = *self; + while n > 0 { + ret = ret.mul_x8(); + n -= 1; + } + ret + } +} + +/// An array such that X8_2_N[n] = (x^8)^(2^n) on Crc32. +const X8_2_N: &[u32] = &[ + 0x00000100, 0x00010000, 0x04c11db7, 0x490d678d, + 0xe8a45605, 0x75be46b7, 0xe6228b11, 0x567fddeb, + 0x88fe2237, 0x0e857e71, 0x7001e426, 0x075de2b2, + 0xf12a7f90, 0xf0b4a1c1, 0x58f46c0c, 0xc3395ade, + 0x96837f8c, 0x544037f9, 0x23b7b136, 0xb2e16ba8, +]; + +impl Crc32 { + /// Given a non-negative integer n, calculates x^(8n). + /// It must be n < 2^20, othrwise it panics. + pub fn x_8n(mut n: usize) -> Crc32 { + assert!(n < 1<<20); + let mut ret = Crc32(1); + let mut i = 0; + while n > 0 { + if n & 1 > 0 { + ret = ret * X8_2_N[i]; + } + n /= 2; + i += 1; + } + ret + } +} + #[cfg(test)] pub fn vorbis_crc32(array :&[u8]) -> u32 { return vorbis_crc32_update(0, array); } pub fn vorbis_crc32_update(cur :u32, array :&[u8]) -> u32 { - let mut ret :u32 = cur; - for av in array { - ret = (ret << 8) ^ CRC_LOOKUP_ARRAY[(*av as u32 ^ (ret >> 24)) as usize]; - } - return ret; + array.iter().fold(Crc32(cur), |cur, &x| cur.push(x)).0 } #[test] @@ -139,3 +265,43 @@ fn test_crc32() { assert_eq!(vorbis_crc32(test_arr), 0x3d4e946d); assert_eq!(vorbis_crc32(&test_arr[0 .. 
27]), 0x7b374db8); } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_mul_x() { + for (i, &a) in CRC_LOOKUP_ARRAY.iter().enumerate() { + let mut x = Crc32::from(i as u32); + for _ in 0..32 { + x = x.mul_x(); + } + assert_eq!(a, x.0); + } + } + + fn x_8n_naive(n: usize) -> Crc32 { + let mut ret = Crc32(1); + for _ in 0..n { + ret = ret.push(0); + } + ret + } + + #[test] + fn test_x_8n() { + for i in 0..100 { + assert_eq!(x_8n_naive(i), Crc32::x_8n(i)); + } + assert_eq!(x_8n_naive(12345), Crc32::x_8n(12345)); + } + + #[test] + fn test_mul_x8n() { + let a = Crc32(0xa1b2c3d4); + for i in 0..100 { + assert_eq!(a * Crc32::x_8n(i), a.mul_x8n(i)); + } + } +} diff --git a/src/reading.rs b/src/reading.rs index 0f5fc2d..3591d3c 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -18,9 +18,11 @@ use std::collections::HashMap; use std::collections::hash_map::Entry; use std::fmt::{Display, Formatter, Error as FmtError}; use std::mem::replace; -use crc::vorbis_crc32_update; +use crc::{vorbis_crc32_update, Crc32}; use Packet; use std::io::Read; +use std::cmp::Reverse; +use std::collections::{VecDeque, binary_heap::{BinaryHeap, PeekMut}, BTreeSet}; /// Error that can be raised when decoding an Ogg transport. 
#[derive(Debug)] @@ -543,6 +545,84 @@ impl MagicFinder { fn found(&self) -> bool { self.len == 4 } + fn match_len(&self) -> usize { + self.len + } +} + + +pub fn find_next_page(reader: &mut R) -> Result, OggReadError> { + let capacity = 26 + 255 + 255 * 255; + + let mut bytes_buffer = VecDeque::with_capacity(capacity); + let mut cumulative_sums = VecDeque::with_capacity(capacity); + cumulative_sums.push_back(0usize); + let mut cumulative_crcs = VecDeque::with_capacity(capacity); + cumulative_crcs.push_back(Crc32(0)); + + let mut magic_finder = MagicFinder::default(); + let mut page_begin_queue = BTreeSet::new(); + let mut page_segments_queue = VecDeque::new(); + let mut segment_table_end_queue = BinaryHeap::new(); + let mut page_end_queue = BinaryHeap::new(); + for (i, b) in reader.bytes().enumerate() { + let b = tri!(b); + bytes_buffer.push_back(b); + let sum_next = cumulative_sums.front().unwrap().wrapping_add(b as usize); + cumulative_sums.push_front(sum_next); + let crc_next = cumulative_crcs.front().unwrap().push(b); + cumulative_crcs.push_front(crc_next); + + let i = i + 1; + + magic_finder.feed(b); + if magic_finder.found() { + let begin = i.checked_sub(4).expect("at least four bytes are read after magic is found"); + page_begin_queue.insert(begin); + page_segments_queue.push_back((begin + 27, begin)); + } + + if page_segments_queue.front().map_or(false, |t| t.0 == i) { + let begin = page_segments_queue.pop_front().expect("an element always exists").1; + let table_len = b as usize; + segment_table_end_queue.push(Reverse((i + table_len, table_len, begin))); + } + + while let Some(peek) = segment_table_end_queue.peek_mut().filter(|t| (t.0).0 == i) { + let (_, table_len, page_begin) = PeekMut::pop(peek).0; + let page_content_len = cumulative_sums[0].wrapping_sub(cumulative_sums[table_len]); + page_end_queue.push(Reverse((i + page_content_len, page_begin))); + } + + while let Some(peek) = page_end_queue.peek_mut().filter(|t| (t.0).0 == i) { + let (_, page_begin) 
= PeekMut::pop(peek).0; + let page_len = i - page_begin; + // a = 2^(8 * (page_len - 26)) + // b = 2^(8 * (page_len - 22)) + // c = 2^(8 * page_len) + let a = Crc32::x_8n(page_len - 26); + let b = a.mul_x8n(4); + let c = b.mul_x8n(22); // TODO: Which is faster, pushing 22 times or multiplying at once? + let crc_calculated = cumulative_crcs[0] + + cumulative_crcs[page_len - 26] * a + + cumulative_crcs[page_len - 22] * b + + cumulative_crcs[page_len] * c; + let crc_input = bytes_buffer.iter().skip(bytes_buffer.len() - page_len + 22).take(4); + if crc_calculated.0.to_le_bytes().iter().zip(crc_input).all(|(x, y)| x == y) { + return Ok(Some((page_begin, page_len))); + } + assert!(page_begin_queue.remove(&page_begin)); + return Ok(None); + } + + let buffer_begin = i - bytes_buffer.len(); + let drain_end = page_begin_queue.iter().next().copied().unwrap_or(i - magic_finder.match_len()); + bytes_buffer.drain(..drain_end - buffer_begin); + let retain_len = i - drain_end; + cumulative_sums.drain(retain_len + 1..); + cumulative_crcs.drain(retain_len + 1..); + } + Ok(None) } /** From 2699b0730bdc9b23498ed40f86b45aa9f762c0aa Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Thu, 26 Nov 2020 00:17:43 +0900 Subject: [PATCH 07/17] Make compilable in 1.27.2 --- src/backports.rs | 32 ++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/reading.rs | 4 ++++ 3 files changed, 37 insertions(+) create mode 100644 src/backports.rs diff --git a/src/backports.rs b/src/backports.rs new file mode 100644 index 0000000..b18be8e --- /dev/null +++ b/src/backports.rs @@ -0,0 +1,32 @@ +// Copied from standback https://github.com/jhpratt/standback + +pub trait Sealed {} +impl Sealed for T {} + +#[allow(non_camel_case_types)] +pub trait Option_v1_35<'a, T: Copy + 'a>: Sealed> { + fn copied(self) -> Option; +} + +impl<'a, T: Copy + 'a> Option_v1_35<'a, T> for Option<&'a T> { + fn copied(self) -> Option { + self.map(|&t| t) + } +} + 
+#[allow(non_camel_case_types)] +pub trait u32_v1_32: Sealed { + fn to_le_bytes(self) -> [u8; 4]; +} + +impl u32_v1_32 for u32 { + fn to_le_bytes(self) -> [u8; 4] { + // TODO: better implementation? + [ + (self & 0xff) as u8, + ((self >> 8) & 0xff) as u8, + ((self >> 16) & 0xff) as u8, + ((self >> 24) & 0xff) as u8, + ] + } +} diff --git a/src/lib.rs b/src/lib.rs index 70429c4..1e47fcb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,6 +36,7 @@ macro_rules! tri { }; } +mod backports; mod crc; pub mod reading; pub mod writing; diff --git a/src/reading.rs b/src/reading.rs index 3591d3c..6926ede 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -23,6 +23,8 @@ use Packet; use std::io::Read; use std::cmp::Reverse; use std::collections::{VecDeque, binary_heap::{BinaryHeap, PeekMut}, BTreeSet}; +#[allow(unused_imports)] +use backports::{Option_v1_35, u32_v1_32}; /// Error that can be raised when decoding an Ogg transport. #[derive(Debug)] @@ -568,8 +570,10 @@ pub fn find_next_page(reader: &mut R) -> Result, for (i, b) in reader.bytes().enumerate() { let b = tri!(b); bytes_buffer.push_back(b); + // TODO the following two lines can be written at once in the newer compiler let sum_next = cumulative_sums.front().unwrap().wrapping_add(b as usize); cumulative_sums.push_front(sum_next); + // TODO the following two lines can be written at once in the newer compiler let crc_next = cumulative_crcs.front().unwrap().push(b); cumulative_crcs.push_front(crc_next); From 9e5b7c902efd74a17713d144789186620e59785f Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Fri, 12 Feb 2021 21:59:07 +0900 Subject: [PATCH 08/17] WIP --- src/reading.rs | 197 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 192 insertions(+), 5 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index 6926ede..54fe268 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -13,6 +13,7 @@ Reading logic use std::error; use std::io; use 
std::io::{Cursor, Write, SeekFrom, Error, ErrorKind}; +use std::ops::{RangeBounds, Bound}; use byteorder::{ReadBytesExt, LittleEndian}; use std::collections::HashMap; use std::collections::hash_map::Entry; @@ -25,6 +26,7 @@ use std::cmp::Reverse; use std::collections::{VecDeque, binary_heap::{BinaryHeap, PeekMut}, BTreeSet}; #[allow(unused_imports)] use backports::{Option_v1_35, u32_v1_32}; +use std::convert::TryInto; /// Error that can be raised when decoding an Ogg transport. #[derive(Debug)] @@ -552,8 +554,46 @@ impl MagicFinder { } } +pub struct CapturedPage { + offset: u64, + bytes_buffer: VecDeque +} + +impl CapturedPage { + fn stream_serial(&self) -> u32 { + u32::from_le_bytes( + self.bytes_buffer + .iter() + .skip(14) + .take(4) + .copied() + .collect::>() + .try_into() + .expect("Bytes_buffer should be a valid captured page") + ) + } + fn absolute_granule_position(&self) -> Option { + let res = u64::from_le_bytes( + self.bytes_buffer + .iter() + .skip(6) + .take(8) + .copied() + .collect::>() + .try_into() + .expect("Bytes_buffer should be a valid captured page") + ); + if res == !0 { None } else { Some(res) } + } + fn len(&self) -> u64 { + self.bytes_buffer.len() as u64 + } +} -pub fn find_next_page(reader: &mut R) -> Result, OggReadError> { +pub fn find_next_page( + reader: &mut R, + limit: Option, +) -> Result, OggReadError> { let capacity = 26 + 255 + 255 * 255; let mut bytes_buffer = VecDeque::with_capacity(capacity); @@ -567,7 +607,11 @@ pub fn find_next_page(reader: &mut R) -> Result, let mut page_segments_queue = VecDeque::new(); let mut segment_table_end_queue = BinaryHeap::new(); let mut page_end_queue = BinaryHeap::new(); - for (i, b) in reader.bytes().enumerate() { + for (i, b) in reader + .bytes() + .enumerate() + .take_while(|&(i, _)| limit.map_or(true, |limit| i < limit as usize)) + { let b = tri!(b); bytes_buffer.push_back(b); // TODO the following two lines can be written at once in the newer compiler @@ -611,9 +655,20 @@ pub fn 
find_next_page(reader: &mut R) -> Result, + cumulative_crcs[page_len - 26] * a + cumulative_crcs[page_len - 22] * b + cumulative_crcs[page_len] * c; - let crc_input = bytes_buffer.iter().skip(bytes_buffer.len() - page_len + 22).take(4); - if crc_calculated.0.to_le_bytes().iter().zip(crc_input).all(|(x, y)| x == y) { - return Ok(Some((page_begin, page_len))); + let crc_matches = { + // TODO borrow checker of newer version does not require this block + let crc_input = bytes_buffer.iter().skip(bytes_buffer.len() - page_len + 22).take(4); + crc_calculated.0.to_le_bytes().iter().zip(crc_input).all(|(x, y)| x == y) + }; + if crc_matches { + // TODO borrow checker of newer version does not require this variable + let drain_len = bytes_buffer.len() - page_len; + bytes_buffer.drain(..drain_len); + let ret = CapturedPage { + offset: page_begin as u64, + bytes_buffer, + }; + return Ok(Some(ret)); } assert!(page_begin_queue.remove(&page_begin)); return Ok(None); @@ -629,6 +684,35 @@ pub fn find_next_page(reader: &mut R) -> Result, Ok(None) } +struct PageIterator { + reader: R, + stream_serial: Option, + limit: u64, +} + +impl PageIterator { + fn new(reader: R, stream_serial: Option, limit: u64) -> Self { + Self { + reader, + stream_serial, + limit, + } + } +} + +impl Iterator for PageIterator { + type Item = Result; + + fn next(&mut self) -> Option { + let page = match find_next_page(&mut self.reader, Some(self.limit)) { + Err(e) => return Some(Err(e)), + Ok(None) => return None, + Ok(Some(page)) => page, + }; + let reader = PageReader:: + } +} + /** Reader for packets from an Ogg stream. @@ -947,6 +1031,105 @@ impl PacketReader { } } } + + /// Seek the reader by the absolute granule position. 
+ /// + /// This function first find the page such that: + /// * the absolute granule position specified in the page header + /// is less than or equal to that specified in the argument, + /// * at least one packet ends within the page, + /// * the stream serial specified in the page header + /// is equal to that specified in the argument, if any, and + /// * the entire page is within the specified range of bytes. + /// (To search entire stream, specify `..` as the `range` argument`.) + /// + /// Then, the internal reader state is set up so that + /// the next packet yieleded by `read_packet` or `read_packet_expected` is such packet that: + /// it is the first packet that ends within the page later than the page found above. + /// + /// If such page mentioned above was found, + /// this function returns the absolute granule position + /// specified in the page header of the page. + /// If no such page was found, a `None` value is returned instead, + /// which means that the reader seek to the beginning of specified range. + /// + /// Note that, when a stream_serial is specified, + /// pages with the stream serial should be "uniformly distributed" within the range; + /// otherwise this function may be less performant. + /// For example, it is ok to specify the "entire" range + /// if the file has only one logical stream + /// or several streams are multiplexed uniformly; + /// on the other hand, if the file has multiple chained logical stream and + /// you are focusing only one of them, + /// it is not recommended to specify the entire range; + /// instead, you should first find the "bounds" of the stream, + /// that is, where the stream starts and ends, + /// and specify the appropriate range. 
+ pub fn seek_absgp_le( + &mut self, absgp: u64, stream_serial: Option, range: R + ) -> Result + where + R: RangeBounds + { + let mut seek_begin = match range.start_bound() { + Bound::Included(&a) => a, + Bound::Excluded(&a) => a+1, // TODO excluded bound should not be permitted + Bound::Unbounded => 0, + }; + let mut seek_end = match range.end_bound() { + Bound::Excluded(&a) => a, + Bound::Included(&a) => a+1, // TODO included bound should not be permitted + Bound::Unbounded => tri!(self.rdr.seek(SeekFrom::End(0))), + }; + + // lb: lower bound + tri!(self.rdr.seek(SeekFrom::Start(seek_begin))); + let mut reader = PageIterator::new(self.rdr, stream_serial, seek_end); + let first_page = match reader.next() { + None => return Ok(SeekResult::NoPageFound), + Some(page) => tri!(page), + }; + let first_absgp = first_page.absolute_granule_position().unwrap_or_else(|| { + }); + // ub: upper bound + + let mut next_page_begin = seek_end; + let mut target_page = None; + + // Narrow down the range of positions where the target page begins, + // until it is narrow enough and it is faster to search linearly. 
+ 'bisect: while seek_end - seek_begin > 1 { + let seek_pos = seek_begin + (seek_end - seek_begin) / 2; + tri!(self.rdr.seek(SeekFrom::Start(seek_pos))); + let mut current_pos = seek_pos; + while let Some(next_page) = tri!(find_next_page( + &mut self.rdr, + Some(next_page_begin - current_pos) + )) { + let page_begin = seek_pos + next_page.offset; + let page_end = page_begin + next_page.len(); + current_pos = page_end; + if stream_serial.map_or(false, |s| s != next_page.stream_serial()) { + continue; + } + let page_absgp = match next_page.absolute_granule_position() { + None => continue, + Some(absgp) => absgp, + }; + if page_absgp <= absgp { + target_page = Some((page_begin, next_page)); + seek_begin = page_end; + } else { + next_page_begin = page_begin; + seek_end = seek_pos; + } + continue 'bisect; + } + } + + Ok(None) + } + /// Resets the internal state by deleting all /// unread packets. pub fn delete_unread_packets(&mut self) { @@ -954,6 +1137,10 @@ impl PacketReader { } } +enum SeekResult { + NoPageFound, +} + // util function fn seek_before_end(mut rdr :T, offs :u64) -> Result { From b35ac4b51e2fb7cc7044c185820caef5e3079854 Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Fri, 12 Feb 2021 23:46:13 +0900 Subject: [PATCH 09/17] WIP --- src/reading.rs | 129 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 90 insertions(+), 39 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index 54fe268..6d92be3 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -588,6 +588,12 @@ impl CapturedPage { fn len(&self) -> u64 { self.bytes_buffer.len() as u64 } + fn begin_pos(&self) -> u64 { + self.offset + } + fn end_pos(&self) -> u64 { + self.offset + self.len() + } } pub fn find_next_page( @@ -684,14 +690,14 @@ pub fn find_next_page( Ok(None) } -struct PageIterator { - reader: R, +struct PageIterator<'a, R: Read> { + reader: &'a mut R, stream_serial: Option, limit: u64, } -impl PageIterator { - 
fn new(reader: R, stream_serial: Option, limit: u64) -> Self { +impl <'a, R: Read> PageIterator<'a, R> { + fn new(reader: &'a mut R, stream_serial: Option, limit: u64) -> Self { Self { reader, stream_serial, @@ -700,7 +706,7 @@ impl PageIterator { } } -impl Iterator for PageIterator { +impl <'a, R: Read> Iterator for PageIterator<'a, R> { type Item = Result; fn next(&mut self) -> Option { @@ -709,7 +715,9 @@ impl Iterator for PageIterator { Ok(None) => return None, Ok(Some(page)) => page, }; - let reader = PageReader:: + // let reader = PageReader:: + // TODO when returning a page, its offset should be in stream-coordinated + todo!() } } @@ -1084,50 +1092,91 @@ impl PacketReader { // lb: lower bound tri!(self.rdr.seek(SeekFrom::Start(seek_begin))); - let mut reader = PageIterator::new(self.rdr, stream_serial, seek_end); + let mut reader = PageIterator::new(&mut self.rdr, stream_serial, seek_end); let first_page = match reader.next() { None => return Ok(SeekResult::NoPageFound), Some(page) => tri!(page), }; - let first_absgp = first_page.absolute_granule_position().unwrap_or_else(|| { - }); - // ub: upper bound + if let Some(first_page_absgp) = first_page.absolute_granule_position() { + if absgp < first_page_absgp { + // TODO Seek to the first page + return Ok(SeekResult::SeekedToFirstPage); + } + // Otherwise, we can discard all the packets in the first page. 
+ } else { + let next_page = reader.find(|page| match page { + Err(_) => true, + Ok(page) => page.absolute_granule_position().is_some(), + }); + let first_page_with_absgp = match next_page { + None => return Ok(SeekResult::NoPacketEnd), + Some(page) => tri!(page), + }; + let first_absgp = first_page_with_absgp + .absolute_granule_position() + .expect("It is provable that absgp always exists"); + if absgp < first_absgp { + // TODO Seek to the first page + return Ok(SeekResult::SeekedToFirstPage); + } + } + let mut lb_page_end = tri!(self.rdr.seek(SeekFrom::Current(0))); + + let mut ub_pos = seek_end; + let mut ub_page_first = seek_end; - let mut next_page_begin = seek_end; - let mut target_page = None; + let linear_search_threshold = 256 * 256; - // Narrow down the range of positions where the target page begins, - // until it is narrow enough and it is faster to search linearly. - 'bisect: while seek_end - seek_begin > 1 { - let seek_pos = seek_begin + (seek_end - seek_begin) / 2; + while lb_page_end < ub_pos { + let seek_pos = if ub_pos - lb_page_end < linear_search_threshold { + lb_page_end + } else { + (lb_page_end + ub_pos) / 2 + }; tri!(self.rdr.seek(SeekFrom::Start(seek_pos))); - let mut current_pos = seek_pos; - while let Some(next_page) = tri!(find_next_page( - &mut self.rdr, - Some(next_page_begin - current_pos) - )) { - let page_begin = seek_pos + next_page.offset; - let page_end = page_begin + next_page.len(); - current_pos = page_end; - if stream_serial.map_or(false, |s| s != next_page.stream_serial()) { + + let first_page = match tri!(find_next_page(&mut self.rdr, Some(ub_page_first))) { + None => { + ub_pos = seek_pos; continue; + }, + Some(page) => page, + }; + let first_page_begin_pos = first_page.begin_pos(); + + let target_page = if stream_serial.map_or(true, |s| s == first_page.stream_serial()) + && first_page.absolute_granule_position().is_some() + { + first_page + } else { + let mut reader = PageIterator::new(&mut self.rdr, stream_serial, 
ub_page_first); + let next_page = reader.find(|page| match page { + Err(_) => true, + Ok(page) => page.absolute_granule_position().is_some(), + }); + match next_page { + None => { + ub_pos = seek_pos; + ub_page_first = first_page_begin_pos; + continue; + }, + Some(page) => tri!(page), } - let page_absgp = match next_page.absolute_granule_position() { - None => continue, - Some(absgp) => absgp, - }; - if page_absgp <= absgp { - target_page = Some((page_begin, next_page)); - seek_begin = page_end; - } else { - next_page_begin = page_begin; - seek_end = seek_pos; - } - continue 'bisect; + }; + + let target_absgp = target_page + .absolute_granule_position() + .expect("It is provable that absgp always exists"); + if absgp < target_absgp { + ub_pos = seek_pos; + ub_page_first = first_page_begin_pos; + ub_target_page_first = target_page.begin_pos(); + } else { + lb_page_end = target_page.end_pos(); } } - Ok(None) + todo!() } /// Resets the internal state by deleting all @@ -1137,8 +1186,10 @@ impl PacketReader { } } -enum SeekResult { +pub enum SeekResult { NoPageFound, + NoPacketEnd, + SeekedToFirstPage, } // util function From d5dc032d09749cb99ad43b34b7c689162b809f7b Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Sat, 13 Feb 2021 04:24:02 +0900 Subject: [PATCH 10/17] Modified seek --- src/reading.rs | 113 +++++++++++++++++++++++++------------------------ 1 file changed, 58 insertions(+), 55 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index 6d92be3..77e5982 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -560,7 +560,7 @@ pub struct CapturedPage { } impl CapturedPage { - fn stream_serial(&self) -> u32 { + pub fn stream_serial(&self) -> u32 { u32::from_le_bytes( self.bytes_buffer .iter() @@ -572,7 +572,7 @@ impl CapturedPage { .expect("Bytes_buffer should be a valid captured page") ) } - fn absolute_granule_position(&self) -> Option { + pub fn absolute_granule_position(&self) -> Option { let 
res = u64::from_le_bytes( self.bytes_buffer .iter() @@ -585,13 +585,13 @@ impl CapturedPage { ); if res == !0 { None } else { Some(res) } } - fn len(&self) -> u64 { + pub fn len(&self) -> u64 { self.bytes_buffer.len() as u64 } - fn begin_pos(&self) -> u64 { + pub fn begin_pos(&self) -> u64 { self.offset } - fn end_pos(&self) -> u64 { + pub fn end_pos(&self) -> u64 { self.offset + self.len() } } @@ -693,14 +693,16 @@ pub fn find_next_page( struct PageIterator<'a, R: Read> { reader: &'a mut R, stream_serial: Option, + offset: u64, limit: u64, } impl <'a, R: Read> PageIterator<'a, R> { - fn new(reader: &'a mut R, stream_serial: Option, limit: u64) -> Self { + fn new(reader: &'a mut R, stream_serial: Option, offset: u64, limit: u64) -> Self { Self { reader, stream_serial, + offset, limit, } } @@ -710,14 +712,19 @@ impl <'a, R: Read> Iterator for PageIterator<'a, R> { type Item = Result; fn next(&mut self) -> Option { - let page = match find_next_page(&mut self.reader, Some(self.limit)) { - Err(e) => return Some(Err(e)), - Ok(None) => return None, - Ok(Some(page)) => page, - }; - // let reader = PageReader:: - // TODO when returning a page, its offset should be in stream-coordinated - todo!() + loop { + let mut page = match find_next_page(&mut self.reader, Some(self.limit - self.offset)) { + Err(e) => return Some(Err(e)), + Ok(None) => return None, + Ok(Some(page)) => page, + }; + let begin_pos = self.offset + page.offset; + self.offset += page.end_pos(); + page.offset = begin_pos; + if self.stream_serial.map_or(true, |s| s == page.stream_serial()) { + break Some(Ok(page)); + } + } } } @@ -1075,55 +1082,28 @@ impl PacketReader { /// and specify the appropriate range. 
pub fn seek_absgp_le( &mut self, absgp: u64, stream_serial: Option, range: R - ) -> Result + ) -> Result, OggReadError> where R: RangeBounds { - let mut seek_begin = match range.start_bound() { + let seek_begin = match range.start_bound() { Bound::Included(&a) => a, Bound::Excluded(&a) => a+1, // TODO excluded bound should not be permitted Bound::Unbounded => 0, }; - let mut seek_end = match range.end_bound() { + let seek_end = match range.end_bound() { Bound::Excluded(&a) => a, Bound::Included(&a) => a+1, // TODO included bound should not be permitted Bound::Unbounded => tri!(self.rdr.seek(SeekFrom::End(0))), }; // lb: lower bound - tri!(self.rdr.seek(SeekFrom::Start(seek_begin))); - let mut reader = PageIterator::new(&mut self.rdr, stream_serial, seek_end); - let first_page = match reader.next() { - None => return Ok(SeekResult::NoPageFound), - Some(page) => tri!(page), - }; - if let Some(first_page_absgp) = first_page.absolute_granule_position() { - if absgp < first_page_absgp { - // TODO Seek to the first page - return Ok(SeekResult::SeekedToFirstPage); - } - // Otherwise, we can discard all the packets in the first page. 
- } else { - let next_page = reader.find(|page| match page { - Err(_) => true, - Ok(page) => page.absolute_granule_position().is_some(), - }); - let first_page_with_absgp = match next_page { - None => return Ok(SeekResult::NoPacketEnd), - Some(page) => tri!(page), - }; - let first_absgp = first_page_with_absgp - .absolute_granule_position() - .expect("It is provable that absgp always exists"); - if absgp < first_absgp { - // TODO Seek to the first page - return Ok(SeekResult::SeekedToFirstPage); - } - } - let mut lb_page_end = tri!(self.rdr.seek(SeekFrom::Current(0))); - + let mut lb_page_begin = seek_begin; + let mut lb_page_end = seek_begin; + let mut lb_absgp = None; + // ub: upper bound let mut ub_pos = seek_end; - let mut ub_page_first = seek_end; + let mut ub_page_begin = seek_end; let linear_search_threshold = 256 * 256; @@ -1134,14 +1114,18 @@ impl PacketReader { (lb_page_end + ub_pos) / 2 }; tri!(self.rdr.seek(SeekFrom::Start(seek_pos))); + println!("{} {} {} {} {}", lb_page_begin, lb_page_end, seek_pos, ub_pos, ub_page_begin); - let first_page = match tri!(find_next_page(&mut self.rdr, Some(ub_page_first))) { + let mut first_page = match tri!(find_next_page(&mut self.rdr, Some(ub_page_begin))) { None => { + println!("\tNo first page"); ub_pos = seek_pos; continue; }, Some(page) => page, }; + first_page.offset += seek_pos; + let first_page = first_page; let first_page_begin_pos = first_page.begin_pos(); let target_page = if stream_serial.map_or(true, |s| s == first_page.stream_serial()) @@ -1149,15 +1133,21 @@ impl PacketReader { { first_page } else { - let mut reader = PageIterator::new(&mut self.rdr, stream_serial, ub_page_first); + let mut reader = PageIterator::new( + &mut self.rdr, + stream_serial, + first_page.end_pos(), + ub_page_begin, + ); let next_page = reader.find(|page| match page { Err(_) => true, Ok(page) => page.absolute_granule_position().is_some(), }); match next_page { None => { + println!("\tNo next page"); ub_pos = seek_pos; - 
ub_page_first = first_page_begin_pos; + ub_page_begin = first_page_begin_pos; continue; }, Some(page) => tri!(page), @@ -1167,16 +1157,29 @@ impl PacketReader { let target_absgp = target_page .absolute_granule_position() .expect("It is provable that absgp always exists"); + println!("\tabsgp: {}", target_absgp); if absgp < target_absgp { + println!("\tbefore here"); ub_pos = seek_pos; - ub_page_first = first_page_begin_pos; - ub_target_page_first = target_page.begin_pos(); + ub_page_begin = first_page_begin_pos; } else { + println!("\tafter here"); + lb_page_begin = target_page.begin_pos(); lb_page_end = target_page.end_pos(); + lb_absgp = Some(target_absgp); } } - todo!() + println!("Seek to {}", lb_page_begin); + tri!(self.seek_bytes(SeekFrom::Start(lb_page_begin))); + if lb_absgp.is_some() { + let page = tri!(self.read_ogg_page()) + .expect("If lb_absgp is Some, then there is a page right after lb_page_begin"); + tri!(self.base_pck_rdr.push_page(page)); + // Consume packets in the current page + std::iter::repeat_with(|| self.base_pck_rdr.read_packet()).find(Option::is_none); + } + Ok(lb_absgp) } /// Resets the internal state by deleting all From ca375df1fe45fc147375f738a656f9dc4fa41e7c Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Sun, 14 Feb 2021 14:09:21 +0900 Subject: [PATCH 11/17] Comment out --- src/reading.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index 77e5982..0104c56 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -716,7 +716,7 @@ impl <'a, R: Read> Iterator for PageIterator<'a, R> { let mut page = match find_next_page(&mut self.reader, Some(self.limit - self.offset)) { Err(e) => return Some(Err(e)), Ok(None) => return None, - Ok(Some(page)) => page, + Ok(Some(page)) => page, }; let begin_pos = self.offset + page.offset; self.offset += page.end_pos(); @@ -1061,9 +1061,9 @@ impl PacketReader { /// 
Then, the internal reader state is set up so that /// the next packet yieleded by `read_packet` or `read_packet_expected` is such packet that: /// it is the first packet that ends within the page later than the page found above. - /// - /// If such page mentioned above was found, - /// this function returns the absolute granule position + /// + /// If such page mentioned above was found, + /// this function returns the absolute granule position /// specified in the page header of the page. /// If no such page was found, a `None` value is returned instead, /// which means that the reader seek to the beginning of specified range. @@ -1077,7 +1077,7 @@ impl PacketReader { /// on the other hand, if the file has multiple chained logical stream and /// you are focusing only one of them, /// it is not recommended to specify the entire range; - /// instead, you should first find the "bounds" of the stream, + /// instead, you should first find the "bounds" of the stream, /// that is, where the stream starts and ends, /// and specify the appropriate range. 
pub fn seek_absgp_le( @@ -1114,11 +1114,11 @@ impl PacketReader { (lb_page_end + ub_pos) / 2 }; tri!(self.rdr.seek(SeekFrom::Start(seek_pos))); - println!("{} {} {} {} {}", lb_page_begin, lb_page_end, seek_pos, ub_pos, ub_page_begin); + // println!("{} {} {} {} {}", lb_page_begin, lb_page_end, seek_pos, ub_pos, ub_page_begin); let mut first_page = match tri!(find_next_page(&mut self.rdr, Some(ub_page_begin))) { None => { - println!("\tNo first page"); + // println!("\tNo first page"); ub_pos = seek_pos; continue; }, @@ -1134,8 +1134,8 @@ impl PacketReader { first_page } else { let mut reader = PageIterator::new( - &mut self.rdr, - stream_serial, + &mut self.rdr, + stream_serial, first_page.end_pos(), ub_page_begin, ); @@ -1145,7 +1145,7 @@ impl PacketReader { }); match next_page { None => { - println!("\tNo next page"); + // println!("\tNo next page"); ub_pos = seek_pos; ub_page_begin = first_page_begin_pos; continue; @@ -1157,20 +1157,20 @@ impl PacketReader { let target_absgp = target_page .absolute_granule_position() .expect("It is provable that absgp always exists"); - println!("\tabsgp: {}", target_absgp); + // println!("\tabsgp: {}", target_absgp); if absgp < target_absgp { - println!("\tbefore here"); + // println!("\tbefore here"); ub_pos = seek_pos; ub_page_begin = first_page_begin_pos; } else { - println!("\tafter here"); + // println!("\tafter here"); lb_page_begin = target_page.begin_pos(); lb_page_end = target_page.end_pos(); lb_absgp = Some(target_absgp); } } - println!("Seek to {}", lb_page_begin); + // println!("Seek to {}", lb_page_begin); tri!(self.seek_bytes(SeekFrom::Start(lb_page_begin))); if lb_absgp.is_some() { let page = tri!(self.read_ogg_page()) From 39ca3e350a319b23bd263059e1e1dbd14c035b0d Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Sun, 14 Feb 2021 16:15:16 +0900 Subject: [PATCH 12/17] Add comments --- src/reading.rs | 72 +++++++++++++++++++++++++++----------------------- 1 
file changed, 39 insertions(+), 33 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index 0104c56..e603dc0 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -1047,40 +1047,46 @@ impl PacketReader { } } - /// Seek the reader by the absolute granule position. + /// Seeks the reader by the absolute granule position. /// - /// This function first find the page such that: - /// * the absolute granule position specified in the page header - /// is less than or equal to that specified in the argument, - /// * at least one packet ends within the page, - /// * the stream serial specified in the page header - /// is equal to that specified in the argument, if any, and - /// * the entire page is within the specified range of bytes. - /// (To search entire stream, specify `..` as the `range` argument`.) + /// This functions are intersted in such packet that: + /// - belongs to the logical stream specified by `stream_serial`, if specified, and + /// - ends in a page which entirely spans within the given range. + /// Let us call such packet "interesting." /// - /// Then, the internal reader state is set up so that - /// the next packet yieleded by `read_packet` or `read_packet_expected` is such packet that: - /// it is the first packet that ends within the page later than the page found above. + /// Among those packets, let us call a packet is "old" if the packet whose + /// absolute granule position is less than or equal to the provided `absgp`. + /// By calling this function, + /// the internal state of this reader is equivalent to the state + /// right after reading the last "old" packet. /// - /// If such page mentioned above was found, - /// this function returns the absolute granule position - /// specified in the page header of the page. - /// If no such page was found, a `None` value is returned instead, - /// which means that the reader seek to the beginning of specified range. 
+ /// Therefore, after calling this function and before seeking explicitly elsewhere, + /// all the "interesting" packets will have the absolute granule position + /// greater than the specified `absgp`. /// - /// Note that, when a stream_serial is specified, - /// pages with the stream serial should be "uniformly distributed" within the range; - /// otherwise this function may be less performant. - /// For example, it is ok to specify the "entire" range - /// if the file has only one logical stream - /// or several streams are multiplexed uniformly; - /// on the other hand, if the file has multiple chained logical stream and - /// you are focusing only one of them, - /// it is not recommended to specify the entire range; - /// instead, you should first find the "bounds" of the stream, - /// that is, where the stream starts and ends, - /// and specify the appropriate range. - pub fn seek_absgp_le( + /// This function use binary searching to find appropriate position to seek. + /// Therefore, the following conditions should be satisfied: + /// - The absolute granule position specified should monotonically increase + /// (except for those pages without a packet end). + /// If `stream_serial` is fixed, + /// or there are only one logical stream within the specified range, + /// then this condition is naturally fulfilled. + /// However, if `stream_serial` is `None` + /// and there are more than one logical stream within the range, + /// you should be careful of the possibility of non-monotonicity. + /// - When `stream_serial` is specified, + /// the pages with the stream serial should be "well-distributed" within the range; + /// otherwise, this function may be less performant. 
+ /// For example, it is ok to specify the "entire" range + /// if the file has only one logical stream + /// or several streams are multiplexed uniformly; + /// on the other hand, if the file has multiple chained logical stream and + /// you are focusing only one of them, + /// it is not recommended to specify the entire range; + /// instead, you should first find the "bounds" of the stream, + /// that is, where the stream starts and ends, + /// and specify the appropriate range. + pub fn seek_absgp_new( &mut self, absgp: u64, stream_serial: Option, range: R ) -> Result, OggReadError> where @@ -1088,14 +1094,14 @@ impl PacketReader { let seek_begin = match range.start_bound() { Bound::Included(&a) => a, - Bound::Excluded(&a) => a+1, // TODO excluded bound should not be permitted + Bound::Excluded(&a) => a+1, Bound::Unbounded => 0, }; let seek_end = match range.end_bound() { Bound::Excluded(&a) => a, - Bound::Included(&a) => a+1, // TODO included bound should not be permitted + Bound::Included(&a) => a+1, Bound::Unbounded => tri!(self.rdr.seek(SeekFrom::End(0))), - }; + }.max(seek_begin); // lb: lower bound let mut lb_page_begin = seek_begin; From b87e6378dec96a6938ce6916d21832fe666bb42b Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Sun, 14 Feb 2021 17:42:11 +0900 Subject: [PATCH 13/17] Find the end of logical stream --- src/reading.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/src/reading.rs b/src/reading.rs index e603dc0..c581688 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -1117,7 +1117,7 @@ impl PacketReader { let seek_pos = if ub_pos - lb_page_end < linear_search_threshold { lb_page_end } else { - (lb_page_end + ub_pos) / 2 + lb_page_end + (ub_pos - lb_page_end) / 2 }; tri!(self.rdr.seek(SeekFrom::Start(seek_pos))); // println!("{} {} {} {} {}", lb_page_begin, lb_page_end, seek_pos, ub_pos, ub_page_begin); @@ -1188,6
+1188,55 @@ impl PacketReader { Ok(lb_absgp) } + /// Finds the position of the end of logical stream + /// + /// This function first reads the next page via `find_next_page`. + /// If there are no such page, this function returns `Ok(None)`. + /// If there are, it retrieves the stream serial that the page belongs. + /// Then, this function performs binary search for the end of the logical stream + /// with the retrieved stream serial. + /// Therefore, the logical stream must be unmultiplexed until the end of the stream, + /// or it may misbehave. + /// + /// The state of reader after calling this function is unspecified. + pub fn find_end_of_logical_stream(&mut self) -> Result, OggReadError> { + let current_pos = tri!(self.rdr.seek(SeekFrom::Current(0))); + let page = match tri!(find_next_page(&mut self.rdr, None)) { + Some(page) => page, + None => return Ok(None) + }; + let stream_serial = page.stream_serial(); + let mut lb_pos = current_pos + page.offset; + let mut ub_pos = tri!(self.rdr.seek(SeekFrom::End(0))); + let linear_search_threshold = 256 * 256; + + while lb_pos < ub_pos { + let seek_pos = if ub_pos - lb_pos < linear_search_threshold { + lb_pos + } else { + lb_pos + (ub_pos - lb_pos) / 2 + }; + tri!(self.rdr.seek(SeekFrom::Start(seek_pos))); + + let mut target_page = match tri!(find_next_page(&mut self.rdr, None)) { + None => { + ub_pos = seek_pos; + continue; + }, + Some(page) => page, + }; + target_page.offset += seek_pos; + let target_page = target_page; + if target_page.stream_serial() == stream_serial { + lb_pos = target_page.end_pos(); + } else { + ub_pos = target_page.begin_pos(); + } + } + + Ok(Some(lb_pos)) + } + /// Resets the internal state by deleting all /// unread packets. 
pub fn delete_unread_packets(&mut self) { From 388867f1a78b472c0fe70db8350972574bf5caf7 Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Sun, 14 Feb 2021 17:44:39 +0900 Subject: [PATCH 14/17] Add comments --- src/reading.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/reading.rs b/src/reading.rs index c581688..b282d4c 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -1086,6 +1086,11 @@ impl PacketReader { /// instead, you should first find the "bounds" of the stream, /// that is, where the stream starts and ends, /// and specify the appropriate range. + /// + /// ## Returns + /// + /// This function returns the absolute granule position of the last "old" packet. + /// If there were no "old" packets, it returns `None`. pub fn seek_absgp_new( &mut self, absgp: u64, stream_serial: Option, range: R ) -> Result, OggReadError> From c0b6ada5d22e17f6dd7b772455a0a6b63018b3ca Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Sun, 14 Feb 2021 20:12:05 +0900 Subject: [PATCH 15/17] Remove unused enum --- src/reading.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index b282d4c..bf9006f 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -1249,12 +1249,6 @@ impl PacketReader { } } -pub enum SeekResult { - NoPageFound, - NoPacketEnd, - SeekedToFirstPage, -} - // util function fn seek_before_end(mut rdr :T, offs :u64) -> Result { From 7d6a8e5e7140fb8a2c0283eef1b8252c50535c94 Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Mon, 15 Feb 2021 00:30:56 +0900 Subject: [PATCH 16/17] delete_unread_packets does not require Seek --- src/reading.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/reading.rs b/src/reading.rs index bf9006f..ef7402c 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -843,6 +843,12 @@ impl 
PacketReader { Ok(Some(tri!(pg_prs.parse_packet_data(packet_data)))) } + + /// Resets the internal state by deleting all + /// unread packets. + pub fn delete_unread_packets(&mut self) { + self.base_pck_rdr.update_after_seek(); + } } impl PacketReader { @@ -1241,12 +1247,6 @@ impl PacketReader { Ok(Some(lb_pos)) } - - /// Resets the internal state by deleting all - /// unread packets. - pub fn delete_unread_packets(&mut self) { - self.base_pck_rdr.update_after_seek(); - } } // util function From 59231beb20eed69da5aa085e1ecf51778323d564 Mon Sep 17 00:00:00 2001 From: TonalidadeHidrica <47710717+TonalidadeHidrica@users.noreply.github.com> Date: Mon, 15 Feb 2021 19:23:54 +0900 Subject: [PATCH 17/17] fix typo --- src/reading.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reading.rs b/src/reading.rs index ef7402c..e138a29 100644 --- a/src/reading.rs +++ b/src/reading.rs @@ -1055,7 +1055,7 @@ impl PacketReader { /// Seeks the reader by the absolute granule position. /// - /// This functions are intersted in such packet that: + /// This function is interested in such packet that: /// - belongs to the logical stream specified by `stream_serial`, if specified, and /// - ends in a page which entirely spans within the given range. /// Let us call such packet "interesting."