Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): Add RLE to RLE_DICTIONARY encoder #15959

Merged
merged 5 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/polars-arrow/src/compute/cast/binary_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ pub fn binary_to_dictionary<O: Offset, K: DictionaryKey>(
from: &BinaryArray<O>,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableBinaryArray<O>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-arrow/src/compute/cast/binview_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(super) fn binview_to_dictionary<K: DictionaryKey>(
from: &BinaryViewArray,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableBinaryViewArray<[u8]>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand All @@ -30,6 +31,7 @@ pub(super) fn utf8view_to_dictionary<K: DictionaryKey>(
from: &Utf8ViewArray,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableBinaryViewArray<str>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ pub fn primitive_to_dictionary<T: NativeType + Eq + Hash, K: DictionaryKey>(
let mut array = MutableDictionaryArray::<K, _>::try_empty(MutablePrimitiveArray::<T>::from(
from.data_type().clone(),
))?;
array.reserve(from.len());
array.try_extend(iter)?;

Ok(array.into())
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/compute/cast/utf8_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub fn utf8_to_dictionary<O: Offset, K: DictionaryKey>(
from: &Utf8Array<O>,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableUtf8Array<O>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ where
WriteOptions {
write_statistics: self.statistics,
compression: self.compression,
version: Version::V2,
version: Version::V1,
data_pagesize_limit: self.data_page_size,
}
}
Expand Down
41 changes: 6 additions & 35 deletions crates/polars-parquet/src/arrow/write/dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use arrow::array::{Array, BinaryViewArray, DictionaryArray, DictionaryKey, Utf8ViewArray};
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::{ArrowDataType, IntegerType};
use num_traits::ToPrimitive;
use polars_error::{polars_bail, PolarsResult};

use super::binary::{
Expand All @@ -16,23 +15,19 @@ use super::primitive::{
use super::{binview, nested, Nested, WriteOptions};
use crate::arrow::read::schema::is_nullable;
use crate::arrow::write::{slice_nested_leaf, utils};
use crate::parquet::encoding::hybrid_rle::encode_u32;
use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{DictPage, Page};
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::{serialize_statistics, ParquetStatistics};
use crate::write::{to_nested, DynIter, ParquetType};
use crate::write::DynIter;

pub(crate) fn encode_as_dictionary_optional(
array: &dyn Array,
nested: &[Nested],
type_: PrimitiveType,
options: WriteOptions,
) -> Option<PolarsResult<DynIter<'static, PolarsResult<Page>>>> {
let nested = to_nested(array, &ParquetType::PrimitiveType(type_.clone()))
.ok()?
.pop()
.unwrap();

let dtype = Box::new(array.data_type().clone());

let len_before = array.len();
Expand All @@ -52,35 +47,11 @@ pub(crate) fn encode_as_dictionary_optional(
if (array.values().len() as f64) / (len_before as f64) > 0.75 {
return None;
}
if array.values().len().to_u16().is_some() {
let array = arrow::compute::cast::cast(
array,
&ArrowDataType::Dictionary(
IntegerType::UInt16,
Box::new(array.values().data_type().clone()),
false,
),
Default::default(),
)
.unwrap();

let array = array
.as_any()
.downcast_ref::<DictionaryArray<u16>>()
.unwrap();
return Some(array_to_pages(
array,
type_,
&nested,
options,
Encoding::RleDictionary,
));
}

Some(array_to_pages(
array,
type_,
&nested,
nested,
options,
Encoding::RleDictionary,
))
Expand Down Expand Up @@ -116,15 +87,15 @@ fn serialize_keys_values<K: DictionaryKey>(
buffer.push(num_bits as u8);

// followed by the encoded indices.
Ok(encode_u32(buffer, keys, num_bits)?)
Ok(encode::<u32, _, _>(buffer, keys, num_bits)?)
} else {
let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64);

// num_bits as a single byte
buffer.push(num_bits as u8);

// followed by the encoded indices.
Ok(encode_u32(buffer, keys, num_bits)?)
Ok(encode::<u32, _, _>(buffer, keys, num_bits)?)
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub fn array_to_pages(
// Only take this path for primitive columns
if matches!(nested.first(), Some(Nested::Primitive(_, _, _))) {
if let Some(result) =
encode_as_dictionary_optional(primitive_array, type_.clone(), options)
encode_as_dictionary_optional(primitive_array, nested, type_.clone(), options)
{
return result;
}
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-parquet/src/arrow/write/nested/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use polars_error::PolarsResult;
pub use rep::num_values;

use super::Nested;
use crate::parquet::encoding::hybrid_rle::encode_u32;
use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::read::levels::get_bit_width;
use crate::parquet::write::Version;

Expand Down Expand Up @@ -41,12 +41,12 @@ fn write_rep_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -
match version {
Version::V1 => {
write_levels_v1(buffer, |buffer: &mut Vec<u8>| {
encode_u32(buffer, levels, num_bits)?;
encode::<u32, _, _>(buffer, levels, num_bits)?;
Ok(())
})?;
},
Version::V2 => {
encode_u32(buffer, levels, num_bits)?;
encode::<u32, _, _>(buffer, levels, num_bits)?;
},
}

Expand All @@ -65,10 +65,10 @@ fn write_def_levels(buffer: &mut Vec<u8>, nested: &[Nested], version: Version) -

match version {
Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec<u8>| {
encode_u32(buffer, levels, num_bits)?;
encode::<u32, _, _>(buffer, levels, num_bits)?;
Ok(())
}),
Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?),
Version::V2 => Ok(encode::<u32, _, _>(buffer, levels, num_bits)?),
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/polars-parquet/src/arrow/write/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use polars_error::*;

use super::{Version, WriteOptions};
use crate::parquet::compression::CompressionOptions;
use crate::parquet::encoding::hybrid_rle::encode_bool;
use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::metadata::Descriptor;
use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2};
Expand All @@ -14,7 +14,7 @@ use crate::parquet::statistics::ParquetStatistics;
fn encode_iter_v1<I: Iterator<Item = bool>>(buffer: &mut Vec<u8>, iter: I) -> PolarsResult<()> {
buffer.extend_from_slice(&[0; 4]);
let start = buffer.len();
encode_bool(buffer, iter)?;
encode::<bool, _, _>(buffer, iter, 1)?;
let end = buffer.len();
let length = end - start;

Expand All @@ -25,7 +25,7 @@ fn encode_iter_v1<I: Iterator<Item = bool>>(buffer: &mut Vec<u8>, iter: I) -> Po
}

fn encode_iter_v2<I: Iterator<Item = bool>>(writer: &mut Vec<u8>, iter: I) -> PolarsResult<()> {
Ok(encode_bool(writer, iter)?)
Ok(encode::<bool, _, _>(writer, iter, 1)?)
}

fn encode_iter<I: Iterator<Item = bool>>(
Expand Down
Loading
Loading