Skip to content

perf(array-agg): add fast path for array agg for merge_batch #14299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 76 additions & 3 deletions datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! `ARRAY_AGG` aggregate implementation: [`ArrayAgg`]

use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, StructArray};
use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, ListArray, StructArray};
use arrow::datatypes::DataType;

use arrow_schema::{Field, Fields};
Expand Down Expand Up @@ -177,6 +177,67 @@ impl ArrayAggAccumulator {
datatype: datatype.clone(),
})
}

/// Returns the child values of `list_array` as one contiguous slice when the values of all
/// valid (non-null) entries are consecutive, i.e. no null entry located between two valid
/// entries points to a non-empty list.
///
/// Null entries before the first valid entry or after the last valid entry are tolerated:
/// the returned slice starts at the first valid entry's offset and stops at the last valid
/// entry's end offset, so whatever those outer null entries point to is left out.
///
/// Returns `None` when such a gap exists, in which case the values cannot be taken as a
/// single slice. When every entry is null, an empty slice of the values array is returned.
fn get_optional_values_to_merge_as_is(list_array: &ListArray) -> Option<ArrayRef> {
let offsets = list_array.value_offsets();
// Offsets always have at least 1 value, so indexing the first one cannot go out of bounds
let initial_offset = offsets[0];
let null_count = list_array.null_count();

// If there are no nulls, then just use the fast path.
// This is ok as the state is a ListArray rather than a ListViewArray, so all the values are consecutive
if null_count == 0 {
// According to the Arrow specification, the first offset can be non-zero,
// so the slice has to start at `initial_offset` rather than at 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while writing this code I first found out that offsets does not have to start from 0

let list_values = list_array.values().slice(
initial_offset as usize,
(offsets[offsets.len() - 1] - initial_offset) as usize,
);
return Some(list_values);
}

// If all the values are null, then just return an empty values array
if list_array.null_count() == list_array.len() {
return Some(list_array.values().slice(0, 0));
}

// According to the Arrow spec, null values can point to non-empty lists.
// So check that every null value between the first valid value and the last valid value
// points to a 0-length list; if so, the underlying values can simply be sliced

// Unwrapping is safe: the null count is non-zero here, so a null buffer must be present
let nulls = list_array.nulls().unwrap();

let mut valid_slices_iter = nulls.valid_slices();

// This is safe as we validated that there is at least 1 valid value in the array
let (start, end) = valid_slices_iter.next().unwrap();

// Start of the result: values referenced by nulls before the first valid value are skipped
let start_offset = offsets[start];

// End is exclusive, so `offsets[end]` is already the end offset of the last valid value in the run.
// Indexing is valid as the length of the array is always 1 less than the length of the offsets
let mut end_offset_of_last_valid_value = offsets[end];

for (start, end) in valid_slices_iter {
// If a null value between two runs of valid values points to a non-empty list, then the
// start offset of this run will be different from the end offset of the previous run
if offsets[start] != end_offset_of_last_valid_value {
return None;
}

// End is exclusive, so `offsets[end]` is already the end offset of the last valid value in the run.
// Indexing is valid as the length of the array is always 1 less than the length of the offsets
end_offset_of_last_valid_value = offsets[end];
}

// All valid values are back to back, so a single slice covers exactly their values
let consecutive_valid_values = list_array.values().slice(
start_offset as usize,
(end_offset_of_last_valid_value - start_offset) as usize,
);

Some(consecutive_valid_values)
}
}

impl Accumulator for ArrayAggAccumulator {
Expand Down Expand Up @@ -208,9 +269,21 @@ impl Accumulator for ArrayAggAccumulator {
}

let list_arr = as_list_array(&states[0])?;
for arr in list_arr.iter().flatten() {
self.values.push(arr);

match Self::get_optional_values_to_merge_as_is(list_arr) {
Some(values) => {
// Make sure we don't insert empty lists
if values.len() > 0 {
self.values.push(values);
}
}
None => {
for arr in list_arr.iter().flatten() {
self.values.push(arr);
}
}
}

Ok(())
}

Expand Down
Loading