diff --git a/Cargo.lock b/Cargo.lock index b2bea7c1..12c45c2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -313,6 +313,12 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.5.0" @@ -434,9 +440,12 @@ dependencies = [ "futures", "futures-util", "lazy_static", + "lz4_flex", + "lzokay-native", "paste", "prost 0.11.9", "snafu", + "snap", "tokio", "zigzag", "zstd", @@ -801,6 +810,25 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lz4_flex" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ea9b256699eda7b0387ffbc776dd625e28bde3918446381781245b7a50349d8" +dependencies = [ + "twox-hash", +] + +[[package]] +name = "lzokay-native" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "792ba667add2798c6c3e988e630f4eb921b5cbc735044825b7111ef1582c8730" +dependencies = [ + "byteorder", + "thiserror", +] + [[package]] name = "memchr" version = "2.6.4" @@ -1193,6 +1221,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "snap" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" + [[package]] name = "static_assertions" version = "1.1.0" @@ -1253,6 +1287,26 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "thiserror" +version = "1.0.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -1285,6 +1339,16 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "unicode-ident" version = "1.0.12" diff --git a/Cargo.toml b/Cargo.toml index 31add2c7..735b156c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,9 +25,12 @@ flate2 = "1" futures = { version = "0.3", default-features = false, features = ["std"] } futures-util = "0.3" lazy_static = "1.4" +lz4_flex = "0.11" +lzokay-native = "0.1" paste = "1.0" prost = { version = "0.11" } snafu = "0.7" +snap = "1.1" tokio = { version = "1.28", features = [ "io-util", "sync", diff --git a/src/arrow_reader.rs b/src/arrow_reader.rs index 6fe4cd4e..183b2406 100644 --- a/src/arrow_reader.rs +++ b/src/arrow_reader.rs @@ -26,6 +26,7 @@ use crate::arrow_reader::column::timestamp::new_timestamp_iter; use crate::arrow_reader::column::NullableIterator; use crate::error::{self, Result}; use crate::proto::{StripeFooter, StripeInformation}; +use crate::reader::decompress::Compression; use crate::reader::schema::{create_field, TypeDescription}; use crate::reader::Reader; @@ -427,7 +428,10 @@ impl Stripe { ) -> Result { let footer = Arc::new(r.stripe_footer(stripe).clone()); - let compression = r.metadata().postscript.compression(); + let compression = Compression::from_proto( + r.metadata().postscript.compression(), + r.metadata().postscript.compression_block_size, + ); //TODO(weny): add tz let columns = columns .iter() diff --git a/src/arrow_reader/column.rs b/src/arrow_reader/column.rs index 3ac43871..9ed6a0d0 100644 --- a/src/arrow_reader/column.rs +++ b/src/arrow_reader/column.rs @@ -7,8 +7,8 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use crate::error::{self, Result}; use crate::proto::stream::Kind; -use crate::proto::{ColumnEncoding, CompressionKind, StripeFooter, StripeInformation}; -use crate::reader::decompress::Decompressor; +use crate::proto::{ColumnEncoding, StripeFooter, StripeInformation}; +use crate::reader::decompress::{Compression, Decompressor}; use crate::reader::schema::TypeDescription; use crate::reader::Reader; @@ -25,7 +25,7 @@ pub mod timestamp; pub struct Column { data: Bytes, number_of_rows: u64, - compression: CompressionKind, + compression: Option, footer: Arc, name: String, column: Arc, @@ -99,7 +99,7 @@ impl Column { pub fn new( reader: &mut Reader, - compression: CompressionKind, + compression: Option, name: &str, column: &Arc, footer: &Arc, @@ -120,7 +120,7 @@ impl Column { pub async fn new_async( reader: &mut Reader, - compression: CompressionKind, + compression: Option, name: &str, column: &Arc, footer: &Arc, diff --git a/src/arrow_reader/column/date.rs b/src/arrow_reader/column/date.rs index af2ec786..edbfd586 100644 --- a/src/arrow_reader/column/date.rs +++ b/src/arrow_reader/column/date.rs @@ -14,12 +14,15 @@ pub struct DateIterator { } pub fn convert_date(data: i64) -> Result { - let date = NaiveDate::from_ymd_opt(1970, 1, 1) - .context(error::InvalidDateSnafu)? - .checked_add_days(Days::new(data as u64)) - .context(error::AddDaysSnafu)?; - - Ok(date) + let days = Days::new(data.unsigned_abs()); + // safe unwrap as is valid date + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let date = if data.is_negative() { + epoch.checked_sub_days(days) + } else { + epoch.checked_add_days(days) + }; + date.context(error::AddDaysSnafu) } impl Iterator for DateIterator { diff --git a/src/async_arrow_reader.rs b/src/async_arrow_reader.rs index d9ba6938..6186f5d5 100644 --- a/src/async_arrow_reader.rs +++ b/src/async_arrow_reader.rs @@ -17,6 +17,7 @@ use crate::arrow_reader::{ }; use crate::error::Result; use crate::proto::StripeInformation; +use crate::reader::decompress::Compression; use crate::reader::schema::TypeDescription; use crate::reader::Reader; @@ -188,7 +189,10 @@ impl Stripe { ) -> Result { let footer = Arc::new(r.stripe_footer(stripe).clone()); - let compression = r.metadata().postscript.compression(); + let compression = Compression::from_proto( + r.metadata().postscript.compression(), + r.metadata().postscript.compression_block_size, + ); //TODO(weny): add tz let mut columns = Vec::with_capacity(column_defs.len()); for (name, typ) in column_defs.iter() { diff --git a/src/error.rs b/src/error.rs index d92af28b..0369382b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -87,6 +87,24 @@ pub enum Error { location: Location, source: io::Error, }, + + #[snafu(display("Failed to build snappy decoder: {}", source))] + BuildSnappyDecoder { + location: Location, + source: snap::Error, + }, + + #[snafu(display("Failed to build lzo decoder: {}", source))] + BuildLzoDecoder { + location: Location, + source: lzokay_native::Error, + }, + + #[snafu(display("Failed to build lz4 decoder: {}", source))] + BuildLz4Decoder { + location: Location, + source: lz4_flex::block::DecompressError, + }, } pub type Result = std::result::Result; diff --git a/src/reader/decompress.rs b/src/reader/decompress.rs index de1534a1..910b1c7d 100644 --- a/src/reader/decompress.rs +++ b/src/reader/decompress.rs @@ -3,13 +3,65 @@ use std::io::Read; -use arrow::datatypes::ToByteSlice; use bytes::{Bytes, BytesMut}; use fallible_streaming_iterator::FallibleStreamingIterator; use snafu::ResultExt; use crate::error::{self, Error}; -use crate::proto::CompressionKind; +use crate::proto::{self, CompressionKind}; + +// Spec states default is 256K +const DEFAULT_COMPRESSION_BLOCK_SIZE: u64 = 256 * 1024; + +#[derive(Clone, Copy, Debug)] +pub struct Compression { + compression_type: CompressionType, + /// No compression chunk will decompress to larger than this size. + /// Use to size the scratch buffer appropriately. + max_decompressed_block_size: usize, +} + +impl Compression { + pub fn from_proto( + kind: proto::CompressionKind, + compression_block_size: Option, + ) -> Option { + let max_decompressed_block_size = + compression_block_size.unwrap_or(DEFAULT_COMPRESSION_BLOCK_SIZE) as usize; + match kind { + CompressionKind::None => None, + CompressionKind::Zlib => Some(Self { + compression_type: CompressionType::Zlib, + max_decompressed_block_size, + }), + CompressionKind::Snappy => Some(Self { + compression_type: CompressionType::Snappy, + max_decompressed_block_size, + }), + CompressionKind::Lzo => Some(Self { + compression_type: CompressionType::Lzo, + max_decompressed_block_size, + }), + CompressionKind::Lz4 => Some(Self { + compression_type: CompressionType::Lz4, + max_decompressed_block_size, + }), + CompressionKind::Zstd => Some(Self { + compression_type: CompressionType::Zstd, + max_decompressed_block_size, + }), + } + } +} + +#[derive(Clone, Copy, Debug)] +pub enum CompressionType { + Zlib, + Snappy, + Lzo, + Lz4, + Zstd, +} fn decode_header(bytes: &[u8]) -> (bool, usize) { let a: [u8; 3] = (&bytes[..3]).try_into().unwrap(); @@ -21,6 +73,53 @@ fn decode_header(bytes: &[u8]) -> (bool, usize) { (is_original, length) } +fn decompress_block( + compression: Compression, + compressed_bytes: &[u8], + scratch: &mut Vec, +) -> Result<(), Error> { + match compression.compression_type { + CompressionType::Zlib => { + let mut gz = flate2::read::DeflateDecoder::new(compressed_bytes); + scratch.clear(); + gz.read_to_end(scratch).context(error::IoSnafu)?; + } + CompressionType::Zstd => { + let mut reader = + zstd::Decoder::new(compressed_bytes).context(error::BuildZstdDecoderSnafu)?; + scratch.clear(); + reader.read_to_end(scratch).context(error::IoSnafu)?; + } + CompressionType::Snappy => { + let len = snap::raw::decompress_len(compressed_bytes) + .context(error::BuildSnappyDecoderSnafu)?; + scratch.resize(len, 0); + let mut decoder = snap::raw::Decoder::new(); + decoder + .decompress(compressed_bytes, scratch) + .context(error::BuildSnappyDecoderSnafu)?; + } + CompressionType::Lzo => { + let decompressed = lzokay_native::decompress_all(compressed_bytes, None) + .context(error::BuildLzoDecoderSnafu)?; + // TODO: better way to utilize scratch here + scratch.clear(); + scratch.extend(decompressed); + } + CompressionType::Lz4 => { + let decompressed = lz4_flex::block::decompress( + compressed_bytes, + compression.max_decompressed_block_size, + ) + .context(error::BuildLz4DecoderSnafu)?; + // TODO: better way to utilize scratch here + scratch.clear(); + scratch.extend(decompressed); + } + }; + Ok(()) +} + enum State { Original(Bytes), Compressed(Vec), @@ -29,12 +128,12 @@ enum State { struct DecompressorIter { stream: BytesMut, current: Option, // when we have compression but the value is original - compression: CompressionKind, + compression: Option, scratch: Vec, } impl DecompressorIter { - pub fn new(stream: Bytes, compression: CompressionKind, scratch: Vec) -> Self { + pub fn new(stream: Bytes, compression: Option, scratch: Vec) -> Self { Self { stream: BytesMut::from(stream.as_ref()), current: None, @@ -62,13 +161,9 @@ impl FallibleStreamingIterator for DecompressorIter { self.current = None; return Ok(()); } + match self.compression { - CompressionKind::None => { - // todo: take stratch from current State::Compressed for re-use - self.current = Some(State::Original(self.stream.clone().into())); - self.stream.clear(); - } - CompressionKind::Zlib => { + Some(compression) => { // todo: take stratch from current State::Compressed for re-use let (is_original, length) = decode_header(&self.stream); let _ = self.stream.split_to(3); @@ -77,35 +172,18 @@ impl FallibleStreamingIterator for DecompressorIter { if is_original { self.current = Some(State::Original(maybe_compressed.into())); } else { - let mut gz = - flate2::read::DeflateDecoder::new(maybe_compressed.to_byte_slice()); - self.scratch.clear(); - gz.read_to_end(&mut self.scratch).context(error::IoSnafu)?; + decompress_block(compression, &maybe_compressed, &mut self.scratch)?; self.current = Some(State::Compressed(std::mem::take(&mut self.scratch))); } + Ok(()) } - CompressionKind::Zstd => { + None => { // todo: take stratch from current State::Compressed for re-use - let (is_original, length) = decode_header(&self.stream); - let _ = self.stream.split_to(3); - let maybe_compressed = self.stream.split_to(length); - - if is_original { - self.current = Some(State::Original(maybe_compressed.into())); - } else { - let mut reader = zstd::Decoder::new(maybe_compressed.to_byte_slice()) - .context(error::BuildZstdDecoderSnafu)?; - - self.scratch.clear(); - reader - .read_to_end(&mut self.scratch) - .context(error::IoSnafu)?; - self.current = Some(State::Compressed(std::mem::take(&mut self.scratch))); - } + self.current = Some(State::Original(self.stream.clone().into())); + self.stream.clear(); + Ok(()) } - _ => todo!(), - }; - Ok(()) + } } #[inline] @@ -126,7 +204,7 @@ pub struct Decompressor { impl Decompressor { /// Creates a new [`Decompressor`] that will use `scratch` as a temporary region. - pub fn new(stream: Bytes, compression: CompressionKind, scratch: Vec) -> Self { + pub fn new(stream: Bytes, compression: Option, scratch: Vec) -> Self { Self { decompressor: DecompressorIter::new(stream, compression, scratch), offset: 0, diff --git a/src/reader/metadata.rs b/src/reader/metadata.rs index ab911a76..1c54bf38 100644 --- a/src/reader/metadata.rs +++ b/src/reader/metadata.rs @@ -6,9 +6,11 @@ use snafu::{OptionExt, ResultExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use crate::error::{self, Result}; -use crate::proto::{CompressionKind, Footer, Metadata, PostScript, StripeFooter}; +use crate::proto::{Footer, Metadata, PostScript, StripeFooter}; use crate::reader::decompress::Decompressor; +use super::decompress::Compression; + const DEFAULT_FOOTER_SIZE: u64 = 16 * 1024; /// The file's metadata. @@ -60,6 +62,10 @@ macro_rules! impl_read_metadata { // next is the postscript let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len..]) .context(error::DecodeProtoSnafu)?; + let compression = Compression::from_proto( + postscript.compression(), + postscript.compression_block_size + ); tail_bytes.truncate(tail_bytes.len() - postscript_len); // next is the footer @@ -76,7 +82,7 @@ macro_rules! impl_read_metadata { $reader .read_exact(&mut footer)$($_await)* .context(error::SeekSnafu)?; - let footer = deserialize_footer(&footer, postscript.compression())?; + let footer = deserialize_footer(&footer, compression)?; // finally the metadata let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu { @@ -91,7 +97,7 @@ macro_rules! impl_read_metadata { let mut metadata = vec![0; metadata_length]; $reader.read_exact(&mut metadata)$($_await)*.context(error::IoSnafu)?; - let metadata = deserialize_footer_metadata(&metadata, postscript.compression())?; + let metadata = deserialize_footer_metadata(&metadata, compression)?; let mut stripe_footers = Vec::with_capacity(footer.stripes.len()); @@ -112,7 +118,7 @@ macro_rules! impl_read_metadata { .context(error::IoSnafu)?; stripe_footers.push(deserialize_stripe_footer( &scratch, - postscript.compression(), + compression, )?); } @@ -140,7 +146,7 @@ where impl_read_metadata!(reader.await) } -fn deserialize_footer(bytes: &[u8], compression: CompressionKind) -> Result