feat: Support all decompression types #20

Merged: 2 commits merged into main from feature/support_all_decompression on Nov 4, 2023

Conversation

@Jefffrey (Collaborator) commented on Nov 4, 2023

Support decompressing Snappy, LZ4, and LZO compressed files. Also refactor how compression is handled to try to reduce duplication.

Closes #10

Comment on lines +431 to +434
let compression = Compression::from_proto(
    r.metadata().postscript.compression(),
    r.metadata().postscript.compression_block_size,
);
@Jefffrey (Collaborator, Author) commented:

Maybe eventually we can store Compression directly as a field in FileMetadata so we don't need to derive it each time.
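For illustration, the suggested refactor might look something like the sketch below; the field and accessor are hypothetical, not the crate's current FileMetadata layout:

// Hypothetical sketch only: derive Compression once when FileMetadata is
// built, instead of recomputing it from the postscript at each call site.
#[derive(Clone, Copy, Debug)]
struct Compression; // stand-in for the type introduced in this PR

struct FileMetadata {
    /// `None` means the file is uncompressed.
    compression: Option<Compression>,
    // ...remaining metadata fields elided
}

impl FileMetadata {
    fn compression(&self) -> Option<Compression> {
        self.compression
    }
}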

Comment on lines +17 to +25
let days = Days::new(data.unsigned_abs());
// safe unwrap as is valid date
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let date = if data.is_negative() {
    epoch.checked_sub_days(days)
} else {
    epoch.checked_add_days(days)
};
date.context(error::AddDaysSnafu)
@Jefffrey (Collaborator, Author) commented:
Fix to support dates before epoch
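A rough standalone illustration of the same arithmetic (not the crate's actual test), showing that a negative day count now resolves to a pre-epoch date:

use chrono::{Days, NaiveDate};

// Mirrors the logic above: `data` is a signed count of days from 1970-01-01.
fn days_to_date(data: i64) -> Option<NaiveDate> {
    let days = Days::new(data.unsigned_abs());
    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1)?;
    if data.is_negative() {
        epoch.checked_sub_days(days)
    } else {
        epoch.checked_add_days(days)
    }
}

fn main() {
    assert_eq!(days_to_date(-1), NaiveDate::from_ymd_opt(1969, 12, 31));
    assert_eq!(days_to_date(365), NaiveDate::from_ymd_opt(1971, 1, 1));
}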

Comment on lines 13 to 52
#[derive(Clone, Copy, Debug)]
pub struct Compression {
    compression_type: CompressionType,
    /// No compression chunk will decompress to larger than this size.
    /// Use to size the scratch buffer appropriately.
    max_decompressed_block_size: usize,
}

impl Compression {
    pub fn from_proto(
        kind: proto::CompressionKind,
        compression_block_size: Option<u64>,
    ) -> Option<Self> {
        // Spec states default is 256K
        let max_decompressed_block_size = compression_block_size.unwrap_or(256 * 1024) as usize;
        match kind {
            CompressionKind::None => None,
            CompressionKind::Zlib => Some(Self {
                compression_type: CompressionType::Zlib,
                max_decompressed_block_size,
            }),
            CompressionKind::Snappy => Some(Self {
                compression_type: CompressionType::Snappy,
                max_decompressed_block_size,
            }),
            CompressionKind::Lzo => Some(Self {
                compression_type: CompressionType::Lzo,
                max_decompressed_block_size,
            }),
            CompressionKind::Lz4 => Some(Self {
                compression_type: CompressionType::Lz4,
                max_decompressed_block_size,
            }),
            CompressionKind::Zstd => Some(Self {
                compression_type: CompressionType::Zstd,
                max_decompressed_block_size,
            }),
        }
    }
}
@Jefffrey (Collaborator, Author) commented:
Decided to represent no compression as None in an Option, since it behaves differently from when there is compression.

All compression kinds share the behaviour of needing to decode a header for each compressed block, whereas no compression doesn't need this, so it seemed more appropriate to model it this way.
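For context, the per-block header that compressed streams have to decode (and that uncompressed files skip entirely) is the 3-byte ORC chunk header. A small sketch of that decoding, based on the ORC spec rather than this PR's exact helper:

// The header is stored little-endian: the lowest bit marks an "original"
// (stored uncompressed) chunk, and the remaining bits give the chunk length.
fn decode_chunk_header(header: [u8; 3]) -> (usize, bool) {
    let raw = u32::from_le_bytes([header[0], header[1], header[2], 0]);
    let is_original = raw & 1 == 1;
    let length = (raw >> 1) as usize;
    (length, is_original)
}

fn main() {
    // 0x0b = (5 << 1) | 1: a 5-byte chunk stored as-is ("original").
    assert_eq!(decode_chunk_header([0x0b, 0x00, 0x00]), (5, true));
    // 0x0a = 5 << 1: a 5-byte chunk that must be decompressed.
    assert_eq!(decode_chunk_header([0x0a, 0x00, 0x00]), (5, false));
}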

Comment on lines +99 to +115
CompressionType::Lzo => {
    let decompressed = lzokay_native::decompress_all(compressed_bytes, None)
        .context(error::BuildLzoDecoderSnafu)?;
    // TODO: better way to utilize scratch here
    scratch.clear();
    scratch.extend(decompressed);
}
CompressionType::Lz4 => {
    let decompressed = lz4_flex::block::decompress(
        compressed_bytes,
        compression.max_decompressed_block_size,
    )
    .context(error::BuildLz4DecoderSnafu)?;
    // TODO: better way to utilize scratch here
    scratch.clear();
    scratch.extend(decompressed);
}
@Jefffrey (Collaborator, Author) commented:
There are probably ways to make this more efficient, but that can be tackled later, especially once we have benchmarks to measure such improvements.
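One possible follow-up, sketched here under the assumption that lz4_flex's decompress_into fits this call site: size the scratch buffer to the max block size up front and decompress straight into it, skipping the intermediate Vec.

// Untested sketch: decompress directly into the reusable scratch buffer
// instead of allocating an intermediate Vec for each block.
fn lz4_into_scratch(
    compressed_bytes: &[u8],
    max_decompressed_block_size: usize,
    scratch: &mut Vec<u8>,
) -> Result<(), lz4_flex::block::DecompressError> {
    scratch.clear();
    scratch.resize(max_decompressed_block_size, 0);
    let len = lz4_flex::block::decompress_into(compressed_bytes, scratch)?;
    scratch.truncate(len);
    Ok(())
}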

@Jefffrey (Collaborator, Author) commented:

Using Spark to generate the test files, as pyorc doesn't seem to support all compression types when writing (I think it had issues with lzo and snappy?).

Spark supports them all, so it's more reliable, with the downside of needing a Spark install to run this script.

Comment on lines +299 to +307
pub fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
    let formatted = pretty::pretty_format_batches(batches).unwrap().to_string();
    let actual_lines: Vec<_> = formatted.trim().lines().collect();
    assert_eq!(
        &actual_lines, expected_lines,
        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
        expected_lines, actual_lines
    );
}
@Jefffrey (Collaborator, Author) commented:
This makes it easier to copy the expected output and paste directly as code
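For example (illustrative schema and values, not from an actual test in this PR), a test can paste the pretty-printed output straight in as string literals:

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

#[test]
fn example_assert_batches_eq() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("word", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
            Arc::new(StringArray::from(vec!["hello", "world"])) as ArrayRef,
        ],
    )
    .unwrap();

    // Copied verbatim from the pretty-printed output.
    let expected = [
        "+----+-------+",
        "| id | word  |",
        "+----+-------+",
        "| 1  | hello |",
        "| 2  | world |",
        "+----+-------+",
    ];
    assert_batches_eq(&[batch], &expected);
}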

src/reader/decompress.rs: outdated review comment (resolved)
@WenyXu (Collaborator) left a comment:

LGTM

@Jefffrey merged commit cbc6371 into main on Nov 4, 2023 (6 checks passed)
@Jefffrey deleted the feature/support_all_decompression branch on Nov 4, 2023 at 11:16
waynexia pushed a commit that referenced this pull request Oct 24, 2024
* Support all decompression types

* Move default compression block size to const
Successfully merging this pull request may close: Support all compression methods (#10)