From 60f49fb2b470a02befd4bc76cc528e290bb1c968 Mon Sep 17 00:00:00 2001 From: Jefffrey <22608443+Jefffrey@users.noreply.github.com> Date: Thu, 16 Nov 2023 22:43:23 +1100 Subject: [PATCH 1/3] Refactor synchronous parsing of file tail metadata --- src/reader.rs | 46 +++++- src/reader/decompress.rs | 71 ++++---- src/reader/metadata.rs | 338 +++++++++++++++++++++++++-------------- 3 files changed, 309 insertions(+), 146 deletions(-) diff --git a/src/reader.rs b/src/reader.rs index b49f0a80..b4ad4789 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -3,7 +3,8 @@ pub mod decompress; pub mod metadata; pub mod schema; -use std::io::{Read, Seek}; +use std::fs::File; +use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncSeek}; @@ -21,7 +22,7 @@ pub struct Reader { pub(crate) schema: Arc, } -impl Reader { +impl Reader { pub fn new(mut r: R) -> Result { let metadata = Box::new(read_metadata(&mut r)?); let schema = create_schema(&metadata.footer.types, 0)?; @@ -80,3 +81,44 @@ impl Reader { }) } } + +/// Primary source used for reading required bytes for operations. +#[allow(clippy::len_without_is_empty)] +// TODO: async version +pub trait ChunkReader { + type T: Read; + + /// Get total length of bytes. Useful for parsing the metadata located at + /// the end of the file. + fn len(&self) -> u64; + + /// Get a reader starting at a specific offset. + fn get_read(&self, offset_from_start: u64) -> std::io::Result; + + /// Read bytes from an offset with specific length. + fn get_bytes(&self, offset_from_start: u64, length: u64) -> std::io::Result> { + let mut bytes = vec![0; length as usize]; + self.get_read(offset_from_start)? + .take(length) + .read_exact(&mut bytes)?; + Ok(bytes) + } +} + +impl ChunkReader for File { + type T = BufReader; + + fn len(&self) -> u64 { + self.metadata().map(|m| m.len()).unwrap_or(0u64) + } + + /// Care needs to be taken when using this simulatenously as underlying + /// file descriptor is the same and will be affected by other invocations. + /// + /// See [`File::try_clone()`] for more details. + fn get_read(&self, offset_from_start: u64) -> std::io::Result { + let mut reader = self.try_clone()?; + reader.seek(SeekFrom::Start(offset_from_start))?; + Ok(BufReader::new(self.try_clone()?)) + } +} diff --git a/src/reader/decompress.rs b/src/reader/decompress.rs index 910b1c7d..4b598444 100644 --- a/src/reader/decompress.rs +++ b/src/reader/decompress.rs @@ -63,14 +63,27 @@ pub enum CompressionType { Zstd, } -fn decode_header(bytes: &[u8]) -> (bool, usize) { - let a: [u8; 3] = (&bytes[..3]).try_into().unwrap(); - let a = [0, a[0], a[1], a[2]]; - let length = u32::from_le_bytes(a); - let is_original = a[1] & 1 == 1; - let length = (length >> (8 + 1)) as usize; +/// Indicates length of block and whether it's compressed or not. +#[derive(Debug, PartialEq, Eq)] +enum CompressionHeader { + Original(u32), + Compressed(u32), +} - (is_original, length) +/// ORC files are compressed in blocks, with a 3 byte header at the start +/// of these blocks indicating the length of the block and whether it's +/// compressed or not. +fn decode_header(bytes: [u8; 3]) -> CompressionHeader { + let bytes = [bytes[0], bytes[1], bytes[2], 0]; + let length = u32::from_le_bytes(bytes); + let is_original = length & 1 == 1; + // to clear the is_original bit + let length = length >> 1; + if is_original { + CompressionHeader::Original(length) + } else { + CompressionHeader::Compressed(length) + } } fn decompress_block( @@ -164,21 +177,24 @@ impl FallibleStreamingIterator for DecompressorIter { match self.compression { Some(compression) => { - // todo: take stratch from current State::Compressed for re-use - let (is_original, length) = decode_header(&self.stream); - let _ = self.stream.split_to(3); - let maybe_compressed = self.stream.split_to(length); - - if is_original { - self.current = Some(State::Original(maybe_compressed.into())); - } else { - decompress_block(compression, &maybe_compressed, &mut self.scratch)?; - self.current = Some(State::Compressed(std::mem::take(&mut self.scratch))); - } + // TODO: take stratch from current State::Compressed for re-use + let header = self.stream.split_to(3); + let header = [header[0], header[1], header[2]]; + match decode_header(header) { + CompressionHeader::Original(length) => { + let original = self.stream.split_to(length as usize); + self.current = Some(State::Original(original.into())); + } + CompressionHeader::Compressed(length) => { + let compressed = self.stream.split_to(length as usize); + decompress_block(compression, &compressed, &mut self.scratch)?; + self.current = Some(State::Compressed(std::mem::take(&mut self.scratch))); + } + }; Ok(()) } None => { - // todo: take stratch from current State::Compressed for re-use + // TODO: take stratch from current State::Compressed for re-use self.current = Some(State::Original(self.stream.clone().into())); self.stream.clear(); Ok(()) @@ -261,20 +277,19 @@ mod tests { #[test] fn decode_uncompressed() { // 5 uncompressed = [0x0b, 0x00, 0x00] = [0b1011, 0, 0] - let bytes = &[0b1011, 0, 0, 0]; + let bytes = [0b1011, 0, 0]; - let (is_original, length) = decode_header(bytes); - assert!(is_original); - assert_eq!(length, 5); + let expected = CompressionHeader::Original(5); + let actual = decode_header(bytes); + assert_eq!(expected, actual); } #[test] fn decode_compressed() { // 100_000 compressed = [0x40, 0x0d, 0x03] = [0b01000000, 0b00001101, 0b00000011] - let bytes = &[0b01000000, 0b00001101, 0b00000011, 0]; - - let (is_original, length) = decode_header(bytes); - assert!(!is_original); - assert_eq!(length, 100_000); + let bytes = [0b0100_0000, 0b0000_1101, 0b0000_0011]; + let expected = CompressionHeader::Compressed(100_000); + let actual = decode_header(bytes); + assert_eq!(expected, actual); } } diff --git a/src/reader/metadata.rs b/src/reader/metadata.rs index 1c54bf38..fa33354f 100644 --- a/src/reader/metadata.rs +++ b/src/reader/metadata.rs @@ -1,4 +1,28 @@ -use std::io::{Read, Seek, SeekFrom}; +//! Parse ORC file tail metadata structures from file. +//! +//! File tail structure: +//! +//! ------------------ +//! | Metadata | +//! | | +//! ------------------ +//! | Footer | +//! | | +//! ------------------ +//! | Postscript |X| +//! ------------------ +//! +//! Where X is last byte in file indicating +//! Postscript length in bytes. +//! +//! Footer and Metadata lengths are encoded in Postscript. +//! Postscript is never compressed, Footer and Metadata +//! may be compressed depending Postscript config value. +//! +//! If they are compressed then their lengths indicate their +//! compressed lengths. + +use std::io::{Read, SeekFrom}; use bytes::Bytes; use prost::Message; @@ -10,6 +34,7 @@ use crate::proto::{Footer, Metadata, PostScript, StripeFooter}; use crate::reader::decompress::Decompressor; use super::decompress::Compression; +use super::ChunkReader; const DEFAULT_FOOTER_SIZE: u64 = 16 * 1024; @@ -22,144 +47,224 @@ pub struct FileMetadata { pub stripe_footers: Vec, } -macro_rules! impl_read_metadata { - ($reader:ident $($_await:tt)*) => { - { - let file_len = { - let old_pos = $reader.stream_position()$($_await)*.context(error::SeekSnafu)?; - let len = $reader.seek(SeekFrom::End(0))$($_await)*.context(error::SeekSnafu)?; - - // Avoid seeking a third time when we were already at the end of the - // stream. The branch is usually way cheaper than a seek operation. - if old_pos != len { - $reader.seek(SeekFrom::Start(old_pos))$($_await)* - .context(error::SeekSnafu)?; - } - len - }; - - // initial read of the footer - let assume_footer_len = if file_len < DEFAULT_FOOTER_SIZE { - file_len - } else { - DEFAULT_FOOTER_SIZE - }; - - $reader - .seek(SeekFrom::End(-(assume_footer_len as i64)))$($_await)* - .context(error::SeekSnafu)?; - let mut tail_bytes = Vec::with_capacity(assume_footer_len as usize); - $reader - .take(assume_footer_len) - .read_to_end(&mut tail_bytes)$($_await)* - .context(error::IoSnafu)?; - - // The final byte of the file contains the serialized length of the Postscript, - // which must be less than 256 bytes. - let postscript_len = tail_bytes[tail_bytes.len() - 1] as usize; - tail_bytes.truncate(tail_bytes.len() - 1); - - // next is the postscript - let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len..]) - .context(error::DecodeProtoSnafu)?; - let compression = Compression::from_proto( - postscript.compression(), - postscript.compression_block_size - ); - tail_bytes.truncate(tail_bytes.len() - postscript_len); - - // next is the footer - let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu { - msg: "Footer length is empty", - })? as usize; // todo: throw error - - let footer_offset = file_len - footer_length as u64 - postscript_len as u64 - 1; - - $reader - .seek(SeekFrom::Start(footer_offset))$($_await)* - .context(error::SeekSnafu)?; - let mut footer = vec![0; footer_length]; - $reader - .read_exact(&mut footer)$($_await)* - .context(error::SeekSnafu)?; - let footer = deserialize_footer(&footer, compression)?; +pub fn read_metadata(reader: &mut R) -> Result +where + R: ChunkReader, +{ + let file_len = reader.len(); + // TODO: return error if empty - // finally the metadata - let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu { - msg: "Metadata length is empty", - })? as usize; - let metadata_offset = - file_len - metadata_length as u64 - footer_length as u64 - postscript_len as u64 - 1; + // Initial read of the file tail + // Use a default size for first read in hopes of capturing all sections with one read + // At worst need two reads to get all necessary bytes + let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE); + let mut tail_bytes = reader + .get_bytes(file_len - assume_footer_len, assume_footer_len) + .context(error::IoSnafu)?; - $reader - .seek(SeekFrom::Start(metadata_offset))$($_await)* - .context(error::SeekSnafu)?; - let mut metadata = vec![0; metadata_length]; - $reader.read_exact(&mut metadata)$($_await)*.context(error::IoSnafu)?; - - let metadata = deserialize_footer_metadata(&metadata, compression)?; - - let mut stripe_footers = Vec::with_capacity(footer.stripes.len()); - - let mut scratch = Vec::::new(); - - for stripe in &footer.stripes { - let start = stripe.offset() + stripe.index_length() + stripe.data_length(); - let len = stripe.footer_length(); - $reader - .seek(SeekFrom::Start(start))$($_await)* - .context(error::SeekSnafu)?; - - scratch.clear(); - scratch.reserve(len as usize); - $reader - .take(len) - .read_to_end(&mut scratch)$($_await)* - .context(error::IoSnafu)?; - stripe_footers.push(deserialize_stripe_footer( - &scratch, - compression, - )?); - } - - Ok(FileMetadata { - postscript, - footer, - metadata, - stripe_footers, - }) - } + // The final byte of the file contains the serialized length of the Postscript, + // which must be less than 256 bytes. + let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64; + tail_bytes.truncate(tail_bytes.len() - 1); + + // TODO: slice here could panic if file too small + let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..]) + .context(error::DecodeProtoSnafu)?; + let compression = + Compression::from_proto(postscript.compression(), postscript.compression_block_size); + tail_bytes.truncate(tail_bytes.len() - postscript_len as usize); + + let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu { + msg: "Footer length is empty", + })?; + let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu { + msg: "Metadata length is empty", + })?; + + // Ensure we have enough bytes for Footer and Metadata + let mut tail_bytes = if footer_length + metadata_length > tail_bytes.len() as u64 { + // Need second read + // -1 is the postscript length byte + let offset = file_len - 1 - postscript_len - footer_length - metadata_length; + let bytes_to_read = (footer_length + metadata_length) - tail_bytes.len() as u64; + let mut prepend_bytes = reader + .get_bytes(offset, bytes_to_read) + .context(error::IoSnafu)?; + prepend_bytes.extend(tail_bytes); + prepend_bytes + } else { + tail_bytes }; -} -pub fn read_metadata(reader: &mut R) -> Result -where - R: Read + Seek, -{ - impl_read_metadata!(reader) + let footer = deserialize_footer( + &tail_bytes[tail_bytes.len() - footer_length as usize..], + compression, + )?; + tail_bytes.truncate(tail_bytes.len() - footer_length as usize); + + let metadata = deserialize_footer_metadata( + &tail_bytes[tail_bytes.len() - metadata_length as usize..], + compression, + )?; + + let mut stripe_footers = Vec::with_capacity(footer.stripes.len()); + + // clippy read_zero_byte_vec lint causing issues so init to non-zero length + let mut scratch = vec![0]; + for stripe in &footer.stripes { + let offset = stripe.offset() + stripe.index_length() + stripe.data_length(); + let len = stripe.footer_length(); + + let mut read = reader.get_read(offset).context(error::IoSnafu)?; + scratch.resize(len as usize, 0); + read.read_exact(&mut scratch).context(error::IoSnafu)?; + stripe_footers.push(deserialize_stripe_footer(&scratch, compression)?); + } + + Ok(FileMetadata { + postscript, + footer, + metadata, + stripe_footers, + }) } +// TODO: refactor like for sync pub async fn read_metadata_async(reader: &mut R) -> Result where R: AsyncRead + AsyncSeek + Unpin + Send, { - impl_read_metadata!(reader.await) + let file_len = { + let old_pos = reader.stream_position().await.context(error::SeekSnafu)?; + let len = reader + .seek(SeekFrom::End(0)) + .await + .context(error::SeekSnafu)?; + + // Avoid seeking a third time when we were already at the end of the + // stream. The branch is usually way cheaper than a seek operation. + if old_pos != len { + reader + .seek(SeekFrom::Start(old_pos)) + .await + .context(error::SeekSnafu)?; + } + len + }; + + // initial read of the footer + let assume_footer_len = if file_len < DEFAULT_FOOTER_SIZE { + file_len + } else { + DEFAULT_FOOTER_SIZE + }; + + reader + .seek(SeekFrom::End(-(assume_footer_len as i64))) + .await + .context(error::SeekSnafu)?; + let mut tail_bytes = Vec::with_capacity(assume_footer_len as usize); + reader + .take(assume_footer_len) + .read_to_end(&mut tail_bytes) + .await + .context(error::IoSnafu)?; + + // The final byte of the file contains the serialized length of the Postscript, + // which must be less than 256 bytes. + let postscript_len = tail_bytes[tail_bytes.len() - 1] as usize; + tail_bytes.truncate(tail_bytes.len() - 1); + + // next is the postscript + let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len..]) + .context(error::DecodeProtoSnafu)?; + let compression = + Compression::from_proto(postscript.compression(), postscript.compression_block_size); + tail_bytes.truncate(tail_bytes.len() - postscript_len); + + // next is the footer + let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu { + msg: "Footer length is empty", + })? as usize; // todo: throw error + + let footer_offset = file_len - footer_length as u64 - postscript_len as u64 - 1; + + reader + .seek(SeekFrom::Start(footer_offset)) + .await + .context(error::SeekSnafu)?; + let mut footer = vec![0; footer_length]; + reader + .read_exact(&mut footer) + .await + .context(error::SeekSnafu)?; + let footer = deserialize_footer(&footer, compression)?; + + // finally the metadata + let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu { + msg: "Metadata length is empty", + })? as usize; + let metadata_offset = + file_len - metadata_length as u64 - footer_length as u64 - postscript_len as u64 - 1; + + reader + .seek(SeekFrom::Start(metadata_offset)) + .await + .context(error::SeekSnafu)?; + let mut metadata = vec![0; metadata_length]; + reader + .read_exact(&mut metadata) + .await + .context(error::IoSnafu)?; + + let metadata = deserialize_footer_metadata(&metadata, compression)?; + + let mut stripe_footers = Vec::with_capacity(footer.stripes.len()); + + let mut scratch = Vec::::new(); + + for stripe in &footer.stripes { + let start = stripe.offset() + stripe.index_length() + stripe.data_length(); + let len = stripe.footer_length(); + reader + .seek(SeekFrom::Start(start)) + .await + .context(error::SeekSnafu)?; + + scratch.clear(); + scratch.reserve(len as usize); + reader + .take(len) + .read_to_end(&mut scratch) + .await + .context(error::IoSnafu)?; + stripe_footers.push(deserialize_stripe_footer(&scratch, compression)?); + } + + Ok(FileMetadata { + postscript, + footer, + metadata, + stripe_footers, + }) } fn deserialize_footer(bytes: &[u8], compression: Option) -> Result