diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 4fce2ec500e4..500e731a5b4f 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1319,6 +1319,7 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-functions", + "datafusion-functions-aggregate", "itertools", "log", "paste", diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 760952d94815..23e98714dfa4 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -39,8 +39,6 @@ pub enum AggregateFunction { Max, /// Aggregation into an array ArrayAgg, - /// N'th value in a group according to some ordering - NthValue, } impl AggregateFunction { @@ -50,7 +48,6 @@ impl AggregateFunction { Min => "MIN", Max => "MAX", ArrayAgg => "ARRAY_AGG", - NthValue => "NTH_VALUE", } } } @@ -69,7 +66,6 @@ impl FromStr for AggregateFunction { "max" => AggregateFunction::Max, "min" => AggregateFunction::Min, "array_agg" => AggregateFunction::ArrayAgg, - "nth_value" => AggregateFunction::NthValue, _ => { return plan_err!("There is no built-in function named {name}"); } @@ -114,7 +110,6 @@ impl AggregateFunction { coerced_data_types[0].clone(), input_expr_nullable[0], )))), - AggregateFunction::NthValue => Ok(coerced_data_types[0].clone()), } } @@ -124,7 +119,6 @@ impl AggregateFunction { match self { AggregateFunction::Max | AggregateFunction::Min => Ok(true), AggregateFunction::ArrayAgg => Ok(false), - AggregateFunction::NthValue => Ok(true), } } } @@ -147,7 +141,6 @@ impl AggregateFunction { .collect::>(); Signature::uniform(1, valid, Volatility::Immutable) } - AggregateFunction::NthValue => Signature::any(2, Volatility::Immutable), } } } diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs index 0f7464b96b3e..fbec6e2f8024 100644 --- a/datafusion/expr/src/type_coercion/aggregates.rs +++ b/datafusion/expr/src/type_coercion/aggregates.rs @@ -101,7 +101,6 @@ pub fn coerce_types( // unpack the dictionary to get the value get_min_max_result_type(input_types) } - AggregateFunction::NthValue => Ok(input_types.to_vec()), } } diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index fc485a284ab4..6ae2dfb3697c 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -74,6 +74,7 @@ pub mod average; pub mod bit_and_or_xor; pub mod bool_and_or; pub mod grouping; +pub mod nth_value; pub mod string_agg; use crate::approx_percentile_cont::approx_percentile_cont_udaf; @@ -105,6 +106,7 @@ pub mod expr_fn { pub use super::first_last::last_value; pub use super::grouping::grouping; pub use super::median::median; + pub use super::nth_value::nth_value; pub use super::regr::regr_avgx; pub use super::regr::regr_avgy; pub use super::regr::regr_count; @@ -157,6 +159,7 @@ pub fn all_default_aggregate_functions() -> Vec> { bool_and_or::bool_or_udaf(), average::avg_udaf(), grouping::grouping_udaf(), + nth_value::nth_value_udaf(), ] } diff --git a/datafusion/physical-expr/src/aggregate/nth_value.rs b/datafusion/functions-aggregate/src/nth_value.rs similarity index 77% rename from datafusion/physical-expr/src/aggregate/nth_value.rs rename to datafusion/functions-aggregate/src/nth_value.rs index b75ecd1066ca..6719c673c55b 100644 --- a/datafusion/physical-expr/src/aggregate/nth_value.rs +++ b/datafusion/functions-aggregate/src/nth_value.rs @@ -22,149 +22,149 @@ use std::any::Any; use std::collections::VecDeque; use std::sync::Arc; -use crate::aggregate::array_agg_ordered::merge_ordered_arrays; -use crate::aggregate::utils::{down_cast_any_ref, ordering_fields}; -use crate::expressions::{format_state_name, Literal}; -use crate::{ - reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr, -}; - -use arrow_array::cast::AsArray; -use arrow_array::{new_empty_array, ArrayRef, StructArray}; +use arrow::array::{new_empty_array, ArrayRef, AsArray, StructArray}; use arrow_schema::{DataType, Field, Fields}; + use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx}; -use datafusion_common::{exec_err, internal_err, Result, ScalarValue}; -use datafusion_expr::utils::AggregateOrderSensitivity; -use datafusion_expr::Accumulator; +use datafusion_common::{exec_err, internal_err, not_impl_err, Result, ScalarValue}; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::utils::format_state_name; +use datafusion_expr::{ + Accumulator, AggregateUDF, AggregateUDFImpl, Expr, ReversedUDAF, Signature, + Volatility, +}; +use datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays; +use datafusion_physical_expr_common::aggregate::utils::ordering_fields; +use datafusion_physical_expr_common::sort_expr::{ + limited_convert_logical_sort_exprs_to_physical, LexOrdering, PhysicalSortExpr, +}; + +make_udaf_expr_and_func!( + NthValueAgg, + nth_value, + "Returns the nth value in a group of values.", + nth_value_udaf +); /// Expression for a `NTH_VALUE(... ORDER BY ..., ...)` aggregation. In a multi /// partition setting, partial aggregations are computed for every partition, /// and then their results are merged. #[derive(Debug)] pub struct NthValueAgg { - /// Column name - name: String, - /// The `DataType` for the input expression - input_data_type: DataType, - /// The input expression - expr: Arc, - /// The `N` value. - n: i64, - /// If the input expression can have `NULL`s - nullable: bool, - /// Ordering data types - order_by_data_types: Vec, - /// Ordering requirement - ordering_req: LexOrdering, + signature: Signature, + /// Determines whether `N` is relative to the beginning or the end + /// of the aggregation. When set to `true`, then `N` is from the end. + reversed: bool, } impl NthValueAgg { /// Create a new `NthValueAgg` aggregate function - pub fn new( - expr: Arc, - n: i64, - name: impl Into, - input_data_type: DataType, - nullable: bool, - order_by_data_types: Vec, - ordering_req: LexOrdering, - ) -> Self { + pub fn new() -> Self { Self { - name: name.into(), - input_data_type, - expr, - n, - nullable, - order_by_data_types, - ordering_req, + signature: Signature::any(2, Volatility::Immutable), + reversed: false, } } + + pub fn with_reversed(mut self, reversed: bool) -> Self { + self.reversed = reversed; + self + } } -impl AggregateExpr for NthValueAgg { +impl Default for NthValueAgg { + fn default() -> Self { + Self::new() + } +} + +impl AggregateUDFImpl for NthValueAgg { fn as_any(&self) -> &dyn Any { self } - fn field(&self) -> Result { - Ok(Field::new(&self.name, self.input_data_type.clone(), true)) + fn name(&self) -> &str { + "nth_value" + } + + fn signature(&self) -> &Signature { + &self.signature } - fn create_accumulator(&self) -> Result> { - Ok(Box::new(NthValueAccumulator::try_new( - self.n, - &self.input_data_type, - &self.order_by_data_types, - self.ordering_req.clone(), - )?)) + fn return_type(&self, arg_types: &[DataType]) -> Result { + Ok(arg_types[0].clone()) } - fn state_fields(&self) -> Result> { + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + let n = match acc_args.input_exprs[1] { + Expr::Literal(ScalarValue::Int64(Some(value))) => { + if self.reversed { + Ok(-value) + } else { + Ok(value) + } + } + _ => not_impl_err!( + "{} not supported for n: {}", + self.name(), + &acc_args.input_exprs[1] + ), + }?; + + let ordering_req = limited_convert_logical_sort_exprs_to_physical( + acc_args.sort_exprs, + acc_args.schema, + )?; + + let ordering_dtypes = ordering_req + .iter() + .map(|e| e.expr.data_type(acc_args.schema)) + .collect::>>()?; + + NthValueAccumulator::try_new( + n, + acc_args.input_type, + &ordering_dtypes, + ordering_req, + ) + .map(|acc| Box::new(acc) as _) + } + + fn state_fields(&self, args: StateFieldsArgs) -> Result> { let mut fields = vec![Field::new_list( - format_state_name(&self.name, "nth_value"), - Field::new("item", self.input_data_type.clone(), true), - self.nullable, // This should be the same as field() + format_state_name(self.name(), "nth_value"), + // TODO: The nullability of the list element should be configurable. + // The hard-coded `true` should be changed once the field for + // nullability is added to `StateFieldArgs` struct. + // See: https://github.com/apache/datafusion/pull/11063 + Field::new("item", args.input_type.clone(), true), + false, )]; - if !self.ordering_req.is_empty() { - let orderings = - ordering_fields(&self.ordering_req, &self.order_by_data_types); + let orderings = args.ordering_fields.to_vec(); + if !orderings.is_empty() { fields.push(Field::new_list( - format_state_name(&self.name, "nth_value_orderings"), + format_state_name(self.name(), "nth_value_orderings"), Field::new("item", DataType::Struct(Fields::from(orderings)), true), - self.nullable, + false, )); } Ok(fields) } - fn expressions(&self) -> Vec> { - let n = Arc::new(Literal::new(ScalarValue::Int64(Some(self.n)))) as _; - vec![Arc::clone(&self.expr), n] + fn aliases(&self) -> &[String] { + &[] } - fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - (!self.ordering_req.is_empty()).then_some(&self.ordering_req) - } - - fn order_sensitivity(&self) -> AggregateOrderSensitivity { - AggregateOrderSensitivity::HardRequirement - } - - fn name(&self) -> &str { - &self.name - } - - fn reverse_expr(&self) -> Option> { - Some(Arc::new(Self { - name: self.name.to_string(), - input_data_type: self.input_data_type.clone(), - expr: Arc::clone(&self.expr), - // index should be from the opposite side - n: -self.n, - nullable: self.nullable, - order_by_data_types: self.order_by_data_types.clone(), - // reverse requirement - ordering_req: reverse_order_bys(&self.ordering_req), - }) as _) - } -} - -impl PartialEq for NthValueAgg { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.input_data_type == x.input_data_type - && self.order_by_data_types == x.order_by_data_types - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) + fn reverse_expr(&self) -> ReversedUDAF { + ReversedUDAF::Reversed(Arc::from(AggregateUDF::from( + Self::new().with_reversed(!self.reversed), + ))) } } #[derive(Debug)] -pub(crate) struct NthValueAccumulator { +pub struct NthValueAccumulator { + /// The `N` value. n: i64, /// Stores entries in the `NTH_VALUE` result. values: VecDeque, diff --git a/datafusion/functions-array/Cargo.toml b/datafusion/functions-array/Cargo.toml index eb1ef9e03f31..73c5b9114a2c 100644 --- a/datafusion/functions-array/Cargo.toml +++ b/datafusion/functions-array/Cargo.toml @@ -49,6 +49,7 @@ datafusion-common = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-functions = { workspace = true } +datafusion-functions-aggregate = { workspace = true } itertools = { version = "0.12", features = ["use_std"] } log = { workspace = true } paste = "1.0.14" diff --git a/datafusion/functions-array/src/planner.rs b/datafusion/functions-array/src/planner.rs index cfb3e5ed0729..01853fb56908 100644 --- a/datafusion/functions-array/src/planner.rs +++ b/datafusion/functions-array/src/planner.rs @@ -23,6 +23,7 @@ use datafusion_expr::{ sqlparser, AggregateFunction, Expr, ExprSchemable, GetFieldAccess, }; use datafusion_functions::expr_fn::get_field; +use datafusion_functions_aggregate::nth_value::nth_value_udaf; use crate::{ array_has::array_has_all, @@ -119,8 +120,8 @@ impl UserDefinedSQLPlanner for FieldAccessPlanner { // Special case for array_agg(expr)[index] to NTH_VALUE(expr, index) Expr::AggregateFunction(agg_func) if is_array_agg(&agg_func) => { Ok(PlannerResult::Planned(Expr::AggregateFunction( - datafusion_expr::expr::AggregateFunction::new( - AggregateFunction::NthValue, + datafusion_expr::expr::AggregateFunction::new_udf( + nth_value_udaf(), agg_func .args .into_iter() diff --git a/datafusion/physical-expr-common/src/aggregate/merge_arrays.rs b/datafusion/physical-expr-common/src/aggregate/merge_arrays.rs new file mode 100644 index 000000000000..544bdc182829 --- /dev/null +++ b/datafusion/physical-expr-common/src/aggregate/merge_arrays.rs @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::compute::SortOptions; +use datafusion_common::utils::compare_rows; +use datafusion_common::{exec_err, ScalarValue}; +use std::cmp::Ordering; +use std::collections::{BinaryHeap, VecDeque}; + +/// This is a wrapper struct to be able to correctly merge `ARRAY_AGG` data from +/// multiple partitions using `BinaryHeap`. When used inside `BinaryHeap`, this +/// struct returns smallest `CustomElement`, where smallest is determined by +/// `ordering` values (`Vec`) according to `sort_options`. +#[derive(Debug, PartialEq, Eq)] +struct CustomElement<'a> { + /// Stores the partition this entry came from + branch_idx: usize, + /// Values to merge + value: ScalarValue, + // Comparison "key" + ordering: Vec, + /// Options defining the ordering semantics + sort_options: &'a [SortOptions], +} + +impl<'a> CustomElement<'a> { + fn new( + branch_idx: usize, + value: ScalarValue, + ordering: Vec, + sort_options: &'a [SortOptions], + ) -> Self { + Self { + branch_idx, + value, + ordering, + sort_options, + } + } + + fn ordering( + &self, + current: &[ScalarValue], + target: &[ScalarValue], + ) -> datafusion_common::Result { + // Calculate ordering according to `sort_options` + compare_rows(current, target, self.sort_options) + } +} + +// Overwrite ordering implementation such that +// - `self.ordering` values are used for comparison, +// - When used inside `BinaryHeap` it is a min-heap. +impl<'a> Ord for CustomElement<'a> { + fn cmp(&self, other: &Self) -> Ordering { + // Compares according to custom ordering + self.ordering(&self.ordering, &other.ordering) + // Convert max heap to min heap + .map(|ordering| ordering.reverse()) + // This function return error, when `self.ordering` and `other.ordering` + // have different types (such as one is `ScalarValue::Int64`, other is `ScalarValue::Float32`) + // Here this case won't happen, because data from each partition will have same type + .unwrap() + } +} + +impl<'a> PartialOrd for CustomElement<'a> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// This functions merges `values` array (`&[Vec]`) into single array `Vec` +/// Merging done according to ordering values stored inside `ordering_values` (`&[Vec>]`) +/// Inner `Vec` in the `ordering_values` can be thought as ordering information for the +/// each `ScalarValue` in the `values` array. +/// Desired ordering specified by `sort_options` argument (Should have same size with inner `Vec` +/// of the `ordering_values` array). +/// +/// As an example +/// values can be \[ +/// \[1, 2, 3, 4, 5\], +/// \[1, 2, 3, 4\], +/// \[1, 2, 3, 4, 5, 6\], +/// \] +/// In this case we will be merging three arrays (doesn't have to be same size) +/// and produce a merged array with size 15 (sum of 5+4+6) +/// Merging will be done according to ordering at `ordering_values` vector. +/// As an example `ordering_values` can be [ +/// \[(1, a), (2, b), (3, b), (4, a), (5, b) \], +/// \[(1, a), (2, b), (3, b), (4, a) \], +/// \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \], +/// ] +/// For each ScalarValue in the `values` we have a corresponding `Vec` (like timestamp of it) +/// for the example above `sort_options` will have size two, that defines ordering requirement of the merge. +/// Inner `Vec`s of the `ordering_values` will be compared according `sort_options` (Their sizes should match) +pub fn merge_ordered_arrays( + // We will merge values into single `Vec`. + values: &mut [VecDeque], + // `values` will be merged according to `ordering_values`. + // Inner `Vec` can be thought as ordering information for the + // each `ScalarValue` in the values`. + ordering_values: &mut [VecDeque>], + // Defines according to which ordering comparisons should be done. + sort_options: &[SortOptions], +) -> datafusion_common::Result<(Vec, Vec>)> { + // Keep track the most recent data of each branch, in binary heap data structure. + let mut heap = BinaryHeap::::new(); + + if values.len() != ordering_values.len() + || values + .iter() + .zip(ordering_values.iter()) + .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len()) + { + return exec_err!( + "Expects values arguments and/or ordering_values arguments to have same size" + ); + } + let n_branch = values.len(); + let mut merged_values = vec![]; + let mut merged_orderings = vec![]; + // Continue iterating the loop until consuming data of all branches. + loop { + let minimum = if let Some(minimum) = heap.pop() { + minimum + } else { + // Heap is empty, fill it with the next entries from each branch. + for branch_idx in 0..n_branch { + if let Some(orderings) = ordering_values[branch_idx].pop_front() { + // Their size should be same, we can safely .unwrap here. + let value = values[branch_idx].pop_front().unwrap(); + // Push the next element to the heap: + heap.push(CustomElement::new( + branch_idx, + value, + orderings, + sort_options, + )); + } + // If None, we consumed this branch, skip it. + } + + // Now we have filled the heap, get the largest entry (this will be + // the next element in merge). + if let Some(minimum) = heap.pop() { + minimum + } else { + // Heap is empty, this means that all indices are same with + // `end_indices`. We have consumed all of the branches, merge + // is completed, exit from the loop: + break; + } + }; + let CustomElement { + branch_idx, + value, + ordering, + .. + } = minimum; + // Add minimum value in the heap to the result + merged_values.push(value); + merged_orderings.push(ordering); + + // If there is an available entry, push next entry in the most + // recently consumed branch to the heap. + if let Some(orderings) = ordering_values[branch_idx].pop_front() { + // Their size should be same, we can safely .unwrap here. + let value = values[branch_idx].pop_front().unwrap(); + // Push the next element to the heap: + heap.push(CustomElement::new( + branch_idx, + value, + orderings, + sort_options, + )); + } + } + + Ok((merged_values, merged_orderings)) +} diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs index cd309b7f7d29..35666f199ace 100644 --- a/datafusion/physical-expr-common/src/aggregate/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/mod.rs @@ -17,6 +17,7 @@ pub mod count_distinct; pub mod groups_accumulator; +pub mod merge_arrays; pub mod stats; pub mod tdigest; pub mod utils; diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs index 3b122fe9f82b..a64d97637c3b 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs @@ -19,8 +19,7 @@ //! that can evaluated at runtime during query execution use std::any::Any; -use std::cmp::Ordering; -use std::collections::{BinaryHeap, VecDeque}; +use std::collections::VecDeque; use std::fmt::Debug; use std::sync::Arc; @@ -33,11 +32,12 @@ use crate::{ use arrow::datatypes::{DataType, Field}; use arrow_array::cast::AsArray; use arrow_array::{new_empty_array, Array, ArrayRef, StructArray}; -use arrow_schema::{Fields, SortOptions}; -use datafusion_common::utils::{array_into_list_array, compare_rows, get_row_at_idx}; +use arrow_schema::Fields; +use datafusion_common::utils::{array_into_list_array, get_row_at_idx}; use datafusion_common::{exec_err, Result, ScalarValue}; use datafusion_expr::utils::AggregateOrderSensitivity; use datafusion_expr::Accumulator; +use datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays; /// Expression for a `ARRAY_AGG(... ORDER BY ..., ...)` aggregation. In a multi /// partition setting, partial aggregations are computed for every partition, @@ -384,179 +384,6 @@ impl OrderSensitiveArrayAggAccumulator { } } -/// This is a wrapper struct to be able to correctly merge `ARRAY_AGG` data from -/// multiple partitions using `BinaryHeap`. When used inside `BinaryHeap`, this -/// struct returns smallest `CustomElement`, where smallest is determined by -/// `ordering` values (`Vec`) according to `sort_options`. -#[derive(Debug, PartialEq, Eq)] -struct CustomElement<'a> { - /// Stores the partition this entry came from - branch_idx: usize, - /// Values to merge - value: ScalarValue, - // Comparison "key" - ordering: Vec, - /// Options defining the ordering semantics - sort_options: &'a [SortOptions], -} - -impl<'a> CustomElement<'a> { - fn new( - branch_idx: usize, - value: ScalarValue, - ordering: Vec, - sort_options: &'a [SortOptions], - ) -> Self { - Self { - branch_idx, - value, - ordering, - sort_options, - } - } - - fn ordering( - &self, - current: &[ScalarValue], - target: &[ScalarValue], - ) -> Result { - // Calculate ordering according to `sort_options` - compare_rows(current, target, self.sort_options) - } -} - -// Overwrite ordering implementation such that -// - `self.ordering` values are used for comparison, -// - When used inside `BinaryHeap` it is a min-heap. -impl<'a> Ord for CustomElement<'a> { - fn cmp(&self, other: &Self) -> Ordering { - // Compares according to custom ordering - self.ordering(&self.ordering, &other.ordering) - // Convert max heap to min heap - .map(|ordering| ordering.reverse()) - // This function return error, when `self.ordering` and `other.ordering` - // have different types (such as one is `ScalarValue::Int64`, other is `ScalarValue::Float32`) - // Here this case won't happen, because data from each partition will have same type - .unwrap() - } -} - -impl<'a> PartialOrd for CustomElement<'a> { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -/// This functions merges `values` array (`&[Vec]`) into single array `Vec` -/// Merging done according to ordering values stored inside `ordering_values` (`&[Vec>]`) -/// Inner `Vec` in the `ordering_values` can be thought as ordering information for the -/// each `ScalarValue` in the `values` array. -/// Desired ordering specified by `sort_options` argument (Should have same size with inner `Vec` -/// of the `ordering_values` array). -/// -/// As an example -/// values can be \[ -/// \[1, 2, 3, 4, 5\], -/// \[1, 2, 3, 4\], -/// \[1, 2, 3, 4, 5, 6\], -/// \] -/// In this case we will be merging three arrays (doesn't have to be same size) -/// and produce a merged array with size 15 (sum of 5+4+6) -/// Merging will be done according to ordering at `ordering_values` vector. -/// As an example `ordering_values` can be [ -/// \[(1, a), (2, b), (3, b), (4, a), (5, b) \], -/// \[(1, a), (2, b), (3, b), (4, a) \], -/// \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \], -/// ] -/// For each ScalarValue in the `values` we have a corresponding `Vec` (like timestamp of it) -/// for the example above `sort_options` will have size two, that defines ordering requirement of the merge. -/// Inner `Vec`s of the `ordering_values` will be compared according `sort_options` (Their sizes should match) -pub(crate) fn merge_ordered_arrays( - // We will merge values into single `Vec`. - values: &mut [VecDeque], - // `values` will be merged according to `ordering_values`. - // Inner `Vec` can be thought as ordering information for the - // each `ScalarValue` in the values`. - ordering_values: &mut [VecDeque>], - // Defines according to which ordering comparisons should be done. - sort_options: &[SortOptions], -) -> Result<(Vec, Vec>)> { - // Keep track the most recent data of each branch, in binary heap data structure. - let mut heap = BinaryHeap::::new(); - - if values.len() != ordering_values.len() - || values - .iter() - .zip(ordering_values.iter()) - .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len()) - { - return exec_err!( - "Expects values arguments and/or ordering_values arguments to have same size" - ); - } - let n_branch = values.len(); - let mut merged_values = vec![]; - let mut merged_orderings = vec![]; - // Continue iterating the loop until consuming data of all branches. - loop { - let minimum = if let Some(minimum) = heap.pop() { - minimum - } else { - // Heap is empty, fill it with the next entries from each branch. - for branch_idx in 0..n_branch { - if let Some(orderings) = ordering_values[branch_idx].pop_front() { - // Their size should be same, we can safely .unwrap here. - let value = values[branch_idx].pop_front().unwrap(); - // Push the next element to the heap: - heap.push(CustomElement::new( - branch_idx, - value, - orderings, - sort_options, - )); - } - // If None, we consumed this branch, skip it. - } - - // Now we have filled the heap, get the largest entry (this will be - // the next element in merge). - if let Some(minimum) = heap.pop() { - minimum - } else { - // Heap is empty, this means that all indices are same with - // `end_indices`. We have consumed all of the branches, merge - // is completed, exit from the loop: - break; - } - }; - let CustomElement { - branch_idx, - value, - ordering, - .. - } = minimum; - // Add minimum value in the heap to the result - merged_values.push(value); - merged_orderings.push(ordering); - - // If there is an available entry, push next entry in the most - // recently consumed branch to the heap. - if let Some(orderings) = ordering_values[branch_idx].pop_front() { - // Their size should be same, we can safely .unwrap here. - let value = values[branch_idx].pop_front().unwrap(); - // Push the next element to the heap: - heap.push(CustomElement::new( - branch_idx, - value, - orderings, - sort_options, - )); - } - } - - Ok((merged_values, merged_orderings)) -} - #[cfg(test)] mod tests { use std::collections::VecDeque; diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index 1eadf7247f7c..d4cd3d51d174 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -30,10 +30,10 @@ use std::sync::Arc; use arrow::datatypes::Schema; -use datafusion_common::{exec_err, not_impl_err, Result}; +use datafusion_common::{not_impl_err, Result}; use datafusion_expr::AggregateFunction; -use crate::expressions::{self, Literal}; +use crate::expressions::{self}; use crate::{AggregateExpr, PhysicalExpr, PhysicalSortExpr}; /// Create a physical aggregation expression. @@ -102,26 +102,6 @@ pub fn create_aggregate_expr( name, data_type, )), - (AggregateFunction::NthValue, _) => { - let expr = &input_phy_exprs[0]; - let Some(n) = input_phy_exprs[1] - .as_any() - .downcast_ref::() - .map(|literal| literal.value()) - else { - return exec_err!("Second argument of NTH_VALUE needs to be a literal"); - }; - let nullable = expr.nullable(input_schema)?; - Arc::new(expressions::NthValueAgg::new( - Arc::clone(expr), - n.clone().try_into()?, - name, - input_phy_types[0].clone(), - nullable, - ordering_types, - ordering_req.to_vec(), - )) - } }) } diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index f0de7446f6f1..b9d803900f53 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -20,7 +20,6 @@ pub use datafusion_physical_expr_common::aggregate::AggregateExpr; pub(crate) mod array_agg; pub(crate) mod array_agg_distinct; pub(crate) mod array_agg_ordered; -pub(crate) mod nth_value; #[macro_use] pub(crate) mod min_max; pub(crate) mod groups_accumulator; diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 1f2c955ad07e..7d8f12091f46 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -39,7 +39,6 @@ pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg; pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg; pub use crate::aggregate::build_in::create_aggregate_expr; pub use crate::aggregate::min_max::{Max, MaxAccumulator, Min, MinAccumulator}; -pub use crate::aggregate::nth_value::NthValueAgg; pub use crate::aggregate::stats::StatsType; pub use crate::window::cume_dist::{cume_dist, CumeDist}; pub use crate::window::lead_lag::{lag, lead, WindowShift}; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index ce6c0c53c3fc..345765b08be3 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -500,7 +500,7 @@ enum AggregateFunction { // REGR_SYY = 33; // REGR_SXY = 34; // STRING_AGG = 35; - NTH_VALUE_AGG = 36; + // NTH_VALUE_AGG = 36; } message AggregateExprNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 347654e52b73..905f0d984955 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -535,7 +535,6 @@ impl serde::Serialize for AggregateFunction { Self::Min => "MIN", Self::Max => "MAX", Self::ArrayAgg => "ARRAY_AGG", - Self::NthValueAgg => "NTH_VALUE_AGG", }; serializer.serialize_str(variant) } @@ -550,7 +549,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { "MIN", "MAX", "ARRAY_AGG", - "NTH_VALUE_AGG", ]; struct GeneratedVisitor; @@ -594,7 +592,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { "MIN" => Ok(AggregateFunction::Min), "MAX" => Ok(AggregateFunction::Max), "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg), - "NTH_VALUE_AGG" => Ok(AggregateFunction::NthValueAgg), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index c74f172482b7..b16d26ee6e1e 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1924,7 +1924,7 @@ pub enum AggregateFunction { /// AVG = 3; /// COUNT = 4; /// APPROX_DISTINCT = 5; - ArrayAgg = 6, + /// /// VARIANCE = 7; /// VARIANCE_POP = 8; /// COVARIANCE = 9; @@ -1952,7 +1952,8 @@ pub enum AggregateFunction { /// REGR_SYY = 33; /// REGR_SXY = 34; /// STRING_AGG = 35; - NthValueAgg = 36, + /// NTH_VALUE_AGG = 36; + ArrayAgg = 6, } impl AggregateFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -1964,7 +1965,6 @@ impl AggregateFunction { AggregateFunction::Min => "MIN", AggregateFunction::Max => "MAX", AggregateFunction::ArrayAgg => "ARRAY_AGG", - AggregateFunction::NthValueAgg => "NTH_VALUE_AGG", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -1973,7 +1973,6 @@ impl AggregateFunction { "MIN" => Some(Self::Min), "MAX" => Some(Self::Max), "ARRAY_AGG" => Some(Self::ArrayAgg), - "NTH_VALUE_AGG" => Some(Self::NthValueAgg), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index f4fb69280436..a58af8afdd04 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -145,7 +145,6 @@ impl From for AggregateFunction { protobuf::AggregateFunction::Min => Self::Min, protobuf::AggregateFunction::Max => Self::Max, protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg, - protobuf::AggregateFunction::NthValueAgg => Self::NthValue, } } } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 7570040a1d08..d8f8ea002b2d 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -117,7 +117,6 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction { AggregateFunction::Min => Self::Min, AggregateFunction::Max => Self::Max, AggregateFunction::ArrayAgg => Self::ArrayAgg, - AggregateFunction::NthValue => Self::NthValueAgg, } } } @@ -377,9 +376,6 @@ pub fn serialize_expr( AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg, AggregateFunction::Min => protobuf::AggregateFunction::Min, AggregateFunction::Max => protobuf::AggregateFunction::Max, - AggregateFunction::NthValue => { - protobuf::AggregateFunction::NthValueAgg - } }; let aggregate_expr = protobuf::AggregateExprNode { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 23cdc666e701..5e982ad2afde 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -25,8 +25,8 @@ use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ ArrayAgg, BinaryExpr, CaseExpr, CastExpr, Column, CumeDist, DistinctArrayAgg, InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr, NotExpr, - NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, RowNumber, - TryCastExpr, WindowShift, + NthValue, Ntile, OrderSensitiveArrayAgg, Rank, RankType, RowNumber, TryCastExpr, + WindowShift, }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; @@ -255,8 +255,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result { protobuf::AggregateFunction::Min } else if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::Max - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::NthValueAgg } else { return not_impl_err!("Aggregate function not supported: {expr:?}"); }; diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 106247b2d441..d8d85ace1a29 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -38,7 +38,7 @@ use datafusion::datasource::physical_plan::{ }; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility}; -use datafusion::physical_expr::expressions::{Max, NthValueAgg}; +use datafusion::physical_expr::expressions::Max; use datafusion::physical_expr::window::SlidingAggregateWindowExpr; use datafusion::physical_expr::{PhysicalSortRequirement, ScalarFunctionExpr}; use datafusion::physical_plan::aggregates::{ @@ -81,6 +81,7 @@ use datafusion_expr::{ ScalarUDFImpl, Signature, SimpleAggregateUDF, WindowFrame, WindowFrameBound, }; use datafusion_functions_aggregate::average::avg_udaf; +use datafusion_functions_aggregate::nth_value::nth_value_udaf; use datafusion_functions_aggregate::string_agg::StringAgg; use datafusion_proto::physical_plan::{ AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, @@ -362,15 +363,17 @@ fn rountrip_aggregate() -> Result<()> { false, )?], // NTH_VALUE - vec![Arc::new(NthValueAgg::new( - col("b", &schema)?, - 1, - "NTH_VALUE(b, 1)".to_string(), - DataType::Int64, + vec![udaf::create_aggregate_expr( + &nth_value_udaf(), + &[col("b", &schema)?, lit(1u64)], + &[], + &[], + &[], + &schema, + "NTH_VALUE(b, 1)", false, - Vec::new(), - Vec::new(), - ))], + false, + )?], // STRING_AGG vec![udaf::create_aggregate_expr( &AggregateUDF::new_from_impl(StringAgg::new()), diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index ea460cb3efc2..d9ddf57eb192 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -415,9 +415,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ) -> Result { // check udaf first let udaf = self.context_provider.get_aggregate_meta(name); - // Skip first value and last value, since we expect window builtin first/last value not udaf version + // Use the builtin window function instead of the user-defined aggregate function if udaf.as_ref().is_some_and(|udaf| { - udaf.name() != "first_value" && udaf.name() != "last_value" + udaf.name() != "first_value" + && udaf.name() != "last_value" + && udaf.name() != "nth_value" }) { Ok(WindowFunctionDefinition::AggregateUDF(udaf.unwrap())) } else { diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt b/datafusion/sqllogictest/test_files/agg_func_substitute.slt index 342d45e7fb24..9a0a1d587433 100644 --- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt +++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt @@ -39,16 +39,16 @@ EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result GROUP BY a; ---- logical_plan -01)Projection: multiple_ordered_table.a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] AS result -02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[NTH_VALUE(multiple_ordered_table.c, Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]] +01)Projection: multiple_ordered_table.a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] AS result +02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[nth_value(multiple_ordered_table.c, Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]] 03)----TableScan: multiple_ordered_table projection=[a, c] physical_plan -01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] -02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] +02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true @@ -59,16 +59,16 @@ EXPLAIN SELECT a, NTH_VALUE(c, 1 ORDER BY c) as result GROUP BY a; ---- logical_plan -01)Projection: multiple_ordered_table.a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] AS result -02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[NTH_VALUE(multiple_ordered_table.c, Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]] +01)Projection: multiple_ordered_table.a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] AS result +02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[nth_value(multiple_ordered_table.c, Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]] 03)----TableScan: multiple_ordered_table projection=[a, c] physical_plan -01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] -02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] +02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true @@ -78,16 +78,16 @@ EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result GROUP BY a; ---- logical_plan -01)Projection: multiple_ordered_table.a, NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] AS result -02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[NTH_VALUE(multiple_ordered_table.c, Int64(101)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] AS NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]] +01)Projection: multiple_ordered_table.a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] AS result +02)--Aggregate: groupBy=[[multiple_ordered_table.a]], aggr=[[nth_value(multiple_ordered_table.c, Int64(101)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] AS nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]]] 03)----TableScan: multiple_ordered_table projection=[a, c] physical_plan -01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] -02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] +02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted +06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true