Skip to content

Commit

Permalink
speeding up conversion to state
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Aug 2, 2024
1 parent d820054 commit a99c081
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 49 deletions.
66 changes: 42 additions & 24 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{fmt::Debug, sync::Arc};

use arrow::{
array::{ArrayRef, AsArray},
compute,
datatypes::{
DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field,
Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
Expand All @@ -34,7 +35,7 @@ use arrow::{
};

use arrow::{
array::{Array, BooleanArray, Int64Array, Int64Builder, PrimitiveArray},
array::{Array, BooleanArray, Int64Array, PrimitiveArray},
buffer::BooleanBuffer,
};
use datafusion_common::{
Expand Down Expand Up @@ -448,35 +449,52 @@ impl GroupsAccumulator for CountGroupsAccumulator {
let values = &values[0];

let state_array = match (values.logical_nulls(), opt_filter) {
(Some(nulls), None) => {
let mut builder = Int64Builder::with_capacity(values.len());
nulls
.into_iter()
.for_each(|is_valid| builder.append_value(is_valid as i64));
builder.finish()
(None, None) => {
// If the input has no nulls and there is no filter, return an array of 1s
Arc::new(Int64Array::from_value(1, values.len()))
}
(Some(nulls), Some(filter)) => {
let mut builder = Int64Builder::with_capacity(values.len());
nulls.into_iter().zip(filter.iter()).for_each(
|(is_valid, filter_value)| {
builder.append_value(
(is_valid && filter_value.is_some_and(|val| val)) as i64,
)
},
);
builder.finish()
(Some(nulls), None) => {
// If the input values contain nulls, cast the input array's validity mask `nulls`
// (true for valid values, false for nulls) to Int64
let nulls = BooleanArray::new(nulls.into_inner(), None);
compute::cast(&nulls, &DataType::Int64)?
}
(None, Some(filter)) => {
let mut builder = Int64Builder::with_capacity(values.len());
filter.into_iter().for_each(|filter_value| {
builder.append_value(filter_value.is_some_and(|val| val) as i64)
});
builder.finish()
// If there is only a filter:
// - apply the filter's null mask to the filter values by AND-ing the values buffer
//   with the nulls buffer (working on raw buffers guarantees the result has no nulls)
// - cast the result of the AND to an Int64 array
let (filter_values, filter_nulls) = filter.clone().into_parts();

let state_buf = match filter_nulls {
Some(filter_nulls) => &filter_values & filter_nulls.inner(),
None => filter_values,
};

let boolean_state = BooleanArray::new(state_buf, None);
compute::cast(&boolean_state, &DataType::Int64)?
}
(Some(nulls), Some(filter)) => {
// If there are both input nulls and a filter:
// - apply the filter's null mask to the filter values by AND-ing the values buffer
//   with the nulls buffer (working on raw buffers guarantees the result has no nulls)
// - apply the input values' null mask by AND-ing the filter result with the
//   validity buffer of the input values
// - cast the result to an Int64 array
let (filter_values, filter_nulls) = filter.clone().into_parts();

let filter_buf = match filter_nulls {
Some(filter_nulls) => &filter_values & filter_nulls.inner(),
None => filter_values,
};
let state_buf = &filter_buf & nulls.inner();

let boolean_state = BooleanArray::new(state_buf, None);
compute::cast(&boolean_state, &DataType::Int64)?
}
(None, None) => Int64Array::from_value(1, values.len()),
};

Ok(vec![Arc::new(state_array)])
Ok(vec![state_array])
}

fn supports_convert_to_state(&self) -> bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, PrimitiveBuilder};
use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray};
use arrow::buffer::NullBuffer;
use arrow::compute;
use arrow::datatypes::ArrowPrimitiveType;
use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_common::{internal_datafusion_err, DataFusionError, Result};
use datafusion_expr::{EmitTo, GroupsAccumulator};

use super::accumulate::NullState;
Expand Down Expand Up @@ -140,34 +142,45 @@ where
opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
let values = values[0].as_primitive::<T>();
let mut state = PrimitiveBuilder::<T>::with_capacity(values.len())
.with_data_type(self.data_type.clone());

match opt_filter {
// Initializing state with starting values
let initial_state =
PrimitiveArray::<T>::from_value(self.starting_value, values.len());

// If a filter is present, rebuild the input values with a null mask that also reflects the filter
let values = match opt_filter {
None => values,
Some(filter) => {
values
.iter()
.zip(filter.iter())
.for_each(|(val, filter_val)| match (val, filter_val) {
(Some(val), Some(true)) => {
let mut state_val = self.starting_value;
(self.prim_fn)(&mut state_val, val);
state.append_value(state_val);
}
(_, _) => state.append_null(),
})
let (filter_values, filter_nulls) = filter.clone().into_parts();
// Compute the filter mask as the bitwise AND of the filter's values and its validity buffer, then convert it to a null buffer
let filter_bool = match filter_nulls {
Some(filter_nulls) => filter_nulls.inner() & &filter_values,
None => filter_values,
};
let filter_nulls = NullBuffer::from(filter_bool);

// Rebuild the input values with a new null mask: a row is null if it was
// originally null or if it is not selected by the filter mask
let (dt, values_buf, original_nulls) = values.clone().into_parts();
let nulls_buf =
NullBuffer::union(original_nulls.as_ref(), Some(&filter_nulls));
&PrimitiveArray::<T>::new(values_buf, nulls_buf).with_data_type(dt)
}
None => values.iter().for_each(|val| match val {
Some(val) => {
let mut state_val = self.starting_value;
(self.prim_fn)(&mut state_val, val);
state.append_value(state_val);
}
None => state.append_null(),
}),
};

Ok(vec![Arc::new(state.finish())])
let state_values = compute::binary_mut(initial_state, values, |mut x, y| {
(self.prim_fn)(&mut x, y);
x
});
let state_values = state_values
.map_err(|_| {
internal_datafusion_err!(
"initial_values underlying buffer must not be shared"
)
})?
.map_err(|err| DataFusionError::from(err))?;

Ok(vec![Arc::new(state_values)])
}

fn supports_convert_to_state(&self) -> bool {
Expand Down

0 comments on commit a99c081

Please sign in to comment.