From a134a34ddfdb711304c88a8e261d2f7584a29331 Mon Sep 17 00:00:00 2001 From: jatin Date: Sun, 3 Nov 2024 19:13:51 +0530 Subject: [PATCH] removed LexOrdering::from_ref instead using clone and created LexOrdering::empty() fn --- benchmarks/src/sort.rs | 122 ++++++++---------- .../datasource/physical_plan/statistics.rs | 2 +- .../src/physical_optimizer/enforce_sorting.rs | 5 +- .../replace_with_order_preserving_variants.rs | 5 +- .../src/physical_optimizer/sort_pushdown.rs | 10 +- .../functions-aggregate/src/array_agg.rs | 2 +- .../functions-aggregate/src/first_last.rs | 4 +- .../functions-aggregate/src/nth_value.rs | 2 +- .../physical-expr-common/src/sort_expr.rs | 13 +- .../physical-expr/src/window/aggregate.rs | 2 +- .../physical-expr/src/window/built_in.rs | 2 +- .../src/window/sliding_aggregate.rs | 2 +- datafusion/physical-plan/src/joins/utils.rs | 8 +- .../physical-plan/src/repartition/mod.rs | 2 +- .../src/sorts/streaming_merge.rs | 5 +- .../src/windows/bounded_window_agg_exec.rs | 6 +- datafusion/physical-plan/src/windows/mod.rs | 2 +- .../proto/src/physical_plan/to_proto.rs | 2 +- .../tests/cases/roundtrip_physical_plan.rs | 8 +- 19 files changed, 94 insertions(+), 110 deletions(-) diff --git a/benchmarks/src/sort.rs b/benchmarks/src/sort.rs index 8c6073cb26a0..f4b707611cfb 100644 --- a/benchmarks/src/sort.rs +++ b/benchmarks/src/sort.rs @@ -70,88 +70,76 @@ impl RunOpt { let sort_cases = vec![ ( "sort utf8", - LexOrdering { - inner: vec![PhysicalSortExpr { - expr: col("request_method", &schema)?, - options: Default::default(), - }], - }, + LexOrdering::new(vec![PhysicalSortExpr { + expr: col("request_method", &schema)?, + options: Default::default(), + }]), ), ( "sort int", - LexOrdering { - inner: vec![PhysicalSortExpr { - expr: col("response_bytes", &schema)?, - options: Default::default(), - }], - }, + LexOrdering::new(vec![PhysicalSortExpr { + expr: col("response_bytes", &schema)?, + options: Default::default(), + }]), ), ( "sort decimal", - LexOrdering { - inner: vec![PhysicalSortExpr { - expr: col("decimal_price", &schema)?, - options: Default::default(), - }], - }, + LexOrdering::new(vec![PhysicalSortExpr { + expr: col("decimal_price", &schema)?, + options: Default::default(), + }]), ), ( "sort integer tuple", - LexOrdering { - inner: vec![ - PhysicalSortExpr { - expr: col("request_bytes", &schema)?, - options: Default::default(), - }, - PhysicalSortExpr { - expr: col("response_bytes", &schema)?, - options: Default::default(), - }, - ], - }, + LexOrdering::new(vec![ + PhysicalSortExpr { + expr: col("request_bytes", &schema)?, + options: Default::default(), + }, + PhysicalSortExpr { + expr: col("response_bytes", &schema)?, + options: Default::default(), + }, + ]), ), ( "sort utf8 tuple", - LexOrdering { - inner: vec![ - // sort utf8 tuple - PhysicalSortExpr { - expr: col("service", &schema)?, - options: Default::default(), - }, - PhysicalSortExpr { - expr: col("host", &schema)?, - options: Default::default(), - }, - PhysicalSortExpr { - expr: col("pod", &schema)?, - options: Default::default(), - }, - PhysicalSortExpr { - expr: col("image", &schema)?, - options: Default::default(), - }, - ], - }, + LexOrdering::new(vec![ + // sort utf8 tuple + PhysicalSortExpr { + expr: col("service", &schema)?, + options: Default::default(), + }, + PhysicalSortExpr { + expr: col("host", &schema)?, + options: Default::default(), + }, + PhysicalSortExpr { + expr: col("pod", &schema)?, + options: Default::default(), + }, + PhysicalSortExpr { + expr: col("image", &schema)?, + options: Default::default(), + }, + ]), ), ( "sort mixed tuple", - LexOrdering { - inner: vec![ - PhysicalSortExpr { - expr: col("service", &schema)?, - options: Default::default(), - }, - PhysicalSortExpr { - expr: col("request_bytes", &schema)?, - options: Default::default(), - }, - PhysicalSortExpr { - expr: col("decimal_price", &schema)?, - options: Default::default(), - }, - ], - }, + LexOrdering::new(vec![ + PhysicalSortExpr { + expr: col("service", &schema)?, + options: Default::default(), + }, + PhysicalSortExpr { + expr: col("request_bytes", &schema)?, + options: Default::default(), + }, + PhysicalSortExpr { + expr: col("decimal_price", &schema)?, + options: Default::default(), + }, + ]), ), ]; for (title, expr) in sort_cases { diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs b/datafusion/core/src/datasource/physical_plan/statistics.rs index 1a41e78537a8..488098e7861c 100644 --- a/datafusion/core/src/datasource/physical_plan/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/statistics.rs @@ -259,7 +259,7 @@ impl MinMaxStatistics { Ok(Self { min_by_sort_order: min.map_err(|e| e.context("build min rows"))?, max_by_sort_order: max.map_err(|e| e.context("build max rows"))?, - sort_order: LexOrdering::from_ref(sort_order), + sort_order: sort_order.clone(), }) } diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index d5e5468caa54..adc3d7cac10c 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -392,11 +392,10 @@ fn analyze_immediate_sort_removal( let sort_input = sort_exec.input(); // If this sort is unnecessary, we should remove it: if sort_input.equivalence_properties().ordering_satisfy( - &sort_exec + sort_exec .properties() .output_ordering() - .cloned() - .unwrap_or_default(), + .unwrap_or(LexOrdering::empty()), ) { node.plan = if !sort_exec.preserve_partitioning() && sort_input.output_partitioning().partition_count() > 1 diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index ffbc68cc5ad5..c80aea411f57 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -260,11 +260,10 @@ pub(crate) fn replace_with_order_preserving_variants( .plan .equivalence_properties() .ordering_satisfy( - &requirements + requirements .plan .output_ordering() - .cloned() - .unwrap_or_default(), + .unwrap_or(LexOrdering::empty()), ) { for child in alternate_plan.children.iter_mut() { diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index fc540cd1b394..5d0ade297c11 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -372,9 +372,8 @@ fn try_pushdown_requirements_to_join( let (new_left_ordering, new_right_ordering) = match push_side { JoinSide::Left => { - let left_eq_properties = left_eq_properties - .clone() - .with_reorder(LexOrdering::from_ref(sort_expr)); + let left_eq_properties = + left_eq_properties.clone().with_reorder(sort_expr.clone()); if left_eq_properties .ordering_satisfy_requirement(&left_requirement.unwrap_or_default()) { @@ -385,9 +384,8 @@ fn try_pushdown_requirements_to_join( } } JoinSide::Right => { - let right_eq_properties = right_eq_properties - .clone() - .with_reorder(LexOrdering::from_ref(sort_expr)); + let right_eq_properties = + right_eq_properties.clone().with_reorder(sort_expr.clone()); if right_eq_properties .ordering_satisfy_requirement(&right_requirement.unwrap_or_default()) { diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 7c22c21e38c9..252a07cb11d8 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -135,7 +135,7 @@ impl AggregateUDFImpl for ArrayAgg { OrderSensitiveArrayAggAccumulator::try_new( &data_type, &ordering_dtypes, - LexOrdering::from_ref(acc_args.ordering_req), + acc_args.ordering_req.clone(), acc_args.is_reversed, ) .map(|acc| Box::new(acc) as _) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index db4d035c6842..3ca1422668e0 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -130,7 +130,7 @@ impl AggregateUDFImpl for FirstValue { FirstValueAccumulator::try_new( acc_args.return_type, &ordering_dtypes, - LexOrdering::from_ref(acc_args.ordering_req), + acc_args.ordering_req.clone(), acc_args.ignore_nulls, ) .map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _) @@ -455,7 +455,7 @@ impl AggregateUDFImpl for LastValue { LastValueAccumulator::try_new( acc_args.return_type, &ordering_dtypes, - LexOrdering::from_ref(acc_args.ordering_req), + acc_args.ordering_req.clone(), acc_args.ignore_nulls, ) .map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _) diff --git a/datafusion/functions-aggregate/src/nth_value.rs b/datafusion/functions-aggregate/src/nth_value.rs index 5f3a8cf2f161..f3e892fa73d8 100644 --- a/datafusion/functions-aggregate/src/nth_value.rs +++ b/datafusion/functions-aggregate/src/nth_value.rs @@ -133,7 +133,7 @@ impl AggregateUDFImpl for NthValueAgg { n, &data_type, &ordering_dtypes, - LexOrdering::from_ref(acc_args.ordering_req), + acc_args.ordering_req.clone(), ) .map(|acc| Box::new(acc) as _) } diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index f9d17f55c89d..f91d583215b3 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -22,7 +22,7 @@ use std::fmt; use std::fmt::{Display, Formatter}; use std::hash::{Hash, Hasher}; use std::ops::{Deref, Index, Range, RangeFrom, RangeTo}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use std::vec::IntoIter; use arrow::compute::kernels::sort::{SortColumn, SortOptions}; @@ -352,18 +352,21 @@ impl AsRef for LexOrdering { } } +static EMPTY_ORDER: OnceLock = OnceLock::new(); + impl LexOrdering { // Creates a new [`LexOrdering`] from a vector pub fn new(inner: Vec) -> Self { Self { inner } } - pub fn capacity(&self) -> usize { - self.inner.capacity() + /// Return an empty LexOrdering (no expressions) + pub fn empty() -> &'static LexOrdering { + EMPTY_ORDER.get_or_init(LexOrdering::default) } - pub fn from_ref(lex_ordering_ref: &LexOrdering) -> Self { - Self::new(lex_ordering_ref.to_vec()) + pub fn capacity(&self) -> usize { + self.inner.capacity() } pub fn clear(&mut self) { diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 0693866764db..0c56bdc92985 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -58,7 +58,7 @@ impl PlainAggregateWindowExpr { Self { aggregate, partition_by: partition_by.to_vec(), - order_by: LexOrdering::from_ref(order_by), + order_by: order_by.clone(), window_frame, } } diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index 7173835c74bc..0f6c3f921892 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -55,7 +55,7 @@ impl BuiltInWindowExpr { Self { expr, partition_by: partition_by.to_vec(), - order_by: LexOrdering::from_ref(order_by), + order_by: order_by.clone(), window_frame, } } diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index fbb5e8f15c05..572eb8866a44 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -58,7 +58,7 @@ impl SlidingAggregateWindowExpr { Self { aggregate, partition_by: partition_by.to_vec(), - order_by: LexOrdering::from_ref(order_by), + order_by: order_by.clone(), window_frame, } } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 2502e64a7a69..0623f5418d42 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -483,7 +483,7 @@ fn offset_ordering( options: sort_expr.options, }) .collect(), - _ => LexOrdering::from_ref(ordering), + _ => ordering.clone(), } } @@ -503,7 +503,7 @@ pub fn calculate_join_output_ordering( if join_type == JoinType::Inner && probe_side == Some(JoinSide::Left) { replace_on_columns_of_right_ordering( on_columns, - &mut LexOrdering::from_ref(right_ordering), + &mut right_ordering.clone(), ) .ok()?; merge_vectors( @@ -512,7 +512,7 @@ pub fn calculate_join_output_ordering( .as_ref(), ) } else { - LexOrdering::from_ref(left_ordering) + left_ordering.clone() } } [false, true] => { @@ -520,7 +520,7 @@ pub fn calculate_join_output_ordering( if join_type == JoinType::Inner && probe_side == Some(JoinSide::Right) { replace_on_columns_of_right_ordering( on_columns, - &mut LexOrdering::from_ref(right_ordering), + &mut right_ordering.clone(), ) .ok()?; merge_vectors( diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 2988150c3f76..410e84067f65 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -503,7 +503,7 @@ impl DisplayAs for RepartitionExec { } if let Some(sort_exprs) = self.sort_exprs() { - write!(f, ", sort_exprs={}", LexOrdering::from_ref(sort_exprs))?; + write!(f, ", sort_exprs={}", sort_exprs.clone())?; } Ok(()) } diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 345cd5c2bbcc..4350235ef47d 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -29,7 +29,6 @@ use arrow_array::*; use datafusion_common::{internal_err, Result}; use datafusion_execution::memory_pool::MemoryReservation; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use std::sync::OnceLock; macro_rules! primitive_merge_helper { ($t:ty, $($v:ident),+) => { @@ -63,14 +62,12 @@ pub struct StreamingMergeBuilder<'a> { enable_round_robin_tie_breaker: bool, } -static EMPTY_ORDER: OnceLock = OnceLock::new(); - impl<'a> Default for StreamingMergeBuilder<'a> { fn default() -> Self { Self { streams: vec![], schema: None, - expressions: EMPTY_ORDER.get_or_init(LexOrdering::default), + expressions: LexOrdering::empty(), metrics: None, batch_size: None, fetch: None, diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 4c1d13a3e12d..65603192fd08 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1552,7 +1552,7 @@ mod tests { Arc::new(BuiltInWindowExpr::new( last_value_func, &[], - LexOrdering::default().as_ref(), + &LexOrdering::default(), Arc::new(WindowFrame::new_bounds( WindowFrameUnits::Rows, WindowFrameBound::Preceding(ScalarValue::UInt64(None)), @@ -1563,7 +1563,7 @@ mod tests { Arc::new(BuiltInWindowExpr::new( nth_value_func1, &[], - LexOrdering::default().as_ref(), + &LexOrdering::default(), Arc::new(WindowFrame::new_bounds( WindowFrameUnits::Rows, WindowFrameBound::Preceding(ScalarValue::UInt64(None)), @@ -1574,7 +1574,7 @@ mod tests { Arc::new(BuiltInWindowExpr::new( nth_value_func2, &[], - LexOrdering::default().as_ref(), + &LexOrdering::default(), Arc::new(WindowFrame::new_bounds( WindowFrameUnits::Rows, WindowFrameBound::Preceding(ScalarValue::UInt64(None)), diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index c4427bf8fffd..da7f6d79e578 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -699,7 +699,7 @@ mod tests { "count".to_owned(), &[col("a", &schema)?], &[], - LexOrdering::default().as_ref(), + &LexOrdering::default(), Arc::new(WindowFrame::new(None)), schema.as_ref(), false, diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 4bf7e353326e..dc94ad075c53 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -53,7 +53,7 @@ pub fn serialize_physical_aggr_expr( ) -> Result { let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?; let ordering_req = match aggr_expr.order_bys() { - Some(order) => LexOrdering::from_ref(order), + Some(order) => order.clone(), None => LexOrdering::default(), }; let ordering_req = serialize_physical_sort_exprs(ordering_req, codec)?; diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 1797c0ff3ec5..8c8dcccee376 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -309,7 +309,7 @@ fn roundtrip_window() -> Result<()> { .build() .map(Arc::new)?, &[], - LexOrdering::default().as_ref(), + &LexOrdering::default(), Arc::new(WindowFrame::new(None)), )); @@ -329,7 +329,7 @@ fn roundtrip_window() -> Result<()> { let sliding_aggr_window_expr = Arc::new(SlidingAggregateWindowExpr::new( sum_expr, &[], - LexOrdering::default().as_ref(), + &LexOrdering::default(), Arc::new(window_frame), )); @@ -1015,7 +1015,7 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { vec![Arc::new(PlainAggregateWindowExpr::new( aggr_expr.clone(), &[col("author", &schema)?], - LexOrdering::default().as_ref(), + &LexOrdering::default(), Arc::new(WindowFrame::new(None)), ))], filter, @@ -1076,7 +1076,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> { vec![Arc::new(PlainAggregateWindowExpr::new( aggr_expr, &[col("author", &schema)?], - LexOrdering::default().as_ref(), + &LexOrdering::default(), Arc::new(WindowFrame::new(None)), ))], filter,