Skip to content

Commit

Permalink
perf: Allow for arbitrary skips in Parquet Dictionary Decoding (#19649)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Nov 6, 2024
1 parent a186f12 commit b23da1f
Show file tree
Hide file tree
Showing 18 changed files with 1,130 additions and 831 deletions.
18 changes: 17 additions & 1 deletion crates/polars-arrow/src/bitmap/bitmask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::simd::{LaneCount, Mask, MaskElement, SupportedLaneCount};
use polars_utils::slice::load_padded_le_u64;

use super::iterator::FastU56BitmapIter;
use super::utils::{count_zeros, BitmapIter};
use super::utils::{count_zeros, fmt, BitmapIter};
use crate::bitmap::Bitmap;

/// Returns the nth set bit in w, if n+1 bits are set. The indexing is
Expand Down Expand Up @@ -79,6 +79,15 @@ pub struct BitMask<'a> {
len: usize,
}

impl std::fmt::Debug for BitMask<'_> {
    /// Render the mask bit-by-bit through the shared bitmap `fmt` helper.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Split the bit offset into whole bytes (skipped from the slice) and a
        // remaining in-byte bit offset expected by the helper.
        let (skipped_bytes, bit_offset) = (self.offset / 8, self.offset % 8);
        fmt(&self.bytes[skipped_bytes..], bit_offset, self.len, f)
    }
}

impl<'a> BitMask<'a> {
pub fn from_bitmap(bitmap: &'a Bitmap) -> Self {
let (bytes, offset, len) = bitmap.as_slice();
Expand All @@ -92,6 +101,13 @@ impl<'a> BitMask<'a> {
self.len
}

#[inline]
pub fn advance_by(&mut self, idx: usize) {
assert!(idx <= self.len);
self.offset += idx;
self.len -= idx;
}

#[inline]
pub fn split_at(&self, idx: usize) -> (Self, Self) {
assert!(idx <= self.len);
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-parquet/src/arrow/read/deserialize/binview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowDataType, PhysicalType};

use super::utils::dict_encoded::{append_validity, constrain_page_validity};
use super::dictionary_encoded::{append_validity, constrain_page_validity};
use super::utils::{
dict_indices_decoder, filter_from_range, freeze_validity, unspecialized_decode,
};
use super::Filter;
use super::{dictionary_encoded, Filter};
use crate::parquet::encoding::{delta_byte_array, delta_length_byte_array, hybrid_rle, Encoding};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{split_buffer, DataPage, DictPage};
Expand Down Expand Up @@ -521,7 +521,7 @@ impl utils::Decoder for BinViewDecoder {

let start_length = decoded.0.views().len();

utils::dict_encoded::decode_dict(
dictionary_encoded::decode_dict(
indexes.clone(),
dict,
state.is_optional,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::ArrowDataType;
use polars_compute::filter::filter_boolean_kernel;

use super::utils::dict_encoded::{append_validity, constrain_page_validity};
use super::dictionary_encoded::{append_validity, constrain_page_validity};
use super::utils::{
self, decode_hybrid_rle_into_bitmap, filter_from_range, freeze_validity, Decoder, ExactSize,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
use arrow::bitmap::bitmask::BitMask;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::types::{AlignedBytes, NativeType};
use polars_compute::filter::filter_boolean_kernel;

use super::ParquetError;
use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;
use crate::parquet::error::ParquetResult;
use crate::read::Filter;

mod optional;
mod optional_masked_dense;
mod required;
mod required_masked_dense;

/// Decode dictionary-encoded `values` through `dict` into `target`, honoring
/// the optional page validity and row filter.
///
/// This is a typed shim: it reinterprets the dictionary and the output buffer
/// as raw aligned byte units so [`decode_dict_dispatch`] is monomorphized per
/// byte-width rather than per native type.
pub fn decode_dict<T: NativeType>(
    values: HybridRleDecoder<'_>,
    dict: &[T],
    is_optional: bool,
    page_validity: Option<&Bitmap>,
    filter: Option<Filter>,
    validity: &mut MutableBitmap,
    target: &mut Vec<T>,
) -> ParquetResult<()> {
    let dict_bytes = bytemuck::cast_slice(dict);
    let target_bytes = <T::AlignedBytes as AlignedBytes>::cast_vec_ref_mut(target);
    decode_dict_dispatch(
        values,
        dict_bytes,
        is_optional,
        page_validity,
        filter,
        validity,
        target_bytes,
    )
}

/// Decode dictionary-encoded values (as raw aligned byte units) into `target`,
/// dispatching to a specialized kernel for each (filter, validity) combination.
///
/// When `is_optional`, the page validity (restricted by the filter) is also
/// appended to `validity` so that `target` and `validity` grow in lockstep.
#[inline(never)]
pub fn decode_dict_dispatch<B: AlignedBytes>(
    mut values: HybridRleDecoder<'_>,
    dict: &[B],
    is_optional: bool,
    page_validity: Option<&Bitmap>,
    filter: Option<Filter>,
    validity: &mut MutableBitmap,
    target: &mut Vec<B>,
) -> ParquetResult<()> {
    // Invariant (debug-only): for optional columns, values and validity must
    // already be in lockstep on entry.
    if cfg!(debug_assertions) && is_optional {
        assert_eq!(target.len(), validity.len());
    }

    if is_optional {
        append_validity(page_validity, filter.as_ref(), validity, values.len());
    }

    // Trim the page validity down to the rows the filter can actually reach.
    let page_validity = constrain_page_validity(values.len(), page_validity, filter.as_ref());

    match (filter, page_validity) {
        (None, None) => required::decode(values, dict, target, 0),
        (Some(Filter::Range(rng)), None) => {
            // Dense values with a row range: cap the decoder at the range end
            // and let the kernel skip the first `rng.start` values.
            values.limit_to(rng.end);
            required::decode(values, dict, target, rng.start)
        },
        (None, Some(page_validity)) => optional::decode(values, dict, page_validity, target, 0),
        (Some(Filter::Range(rng)), Some(page_validity)) => {
            // NOTE: page_validity was already sliced to end at rng.end above,
            // so only the leading skip is passed on here.
            optional::decode(values, dict, page_validity, target, rng.start)
        },
        (Some(Filter::Mask(filter)), None) => {
            required_masked_dense::decode(values, dict, filter, target)
        },
        (Some(Filter::Mask(filter)), Some(page_validity)) => {
            optional_masked_dense::decode(values, dict, filter, page_validity, target)
        },
    }?;

    // Invariant (debug-only): the kernels must have kept values and validity
    // in lockstep.
    if cfg!(debug_assertions) && is_optional {
        assert_eq!(target.len(), validity.len());
    }

    Ok(())
}

/// Append this page's validity — restricted to the rows selected by `filter` —
/// onto the output `validity` bitmap.
///
/// A missing page validity means all `values_len` values are non-null, so a
/// constant run of `true` is appended (shortened to `f.num_rows()` when a
/// filter is present).
pub(crate) fn append_validity(
    page_validity: Option<&Bitmap>,
    filter: Option<&Filter>,
    validity: &mut MutableBitmap,
    values_len: usize,
) {
    match (page_validity, filter) {
        (None, None) => validity.extend_constant(values_len, true),
        (None, Some(f)) => validity.extend_constant(f.num_rows(), true),
        (Some(page_validity), None) => validity.extend_from_bitmap(page_validity),
        (Some(page_validity), Some(Filter::Range(rng))) => {
            // One clone is unavoidable since `sliced` consumes the bitmap; the
            // previous version cloned twice, which was pure extra refcount/alloc
            // work on this hot path.
            validity.extend_from_bitmap(&page_validity.clone().sliced(rng.start, rng.len()))
        },
        (Some(page_validity), Some(Filter::Mask(mask))) => {
            validity.extend_from_bitmap(&filter_boolean_kernel(page_validity, mask))
        },
    }
}

/// Restrict the page validity to the prefix of rows that decoding can actually
/// touch, returning an owned (possibly sliced) bitmap.
///
/// The number of reachable rows is `f.max_offset()` when a filter is present,
/// otherwise the full page-validity length (or `values_len` when neither
/// exists). Returns `None` when there is no page validity at all.
pub(crate) fn constrain_page_validity(
    values_len: usize,
    page_validity: Option<&Bitmap>,
    filter: Option<&Filter>,
) -> Option<Bitmap> {
    // `filter` is already an Option<&Filter>, so no `.as_ref()` is needed;
    // Option<&T> is Copy and can be matched on directly.
    let num_unfiltered_rows = match (filter, page_validity) {
        (None, None) => values_len,
        (None, Some(pv)) => {
            // The page validity covers every row, null or not, so it can never
            // be shorter than the number of physically-stored values.
            debug_assert!(pv.len() >= values_len);
            pv.len()
        },
        (Some(f), v) => {
            if cfg!(debug_assertions) {
                if let Some(v) = v {
                    assert!(v.len() >= f.max_offset());
                }
            }

            f.max_offset()
        },
    };

    page_validity.map(|pv| {
        if pv.len() > num_unfiltered_rows {
            pv.clone().sliced(0, num_unfiltered_rows)
        } else {
            pv.clone()
        }
    })
}

/// Error for a dictionary index that falls outside the dictionary.
///
/// `#[cold]` keeps this off the hot decode path in the branch layout.
#[cold]
fn oob_dict_idx() -> ParquetError {
    ParquetError::oos("Dictionary Index is out-of-bounds")
}

/// Error for a bit-packed Hybrid-RLE run that was exhausted before producing
/// all requested values.
///
/// `#[cold]` keeps this off the hot decode path in the branch layout.
#[cold]
fn no_more_bitpacked_values() -> ParquetError {
    ParquetError::oos("Bitpacked Hybrid-RLE ran out before all values were served")
}

/// Check that all 32 dictionary indices are in-bounds for a dictionary of
/// `dict_size` entries.
///
/// Deliberately branchless: the non-short-circuiting `&` accumulation lets the
/// fixed-length loop vectorize instead of branching per element.
#[inline(always)]
fn verify_dict_indices(indices: &[u32; 32], dict_size: usize) -> ParquetResult<()> {
    let all_in_bounds = indices
        .iter()
        .fold(true, |ok, &idx| ok & ((idx as usize) < dict_size));

    if all_in_bounds {
        Ok(())
    } else {
        Err(oob_dict_idx())
    }
}

/// Check that every dictionary index in the slice is in-bounds for a
/// dictionary of `dict_size` entries.
///
/// Deliberately branchless: the non-short-circuiting `&` accumulation keeps
/// the loop free of per-element branches so it can vectorize.
#[inline(always)]
fn verify_dict_indices_slice(indices: &[u32], dict_size: usize) -> ParquetResult<()> {
    let all_in_bounds = indices
        .iter()
        .fold(true, |ok, &idx| ok & ((idx as usize) < dict_size));

    if all_in_bounds {
        Ok(())
    } else {
        Err(oob_dict_idx())
    }
}

/// Skip whole chunks of a [`HybridRleDecoder`], committing each skip only as
/// long as the chunk fits entirely within `num_values_to_skip`.
///
/// On return, `num_values_to_skip` holds the remainder that must be skipped
/// inside the next (partially skipped) chunk.
#[inline(always)]
fn required_skip_whole_chunks(
    values: &mut HybridRleDecoder<'_>,
    num_values_to_skip: &mut usize,
) -> ParquetResult<()> {
    if *num_values_to_skip == 0 {
        return Ok(());
    }

    loop {
        // Peek at the next chunk on a copy so the decoder is only advanced
        // once the chunk is known to be fully skippable.
        let mut lookahead = values.clone();
        let chunk_len = match lookahead.next_chunk_length()? {
            Some(len) => len,
            None => break, // decoder exhausted
        };
        if chunk_len > *num_values_to_skip {
            break; // chunk is only partially skipped; caller handles the rest
        }
        *num_values_to_skip -= chunk_len;
        *values = lookahead;
    }

    Ok(())
}

/// Skip over entire chunks in a [`HybridRleDecoder`] as long as all skipped chunks do not include
/// more than `num_values_to_skip` values, advancing `validity` and
/// `num_rows_to_skip` to match the skipped values.
///
/// Here `values` only carries the non-null entries, while `validity` maps rows
/// to values: each set bit in `validity` corresponds to one value. After
/// skipping whole value-chunks, the validity mask and remaining row skip are
/// moved forward past the rows those values occupied.
#[inline(always)]
fn optional_skip_whole_chunks(
    values: &mut HybridRleDecoder<'_>,
    validity: &mut BitMask<'_>,
    num_rows_to_skip: &mut usize,
    num_values_to_skip: &mut usize,
) -> ParquetResult<()> {
    if *num_values_to_skip == 0 {
        return Ok(());
    }

    let mut total_num_skipped_values = 0;

    loop {
        // Peek on a clone so the decoder only advances when the chunk is fully
        // skippable; a partially-skipped chunk is left for the caller.
        let mut values_clone = values.clone();
        let Some(chunk_len) = values_clone.next_chunk_length()? else {
            break;
        };
        if *num_values_to_skip < chunk_len {
            break;
        }
        *values = values_clone;
        *num_values_to_skip -= chunk_len;
        total_num_skipped_values += chunk_len;
    }

    if total_num_skipped_values > 0 {
        // Advance validity just past the set bit of the last skipped value;
        // if the mask runs out of set bits, consume it entirely.
        let offset = validity
            .nth_set_bit_idx(total_num_skipped_values - 1, 0)
            .map_or(validity.len(), |v| v + 1);
        // NOTE(review): assumes the skipped values all lie within the rows
        // being skipped, i.e. `offset <= *num_rows_to_skip` — otherwise this
        // subtraction would underflow. TODO confirm against callers.
        *num_rows_to_skip -= offset;
        validity.advance_by(offset);
    }

    Ok(())
}
Loading

0 comments on commit b23da1f

Please sign in to comment.