diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index c78e8c7161cd2..e81489005e183 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -23,6 +23,7 @@ use std::{fmt::Debug, sync::Arc}; use arrow::{ array::{ArrayRef, AsArray}, + compute, datatypes::{ DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, @@ -34,7 +35,7 @@ use arrow::{ }; use arrow::{ - array::{Array, BooleanArray, Int64Array, Int64Builder, PrimitiveArray}, + array::{Array, BooleanArray, Int64Array, PrimitiveArray}, buffer::BooleanBuffer, }; use datafusion_common::{ @@ -448,35 +449,52 @@ impl GroupsAccumulator for CountGroupsAccumulator { let values = &values[0]; let state_array = match (values.logical_nulls(), opt_filter) { - (Some(nulls), None) => { - let mut builder = Int64Builder::with_capacity(values.len()); - nulls - .into_iter() - .for_each(|is_valid| builder.append_value(is_valid as i64)); - builder.finish() + (None, None) => { + // In case there is no nulls in input and no filter, returning array of 1 + Arc::new(Int64Array::from_value(1, values.len())) } - (Some(nulls), Some(filter)) => { - let mut builder = Int64Builder::with_capacity(values.len()); - nulls.into_iter().zip(filter.iter()).for_each( - |(is_valid, filter_value)| { - builder.append_value( - (is_valid && filter_value.is_some_and(|val| val)) as i64, - ) - }, - ); - builder.finish() + (Some(nulls), None) => { + // If there are any nulls in input values -- casting `nulls` (true for values, false for nulls) + // of input array to Int64 + let nulls = BooleanArray::new(nulls.into_inner(), None); + compute::cast(&nulls, &DataType::Int64)? } (None, Some(filter)) => { - let mut builder = Int64Builder::with_capacity(values.len()); - filter.into_iter().for_each(|filter_value| { - builder.append_value(filter_value.is_some_and(|val| val) as i64) - }); - builder.finish() + // If there is only filter + // - applying filter null mask to filter values by bitand filter values and nulls buffers + // (using buffers guarantees absence of nulls in result) + // - casting result of bitand to Int64 array + let (filter_values, filter_nulls) = filter.clone().into_parts(); + + let state_buf = match filter_nulls { + Some(filter_nulls) => &filter_values & filter_nulls.inner(), + None => filter_values, + }; + + let boolean_state = BooleanArray::new(state_buf, None); + compute::cast(&boolean_state, &DataType::Int64)? + } + (Some(nulls), Some(filter)) => { + // For both input nulls and filter + // - applying filter null mask to filter values by bitand filter values and nulls buffers + // (using buffers guarantees absence of nulls in result) + // - applying values null mask to filter buffer by another bitand on filter result and + // nulls from input values + // - casting result to Int64 array + let (filter_values, filter_nulls) = filter.clone().into_parts(); + + let filter_buf = match filter_nulls { + Some(filter_nulls) => &filter_values & filter_nulls.inner(), + None => filter_values, + }; + let state_buf = &filter_buf & nulls.inner(); + + let boolean_state = BooleanArray::new(state_buf, None); + compute::cast(&boolean_state, &DataType::Int64)? } - (None, None) => Int64Array::from_value(1, values.len()), }; - Ok(vec![Arc::new(state_array)]) + Ok(vec![state_array]) } fn supports_convert_to_state(&self) -> bool { diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs index ac68128cffc12..5b9a0ef9df2ec 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs @@ -17,10 +17,12 @@ use std::sync::Arc; -use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, PrimitiveBuilder}; +use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}; +use arrow::buffer::NullBuffer; +use arrow::compute; use arrow::datatypes::ArrowPrimitiveType; use arrow::datatypes::DataType; -use datafusion_common::Result; +use datafusion_common::{internal_datafusion_err, DataFusionError, Result}; use datafusion_expr::{EmitTo, GroupsAccumulator}; use super::accumulate::NullState; @@ -140,34 +142,45 @@ where opt_filter: Option<&BooleanArray>, ) -> Result> { let values = values[0].as_primitive::(); - let mut state = PrimitiveBuilder::::with_capacity(values.len()) - .with_data_type(self.data_type.clone()); - match opt_filter { + // Initializing state with starting values + let initial_state = + PrimitiveArray::::from_value(self.starting_value, values.len()); + + // Recalculating values in case there is filter + let values = match opt_filter { + None => values, Some(filter) => { - values - .iter() - .zip(filter.iter()) - .for_each(|(val, filter_val)| match (val, filter_val) { - (Some(val), Some(true)) => { - let mut state_val = self.starting_value; - (self.prim_fn)(&mut state_val, val); - state.append_value(state_val); - } - (_, _) => state.append_null(), - }) + let (filter_values, filter_nulls) = filter.clone().into_parts(); + // Calculating filter mask as a result of bitand of filter, and converting it to null buffer + let filter_bool = match filter_nulls { + Some(filter_nulls) => filter_nulls.inner() & &filter_values, + None => filter_values, + }; + let filter_nulls = NullBuffer::from(filter_bool); + + // Rebuilding input values with a new nulls mask, which is equal to + // the union of original nulls and filter mask + let (dt, values_buf, original_nulls) = values.clone().into_parts(); + let nulls_buf = + NullBuffer::union(original_nulls.as_ref(), Some(&filter_nulls)); + &PrimitiveArray::::new(values_buf, nulls_buf).with_data_type(dt) } - None => values.iter().for_each(|val| match val { - Some(val) => { - let mut state_val = self.starting_value; - (self.prim_fn)(&mut state_val, val); - state.append_value(state_val); - } - None => state.append_null(), - }), }; - Ok(vec![Arc::new(state.finish())]) + let state_values = compute::binary_mut(initial_state, values, |mut x, y| { + (self.prim_fn)(&mut x, y); + x + }); + let state_values = state_values + .map_err(|_| { + internal_datafusion_err!( + "initial_values underlying buffer must not be shared" + ) + })? + .map_err(|err| DataFusionError::from(err))?; + + Ok(vec![Arc::new(state_values)]) } fn supports_convert_to_state(&self) -> bool {