Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor synchronous parsing of file tail metadata #43

Merged
merged 3 commits into from
Nov 17, 2023

Conversation

Jefffrey
Copy link
Collaborator

@Jefffrey Jefffrey commented Nov 16, 2023

Initial attempt at refactoring how file read is handled, as stated here #42 (comment)

Copy link
Collaborator Author

@Jefffrey Jefffrey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if this looks like a good direction to head in, or if you have other thoughts 👍

src/reader.rs Outdated
Comment on lines 85 to 124
/// Primary source used for reading required bytes for operations.
#[allow(clippy::len_without_is_empty)]
// TODO: async version
pub trait ChunkReader {
type T: Read;

/// Get total length of bytes. Useful for parsing the metadata located at
/// the end of the file.
fn len(&self) -> u64;

/// Get a reader starting at a specific offset.
fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T>;

/// Read bytes from an offset with specific length.
fn get_bytes(&self, offset_from_start: u64, length: u64) -> std::io::Result<Vec<u8>> {
let mut bytes = vec![0; length as usize];
self.get_read(offset_from_start)?
.take(length)
.read_exact(&mut bytes)?;
Ok(bytes)
}
}

impl ChunkReader for File {
type T = BufReader<File>;

fn len(&self) -> u64 {
self.metadata().map(|m| m.len()).unwrap_or(0u64)
}

/// Care needs to be taken when using this simulatenously as underlying
/// file descriptor is the same and will be affected by other invocations.
///
/// See [`File::try_clone()`] for more details.
fn get_read(&self, offset_from_start: u64) -> std::io::Result<Self::T> {
let mut reader = self.try_clone()?;
reader.seek(SeekFrom::Start(offset_from_start))?;
Ok(BufReader::new(self.try_clone()?))
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially what is done in parquet at the moment. I like this interface as we don't have to keep doing the Seek ourselves

Comment on lines +66 to +86
/// Indicates length of block and whether it's compressed or not.
#[derive(Debug, PartialEq, Eq)]
enum CompressionHeader {
Original(u32),
Compressed(u32),
}

(is_original, length)
/// ORC files are compressed in blocks, with a 3 byte header at the start
/// of these blocks indicating the length of the block and whether it's
/// compressed or not.
fn decode_header(bytes: [u8; 3]) -> CompressionHeader {
let bytes = [bytes[0], bytes[1], bytes[2], 0];
let length = u32::from_le_bytes(bytes);
let is_original = length & 1 == 1;
// to clear the is_original bit
let length = length >> 1;
if is_original {
CompressionHeader::Original(length)
} else {
CompressionHeader::Compressed(length)
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just trying to make things more explicit, utilize Rust's enums more instead of relying on booleans

Comment on lines +1 to +23
//! Parse ORC file tail metadata structures from file.
//!
//! File tail structure:
//!
//! ------------------
//! | Metadata |
//! | |
//! ------------------
//! | Footer |
//! | |
//! ------------------
//! | Postscript |X|
//! ------------------
//!
//! Where X is last byte in file indicating
//! Postscript length in bytes.
//!
//! Footer and Metadata lengths are encoded in Postscript.
//! Postscript is never compressed, Footer and Metadata
//! may be compressed depending Postscript config value.
//!
//! If they are compressed then their lengths indicate their
//! compressed lengths.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where possible lets start adding docs

I prefer adding docs about the spec here since it removes the need for people to have to refer to the spec separately when trying to understand the code

Comment on lines +57 to +82
// Initial read of the file tail
// Use a default size for first read in hopes of capturing all sections with one read
// At worst need two reads to get all necessary bytes
let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE);
let mut tail_bytes = reader
.get_bytes(file_len - assume_footer_len, assume_footer_len)
.context(error::IoSnafu)?;

$reader
.seek(SeekFrom::Start(metadata_offset))$($_await)*
.context(error::SeekSnafu)?;
let mut metadata = vec![0; metadata_length];
$reader.read_exact(&mut metadata)$($_await)*.context(error::IoSnafu)?;

let metadata = deserialize_footer_metadata(&metadata, compression)?;

let mut stripe_footers = Vec::with_capacity(footer.stripes.len());

let mut scratch = Vec::<u8>::new();

for stripe in &footer.stripes {
let start = stripe.offset() + stripe.index_length() + stripe.data_length();
let len = stripe.footer_length();
$reader
.seek(SeekFrom::Start(start))$($_await)*
.context(error::SeekSnafu)?;

scratch.clear();
scratch.reserve(len as usize);
$reader
.take(len)
.read_to_end(&mut scratch)$($_await)*
.context(error::IoSnafu)?;
stripe_footers.push(deserialize_stripe_footer(
&scratch,
compression,
)?);
}

Ok(FileMetadata {
postscript,
footer,
metadata,
stripe_footers,
})
}
// The final byte of the file contains the serialized length of the Postscript,
// which must be less than 256 bytes.
let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64;
tail_bytes.truncate(tail_bytes.len() - 1);

// TODO: slice here could panic if file too small
let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..])
.context(error::DecodeProtoSnafu)?;
let compression =
Compression::from_proto(postscript.compression(), postscript.compression_block_size);
tail_bytes.truncate(tail_bytes.len() - postscript_len as usize);

let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu {
msg: "Footer length is empty",
})?;
let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu {
msg: "Metadata length is empty",
})?;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the new ChunkReader trait here to do the reads instead

Comment on lines +85 to +96
let mut tail_bytes = if footer_length + metadata_length > tail_bytes.len() as u64 {
// Need second read
// -1 is the postscript length byte
let offset = file_len - 1 - postscript_len - footer_length - metadata_length;
let bytes_to_read = (footer_length + metadata_length) - tail_bytes.len() as u64;
let mut prepend_bytes = reader
.get_bytes(offset, bytes_to_read)
.context(error::IoSnafu)?;
prepend_bytes.extend(tail_bytes);
prepend_bytes
} else {
tail_bytes
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idea being we do an initial read hoping to get all bytes necessary to decode postscript, footer and metadata

But at worst need one more read if first read is insufficient (as informed by decoded postscript)

Comment on lines +112 to +122
// clippy read_zero_byte_vec lint causing issues so init to non-zero length
let mut scratch = vec![0];
for stripe in &footer.stripes {
let offset = stripe.offset() + stripe.index_length() + stripe.data_length();
let len = stripe.footer_length();

let mut read = reader.get_read(offset).context(error::IoSnafu)?;
scratch.resize(len as usize, 0);
read.read_exact(&mut scratch).context(error::IoSnafu)?;
stripe_footers.push(deserialize_stripe_footer(&scratch, compression)?);
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might refactor this away later, we'll see (might not want to decode all stripe footers here and now)

pub async fn read_metadata_async<R>(reader: &mut R) -> Result<FileMetadata>
where
R: AsyncRead + AsyncSeek + Unpin + Send,
{
impl_read_metadata!(reader.await)
let file_len = {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just copied what the macro did into here

I'll focus on sync interface first then can apply to async, to limit scope of the work

@Jefffrey Jefffrey merged commit eed416c into main Nov 17, 2023
6 checks passed
@Jefffrey Jefffrey deleted the refactor_metadata_parse branch November 17, 2023 08:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants