From 1c233a89a88125b635ef365b0bea67d7f786ade7 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sat, 25 Jan 2025 21:40:18 +0200 Subject: [PATCH 1/4] perf(array-agg): add fast path for array agg for `merge_batch` --- .../functions-aggregate/src/array_agg.rs | 75 ++++++++++++++++++- 1 file changed, 72 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index b75de83f6ace..f685074f560b 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -17,7 +17,7 @@ //! `ARRAY_AGG` aggregate implementation: [`ArrayAgg`] -use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, StructArray}; +use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, ListArray, StructArray}; use arrow::datatypes::DataType; use arrow_schema::{Field, Fields}; @@ -177,6 +177,66 @@ impl ArrayAggAccumulator { datatype: datatype.clone(), }) } + + /// This function will return the underlying list array values if all valid values are consecutive without gaps (i.e. 
no null value point to a non empty list) + /// If there are gaps but only in the end of the list array, the function will return the values without the null values in the end + fn get_optional_values_to_merge_as_is(list_array: &ListArray) -> Option<ArrayRef> { + let offsets = list_array.value_offsets(); + // Offsets always have at least 1 value + let initial_offset = offsets[0]; + let null_count = list_array.null_count(); + + // If no nulls than just use the fast path + // This is ok as the state is a ListArray rather than a ListViewArray so all the values are consecutive + if null_count == 0 { + // According to Arrow specification, the first offset can be non-zero + let list_values = list_array + .values() + .slice(initial_offset as usize, offsets[offsets.len() - 1] as usize); + return Some(list_values); + } + + // If all the values are null than just return an empty values array + if list_array.null_count() == list_array.len() { + return Some(list_array.values().slice(0, 0)); + } + + // According to the Arrow spec, null values can point to non empty lists + // So this will check if all null values starting from the first point to a 0 length list so we can just use the values as is + + // Unwrapping is safe as we just checked if there is a null value + let nulls = list_array.nulls().unwrap(); + + let mut valid_slices_iter = nulls.valid_slices(); + + // This is safe as we validated that that are at least 1 valid value in the array + let (start, end) = valid_slices_iter.next().unwrap(); + + let start_offset = offsets[start]; + + // End is exclusive, so it already point to the last offset value + // This is valid as the length of the array is always 1 less than the length of the offsets + let mut end_offset_of_last_valid_value = offsets[end]; + + for (start, end) in valid_slices_iter { + // If there is a null value that point to a non empty list than the start offset of the valid value + // will be different that the end offset of the last valid value + if offsets[start] != 
end_offset_of_last_valid_value { + return None; + } + + // End is exclusive, so it already point to the last offset value + // This is valid as the length of the array is always 1 less than the length of the offsets + end_offset_of_last_valid_value = offsets[end]; + } + + let consecutive_valid_values = list_array.values().slice( + start_offset as usize, + end_offset_of_last_valid_value as usize, + ); + + Some(consecutive_valid_values) + } } impl Accumulator for ArrayAggAccumulator { @@ -208,9 +268,18 @@ impl Accumulator for ArrayAggAccumulator { } let list_arr = as_list_array(&states[0])?; - for arr in list_arr.iter().flatten() { - self.values.push(arr); + + match Self::get_optional_values_to_merge_as_is(list_arr) { + Some(values) => { + self.values.push(values); + } + None => { + for arr in list_arr.iter().flatten() { + self.values.push(arr); + } + } } + Ok(()) } From 7bb4def96877a2835de41419f446cbc48800a65c Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sat, 25 Jan 2025 21:52:33 +0200 Subject: [PATCH 2/4] update comment --- datafusion/functions-aggregate/src/array_agg.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index f685074f560b..0ff47aa1cc96 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -202,7 +202,7 @@ impl ArrayAggAccumulator { } // According to the Arrow spec, null values can point to non empty lists - // So this will check if all null values starting from the first point to a 0 length list so we can just use the values as is + // So this will check if all null values starting from the first valid value to the last one point to a 0 length list so we can just slice the underlying value // Unwrapping is safe as we just checked if there is a null value let nulls = list_array.nulls().unwrap(); From 
9019c9e4beb0273da3e8f7385e193b72eadba538 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sat, 25 Jan 2025 22:06:34 +0200 Subject: [PATCH 3/4] fix slice length --- datafusion/functions-aggregate/src/array_agg.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 0ff47aa1cc96..51eab011c81a 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -190,9 +190,10 @@ impl ArrayAggAccumulator { // This is ok as the state is a ListArray rather than a ListViewArray so all the values are consecutive if null_count == 0 { // According to Arrow specification, the first offset can be non-zero - let list_values = list_array - .values() - .slice(initial_offset as usize, offsets[offsets.len() - 1] as usize); + let list_values = list_array.values().slice( + initial_offset as usize, + (offsets[offsets.len() - 1] - initial_offset) as usize, + ); return Some(list_values); } @@ -232,7 +233,7 @@ impl ArrayAggAccumulator { let consecutive_valid_values = list_array.values().slice( start_offset as usize, - end_offset_of_last_valid_value as usize, + (end_offset_of_last_valid_value - start_offset) as usize, ); Some(consecutive_valid_values) From 7e4c6819fe9e894bdaed18801b165d927d87245b Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sat, 25 Jan 2025 22:16:58 +0200 Subject: [PATCH 4/4] fix: make sure we are not inserting empty lists --- datafusion/functions-aggregate/src/array_agg.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 51eab011c81a..9fff05999122 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -272,7 +272,10 @@ 
impl Accumulator for ArrayAggAccumulator { match Self::get_optional_values_to_merge_as_is(list_arr) { Some(values) => { - self.values.push(values); + // Make sure we don't insert empty lists + if values.len() > 0 { + self.values.push(values); + } } None => { for arr in list_arr.iter().flatten() {