Skip to content

Commit

Permalink
perf: Branchless Parquet Prefiltering (#19190)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Nov 1, 2024
1 parent 3d124c5 commit 3fe10a3
Show file tree
Hide file tree
Showing 45 changed files with 3,728 additions and 4,752 deletions.
12 changes: 11 additions & 1 deletion crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
// Verify the invariants
#[cfg(debug_assertions)]
{
if let Some(validity) = validity.as_ref() {
assert_eq!(validity.len(), views.len());
}

// @TODO: Enable this. This is currently bugged with concatenate.
// let mut actual_total_buffer_len = 0;
// let mut actual_total_bytes_len = 0;
Expand All @@ -169,7 +173,13 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
// actual_total_buffer_len += buffer.len();
// }

for view in views.iter() {
for (i, view) in views.iter().enumerate() {
let is_valid = validity.as_ref().map_or(true, |v| v.get_bit(i));

if !is_valid {
continue;
}

// actual_total_bytes_len += view.length as usize;
if view.length > View::MAX_INLINE_SIZE {
assert!((view.buffer_idx as usize) < (buffers.len()));
Expand Down
6 changes: 5 additions & 1 deletion crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
Self::from_iterator(slice.as_ref().iter().map(|opt_v| opt_v.as_ref()))
}

fn finish_in_progress(&mut self) -> bool {
pub fn finish_in_progress(&mut self) -> bool {
if !self.in_progress_buffer.is_empty() {
self.completed_buffers
.push(std::mem::take(&mut self.in_progress_buffer).into());
Expand All @@ -530,6 +530,10 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
arr
}

/// Consume the array and return its views together with its data buffers.
///
/// The in-progress buffer is flushed into the completed buffers before the parts are
/// returned, so its bytes are not lost when `self` is dropped. Only the views and the
/// buffers are handed back; every other piece of state held by the array is discarded.
pub fn take(mut self) -> (Vec<View>, Vec<Buffer<u8>>) {
    // Without this flush a non-empty in-progress buffer would be dropped together with
    // `self`. NOTE(review): views presumably refer to that buffer by index, in which case
    // they would point past the end of the returned buffer list — confirm.
    // Flushing is a no-op when the in-progress buffer is already empty, so callers that
    // flush beforehand are unaffected.
    self.finish_in_progress();

    (self.views, self.completed_buffers)
}

#[inline]
pub fn value(&self, i: usize) -> &T {
assert!(i < self.len());
Expand Down
13 changes: 13 additions & 0 deletions crates/polars-arrow/src/array/binview/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,19 @@ impl View {
}
}

/// Borrow the bytes that are stored inline in this view.
///
/// The returned slice is `self.length` bytes long and starts 4 bytes past the beginning
/// of the view.
///
/// # Safety
///
/// The view must be inlinable, i.e. `self.length <= View::MAX_INLINE_SIZE`. This is only
/// verified in debug builds.
pub unsafe fn get_inlined_slice_unchecked(&self) -> &[u8] {
    debug_assert!(self.length <= View::MAX_INLINE_SIZE);

    let len = self.length as usize;
    // Reinterpret the view as raw bytes.
    // NOTE(review): the 4-byte skip below presumably steps over the `length` field that
    // precedes the inline payload — confirm against the `View` layout.
    let base = (self as *const View).cast::<u8>();

    // SAFETY: Invariant of function
    unsafe { std::slice::from_raw_parts(base.add(4), len) }
}

/// Extend a `Vec<View>` with inline views slices of `src` with `width`.
///
/// This tries to use SIMD to optimize the copying and can be massively faster than doing a
Expand Down
41 changes: 40 additions & 1 deletion crates/polars-arrow/src/bitmap/bitmask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::simd::{LaneCount, Mask, MaskElement, SupportedLaneCount};

use polars_utils::slice::load_padded_le_u64;

use super::iterator::FastU56BitmapIter;
use super::utils::{count_zeros, BitmapIter};
use crate::bitmap::Bitmap;

/// Returns the nth set bit in w, if n+1 bits are set. The indexing is
Expand Down Expand Up @@ -110,6 +112,39 @@ impl<'a> BitMask<'a> {
(left, right)
}

/// Returns the sub-mask that covers `length` bits starting at bit `offset` of this mask.
///
/// # Panics
///
/// Panics if `offset + length` overflows or is larger than the length of this mask.
#[inline]
pub fn sliced(&self, offset: usize, length: usize) -> Self {
assert!(offset.checked_add(length).unwrap() <= self.len);
// SAFETY: the assertion above established that `offset + length <= self.len`.
unsafe { self.sliced_unchecked(offset, length) }
}

/// Returns the sub-mask that covers `length` bits starting at bit `offset` of this mask,
/// checking the bounds only in debug builds.
///
/// # Safety
/// `offset + length` must not exceed the length of this mask.
#[inline]
pub unsafe fn sliced_unchecked(&self, offset: usize, length: usize) -> Self {
    // Compiled out of release builds; debug builds still verify the caller's contract.
    debug_assert!(offset.checked_add(length).unwrap() <= self.len);

    // The underlying bytes are shared; only the bit window moves.
    let start = self.offset + offset;
    Self {
        bytes: self.bytes,
        offset: start,
        len: length,
    }
}

/// Returns the zero-bit count that `count_zeros` reports for this mask's bytes, bit offset
/// and length.
pub fn unset_bits(&self) -> usize {
count_zeros(self.bytes, self.offset, self.len)
}

/// Returns the number of set bits, computed as the mask length minus
/// [`unset_bits`](Self::unset_bits).
pub fn set_bits(&self) -> usize {
self.len - self.unset_bits()
}

/// Creates a [`FastU56BitmapIter`] over this mask's bytes, bit offset and length.
///
/// The iterator borrows from `self`; the lifetime is spelled out as `'_` so the borrow is
/// visible in the signature.
pub fn fast_iter_u56(&self) -> FastU56BitmapIter<'_> {
    FastU56BitmapIter::new(self.bytes, self.offset, self.len)
}

#[cfg(feature = "simd")]
#[inline]
pub fn get_simd<T, const N: usize>(&self, idx: usize) -> Mask<T, N>
Expand Down Expand Up @@ -162,7 +197,7 @@ impl<'a> BitMask<'a> {

/// Computes the index of the nth set bit after start.
///
/// Both are zero-indexed, so nth_set_bit_idx(0, 0) finds the index of the
/// Both are zero-indexed, so `nth_set_bit_idx(0, 0)` finds the index of the
/// first bit set (which can be 0 as well). The returned index is absolute,
/// not relative to start.
pub fn nth_set_bit_idx(&self, mut n: usize, mut start: usize) -> Option<usize> {
Expand Down Expand Up @@ -245,6 +280,10 @@ impl<'a> BitMask<'a> {
false
}
}

/// Creates a [`BitmapIter`] over this mask's bytes, bit offset and length.
pub fn iter(&self) -> BitmapIter {
BitmapIter::new(self.bytes, self.offset, self.len)
}
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/bitmap/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl Iterator for FastU32BitmapIter<'_> {

unsafe impl TrustedLen for FastU32BitmapIter<'_> {}

#[derive(Clone)]
pub struct FastU56BitmapIter<'a> {
bytes: &'a [u8],
shift: u32,
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-arrow/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<T> Buffer<T> {
}

/// Auxiliary method to create a new Buffer
pub(crate) fn from_storage(storage: SharedStorage<T>) -> Self {
pub fn from_storage(storage: SharedStorage<T>) -> Self {
let ptr = storage.as_ptr();
let length = storage.len();
Buffer {
Expand Down Expand Up @@ -164,6 +164,8 @@ impl<T> Buffer<T> {
#[inline]
#[must_use]
pub unsafe fn sliced_unchecked(mut self, offset: usize, length: usize) -> Self {
// Debug-only check of the caller's contract; it is compiled out of release builds.
debug_assert!(offset + length <= self.len());

// Narrow the buffer in place, then hand it back by value.
self.slice_unchecked(offset, length);
self
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/pushable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl<T: NativeType> Pushable<Option<T>> for MutablePrimitiveArray<T> {
// Marker trait without methods, implemented here for `&str`, `&[u8]` and `Vec<u8>`.
// NOTE(review): presumably used as a bound to keep `Option<_>` values out of a generic
// `Pushable` impl — confirm against the impl that follows.
pub trait NoOption {}
impl NoOption for &str {}
impl NoOption for &[u8] {}
impl NoOption for Vec<u8> {}

impl<T, K> Pushable<T> for MutableBinaryViewArray<K>
where
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-arrow/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,12 @@ impl<T: Pod> SharedStorage<T> {
return Err(self);
}

Ok(SharedStorage {
let storage = SharedStorage {
inner: self.inner.cast(),
phantom: PhantomData,
})
};
std::mem::forget(self);
Ok(storage)
}
}

Expand Down
62 changes: 51 additions & 11 deletions crates/polars-arrow/src/types/aligned_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub unsafe trait AlignedBytesCast<B: AlignedBytes>: Pod {}
pub trait AlignedBytes: Pod + Zeroable + Copy + Default + Eq {
const ALIGNMENT: usize;
const SIZE: usize;
const SIZE_ALIGNMENT_PAIR: PrimitiveSizeAlignmentPair;

type Unaligned: AsRef<[u8]>
+ AsMut<[u8]>
Expand Down Expand Up @@ -45,7 +46,7 @@ pub trait AlignedBytes: Pod + Zeroable + Copy + Default + Eq {

macro_rules! impl_aligned_bytes {
(
$(($name:ident, $size:literal, $alignment:literal, [$($eq_type:ty),*]),)+
$(($name:ident, $size:literal, $alignment:literal, $sap:ident, [$($eq_type:ty),*]),)+
) => {
$(
/// Bytes with a size and alignment.
Expand All @@ -59,6 +60,7 @@ macro_rules! impl_aligned_bytes {
impl AlignedBytes for $name {
const ALIGNMENT: usize = $alignment;
const SIZE: usize = $size;
const SIZE_ALIGNMENT_PAIR: PrimitiveSizeAlignmentPair = PrimitiveSizeAlignmentPair::$sap;

type Unaligned = [u8; $size];

Expand Down Expand Up @@ -98,15 +100,53 @@ macro_rules! impl_aligned_bytes {
}
}

/// A size/alignment combination, one variant per combination listed here.
///
/// Variant names follow the pattern `S<size>A<alignment>`; [`size`](Self::size) and
/// [`alignment`](Self::alignment) return the two numbers encoded in the name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PrimitiveSizeAlignmentPair {
    S1A1,
    S2A2,
    S4A4,
    S8A4,
    S8A8,
    S12A4,
    S16A4,
    S16A8,
    S16A16,
    S32A16,
}

impl PrimitiveSizeAlignmentPair {
    /// Returns the size component of the pair.
    pub const fn size(self) -> usize {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::S1A1 => 1,
            Self::S2A2 => 2,
            Self::S4A4 => 4,
            Self::S8A4 | Self::S8A8 => 8,
            Self::S12A4 => 12,
            Self::S16A4 | Self::S16A8 | Self::S16A16 => 16,
            Self::S32A16 => 32,
        }
    }

    /// Returns the alignment component of the pair.
    pub const fn alignment(self) -> usize {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::S1A1 => 1,
            Self::S2A2 => 2,
            Self::S4A4 | Self::S8A4 | Self::S12A4 | Self::S16A4 => 4,
            Self::S8A8 | Self::S16A8 => 8,
            Self::S16A16 | Self::S32A16 => 16,
        }
    }
}

impl_aligned_bytes! {
(Bytes1Alignment1, 1, 1, [u8, i8]),
(Bytes2Alignment2, 2, 2, [u16, i16, f16]),
(Bytes4Alignment4, 4, 4, [u32, i32, f32]),
(Bytes8Alignment8, 8, 8, [u64, i64, f64]),
(Bytes8Alignment4, 8, 4, [days_ms]),
(Bytes12Alignment4, 12, 4, [[u32; 3]]),
(Bytes16Alignment4, 16, 4, [View]),
(Bytes16Alignment8, 16, 8, [months_days_ns]),
(Bytes16Alignment16, 16, 16, [u128, i128]),
(Bytes32Alignment16, 32, 16, [i256]),
(Bytes1Alignment1, 1, 1, S1A1, [u8, i8]),
(Bytes2Alignment2, 2, 2, S2A2, [u16, i16, f16]),
(Bytes4Alignment4, 4, 4, S4A4, [u32, i32, f32]),
(Bytes8Alignment8, 8, 8, S8A8, [u64, i64, f64]),
(Bytes8Alignment4, 8, 4, S8A4, [days_ms]),
(Bytes12Alignment4, 12, 4, S12A4, [[u32; 3]]),
(Bytes16Alignment4, 16, 4, S16A4, [View]),
(Bytes16Alignment8, 16, 8, S16A8, [months_days_ns]),
(Bytes16Alignment16, 16, 16, S16A16, [u128, i128]),
(Bytes32Alignment16, 32, 16, S32A16, [i256]),
}
1 change: 1 addition & 0 deletions crates/polars-compute/src/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use arrow::array::{new_empty_array, Array, BinaryViewArray, BooleanArray, Primit
use arrow::bitmap::utils::SlicesIterator;
use arrow::bitmap::Bitmap;
use arrow::with_match_primitive_type_full;
pub use boolean::filter_boolean_kernel;

pub fn filter(array: &dyn Array, mask: &BooleanArray) -> Box<dyn Array> {
assert_eq!(array.len(), mask.len());
Expand Down
35 changes: 28 additions & 7 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ fn rg_to_dfs(
use_statistics: bool,
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
if config::verbose() {
eprintln!("parquet scan with parallel = {parallel:?}");
}

// If we are only interested in the row_index, we take a little special path here.
if projection.is_empty() {
if let Some(row_index) = row_index {
Expand Down Expand Up @@ -341,6 +345,10 @@ fn rg_to_dfs_prefiltered(
let num_live_columns = live_variables.len();
let num_dead_columns = projection.len() - num_live_columns;

if config::verbose() {
eprintln!("parquet live columns = {num_live_columns}, dead columns = {num_dead_columns}");
}

// @NOTE: This is probably already sorted, but just to be sure.
let mut projection_sorted = projection.to_vec();
projection_sorted.sort();
Expand Down Expand Up @@ -446,6 +454,10 @@ fn rg_to_dfs_prefiltered(
debug_assert_eq!(df.height(), filter_mask.set_bits());

if filter_mask.set_bits() == 0 {
if config::verbose() {
eprintln!("parquet filter mask found that row group can be skipped");
}

return Ok(None);
}

Expand Down Expand Up @@ -886,10 +898,19 @@ pub fn read_parquet<R: MmapBytesReader>(
.unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

if let Some(predicate) = predicate {
if std::env::var("POLARS_PARQUET_AUTO_PREFILTERED").is_ok_and(|v| v == "1")
&& predicate.live_variables().map_or(0, |v| v.len()) * n_row_groups
>= POOL.current_num_threads()
{
let prefilter_env = std::env::var("POLARS_PARQUET_PREFILTER");
let prefilter_env = prefilter_env.as_deref();

let num_live_variables = predicate.live_variables().map_or(0, |v| v.len());
let mut do_prefilter = false;

do_prefilter |= prefilter_env == Ok("1"); // Force enable
do_prefilter |= num_live_variables * n_row_groups >= POOL.current_num_threads()
&& materialized_projection.len() >= num_live_variables;

do_prefilter &= prefilter_env != Ok("0"); // Force disable

if do_prefilter {
parallel = ParallelStrategy::Prefiltered;
}
}
Expand Down Expand Up @@ -1419,12 +1440,12 @@ impl PrefilterMaskSetting {
pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
match self {
Self::Auto => {
// Prefiltering is more expensive for nested types so we make the cut-off
// higher.
// Prefiltering is only expensive for nested types so we make the cut-off quite
// high.
let is_nested = dtype.is_nested();

// We empirically selected these numbers.
(is_nested && prefilter_cost <= 0.01) || (!is_nested && prefilter_cost <= 0.02)
is_nested && prefilter_cost <= 0.01
},
Self::Pre => true,
Self::Post => false,
Expand Down
Loading

0 comments on commit 3fe10a3

Please sign in to comment.