From 0372f41f7e3ee2cf96a62e5d871b8b079bacd0b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Tue, 20 Aug 2024 09:56:30 +0800 Subject: [PATCH 01/13] save --- .../src/aggregate.rs | 191 ++++++++++++++++++ .../src/window/sliding_aggregate.rs | 25 --- 2 files changed, 191 insertions(+), 25 deletions(-) diff --git a/datafusion/physical-expr-functions-aggregate/src/aggregate.rs b/datafusion/physical-expr-functions-aggregate/src/aggregate.rs index 8185f0fdd51f..16d7478c8807 100644 --- a/datafusion/physical-expr-functions-aggregate/src/aggregate.rs +++ b/datafusion/physical-expr-functions-aggregate/src/aggregate.rs @@ -216,6 +216,197 @@ impl AggregateFunctionExpr { pub fn is_reversed(&self) -> bool { self.is_reversed } + + pub fn state_fields(&self) -> Result> { + let args = StateFieldsArgs { + name: &self.name, + input_types: &self.input_types, + return_type: &self.data_type, + ordering_fields: &self.ordering_fields, + is_distinct: self.is_distinct, + }; + + self.fun.state_fields(args) + } + + // TODO remove Result + pub fn field(&self) -> Result { + Ok(Field::new(&self.name, self.data_type.clone(), true)) + } + + pub fn create_accumulator(&self) -> Result> { + let acc_args = AccumulatorArgs { + return_type: &self.data_type, + schema: &self.schema, + ignore_nulls: self.ignore_nulls, + ordering_req: &self.ordering_req, + is_distinct: self.is_distinct, + name: &self.name, + is_reversed: self.is_reversed, + exprs: &self.args, + }; + + self.fun.accumulator(acc_args) + } + + pub fn create_sliding_accumulator(&self) -> Result> { + let args = AccumulatorArgs { + return_type: &self.data_type, + schema: &self.schema, + ignore_nulls: self.ignore_nulls, + ordering_req: &self.ordering_req, + is_distinct: self.is_distinct, + name: &self.name, + is_reversed: self.is_reversed, + exprs: &self.args, + }; + + let accumulator = self.fun.create_sliding_accumulator(args)?; + + // Accumulators that have window frame startings different + // than `UNBOUNDED PRECEDING`, such as `1 PRECEDING`, need to + // implement retract_batch method in order to run correctly + // currently in DataFusion. + // + // If this `retract_batches` is not present, there is no way + // to calculate result correctly. For example, the query + // + // ```sql + // SELECT + // SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS sum_a + // FROM + // t + // ``` + // + // 1. First sum value will be the sum of rows between `[0, 1)`, + // + // 2. Second sum value will be the sum of rows between `[0, 2)` + // + // 3. Third sum value will be the sum of rows between `[1, 3)`, etc. + // + // Since the accumulator keeps the running sum: + // + // 1. First sum we add to the state sum value between `[0, 1)` + // + // 2. Second sum we add to the state sum value between `[1, 2)` + // (`[0, 1)` is already in the state sum, hence running sum will + // cover `[0, 2)` range) + // + // 3. Third sum we add to the state sum value between `[2, 3)` + // (`[0, 2)` is already in the state sum). Also we need to + // retract values between `[0, 1)` by this way we can obtain sum + // between [1, 3) which is indeed the appropriate range. + // + // When we use `UNBOUNDED PRECEDING` in the query starting + // index will always be 0 for the desired range, and hence the + // `retract_batch` method will not be called. In this case + // having retract_batch is not a requirement. + // + // This approach is a a bit different than window function + // approach. In window function (when they use a window frame) + // they get all the desired range during evaluation. + if !accumulator.supports_retract_batch() { + return not_impl_err!( + "Aggregate can not be used as a sliding accumulator because \ + `retract_batch` is not implemented: {}", + self.name + ); + } + Ok(accumulator) + } + + pub fn groups_accumulator_supported(&self) -> bool { + let args = AccumulatorArgs { + return_type: &self.data_type, + schema: &self.schema, + ignore_nulls: self.ignore_nulls, + ordering_req: &self.ordering_req, + is_distinct: self.is_distinct, + name: &self.name, + is_reversed: self.is_reversed, + exprs: &self.args, + }; + self.fun.groups_accumulator_supported(args) + } + + pub fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { + if self.ordering_req.is_empty() { + return None; + } + + if !self.order_sensitivity().is_insensitive() { + return Some(&self.ordering_req); + } + + None + } + + pub fn order_sensitivity(&self) -> AggregateOrderSensitivity { + if !self.ordering_req.is_empty() { + // If there is requirement, use the sensitivity of the implementation + self.fun.order_sensitivity() + } else { + // If no requirement, aggregator is order insensitive + AggregateOrderSensitivity::Insensitive + } + } + + pub fn with_beneficial_ordering( + self: Arc, + beneficial_ordering: bool, + ) -> Result>> { + let Some(updated_fn) = self + .fun + .clone() + .with_beneficial_ordering(beneficial_ordering)? + else { + return Ok(None); + }; + + AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec()) + .order_by(self.ordering_req.to_vec()) + .schema(Arc::new(self.schema.clone())) + .alias(self.name().to_string()) + .with_ignore_nulls(self.ignore_nulls) + .with_distinct(self.is_distinct) + .with_reversed(self.is_reversed) + .build() + .map(Some) + } + + pub fn reverse_expr(&self) -> Option> { + match self.fun.reverse_udf() { + ReversedUDAF::NotSupported => None, + ReversedUDAF::Identical => Some(Arc::new(self.clone())), + ReversedUDAF::Reversed(reverse_udf) => { + let reverse_ordering_req = reverse_order_bys(&self.ordering_req); + let mut name = self.name().to_string(); + // If the function is changed, we need to reverse order_by clause as well + // i.e. First(a order by b asc null first) -> Last(a order by b desc null last) + if self.fun().name() == reverse_udf.name() { + } else { + replace_order_by_clause(&mut name); + } + replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name()); + + AggregateExprBuilder::new(reverse_udf, self.args.to_vec()) + .order_by(reverse_ordering_req.to_vec()) + .schema(Arc::new(self.schema.clone())) + .alias(name) + .with_ignore_nulls(self.ignore_nulls) + .with_distinct(self.is_distinct) + .with_reversed(!self.is_reversed) + .build() + .ok() + } + } + } + + pub fn get_minmax_desc(&self) -> Option<(Field, bool)> { + self.fun + .is_descending() + .and_then(|flag| self.field().ok().map(|f| (f, flag))) + } } impl AggregateExpr for AggregateFunctionExpr { diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 50e9632b2196..1494129cf897 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -141,31 +141,6 @@ impl WindowExpr for SlidingAggregateWindowExpr { fn uses_bounded_memory(&self) -> bool { !self.window_frame.end_bound.is_unbounded() } - - fn with_new_expressions( - &self, - args: Vec>, - partition_bys: Vec>, - order_by_exprs: Vec>, - ) -> Option> { - debug_assert_eq!(self.order_by.len(), order_by_exprs.len()); - - let new_order_by = self - .order_by - .iter() - .zip(order_by_exprs) - .map(|(req, new_expr)| PhysicalSortExpr { - expr: new_expr, - options: req.options, - }) - .collect::>(); - Some(Arc::new(SlidingAggregateWindowExpr { - aggregate: self.aggregate.with_new_expressions(args, vec![])?, - partition_by: partition_bys, - order_by: new_order_by, - window_frame: Arc::clone(&self.window_frame), - })) - } } impl AggregateWindowExpr for SlidingAggregateWindowExpr { From 7a7c78359a60c3a6e1746bf3edb71b1d8814aca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 21 Aug 2024 16:24:34 +0800 Subject: [PATCH 02/13] Move AggregateFunctionExpr into physical-expr crate --- datafusion/core/src/lib.rs | 2 +- .../combine_partial_final_agg.rs | 2 +- datafusion/core/src/physical_planner.rs | 2 +- datafusion/core/src/test_util/mod.rs | 2 +- .../src/lib.rs | 2 -- .../src/aggregate.rs | 35 +++++++++++++++---- datafusion/physical-expr/src/lib.rs | 22 +----------- .../physical-plan/src/aggregates/mod.rs | 2 +- datafusion/physical-plan/src/lib.rs | 2 +- datafusion/physical-plan/src/windows/mod.rs | 2 +- 10 files changed, 37 insertions(+), 36 deletions(-) rename datafusion/{physical-expr-functions-aggregate => physical-expr}/src/aggregate.rs (95%) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index daeb21db9d05..3f670fde7e47 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -558,7 +558,7 @@ pub mod physical_expr_common { /// re-export of [`datafusion_physical_expr_functions_aggregate`] crate pub mod physical_expr_functions_aggregate { - pub use datafusion_physical_expr_functions_aggregate::*; + pub use datafusion_physical_expr::*; } /// re-export of [`datafusion_physical_expr`] crate diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index f65a4c837a60..3fcc9346470b 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -176,8 +176,8 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::sum::sum_udaf; + use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; /// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected macro_rules! assert_optimized { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6536f9a01439..19cfd1af094b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -82,9 +82,9 @@ use datafusion_expr::{ DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp, }; +use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::LexOrdering; -use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_sql::utils::window_expr_common_partition_keys; diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index ca8376fdec0a..93e0ac0d7abf 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -54,7 +54,7 @@ use datafusion_physical_expr::{ use async_trait::async_trait; use datafusion_catalog::Session; -use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; +use datafusion_physical_expr::aggregate::AggregateExprBuilder; use futures::Stream; use tempfile::TempDir; // backwards compatibility diff --git a/datafusion/physical-expr-functions-aggregate/src/lib.rs b/datafusion/physical-expr-functions-aggregate/src/lib.rs index 2ff7ff5777ec..509d3a4290a8 100644 --- a/datafusion/physical-expr-functions-aggregate/src/lib.rs +++ b/datafusion/physical-expr-functions-aggregate/src/lib.rs @@ -16,5 +16,3 @@ // under the License. //! Technically, all aggregate functions that depend on `expr` crate should be included here. - -pub mod aggregate; diff --git a/datafusion/physical-expr-functions-aggregate/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs similarity index 95% rename from datafusion/physical-expr-functions-aggregate/src/aggregate.rs rename to datafusion/physical-expr/src/aggregate.rs index 6ba8f9bc3fe6..6989025a7357 100644 --- a/datafusion/physical-expr-functions-aggregate/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -15,6 +15,26 @@ // specific language governing permissions and limitations // under the License. +pub(crate) mod groups_accumulator { + #[allow(unused_imports)] + pub(crate) mod accumulate { + pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState; + } + pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{ + accumulate::NullState, GroupsAccumulatorAdapter, + }; +} +pub(crate) mod stats { + pub use datafusion_functions_aggregate_common::stats::StatsType; +} +pub mod utils { + pub use datafusion_functions_aggregate_common::utils::{ + adjust_output_array, down_cast_any_ref, get_accum_scalar_values_as_arrays, + get_sort_options, ordering_fields, DecimalAverager, Hashable, + }; +} +pub use datafusion_functions_aggregate_common::aggregate::AggregateExpr; + use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::ScalarValue; use datafusion_common::{internal_err, not_impl_err, Result}; @@ -26,9 +46,8 @@ use datafusion_expr_common::groups_accumulator::GroupsAccumulator; use datafusion_expr_common::type_coercion::aggregates::check_arg_count; use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs; use datafusion_functions_aggregate_common::accumulator::StateFieldsArgs; -use datafusion_functions_aggregate_common::aggregate::AggregateExpr; use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; -use datafusion_functions_aggregate_common::utils::{self, down_cast_any_ref}; +use datafusion_functions_aggregate_common::utils::down_cast_any_ref; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_expr_common::utils::reverse_order_bys; @@ -95,7 +114,11 @@ impl AggregateExprBuilder { .map(|e| e.expr.data_type(&schema)) .collect::>>()?; - ordering_fields = utils::ordering_fields(&ordering_req, &ordering_types); + ordering_fields = + datafusion_functions_aggregate_common::utils::ordering_fields( + &ordering_req, + &ordering_types, + ); } let input_exprs_types = args @@ -363,9 +386,9 @@ impl AggregateFunctionExpr { .fun .clone() .with_beneficial_ordering(beneficial_ordering)? - else { - return Ok(None); - }; + else { + return Ok(None); + }; AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec()) .order_by(self.ordering_req.to_vec()) diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index c4255172d680..a400a617031e 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -19,27 +19,7 @@ #![deny(clippy::clone_on_ref_ptr)] // Backward compatibility -pub mod aggregate { - pub(crate) mod groups_accumulator { - #[allow(unused_imports)] - pub(crate) mod accumulate { - pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState; - } - pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{ - accumulate::NullState, GroupsAccumulatorAdapter, - }; - } - pub(crate) mod stats { - pub use datafusion_functions_aggregate_common::stats::StatsType; - } - pub mod utils { - pub use datafusion_functions_aggregate_common::utils::{ - adjust_output_array, down_cast_any_ref, get_accum_scalar_values_as_arrays, - get_sort_options, ordering_fields, DecimalAverager, Hashable, - }; - } - pub use datafusion_functions_aggregate_common::aggregate::AggregateExpr; -} +pub mod aggregate; pub mod analysis; pub mod binary_map { pub use datafusion_physical_expr_common::binary_map::{ArrowBytesSet, OutputType}; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 89d4c452cca6..a1d91b9d3296 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1218,8 +1218,8 @@ mod tests { use datafusion_physical_expr::PhysicalSortExpr; use crate::common::collect; + use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::Literal; - use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; use futures::{FutureExt, Stream}; // Generate a schema which consists of 5 columns (a, b, c, d, e) diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 59c5da6b6fb2..0196999b4930 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -82,7 +82,7 @@ pub mod windows; pub mod work_table; pub mod udaf { - pub use datafusion_physical_expr_functions_aggregate::aggregate::AggregateFunctionExpr; + pub use datafusion_physical_expr::aggregate::AggregateFunctionExpr; } #[cfg(test)] diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index f938f4410a99..c721c0caebaf 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -35,6 +35,7 @@ use datafusion_expr::{ BuiltInWindowFunction, PartitionEvaluator, WindowFrame, WindowFunctionDefinition, WindowUDF, }; +use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ reverse_order_bys, @@ -42,7 +43,6 @@ use datafusion_physical_expr::{ AggregateExpr, ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement, }; -use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; use itertools::Itertools; mod bounded_window_agg_exec; From 4b86152665d1b56ac4f771c874b6029c8d23fba3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 21 Aug 2024 16:36:39 +0800 Subject: [PATCH 03/13] Move AggregateExpr trait into physical-expr crate --- .../src/aggregate.rs | 166 +--------------- .../functions-aggregate-common/src/utils.rs | 21 +- datafusion/physical-expr/src/aggregate.rs | 179 +++++++++++++++++- datafusion/physical-expr/src/lib.rs | 4 +- 4 files changed, 178 insertions(+), 192 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate.rs index 698d1350cb61..17b00e4ed499 100644 --- a/datafusion/functions-aggregate-common/src/aggregate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate.rs @@ -18,169 +18,5 @@ //! [`AggregateExpr`] which defines the interface all aggregate expressions //! (built-in and custom) need to satisfy. -use crate::order::AggregateOrderSensitivity; -use arrow::datatypes::{DataType, Field}; -use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; -use datafusion_expr_common::accumulator::Accumulator; -use datafusion_expr_common::groups_accumulator::GroupsAccumulator; -use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; -use std::fmt::Debug; -use std::{any::Any, sync::Arc}; - pub mod count_distinct; -pub mod groups_accumulator; - -/// An aggregate expression that: -/// * knows its resulting field -/// * knows how to create its accumulator -/// * knows its accumulator's state's field -/// * knows the expressions from whose its accumulator will receive values -/// -/// Any implementation of this trait also needs to implement the -/// `PartialEq` to allows comparing equality between the -/// trait objects. -pub trait AggregateExpr: Send + Sync + Debug + PartialEq { - /// Returns the aggregate expression as [`Any`] so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// the field of the final result of this aggregation. - fn field(&self) -> Result; - - /// the accumulator used to accumulate values from the expressions. - /// the accumulator expects the same number of arguments as `expressions` and must - /// return states with the same description as `state_fields` - fn create_accumulator(&self) -> Result>; - - /// the fields that encapsulate the Accumulator's state - /// the number of fields here equals the number of states that the accumulator contains - fn state_fields(&self) -> Result>; - - /// expressions that are passed to the Accumulator. - /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. - fn expressions(&self) -> Vec>; - - /// Order by requirements for the aggregate function - /// By default it is `None` (there is no requirement) - /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this - fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - /// Indicates whether aggregator can produce the correct result with any - /// arbitrary input ordering. By default, we assume that aggregate expressions - /// are order insensitive. - fn order_sensitivity(&self) -> AggregateOrderSensitivity { - AggregateOrderSensitivity::Insensitive - } - - /// Sets the indicator whether ordering requirements of the aggregator is - /// satisfied by its input. If this is not the case, aggregators with order - /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce - /// the correct result with possibly more work internally. - /// - /// # Returns - /// - /// Returns `Ok(Some(updated_expr))` if the process completes successfully. - /// If the expression can benefit from existing input ordering, but does - /// not implement the method, returns an error. Order insensitive and hard - /// requirement aggregators return `Ok(None)`. - fn with_beneficial_ordering( - self: Arc, - _requirement_satisfied: bool, - ) -> Result>> { - if self.order_bys().is_some() && self.order_sensitivity().is_beneficial() { - return exec_err!( - "Should implement with satisfied for aggregator :{:?}", - self.name() - ); - } - Ok(None) - } - - /// Human readable name such as `"MIN(c2)"`. The default - /// implementation returns placeholder text. - fn name(&self) -> &str { - "AggregateExpr: default name" - } - - /// If the aggregate expression has a specialized - /// [`GroupsAccumulator`] implementation. If this returns true, - /// `[Self::create_groups_accumulator`] will be called. - fn groups_accumulator_supported(&self) -> bool { - false - } - - /// Return a specialized [`GroupsAccumulator`] that manages state - /// for all groups. - /// - /// For maximum performance, a [`GroupsAccumulator`] should be - /// implemented in addition to [`Accumulator`]. - fn create_groups_accumulator(&self) -> Result> { - not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet") - } - - /// Construct an expression that calculates the aggregate in reverse. - /// Typically the "reverse" expression is itself (e.g. SUM, COUNT). - /// For aggregates that do not support calculation in reverse, - /// returns None (which is the default value). - fn reverse_expr(&self) -> Option> { - None - } - - /// Creates accumulator implementation that supports retract - fn create_sliding_accumulator(&self) -> Result> { - not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet") - } - - /// Returns all expressions used in the [`AggregateExpr`]. - /// These expressions are (1)function arguments, (2) order by expressions. - fn all_expressions(&self) -> AggregatePhysicalExpressions { - let args = self.expressions(); - let order_bys = self.order_bys().unwrap_or(&[]); - let order_by_exprs = order_bys - .iter() - .map(|sort_expr| Arc::clone(&sort_expr.expr)) - .collect::>(); - AggregatePhysicalExpressions { - args, - order_by_exprs, - } - } - - /// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent - /// with the return value of the [`AggregateExpr::all_expressions`] method. - /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. - fn with_new_expressions( - &self, - _args: Vec>, - _order_by_exprs: Vec>, - ) -> Option> { - None - } - - /// If this function is max, return (output_field, true) - /// if the function is min, return (output_field, false) - /// otherwise return None (the default) - /// - /// output_field is the name of the column produced by this aggregate - /// - /// Note: this is used to use special aggregate implementations in certain conditions - fn get_minmax_desc(&self) -> Option<(Field, bool)> { - None - } - - /// Returns default value of the function given the input is Null - /// Most of the aggregate function return Null if input is Null, - /// while `count` returns 0 if input is Null - fn default_value(&self, data_type: &DataType) -> Result; -} - -/// Stores the physical expressions used inside the `AggregateExpr`. -pub struct AggregatePhysicalExpressions { - /// Aggregate function arguments - pub args: Vec>, - /// Order by expressions - pub order_by_exprs: Vec>, -} +pub mod groups_accumulator; \ No newline at end of file diff --git a/datafusion/functions-aggregate-common/src/utils.rs b/datafusion/functions-aggregate-common/src/utils.rs index 7b8ce0397af8..0c7f76185469 100644 --- a/datafusion/functions-aggregate-common/src/utils.rs +++ b/datafusion/functions-aggregate-common/src/utils.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, sync::Arc}; +use std::{sync::Arc}; use arrow::array::{ArrayRef, AsArray}; use arrow::datatypes::ArrowNativeType; @@ -32,25 +32,6 @@ use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_expr_common::accumulator::Accumulator; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; -use crate::aggregate::AggregateExpr; - -/// Downcast a `Box` or `Arc` -/// and return the inner trait object as [`Any`] so -/// that it can be downcast to a specific implementation. -/// -/// This method is used when implementing the `PartialEq` -/// for [`AggregateExpr`] aggregation expressions and allows comparing the equality -/// between the trait objects. -pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { - if let Some(obj) = any.downcast_ref::>() { - obj.as_any() - } else if let Some(obj) = any.downcast_ref::>() { - obj.as_any() - } else { - any - } -} - /// Convert scalar values from an accumulator into arrays. pub fn get_accum_scalar_values_as_arrays( accum: &mut dyn Accumulator, diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 6989025a7357..4c4c9266f4ce 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -29,14 +29,14 @@ pub(crate) mod stats { } pub mod utils { pub use datafusion_functions_aggregate_common::utils::{ - adjust_output_array, down_cast_any_ref, get_accum_scalar_values_as_arrays, + adjust_output_array, get_accum_scalar_values_as_arrays, get_sort_options, ordering_fields, DecimalAverager, Hashable, }; + pub use super::down_cast_any_ref; } -pub use datafusion_functions_aggregate_common::aggregate::AggregateExpr; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::ScalarValue; +use datafusion_common::{exec_err, ScalarValue}; use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_expr::expr::create_function_physical_name; use datafusion_expr::AggregateUDF; @@ -47,7 +47,6 @@ use datafusion_expr_common::type_coercion::aggregates::check_arg_count; use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs; use datafusion_functions_aggregate_common::accumulator::StateFieldsArgs; use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; -use datafusion_functions_aggregate_common::utils::down_cast_any_ref; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_expr_common::utils::reverse_order_bys; @@ -55,6 +54,161 @@ use datafusion_physical_expr_common::utils::reverse_order_bys; use std::fmt::Debug; use std::{any::Any, sync::Arc}; +/// An aggregate expression that: +/// * knows its resulting field +/// * knows how to create its accumulator +/// * knows its accumulator's state's field +/// * knows the expressions from whose its accumulator will receive values +/// +/// Any implementation of this trait also needs to implement the +/// `PartialEq` to allows comparing equality between the +/// trait objects. +pub trait AggregateExpr: Send + Sync + Debug + PartialEq { + /// Returns the aggregate expression as [`Any`] so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// the field of the final result of this aggregation. + fn field(&self) -> Result; + + /// the accumulator used to accumulate values from the expressions. + /// the accumulator expects the same number of arguments as `expressions` and must + /// return states with the same description as `state_fields` + fn create_accumulator(&self) -> Result>; + + /// the fields that encapsulate the Accumulator's state + /// the number of fields here equals the number of states that the accumulator contains + fn state_fields(&self) -> Result>; + + /// expressions that are passed to the Accumulator. + /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. + fn expressions(&self) -> Vec>; + + /// Order by requirements for the aggregate function + /// By default it is `None` (there is no requirement) + /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this + fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + /// Indicates whether aggregator can produce the correct result with any + /// arbitrary input ordering. By default, we assume that aggregate expressions + /// are order insensitive. + fn order_sensitivity(&self) -> AggregateOrderSensitivity { + AggregateOrderSensitivity::Insensitive + } + + /// Sets the indicator whether ordering requirements of the aggregator is + /// satisfied by its input. If this is not the case, aggregators with order + /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce + /// the correct result with possibly more work internally. + /// + /// # Returns + /// + /// Returns `Ok(Some(updated_expr))` if the process completes successfully. + /// If the expression can benefit from existing input ordering, but does + /// not implement the method, returns an error. Order insensitive and hard + /// requirement aggregators return `Ok(None)`. + fn with_beneficial_ordering( + self: Arc, + _requirement_satisfied: bool, + ) -> Result>> { + if self.order_bys().is_some() && self.order_sensitivity().is_beneficial() { + return exec_err!( + "Should implement with satisfied for aggregator :{:?}", + self.name() + ); + } + Ok(None) + } + + /// Human readable name such as `"MIN(c2)"`. The default + /// implementation returns placeholder text. + fn name(&self) -> &str { + "AggregateExpr: default name" + } + + /// If the aggregate expression has a specialized + /// [`GroupsAccumulator`] implementation. If this returns true, + /// `[Self::create_groups_accumulator`] will be called. + fn groups_accumulator_supported(&self) -> bool { + false + } + + /// Return a specialized [`GroupsAccumulator`] that manages state + /// for all groups. + /// + /// For maximum performance, a [`GroupsAccumulator`] should be + /// implemented in addition to [`Accumulator`]. + fn create_groups_accumulator(&self) -> Result> { + not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet") + } + + /// Construct an expression that calculates the aggregate in reverse. + /// Typically the "reverse" expression is itself (e.g. SUM, COUNT). + /// For aggregates that do not support calculation in reverse, + /// returns None (which is the default value). + fn reverse_expr(&self) -> Option> { + None + } + + /// Creates accumulator implementation that supports retract + fn create_sliding_accumulator(&self) -> Result> { + not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet") + } + + /// Returns all expressions used in the [`datafusion_functions_aggregate_common::aggregate::AggregateExpr`]. + /// These expressions are (1)function arguments, (2) order by expressions. + fn all_expressions(&self) -> AggregatePhysicalExpressions { + let args = self.expressions(); + let order_bys = self.order_bys().unwrap_or(&[]); + let order_by_exprs = order_bys + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + AggregatePhysicalExpressions { + args, + order_by_exprs, + } + } + + /// Rewrites [`datafusion_functions_aggregate_common::aggregate::AggregateExpr`], with new expressions given. The argument should be consistent + /// with the return value of the [`datafusion_functions_aggregate_common::aggregate::AggregateExpr::all_expressions`] method. + /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. + fn with_new_expressions( + &self, + _args: Vec>, + _order_by_exprs: Vec>, + ) -> Option> { + None + } + + /// If this function is max, return (output_field, true) + /// if the function is min, return (output_field, false) + /// otherwise return None (the default) + /// + /// output_field is the name of the column produced by this aggregate + /// + /// Note: this is used to use special aggregate implementations in certain conditions + fn get_minmax_desc(&self) -> Option<(Field, bool)> { + None + } + + /// Returns default value of the function given the input is Null + /// Most of the aggregate function return Null if input is Null, + /// while `count` returns 0 if input is Null + fn default_value(&self, data_type: &DataType) -> Result; +} + +/// Stores the physical expressions used inside the `AggregateExpr`. +pub struct AggregatePhysicalExpressions { + /// Aggregate function arguments + pub args: Vec>, + /// Order by expressions + pub order_by_exprs: Vec>, +} + + /// Builder for physical [`AggregateExpr`] /// /// `AggregateExpr` contains the information necessary to call @@ -667,6 +821,23 @@ impl AggregateExpr for AggregateFunctionExpr { } } +/// Downcast a `Box` or `Arc` +/// and return the inner trait object as [`Any`] so +/// that it can be downcast to a specific implementation. +/// +/// This method is used when implementing the `PartialEq` +/// for [`AggregateExpr`] aggregation expressions and allows comparing the equality +/// between the trait objects. +pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { + if let Some(obj) = any.downcast_ref::>() { + obj.as_any() + } else if let Some(obj) = any.downcast_ref::>() { + obj.as_any() + } else { + any + } +} + impl PartialEq for AggregateFunctionExpr { fn eq(&self, other: &dyn Any) -> bool { down_cast_any_ref(other) diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index a400a617031e..e5a0db1d55ac 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -45,11 +45,9 @@ pub mod execution_props { pub use datafusion_expr::var_provider::{VarProvider, VarType}; } +pub use aggregate::AggregateExpr; pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; -pub use datafusion_functions_aggregate_common::aggregate::{ - AggregateExpr, AggregatePhysicalExpressions, -}; pub use equivalence::{calculate_union, ConstExpr, EquivalenceProperties}; pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ From 03f3ec8c26624df83a01725ea803bef2fd43f797 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 21 Aug 2024 17:24:32 +0800 Subject: [PATCH 04/13] Remove AggregateExpr trait --- .../combine_partial_final_agg.rs | 11 +- .../physical_optimizer/update_aggr_exprs.rs | 7 +- datafusion/core/src/physical_planner.rs | 8 +- datafusion/core/src/test_util/mod.rs | 8 +- .../src/aggregate.rs | 2 +- .../functions-aggregate-common/src/utils.rs | 2 +- datafusion/physical-expr/src/aggregate.rs | 456 ++---------------- datafusion/physical-expr/src/lib.rs | 1 - .../physical-expr/src/window/aggregate.rs | 11 +- .../src/window/sliding_aggregate.rs | 11 +- .../src/aggregate_statistics.rs | 32 +- .../physical-plan/src/aggregates/mod.rs | 108 ++--- .../physical-plan/src/aggregates/row_hash.rs | 7 +- .../physical-plan/src/execution_plan.rs | 2 +- datafusion/physical-plan/src/lib.rs | 2 +- datafusion/physical-plan/src/windows/mod.rs | 7 +- datafusion/proto/src/physical_plan/mod.rs | 5 +- .../proto/src/physical_plan/to_proto.rs | 72 ++- .../tests/cases/roundtrip_physical_plan.rs | 18 +- 19 files changed, 185 insertions(+), 585 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 3fcc9346470b..3ac0ae295017 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -26,7 +26,8 @@ use crate::physical_plan::ExecutionPlan; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_expr::{physical_exprs_equal, AggregateExpr, PhysicalExpr}; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; +use datafusion_physical_expr::{physical_exprs_equal, PhysicalExpr}; use datafusion_physical_optimizer::PhysicalOptimizerRule; /// CombinePartialFinalAggregate optimizer rule combines the adjacent Partial and Final AggregateExecs @@ -127,7 +128,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { type GroupExprsRef<'a> = ( &'a PhysicalGroupBy, - &'a [Arc], + &'a [Arc], &'a [Option>], ); @@ -229,7 +230,7 @@ mod tests { fn partial_aggregate_exec( input: Arc, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + aggr_expr: Vec>, ) -> Arc { let schema = input.schema(); let n_aggr = aggr_expr.len(); @@ -249,7 +250,7 @@ mod tests { fn final_aggregate_exec( input: Arc, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + aggr_expr: Vec>, ) -> Arc { let schema = input.schema(); let n_aggr = aggr_expr.len(); @@ -277,7 +278,7 @@ mod tests { expr: Arc, name: &str, schema: &Schema, - ) -> Arc { + ) -> Arc { AggregateExprBuilder::new(count_udaf(), vec![expr]) .schema(Arc::new(schema.clone())) .alias(name) diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs index f8edf73e3d2a..08609a03b515 100644 --- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs +++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs @@ -23,8 +23,9 @@ use std::sync::Arc; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{plan_datafusion_err, Result}; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ - reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalSortRequirement, + reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement, }; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::concat_slices; @@ -130,10 +131,10 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { /// successfully. Any errors occurring during the conversion process are /// passed through. fn try_convert_aggregate_if_better( - aggr_exprs: Vec>, + aggr_exprs: Vec>, prefix_requirement: &[PhysicalSortRequirement], eq_properties: &EquivalenceProperties, -) -> Result>> { +) -> Result>> { aggr_exprs .into_iter() .map(|aggr_expr| { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 19cfd1af094b..38820f4b1170 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -58,8 +58,8 @@ use crate::physical_plan::unnest::UnnestExec; use crate::physical_plan::values::ValuesExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{ - displayable, windows, AggregateExpr, ExecutionPlan, ExecutionPlanProperties, - InputOrderMode, Partitioning, PhysicalExpr, WindowExpr, + displayable, windows, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, + Partitioning, PhysicalExpr, WindowExpr, }; use arrow::compute::SortOptions; @@ -82,7 +82,7 @@ use datafusion_expr::{ DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp, }; -use datafusion_physical_expr::aggregate::AggregateExprBuilder; +use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::LexOrdering; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; @@ -1542,7 +1542,7 @@ pub fn create_window_expr( } type AggregateExprWithOptionalArgs = ( - Arc, + Arc, // The filter clause, if any Option>, // Ordering requirements, if any diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 93e0ac0d7abf..faa9378535fd 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -48,13 +48,11 @@ use datafusion_common::TableReference; use datafusion_expr::utils::COUNT_STAR_EXPANSION; use datafusion_expr::{CreateExternalTable, Expr, TableType}; use datafusion_functions_aggregate::count::count_udaf; -use datafusion_physical_expr::{ - expressions, AggregateExpr, EquivalenceProperties, PhysicalExpr, -}; +use datafusion_physical_expr::{expressions, EquivalenceProperties, PhysicalExpr}; use async_trait::async_trait; use datafusion_catalog::Session; -use datafusion_physical_expr::aggregate::AggregateExprBuilder; +use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use futures::Stream; use tempfile::TempDir; // backwards compatibility @@ -429,7 +427,7 @@ impl TestAggregate { } /// Return appropriate expr depending if COUNT is for col or table (*) - pub fn count_expr(&self, schema: &Schema) -> Arc { + pub fn count_expr(&self, schema: &Schema) -> Arc { AggregateExprBuilder::new(count_udaf(), vec![self.column()]) .schema(Arc::new(schema.clone())) .alias(self.column_name()) diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate.rs index 17b00e4ed499..1a729b148e2e 100644 --- a/datafusion/functions-aggregate-common/src/aggregate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate.rs @@ -19,4 +19,4 @@ //! (built-in and custom) need to satisfy. pub mod count_distinct; -pub mod groups_accumulator; \ No newline at end of file +pub mod groups_accumulator; diff --git a/datafusion/functions-aggregate-common/src/utils.rs b/datafusion/functions-aggregate-common/src/utils.rs index 0c7f76185469..4fba772d8ddc 100644 --- a/datafusion/functions-aggregate-common/src/utils.rs +++ b/datafusion/functions-aggregate-common/src/utils.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{sync::Arc}; +use std::sync::Arc; use arrow::array::{ArrayRef, AsArray}; use arrow::datatypes::ArrowNativeType; diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 4c4c9266f4ce..62ec0fab6640 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -29,20 +29,18 @@ pub(crate) mod stats { } pub mod utils { pub use datafusion_functions_aggregate_common::utils::{ - adjust_output_array, get_accum_scalar_values_as_arrays, - get_sort_options, ordering_fields, DecimalAverager, Hashable, + adjust_output_array, get_accum_scalar_values_as_arrays, get_sort_options, + ordering_fields, DecimalAverager, Hashable, }; - pub use super::down_cast_any_ref; } use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::{exec_err, ScalarValue}; +use datafusion_common::ScalarValue; use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_expr::expr::create_function_physical_name; use datafusion_expr::AggregateUDF; use datafusion_expr::ReversedUDAF; use datafusion_expr_common::accumulator::Accumulator; -use datafusion_expr_common::groups_accumulator::GroupsAccumulator; use datafusion_expr_common::type_coercion::aggregates::check_arg_count; use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs; use datafusion_functions_aggregate_common::accumulator::StateFieldsArgs; @@ -51,167 +49,13 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use datafusion_physical_expr_common::utils::reverse_order_bys; +use datafusion_expr_common::groups_accumulator::GroupsAccumulator; use std::fmt::Debug; -use std::{any::Any, sync::Arc}; - -/// An aggregate expression that: -/// * knows its resulting field -/// * knows how to create its accumulator -/// * knows its accumulator's state's field -/// * knows the expressions from whose its accumulator will receive values -/// -/// Any implementation of this trait also needs to implement the -/// `PartialEq` to allows comparing equality between the -/// trait objects. -pub trait AggregateExpr: Send + Sync + Debug + PartialEq { - /// Returns the aggregate expression as [`Any`] so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// the field of the final result of this aggregation. - fn field(&self) -> Result; - - /// the accumulator used to accumulate values from the expressions. - /// the accumulator expects the same number of arguments as `expressions` and must - /// return states with the same description as `state_fields` - fn create_accumulator(&self) -> Result>; - - /// the fields that encapsulate the Accumulator's state - /// the number of fields here equals the number of states that the accumulator contains - fn state_fields(&self) -> Result>; - - /// expressions that are passed to the Accumulator. - /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. - fn expressions(&self) -> Vec>; - - /// Order by requirements for the aggregate function - /// By default it is `None` (there is no requirement) - /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this - fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - /// Indicates whether aggregator can produce the correct result with any - /// arbitrary input ordering. By default, we assume that aggregate expressions - /// are order insensitive. - fn order_sensitivity(&self) -> AggregateOrderSensitivity { - AggregateOrderSensitivity::Insensitive - } - - /// Sets the indicator whether ordering requirements of the aggregator is - /// satisfied by its input. If this is not the case, aggregators with order - /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce - /// the correct result with possibly more work internally. - /// - /// # Returns - /// - /// Returns `Ok(Some(updated_expr))` if the process completes successfully. - /// If the expression can benefit from existing input ordering, but does - /// not implement the method, returns an error. Order insensitive and hard - /// requirement aggregators return `Ok(None)`. - fn with_beneficial_ordering( - self: Arc, - _requirement_satisfied: bool, - ) -> Result>> { - if self.order_bys().is_some() && self.order_sensitivity().is_beneficial() { - return exec_err!( - "Should implement with satisfied for aggregator :{:?}", - self.name() - ); - } - Ok(None) - } - - /// Human readable name such as `"MIN(c2)"`. The default - /// implementation returns placeholder text. - fn name(&self) -> &str { - "AggregateExpr: default name" - } - - /// If the aggregate expression has a specialized - /// [`GroupsAccumulator`] implementation. If this returns true, - /// `[Self::create_groups_accumulator`] will be called. - fn groups_accumulator_supported(&self) -> bool { - false - } - - /// Return a specialized [`GroupsAccumulator`] that manages state - /// for all groups. - /// - /// For maximum performance, a [`GroupsAccumulator`] should be - /// implemented in addition to [`Accumulator`]. - fn create_groups_accumulator(&self) -> Result> { - not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet") - } - - /// Construct an expression that calculates the aggregate in reverse. - /// Typically the "reverse" expression is itself (e.g. SUM, COUNT). - /// For aggregates that do not support calculation in reverse, - /// returns None (which is the default value). - fn reverse_expr(&self) -> Option> { - None - } - - /// Creates accumulator implementation that supports retract - fn create_sliding_accumulator(&self) -> Result> { - not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet") - } - - /// Returns all expressions used in the [`datafusion_functions_aggregate_common::aggregate::AggregateExpr`]. - /// These expressions are (1)function arguments, (2) order by expressions. - fn all_expressions(&self) -> AggregatePhysicalExpressions { - let args = self.expressions(); - let order_bys = self.order_bys().unwrap_or(&[]); - let order_by_exprs = order_bys - .iter() - .map(|sort_expr| Arc::clone(&sort_expr.expr)) - .collect::>(); - AggregatePhysicalExpressions { - args, - order_by_exprs, - } - } - - /// Rewrites [`datafusion_functions_aggregate_common::aggregate::AggregateExpr`], with new expressions given. The argument should be consistent - /// with the return value of the [`datafusion_functions_aggregate_common::aggregate::AggregateExpr::all_expressions`] method. - /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. - fn with_new_expressions( - &self, - _args: Vec>, - _order_by_exprs: Vec>, - ) -> Option> { - None - } - - /// If this function is max, return (output_field, true) - /// if the function is min, return (output_field, false) - /// otherwise return None (the default) - /// - /// output_field is the name of the column produced by this aggregate - /// - /// Note: this is used to use special aggregate implementations in certain conditions - fn get_minmax_desc(&self) -> Option<(Field, bool)> { - None - } - - /// Returns default value of the function given the input is Null - /// Most of the aggregate function return Null if input is Null, - /// while `count` returns 0 if input is Null - fn default_value(&self, data_type: &DataType) -> Result; -} +use std::sync::Arc; -/// Stores the physical expressions used inside the `AggregateExpr`. -pub struct AggregatePhysicalExpressions { - /// Aggregate function arguments - pub args: Vec>, - /// Order by expressions - pub order_by_exprs: Vec>, -} - - -/// Builder for physical [`AggregateExpr`] +/// Builder for physical [`AggregateFunctionExpr`] /// -/// `AggregateExpr` contains the information necessary to call +/// `AggregateFunctionExpr` contains the information necessary to call /// an aggregate expression. #[derive(Debug, Clone)] pub struct AggregateExprBuilder { @@ -245,7 +89,7 @@ impl AggregateExprBuilder { } } - pub fn build(self) -> Result> { + pub fn build(self) -> Result> { let Self { fun, args, @@ -410,7 +254,6 @@ impl AggregateFunctionExpr { self.fun.state_fields(args) } - // TODO remove Result pub fn field(&self) -> Result { Ok(Field::new(&self.name, self.data_type.clone(), true)) } @@ -510,220 +353,7 @@ impl AggregateFunctionExpr { self.fun.groups_accumulator_supported(args) } - pub fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - if self.ordering_req.is_empty() { - return None; - } - - if !self.order_sensitivity().is_insensitive() { - return Some(&self.ordering_req); - } - - None - } - - pub fn order_sensitivity(&self) -> AggregateOrderSensitivity { - if !self.ordering_req.is_empty() { - // If there is requirement, use the sensitivity of the implementation - self.fun.order_sensitivity() - } else { - // If no requirement, aggregator is order insensitive - AggregateOrderSensitivity::Insensitive - } - } - - pub fn with_beneficial_ordering( - self: Arc, - beneficial_ordering: bool, - ) -> Result>> { - let Some(updated_fn) = self - .fun - .clone() - .with_beneficial_ordering(beneficial_ordering)? - else { - return Ok(None); - }; - - AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec()) - .order_by(self.ordering_req.to_vec()) - .schema(Arc::new(self.schema.clone())) - .alias(self.name().to_string()) - .with_ignore_nulls(self.ignore_nulls) - .with_distinct(self.is_distinct) - .with_reversed(self.is_reversed) - .build() - .map(Some) - } - - pub fn reverse_expr(&self) -> Option> { - match self.fun.reverse_udf() { - ReversedUDAF::NotSupported => None, - ReversedUDAF::Identical => Some(Arc::new(self.clone())), - ReversedUDAF::Reversed(reverse_udf) => { - let reverse_ordering_req = reverse_order_bys(&self.ordering_req); - let mut name = self.name().to_string(); - // If the function is changed, we need to reverse order_by clause as well - // i.e. First(a order by b asc null first) -> Last(a order by b desc null last) - if self.fun().name() == reverse_udf.name() { - } else { - replace_order_by_clause(&mut name); - } - replace_fn_name_clause(&mut name, self.fun.name(), reverse_udf.name()); - - AggregateExprBuilder::new(reverse_udf, self.args.to_vec()) - .order_by(reverse_ordering_req.to_vec()) - .schema(Arc::new(self.schema.clone())) - .alias(name) - .with_ignore_nulls(self.ignore_nulls) - .with_distinct(self.is_distinct) - .with_reversed(!self.is_reversed) - .build() - .ok() - } - } - } - - pub fn get_minmax_desc(&self) -> Option<(Field, bool)> { - self.fun - .is_descending() - .and_then(|flag| self.field().ok().map(|f| (f, flag))) - } - - pub fn is_nullable(&self) -> bool { - self.is_nullable - } -} - -impl AggregateExpr for AggregateFunctionExpr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn expressions(&self) -> Vec> { - self.args.clone() - } - - fn state_fields(&self) -> Result> { - let args = StateFieldsArgs { - name: &self.name, - input_types: &self.input_types, - return_type: &self.data_type, - ordering_fields: &self.ordering_fields, - is_distinct: self.is_distinct, - }; - - self.fun.state_fields(args) - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.is_nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - let acc_args = AccumulatorArgs { - return_type: &self.data_type, - schema: &self.schema, - ignore_nulls: self.ignore_nulls, - ordering_req: &self.ordering_req, - is_distinct: self.is_distinct, - name: &self.name, - is_reversed: self.is_reversed, - exprs: &self.args, - }; - - self.fun.accumulator(acc_args) - } - - fn create_sliding_accumulator(&self) -> Result> { - let args = AccumulatorArgs { - return_type: &self.data_type, - schema: &self.schema, - ignore_nulls: self.ignore_nulls, - ordering_req: &self.ordering_req, - is_distinct: self.is_distinct, - name: &self.name, - is_reversed: self.is_reversed, - exprs: &self.args, - }; - - let accumulator = self.fun.create_sliding_accumulator(args)?; - - // Accumulators that have window frame startings different - // than `UNBOUNDED PRECEDING`, such as `1 PRECEDING`, need to - // implement retract_batch method in order to run correctly - // currently in DataFusion. - // - // If this `retract_batches` is not present, there is no way - // to calculate result correctly. For example, the query - // - // ```sql - // SELECT - // SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS sum_a - // FROM - // t - // ``` - // - // 1. First sum value will be the sum of rows between `[0, 1)`, - // - // 2. Second sum value will be the sum of rows between `[0, 2)` - // - // 3. Third sum value will be the sum of rows between `[1, 3)`, etc. - // - // Since the accumulator keeps the running sum: - // - // 1. First sum we add to the state sum value between `[0, 1)` - // - // 2. Second sum we add to the state sum value between `[1, 2)` - // (`[0, 1)` is already in the state sum, hence running sum will - // cover `[0, 2)` range) - // - // 3. Third sum we add to the state sum value between `[2, 3)` - // (`[0, 2)` is already in the state sum). Also we need to - // retract values between `[0, 1)` by this way we can obtain sum - // between [1, 3) which is indeed the appropriate range. - // - // When we use `UNBOUNDED PRECEDING` in the query starting - // index will always be 0 for the desired range, and hence the - // `retract_batch` method will not be called. In this case - // having retract_batch is not a requirement. - // - // This approach is a a bit different than window function - // approach. In window function (when they use a window frame) - // they get all the desired range during evaluation. - if !accumulator.supports_retract_batch() { - return not_impl_err!( - "Aggregate can not be used as a sliding accumulator because \ - `retract_batch` is not implemented: {}", - self.name - ); - } - Ok(accumulator) - } - - fn name(&self) -> &str { - &self.name - } - - fn groups_accumulator_supported(&self) -> bool { - let args = AccumulatorArgs { - return_type: &self.data_type, - schema: &self.schema, - ignore_nulls: self.ignore_nulls, - ordering_req: &self.ordering_req, - is_distinct: self.is_distinct, - name: &self.name, - is_reversed: self.is_reversed, - exprs: &self.args, - }; - self.fun.groups_accumulator_supported(args) - } - - fn create_groups_accumulator(&self) -> Result> { + pub fn create_groups_accumulator(&self) -> Result> { let args = AccumulatorArgs { return_type: &self.data_type, schema: &self.schema, @@ -737,7 +367,7 @@ impl AggregateExpr for AggregateFunctionExpr { self.fun.create_groups_accumulator(args) } - fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { + pub fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { if self.ordering_req.is_empty() { return None; } @@ -749,7 +379,7 @@ impl AggregateExpr for AggregateFunctionExpr { None } - fn order_sensitivity(&self) -> AggregateOrderSensitivity { + pub fn order_sensitivity(&self) -> AggregateOrderSensitivity { if !self.ordering_req.is_empty() { // If there is requirement, use the sensitivity of the implementation self.fun.order_sensitivity() @@ -759,10 +389,10 @@ impl AggregateExpr for AggregateFunctionExpr { } } - fn with_beneficial_ordering( + pub fn with_beneficial_ordering( self: Arc, beneficial_ordering: bool, - ) -> Result>> { + ) -> Result>> { let Some(updated_fn) = self .fun .clone() @@ -782,7 +412,7 @@ impl AggregateExpr for AggregateFunctionExpr { .map(Some) } - fn reverse_expr(&self) -> Option> { + pub fn reverse_expr(&self) -> Option> { match self.fun.reverse_udf() { ReversedUDAF::NotSupported => None, ReversedUDAF::Identical => Some(Arc::new(self.clone())), @@ -791,7 +421,7 @@ impl AggregateExpr for AggregateFunctionExpr { let mut name = self.name().to_string(); // If the function is changed, we need to reverse order_by clause as well // i.e. First(a order by b asc null first) -> Last(a order by b desc null last) - if self.fun().name() == reverse_udf.name() { + if self.name() == reverse_udf.name() { } else { replace_order_by_clause(&mut name); } @@ -810,50 +440,40 @@ impl AggregateExpr for AggregateFunctionExpr { } } - fn get_minmax_desc(&self) -> Option<(Field, bool)> { + pub fn get_minmax_desc(&self) -> Option<(Field, bool)> { self.fun .is_descending() .and_then(|flag| self.field().ok().map(|f| (f, flag))) } - fn default_value(&self, data_type: &DataType) -> Result { + pub fn default_value(&self, data_type: &DataType) -> Result { self.fun.default_value(data_type) } -} -/// Downcast a `Box` or `Arc` -/// and return the inner trait object as [`Any`] so -/// that it can be downcast to a specific implementation. -/// -/// This method is used when implementing the `PartialEq` -/// for [`AggregateExpr`] aggregation expressions and allows comparing the equality -/// between the trait objects. -pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { - if let Some(obj) = any.downcast_ref::>() { - obj.as_any() - } else if let Some(obj) = any.downcast_ref::>() { - obj.as_any() - } else { - any + pub fn is_nullable(&self) -> bool { + self.is_nullable + } + + pub fn expressions(&self) -> Vec> { + self.args.clone() + } + + pub fn name(&self) -> &str { + &self.name } } -impl PartialEq for AggregateFunctionExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.fun == x.fun - && self.args.len() == x.args.len() - && self - .args - .iter() - .zip(x.args.iter()) - .all(|(this_arg, other_arg)| this_arg.eq(other_arg)) - }) - .unwrap_or(false) +impl PartialEq for AggregateFunctionExpr { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.data_type == other.data_type + && self.fun == other.fun + && self.args.len() == other.args.len() + && self + .args + .iter() + .zip(other.args.iter()) + .all(|(this_arg, other_arg)| this_arg.eq(other_arg)) } } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index e5a0db1d55ac..7db7188b85d3 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -45,7 +45,6 @@ pub mod execution_props { pub use datafusion_expr::var_provider::{VarProvider, VarType}; } -pub use aggregate::AggregateExpr; pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; pub use equivalence::{calculate_union, ConstExpr, EquivalenceProperties}; diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 52015f425217..5439e140502a 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -29,20 +29,19 @@ use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Accumulator, WindowFrame}; +use crate::aggregate::AggregateFunctionExpr; use crate::window::window_expr::AggregateWindowExpr; use crate::window::{ PartitionBatches, PartitionWindowAggStates, SlidingAggregateWindowExpr, WindowExpr, }; -use crate::{ - expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr, -}; +use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; /// A window expr that takes the form of an aggregate function. /// /// See comments on [`WindowExpr`] for more details. #[derive(Debug)] pub struct PlainAggregateWindowExpr { - aggregate: Arc, + aggregate: Arc, partition_by: Vec>, order_by: Vec, window_frame: Arc, @@ -51,7 +50,7 @@ pub struct PlainAggregateWindowExpr { impl PlainAggregateWindowExpr { /// Create a new aggregate window function expression pub fn new( - aggregate: Arc, + aggregate: Arc, partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, @@ -65,7 +64,7 @@ impl PlainAggregateWindowExpr { } /// Get aggregate expr of AggregateWindowExpr - pub fn get_aggregate_expr(&self) -> &Arc { + pub fn get_aggregate_expr(&self) -> &Arc { &self.aggregate } } diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index ae0aeceb3b4e..e06c0cf90919 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -28,13 +28,12 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{Accumulator, WindowFrame}; +use crate::aggregate::AggregateFunctionExpr; use crate::window::window_expr::AggregateWindowExpr; use crate::window::{ PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr, }; -use crate::{ - expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr, -}; +use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; /// A window expr that takes the form of an aggregate function that /// can be incrementally computed over sliding windows. @@ -42,7 +41,7 @@ use crate::{ /// See comments on [`WindowExpr`] for more details. #[derive(Debug)] pub struct SlidingAggregateWindowExpr { - aggregate: Arc, + aggregate: Arc, partition_by: Vec>, order_by: Vec, window_frame: Arc, @@ -51,7 +50,7 @@ pub struct SlidingAggregateWindowExpr { impl SlidingAggregateWindowExpr { /// Create a new (sliding) aggregate window function expression. pub fn new( - aggregate: Arc, + aggregate: Arc, partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, @@ -65,7 +64,7 @@ impl SlidingAggregateWindowExpr { } /// Get the [AggregateExpr] of this object. - pub fn get_aggregate_expr(&self) -> &Arc { + pub fn get_aggregate_expr(&self) -> &Arc { &self.aggregate } } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 66b250c5063b..a41ed197bbc0 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -23,7 +23,7 @@ use datafusion_common::scalar::ScalarValue; use datafusion_common::Result; use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::projection::ProjectionExec; -use datafusion_physical_plan::{expressions, AggregateExpr, ExecutionPlan, Statistics}; +use datafusion_physical_plan::{expressions, ExecutionPlan, Statistics}; use crate::PhysicalOptimizerRule; use datafusion_common::stats::Precision; @@ -137,7 +137,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option> /// If this agg_expr is a count that can be exactly derived from the statistics, return it. fn take_optimizable_column_and_table_count( - agg_expr: &dyn AggregateExpr, + agg_expr: &AggregateFunctionExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { let col_stats = &stats.column_statistics; @@ -174,7 +174,7 @@ fn take_optimizable_column_and_table_count( /// If this agg_expr is a min that is exactly defined in the statistics, return it. fn take_optimizable_min( - agg_expr: &dyn AggregateExpr, + agg_expr: &AggregateFunctionExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { if let Precision::Exact(num_rows) = &stats.num_rows { @@ -220,7 +220,7 @@ fn take_optimizable_min( /// If this agg_expr is a max that is exactly defined in the statistics, return it. fn take_optimizable_max( - agg_expr: &dyn AggregateExpr, + agg_expr: &AggregateFunctionExpr, stats: &Statistics, ) -> Option<(ScalarValue, String)> { if let Precision::Exact(num_rows) = &stats.num_rows { @@ -266,33 +266,27 @@ fn take_optimizable_max( // TODO: Move this check into AggregateUDFImpl // https://github.com/apache/datafusion/issues/11153 -fn is_non_distinct_count(agg_expr: &dyn AggregateExpr) -> bool { - if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { - if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() { - return true; - } +fn is_non_distinct_count(agg_expr: &AggregateFunctionExpr) -> bool { + if agg_expr.name() == "count" && !agg_expr.is_distinct() { + return true; } false } // TODO: Move this check into AggregateUDFImpl // https://github.com/apache/datafusion/issues/11153 -fn is_min(agg_expr: &dyn AggregateExpr) -> bool { - if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { - if agg_expr.fun().name().to_lowercase() == "min" { - return true; - } +fn is_min(agg_expr: &AggregateFunctionExpr) -> bool { + if agg_expr.name().to_lowercase() == "min" { + return true; } false } // TODO: Move this check into AggregateUDFImpl // https://github.com/apache/datafusion/issues/11153 -fn is_max(agg_expr: &dyn AggregateExpr) -> bool { - if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { - if agg_expr.fun().name().to_lowercase() == "max" { - return true; - } +fn is_max(agg_expr: &AggregateFunctionExpr) -> bool { + if agg_expr.name().to_lowercase() == "max" { + return true; } false } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index a1d91b9d3296..565528809482 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -42,10 +42,11 @@ use datafusion_expr::Accumulator; use datafusion_physical_expr::{ equivalence::{collapse_lex_req, ProjectionMapping}, expressions::{Column, UnKnownColumn}, - physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering, - LexRequirement, PhysicalExpr, PhysicalSortRequirement, + physical_exprs_contains, EquivalenceProperties, LexOrdering, LexRequirement, + PhysicalExpr, PhysicalSortRequirement, }; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use itertools::Itertools; pub mod group_values; @@ -253,7 +254,7 @@ pub struct AggregateExec { /// Group by expressions group_by: PhysicalGroupBy, /// Aggregate expressions - aggr_expr: Vec>, + aggr_expr: Vec>, /// FILTER (WHERE clause) expression for each aggregate expression filter_expr: Vec>>, /// Set if the output of this aggregation is truncated by a upstream sort/limit clause @@ -280,7 +281,10 @@ impl AggregateExec { /// Function used in `ConvertFirstLast` optimizer rule, /// where we need parts of the new value, others cloned from the old one /// Rewrites aggregate exec with new aggregate expressions. - pub fn with_new_aggr_exprs(&self, aggr_expr: Vec>) -> Self { + pub fn with_new_aggr_exprs( + &self, + aggr_expr: Vec>, + ) -> Self { Self { aggr_expr, // clone the rest of the fields @@ -306,7 +310,7 @@ impl AggregateExec { pub fn try_new( mode: AggregateMode, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -343,7 +347,7 @@ impl AggregateExec { fn try_new_with_schema( mode: AggregateMode, group_by: PhysicalGroupBy, - mut aggr_expr: Vec>, + mut aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -451,7 +455,7 @@ impl AggregateExec { } /// Aggregate expressions - pub fn aggr_expr(&self) -> &[Arc] { + pub fn aggr_expr(&self) -> &[Arc] { &self.aggr_expr } @@ -788,7 +792,7 @@ impl ExecutionPlan for AggregateExec { fn create_schema( input_schema: &Schema, group_expr: &[(Arc, String)], - aggr_expr: &[Arc], + aggr_expr: &[Arc], contains_null_expr: bool, mode: AggregateMode, ) -> Result { @@ -834,7 +838,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { /// /// # Parameters /// -/// - `aggr_expr`: A reference to an `Arc` representing the +/// - `aggr_expr`: A reference to an `Arc` representing the /// aggregate expression. /// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the /// physical GROUP BY expression. @@ -846,7 +850,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { /// A `LexOrdering` instance indicating the lexical ordering requirement for /// the aggregate expression. fn get_aggregate_expr_req( - aggr_expr: &Arc, + aggr_expr: &Arc, group_by: &PhysicalGroupBy, agg_mode: &AggregateMode, ) -> LexOrdering { @@ -894,7 +898,7 @@ fn get_aggregate_expr_req( /// the aggregator requirement is incompatible. fn finer_ordering( existing_req: &LexOrdering, - aggr_expr: &Arc, + aggr_expr: &Arc, group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, agg_mode: &AggregateMode, @@ -912,7 +916,7 @@ pub fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { /// /// # Parameters /// -/// - `aggr_exprs`: A slice of `Arc` containing all the +/// - `aggr_exprs`: A slice of `Arc` containing all the /// aggregate expressions. /// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the /// physical GROUP BY expression. @@ -926,7 +930,7 @@ pub fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { /// A `LexRequirement` instance, which is the requirement that satisfies all the /// aggregate requirements. Returns an error in case of conflicting requirements. pub fn get_finer_aggregate_exprs_requirement( - aggr_exprs: &mut [Arc], + aggr_exprs: &mut [Arc], group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, agg_mode: &AggregateMode, @@ -996,10 +1000,10 @@ pub fn get_finer_aggregate_exprs_requirement( /// returns physical expressions for arguments to evaluate against a batch /// The expressions are different depending on `mode`: -/// * Partial: AggregateExpr::expressions -/// * Final: columns of `AggregateExpr::state_fields()` +/// * Partial: AggregateFunctionExpr::expressions +/// * Final: columns of `AggregateFunctionExpr::state_fields()` pub fn aggregate_expressions( - aggr_expr: &[Arc], + aggr_expr: &[Arc], mode: &AggregateMode, col_idx_base: usize, ) -> Result>>> { @@ -1035,12 +1039,12 @@ pub fn aggregate_expressions( } /// uses `state_fields` to build a vec of physical column expressions required to merge the -/// AggregateExpr' accumulator's state. +/// AggregateFunctionExpr' accumulator's state. /// /// `index_base` is the starting physical column index for the next expanded state field. fn merge_expressions( index_base: usize, - expr: &Arc, + expr: &Arc, ) -> Result>> { expr.state_fields().map(|fields| { fields @@ -1054,7 +1058,7 @@ fn merge_expressions( pub type AccumulatorItem = Box; pub fn create_accumulators( - aggr_expr: &[Arc], + aggr_expr: &[Arc], ) -> Result> { aggr_expr .iter() @@ -1496,13 +1500,12 @@ mod tests { groups: vec![vec![false]], }; - let aggregates: Vec> = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) - .schema(Arc::clone(&input_schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates: Vec> = vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) + .schema(Arc::clone(&input_schema)) + .alias("AVG(b)") + .build()?, + ]; let task_ctx = if spill { // set to an appropriate value to trigger spill @@ -1793,7 +1796,7 @@ mod tests { } // Median(a) - fn test_median_agg_expr(schema: SchemaRef) -> Result> { + fn test_median_agg_expr(schema: SchemaRef) -> Result> { AggregateExprBuilder::new(median_udaf(), vec![col("a", &schema)?]) .schema(schema) .alias("MEDIAN(a)") @@ -1819,17 +1822,16 @@ mod tests { }; // something that allocates within the aggregator - let aggregates_v0: Vec> = + let aggregates_v0: Vec> = vec![test_median_agg_expr(Arc::clone(&input_schema))?]; // use fast-path in `row_hash.rs`. - let aggregates_v2: Vec> = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) - .schema(Arc::clone(&input_schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates_v2: Vec> = vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) + .schema(Arc::clone(&input_schema)) + .alias("AVG(b)") + .build()?, + ]; for (version, groups, aggregates) in [ (0, groups_none, aggregates_v0), @@ -1883,13 +1885,12 @@ mod tests { let groups = PhysicalGroupBy::default(); - let aggregates: Vec> = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("AVG(a)") - .build()?, - ]; + let aggregates: Vec> = vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("AVG(a)") + .build()?, + ]; let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); @@ -1923,13 +1924,12 @@ mod tests { let groups = PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); - let aggregates: Vec> = - vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates: Vec> = vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("AVG(b)") + .build()?, + ]; let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); @@ -1974,7 +1974,7 @@ mod tests { fn test_first_value_agg_expr( schema: &Schema, sort_options: SortOptions, - ) -> Result> { + ) -> Result> { let ordering_req = [PhysicalSortExpr { expr: col("b", schema)?, options: sort_options, @@ -1992,7 +1992,7 @@ mod tests { fn test_last_value_agg_expr( schema: &Schema, sort_options: SortOptions, - ) -> Result> { + ) -> Result> { let ordering_req = [PhysicalSortExpr { expr: col("b", schema)?, options: sort_options, @@ -2047,7 +2047,7 @@ mod tests { descending: false, nulls_first: false, }; - let aggregates: Vec> = if is_first_acc { + let aggregates: Vec> = if is_first_acc { vec![test_first_value_agg_expr(&schema, sort_options)?] } else { vec![test_last_value_agg_expr(&schema, sort_options)?] @@ -2211,7 +2211,7 @@ mod tests { }; let groups = PhysicalGroupBy::new_single(vec![(col_a, "a".to_string())]); - let aggregates: Vec> = vec![ + let aggregates: Vec> = vec![ test_first_value_agg_expr(&schema, option_desc)?, test_last_value_agg_expr(&schema, option_desc)?, ]; @@ -2269,7 +2269,7 @@ mod tests { ], ); - let aggregates: Vec> = + let aggregates: Vec> = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)]) .schema(Arc::clone(&schema)) .alias("1") diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index b3221752d034..7429e67bbfa0 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -47,10 +47,9 @@ use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::{ - AggregateExpr, GroupsAccumulatorAdapter, PhysicalSortExpr, -}; +use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr}; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use futures::ready; use futures::stream::{Stream, StreamExt}; use log::debug; @@ -537,7 +536,7 @@ impl GroupedHashAggregateStream { /// that is supported by the aggregate, or a /// [`GroupsAccumulatorAdapter`] if not. pub(crate) fn create_group_accumulator( - agg_expr: &Arc, + agg_expr: &Arc, ) -> Result> { if agg_expr.groups_accumulator_supported() { agg_expr.create_groups_accumulator() diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index a6a15e46860c..17040406bf3c 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -34,7 +34,7 @@ pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; pub use datafusion_expr::{Accumulator, ColumnarValue}; pub use datafusion_physical_expr::window::WindowExpr; pub use datafusion_physical_expr::{ - expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr, + expressions, functions, udf, Distribution, Partitioning, PhysicalExpr, }; use datafusion_physical_expr::{ EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement, diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 0196999b4930..cf3f744f48cb 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -31,7 +31,7 @@ pub use datafusion_expr::{Accumulator, ColumnarValue}; pub use datafusion_physical_expr::window::WindowExpr; use datafusion_physical_expr::PhysicalSortExpr; pub use datafusion_physical_expr::{ - expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr, + expressions, functions, udf, Distribution, Partitioning, PhysicalExpr, }; pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index c721c0caebaf..dce86aa8d95a 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -35,13 +35,12 @@ use datafusion_expr::{ BuiltInWindowFunction, PartitionEvaluator, WindowFrame, WindowFunctionDefinition, WindowUDF, }; -use datafusion_physical_expr::aggregate::AggregateExprBuilder; +use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ reverse_order_bys, window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr}, - AggregateExpr, ConstExpr, EquivalenceProperties, LexOrdering, - PhysicalSortRequirement, + ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement, }; use itertools::Itertools; @@ -139,7 +138,7 @@ fn window_expr_from_aggregate_expr( partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, - aggregate: Arc, + aggregate: Arc, ) -> Arc { // Is there a potentially unlimited sized window frame? let unbounded_window = window_frame.start_bound.is_unbounded(); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 0f6722dd375b..369bdced0fad 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -34,6 +34,7 @@ use datafusion::datasource::physical_plan::ParquetExec; use datafusion::datasource::physical_plan::{AvroExec, CsvExec}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; +use datafusion::physical_expr::aggregate::AggregateFunctionExpr; use datafusion::physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; use datafusion::physical_plan::aggregates::AggregateMode; use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy}; @@ -59,7 +60,7 @@ use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMerge use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use datafusion::physical_plan::{ - AggregateExpr, ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr, + ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr, }; use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use datafusion_expr::{AggregateUDF, ScalarUDF}; @@ -464,7 +465,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { }) .collect::, _>>()?; - let physical_aggr_expr: Vec> = hash_agg + let physical_aggr_expr: Vec> = hash_agg .aggr_expr .iter() .zip(hash_agg.aggr_expr_name.iter()) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 7949a457f40f..ebbcef1249c6 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -29,7 +29,7 @@ use datafusion::physical_plan::expressions::{ }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; -use datafusion::physical_plan::{AggregateExpr, Partitioning, PhysicalExpr, WindowExpr}; +use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; use datafusion::{ datasource::{ file_format::{csv::CsvSink, json::JsonSink}, @@ -49,58 +49,50 @@ use crate::protobuf::{ use super::PhysicalExtensionCodec; pub fn serialize_physical_aggr_expr( - aggr_expr: Arc, + aggr_expr: Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { let expressions = serialize_physical_exprs(aggr_expr.expressions(), codec)?; let ordering_req = aggr_expr.order_bys().unwrap_or(&[]).to_vec(); let ordering_req = serialize_physical_sort_exprs(ordering_req, codec)?; - if let Some(a) = aggr_expr.as_any().downcast_ref::() { - let name = a.fun().name().to_string(); - let mut buf = Vec::new(); - codec.try_encode_udaf(a.fun(), &mut buf)?; - Ok(protobuf::PhysicalExprNode { - expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( - protobuf::PhysicalAggregateExprNode { - aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), - expr: expressions, - ordering_req, - distinct: a.is_distinct(), - ignore_nulls: a.ignore_nulls(), - fun_definition: (!buf.is_empty()).then_some(buf) - }, - )), - }) - } else { - unreachable!("No other types exists besides AggergationFunctionExpr"); - } + let name = aggr_expr.name().to_string(); + let mut buf = Vec::new(); + codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; + Ok(protobuf::PhysicalExprNode { + expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( + protobuf::PhysicalAggregateExprNode { + aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), + expr: expressions, + ordering_req, + distinct: aggr_expr.is_distinct(), + ignore_nulls: aggr_expr.ignore_nulls(), + fun_definition: (!buf.is_empty()).then_some(buf) + }, + )), + }) } fn serialize_physical_window_aggr_expr( - aggr_expr: &dyn AggregateExpr, + aggr_expr: &AggregateFunctionExpr, _window_frame: &WindowFrame, codec: &dyn PhysicalExtensionCodec, ) -> Result<(physical_window_expr_node::WindowFunction, Option>)> { - if let Some(a) = aggr_expr.as_any().downcast_ref::() { - if a.is_distinct() || a.ignore_nulls() { - // TODO - return not_impl_err!( - "Distinct aggregate functions not supported in window expressions" - ); - } - - let mut buf = Vec::new(); - codec.try_encode_udaf(a.fun(), &mut buf)?; - Ok(( - physical_window_expr_node::WindowFunction::UserDefinedAggrFunction( - a.fun().name().to_string(), - ), - (!buf.is_empty()).then_some(buf), - )) - } else { - unreachable!("No other types exists besides AggergationFunctionExpr"); + if aggr_expr.is_distinct() || aggr_expr.ignore_nulls() { + // TODO + return not_impl_err!( + "Distinct aggregate functions not supported in window expressions" + ); } + + let mut buf = Vec::new(); + codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; + Ok(( + physical_window_expr_node::WindowFunction::UserDefinedAggrFunction( + aggr_expr.name().to_string(), + ), + (!buf.is_empty()).then_some(buf), + )) } pub fn serialize_physical_window_expr( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 6766468ef443..1725f8d143dc 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -46,7 +46,6 @@ use datafusion::datasource::physical_plan::{ use datafusion::execution::FunctionRegistry; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility}; -use datafusion::physical_expr::aggregate::utils::down_cast_any_ref; use datafusion::physical_expr::expressions::Literal; use datafusion::physical_expr::window::SlidingAggregateWindowExpr; use datafusion::physical_expr::{PhysicalSortRequirement, ScalarFunctionExpr}; @@ -69,13 +68,12 @@ use datafusion::physical_plan::placeholder_row::PlaceholderRowExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; use datafusion::physical_plan::windows::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowAggExec, }; -use datafusion::physical_plan::{ - AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, Statistics, -}; +use datafusion::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr, Statistics}; use datafusion::prelude::SessionContext; use datafusion::scalar::ScalarValue; use datafusion_common::config::TableParquetOptions; @@ -361,7 +359,7 @@ fn rountrip_aggregate() -> Result<()> { .alias("NTH_VALUE(b, 1)") .build()?; - let test_cases: Vec>> = vec![ + let test_cases: Vec>> = vec![ // AVG vec![avg_expr], // NTH_VALUE @@ -394,7 +392,7 @@ fn rountrip_aggregate_with_limit() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec> = + let aggregates: Vec> = vec![ AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) @@ -423,7 +421,7 @@ fn rountrip_aggregate_with_approx_pencentile_cont() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec> = vec![AggregateExprBuilder::new( + let aggregates: Vec> = vec![AggregateExprBuilder::new( approx_percentile_cont_udaf(), vec![col("b", &schema)?, lit(0.5)], ) @@ -458,7 +456,7 @@ fn rountrip_aggregate_with_sort() -> Result<()> { }, }]; - let aggregates: Vec> = + let aggregates: Vec> = vec![ AggregateExprBuilder::new(array_agg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) @@ -525,7 +523,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec> = + let aggregates: Vec> = vec![ AggregateExprBuilder::new(Arc::new(udaf), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) @@ -730,7 +728,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { } impl PartialEq for CustomPredicateExpr { fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) + other .downcast_ref::() .map(|x| self.inner.eq(&x.inner)) .unwrap_or(false) From 21587898ca869ee58ef0261368c97f5ef5267e99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 21 Aug 2024 17:36:07 +0800 Subject: [PATCH 05/13] Fix doc and tests --- datafusion/core/src/lib.rs | 1 - datafusion/core/src/physical_optimizer/update_aggr_exprs.rs | 2 +- datafusion/core/src/physical_planner.rs | 2 +- datafusion/functions-aggregate-common/src/aggregate.rs | 3 --- datafusion/physical-expr/src/aggregate.rs | 6 +++++- datafusion/physical-expr/src/window/sliding_aggregate.rs | 2 +- datafusion/physical-plan/src/aggregates/row_hash.rs | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 3f670fde7e47..2a1fa90b8584 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -490,7 +490,6 @@ //! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule //! [`Schema`]: arrow::datatypes::Schema //! [`PhysicalExpr`]: physical_plan::PhysicalExpr -//! [`AggregateExpr`]: physical_plan::AggregateExpr //! [`RecordBatch`]: arrow::record_batch::RecordBatch //! [`RecordBatchReader`]: arrow::record_batch::RecordBatchReader //! [`Array`]: arrow::array::Array diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs index 08609a03b515..a2726d62e9f6 100644 --- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs +++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs @@ -118,7 +118,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { /// /// # Parameters /// -/// * `aggr_exprs` - A vector of `Arc` representing the +/// * `aggr_exprs` - A vector of `Arc` representing the /// aggregate expressions to be optimized. /// * `prefix_requirement` - An array slice representing the ordering /// requirements preceding the aggregate expressions. diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 38820f4b1170..289963a9f440 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -720,7 +720,7 @@ impl DefaultPhysicalPlanner { // optimization purposes. For example, a FIRST_VALUE may turn // into a LAST_VALUE with the reverse ordering requirement. // To reflect such changes to subsequent stages, use the updated - // `AggregateExpr`/`PhysicalSortExpr` objects. + // `AggregateFunctionExpr`/`PhysicalSortExpr` objects. let updated_aggregates = initial_aggr.aggr_expr().to_vec(); let next_partition_mode = if can_repartition { diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate.rs index 1a729b148e2e..c9cbaa8396fc 100644 --- a/datafusion/functions-aggregate-common/src/aggregate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate.rs @@ -15,8 +15,5 @@ // specific language governing permissions and limitations // under the License. -//! [`AggregateExpr`] which defines the interface all aggregate expressions -//! (built-in and custom) need to satisfy. - pub mod count_distinct; pub mod groups_accumulator; diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 62ec0fab6640..0d8f27565fa1 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -255,7 +255,11 @@ impl AggregateFunctionExpr { } pub fn field(&self) -> Result { - Ok(Field::new(&self.name, self.data_type.clone(), true)) + Ok(Field::new( + &self.name, + self.data_type.clone(), + self.is_nullable, + )) } pub fn create_accumulator(&self) -> Result> { diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index e06c0cf90919..e25776ce0b6e 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -63,7 +63,7 @@ impl SlidingAggregateWindowExpr { } } - /// Get the [AggregateExpr] of this object. + /// Get the [AggregateFunctionExpr] of this object. pub fn get_aggregate_expr(&self) -> &Arc { &self.aggregate } diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 7429e67bbfa0..a0d83b918e08 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -319,7 +319,7 @@ pub(crate) struct GroupedHashAggregateStream { input: SendableRecordBatchStream, mode: AggregateMode, - /// Accumulators, one for each `AggregateExpr` in the query + /// Accumulators, one for each `AggregateFunctionExpr` in the query /// /// For example, if the query has aggregates, `SUM(x)`, /// `COUNT(y)`, there will be two accumulators, each one From 8461a512e9c23ee82d9b1ff96c1fe1191c9630d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 21 Aug 2024 17:50:03 +0800 Subject: [PATCH 06/13] Remove empty crate physical-expr-functions-aggregate --- Cargo.toml | 2 - datafusion/core/Cargo.toml | 1 - .../Cargo.toml | 48 ------------------- .../src/lib.rs | 18 ------- datafusion/physical-plan/Cargo.toml | 1 - 5 files changed, 70 deletions(-) delete mode 100644 datafusion/physical-expr-functions-aggregate/Cargo.toml delete mode 100644 datafusion/physical-expr-functions-aggregate/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index ae344a46a1bd..5ffe1a70b0a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ members = [ "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-expr-common", - "datafusion/physical-expr-functions-aggregate", "datafusion/physical-optimizer", "datafusion/physical-plan", "datafusion/proto", @@ -107,7 +106,6 @@ datafusion-functions-window = { path = "datafusion/functions-window", version = datafusion-optimizer = { path = "datafusion/optimizer", version = "41.0.0", default-features = false } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "41.0.0", default-features = false } datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "41.0.0", default-features = false } -datafusion-physical-expr-functions-aggregate = { path = "datafusion/physical-expr-functions-aggregate", version = "41.0.0" } datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "41.0.0" } datafusion-physical-plan = { path = "datafusion/physical-plan", version = "41.0.0" } datafusion-proto = { path = "datafusion/proto", version = "41.0.0" } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index adbba3eb31d6..de228e058096 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -110,7 +110,6 @@ datafusion-functions-window = { workspace = true } datafusion-optimizer = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } -datafusion-physical-expr-functions-aggregate = { workspace = true } datafusion-physical-optimizer = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-sql = { workspace = true } diff --git a/datafusion/physical-expr-functions-aggregate/Cargo.toml b/datafusion/physical-expr-functions-aggregate/Cargo.toml deleted file mode 100644 index 6eed89614c53..000000000000 --- a/datafusion/physical-expr-functions-aggregate/Cargo.toml +++ /dev/null @@ -1,48 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "datafusion-physical-expr-functions-aggregate" -description = "Logical plan and expression representation for DataFusion query engine" -keywords = ["datafusion", "logical", "plan", "expressions"] -readme = "README.md" -version = { workspace = true } -edition = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -license = { workspace = true } -authors = { workspace = true } -rust-version = { workspace = true } - -[lints] -workspace = true - -[lib] -name = "datafusion_physical_expr_functions_aggregate" -path = "src/lib.rs" - -[features] - -[dependencies] -ahash = { workspace = true } -arrow = { workspace = true } -datafusion-common = { workspace = true } -datafusion-expr = { workspace = true } -datafusion-expr-common = { workspace = true } -datafusion-functions-aggregate-common = { workspace = true } -datafusion-physical-expr-common = { workspace = true } -rand = { workspace = true } diff --git a/datafusion/physical-expr-functions-aggregate/src/lib.rs b/datafusion/physical-expr-functions-aggregate/src/lib.rs deleted file mode 100644 index 509d3a4290a8..000000000000 --- a/datafusion/physical-expr-functions-aggregate/src/lib.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Technically, all aggregate functions that depend on `expr` crate should be included here. diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 78da4dc9c53f..24387c5f15ee 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -55,7 +55,6 @@ datafusion-functions-aggregate = { workspace = true } datafusion-functions-aggregate-common = { workspace = true } datafusion-physical-expr = { workspace = true, default-features = true } datafusion-physical-expr-common = { workspace = true } -datafusion-physical-expr-functions-aggregate = { workspace = true } futures = { workspace = true } half = { workspace = true } hashbrown = { workspace = true } From a918e7e9e495f57b72d239de3dc24d82e35eb6fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 21 Aug 2024 17:56:00 +0800 Subject: [PATCH 07/13] Use func name instead of expr name --- datafusion/physical-optimizer/src/aggregate_statistics.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index a41ed197bbc0..3dd962611d97 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -267,7 +267,7 @@ fn take_optimizable_max( // TODO: Move this check into AggregateUDFImpl // https://github.com/apache/datafusion/issues/11153 fn is_non_distinct_count(agg_expr: &AggregateFunctionExpr) -> bool { - if agg_expr.name() == "count" && !agg_expr.is_distinct() { + if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() { return true; } false @@ -276,7 +276,7 @@ fn is_non_distinct_count(agg_expr: &AggregateFunctionExpr) -> bool { // TODO: Move this check into AggregateUDFImpl // https://github.com/apache/datafusion/issues/11153 fn is_min(agg_expr: &AggregateFunctionExpr) -> bool { - if agg_expr.name().to_lowercase() == "min" { + if agg_expr.fun().name().to_lowercase() == "min" { return true; } false @@ -285,7 +285,7 @@ fn is_min(agg_expr: &AggregateFunctionExpr) -> bool { // TODO: Move this check into AggregateUDFImpl // https://github.com/apache/datafusion/issues/11153 fn is_max(agg_expr: &AggregateFunctionExpr) -> bool { - if agg_expr.name().to_lowercase() == "max" { + if agg_expr.fun().name().to_lowercase() == "max" { return true; } false From 5a553da932e1894e3b770f406f64fbbc9a74f5c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 21 Aug 2024 18:07:06 +0800 Subject: [PATCH 08/13] Fix tests --- datafusion/physical-expr/src/aggregate.rs | 6 +----- datafusion/proto/src/physical_plan/to_proto.rs | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 0d8f27565fa1..66d3322c68e5 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -112,11 +112,7 @@ impl AggregateExprBuilder { .map(|e| e.expr.data_type(&schema)) .collect::>>()?; - ordering_fields = - datafusion_functions_aggregate_common::utils::ordering_fields( - &ordering_req, - &ordering_types, - ); + ordering_fields = utils::ordering_fields(&ordering_req, &ordering_types); } let input_exprs_types = args diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index ebbcef1249c6..555ad22a9bc1 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -56,7 +56,7 @@ pub fn serialize_physical_aggr_expr( let ordering_req = aggr_expr.order_bys().unwrap_or(&[]).to_vec(); let ordering_req = serialize_physical_sort_exprs(ordering_req, codec)?; - let name = aggr_expr.name().to_string(); + let name = aggr_expr.fun().name().to_string(); let mut buf = Vec::new(); codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { @@ -89,7 +89,7 @@ fn serialize_physical_window_aggr_expr( codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; Ok(( physical_window_expr_node::WindowFunction::UserDefinedAggrFunction( - aggr_expr.name().to_string(), + aggr_expr.fun().name().to_string(), ), (!buf.is_empty()).then_some(buf), )) From cb83a8bf1ec8e6908430e5ae74b6990d4c870660 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Thu, 22 Aug 2024 10:17:12 +0800 Subject: [PATCH 09/13] Fix tests and clippy lints --- datafusion/core/src/lib.rs | 5 ----- datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 2 +- datafusion/physical-expr/src/aggregate.rs | 2 +- datafusion/physical-optimizer/src/aggregate_statistics.rs | 6 +++--- datafusion/proto/src/physical_plan/mod.rs | 2 +- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 2 +- 6 files changed, 7 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 3b2694e9b264..67f3cb01c0a4 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -555,11 +555,6 @@ pub mod physical_expr_common { pub use datafusion_physical_expr_common::*; } -/// re-export of [`datafusion_physical_expr_functions_aggregate`] crate -pub mod physical_expr_functions_aggregate { - pub use datafusion_physical_expr::*; -} - /// re-export of [`datafusion_physical_expr`] crate pub mod physical_expr { pub use datafusion_physical_expr::*; diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 138e5bda7f39..62e9be63983c 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -25,7 +25,7 @@ use arrow::util::pretty::pretty_format_batches; use arrow_array::types::Int64Type; use datafusion::common::Result; use datafusion::datasource::MemTable; -use datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; +use datafusion::physical_expr::aggregate::AggregateExprBuilder; use datafusion::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 5a2b3bff44ab..9b8b9e7856b7 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -419,7 +419,7 @@ impl AggregateFunctionExpr { let mut name = self.name().to_string(); // If the function is changed, we need to reverse order_by clause as well // i.e. First(a order by b asc null first) -> Last(a order by b desc null last) - if self.name() == reverse_udf.name() { + if self.fun().name() == reverse_udf.name() { } else { replace_order_by_clause(&mut name); } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 3dd962611d97..2b8725b5bac7 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -58,12 +58,12 @@ impl PhysicalOptimizerRule for AggregateStatistics { let mut projections = vec![]; for expr in partial_agg_exec.aggr_expr() { if let Some((non_null_rows, name)) = - take_optimizable_column_and_table_count(&**expr, &stats) + take_optimizable_column_and_table_count(expr, &stats) { projections.push((expressions::lit(non_null_rows), name.to_owned())); - } else if let Some((min, name)) = take_optimizable_min(&**expr, &stats) { + } else if let Some((min, name)) = take_optimizable_min(expr, &stats) { projections.push((expressions::lit(min), name.to_owned())); - } else if let Some((max, name)) = take_optimizable_max(&**expr, &stats) { + } else if let Some((max, name)) = take_optimizable_max(expr, &stats) { projections.push((expressions::lit(max), name.to_owned())); } else { // TODO: we need all aggr_expr to be resolved (cf TODO fullres) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 369bdced0fad..25e2c95cae8d 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -18,7 +18,7 @@ use std::fmt::Debug; use std::sync::Arc; -use datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; +use datafusion::physical_expr::aggregate::AggregateExprBuilder; use prost::bytes::BufMut; use prost::Message; diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 1725f8d143dc..e439a50cab16 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -24,7 +24,7 @@ use std::vec; use arrow::array::RecordBatch; use arrow::csv::WriterBuilder; -use datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder; +use datafusion::physical_expr::aggregate::AggregateExprBuilder; use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf; use datafusion_functions_aggregate::array_agg::array_agg_udaf; use datafusion_functions_aggregate::min_max::max_udaf; From 07ff35f8039840928d38b49d587bf174e1e70c33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Thu, 22 Aug 2024 10:37:25 +0800 Subject: [PATCH 10/13] Cargo update for datafusion-cli --- datafusion-cli/Cargo.lock | 160 ++++++++++++++++++++++---------------- 1 file changed, 91 insertions(+), 69 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index e35eb3906b9a..26a431535458 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "adler32" version = "1.2.0" @@ -167,9 +173,9 @@ checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a" [[package]] name = "arrayvec" -version = "0.7.4" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" @@ -430,7 +436,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -765,7 +771,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.4", "object", "rustc-demangle", ] @@ -815,9 +821,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.3" +version = "1.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9ec96fe9a81b5e365f9db71fe00edc4fe4ca2cc7dcb7861f0603012a7caa210" +checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7" dependencies = [ "arrayref", "arrayvec", @@ -999,7 +1005,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1155,7 +1161,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edb49164822f3ee45b17acd4a208cfc1251410cf0cad9a833234c9890774dd9f" dependencies = [ "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1206,7 +1212,6 @@ dependencies = [ "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-expr-common", - "datafusion-physical-expr-functions-aggregate", "datafusion-physical-optimizer", "datafusion-physical-plan", "datafusion-sql", @@ -1498,20 +1503,6 @@ dependencies = [ "rand", ] -[[package]] -name = "datafusion-physical-expr-functions-aggregate" -version = "41.0.0" -dependencies = [ - "ahash", - "arrow", - "datafusion-common", - "datafusion-expr", - "datafusion-expr-common", - "datafusion-functions-aggregate-common", - "datafusion-physical-expr-common", - "rand", -] - [[package]] name = "datafusion-physical-optimizer" version = "41.0.0" @@ -1543,7 +1534,6 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-physical-expr", "datafusion-physical-expr-common", - "datafusion-physical-expr-functions-aggregate", "futures", "half", "hashbrown", @@ -1740,12 +1730,12 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.31" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f211bbe8e69bbd0cfdea405084f128ae8b4aaa6b0b522fc8f2b009084797920" +checksum = "9c0596c1eac1f9e04ed902702e9878208b336edc9d6fddc8a48387349bab3666" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.8.0", ] [[package]] @@ -1828,7 +1818,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -1921,9 +1911,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" dependencies = [ "atomic-waker", "bytes", @@ -2108,7 +2098,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.5", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "httparse", @@ -2145,7 +2135,7 @@ dependencies = [ "hyper 1.4.1", "hyper-util", "rustls 0.23.12", - "rustls-native-certs 0.7.1", + "rustls-native-certs 0.7.2", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -2350,9 +2340,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.156" +version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" [[package]] name = "libflate" @@ -2486,6 +2476,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + [[package]] name = "mio" version = "1.0.2" @@ -2826,7 +2825,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3025,9 +3024,9 @@ dependencies = [ [[package]] name = "redox_users" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" dependencies = [ "getrandom", "libredox", @@ -3071,15 +3070,15 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "reqwest" -version = "0.12.5" +version = "0.12.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" +checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" dependencies = [ "base64 0.22.1", "bytes", "futures-core", "futures-util", - "h2 0.4.5", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "http-body-util", @@ -3095,7 +3094,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls 0.23.12", - "rustls-native-certs 0.7.1", + "rustls-native-certs 0.7.2", "rustls-pemfile 2.1.3", "rustls-pki-types", "serde", @@ -3111,7 +3110,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "winreg", + "windows-registry", ] [[package]] @@ -3250,9 +3249,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +checksum = "04182dffc9091a404e0fc069ea5cd60e5b866c3adf881eff99a32d048242dffa" dependencies = [ "openssl-probe", "rustls-pemfile 2.1.3", @@ -3418,7 +3417,7 @@ checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3560,7 +3559,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3606,7 +3605,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3619,7 +3618,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3641,9 +3640,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.74" +version = "2.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fceb41e3d546d0bd83421d3409b1460cc7444cd389341a4c880fe7a042cb3d7" +checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" dependencies = [ "proc-macro2", "quote", @@ -3655,6 +3654,9 @@ name = "sync_wrapper" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] [[package]] name = "tempfile" @@ -3701,7 +3703,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3771,9 +3773,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.2" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", @@ -3795,7 +3797,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3892,7 +3894,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -3937,7 +3939,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] @@ -4092,7 +4094,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", "wasm-bindgen-shared", ] @@ -4126,7 +4128,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4210,6 +4212,36 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -4358,16 +4390,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "winreg" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" -dependencies = [ - "cfg-if", - "windows-sys 0.48.0", -] - [[package]] name = "xmlparser" version = "0.13.6" @@ -4401,7 +4423,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.74", + "syn 2.0.75", ] [[package]] From aeaf520d4bc8998588108b5899f7694f90212547 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Thu, 22 Aug 2024 11:37:24 +0800 Subject: [PATCH 11/13] Add docs --- datafusion/physical-expr/src/aggregate.rs | 183 ++++++++++++++-------- 1 file changed, 116 insertions(+), 67 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 9b8b9e7856b7..51f3313910c2 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -221,6 +221,17 @@ impl AggregateFunctionExpr { &self.fun } + /// expressions that are passed to the Accumulator. + /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. + pub fn expressions(&self) -> Vec> { + self.args.clone() + } + + /// Human readable name such as `"MIN(c2)"`. + pub fn name(&self) -> &str { + &self.name + } + /// Return if the aggregation is distinct pub fn is_distinct(&self) -> bool { self.is_distinct @@ -236,18 +247,12 @@ impl AggregateFunctionExpr { self.is_reversed } - pub fn state_fields(&self) -> Result> { - let args = StateFieldsArgs { - name: &self.name, - input_types: &self.input_types, - return_type: &self.data_type, - ordering_fields: &self.ordering_fields, - is_distinct: self.is_distinct, - }; - - self.fun.state_fields(args) + /// Return if the aggregation is nullable + pub fn is_nullable(&self) -> bool { + self.is_nullable } + /// the field of the final result of this aggregation. pub fn field(&self) -> Result { Ok(Field::new( &self.name, @@ -256,6 +261,9 @@ impl AggregateFunctionExpr { )) } + /// the accumulator used to accumulate values from the expressions. + /// the accumulator expects the same number of arguments as `expressions` and must + /// return states with the same description as `state_fields` pub fn create_accumulator(&self) -> Result> { let acc_args = AccumulatorArgs { return_type: &self.data_type, @@ -271,6 +279,82 @@ impl AggregateFunctionExpr { self.fun.accumulator(acc_args) } + /// the field of the final result of this aggregation. + pub fn state_fields(&self) -> Result> { + let args = StateFieldsArgs { + name: &self.name, + input_types: &self.input_types, + return_type: &self.data_type, + ordering_fields: &self.ordering_fields, + is_distinct: self.is_distinct, + }; + + self.fun.state_fields(args) + } + + /// Order by requirements for the aggregate function + /// By default it is `None` (there is no requirement) + /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this + pub fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { + if self.ordering_req.is_empty() { + return None; + } + + if !self.order_sensitivity().is_insensitive() { + return Some(&self.ordering_req); + } + + None + } + + /// Indicates whether aggregator can produce the correct result with any + /// arbitrary input ordering. By default, we assume that aggregate expressions + /// are order insensitive. + pub fn order_sensitivity(&self) -> AggregateOrderSensitivity { + if !self.ordering_req.is_empty() { + // If there is requirement, use the sensitivity of the implementation + self.fun.order_sensitivity() + } else { + // If no requirement, aggregator is order insensitive + AggregateOrderSensitivity::Insensitive + } + } + + /// Sets the indicator whether ordering requirements of the aggregator is + /// satisfied by its input. If this is not the case, aggregators with order + /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce + /// the correct result with possibly more work internally. + /// + /// # Returns + /// + /// Returns `Ok(Some(updated_expr))` if the process completes successfully. + /// If the expression can benefit from existing input ordering, but does + /// not implement the method, returns an error. Order insensitive and hard + /// requirement aggregators return `Ok(None)`. + pub fn with_beneficial_ordering( + self: Arc, + beneficial_ordering: bool, + ) -> Result>> { + let Some(updated_fn) = self + .fun + .clone() + .with_beneficial_ordering(beneficial_ordering)? + else { + return Ok(None); + }; + + AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec()) + .order_by(self.ordering_req.to_vec()) + .schema(Arc::new(self.schema.clone())) + .alias(self.name().to_string()) + .with_ignore_nulls(self.ignore_nulls) + .with_distinct(self.is_distinct) + .with_reversed(self.is_reversed) + .build() + .map(Some) + } + + /// Creates accumulator implementation that supports retract pub fn create_sliding_accumulator(&self) -> Result> { let args = AccumulatorArgs { return_type: &self.data_type, @@ -337,6 +421,9 @@ impl AggregateFunctionExpr { Ok(accumulator) } + /// If the aggregate expression has a specialized + /// [`GroupsAccumulator`] implementation. If this returns true, + /// `[Self::create_groups_accumulator`] will be called. pub fn groups_accumulator_supported(&self) -> bool { let args = AccumulatorArgs { return_type: &self.data_type, @@ -351,6 +438,11 @@ impl AggregateFunctionExpr { self.fun.groups_accumulator_supported(args) } + /// Return a specialized [`GroupsAccumulator`] that manages state + /// for all groups. + /// + /// For maximum performance, a [`GroupsAccumulator`] should be + /// implemented in addition to [`Accumulator`]. pub fn create_groups_accumulator(&self) -> Result> { let args = AccumulatorArgs { return_type: &self.data_type, @@ -365,51 +457,10 @@ impl AggregateFunctionExpr { self.fun.create_groups_accumulator(args) } - pub fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - if self.ordering_req.is_empty() { - return None; - } - - if !self.order_sensitivity().is_insensitive() { - return Some(&self.ordering_req); - } - - None - } - - pub fn order_sensitivity(&self) -> AggregateOrderSensitivity { - if !self.ordering_req.is_empty() { - // If there is requirement, use the sensitivity of the implementation - self.fun.order_sensitivity() - } else { - // If no requirement, aggregator is order insensitive - AggregateOrderSensitivity::Insensitive - } - } - - pub fn with_beneficial_ordering( - self: Arc, - beneficial_ordering: bool, - ) -> Result>> { - let Some(updated_fn) = self - .fun - .clone() - .with_beneficial_ordering(beneficial_ordering)? - else { - return Ok(None); - }; - - AggregateExprBuilder::new(Arc::new(updated_fn), self.args.to_vec()) - .order_by(self.ordering_req.to_vec()) - .schema(Arc::new(self.schema.clone())) - .alias(self.name().to_string()) - .with_ignore_nulls(self.ignore_nulls) - .with_distinct(self.is_distinct) - .with_reversed(self.is_reversed) - .build() - .map(Some) - } - + /// Construct an expression that calculates the aggregate in reverse. + /// Typically the "reverse" expression is itself (e.g. SUM, COUNT). + /// For aggregates that do not support calculation in reverse, + /// returns None (which is the default value). pub fn reverse_expr(&self) -> Option> { match self.fun.reverse_udf() { ReversedUDAF::NotSupported => None, @@ -438,27 +489,25 @@ impl AggregateFunctionExpr { } } + /// If this function is max, return (output_field, true) + /// if the function is min, return (output_field, false) + /// otherwise return None (the default) + /// + /// output_field is the name of the column produced by this aggregate + /// + /// Note: this is used to use special aggregate implementations in certain conditions pub fn get_minmax_desc(&self) -> Option<(Field, bool)> { self.fun .is_descending() .and_then(|flag| self.field().ok().map(|f| (f, flag))) } + /// Returns default value of the function given the input is Null + /// Most of the aggregate function return Null if input is Null, + /// while `count` returns 0 if input is Null pub fn default_value(&self, data_type: &DataType) -> Result { self.fun.default_value(data_type) } - - pub fn is_nullable(&self) -> bool { - self.is_nullable - } - - pub fn expressions(&self) -> Vec> { - self.args.clone() - } - - pub fn name(&self) -> &str { - &self.name - } } impl PartialEq for AggregateFunctionExpr { From a5a95707724cb035113252b9afc95ad6bf3d2679 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Thu, 22 Aug 2024 17:11:23 +0800 Subject: [PATCH 12/13] Add functions back --- datafusion/physical-expr/src/aggregate.rs | 34 +++++++++++++++++++ .../src/window/sliding_aggregate.rs | 25 ++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 51f3313910c2..683bf2a035de 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -489,6 +489,32 @@ impl AggregateFunctionExpr { } } + /// Returns all expressions used in the [`AggregateExpr`]. + /// These expressions are (1)function arguments, (2) order by expressions. + pub fn all_expressions(&self) -> AggregatePhysicalExpressions { + let args = self.expressions(); + let order_bys = self.order_bys().unwrap_or(&[]); + let order_by_exprs = order_bys + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + AggregatePhysicalExpressions { + args, + order_by_exprs, + } + } + + /// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent + /// with the return value of the [`AggregateExpr::all_expressions`] method. + /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. + pub fn with_new_expressions( + &self, + _args: Vec>, + _order_by_exprs: Vec>, + ) -> Option> { + None + } + /// If this function is max, return (output_field, true) /// if the function is min, return (output_field, false) /// otherwise return None (the default) @@ -510,6 +536,14 @@ impl AggregateFunctionExpr { } } +/// Stores the physical expressions used inside the `AggregateExpr`. +pub struct AggregatePhysicalExpressions { + /// Aggregate function arguments + pub args: Vec>, + /// Order by expressions + pub order_by_exprs: Vec>, +} + impl PartialEq for AggregateFunctionExpr { fn eq(&self, other: &Self) -> bool { self.name == other.name diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index e25776ce0b6e..ac3a4f4c09ec 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -140,6 +140,31 @@ impl WindowExpr for SlidingAggregateWindowExpr { fn uses_bounded_memory(&self) -> bool { !self.window_frame.end_bound.is_unbounded() } + + fn with_new_expressions( + &self, + args: Vec>, + partition_bys: Vec>, + order_by_exprs: Vec>, + ) -> Option> { + debug_assert_eq!(self.order_by.len(), order_by_exprs.len()); + + let new_order_by = self + .order_by + .iter() + .zip(order_by_exprs) + .map(|(req, new_expr)| PhysicalSortExpr { + expr: new_expr, + options: req.options, + }) + .collect::>(); + Some(Arc::new(SlidingAggregateWindowExpr { + aggregate: self.aggregate.with_new_expressions(args, vec![])?, + partition_by: partition_bys, + order_by: new_order_by, + window_frame: Arc::clone(&self.window_frame), + })) + } } impl AggregateWindowExpr for SlidingAggregateWindowExpr { From 70ea79c3eba9be8d6672085ea6348c9f7bcdda2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Thu, 22 Aug 2024 17:22:57 +0800 Subject: [PATCH 13/13] Fix doc --- datafusion/physical-expr/src/aggregate.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 683bf2a035de..5c1216f2a386 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -489,7 +489,7 @@ impl AggregateFunctionExpr { } } - /// Returns all expressions used in the [`AggregateExpr`]. + /// Returns all expressions used in the [`AggregateFunctionExpr`]. /// These expressions are (1)function arguments, (2) order by expressions. pub fn all_expressions(&self) -> AggregatePhysicalExpressions { let args = self.expressions(); @@ -504,8 +504,8 @@ impl AggregateFunctionExpr { } } - /// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent - /// with the return value of the [`AggregateExpr::all_expressions`] method. + /// Rewrites [`AggregateFunctionExpr`], with new expressions given. The argument should be consistent + /// with the return value of the [`AggregateFunctionExpr::all_expressions`] method. /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. pub fn with_new_expressions( &self,