diff --git a/.github/workflows/test-rust.yml b/.github/workflows/test-rust.yml
index 6db364e210d1..4e54ca0cf8e9 100644
--- a/.github/workflows/test-rust.yml
+++ b/.github/workflows/test-rust.yml
@@ -52,6 +52,7 @@ jobs:
           -p polars-io
           -p polars-lazy
           -p polars-ops
+          -p polars-parquet
           -p polars-plan
           -p polars-row
           -p polars-sql
@@ -68,6 +69,7 @@ jobs:
           -p polars-io
           -p polars-lazy
           -p polars-ops
+          -p polars-parquet
           -p polars-plan
           -p polars-row
           -p polars-sql
diff --git a/crates/polars-arrow/src/compute/cast/binary_to.rs b/crates/polars-arrow/src/compute/cast/binary_to.rs
index c7970fe6a051..d5e8bfb30852 100644
--- a/crates/polars-arrow/src/compute/cast/binary_to.rs
+++ b/crates/polars-arrow/src/compute/cast/binary_to.rs
@@ -139,6 +139,7 @@ pub fn binary_to_dictionary<O: Offset, K: DictionaryKey>(
     from: &BinaryArray<O>,
 ) -> PolarsResult<DictionaryArray<K>> {
     let mut array = MutableDictionaryArray::<K, MutableBinaryArray<O>>::new();
+    array.reserve(from.len());
     array.try_extend(from.iter())?;
 
     Ok(array.into())
diff --git a/crates/polars-arrow/src/compute/cast/binview_to.rs b/crates/polars-arrow/src/compute/cast/binview_to.rs
index 8c7ef4c2453a..1c157110ec49 100644
--- a/crates/polars-arrow/src/compute/cast/binview_to.rs
+++ b/crates/polars-arrow/src/compute/cast/binview_to.rs
@@ -21,6 +21,7 @@ pub(super) fn binview_to_dictionary<K: DictionaryKey>(
     from: &BinaryViewArray,
 ) -> PolarsResult<DictionaryArray<K>> {
     let mut array = MutableDictionaryArray::<K, MutableBinaryViewArray<[u8]>>::new();
+    array.reserve(from.len());
     array.try_extend(from.iter())?;
 
     Ok(array.into())
@@ -30,6 +31,7 @@ pub(super) fn utf8view_to_dictionary<K: DictionaryKey>(
     from: &Utf8ViewArray,
 ) -> PolarsResult<DictionaryArray<K>> {
     let mut array = MutableDictionaryArray::<K, MutableBinaryViewArray<str>>::new();
+    array.reserve(from.len());
     array.try_extend(from.iter())?;
 
     Ok(array.into())
diff --git a/crates/polars-arrow/src/compute/cast/primitive_to.rs b/crates/polars-arrow/src/compute/cast/primitive_to.rs
index d0d2056b70de..583b6ab19a96 100644
--- a/crates/polars-arrow/src/compute/cast/primitive_to.rs
+++ b/crates/polars-arrow/src/compute/cast/primitive_to.rs
@@ -318,6 +318,7 @@ pub fn primitive_to_dictionary<T: NativeType + Eq + Hash, K: DictionaryKey>(
     let mut array = MutableDictionaryArray::<K, _>::try_empty(MutablePrimitiveArray::<T>::from(
         from.data_type().clone(),
     ))?;
+    array.reserve(from.len());
     array.try_extend(iter)?;
 
     Ok(array.into())
diff --git a/crates/polars-arrow/src/compute/cast/utf8_to.rs b/crates/polars-arrow/src/compute/cast/utf8_to.rs
index 4df2876d394e..85b478c43817 100644
--- a/crates/polars-arrow/src/compute/cast/utf8_to.rs
+++ b/crates/polars-arrow/src/compute/cast/utf8_to.rs
@@ -27,6 +27,7 @@ pub fn utf8_to_dictionary<O: Offset, K: DictionaryKey>(
     from: &Utf8Array<O>,
 ) -> PolarsResult<DictionaryArray<K>> {
     let mut array = MutableDictionaryArray::<K, MutableUtf8Array<O>>::new();
+    array.reserve(from.len());
     array.try_extend(from.iter())?;
 
     Ok(array.into())
diff --git a/crates/polars-io/src/parquet/write/writer.rs b/crates/polars-io/src/parquet/write/writer.rs
index 2408d66e9ba2..620ac11c3351 100644
--- a/crates/polars-io/src/parquet/write/writer.rs
+++ b/crates/polars-io/src/parquet/write/writer.rs
@@ -102,7 +102,7 @@ where
         WriteOptions {
             write_statistics: self.statistics,
             compression: self.compression,
-            version: Version::V2,
+            version: Version::V1,
             data_pagesize_limit: self.data_page_size,
         }
     }
diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs
index b3ea666865c9..0525578589eb 100644
--- a/crates/polars-parquet/src/arrow/write/dictionary.rs
+++ b/crates/polars-parquet/src/arrow/write/dictionary.rs
@@ -1,7 +1,6 @@
 use arrow::array::{Array, BinaryViewArray, DictionaryArray, DictionaryKey, Utf8ViewArray};
 use arrow::bitmap::{Bitmap, MutableBitmap};
 use arrow::datatypes::{ArrowDataType, IntegerType};
-use num_traits::ToPrimitive;
 use polars_error::{polars_bail, PolarsResult};
 
 use super::binary::{
@@ -16,23 +15,19 @@ use super::primitive::{
 use super::{binview, nested, Nested, WriteOptions};
 use crate::arrow::read::schema::is_nullable;
 use crate::arrow::write::{slice_nested_leaf, utils};
-use crate::parquet::encoding::hybrid_rle::encode_u32;
+use crate::parquet::encoding::hybrid_rle::encode;
 use crate::parquet::encoding::Encoding;
 use crate::parquet::page::{DictPage, Page};
 use crate::parquet::schema::types::PrimitiveType;
 use crate::parquet::statistics::{serialize_statistics, ParquetStatistics};
-use crate::write::{to_nested, DynIter, ParquetType};
+use crate::write::DynIter;
 
 pub(crate) fn encode_as_dictionary_optional(
     array: &dyn Array,
+    nested: &[Nested],
     type_: PrimitiveType,
     options: WriteOptions,
 ) -> Option<PolarsResult<DynIter<'static, PolarsResult<Page>>>> {
-    let nested = to_nested(array, &ParquetType::PrimitiveType(type_.clone()))
-        .ok()?
-        .pop()
-        .unwrap();
-
     let dtype = Box::new(array.data_type().clone());
 
     let len_before = array.len();
@@ -52,35 +47,11 @@ pub(crate) fn encode_as_dictionary_optional(
     if (array.values().len() as f64) / (len_before as f64) > 0.75 {
         return None;
     }
-    if array.values().len().to_u16().is_some() {
-        let array = arrow::compute::cast::cast(
-            array,
-            &ArrowDataType::Dictionary(
-                IntegerType::UInt16,
-                Box::new(array.values().data_type().clone()),
-                false,
-            ),
-            Default::default(),
-        )
-        .unwrap();
-
-        let array = array
-            .as_any()
-            .downcast_ref::<DictionaryArray<u16>>()
-            .unwrap();
-        return Some(array_to_pages(
-            array,
-            type_,
-            &nested,
-            options,
-            Encoding::RleDictionary,
-        ));
-    }
 
     Some(array_to_pages(
         array,
         type_,
-        &nested,
+        nested,
         options,
         Encoding::RleDictionary,
     ))
@@ -116,7 +87,7 @@ fn serialize_keys_values<K: DictionaryKey>(
         buffer.push(num_bits as u8);
 
         // followed by the encoded indices.
-        Ok(encode_u32(buffer, keys, num_bits)?)
+        Ok(encode::<u32, _, _>(buffer, keys, num_bits)?)
     } else {
         let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64);
 
@@ -124,7 +95,7 @@ fn serialize_keys_values<K: DictionaryKey>(
         buffer.push(num_bits as u8);
 
         // followed by the encoded indices.
-        Ok(encode_u32(buffer, keys, num_bits)?)
+        Ok(encode::<u32, _, _>(buffer, keys, num_bits)?)
     }
 }
 
diff --git a/crates/polars-parquet/src/arrow/write/mod.rs b/crates/polars-parquet/src/arrow/write/mod.rs
index a980177c4835..65e03cecaae4 100644
--- a/crates/polars-parquet/src/arrow/write/mod.rs
+++ b/crates/polars-parquet/src/arrow/write/mod.rs
@@ -219,7 +219,7 @@ pub fn array_to_pages(
     // Only take this path for primitive columns
     if matches!(nested.first(), Some(Nested::Primitive(_, _, _))) {
         if let Some(result) =
-            encode_as_dictionary_optional(primitive_array, type_.clone(), options)
+            encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
         {
             return result;
         }
diff --git a/crates/polars-parquet/src/arrow/write/nested/mod.rs b/crates/polars-parquet/src/arrow/write/nested/mod.rs
index 46e15eec6c72..9aed392a06ee 100644
--- a/crates/polars-parquet/src/arrow/write/nested/mod.rs
+++ b/crates/polars-parquet/src/arrow/write/nested/mod.rs
@@ -6,7 +6,7 @@ use polars_error::PolarsResult;
 pub use rep::num_values;
 
 use super::Nested;
-use crate::parquet::encoding::hybrid_rle::encode_u32;
+use crate::parquet::encoding::hybrid_rle::encode;
 use crate::parquet::read::levels::get_bit_width;
 use crate::parquet::write::Version;
 
@@ -41,12 +41,12 @@ fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -
     match version {
         Version::V1 => {
             write_levels_v1(buffer, |buffer: &mut Vec<u8>| {
-                encode_u32(buffer, levels, num_bits)?;
+                encode::<u32, _, _>(buffer, levels, num_bits)?;
                 Ok(())
             })?;
         },
         Version::V2 => {
-            encode_u32(buffer, levels, num_bits)?;
+            encode::<u32, _, _>(buffer, levels, num_bits)?;
         },
     }
 
@@ -65,10 +65,10 @@ fn write_def_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -
 
     match version {
         Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec<u8>| {
-            encode_u32(buffer, levels, num_bits)?;
+            encode::<u32, _, _>(buffer, levels, num_bits)?;
             Ok(())
         }),
-        Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?),
+        Version::V2 => Ok(encode::<u32, _, _>(buffer, levels, num_bits)?),
     }
 }
 
diff --git a/crates/polars-parquet/src/arrow/write/utils.rs b/crates/polars-parquet/src/arrow/write/utils.rs
index 2032029b2de4..0ba9f4289bab 100644
--- a/crates/polars-parquet/src/arrow/write/utils.rs
+++ b/crates/polars-parquet/src/arrow/write/utils.rs
@@ -4,7 +4,7 @@ use polars_error::*;
 
 use super::{Version, WriteOptions};
 use crate::parquet::compression::CompressionOptions;
-use crate::parquet::encoding::hybrid_rle::encode_bool;
+use crate::parquet::encoding::hybrid_rle::encode;
 use crate::parquet::encoding::Encoding;
 use crate::parquet::metadata::Descriptor;
 use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2};
@@ -14,7 +14,7 @@ use crate::parquet::statistics::ParquetStatistics;
 fn encode_iter_v1<I: Iterator<Item = bool>>(buffer: &mut Vec<u8>, iter: I) -> PolarsResult<()> {
     buffer.extend_from_slice(&[0; 4]);
     let start = buffer.len();
-    encode_bool(buffer, iter)?;
+    encode::<bool, _, _>(buffer, iter, 1)?;
     let end = buffer.len();
     let length = end - start;
 
@@ -25,7 +25,7 @@ fn encode_iter_v1<I: Iterator<Item = bool>>(buffer: &mut Vec<u8>, iter: I) -> Po
 }
 
 fn encode_iter_v2<I: Iterator<Item = bool>>(writer: &mut Vec<u8>, iter: I) -> PolarsResult<()> {
-    Ok(encode_bool(writer, iter)?)
+    Ok(encode::<bool, _, _>(writer, iter, 1)?)
 }
 
 fn encode_iter<I: Iterator<Item = bool>>(
diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs
index 1c4dd67ccec7..7e1858e44979 100644
--- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs
+++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs
@@ -3,98 +3,232 @@ use std::io::Write;
 use super::bitpacked_encode;
 use crate::parquet::encoding::{bitpacked, ceil8, uleb128};
 
-/// RLE-hybrid encoding of `u32`. This currently only yields bitpacked values.
-pub fn encode_u32<W: Write, I: Iterator<Item = u32>>(
-    writer: &mut W,
-    iterator: I,
-    num_bits: u32,
-) -> std::io::Result<()> {
-    let num_bits = num_bits as u8;
-    // the length of the iterator.
-    let length = iterator.size_hint().1.unwrap();
+// Arbitrary value that balances memory usage and storage overhead
+const MAX_VALUES_PER_LITERAL_RUN: usize = (1 << 10) * 8;
+
+pub trait Encoder<T> {
+    fn bitpacked_encode<W: Write, I: Iterator<Item = T>>(
+        writer: &mut W,
+        iterator: I,
+        num_bits: usize,
+    ) -> std::io::Result<()>;
+
+    fn run_length_encode<W: Write>(
+        writer: &mut W,
+        run_length: usize,
+        value: T,
+        bit_width: u32,
+    ) -> std::io::Result<()>;
+}
 
-    // write the length + indicator
-    let mut header = ceil8(length) as u64;
-    header <<= 1;
-    header |= 1; // it is bitpacked => first bit is set
-    let mut container = [0; 10];
-    let used = uleb128::encode(header, &mut container);
-    writer.write_all(&container[..used])?;
+const U32_BLOCK_LEN: usize = 32;
 
-    bitpacked_encode_u32(writer, iterator, num_bits as usize)?;
+impl Encoder<u32> for u32 {
+    fn bitpacked_encode<W: Write, I: Iterator<Item = u32>>(
+        writer: &mut W,
+        mut iterator: I,
+        num_bits: usize,
+    ) -> std::io::Result<()> {
+        // the length of the iterator.
+        let length = iterator.size_hint().1.unwrap();
+
+        let mut header = ceil8(length) as u64;
+        header <<= 1;
+        header |= 1; // it is bitpacked => first bit is set
+        let mut container = [0; 10];
+        let used = uleb128::encode(header, &mut container);
+        writer.write_all(&container[..used])?;
+
+        let chunks = length / U32_BLOCK_LEN;
+        let remainder = length - chunks * U32_BLOCK_LEN;
+        let mut buffer = [0u32; U32_BLOCK_LEN];
+
+        // simplified from ceil8(U32_BLOCK_LEN * num_bits) since U32_BLOCK_LEN = 32
+        let compressed_chunk_size = 4 * num_bits;
+
+        for _ in 0..chunks {
+            iterator
+                .by_ref()
+                .take(U32_BLOCK_LEN)
+                .zip(buffer.iter_mut())
+                .for_each(|(item, buf)| *buf = item);
+
+            let mut packed = [0u8; 4 * U32_BLOCK_LEN];
+            bitpacked::encode_pack::<u32>(&buffer, num_bits, packed.as_mut());
+            writer.write_all(&packed[..compressed_chunk_size])?;
+        }
+
+        if remainder != 0 {
+            // Must be careful here to ensure we write a multiple of `num_bits`
+            // (the bit width) to align with the spec. Some readers also rely on
+            // this - see https://github.com/pola-rs/polars/pull/13883.
+
+            // this is ceil8(remainder * num_bits), but we ensure the output is a
+            // multiple of num_bits by rewriting it as ceil8(remainder) * num_bits
+            let compressed_remainder_size = ceil8(remainder) * num_bits;
+            iterator
+                .by_ref()
+                .take(remainder)
+                .zip(buffer.iter_mut())
+                .for_each(|(item, buf)| *buf = item);
+
+            let mut packed = [0u8; 4 * U32_BLOCK_LEN];
+            // No need to zero rest of buffer because remainder is either:
+            // * Multiple of 8: We pad non-terminal literal runs to have a
+            //   multiple of 8 values. Once compressed, the data will end on
+            //   clean byte boundaries and packed[..compressed_remainder_size]
+            //   will include only the remainder values and nothing extra.
+            // * Final run: Extra values from buffer will be included in
+            //   packed[..compressed_remainder_size] but ignored when decoding
+            //   because they extend beyond known column length
+            bitpacked::encode_pack(&buffer, num_bits, packed.as_mut());
+            writer.write_all(&packed[..compressed_remainder_size])?;
+        };
+        Ok(())
+    }
 
-    Ok(())
+    fn run_length_encode<W: Write>(
+        writer: &mut W,
+        run_length: usize,
+        value: u32,
+        bit_width: u32,
+    ) -> std::io::Result<()> {
+        // write the length + indicator
+        let mut header = run_length as u64;
+        header <<= 1;
+        let mut container = [0; 10];
+        let used = uleb128::encode(header, &mut container);
+        writer.write_all(&container[..used])?;
+
+        let num_bytes = ceil8(bit_width as usize);
+        let bytes = value.to_le_bytes();
+        writer.write_all(&bytes[..num_bytes])?;
+        Ok(())
+    }
 }
 
-const U32_BLOCK_LEN: usize = 32;
-
-fn bitpacked_encode_u32<W: Write, I: Iterator<Item = u32>>(
-    writer: &mut W,
-    mut iterator: I,
-    num_bits: usize,
-) -> std::io::Result<()> {
-    // the length of the iterator.
-    let length = iterator.size_hint().1.unwrap();
-
-    let chunks = length / U32_BLOCK_LEN;
-    let remainder = length - chunks * U32_BLOCK_LEN;
-    let mut buffer = [0u32; U32_BLOCK_LEN];
-
-    // simplified from ceil8(U32_BLOCK_LEN * num_bits) since U32_BLOCK_LEN = 32
-    let compressed_chunk_size = 4 * num_bits;
-
-    for _ in 0..chunks {
-        iterator
-            .by_ref()
-            .take(U32_BLOCK_LEN)
-            .zip(buffer.iter_mut())
-            .for_each(|(item, buf)| *buf = item);
-
-        let mut packed = [0u8; 4 * U32_BLOCK_LEN];
-        bitpacked::encode_pack::<u32>(&buffer, num_bits, packed.as_mut());
-        writer.write_all(&packed[..compressed_chunk_size])?;
+impl Encoder<bool> for bool {
+    fn bitpacked_encode<W: Write, I: Iterator<Item = bool>>(
+        writer: &mut W,
+        iterator: I,
+        _num_bits: usize,
+    ) -> std::io::Result<()> {
+        // the length of the iterator.
+        let length = iterator.size_hint().1.unwrap();
+
+        let mut header = ceil8(length) as u64;
+        header <<= 1;
+        header |= 1; // it is bitpacked => first bit is set
+        let mut container = [0; 10];
+        let used = uleb128::encode(header, &mut container);
+        writer.write_all(&container[..used])?;
+        bitpacked_encode(writer, iterator)?;
+        Ok(())
     }
 
-    if remainder != 0 {
-        // Must be careful here to ensure we write a multiple of `num_bits`
-        // (the bit width) to align with the spec. Some readers also rely on
-        // this - see https://github.com/pola-rs/polars/pull/13883.
-
-        // this is ceil8(remainder * num_bits), but we ensure the output is a
-        // multiple of num_bits by rewriting it as ceil8(remainder) * num_bits
-        let compressed_remainder_size = ceil8(remainder) * num_bits;
-        iterator
-            .by_ref()
-            .take(remainder)
-            .zip(buffer.iter_mut())
-            .for_each(|(item, buf)| *buf = item);
-
-        let mut packed = [0u8; 4 * U32_BLOCK_LEN];
-        bitpacked::encode_pack(&buffer, num_bits, packed.as_mut());
-        writer.write_all(&packed[..compressed_remainder_size])?;
-    };
-    Ok(())
+    fn run_length_encode<W: Write>(
+        writer: &mut W,
+        run_length: usize,
+        value: bool,
+        _bit_width: u32,
+    ) -> std::io::Result<()> {
+        // write the length + indicator
+        let mut header = run_length as u64;
+        header <<= 1;
+        let mut container = [0; 10];
+        let used = uleb128::encode(header, &mut container);
+        writer.write_all(&container[..used])?;
+        writer.write_all(&(value as u8).to_le_bytes())?;
+        Ok(())
+    }
 }
 
-/// the bitpacked part of the encoder.
-pub fn encode_bool<W: Write, I: Iterator<Item = bool>>(
+#[allow(clippy::comparison_chain)]
+pub fn encode<T: PartialEq + Copy + Default + Encoder<T>, W: Write, I: Iterator<Item = T>>(
     writer: &mut W,
     iterator: I,
+    num_bits: u32,
 ) -> std::io::Result<()> {
-    // the length of the iterator.
-    let length = iterator.size_hint().1.unwrap();
-
-    // write the length + indicator
-    let mut header = ceil8(length) as u64;
-    header <<= 1;
-    header |= 1; // it is bitpacked => first bit is set
-    let mut container = [0; 10];
-    let used = uleb128::encode(header, &mut container);
-
-    writer.write_all(&container[..used])?;
-
-    // encode the iterator
-    bitpacked_encode(writer, iterator)
+    let mut consecutive_repeats: usize = 0;
+    let mut previous_val = T::default();
+    let mut buffered_bits = [previous_val; MAX_VALUES_PER_LITERAL_RUN];
+    let mut buffer_idx = 0;
+    let mut literal_run_idx = 0;
+    for val in iterator {
+        if val == previous_val {
+            consecutive_repeats += 1;
+            if consecutive_repeats >= 8 {
+                // Run is long enough to RLE, no need to buffer values
+                if consecutive_repeats > 8 {
+                    continue;
+                } else {
+                    // When we encounter a run long enough to potentially RLE,
+                    // we must first ensure that the buffered literal run has
+                    // a multiple of 8 values for bit-packing. If not, we pad
+                    // up by taking some of the consecutive repeats
+                    let literal_padding = (8 - (literal_run_idx % 8)) % 8;
+                    consecutive_repeats -= literal_padding;
+                    literal_run_idx += literal_padding;
+                }
+            }
+            // Too short to RLE, continue to buffer values
+        } else if consecutive_repeats > 8 {
+            // Value changed so start a new run but the current run is long
+            // enough to RLE. First, bit-pack any buffered literal run. Then,
+            // RLE current run and reset consecutive repeat counter and buffer.
+            if literal_run_idx > 0 {
+                debug_assert!(literal_run_idx % 8 == 0);
+                T::bitpacked_encode(
+                    writer,
+                    buffered_bits.iter().take(literal_run_idx).copied(),
+                    num_bits as usize,
+                )?;
+                literal_run_idx = 0;
+            }
+            T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?;
+            consecutive_repeats = 1;
+            buffer_idx = 0;
+        } else {
+            // Value changed so start a new run but the current run is not long
+            // enough to RLE. Consolidate all consecutive repeats into buffered
+            // literal run.
+            literal_run_idx = buffer_idx;
+            consecutive_repeats = 1;
+        }
+        // If buffer is full, bit-pack as literal run and reset
+        if buffer_idx == MAX_VALUES_PER_LITERAL_RUN {
+            T::bitpacked_encode(writer, buffered_bits.iter().copied(), num_bits as usize)?;
+            // If buffer fills up in the middle of a run, all but the last
+            // repeat is consolidated into the literal run.
+            debug_assert!(
+                (consecutive_repeats < 8)
+                    && (buffer_idx - literal_run_idx == consecutive_repeats - 1)
+            );
+            consecutive_repeats = 1;
+            buffer_idx = 0;
+            literal_run_idx = 0;
+        }
+        buffered_bits[buffer_idx] = val;
+        previous_val = val;
+        buffer_idx += 1;
+    }
+    // Final run not long enough to RLE, extend literal run.
+    if consecutive_repeats <= 8 {
+        literal_run_idx = buffer_idx;
+    }
+    // Bit-pack final buffered literal run, if any
+    if literal_run_idx > 0 {
+        T::bitpacked_encode(
+            writer,
+            buffered_bits.iter().take(literal_run_idx).copied(),
+            num_bits as usize,
+        )?;
+    }
+    // RLE final consecutive run if long enough
+    if consecutive_repeats > 8 {
+        T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?;
+    }
+    Ok(())
 }
 
 #[cfg(test)]
@@ -108,7 +242,7 @@ mod tests {
 
         let mut vec = vec![];
 
-        encode_bool(&mut vec, iter)?;
+        encode::<bool, _, _>(&mut vec, iter, 1)?;
 
         assert_eq!(vec, vec![(2 << 1 | 1), 0b10011101u8, 0b00011101]);
 
@@ -119,9 +253,10 @@
     fn bool_from_iter() -> std::io::Result<()> {
         let mut vec = vec![];
 
-        encode_bool(
+        encode::<bool, _, _>(
            &mut vec,
             vec![true, true, true, true, true, true, true, true].into_iter(),
+            1,
         )?;
         assert_eq!(vec, vec![(1 << 1 | 1), 0b11111111]);
 
@@ -132,7 +267,7 @@
     fn test_encode_u32() -> std::io::Result<()> {
         let mut vec = vec![];
 
-        encode_u32(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?;
+        encode::<u32, _, _>(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?;
 
         assert_eq!(
             vec,
@@ -153,7 +288,7 @@ mod tests {
 
         let values = (0..128).map(|x| x % 4);
 
-        encode_u32(&mut vec, values, 2)?;
+        encode::<u32, _, _>(&mut vec, values, 2)?;
 
         let length = 128;
         let expected = 0b11_10_01_00u8;
@@ -170,7 +305,7 @@ mod tests {
         let values = vec![3, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3].into_iter();
         let mut vec = vec![];
 
-        encode_u32(&mut vec, values, 2)?;
+        encode::<u32, _, _>(&mut vec, values, 2)?;
 
         let expected = vec![5, 207, 254, 247, 51];
         assert_eq!(expected, vec);
diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs
index 3dc072552524..89816f87fb54 100644
--- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs
+++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs
@@ -4,7 +4,7 @@ mod decoder;
 mod encoder;
 pub use bitmap::{encode_bool as bitpacked_encode, BitmapIter};
 pub use decoder::Decoder;
-pub use encoder::{encode_bool, encode_u32};
+pub use encoder::encode;
 use polars_utils::iter::FallibleIterator;
 
 use super::bitpacked;
@@ -137,7 +137,7 @@ mod tests {
 
         let data = (0..1000).collect::<Vec<_>>();
 
-        encode_u32(&mut buffer, data.iter().cloned(), num_bits).unwrap();
+        encode::<u32, _, _>(&mut buffer, data.iter().cloned(), num_bits).unwrap();
 
         let decoder = HybridRleDecoder::try_new(&buffer, num_bits, data.len())?;
 
diff --git a/crates/polars/tests/it/io/parquet/write/binary.rs b/crates/polars/tests/it/io/parquet/write/binary.rs
index 3112f115c3e7..dd4e3a942c46 100644
--- a/crates/polars/tests/it/io/parquet/write/binary.rs
+++ b/crates/polars/tests/it/io/parquet/write/binary.rs
@@ -1,4 +1,4 @@
-use polars_parquet::parquet::encoding::hybrid_rle::encode_bool;
+use polars_parquet::parquet::encoding::hybrid_rle::encode;
 use polars_parquet::parquet::encoding::Encoding;
 use polars_parquet::parquet::error::Result;
 use polars_parquet::parquet::metadata::Descriptor;
@@ -25,7 +25,7 @@ fn unzip_option(array: &[Option<Vec<u8>>]) -> Result<(Vec<u8>, Vec<u8>)> {
             false
         }
     });
-    encode_bool(&mut validity, iter)?;
+    encode::<bool, _, _>(&mut validity, iter, 1)?;
 
     // write the length, now that it is known
     let mut validity = validity.into_inner();
diff --git a/crates/polars/tests/it/io/parquet/write/primitive.rs b/crates/polars/tests/it/io/parquet/write/primitive.rs
index 3b5ae150896a..e5da32252e99 100644
--- a/crates/polars/tests/it/io/parquet/write/primitive.rs
+++ b/crates/polars/tests/it/io/parquet/write/primitive.rs
@@ -1,4 +1,4 @@
-use polars_parquet::parquet::encoding::hybrid_rle::encode_bool;
+use polars_parquet::parquet::encoding::hybrid_rle::encode;
 use polars_parquet::parquet::encoding::Encoding;
 use polars_parquet::parquet::error::Result;
 use polars_parquet::parquet::metadata::Descriptor;
@@ -24,7 +24,7 @@ fn unzip_option<T: NativeType>(array: &[Option<T>]) -> Result<(Vec<u8>, Vec<u8>)
             false
         }
     });
-    encode_bool(&mut validity, iter)?;
+    encode::<bool, _, _>(&mut validity, iter, 1)?;
 
     // write the length, now that it is known
     let mut validity = validity.into_inner();
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 12ac1a835b40..6ef0c4201981 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -892,3 +892,68 @@ def test_no_glob_windows(tmp_path: Path) -> None:
     df.write_parquet(str(p2))
 
     assert_frame_equal(pl.scan_parquet(str(p1), glob=False).collect(), df)
+
+
+@pytest.mark.slow()
+def test_hybrid_rle() -> None:
+    # 10_007 elements to test if not a nice multiple of 8
+    n = 10_007
+    literal_literal = []
+    literal_rle = []
+    for i in range(500):
+        literal_literal.append(np.repeat(i, 5))
+        literal_literal.append(np.repeat(i + 2, 11))
+        literal_rle.append(np.repeat(i, 5))
+        literal_rle.append(np.repeat(i + 2, 15))
+    literal_literal.append(np.random.randint(0, 10, size=2007))
+    literal_rle.append(np.random.randint(0, 10, size=7))
+    literal_literal = np.concatenate(literal_literal)
+    literal_rle = np.concatenate(literal_rle)
+    df = pl.DataFrame(
+        {
+            # Primitive types
+            "i64": pl.Series([1, 2], dtype=pl.Int64).sample(n, with_replacement=True),
+            "u64": pl.Series([1, 2], dtype=pl.UInt64).sample(n, with_replacement=True),
+            "i8": pl.Series([1, 2], dtype=pl.Int8).sample(n, with_replacement=True),
+            "u8": pl.Series([1, 2], dtype=pl.UInt8).sample(n, with_replacement=True),
+            "string": pl.Series(["abc", "def"], dtype=pl.String).sample(
+                n, with_replacement=True
+            ),
+            "categorical": pl.Series(["aaa", "bbb"], dtype=pl.Categorical).sample(
+                n, with_replacement=True
+            ),
+            # Fill up bit-packing buffer in middle of consecutive run
+            "large_bit_pack": np.concatenate(
+                [np.repeat(i, 5) for i in range(2000)]
+                + [np.random.randint(0, 10, size=7)]
+            ),
+            # Literal run that is not a multiple of 8 followed by consecutive
+            # run initially long enough to RLE but not after padding literal
+            "literal_literal": literal_literal,
+            # Literal run that is not a multiple of 8 followed by consecutive
+            # run long enough to RLE even after padding literal
+            "literal_rle": literal_rle,
+            # Final run not long enough to RLE
+            "final_literal": np.concatenate(
+                [np.random.randint(0, 100, 10_000), np.repeat(-1, 7)]
+            ),
+            # Final run long enough to RLE
+            "final_rle": np.concatenate(
+                [np.random.randint(0, 100, 9_998), np.repeat(-1, 9)]
+            ),
+            # Test filling up bit-packing buffer for encode_bool,
+            # which is only used to encode validities
+            "large_bit_pack_validity": [0, None] * 4092
+            + [0] * 9
+            + [1] * 9
+            + [2] * 10
+            + [0] * 1795,
+        }
+    )
+    f = io.BytesIO()
+    df.write_parquet(f)
+    f.seek(0)
+    for column in pq.ParquetFile(f).metadata.to_dict()["row_groups"][0]["columns"]:
+        assert "RLE_DICTIONARY" in column["encodings"]
+    f.seek(0)
+    assert_frame_equal(pl.read_parquet(f), df)