Refactor synchronous parsing of file tail metadata #43

Merged · 3 commits · Nov 17, 2023

Changes from all commits
src/reader.rs (46 changes: 44 additions & 2 deletions)
@@ -3,7 +3,8 @@ pub mod decompress;
 pub mod metadata;
 pub mod schema;

-use std::io::{Read, Seek};
+use std::fs::File;
+use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;

 use tokio::io::{AsyncRead, AsyncSeek};
@@ -21,7 +22,7 @@ pub struct Reader<R> {
     pub(crate) schema: Arc<TypeDescription>,
 }

-impl<R: Read + Seek> Reader<R> {
+impl<R: ChunkReader> Reader<R> {
     pub fn new(mut r: R) -> Result<Self> {
         let metadata = Box::new(read_metadata(&mut r)?);
         let schema = create_schema(&metadata.footer.types, 0)?;
@@ -80,3 +81,44 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send> Reader<R> {
         })
     }
 }
+
+/// Primary source used for reading the bytes needed by read operations.
+#[allow(clippy::len_without_is_empty)]
+// TODO: async version
+pub trait ChunkReader {
+    type T: Read;
+
+    /// Get the total length in bytes. Useful for parsing the metadata located at
+    /// the end of the file.
+    fn len(&self) -> u64;
+
+    /// Get a reader starting at a specific offset.
+    fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T>;
+
+    /// Read bytes from an offset with a specific length.
+    fn get_bytes(&self, offset_from_start: u64, length: u64) -> std::io::Result<Vec<u8>> {
+        let mut bytes = vec![0; length as usize];
+        self.get_read(offset_from_start)?
+            .take(length)
+            .read_exact(&mut bytes)?;
+        Ok(bytes)
+    }
+}
+
+impl ChunkReader for File {
+    type T = BufReader<File>;
+
+    fn len(&self) -> u64 {
+        self.metadata().map(|m| m.len()).unwrap_or(0u64)
+    }
+
+    /// Care needs to be taken when using this concurrently, as the underlying
+    /// file descriptor is shared and is affected by other invocations.
+    ///
+    /// See [`File::try_clone()`] for more details.
+    fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T> {
+        let mut reader = self.try_clone()?;
+        reader.seek(SeekFrom::Start(offset_from_start))?;
+        Ok(BufReader::new(reader))
+    }
+}
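
For orientation (not part of the diff): a minimal sketch of how the new trait can be used to read the tail of a file, where the ORC postscript and footer live. The read_tail helper and example.orc path are hypothetical.

use std::fs::File;

// Assumes the ChunkReader trait above is in scope; `read_tail` is a
// hypothetical helper, not part of this PR.
fn read_tail(file: &File, tail_len: u64) -> std::io::Result<Vec<u8>> {
    // ChunkReader::len gives the total size, so the tail offset can be
    // computed without first seeking to the end.
    let offset = file.len().saturating_sub(tail_len);
    file.get_bytes(offset, file.len() - offset)
}

fn main() -> std::io::Result<()> {
    let file = File::open("example.orc")?; // placeholder path
    let tail = read_tail(&file, 16)?;
    println!("read {} tail bytes", tail.len());
    Ok(())
}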
src/reader/decompress.rs (71 changes: 43 additions & 28 deletions)
@@ -63,14 +63,27 @@ pub enum CompressionType {
     Zstd,
 }

-fn decode_header(bytes: &[u8]) -> (bool, usize) {
-    let a: [u8; 3] = (&bytes[..3]).try_into().unwrap();
-    let a = [0, a[0], a[1], a[2]];
-    let length = u32::from_le_bytes(a);
-    let is_original = a[1] & 1 == 1;
-    let length = (length >> (8 + 1)) as usize;
-
-    (is_original, length)
+/// Indicates the length of a block and whether or not it's compressed.
+#[derive(Debug, PartialEq, Eq)]
+enum CompressionHeader {
+    Original(u32),
+    Compressed(u32),
+}
+
+/// ORC files are compressed in blocks, with a 3-byte header at the start
+/// of these blocks indicating the length of the block and whether it's
+/// compressed or not.
+fn decode_header(bytes: [u8; 3]) -> CompressionHeader {
+    let bytes = [bytes[0], bytes[1], bytes[2], 0];
+    let length = u32::from_le_bytes(bytes);
+    let is_original = length & 1 == 1;
+    // Clear the is_original bit to recover the block length.
+    let length = length >> 1;
+    if is_original {
+        CompressionHeader::Original(length)
+    } else {
+        CompressionHeader::Compressed(length)
+    }
 }

Review comment on lines +66 to +86 (collaborator, author): Just trying to make things more explicit, and to utilize Rust's enums more instead of relying on booleans.

 fn decompress_block(
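
To make the bit layout concrete (an illustrative sketch, not part of this PR; encode_header is hypothetical): the block length occupies the upper 23 bits of the 3-byte little-endian header, and the low bit flags an uncompressed ("original") block.

// Hypothetical inverse of decode_header, for illustration only.
fn encode_header(length: u32, is_original: bool) -> [u8; 3] {
    // Shift the length up one bit and set the low bit for original blocks.
    let n = (length << 1) | (is_original as u32);
    let bytes = n.to_le_bytes();
    [bytes[0], bytes[1], bytes[2]]
}

// encode_header(5, true) == [0x0b, 0x00, 0x00], matching the tests below.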
@@ -164,21 +177,24 @@ impl FallibleStreamingIterator for DecompressorIter {

         match self.compression {
             Some(compression) => {
-                // todo: take stratch from current State::Compressed for re-use
-                let (is_original, length) = decode_header(&self.stream);
-                let _ = self.stream.split_to(3);
-                let maybe_compressed = self.stream.split_to(length);
-
-                if is_original {
-                    self.current = Some(State::Original(maybe_compressed.into()));
-                } else {
-                    decompress_block(compression, &maybe_compressed, &mut self.scratch)?;
-                    self.current = Some(State::Compressed(std::mem::take(&mut self.scratch)));
-                }
+                // TODO: take scratch from current State::Compressed for re-use
+                let header = self.stream.split_to(3);
+                let header = [header[0], header[1], header[2]];
+                match decode_header(header) {
+                    CompressionHeader::Original(length) => {
+                        let original = self.stream.split_to(length as usize);
+                        self.current = Some(State::Original(original.into()));
+                    }
+                    CompressionHeader::Compressed(length) => {
+                        let compressed = self.stream.split_to(length as usize);
+                        decompress_block(compression, &compressed, &mut self.scratch)?;
+                        self.current = Some(State::Compressed(std::mem::take(&mut self.scratch)));
+                    }
+                };
                 Ok(())
             }
             None => {
-                // todo: take stratch from current State::Compressed for re-use
+                // TODO: take scratch from current State::Compressed for re-use
                 self.current = Some(State::Original(self.stream.clone().into()));
                 self.stream.clear();
                 Ok(())
@@ -261,20 +277,24 @@ mod tests {
     #[test]
     fn decode_uncompressed() {
         // 5 uncompressed = [0x0b, 0x00, 0x00] = [0b1011, 0, 0]
-        let bytes = &[0b1011, 0, 0, 0];
+        let bytes = [0b1011, 0, 0];

-        let (is_original, length) = decode_header(bytes);
-        assert!(is_original);
-        assert_eq!(length, 5);
+        let expected = CompressionHeader::Original(5);
+        let actual = decode_header(bytes);
+        assert_eq!(expected, actual);
     }

     #[test]
     fn decode_compressed() {
         // 100_000 compressed = [0x40, 0x0d, 0x03] = [0b01000000, 0b00001101, 0b00000011]
-        let bytes = &[0b01000000, 0b00001101, 0b00000011, 0];
-
-        let (is_original, length) = decode_header(bytes);
-        assert!(!is_original);
-        assert_eq!(length, 100_000);
+        let bytes = [0b0100_0000, 0b0000_1101, 0b0000_0011];
+        let expected = CompressionHeader::Compressed(100_000);
+        let actual = decode_header(bytes);
+        assert_eq!(expected, actual);
     }
 }
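
As a sanity check on where the test constants come from (an illustrative snippet, not part of the diff):

fn main() {
    // 5 shifted up one bit with the low bit set (original/uncompressed)
    // gives 0b1011 = 0x0b in the lowest little-endian byte.
    assert_eq!(&((5u32 << 1) | 1).to_le_bytes()[..3], &[0x0b_u8, 0x00, 0x00]);
    // 100_000 shifted up one bit (compressed, so the low bit stays 0):
    assert_eq!(&(100_000u32 << 1).to_le_bytes()[..3], &[0x40_u8, 0x0d, 0x03]);
}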