Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into altair
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoGorelli committed Aug 13, 2024
2 parents dfd25fa + b585293 commit 5bd4fb4
Show file tree
Hide file tree
Showing 53 changed files with 2,435 additions and 1,019 deletions.
304 changes: 161 additions & 143 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
let buffers = self.buffers.as_ref();

for view in self.views.as_ref() {
unsafe { mutable.push_view_copied_unchecked(*view, buffers) }
unsafe { mutable.push_view_unchecked(*view, buffers) }
}
mutable.freeze().with_validity(self.validity)
}
Expand Down
13 changes: 8 additions & 5 deletions crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
/// - caller must allocate enough capacity
/// - caller must ensure the view and buffers match.
/// - The array must not have validity.
pub(crate) unsafe fn push_view_copied_unchecked(&mut self, v: View, buffers: &[Buffer<u8>]) {
pub(crate) unsafe fn push_view_unchecked(&mut self, v: View, buffers: &[Buffer<u8>]) {
let len = v.length;
self.total_bytes_len += len as usize;
if len <= 12 {
Expand All @@ -165,7 +165,7 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
/// - caller must ensure the view and buffers match.
/// - The array must not have validity.
/// - caller must not mix use this function with other push functions.
pub unsafe fn push_view_unchecked(&mut self, mut v: View, buffers: &[Buffer<u8>]) {
pub unsafe fn push_view_unchecked_dedupe(&mut self, mut v: View, buffers: &[Buffer<u8>]) {
let len = v.length;
self.total_bytes_len += len as usize;
if len <= 12 {
Expand Down Expand Up @@ -438,14 +438,17 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
/// # Safety
/// Same as `push_view_unchecked()`.
#[inline]
pub unsafe fn extend_non_null_views_trusted_len_unchecked<I>(
pub unsafe fn extend_non_null_views_unchecked_dedupe<I>(
&mut self,
iterator: I,
buffers: &[Buffer<u8>],
) where
I: TrustedLen<Item = View>,
I: Iterator<Item = View>,
{
self.extend_non_null_views_unchecked(iterator, buffers);
self.reserve(iterator.size_hint().0);
for v in iterator {
self.push_view_unchecked_dedupe(v, buffers);
}
}

#[inline]
Expand Down
17 changes: 16 additions & 1 deletion crates/polars-arrow/src/array/growable/binview.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::ops::Deref;
use std::sync::Arc;

use polars_utils::aliases::{InitHashMaps, PlHashSet};

use super::Growable;
use crate::array::binview::{BinaryViewArrayGeneric, ViewType};
use crate::array::growable::utils::{extend_validity, extend_validity_copies, prepare_validity};
Expand All @@ -18,6 +20,7 @@ pub struct GrowableBinaryViewArray<'a, T: ViewType + ?Sized> {
inner: MutableBinaryViewArray<T>,
same_buffers: Option<&'a Arc<[Buffer<u8>]>>,
total_same_buffers_len: usize, // Only valid if same_buffers is Some.
has_duplicate_buffers: bool,
}

impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> {
Expand Down Expand Up @@ -51,13 +54,22 @@ impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> {
.then(|| arrays[0].total_buffer_len())
.unwrap_or_default();

let mut duplicates = PlHashSet::new();
let mut has_duplicate_buffers = false;
for arr in arrays.iter() {
if !duplicates.insert(arr.data_buffers().as_ptr()) {
has_duplicate_buffers = true;
break;
}
}
Self {
arrays,
data_type,
validity: prepare_validity(use_validity, capacity),
inner: MutableBinaryViewArray::<T>::with_capacity(capacity),
same_buffers,
total_same_buffers_len,
has_duplicate_buffers,
}
}

Expand Down Expand Up @@ -97,9 +109,12 @@ impl<'a, T: ViewType + ?Sized> Growable<'a> for GrowableBinaryViewArray<'a, T> {
.views
.extend(views_iter.inspect(|v| total_len += v.length as usize));
self.inner.total_bytes_len += total_len;
} else if self.has_duplicate_buffers {
self.inner
.extend_non_null_views_unchecked_dedupe(views_iter, local_buffers.deref());
} else {
self.inner
.extend_non_null_views_trusted_len_unchecked(views_iter, local_buffers.deref());
.extend_non_null_views_unchecked(views_iter, local_buffers.deref());
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-compute/src/if_then_else/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl<T: NotSimdPrimitive> IfThenElseKernel for PrimitiveArray<T> {
}
}

fn if_then_else_validity(
pub fn if_then_else_validity(
mask: &Bitmap,
if_true: Option<&Bitmap>,
if_false: Option<&Bitmap>,
Expand Down
68 changes: 52 additions & 16 deletions crates/polars-compute/src/if_then_else/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use arrow::array::{Array, BinaryViewArray, MutablePlBinary, Utf8ViewArray, View}
use arrow::bitmap::Bitmap;
use arrow::buffer::Buffer;
use arrow::datatypes::ArrowDataType;
use polars_utils::aliases::{InitHashMaps, PlHashSet};

use super::IfThenElseKernel;
use crate::if_then_else::scalar::if_then_else_broadcast_both_scalar_64;
Expand All @@ -28,12 +29,25 @@ fn make_buffer_and_views<const N: usize>(
(views, buf)
}

/// Returns `true` if any two entries in `bufs` share the same underlying
/// allocation. Buffers are compared by their data pointer only, never by
/// contents, so this is O(n) over the slice with a single hash-set pass.
fn has_duplicate_buffers(bufs: &[Buffer<u8>]) -> bool {
    let mut seen = PlHashSet::new();
    // `insert` returns false on a repeated pointer; `any` short-circuits
    // exactly like the original `break` did.
    bufs.iter().any(|buf| !seen.insert(buf.as_ptr()))
}

impl IfThenElseKernel for BinaryViewArray {
type Scalar<'a> = &'a [u8];

fn if_then_else(mask: &Bitmap, if_true: &Self, if_false: &Self) -> Self {
let combined_buffers: Arc<_>;
let false_buffer_idx_offset: u32;
let mut has_duplicate_bufs = false;
if Arc::ptr_eq(if_true.data_buffers(), if_false.data_buffers()) {
// Share exact same buffers, no need to combine.
combined_buffers = if_true.data_buffers().clone();
Expand All @@ -42,7 +56,9 @@ impl IfThenElseKernel for BinaryViewArray {
// Put false buffers after true buffers.
let true_buffers = if_true.data_buffers().iter().cloned();
let false_buffers = if_false.data_buffers().iter().cloned();

combined_buffers = true_buffers.chain(false_buffers).collect();
has_duplicate_bufs = has_duplicate_buffers(&combined_buffers);
false_buffer_idx_offset = if_true.data_buffers().len() as u32;
}

Expand All @@ -57,12 +73,19 @@ impl IfThenElseKernel for BinaryViewArray {
let validity = super::if_then_else_validity(mask, if_true.validity(), if_false.validity());

let mut builder = MutablePlBinary::with_capacity(views.len());
unsafe {
builder.extend_non_null_views_trusted_len_unchecked(
views.into_iter(),
combined_buffers.deref(),
)
};

if has_duplicate_bufs {
unsafe {
builder.extend_non_null_views_unchecked_dedupe(
views.into_iter(),
combined_buffers.deref(),
)
};
} else {
unsafe {
builder.extend_non_null_views_unchecked(views.into_iter(), combined_buffers.deref())
};
}
builder
.freeze_with_dtype(if_true.data_type().clone())
.with_validity(validity)
Expand Down Expand Up @@ -90,12 +113,17 @@ impl IfThenElseKernel for BinaryViewArray {
let validity = super::if_then_else_validity(mask, None, if_false.validity());

let mut builder = MutablePlBinary::with_capacity(views.len());

unsafe {
builder.extend_non_null_views_trusted_len_unchecked(
views.into_iter(),
combined_buffers.deref(),
)
};
if has_duplicate_buffers(&combined_buffers) {
builder.extend_non_null_views_unchecked_dedupe(
views.into_iter(),
combined_buffers.deref(),
)
} else {
builder.extend_non_null_views_unchecked(views.into_iter(), combined_buffers.deref())
}
}
builder
.freeze_with_dtype(if_false.data_type().clone())
.with_validity(validity)
Expand Down Expand Up @@ -125,10 +153,14 @@ impl IfThenElseKernel for BinaryViewArray {

let mut builder = MutablePlBinary::with_capacity(views.len());
unsafe {
builder.extend_non_null_views_trusted_len_unchecked(
views.into_iter(),
combined_buffers.deref(),
)
if has_duplicate_buffers(&combined_buffers) {
builder.extend_non_null_views_unchecked_dedupe(
views.into_iter(),
combined_buffers.deref(),
)
} else {
builder.extend_non_null_views_unchecked(views.into_iter(), combined_buffers.deref())
}
};
builder
.freeze_with_dtype(if_true.data_type().clone())
Expand All @@ -152,7 +184,11 @@ impl IfThenElseKernel for BinaryViewArray {

let mut builder = MutablePlBinary::with_capacity(views.len());
unsafe {
builder.extend_non_null_views_trusted_len_unchecked(views.into_iter(), buffers.deref())
if has_duplicate_buffers(&buffers) {
builder.extend_non_null_views_unchecked_dedupe(views.into_iter(), buffers.deref())
} else {
builder.extend_non_null_views_unchecked(views.into_iter(), buffers.deref())
}
};
builder.freeze_with_dtype(dtype)
}
Expand Down
6 changes: 4 additions & 2 deletions crates/polars-core/src/chunked_array/ops/full.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::bitmap::MutableBitmap;
use arrow::bitmap::{Bitmap, MutableBitmap};

use crate::chunked_array::builder::get_list_builder;
use crate::prelude::*;
Expand Down Expand Up @@ -189,7 +189,9 @@ impl ListChunked {
impl ChunkFullNull for StructChunked {
fn full_null(name: &str, length: usize) -> StructChunked {
let s = vec![Series::new_null("", length)];
StructChunked::from_series(name, &s).unwrap()
StructChunked::from_series(name, &s)
.unwrap()
.with_outer_validity(Some(Bitmap::new_zeroed(length)))
}
}

Expand Down
105 changes: 103 additions & 2 deletions crates/polars-core/src/chunked_array/ops/zip.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use arrow::bitmap::Bitmap;
use arrow::compute::utils::{combine_validities_and, combine_validities_and_not};
use polars_compute::if_then_else::IfThenElseKernel;
use polars_compute::if_then_else::{if_then_else_validity, IfThenElseKernel};

#[cfg(feature = "object")]
use crate::chunked_array::object::ObjectArray;
Expand Down Expand Up @@ -62,7 +62,7 @@ fn combine_validities_chunked<

impl<T> ChunkZip<T> for ChunkedArray<T>
where
T: PolarsDataType,
T: PolarsDataType<IsStruct = FalseT>,
T::Array: for<'a> IfThenElseKernel<Scalar<'a> = T::Physical<'a>>,
ChunkedArray<T>: ChunkExpandAtIndex<T>,
{
Expand Down Expand Up @@ -206,3 +206,104 @@ impl<T: PolarsObject> IfThenElseKernel for ObjectArray<T> {
.collect_arr()
}
}

#[cfg(feature = "dtype-struct")]
impl ChunkZip<StructType> for StructChunked {
    /// Select values from `self` where `mask` is true, from `other` where it
    /// is false, zipping every struct field and then combining the outer
    /// (per-row) validities of both sides.
    fn zip_with(
        &self,
        mask: &BooleanChunked,
        other: &ChunkedArray<StructType>,
    ) -> PolarsResult<ChunkedArray<StructType>> {
        // Align chunking of lhs, rhs and mask so chunks can be zipped 1:1 below.
        let (l, r, mask) = align_chunks_ternary(self, other, mask);

        // Prepare the boolean arrays such that Null maps to false.
        // This prevents every field doing that.
        // # SAFETY
        // We don't modify the length and update the null count.
        let mut mask = mask.into_owned();
        unsafe {
            for arr in mask.downcast_iter_mut() {
                // Fold the mask's validity into its values: null -> false.
                let bm = bool_null_to_false(arr);
                // Rebuild each chunk without a validity bitmap.
                *arr = BooleanArray::from_data_default(bm, None);
            }
            // All nulls were converted to false above, so the count is now 0.
            mask.set_null_count(0);
        }

        // Zip all the fields.
        let fields = l
            .fields_as_series()
            .iter()
            .zip(r.fields_as_series())
            .map(|(lhs, rhs)| lhs.zip_with_same_type(&mask, &rhs))
            .collect::<PolarsResult<Vec<_>>>()?;

        let mut out = StructChunked::from_series(self.name(), &fields)?;

        // Zip the validities.
        // Only needed when either side actually has outer nulls; otherwise the
        // result's validity stays as produced by `from_series`.
        if (l.null_count + r.null_count) > 0 {
            // Pair up the outer validity bitmaps of the aligned chunks.
            let validities = l
                .chunks()
                .iter()
                .zip(r.chunks())
                .map(|(l, r)| (l.validity(), r.validity()));

            // Materialize a length-1 validity bitmap to the chunk's full
            // length (all-true or all-false depending on its single value).
            fn broadcast(v: Option<&Bitmap>, arr: &ArrayRef) -> Bitmap {
                if v.unwrap().get(0).unwrap() {
                    Bitmap::new_with_value(true, arr.len())
                } else {
                    Bitmap::new_zeroed(arr.len())
                }
            }

            // # SAFETY
            // We don't modify the length and update the null count.
            unsafe {
                for ((arr, (lv, rv)), mask) in out
                    .chunks_mut()
                    .iter_mut()
                    .zip(validities)
                    .zip(mask.downcast_iter())
                {
                    // TODO! we can optimize this and use a kernel that is able to broadcast wo/ allocating.
                    // Normalize lhs/rhs validities to the chunk length,
                    // broadcasting unit-length bitmaps where needed.
                    let (lv, rv) = match (lv.map(|b| b.len()), rv.map(|b| b.len())) {
                        (Some(1), Some(1)) if arr.len() != 1 => {
                            let lv = broadcast(lv, arr);
                            let rv = broadcast(rv, arr);
                            (Some(lv), Some(rv))
                        },
                        (Some(a), Some(b)) if a == b => (lv.cloned(), rv.cloned()),
                        (Some(1), _) => {
                            let lv = broadcast(lv, arr);
                            (Some(lv), rv.cloned())
                        },
                        (_, Some(1)) => {
                            let rv = broadcast(rv, arr);
                            (lv.cloned(), Some(rv))
                        },
                        // A missing bitmap means "all valid"; keep as None.
                        (None, Some(_)) | (Some(_), None) | (None, None) => {
                            (lv.cloned(), rv.cloned())
                        },
                        (Some(a), Some(b)) => {
                            polars_bail!(InvalidOperation: "got different sizes in 'zip' operation, got length: {a} and {b}")
                        },
                    };

                    // broadcast mask
                    // A unit-length mask selects one side wholesale; otherwise
                    // pick per-row via the if/then/else validity kernel.
                    let validity = if mask.len() != arr.len() && mask.len() == 1 {
                        if mask.get(0).unwrap() {
                            lv
                        } else {
                            rv
                        }
                    } else {
                        if_then_else_validity(mask.values(), lv.as_ref(), rv.as_ref())
                    };

                    *arr = arr.with_validity(validity);
                }
            }
            // Validities changed, so the cached length/null-count must be recomputed.
            out.compute_len();
        }
        Ok(out)
    }
}
12 changes: 3 additions & 9 deletions crates/polars-core/src/series/implementations/struct__.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,9 @@ impl PrivateSeries for SeriesWrap<StructChunked> {

#[cfg(feature = "zip_with")]
fn zip_with_same_type(&self, mask: &BooleanChunked, other: &Series) -> PolarsResult<Series> {
let other = other.struct_()?;
let fields = self
.0
.fields_as_series()
.iter()
.zip(other.fields_as_series())
.map(|(lhs, rhs)| lhs.zip_with_same_type(mask, &rhs))
.collect::<PolarsResult<Vec<_>>>()?;
StructChunked::from_series(self.0.name(), &fields).map(|ca| ca.into_series())
self.0
.zip_with(mask, other.struct_()?)
.map(|ca| ca.into_series())
}

#[cfg(feature = "algorithm_group_by")]
Expand Down
Loading

0 comments on commit 5bd4fb4

Please sign in to comment.