From 016ed03cf18c3f60594cf6f8fedd202561c27a91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Sun, 1 Sep 2024 15:58:53 +0800 Subject: [PATCH] Remove redundant result of AggregateFunctionExpr::field (#12258) --- datafusion/physical-expr/src/aggregate.rs | 12 +++--------- datafusion/physical-expr/src/window/aggregate.rs | 4 ++-- .../physical-expr/src/window/sliding_aggregate.rs | 4 ++-- .../physical-optimizer/src/aggregate_statistics.rs | 4 ++-- datafusion/physical-plan/src/aggregates/mod.rs | 2 +- datafusion/proto/src/physical_plan/mod.rs | 7 ++----- 6 files changed, 12 insertions(+), 21 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 5c1216f2a386..d62dc27ece86 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -253,12 +253,8 @@ impl AggregateFunctionExpr { } /// the field of the final result of this aggregation. - pub fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.is_nullable, - )) + pub fn field(&self) -> Field { + Field::new(&self.name, self.data_type.clone(), self.is_nullable) } /// the accumulator used to accumulate values from the expressions. @@ -523,9 +519,7 @@ impl AggregateFunctionExpr { /// /// Note: this is used to use special aggregate implementations in certain conditions pub fn get_minmax_desc(&self) -> Option<(Field, bool)> { - self.fun - .is_descending() - .and_then(|flag| self.field().ok().map(|f| (f, flag))) + self.fun.is_descending().map(|flag| (self.field(), flag)) } /// Returns default value of the function given the input is Null diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 5439e140502a..1cc08a4e99aa 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -79,7 +79,7 @@ impl WindowExpr for PlainAggregateWindowExpr { } fn field(&self) -> Result { - self.aggregate.field() + Ok(self.aggregate.field()) } fn name(&self) -> &str { @@ -177,7 +177,7 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr { ) -> Result { if cur_range.start == cur_range.end { self.aggregate - .default_value(self.aggregate.field()?.data_type()) + .default_value(self.aggregate.field().data_type()) } else { // Accumulate any new rows that have entered the window: let update_bound = cur_range.end - last_range.end; diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index ac3a4f4c09ec..b3848e15ee42 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -81,7 +81,7 @@ impl WindowExpr for SlidingAggregateWindowExpr { } fn field(&self) -> Result { - self.aggregate.field() + Ok(self.aggregate.field()) } fn name(&self) -> &str { @@ -183,7 +183,7 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr { ) -> Result { if cur_range.start == cur_range.end { self.aggregate - .default_value(self.aggregate.field()?.data_type()) + .default_value(self.aggregate.field().data_type()) } else { // Accumulate any new rows that have entered the window: let update_bound = cur_range.end - last_range.end; diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 2b8725b5bac7..863c5ab2d288 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -183,7 +183,7 @@ fn take_optimizable_min( // MIN/MAX with 0 rows is always null if is_min(agg_expr) { if let Ok(min_data_type) = - ScalarValue::try_from(agg_expr.field().unwrap().data_type()) + ScalarValue::try_from(agg_expr.field().data_type()) { return Some((min_data_type, agg_expr.name().to_string())); } @@ -229,7 +229,7 @@ fn take_optimizable_max( // MIN/MAX with 0 rows is always null if is_max(agg_expr) { if let Ok(max_data_type) = - ScalarValue::try_from(agg_expr.field().unwrap().data_type()) + ScalarValue::try_from(agg_expr.field().data_type()) { return Some((max_data_type, agg_expr.name().to_string())); } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 0f33a9d7b992..2f974f1ef4e0 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -821,7 +821,7 @@ fn create_schema( | AggregateMode::SinglePartitioned => { // in final mode, the field with the final result of the accumulator for expr in aggr_expr { - fields.push(expr.field()?) + fields.push(expr.field()) } } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 04cbf8b537b3..e622af745062 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1444,11 +1444,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let agg_names = exec .aggr_expr() .iter() - .map(|expr| match expr.field() { - Ok(field) => Ok(field.name().clone()), - Err(e) => Err(e), - }) - .collect::>()?; + .map(|expr| expr.name().to_string()) + .collect::>(); let agg_mode = match exec.mode() { AggregateMode::Partial => protobuf::AggregateMode::Partial,