Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 28, 2024
1 parent 9f87955 commit 5ef1038
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ use arrow::datatypes::GenericBinaryType;
use arrow::datatypes::GenericStringType;
use datafusion_common::utils::proxy::VecAllocExt;

use std::sync::Arc;
use std::vec;

use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
use std::sync::Arc;
use std::vec;

/// Trait for group values column-wise row comparison
///
Expand Down Expand Up @@ -65,10 +64,8 @@ pub trait GroupColumn: Send + Sync {

pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType> {
group_values: Vec<T::Native>,
/// If false, the input is guaranteed to have no nulls
nullable: bool,
/// Null state
nulls: MaybeNullBufferBuilder,
/// Null state (when None, input is guaranteed not to have nulls)
nulls: Option<MaybeNullBufferBuilder>,
}

impl<T> PrimitiveGroupValueBuilder<T>
Expand All @@ -79,34 +76,48 @@ where
///
/// If `nullable` is false, it means the input will never have nulls
pub fn new(nullable: bool) -> Self {
let nulls = if nullable {
Some(MaybeNullBufferBuilder::new())
} else {
None
};

Self {
group_values: vec![],
nulls: MaybeNullBufferBuilder::new(),
nullable,
nulls,
}
}
}

impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
// fast path when input has no nulls
if !self.nullable {
debug_assert!(!self.nulls.has_nulls());
self.group_values[lhs_row] == array.as_primitive::<T>().value(rhs_row)
} else {
// slow path if the input could have nulls
self.nulls.is_null(lhs_row) == array.is_null(rhs_row)
&& self.group_values[lhs_row] == array.as_primitive::<T>().value(rhs_row)
match self.nulls.as_ref() {
None => {
self.group_values[lhs_row] == array.as_primitive::<T>().value(rhs_row)
}
Some(nulls) => {
// slower path if the input could have nulls
nulls.is_null(lhs_row) == array.is_null(rhs_row)
&& self.group_values[lhs_row]
== array.as_primitive::<T>().value(rhs_row)
}
}
}

fn append_val(&mut self, array: &ArrayRef, row: usize) {
if array.is_null(row) {
self.nulls.append(true);
self.group_values.push(T::default_value());
} else {
self.nulls.append(false);
self.group_values.push(array.as_primitive::<T>().value(row));
match self.nulls.as_mut() {
// input can't possibly have nulls, so don't worry about them
None => self.group_values.push(array.as_primitive::<T>().value(row)),
Some(nulls) => {
if array.is_null(row) {
nulls.append(true);
self.group_values.push(T::default_value());
} else {
nulls.append(false);
self.group_values.push(array.as_primitive::<T>().value(row));
}
}
}
}

Expand All @@ -115,26 +126,32 @@ impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
}

fn size(&self) -> usize {
// BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes)
self.group_values.allocated_size() + self.nulls.allocated_size()
let nulls_size = self
.nulls
.as_ref()
.map(|nulls| nulls.allocated_size())
.unwrap_or(0);

self.group_values.allocated_size() + nulls_size
}

fn build(self: Box<Self>) -> ArrayRef {
let Self {
group_values,
nulls,
nullable: _,
} = *self;

let nulls = nulls.and_then(|nulls| nulls.build());

Arc::new(PrimitiveArray::<T>::new(
ScalarBuffer::from(group_values),
nulls.build(),
nulls,
))
}

fn take_n(&mut self, n: usize) -> ArrayRef {
let first_n = self.group_values.drain(0..n).collect::<Vec<_>>();
let first_n_nulls = self.nulls.take_n(n);
let first_n_nulls = self.nulls.as_mut().and_then(|nulls| nulls.take_n(n));

Arc::new(PrimitiveArray::<T>::new(
ScalarBuffer::from(first_n),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use arrow_buffer::{BooleanBufferBuilder, NullBuffer};

/// Support building up an optional null mask
/// Builder for an (optional) null mask
///
/// Optimized for avoid creating the bitmask when all values are non-null
#[derive(Debug)]
pub(crate) enum MaybeNullBufferBuilder {
/// seen `row_count` rows but no nulls yet
Expand All @@ -35,13 +37,6 @@ impl MaybeNullBufferBuilder {
Self::NoNulls { row_count: 0 }
}

/// Returns true if this builder is tracking any nulls
///
/// This will return true if at least one null has been set via XXX
pub fn has_nulls(&self) -> bool {
matches!(self, Self::Nulls { .. })
}

/// Return true if the row at index `row` is null
pub fn is_null(&self, row: usize) -> bool {
match self {
Expand Down Expand Up @@ -79,6 +74,7 @@ impl MaybeNullBufferBuilder {
pub fn allocated_size(&self) -> usize {
match self {
Self::NoNulls { .. } => 0,
// BooleanBufferBuilder builder::capacity returns capacity in bits (not bytes)
Self::Nulls(builder) => builder.capacity() / 8,
}
}
Expand Down

0 comments on commit 5ef1038

Please sign in to comment.