diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index e412d814239d..e7580d3e33ef 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -429,6 +429,7 @@ pub(crate) mod tests {
                 self.column_name(),
                 false,
                 false,
+                false,
             )
             .unwrap()
         }
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 29148a594f31..ddb7d36fb595 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -288,6 +288,7 @@ mod tests {
             name,
             false,
             false,
+            false,
         )
         .unwrap()
     }
@@ -378,6 +379,7 @@ mod tests {
             "Sum(b)",
             false,
             false,
+            false,
         )?];
         let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
             vec![(col("c", &schema)?, "c".to_string())];
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 97533cd5276a..329d343f13fc 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1839,34 +1839,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
         .unwrap_or(sqlparser::ast::NullTreatment::RespectNulls)
         == NullTreatment::IgnoreNulls;
 
-    // TODO: Remove this once all array_agg variants are UDAFs
     let (agg_expr, filter, order_by) = match func_def {
-        AggregateFunctionDefinition::UDF(udf)
-            if udf.name() == "ARRAY_AGG" && order_by.is_some() =>
-        {
-            // The UDAF does not support `ORDER BY` yet, so fall back to the builtin
-            let physical_sort_exprs = match order_by {
-                Some(exprs) => Some(create_physical_sort_exprs(
-                    exprs,
-                    logical_input_schema,
-                    execution_props,
-                )?),
-                None => None,
-            };
-            let ordering_reqs: Vec<PhysicalSortExpr> =
-                physical_sort_exprs.clone().unwrap_or(vec![]);
-            let fun = aggregates::AggregateFunction::ArrayAgg;
-            let agg_expr = aggregates::create_aggregate_expr(
-                &fun,
-                *distinct,
-                &physical_args,
-                &ordering_reqs,
-                physical_input_schema,
-                name,
-                ignore_nulls,
-            )?;
-            (agg_expr, filter, physical_sort_exprs)
-        }
         AggregateFunctionDefinition::BuiltIn(fun) => {
             let physical_sort_exprs = match order_by {
                 Some(exprs) => Some(create_physical_sort_exprs(
@@ -1899,19 +1872,23 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
                 )?),
                 None => None,
             };
+
             let ordering_reqs: Vec<PhysicalSortExpr> =
                 physical_sort_exprs.clone().unwrap_or(vec![]);
-            let agg_expr = udaf::create_aggregate_expr(
+
+            let agg_expr = udaf::create_aggregate_expr_with_dfschema(
                 fun,
                 &physical_args,
                 args,
                 &sort_exprs,
                 &ordering_reqs,
-                physical_input_schema,
+                logical_input_schema,
                 name,
                 ignore_nulls,
                 *distinct,
+                false,
             )?;
+
             (agg_expr, filter, physical_sort_exprs)
         }
     };
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index a04f4f349122..736560da97db 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -113,6 +113,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str>) {
         "sum1",
         false,
         false,
+        false,
     )
     .unwrap()];
     let expr = group_by_columns
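For orientation (not part of the diff itself): every `create_aggregate_expr` call site gains a trailing `false` because the constructor now takes an `is_reversed` flag. A minimal sketch of an updated call, where the schema, the column `b`, and the use of `sum_udaf` from `datafusion_functions_aggregate::sum` are hypothetical:

    let agg = create_aggregate_expr(
        &sum_udaf(),
        &[col("b", &schema)?],        // physical args
        &[datafusion_expr::col("b")], // logical args
        &[],                          // sort_exprs (logical ORDER BY)
        &[],                          // ordering_req (physical ORDER BY)
        &schema,
        "sum1",
        false, // ignore_nulls
        false, // distinct
        false, // is_reversed (the new argument)
    )?;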
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index 73ab51494de6..d722e55de487 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -20,7 +20,7 @@ use crate::ColumnarValue;
 use crate::{Accumulator, Expr, PartitionEvaluator};
 use arrow::datatypes::{DataType, Field, Schema};
-use datafusion_common::Result;
+use datafusion_common::{DFSchema, Result};
 use std::sync::Arc;
 
 #[derive(Debug, Clone, Copy)]
@@ -57,6 +57,9 @@ pub struct AccumulatorArgs<'a> {
     /// The schema of the input arguments
     pub schema: &'a Schema,
 
+    /// The logical (`DFSchema`) view of the input arguments
+    pub dfschema: &'a DFSchema,
+
     /// Whether to ignore nulls.
     ///
     /// SQL allows the user to specify `IGNORE NULLS`, for example:
@@ -78,6 +81,9 @@ pub struct AccumulatorArgs<'a> {
     /// If no `ORDER BY` is specified, `sort_exprs` will be empty.
     pub sort_exprs: &'a [Expr],
 
+    /// Whether the aggregation is running in reverse order
+    pub is_reversed: bool,
+
     /// The name of the aggregate expression
     pub name: &'a str,
diff --git a/datafusion/functions-aggregate/src/approx_percentile_cont.rs b/datafusion/functions-aggregate/src/approx_percentile_cont.rs
index bbe7d21e2486..dfb94a84cbec 100644
--- a/datafusion/functions-aggregate/src/approx_percentile_cont.rs
+++ b/datafusion/functions-aggregate/src/approx_percentile_cont.rs
@@ -30,7 +30,8 @@ use arrow::{
 use arrow_schema::{Field, Schema};
 
 use datafusion_common::{
-    downcast_value, internal_err, not_impl_err, plan_err, DataFusionError, ScalarValue,
+    downcast_value, internal_err, not_impl_err, plan_err, DFSchema, DataFusionError,
+    ScalarValue,
 };
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::type_coercion::aggregates::{INTEGERS, NUMERICS};
@@ -42,7 +43,7 @@ use datafusion_expr::{
 use datafusion_physical_expr_common::aggregate::tdigest::{
     TDigest, TryIntoF64, DEFAULT_MAX_SIZE,
 };
-use datafusion_physical_expr_common::utils::limited_convert_logical_expr_to_physical_expr;
+use datafusion_physical_expr_common::utils::limited_convert_logical_expr_to_physical_expr_with_dfschema;
 
 make_udaf_expr_and_func!(
     ApproxPercentileCont,
@@ -135,7 +136,9 @@ impl ApproxPercentileCont {
 fn get_lit_value(expr: &Expr) -> datafusion_common::Result<ScalarValue> {
     let empty_schema = Arc::new(Schema::empty());
     let empty_batch = RecordBatch::new_empty(Arc::clone(&empty_schema));
-    let expr = limited_convert_logical_expr_to_physical_expr(expr, &empty_schema)?;
+    let dfschema = DFSchema::empty();
+    let expr =
+        limited_convert_logical_expr_to_physical_expr_with_dfschema(expr, &dfschema)?;
    let result = expr.evaluate(&empty_batch)?;
    match result {
        ColumnarValue::Array(_) => Err(DataFusionError::Internal(format!(
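A hedged sketch of constructing the extended `AccumulatorArgs` (mirroring how the stddev test code later in this diff does it); the field values are hypothetical, and the two new fields are `dfschema` and `is_reversed`:

    let dfschema = DFSchema::empty();
    let args = AccumulatorArgs {
        data_type: &DataType::Float64,
        schema: &schema,
        dfschema: &dfschema, // new: logical schema, used to resolve sort_exprs
        ignore_nulls: false,
        sort_exprs: &[],
        is_distinct: false,
        is_reversed: false,  // new: hint that accumulation runs in reverse
        input_type: &DataType::Float64,
        input_exprs: &[datafusion_expr::col("a")],
        name: "a",
    };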
diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs
index 9ad453d7a4b2..777a242aa27e 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -17,19 +17,25 @@
 
 //! `ARRAY_AGG` aggregate implementation: [`ArrayAgg`]
 
-use arrow::array::{Array, ArrayRef, AsArray};
+use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, StructArray};
 use arrow::datatypes::DataType;
-use arrow_schema::Field;
+use arrow_schema::{Field, Fields};
 
 use datafusion_common::cast::as_list_array;
-use datafusion_common::utils::array_into_list_array_nullable;
-use datafusion_common::ScalarValue;
+use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx};
+use datafusion_common::{exec_err, ScalarValue};
 use datafusion_common::{internal_err, Result};
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::utils::format_state_name;
 use datafusion_expr::AggregateUDFImpl;
 use datafusion_expr::{Accumulator, Signature, Volatility};
-use std::collections::HashSet;
+use datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays;
+use datafusion_physical_expr_common::aggregate::utils::ordering_fields;
+use datafusion_physical_expr_common::sort_expr::{
+    limited_convert_logical_sort_exprs_to_physical_with_dfschema, LexOrdering,
+    PhysicalSortExpr,
+};
+use std::collections::{HashSet, VecDeque};
 use std::sync::Arc;
 
 make_udaf_expr_and_func!(
@@ -91,11 +97,24 @@ impl AggregateUDFImpl for ArrayAgg {
             )]);
         }
 
-        Ok(vec![Field::new_list(
+        let mut fields = vec![Field::new_list(
             format_state_name(args.name, "array_agg"),
             Field::new("item", args.input_type.clone(), true),
             true,
-        )])
+        )];
+
+        if args.ordering_fields.is_empty() {
+            return Ok(fields);
+        }
+
+        let orderings = args.ordering_fields.to_vec();
+        fields.push(Field::new_list(
+            format_state_name(args.name, "array_agg_orderings"),
+            Field::new("item", DataType::Struct(Fields::from(orderings)), true),
+            false,
+        ));
+
+        Ok(fields)
     }
 
     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
@@ -105,7 +124,31 @@ impl AggregateUDFImpl for ArrayAgg {
             )?));
         }
 
-        Ok(Box::new(ArrayAggAccumulator::try_new(acc_args.input_type)?))
+        if acc_args.sort_exprs.is_empty() {
+            return Ok(Box::new(ArrayAggAccumulator::try_new(acc_args.input_type)?));
+        }
+
+        let ordering_req = limited_convert_logical_sort_exprs_to_physical_with_dfschema(
+            acc_args.sort_exprs,
+            acc_args.dfschema,
+        )?;
+
+        let ordering_dtypes = ordering_req
+            .iter()
+            .map(|e| e.expr.data_type(acc_args.schema))
+            .collect::<Result<Vec<_>>>()?;
+
+        OrderSensitiveArrayAggAccumulator::try_new(
+            acc_args.input_type,
+            &ordering_dtypes,
+            ordering_req,
+            acc_args.is_reversed,
+        )
+        .map(|acc| Box::new(acc) as _)
+    }
+
+    fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
+        datafusion_expr::ReversedUDAF::Reversed(array_agg_udaf())
+    }
 }
 
@@ -259,3 +302,367 @@ impl Accumulator for DistinctArrayAggAccumulator {
             - std::mem::size_of_val(&self.datatype)
     }
 }
+
+/// Accumulator for an `ARRAY_AGG(... ORDER BY ..., ...)` aggregation. In a multi
+/// partition setting, partial aggregations are computed for every partition,
+/// and then their results are merged.
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+    /// Stores entries in the `ARRAY_AGG` result.
+    values: Vec<ScalarValue>,
+    /// Stores values of ordering requirement expressions corresponding to each
+    /// entry in `values`. This information is used when merging results from
+    /// different partitions. For details on how merging is done, see
+    /// [`merge_ordered_arrays`].
+    ordering_values: Vec<Vec<ScalarValue>>,
+    /// Stores datatypes of the expressions inside `values` and of the ordering
+    /// requirement expressions.
+    datatypes: Vec<DataType>,
+    /// Stores the ordering requirement of the `Accumulator`.
+    ordering_req: LexOrdering,
+    /// Whether the aggregation is running in reverse.
+    reverse: bool,
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+    /// Create a new order-sensitive `ARRAY_AGG` accumulator based on the given
+    /// item data type.
+    pub fn try_new(
+        datatype: &DataType,
+        ordering_dtypes: &[DataType],
+        ordering_req: LexOrdering,
+        reverse: bool,
+    ) -> Result<Self> {
+        let mut datatypes = vec![datatype.clone()];
+        datatypes.extend(ordering_dtypes.iter().cloned());
+        Ok(Self {
+            values: vec![],
+            ordering_values: vec![],
+            datatypes,
+            ordering_req,
+            reverse,
+        })
+    }
+}
+
+impl Accumulator for OrderSensitiveArrayAggAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let n_row = values[0].len();
+        for index in 0..n_row {
+            let row = get_row_at_idx(values, index)?;
+            self.values.push(row[0].clone());
+            self.ordering_values.push(row[1..].to_vec());
+        }
+
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+
+        // The first entry in the state is the aggregation result; the second
+        // entry stores values received for the ordering requirement columns
+        // for each aggregation value inside the `ARRAY_AGG` list. For each
+        // `StructArray` inside the `ARRAY_AGG` list, we will receive an
+        // `Array` that stores values received from its ordering requirement
+        // expression. (This information is necessary during merging.)
+        let [array_agg_values, agg_orderings, ..] = &states else {
+            return exec_err!("State should have two elements");
+        };
+        let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() else {
+            return exec_err!("Expects to receive a list array");
+        };
+
+        // Stores ARRAY_AGG results coming from each partition
+        let mut partition_values = vec![];
+        // Stores ordering requirement expression results coming from each partition
+        let mut partition_ordering_values = vec![];
+
+        // Existing values should be merged too.
+        partition_values.push(self.values.clone().into());
+        partition_ordering_values.push(self.ordering_values.clone().into());
+
+        // Convert the array to scalars so they sort easily; convert back to an
+        // array at evaluation time.
+        let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
+        for v in array_agg_res.into_iter() {
+            partition_values.push(v.into());
+        }
+
+        let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
+
+        for partition_ordering_rows in orderings.into_iter() {
+            // Extract value from struct to ordering_rows for each group/partition
+            let ordering_value = partition_ordering_rows
+                .into_iter()
+                .map(|ordering_row| {
+                    if let ScalarValue::Struct(s) = ordering_row {
+                        let mut ordering_columns_per_row = vec![];
+
+                        for column in s.columns() {
+                            let sv = ScalarValue::try_from_array(column, 0)?;
+                            ordering_columns_per_row.push(sv);
+                        }
+
+                        Ok(ordering_columns_per_row)
+                    } else {
+                        exec_err!(
+                            "Expects to receive ScalarValue::Struct(Arc<StructArray>) but got: {:?}",
+                            ordering_row.data_type()
+                        )
+                    }
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            partition_ordering_values.push(ordering_value);
+        }
+
+        let sort_options = self
+            .ordering_req
+            .iter()
+            .map(|sort_expr| sort_expr.options)
+            .collect::<Vec<_>>();
+
+        (self.values, self.ordering_values) = merge_ordered_arrays(
+            &mut partition_values,
+            &mut partition_ordering_values,
+            &sort_options,
+        )?;
+
+        Ok(())
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        let mut result = vec![self.evaluate()?];
+        result.push(self.evaluate_orderings()?);
+
+        Ok(result)
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        if self.values.is_empty() {
+            return Ok(ScalarValue::new_null_list(
+                self.datatypes[0].clone(),
+                true,
+                1,
+            ));
+        }
+
+        let values = self.values.clone();
+        let array = if self.reverse {
+            ScalarValue::new_list_from_iter(
+                values.into_iter().rev(),
+                &self.datatypes[0],
+                true,
+            )
+        } else {
+            ScalarValue::new_list_from_iter(values.into_iter(), &self.datatypes[0], true)
+        };
+        Ok(ScalarValue::List(array))
+    }
+
+    fn size(&self) -> usize {
+        let mut total = std::mem::size_of_val(self)
+            + ScalarValue::size_of_vec(&self.values)
+            - std::mem::size_of_val(&self.values);
+
+        // Add the size of `self.ordering_values`
+        total +=
+            std::mem::size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
+        for row in &self.ordering_values {
+            total += ScalarValue::size_of_vec(row) - std::mem::size_of_val(row);
+        }
+
+        // Add the size of `self.datatypes`
+        total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
+        for dtype in &self.datatypes {
+            total += dtype.size() - std::mem::size_of_val(dtype);
+        }
+
+        // Add the size of `self.ordering_req`
+        total += std::mem::size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
+        // TODO: Calculate the size of each `PhysicalSortExpr` more accurately.
+        total
+    }
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+    fn evaluate_orderings(&self) -> Result<ScalarValue> {
+        let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
+        let num_columns = fields.len();
+        let struct_field = Fields::from(fields.clone());
+
+        let mut column_wise_ordering_values = vec![];
+        for i in 0..num_columns {
+            let column_values = self
+                .ordering_values
+                .iter()
+                .map(|x| x[i].clone())
+                .collect::<Vec<_>>();
+            let array = if column_values.is_empty() {
+                new_empty_array(fields[i].data_type())
+            } else {
+                ScalarValue::iter_to_array(column_values.into_iter())?
+            };
+            column_wise_ordering_values.push(array);
+        }
+
+        let ordering_array = StructArray::try_new(
+            struct_field.clone(),
+            column_wise_ordering_values,
+            None,
+        )?;
+        Ok(ScalarValue::List(Arc::new(array_into_list_array_nullable(
+            Arc::new(ordering_array),
+        ))))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::collections::VecDeque;
+    use std::sync::Arc;
+
+    use arrow::array::Int64Array;
+    use arrow_schema::SortOptions;
+
+    use datafusion_common::utils::get_row_at_idx;
+    use datafusion_common::{Result, ScalarValue};
+
+    #[test]
+    fn test_merge_asc() -> Result<()> {
+        let lhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
+            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
+        ];
+        let n_row = lhs_arrays[0].len();
+        let lhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+
+        let rhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
+            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
+        ];
+        let n_row = rhs_arrays[0].len();
+        let rhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+        let sort_options = vec![
+            SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+            SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+        ];
+
+        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
+        let lhs_vals = (0..lhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+
+        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
+        let rhs_vals = (0..rhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+        let expected =
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef;
+        let expected_ts = vec![
+            Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as ArrayRef,
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef,
+        ];
+
+        let (merged_vals, merged_ts) = merge_ordered_arrays(
+            &mut [lhs_vals, rhs_vals],
+            &mut [lhs_orderings, rhs_orderings],
+            &sort_options,
+        )?;
+        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
+        let merged_ts = (0..merged_ts[0].len())
+            .map(|col_idx| {
+                ScalarValue::iter_to_array(
+                    (0..merged_ts.len())
+                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
+                )
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        assert_eq!(&merged_vals, &expected);
+        assert_eq!(&merged_ts, &expected_ts);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_merge_desc() -> Result<()> {
+        let lhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
+            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
+        ];
+        let n_row = lhs_arrays[0].len();
+        let lhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+
+        let rhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
+            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
+        ];
+        let n_row = rhs_arrays[0].len();
+        let rhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+        let sort_options = vec![
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+        ];
+
+        // The values to be merged do not themselves have to be ordered.
+        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
+        let lhs_vals = (0..lhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+
+        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
+        let rhs_vals = (0..rhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
+            .collect::<Result<VecDeque<_>>>()?;
+        let expected =
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef;
+        let expected_ts = vec![
+            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as ArrayRef,
+            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef,
+        ];
+        let (merged_vals, merged_ts) = merge_ordered_arrays(
+            &mut [lhs_vals, rhs_vals],
+            &mut [lhs_orderings, rhs_orderings],
+            &sort_options,
+        )?;
+        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
+        let merged_ts = (0..merged_ts[0].len())
+            .map(|col_idx| {
+                ScalarValue::iter_to_array(
+                    (0..merged_ts.len())
+                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
+                )
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        assert_eq!(&merged_vals, &expected);
+        assert_eq!(&merged_ts, &expected_ts);
+        Ok(())
+    }
+}
diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs
index 0e619bacef82..ba11f7e91e07 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -36,7 +36,8 @@ use datafusion_expr::{
 };
 use datafusion_physical_expr_common::aggregate::utils::get_sort_options;
 use datafusion_physical_expr_common::sort_expr::{
-    limited_convert_logical_sort_exprs_to_physical, LexOrdering, PhysicalSortExpr,
+    limited_convert_logical_sort_exprs_to_physical_with_dfschema, LexOrdering,
+    PhysicalSortExpr,
 };
 
 create_func!(FirstValue, first_value_udaf);
@@ -116,9 +117,9 @@ impl AggregateUDFImpl for FirstValue {
     }
 
     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
-        let ordering_req = limited_convert_logical_sort_exprs_to_physical(
+        let ordering_req = limited_convert_logical_sort_exprs_to_physical_with_dfschema(
             acc_args.sort_exprs,
-            acc_args.schema,
+            acc_args.dfschema,
         )?;
 
         let ordering_dtypes = ordering_req
@@ -415,9 +416,9 @@ impl AggregateUDFImpl for LastValue {
     }
 
     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
-        let ordering_req = limited_convert_logical_sort_exprs_to_physical(
+        let ordering_req = limited_convert_logical_sort_exprs_to_physical_with_dfschema(
             acc_args.sort_exprs,
-            acc_args.schema,
+            acc_args.dfschema,
         )?;
 
         let ordering_dtypes = ordering_req
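Why the `reverse` flag works (an illustrative sketch, not taken from the diff): when the optimizer swaps an `ARRAY_AGG(x ORDER BY t ASC)` for its reversed twin to avoid a sort, the accumulator collects values while scanning `t DESC`, and `evaluate` plays them back-to-front to restore the requested order:

    fn main() {
        // Values accumulated while scanning in reverse (t DESC):
        let accumulated = vec!["c", "b", "a"];
        // evaluate() with reverse == true emits them back-to-front,
        // recovering the ASC order the query asked for:
        let result: Vec<_> = accumulated.into_iter().rev().collect();
        assert_eq!(result, vec!["a", "b", "c"]);
    }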
diff --git a/datafusion/functions-aggregate/src/nth_value.rs b/datafusion/functions-aggregate/src/nth_value.rs
index 6719c673c55b..9bbd68c9bdf6 100644
--- a/datafusion/functions-aggregate/src/nth_value.rs
+++ b/datafusion/functions-aggregate/src/nth_value.rs
@@ -36,7 +36,8 @@ use datafusion_expr::{
 use datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays;
 use datafusion_physical_expr_common::aggregate::utils::ordering_fields;
 use datafusion_physical_expr_common::sort_expr::{
-    limited_convert_logical_sort_exprs_to_physical, LexOrdering, PhysicalSortExpr,
+    limited_convert_logical_sort_exprs_to_physical_with_dfschema, LexOrdering,
+    PhysicalSortExpr,
 };
 
 make_udaf_expr_and_func!(
@@ -111,9 +112,9 @@ impl AggregateUDFImpl for NthValueAgg {
             ),
         }?;
 
-        let ordering_req = limited_convert_logical_sort_exprs_to_physical(
+        let ordering_req = limited_convert_logical_sort_exprs_to_physical_with_dfschema(
             acc_args.sort_exprs,
-            acc_args.schema,
+            acc_args.dfschema,
         )?;
 
         let ordering_dtypes = ordering_req
diff --git a/datafusion/functions-aggregate/src/stddev.rs b/datafusion/functions-aggregate/src/stddev.rs
index 42cf44f65d8f..247962dc2ce1 100644
--- a/datafusion/functions-aggregate/src/stddev.rs
+++ b/datafusion/functions-aggregate/src/stddev.rs
@@ -273,6 +273,7 @@ mod tests {
 
     use arrow::{array::*, datatypes::*};
 
+    use datafusion_common::DFSchema;
     use datafusion_expr::AggregateUDF;
     use datafusion_physical_expr_common::aggregate::utils::get_accum_scalar_values_as_arrays;
     use datafusion_physical_expr_common::expressions::column::col;
@@ -324,13 +325,16 @@ mod tests {
         agg2: Arc<AggregateUDF>,
         schema: &Schema,
     ) -> Result<ScalarValue> {
+        let dfschema = DFSchema::empty();
         let args1 = AccumulatorArgs {
             data_type: &DataType::Float64,
             schema,
+            dfschema: &dfschema,
             ignore_nulls: false,
             sort_exprs: &[],
             name: "a",
             is_distinct: false,
+            is_reversed: false,
             input_type: &DataType::Float64,
             input_exprs: &[datafusion_expr::col("a")],
         };
 
@@ -338,10 +342,12 @@ mod tests {
         let args2 = AccumulatorArgs {
             data_type: &DataType::Float64,
             schema,
+            dfschema: &dfschema,
             ignore_nulls: false,
             sort_exprs: &[],
             name: "a",
             is_distinct: false,
+            is_reversed: false,
             input_type: &DataType::Float64,
             input_exprs: &[datafusion_expr::col("a")],
         };
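To make the two-column partial state declared by `state_fields` in array_agg.rs concrete, here is a hedged sketch of what it would look like for a hypothetical `Int64` value with one `Int64` ordering column `ts` (the state names follow `format_state_name` and are illustrative):

    use arrow_schema::{DataType, Field, Fields};

    fn state_fields_sketch() -> Vec<Field> {
        vec![
            // state[0]: the accumulated values themselves
            Field::new_list(
                "ARRAY_AGG(x)[array_agg]",
                Field::new("item", DataType::Int64, true),
                true,
            ),
            // state[1]: one struct per value, carrying its ordering key(s);
            // merge_batch uses these to interleave partitions correctly
            Field::new_list(
                "ARRAY_AGG(x)[array_agg_orderings]",
                Field::new(
                    "item",
                    DataType::Struct(Fields::from(vec![Field::new(
                        "ts",
                        DataType::Int64,
                        true,
                    )])),
                    true,
                ),
                false,
            ),
        ]
    }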
diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs
index 7a4a3a6cac4b..05c7e1caed0e 100644
--- a/datafusion/physical-expr-common/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -23,7 +23,7 @@ pub mod tdigest;
 pub mod utils;
 
 use arrow::datatypes::{DataType, Field, Schema};
-use datafusion_common::{not_impl_err, Result};
+use datafusion_common::{not_impl_err, DFSchema, Result};
 use datafusion_expr::function::StateFieldsArgs;
 use datafusion_expr::type_coercion::aggregates::check_arg_count;
 use datafusion_expr::ReversedUDAF;
@@ -51,6 +51,10 @@ use datafusion_expr::utils::AggregateOrderSensitivity;
 ///
 /// `input_exprs` and `sort_exprs` are used for customizing the `Accumulator` via the
 /// arguments in `AccumulatorArgs`; if you don't need them, it is fine to pass the empty slice `&[]`.
+///
+/// `is_reversed` indicates whether the aggregation runs in reverse order; it can
+/// be used to hint the `Accumulator` to accumulate in reverse. Pass `false` if
+/// you are not reversing the expression.
 #[allow(clippy::too_many_arguments)]
 pub fn create_aggregate_expr(
     fun: &AggregateUDF,
@@ -62,6 +66,7 @@ pub fn create_aggregate_expr(
     name: impl Into<String>,
     ignore_nulls: bool,
     is_distinct: bool,
+    is_reversed: bool,
 ) -> Result<Arc<dyn AggregateExpr>> {
     debug_assert_eq!(sort_exprs.len(), ordering_req.len());
 
@@ -81,6 +86,61 @@ pub fn create_aggregate_expr(
         .map(|e| e.expr.data_type(schema))
         .collect::<Result<Vec<_>>>()?;
 
+    let ordering_fields = ordering_fields(ordering_req, &ordering_types);
+    let name = name.into();
+
+    Ok(Arc::new(AggregateFunctionExpr {
+        fun: fun.clone(),
+        args: input_phy_exprs.to_vec(),
+        logical_args: input_exprs.to_vec(),
+        data_type: fun.return_type(&input_exprs_types)?,
+        name,
+        schema: schema.clone(),
+        dfschema: DFSchema::empty(),
+        sort_exprs: sort_exprs.to_vec(),
+        ordering_req: ordering_req.to_vec(),
+        ignore_nulls,
+        ordering_fields,
+        is_distinct,
+        input_type: input_exprs_types[0].clone(),
+        is_reversed,
+    }))
+}
+
+#[allow(clippy::too_many_arguments)]
+// This is not for external use; consider creating with `create_aggregate_expr` instead.
+pub fn create_aggregate_expr_with_dfschema(
+    fun: &AggregateUDF,
+    input_phy_exprs: &[Arc<dyn PhysicalExpr>],
+    input_exprs: &[Expr],
+    sort_exprs: &[Expr],
+    ordering_req: &[PhysicalSortExpr],
+    dfschema: &DFSchema,
+    name: impl Into<String>,
+    ignore_nulls: bool,
+    is_distinct: bool,
+    is_reversed: bool,
+) -> Result<Arc<dyn AggregateExpr>> {
+    debug_assert_eq!(sort_exprs.len(), ordering_req.len());
+
+    let schema: Schema = dfschema.into();
+
+    let input_exprs_types = input_phy_exprs
+        .iter()
+        .map(|arg| arg.data_type(&schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    check_arg_count(
+        fun.name(),
+        &input_exprs_types,
+        &fun.signature().type_signature,
+    )?;
+
+    let ordering_types = ordering_req
+        .iter()
+        .map(|e| e.expr.data_type(&schema))
+        .collect::<Result<Vec<_>>>()?;
+
     let ordering_fields = ordering_fields(ordering_req, &ordering_types);
 
     Ok(Arc::new(AggregateFunctionExpr {
@@ -90,12 +150,14 @@ pub fn create_aggregate_expr(
         data_type: fun.return_type(&input_exprs_types)?,
         name: name.into(),
         schema: schema.clone(),
+        dfschema: dfschema.clone(),
         sort_exprs: sort_exprs.to_vec(),
         ordering_req: ordering_req.to_vec(),
         ignore_nulls,
         ordering_fields,
         is_distinct,
         input_type: input_exprs_types[0].clone(),
+        is_reversed,
     }))
 }
@@ -261,6 +323,7 @@ pub struct AggregateFunctionExpr {
     data_type: DataType,
     name: String,
     schema: Schema,
+    dfschema: DFSchema,
     // The logical order by expressions
     sort_exprs: Vec<Expr>,
     // The physical order by expressions
@@ -270,6 +333,7 @@ pub struct AggregateFunctionExpr {
     // fields used for order sensitive aggregation functions
     ordering_fields: Vec<Field>,
     is_distinct: bool,
+    is_reversed: bool,
     input_type: DataType,
 }
 
@@ -288,6 +352,11 @@ impl AggregateFunctionExpr {
     pub fn ignore_nulls(&self) -> bool {
         self.ignore_nulls
     }
+
+    /// Return whether the aggregation is reversed
+    pub fn is_reversed(&self) -> bool {
+        self.is_reversed
+    }
 }
 
 impl AggregateExpr for AggregateFunctionExpr {
@@ -320,12 +389,14 @@ impl AggregateExpr for AggregateFunctionExpr {
         let acc_args = AccumulatorArgs {
             data_type: &self.data_type,
             schema: &self.schema,
+            dfschema: &self.dfschema,
             ignore_nulls: self.ignore_nulls,
             sort_exprs: &self.sort_exprs,
             is_distinct: self.is_distinct,
             input_type: &self.input_type,
             input_exprs: &self.logical_args,
             name: &self.name,
+            is_reversed: self.is_reversed,
         };
 
         self.fun.accumulator(acc_args)
@@ -335,12 +406,14 @@ impl AggregateExpr for AggregateFunctionExpr {
         let args = AccumulatorArgs {
             data_type: &self.data_type,
             schema: &self.schema,
+            dfschema: &self.dfschema,
             ignore_nulls: self.ignore_nulls,
             sort_exprs: &self.sort_exprs,
             is_distinct: self.is_distinct,
             input_type: &self.input_type,
             input_exprs: &self.logical_args,
             name: &self.name,
+            is_reversed: self.is_reversed,
         };
 
         let accumulator = self.fun.create_sliding_accumulator(args)?;
@@ -405,12 +478,14 @@ impl AggregateExpr for AggregateFunctionExpr {
         let args = AccumulatorArgs {
             data_type: &self.data_type,
             schema: &self.schema,
+            dfschema: &self.dfschema,
             ignore_nulls: self.ignore_nulls,
             sort_exprs: &self.sort_exprs,
             is_distinct: self.is_distinct,
             input_type: &self.input_type,
             input_exprs: &self.logical_args,
             name: &self.name,
+            is_reversed: self.is_reversed,
         };
         self.fun.groups_accumulator_supported(args)
     }
@@ -419,12 +494,14 @@ impl AggregateExpr for AggregateFunctionExpr {
         let args = AccumulatorArgs {
             data_type: &self.data_type,
             schema: &self.schema,
+            dfschema: &self.dfschema,
             ignore_nulls: self.ignore_nulls,
             sort_exprs: &self.sort_exprs,
             is_distinct: self.is_distinct,
             input_type: &self.input_type,
             input_exprs: &self.logical_args,
             name: &self.name,
+            is_reversed: self.is_reversed,
         };
         self.fun.create_groups_accumulator(args)
     }
@@ -462,16 +539,17 @@ impl AggregateExpr for AggregateFunctionExpr {
         else {
             return Ok(None);
         };
-        create_aggregate_expr(
+        create_aggregate_expr_with_dfschema(
             &updated_fn,
             &self.args,
             &self.logical_args,
             &self.sort_exprs,
             &self.ordering_req,
-            &self.schema,
+            &self.dfschema,
             self.name(),
             self.ignore_nulls,
             self.is_distinct,
+            self.is_reversed,
         )
         .map(Some)
     }
@@ -495,18 +573,23 @@ impl AggregateExpr for AggregateFunctionExpr {
             })
             .collect::<Vec<_>>();
         let mut name = self.name().to_string();
-        replace_order_by_clause(&mut name);
+        // TODO: Generalize the order-by clause rewrite
+        if reverse_udf.name() == "ARRAY_AGG" {
+        } else {
+            replace_order_by_clause(&mut name);
+        }
         replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name());
-        let reverse_aggr = create_aggregate_expr(
+        let reverse_aggr = create_aggregate_expr_with_dfschema(
             &reverse_udf,
             &self.args,
             &self.logical_args,
             &reverse_sort_exprs,
             &reverse_ordering_req,
-            &self.schema,
+            &self.dfschema,
             name,
             self.ignore_nulls,
             self.is_distinct,
+            !self.is_reversed,
         )
         .unwrap();
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs
index 8fb1356a8092..2b506b74216f 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -22,12 +22,12 @@ use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use crate::physical_expr::PhysicalExpr;
-use crate::utils::limited_convert_logical_expr_to_physical_expr;
+use crate::utils::limited_convert_logical_expr_to_physical_expr_with_dfschema;
 
 use arrow::compute::kernels::sort::{SortColumn, SortOptions};
 use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{exec_err, DFSchema, Result};
 use datafusion_expr::{ColumnarValue, Expr};
 
 /// Represents a sort operation for a column in a RecordBatch
@@ -275,9 +275,9 @@ pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];
 
 /// Converts each [`Expr::Sort`] into a corresponding [`PhysicalSortExpr`].
 /// Returns an error if the given logical expression is not an [`Expr::Sort`].
-pub fn limited_convert_logical_sort_exprs_to_physical(
+pub fn limited_convert_logical_sort_exprs_to_physical_with_dfschema(
     exprs: &[Expr],
-    schema: &Schema,
+    dfschema: &DFSchema,
 ) -> Result<Vec<PhysicalSortExpr>> {
     // Construct PhysicalSortExpr objects from Expr objects:
     let mut sort_exprs = vec![];
@@ -286,7 +286,10 @@ pub fn limited_convert_logical_sort_exprs_to_physical(
             return exec_err!("Expects to receive sort expression");
         };
         sort_exprs.push(PhysicalSortExpr::new(
-            limited_convert_logical_expr_to_physical_expr(sort.expr.as_ref(), schema)?,
+            limited_convert_logical_expr_to_physical_expr_with_dfschema(
+                sort.expr.as_ref(),
+                dfschema,
+            )?,
             SortOptions {
                 descending: !sort.asc,
                 nulls_first: sort.nulls_first,
diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils.rs
index 44622bd309df..0978a906a5dc 100644
--- a/datafusion/physical-expr-common/src/utils.rs
+++ b/datafusion/physical-expr-common/src/utils.rs
@@ -19,15 +19,15 @@ use std::sync::Arc;
 
 use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
 use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
-use arrow::datatypes::Schema;
 
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{exec_err, DFSchema, Result};
 use datafusion_expr::expr::Alias;
 use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::Expr;
 
+use crate::expressions::column::Column;
 use crate::expressions::literal::Literal;
-use crate::expressions::{self, CastExpr};
+use crate::expressions::CastExpr;
 use crate::physical_expr::PhysicalExpr;
 use crate::sort_expr::PhysicalSortExpr;
 use crate::tree_node::ExprContext;
@@ -110,19 +110,22 @@ pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr>
 
 /// Converts a logical [`Expr`] to a physical `Arc<dyn PhysicalExpr>`.
 /// If the conversion is not supported yet, returns an error.
-pub fn limited_convert_logical_expr_to_physical_expr(
+pub fn limited_convert_logical_expr_to_physical_expr_with_dfschema(
     expr: &Expr,
-    schema: &Schema,
+    dfschema: &DFSchema,
 ) -> Result<Arc<dyn PhysicalExpr>> {
     match expr {
-        Expr::Alias(Alias { expr, .. }) => {
-            Ok(limited_convert_logical_expr_to_physical_expr(expr, schema)?)
+        Expr::Alias(Alias { expr, .. }) => Ok(
+            limited_convert_logical_expr_to_physical_expr_with_dfschema(expr, dfschema)?,
+        ),
+        Expr::Column(col) => {
+            let idx = dfschema.index_of_column(col)?;
+            Ok(Arc::new(Column::new(&col.name, idx)))
         }
-        Expr::Column(col) => expressions::column::col(&col.name, schema),
         Expr::Cast(cast_expr) => Ok(Arc::new(CastExpr::new(
-            limited_convert_logical_expr_to_physical_expr(
+            limited_convert_logical_expr_to_physical_expr_with_dfschema(
                 cast_expr.expr.as_ref(),
-                schema,
+                dfschema,
             )?,
             cast_expr.data_type.clone(),
             None,
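The practical difference the `DFSchema`-based conversion makes (illustrative, not from the diff): a qualified column such as `t.b` resolves to its index via `DFSchema::index_of_column`, whereas the old `Schema`-based path could only look up a bare field name. A minimal sketch under assumed imports, with a hypothetical table `t`:

    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::{Column, DFSchema, Result};

    fn resolve() -> Result<usize> {
        let df_schema = DFSchema::try_from_qualified_schema(
            "t",
            &Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Int32, false),
            ]),
        )?;
        // Resolves the qualified column `t.b` to field index 1:
        df_schema.index_of_column(&Column::new(Some("t"), "b"))
    }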
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
deleted file mode 100644
index 992c06f5bf62..000000000000
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ /dev/null
@@ -1,520 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Defines physical expressions which specify ordering requirements
-//! that can be evaluated at runtime during query execution
-
-use std::any::Any;
-use std::collections::VecDeque;
-use std::fmt::Debug;
-use std::sync::Arc;
-
-use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
-use crate::expressions::format_state_name;
-use crate::{
-    reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
-};
-
-use arrow::datatypes::{DataType, Field};
-use arrow_array::cast::AsArray;
-use arrow_array::{new_empty_array, Array, ArrayRef, StructArray};
-use arrow_schema::Fields;
-use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx};
-use datafusion_common::{exec_err, Result, ScalarValue};
-use datafusion_expr::utils::AggregateOrderSensitivity;
-use datafusion_expr::Accumulator;
-use datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays;
-
-/// Expression for an `ARRAY_AGG(... ORDER BY ..., ...)` aggregation. In a multi
-/// partition setting, partial aggregations are computed for every partition,
-/// and then their results are merged.
-#[derive(Debug)]
-pub struct OrderSensitiveArrayAgg {
-    /// Column name
-    name: String,
-    /// The `DataType` for the input expression
-    input_data_type: DataType,
-    /// The input expression
-    expr: Arc<dyn PhysicalExpr>,
-    /// Ordering data types
-    order_by_data_types: Vec<DataType>,
-    /// Ordering requirement
-    ordering_req: LexOrdering,
-    /// Whether the aggregation is running in reverse
-    reverse: bool,
-}
-
-impl OrderSensitiveArrayAgg {
-    /// Create a new `OrderSensitiveArrayAgg` aggregate function
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        input_data_type: DataType,
-        order_by_data_types: Vec<DataType>,
-        ordering_req: LexOrdering,
-    ) -> Self {
-        Self {
-            name: name.into(),
-            input_data_type,
-            expr,
-            order_by_data_types,
-            ordering_req,
-            reverse: false,
-        }
-    }
-}
-
-impl AggregateExpr for OrderSensitiveArrayAgg {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new_list(
-            &self.name,
-            // This should match the return type of AggregateFunction::OrderSensitiveArrayAgg
-            Field::new("item", self.input_data_type.clone(), true),
-            true,
-        ))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        OrderSensitiveArrayAggAccumulator::try_new(
-            &self.input_data_type,
-            &self.order_by_data_types,
-            self.ordering_req.clone(),
-            self.reverse,
-        )
-        .map(|acc| Box::new(acc) as _)
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        let mut fields = vec![Field::new_list(
-            format_state_name(&self.name, "array_agg"),
-            Field::new("item", self.input_data_type.clone(), true),
-            true, // This should be the same as field()
-        )];
-        let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types);
-        fields.push(Field::new_list(
-            format_state_name(&self.name, "array_agg_orderings"),
-            Field::new("item", DataType::Struct(Fields::from(orderings)), true),
-            false,
-        ));
-        Ok(fields)
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![Arc::clone(&self.expr)]
-    }
-
-    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
-        (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
-    }
-
-    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
-        AggregateOrderSensitivity::HardRequirement
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        Some(Arc::new(Self {
-            name: self.name.to_string(),
-            input_data_type: self.input_data_type.clone(),
-            expr: Arc::clone(&self.expr),
-            order_by_data_types: self.order_by_data_types.clone(),
-            // Reverse requirement:
-            ordering_req: reverse_order_bys(&self.ordering_req),
-            reverse: !self.reverse,
-        }))
-    }
-}
-
-impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.input_data_type == x.input_data_type
-                    && self.order_by_data_types == x.order_by_data_types
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
-#[derive(Debug)]
-pub(crate) struct OrderSensitiveArrayAggAccumulator {
-    /// Stores entries in the `ARRAY_AGG` result.
-    values: Vec<ScalarValue>,
-    /// Stores values of ordering requirement expressions corresponding to each
-    /// entry in `values`. This information is used when merging results from
-    /// different partitions. For details on how merging is done, see
-    /// [`merge_ordered_arrays`].
-    ordering_values: Vec<Vec<ScalarValue>>,
-    /// Stores datatypes of the expressions inside `values` and of the ordering
-    /// requirement expressions.
-    datatypes: Vec<DataType>,
-    /// Stores the ordering requirement of the `Accumulator`.
-    ordering_req: LexOrdering,
-    /// Whether the aggregation is running in reverse.
-    reverse: bool,
-}
-
-impl OrderSensitiveArrayAggAccumulator {
-    /// Create a new order-sensitive `ARRAY_AGG` accumulator based on the given
-    /// item data type.
-    pub fn try_new(
-        datatype: &DataType,
-        ordering_dtypes: &[DataType],
-        ordering_req: LexOrdering,
-        reverse: bool,
-    ) -> Result<Self> {
-        let mut datatypes = vec![datatype.clone()];
-        datatypes.extend(ordering_dtypes.iter().cloned());
-        Ok(Self {
-            values: vec![],
-            ordering_values: vec![],
-            datatypes,
-            ordering_req,
-            reverse,
-        })
-    }
-}
-
-impl Accumulator for OrderSensitiveArrayAggAccumulator {
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if values.is_empty() {
-            return Ok(());
-        }
-
-        let n_row = values[0].len();
-        for index in 0..n_row {
-            let row = get_row_at_idx(values, index)?;
-            self.values.push(row[0].clone());
-            self.ordering_values.push(row[1..].to_vec());
-        }
-
-        Ok(())
-    }
-
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
-        }
-
-        // The first entry in the state is the aggregation result; the second
-        // entry stores values received for the ordering requirement columns
-        // for each aggregation value inside the `ARRAY_AGG` list. For each
-        // `StructArray` inside the `ARRAY_AGG` list, we will receive an
-        // `Array` that stores values received from its ordering requirement
-        // expression. (This information is necessary during merging.)
-        let [array_agg_values, agg_orderings, ..] = &states else {
-            return exec_err!("State should have two elements");
-        };
-        let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() else {
-            return exec_err!("Expects to receive a list array");
-        };
-
-        // Stores ARRAY_AGG results coming from each partition
-        let mut partition_values = vec![];
-        // Stores ordering requirement expression results coming from each partition
-        let mut partition_ordering_values = vec![];
-
-        // Existing values should be merged too.
-        partition_values.push(self.values.clone().into());
-        partition_ordering_values.push(self.ordering_values.clone().into());
-
-        // Convert the array to scalars so they sort easily; convert back to an
-        // array at evaluation time.
-        let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
-        for v in array_agg_res.into_iter() {
-            partition_values.push(v.into());
-        }
-
-        let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
-
-        for partition_ordering_rows in orderings.into_iter() {
-            // Extract value from struct to ordering_rows for each group/partition
-            let ordering_value = partition_ordering_rows
-                .into_iter()
-                .map(|ordering_row| {
-                    if let ScalarValue::Struct(s) = ordering_row {
-                        let mut ordering_columns_per_row = vec![];
-
-                        for column in s.columns() {
-                            let sv = ScalarValue::try_from_array(column, 0)?;
-                            ordering_columns_per_row.push(sv);
-                        }
-
-                        Ok(ordering_columns_per_row)
-                    } else {
-                        exec_err!(
-                            "Expects to receive ScalarValue::Struct(Arc<StructArray>) but got: {:?}",
-                            ordering_row.data_type()
-                        )
-                    }
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            partition_ordering_values.push(ordering_value);
-        }
-
-        let sort_options = self
-            .ordering_req
-            .iter()
-            .map(|sort_expr| sort_expr.options)
-            .collect::<Vec<_>>();
-
-        (self.values, self.ordering_values) = merge_ordered_arrays(
-            &mut partition_values,
-            &mut partition_ordering_values,
-            &sort_options,
-        )?;
-
-        Ok(())
-    }
-
-    fn state(&mut self) -> Result<Vec<ScalarValue>> {
-        let mut result = vec![self.evaluate()?];
-        result.push(self.evaluate_orderings()?);
-        Ok(result)
-    }
-
-    fn evaluate(&mut self) -> Result<ScalarValue> {
-        if self.values.is_empty() {
-            return Ok(ScalarValue::new_null_list(
-                self.datatypes[0].clone(),
-                true,
-                1,
-            ));
-        }
-
-        let values = self.values.clone();
-        let array = if self.reverse {
-            ScalarValue::new_list_from_iter(
-                values.into_iter().rev(),
-                &self.datatypes[0],
-                true,
-            )
-        } else {
-            ScalarValue::new_list_from_iter(values.into_iter(), &self.datatypes[0], true)
-        };
-        Ok(ScalarValue::List(array))
-    }
-
-    fn size(&self) -> usize {
-        let mut total = std::mem::size_of_val(self)
-            + ScalarValue::size_of_vec(&self.values)
-            - std::mem::size_of_val(&self.values);
-
-        // Add the size of `self.ordering_values`
-        total +=
-            std::mem::size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
-        for row in &self.ordering_values {
-            total += ScalarValue::size_of_vec(row) - std::mem::size_of_val(row);
-        }
-
-        // Add the size of `self.datatypes`
-        total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
-        for dtype in &self.datatypes {
-            total += dtype.size() - std::mem::size_of_val(dtype);
-        }
-
-        // Add the size of `self.ordering_req`
-        total += std::mem::size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
-        // TODO: Calculate the size of each `PhysicalSortExpr` more accurately.
-        total
-    }
-}
-
-impl OrderSensitiveArrayAggAccumulator {
-    fn evaluate_orderings(&self) -> Result<ScalarValue> {
-        let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
-        let num_columns = fields.len();
-        let struct_field = Fields::from(fields.clone());
-
-        let mut column_wise_ordering_values = vec![];
-        for i in 0..num_columns {
-            let column_values = self
-                .ordering_values
-                .iter()
-                .map(|x| x[i].clone())
-                .collect::<Vec<_>>();
-            let array = if column_values.is_empty() {
-                new_empty_array(fields[i].data_type())
-            } else {
-                ScalarValue::iter_to_array(column_values.into_iter())?
-            };
-            column_wise_ordering_values.push(array);
-        }
-
-        let ordering_array = StructArray::try_new(
-            struct_field.clone(),
-            column_wise_ordering_values,
-            None,
-        )?;
-        Ok(ScalarValue::List(Arc::new(array_into_list_array_nullable(
-            Arc::new(ordering_array),
-        ))))
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::collections::VecDeque;
-    use std::sync::Arc;
-
-    use crate::aggregate::array_agg_ordered::merge_ordered_arrays;
-
-    use arrow_array::{Array, ArrayRef, Int64Array};
-    use arrow_schema::SortOptions;
-    use datafusion_common::utils::get_row_at_idx;
-    use datafusion_common::{Result, ScalarValue};
-
-    #[test]
-    fn test_merge_asc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-        ];
-
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as ArrayRef,
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef,
-        ];
-
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_merge_desc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-        ];
-
-        // The values to be merged do not themselves have to be ordered.
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as ArrayRef,
-            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef,
-        ];
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
-        Ok(())
-    }
-}
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index 9c270561f37d..27c1533d0552 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -42,7 +42,7 @@ pub fn create_aggregate_expr(
     fun: &AggregateFunction,
     distinct: bool,
     input_phy_exprs: &[Arc<dyn PhysicalExpr>],
-    ordering_req: &[PhysicalSortExpr],
+    _ordering_req: &[PhysicalSortExpr],
     input_schema: &Schema,
     name: impl Into<String>,
     _ignore_nulls: bool,
@@ -54,29 +54,9 @@ pub fn create_aggregate_expr(
         .map(|e| e.data_type(input_schema))
         .collect::<Result<Vec<_>>>()?;
     let data_type = input_phy_types[0].clone();
-    let ordering_types = ordering_req
-        .iter()
-        .map(|e| e.expr.data_type(input_schema))
-        .collect::<Result<Vec<_>>>()?;
     let input_phy_exprs = input_phy_exprs.to_vec();
     Ok(match (fun, distinct) {
-        (AggregateFunction::ArrayAgg, _) => {
-            let expr = Arc::clone(&input_phy_exprs[0]);
-
-            if ordering_req.is_empty() {
-                return internal_err!(
-                    "ArrayAgg without ordering should be handled as UDAF"
-                );
-            } else {
-                Arc::new(expressions::OrderSensitiveArrayAgg::new(
-                    expr,
-                    name,
-                    data_type,
-                    ordering_types,
-                    ordering_req.to_vec(),
-                ))
-            }
-        }
+        (AggregateFunction::ArrayAgg, _) => return internal_err!("not reachable"),
         (AggregateFunction::Min, _) => Arc::new(expressions::Min::new(
             Arc::clone(&input_phy_exprs[0]),
             name,
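With the builtin arm reduced to `internal_err!("not reachable")`, `ARRAY_AGG` is now obtained through the UDAF registry; the enum variant only survives for protobuf compatibility. A short sketch of the new entry point (the accessor comes from `datafusion_functions_aggregate::array_agg`, and the name check matches the string this diff compares against):

    let udaf = array_agg_udaf();
    assert_eq!(udaf.name(), "ARRAY_AGG");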
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 749cf2be7297..264c48513050 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -15,9 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub use datafusion_physical_expr_common::aggregate::AggregateExpr;
-
-pub(crate) mod array_agg_ordered;
 #[macro_use]
 pub(crate) mod min_max;
 pub(crate) mod groups_accumulator;
@@ -31,3 +28,5 @@ pub mod utils {
         get_sort_options, ordering_fields, DecimalAverager, Hashable,
     };
 }
+
+pub use datafusion_physical_expr_common::aggregate::AggregateExpr;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index fa80bc9873f0..4bc61af00aec 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -34,7 +34,6 @@ mod try_cast;
 pub mod helpers {
     pub use crate::aggregate::min_max::{max, min};
 }
-pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg;
 pub use crate::aggregate::build_in::create_aggregate_expr;
 pub use crate::aggregate::min_max::{Max, MaxAccumulator, Min, MinAccumulator};
 pub use crate::aggregate::stats::StatsType;
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 4146dda7641d..e7cd5cb2725b 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1194,22 +1194,25 @@ mod tests {
     use arrow::datatypes::DataType;
     use arrow_array::{Float32Array, Int32Array};
     use datafusion_common::{
-        assert_batches_eq, assert_batches_sorted_eq, internal_err, DataFusionError,
-        ScalarValue,
+        assert_batches_eq, assert_batches_sorted_eq, internal_err, DFSchema, DFSchemaRef,
+        DataFusionError, ScalarValue,
     };
     use datafusion_execution::config::SessionConfig;
     use datafusion_execution::memory_pool::FairSpillPool;
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_expr::expr::Sort;
+    use datafusion_functions_aggregate::array_agg::array_agg_udaf;
     use datafusion_functions_aggregate::average::avg_udaf;
     use datafusion_functions_aggregate::count::count_udaf;
     use datafusion_functions_aggregate::first_last::{FirstValue, LastValue};
     use datafusion_functions_aggregate::median::median_udaf;
-    use datafusion_physical_expr::expressions::{lit, OrderSensitiveArrayAgg};
+    use datafusion_physical_expr::expressions::lit;
     use datafusion_physical_expr::PhysicalSortExpr;
 
     use crate::common::collect;
-    use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
+    use datafusion_physical_expr_common::aggregate::{
+        create_aggregate_expr, create_aggregate_expr_with_dfschema,
+    };
     use datafusion_physical_expr_common::expressions::Literal;
     use futures::{FutureExt, Stream};
 
@@ -1258,19 +1261,22 @@ mod tests {
     }
 
     /// Generates some mock data for aggregate tests.
-    fn some_data_v2() -> (Arc<Schema>, Vec<RecordBatch>) {
+    fn some_data_v2() -> (Arc<Schema>, DFSchemaRef, Vec<RecordBatch>) {
         // Define a schema:
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::UInt32, false),
             Field::new("b", DataType::Float64, false),
         ]));
 
+        let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
+
         // Generate data so that the first and last value results land in the 2nd
         // and 3rd partitions. With this construction, we guarantee we don't get
         // the expected result by accident; merging actually has to work properly,
         // i.e. it doesn't depend on the data insertion order.
         (
             Arc::clone(&schema),
+            Arc::new(df_schema),
             vec![
                 RecordBatch::try_new(
                     Arc::clone(&schema),
@@ -1355,6 +1361,7 @@ mod tests {
             "COUNT(1)",
             false,
             false,
+            false,
         )?];
 
         let task_ctx = if spill {
@@ -1504,6 +1511,7 @@ mod tests {
             "AVG(b)",
             false,
             false,
+            false,
         )?];
 
         let task_ctx = if spill {
@@ -1808,6 +1816,7 @@ mod tests {
             "MEDIAN(a)",
             false,
             false,
+            false,
         )
     }
 
@@ -1844,6 +1853,7 @@ mod tests {
             "AVG(b)",
             false,
             false,
+            false,
         )?];
 
         for (version, groups, aggregates) in [
@@ -1908,6 +1918,7 @@ mod tests {
             "AVG(a)",
             false,
             false,
+            false,
         )?];
 
         let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
@@ -1952,6 +1963,7 @@ mod tests {
             "AVG(b)",
             false,
             false,
+            false,
         )?];
 
         let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
@@ -1996,12 +2008,11 @@ mod tests {
     // FIRST_VALUE(b ORDER BY b <SortOptions>)
     fn test_first_value_agg_expr(
         schema: &Schema,
+        dfschema: &DFSchema,
         sort_options: SortOptions,
     ) -> Result<Arc<dyn AggregateExpr>> {
         let sort_exprs = vec![datafusion_expr::Expr::Sort(Sort {
-            expr: Box::new(datafusion_expr::Expr::Column(
-                datafusion_common::Column::new(Some("table1"), "b"),
-            )),
+            expr: Box::new(datafusion_expr::col("b")),
             asc: !sort_options.descending,
             nulls_first: sort_options.nulls_first,
         })];
@@ -2012,28 +2023,28 @@ mod tests {
         let args = vec![col("b", schema)?];
         let logical_args = vec![datafusion_expr::col("b")];
         let func = datafusion_expr::AggregateUDF::new_from_impl(FirstValue::new());
-        datafusion_physical_expr_common::aggregate::create_aggregate_expr(
+        datafusion_physical_expr_common::aggregate::create_aggregate_expr_with_dfschema(
             &func,
             &args,
             &logical_args,
             &sort_exprs,
             &ordering_req,
-            schema,
+            dfschema,
             "FIRST_VALUE(b)",
             false,
             false,
+            false,
         )
     }
 
     // LAST_VALUE(b ORDER BY b <SortOptions>)
     fn test_last_value_agg_expr(
         schema: &Schema,
+        dfschema: &DFSchema,
         sort_options: SortOptions,
     ) -> Result<Arc<dyn AggregateExpr>> {
         let sort_exprs = vec![datafusion_expr::Expr::Sort(Sort {
-            expr: Box::new(datafusion_expr::Expr::Column(
-                datafusion_common::Column::new(Some("table1"), "b"),
-            )),
+            expr: Box::new(datafusion_expr::col("b")),
             asc: !sort_options.descending,
             nulls_first: sort_options.nulls_first,
         })];
@@ -2044,16 +2055,17 @@ mod tests {
         let args = vec![col("b", schema)?];
         let logical_args = vec![datafusion_expr::col("b")];
         let func = datafusion_expr::AggregateUDF::new_from_impl(LastValue::new());
-        create_aggregate_expr(
+        create_aggregate_expr_with_dfschema(
             &func,
             &args,
             &logical_args,
             &sort_exprs,
             &ordering_req,
-            schema,
+            dfschema,
             "LAST_VALUE(b)",
             false,
             false,
+            false,
         )
     }
 
@@ -2086,7 +2098,7 @@ mod tests {
             Arc::new(TaskContext::default())
         };
 
-        let (schema, data) = some_data_v2();
+        let (schema, df_schema, data) = some_data_v2();
         let partition1 = data[0].clone();
         let partition2 = data[1].clone();
         let partition3 = data[2].clone();
@@ -2100,9 +2112,13 @@ mod tests {
             nulls_first: false,
         };
         let aggregates: Vec<Arc<dyn AggregateExpr>> = if is_first_acc {
-            vec![test_first_value_agg_expr(&schema, sort_options)?]
+            vec![test_first_value_agg_expr(
+                &schema,
+                &df_schema,
+                sort_options,
+            )?]
         } else {
-            vec![test_last_value_agg_expr(&schema, sort_options)?]
+            vec![test_last_value_agg_expr(&schema, &df_schema, sort_options)?]
         };
 
         let memory_exec = Arc::new(MemoryExec::try_new(
@@ -2169,6 +2185,8 @@ mod tests {
     #[tokio::test]
     async fn test_get_finest_requirements() -> Result<()> {
         let test_schema = create_test_schema()?;
+        let test_df_schema = DFSchema::try_from(Arc::clone(&test_schema)).unwrap();
+
         // Assume columns a and b are aliases.
         // Assume also that a ASC and c DESC describe the same global ordering for the table
@@ -2169,6 +2185,8 @@ mod tests {
     #[tokio::test]
     async fn test_get_finest_requirements() -> Result<()> {
         let test_schema = create_test_schema()?;
+        let test_df_schema = DFSchema::try_from(Arc::clone(&test_schema)).unwrap();
+
         // Assume column a and b are aliases
         // Assume also that a ASC and c DESC describe the same global ordering for the table. (Since they are ordering equivalent).
         let options1 = SortOptions {
@@ -2178,7 +2196,7 @@ mod tests {
         let col_a = &col("a", &test_schema)?;
         let col_b = &col("b", &test_schema)?;
         let col_c = &col("c", &test_schema)?;
-        let mut eq_properties = EquivalenceProperties::new(test_schema);
+        let mut eq_properties = EquivalenceProperties::new(Arc::clone(&test_schema));
         // Columns a and b are equal.
         eq_properties.add_equal_conditions(col_a, col_b)?;
         // Aggregate requirements are
@@ -2214,6 +2232,46 @@ mod tests {
                 },
             ]),
         ];
+        let col_expr_a = Box::new(datafusion_expr::col("a"));
+        let col_expr_b = Box::new(datafusion_expr::col("b"));
+        let col_expr_c = Box::new(datafusion_expr::col("c"));
+        let sort_exprs = vec![
+            None,
+            Some(vec![datafusion_expr::Expr::Sort(Sort::new(
+                col_expr_a.clone(),
+                options1.descending,
+                options1.nulls_first,
+            ))]),
+            Some(vec![
+                datafusion_expr::Expr::Sort(Sort::new(
+                    col_expr_a.clone(),
+                    options1.descending,
+                    options1.nulls_first,
+                )),
+                datafusion_expr::Expr::Sort(Sort::new(
+                    col_expr_b.clone(),
+                    options1.descending,
+                    options1.nulls_first,
+                )),
+                datafusion_expr::Expr::Sort(Sort::new(
+                    col_expr_c,
+                    options1.descending,
+                    options1.nulls_first,
+                )),
+            ]),
+            Some(vec![
+                datafusion_expr::Expr::Sort(Sort::new(
+                    col_expr_a,
+                    options1.descending,
+                    options1.nulls_first,
+                )),
+                datafusion_expr::Expr::Sort(Sort::new(
+                    col_expr_b,
+                    options1.descending,
+                    options1.nulls_first,
+                )),
+            ]),
+        ];
         let common_requirement = vec![
             PhysicalSortExpr {
                 expr: Arc::clone(col_a),
@@ -2226,14 +2284,23 @@ mod tests {
         ];
         let mut aggr_exprs = order_by_exprs
             .into_iter()
-            .map(|order_by_expr| {
-                Arc::new(OrderSensitiveArrayAgg::new(
-                    Arc::clone(col_a),
+            .zip(sort_exprs.into_iter())
+            .map(|(order_by_expr, sort_exprs)| {
+                let ordering_req = order_by_expr.unwrap_or_default();
+                let sort_exprs = sort_exprs.unwrap_or_default();
+                create_aggregate_expr_with_dfschema(
+                    &array_agg_udaf(),
+                    &[Arc::clone(col_a)],
+                    &[],
+                    &sort_exprs,
+                    &ordering_req,
+                    &test_df_schema,
                     "array_agg",
-                    DataType::Int32,
-                    vec![],
-                    order_by_expr.unwrap_or_default(),
-                )) as _
+                    false,
+                    false,
+                    false,
+                )
+                .unwrap()
             })
             .collect::<Vec<_>>();
         let group_by = PhysicalGroupBy::new_single(vec![]);
@@ -2254,6 +2321,7 @@ mod tests {
             Field::new("a", DataType::Float32, true),
             Field::new("b", DataType::Float32, true),
         ]));
+        let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
         let col_a = col("a", &schema)?;

         let option_desc = SortOptions {
@@ -2263,8 +2331,8 @@ mod tests {
         let groups = PhysicalGroupBy::new_single(vec![(col_a, "a".to_string())]);

         let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![
-            test_first_value_agg_expr(&schema, option_desc)?,
-            test_last_value_agg_expr(&schema, option_desc)?,
+            test_first_value_agg_expr(&schema, &df_schema, option_desc)?,
+            test_last_value_agg_expr(&schema, &df_schema, option_desc)?,
         ];
         let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
         let aggregate_exec = Arc::new(AggregateExec::try_new(
@@ -2330,6 +2398,7 @@ mod tests {
             "1",
             false,
             false,
+            false,
         )?];

         let input_batches = (0..4)
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 046977da0a37..c834005bb7c3 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -92,7 +92,7 @@ pub mod work_table;

 pub mod udaf {
     pub use datafusion_physical_expr_common::aggregate::{
-        create_aggregate_expr, AggregateFunctionExpr,
+        create_aggregate_expr, create_aggregate_expr_with_dfschema, AggregateFunctionExpr,
     };
 }
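Note: the re-export above makes the DFSchema-based constructor reachable as datafusion::physical_plan::udaf::create_aggregate_expr_with_dfschema. A sketch of a call mirroring the FIRST_VALUE tests above; the crate path for FirstValue is assumed from the surrounding test code, and the example is illustrative rather than part of the patch:

    use arrow_schema::{DataType, Field, Schema};
    use datafusion::physical_plan::udaf::create_aggregate_expr_with_dfschema;
    use datafusion_common::{DFSchema, Result};
    use datafusion_expr::AggregateUDF;
    use datafusion_functions_aggregate::first_last::FirstValue;
    use datafusion_physical_expr::expressions::col;

    fn first_value_b() -> Result<()> {
        let schema = Schema::new(vec![Field::new("b", DataType::Float64, true)]);
        let physical_b = col("b", &schema)?;
        let dfschema = DFSchema::try_from(schema)?;
        let func = AggregateUDF::new_from_impl(FirstValue::new());
        let _agg = create_aggregate_expr_with_dfschema(
            &func,
            &[physical_b],                // physical arguments
            &[datafusion_expr::col("b")], // logical arguments
            &[],                          // logical ORDER BY expressions
            &[],                          // physical ordering requirement
            &dfschema,                    // logical schema instead of &Schema
            "FIRST_VALUE(b)",
            false, // ignore_nulls
            false, // distinct
            false, // the trailing flag this patch adds; false = forward order
        )?;
        Ok(())
    }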
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 5eca7af19d16..959796489c19 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -157,6 +157,7 @@ pub fn create_window_expr(
                 name,
                 ignore_nulls,
                 false,
+                false,
             )?;
             window_expr_from_aggregate_expr(
                 partition_by,
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 1220f42ded83..7403bc06e0f6 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -506,7 +506,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                                 // TODO: `order by` is not supported for UDAF yet
                                 let sort_exprs = &[];
                                 let ordering_req = &[];
-                                udaf::create_aggregate_expr(agg_udf.as_ref(), &input_phy_expr, logical_exprs, sort_exprs, ordering_req, &physical_schema, name, agg_node.ignore_nulls, agg_node.distinct)
+                                udaf::create_aggregate_expr(agg_udf.as_ref(), &input_phy_expr, logical_exprs, sort_exprs, ordering_req, &physical_schema, name, agg_node.ignore_nulls, agg_node.distinct, false)
                             }
                         }
                     }).transpose()?.ok_or_else(|| {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index e9a90fce2663..140482b9903c 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -24,8 +24,8 @@ use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr
 use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
 use datafusion::physical_plan::expressions::{
     BinaryExpr, CaseExpr, CastExpr, Column, CumeDist, InListExpr, IsNotNullExpr,
-    IsNullExpr, Literal, Max, Min, NegativeExpr, NotExpr, NthValue, Ntile,
-    OrderSensitiveArrayAgg, Rank, RankType, RowNumber, TryCastExpr, WindowShift,
+    IsNullExpr, Literal, Max, Min, NegativeExpr, NotExpr, NthValue, Ntile, Rank,
+    RankType, RowNumber, TryCastExpr, WindowShift,
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr};
@@ -260,10 +260,8 @@ struct AggrFn {
 fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result<AggrFn> {
     let aggr_expr = expr.as_any();

-    // TODO: remove OrderSensitiveArrayAgg
-    let inner = if aggr_expr.downcast_ref::<OrderSensitiveArrayAgg>().is_some() {
-        protobuf::AggregateFunction::ArrayAgg
-    } else if aggr_expr.downcast_ref::<Min>().is_some() {
+    // TODO: remove Min and Max
+    let inner = if aggr_expr.downcast_ref::<Min>().is_some() {
         protobuf::AggregateFunction::Min
     } else if aggr_expr.downcast_ref::<Max>().is_some() {
         protobuf::AggregateFunction::Max
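Note: with OrderSensitiveArrayAgg removed from the serializer, array_agg expressions should now reach proto encoding as plain AggregateFunctionExpr (UDAF) values; only Min and Max keep dedicated downcast arms, per the TODO above. A condensed sketch of the resulting dispatch shape; import paths mirror those used by to_proto.rs, and the function name is hypothetical:

    use datafusion::physical_plan::udaf::AggregateFunctionExpr;
    use datafusion::physical_plan::AggregateExpr;

    fn is_udaf_backed(expr: &dyn AggregateExpr) -> bool {
        // Min/Max still hit their dedicated arms in aggr_expr_to_aggr_fn;
        // everything else, array_agg included after this patch, downcasts
        // to the generic UDAF expression type.
        expr.as_any().downcast_ref::<AggregateFunctionExpr>().is_some()
    }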
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index fba6dfe42599..31ed0837d2f5 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -301,6 +301,7 @@ fn roundtrip_window() -> Result<()> {
             "avg(b)",
             false,
             false,
+            false,
         )?,
         &[],
         &[],
@@ -324,6 +325,7 @@ fn roundtrip_window() -> Result<()> {
         "SUM(a) RANGE BETWEEN CURRENT ROW AND UNBOUNDED PRECEEDING",
         false,
         false,
+        false,
     )?;

     let sliding_aggr_window_expr = Arc::new(SlidingAggregateWindowExpr::new(
@@ -367,6 +369,7 @@ fn rountrip_aggregate() -> Result<()> {
             "AVG(b)",
             false,
             false,
+            false,
         )?],
         // NTH_VALUE
         vec![create_aggregate_expr(
@@ -379,6 +382,7 @@ fn rountrip_aggregate() -> Result<()> {
             "NTH_VALUE(b, 1)",
             false,
             false,
+            false,
         )?],
         // STRING_AGG
         vec![create_aggregate_expr(
@@ -394,6 +398,7 @@ fn rountrip_aggregate() -> Result<()> {
             "STRING_AGG(name, ',')",
             false,
             false,
+            false,
         )?],
     ];

@@ -431,6 +436,7 @@ fn rountrip_aggregate_with_limit() -> Result<()> {
         "AVG(b)",
         false,
         false,
+        false,
     )?];

     let agg = AggregateExec::try_new(
@@ -502,6 +508,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
         "example_agg",
         false,
         false,
+        false,
     )?];

     roundtrip_test_with_context(
@@ -1000,6 +1007,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> {
         "aggregate_udf",
         false,
         false,
+        false,
     )?;

     let filter = Arc::new(FilterExec::try_new(
@@ -1032,6 +1040,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> {
         "aggregate_udf",
         true,
         true,
+        false,
     )?;

     let aggregate = Arc::new(AggregateExec::try_new(
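Note: every round-trip test above passes `false` for the new trailing constructor argument, and the deserialization path in proto/src/physical_plan/mod.rs hardcodes `false` as well, so the flag is not carried through the proto representation. A sketch of the Schema-based constructor call these tests exercise, assuming `avg_udaf` from datafusion_functions_aggregate::average as in the AVG cases above:

    use arrow_schema::{DataType, Field, Schema};
    use datafusion::physical_plan::udaf::create_aggregate_expr;
    use datafusion_common::Result;
    use datafusion_functions_aggregate::average::avg_udaf;
    use datafusion_physical_expr::expressions::col;

    fn avg_b() -> Result<()> {
        let schema = Schema::new(vec![Field::new("b", DataType::Float64, true)]);
        let _avg = create_aggregate_expr(
            &avg_udaf(),
            &[col("b", &schema)?],
            &[],     // logical arguments (not needed for AVG)
            &[],     // logical ORDER BY expressions
            &[],     // physical ordering requirement
            &schema, // this variant still takes the physical Schema
            "AVG(b)",
            false, // ignore_nulls
            false, // distinct
            false, // new trailing flag; always false on the proto path
        )?;
        Ok(())
    }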