From 21b3fc113ec750d41c11c0d6c3ba07ccd434ca9e Mon Sep 17 00:00:00 2001 From: ritchie Date: Wed, 25 Dec 2024 11:00:31 +0100 Subject: [PATCH] perf: Take different binview dedup strategy depending on chunks ratio --- .../src/chunked_array/gather/chunked.rs | 132 +++++++++++++++++- 1 file changed, 128 insertions(+), 4 deletions(-) diff --git a/crates/polars-ops/src/chunked_array/gather/chunked.rs b/crates/polars-ops/src/chunked_array/gather/chunked.rs index eb02399a2f3..093af5aeb53 100644 --- a/crates/polars-ops/src/chunked_array/gather/chunked.rs +++ b/crates/polars-ops/src/chunked_array/gather/chunked.rs @@ -4,6 +4,7 @@ use arrow::array::{Array, BinaryViewArrayGeneric, View, ViewType}; use arrow::bitmap::BitmapBuilder; use arrow::buffer::Buffer; use arrow::legacy::trusted_len::TrustedLenPush; +use hashbrown::hash_map::Entry; use polars_core::prelude::gather::_update_gather_sorted_flag; use polars_core::prelude::*; use polars_core::series::IsSorted; @@ -472,7 +473,53 @@ where }; arc_data_buffers = arr.data_buffers().clone(); - } else { + } + // Dedup the buffers while creating the views. + else if by.len() < ca.n_chunks() { + let mut buffer_idxs = PlHashMap::with_capacity(8); + let mut buffers = Vec::with_capacity(8); + + validity = if ca.has_nulls() { + let mut validity = BitmapBuilder::with_capacity(by.len()); + for id in by.iter() { + let (chunk_idx, array_idx) = id.extract(); + + let arr = ca.downcast_get_unchecked(chunk_idx as usize); + if arr.is_null_unchecked(array_idx as usize) { + views.push_unchecked(View::default()); + validity.push_unchecked(false); + } else { + let view = *arr.views().get_unchecked(array_idx as usize); + views.push_unchecked(update_view_and_dedup( + view, + arr.data_buffers(), + &mut buffer_idxs, + &mut buffers, + )); + validity.push_unchecked(true); + } + } + Some(validity.freeze()) + } else { + for id in by.iter() { + let (chunk_idx, array_idx) = id.extract(); + + let arr = ca.downcast_get_unchecked(chunk_idx as usize); + let view = *arr.views().get_unchecked(array_idx as usize); + views.push_unchecked(update_view_and_dedup( + view, + arr.data_buffers(), + &mut buffer_idxs, + &mut buffers, + )); + } + None + }; + + arc_data_buffers = buffers.into(); + } + // Dedup the buffers up front + else { let (buffers, buffer_offsets) = dedup_buffers(ca); validity = if ca.has_nulls() { @@ -531,13 +578,36 @@ unsafe fn rewrite_view(mut view: View, chunk_idx: IdxSize, buffer_offsets: &[u32 view } +unsafe fn update_view_and_dedup( + mut view: View, + orig_buffers: &[Buffer], + buffer_idxs: &mut PlHashMap<(*const u8, usize), u32>, + buffers: &mut Vec>, +) -> View { + if view.length > 12 { + // Dedup on pointer + length. + let orig_buffer = orig_buffers.get_unchecked(view.buffer_idx as usize); + view.buffer_idx = + match buffer_idxs.entry((orig_buffer.as_slice().as_ptr(), orig_buffer.len())) { + Entry::Occupied(o) => *o.get(), + Entry::Vacant(v) => { + let buffer_idx = buffers.len() as u32; + buffers.push(orig_buffer.clone()); + v.insert(buffer_idx); + buffer_idx + }, + }; + } + view +} + fn dedup_buffers(ca: &ChunkedArray) -> (Vec>, Vec) where T: PolarsDataType>, V: ViewType + ?Sized, { - // Dedup buffers up front. Note: don't do this during view update, as this is much more - // costly. + // Dedup buffers up front. Note: don't do this during view update, as this is often is much + // more costly. let mut buffers = Vec::with_capacity(ca.chunks().len()); // Dont need to include the length, as we look at the arc pointers, which are immutable. let mut buffers_dedup = PlHashSet::with_capacity(ca.chunks().len()); @@ -598,7 +668,61 @@ where } arr.data_buffers().clone() - } else { + } + // Dedup the buffers while creating the views. + else if by.len() < ca.n_chunks() { + let mut buffer_idxs = PlHashMap::with_capacity(8); + let mut buffers = Vec::with_capacity(8); + + if ca.has_nulls() { + for id in by.iter() { + let (chunk_idx, array_idx) = id.extract(); + + if id.is_null() { + views.push_unchecked(View::default()); + validity.push_unchecked(false); + } else { + let arr = ca.downcast_get_unchecked(chunk_idx as usize); + if arr.is_null_unchecked(array_idx as usize) { + views.push_unchecked(View::default()); + validity.push_unchecked(false); + } else { + let view = *arr.views().get_unchecked(array_idx as usize); + views.push_unchecked(update_view_and_dedup( + view, + arr.data_buffers(), + &mut buffer_idxs, + &mut buffers, + )); + validity.push_unchecked(true); + } + } + } + } else { + for id in by.iter() { + let (chunk_idx, array_idx) = id.extract(); + + if id.is_null() { + views.push_unchecked(View::default()); + validity.push_unchecked(false); + } else { + let arr = ca.downcast_get_unchecked(chunk_idx as usize); + let view = *arr.views().get_unchecked(array_idx as usize); + views.push_unchecked(update_view_and_dedup( + view, + arr.data_buffers(), + &mut buffer_idxs, + &mut buffers, + )); + validity.push_unchecked(true); + } + } + }; + + buffers.into() + } + // Dedup the buffers up front + else { let (buffers, buffer_offsets) = dedup_buffers(ca); if ca.has_nulls() {