diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 249672cbcbfb..873ac986cfa6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -102,7 +102,13 @@ impl GroupColumn for PrimitiveGroupValueBuilder { } fn append_val(&mut self, array: &ArrayRef, row: usize) { - self.nulls.append(array.is_null(row)) + if array.is_null(row) { + self.nulls.append(true); + self.group_values.push(T::default_value()); + } else { + self.nulls.append(false); + self.group_values.push(array.as_primitive::().value(row)); + } } fn len(&self) -> usize { diff --git a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs index 622f6e001784..13e2e210292e 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs @@ -19,7 +19,7 @@ use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; /// Support building up an optional null mask #[derive(Debug)] -pub enum MaybeNullBufferBuilder { +pub(crate) enum MaybeNullBufferBuilder { /// seen `row_count` rows but no nulls yet NoNulls { row_count: usize }, /// have at least one null value @@ -47,7 +47,7 @@ impl MaybeNullBufferBuilder { match self { Self::NoNulls { .. } => false, // validity mask means a unset bit is NULL - Self::Nulls(nulls) => !nulls.get_bit(row), + Self::Nulls(builder) => !builder.get_bit(row), } } @@ -59,10 +59,7 @@ impl MaybeNullBufferBuilder { /// If `value` is false, the row is non null pub fn append(&mut self, is_null: bool) { match self { - Self::NoNulls { row_count } if !is_null => { - *row_count += 1; - } - Self::NoNulls { row_count } => { + Self::NoNulls { row_count } if is_null => { // have seen no nulls so far, this is the first null, // need to create the nulls buffer for all currently valid values // alloc 2x the need given we push a new but immediately @@ -71,15 +68,19 @@ impl MaybeNullBufferBuilder { nulls.append(false); *self = Self::Nulls(nulls); } - Self::Nulls(nulls) => nulls.append(is_null), + Self::NoNulls { row_count } => { + *row_count += 1; + } + Self::Nulls(builder) => builder.append(!is_null), } } + /// return the number of heap allocated bytes used by this structure to store boolean values pub fn allocated_size(&self) -> usize { match self { Self::NoNulls { .. } => 0, - Self::Nulls(nulls) => nulls.capacity() / 8, + Self::Nulls(builder) => builder.capacity() / 8, } } @@ -87,7 +88,7 @@ impl MaybeNullBufferBuilder { pub fn build(self) -> Option { match self { Self::NoNulls { .. } => None, - Self::Nulls(mut nulls) => Some(NullBuffer::from(nulls.finish())), + Self::Nulls(mut builder) => Some(NullBuffer::from(builder.finish())), } } @@ -99,16 +100,16 @@ impl MaybeNullBufferBuilder { *row_count -= n; None } - Self::Nulls(nulls) => { + Self::Nulls(builder) => { // Copy over the values at n..len-1 values to the start of a // new builder and leave it in self // // TODO: it would be great to use something like `set_bits` from arrow here. - let mut new_builder = BooleanBufferBuilder::new(nulls.len()); - for i in n..nulls.len() { - new_builder.append(nulls.get_bit(i)); + let mut new_builder = BooleanBufferBuilder::new(builder.len()); + for i in n..builder.len() { + new_builder.append(builder.get_bit(i)); } - std::mem::swap(&mut new_builder, nulls); + std::mem::swap(&mut new_builder, builder); // take only first n values from the original builder new_builder.truncate(n);