Skip to content

Commit

Permalink
removed LexOrdering::from_ref instead using clone and created LexOrde…
Browse files Browse the repository at this point in the history
…ring::empty() fn
  • Loading branch information
jatin510 committed Nov 3, 2024
1 parent 1ac5cb3 commit a134a34
Show file tree
Hide file tree
Showing 19 changed files with 94 additions and 110 deletions.
122 changes: 55 additions & 67 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,88 +70,76 @@ impl RunOpt {
let sort_cases = vec![
(
"sort utf8",
LexOrdering {
inner: vec![PhysicalSortExpr {
expr: col("request_method", &schema)?,
options: Default::default(),
}],
},
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("request_method", &schema)?,
options: Default::default(),
}]),
),
(
"sort int",
LexOrdering {
inner: vec![PhysicalSortExpr {
expr: col("response_bytes", &schema)?,
options: Default::default(),
}],
},
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("response_bytes", &schema)?,
options: Default::default(),
}]),
),
(
"sort decimal",
LexOrdering {
inner: vec![PhysicalSortExpr {
expr: col("decimal_price", &schema)?,
options: Default::default(),
}],
},
LexOrdering::new(vec![PhysicalSortExpr {
expr: col("decimal_price", &schema)?,
options: Default::default(),
}]),
),
(
"sort integer tuple",
LexOrdering {
inner: vec![
PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("response_bytes", &schema)?,
options: Default::default(),
},
],
},
LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("response_bytes", &schema)?,
options: Default::default(),
},
]),
),
(
"sort utf8 tuple",
LexOrdering {
inner: vec![
// sort utf8 tuple
PhysicalSortExpr {
expr: col("service", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("host", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("pod", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("image", &schema)?,
options: Default::default(),
},
],
},
LexOrdering::new(vec![
// sort utf8 tuple
PhysicalSortExpr {
expr: col("service", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("host", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("pod", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("image", &schema)?,
options: Default::default(),
},
]),
),
(
"sort mixed tuple",
LexOrdering {
inner: vec![
PhysicalSortExpr {
expr: col("service", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("decimal_price", &schema)?,
options: Default::default(),
},
],
},
LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("service", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
options: Default::default(),
},
PhysicalSortExpr {
expr: col("decimal_price", &schema)?,
options: Default::default(),
},
]),
),
];
for (title, expr) in sort_cases {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl MinMaxStatistics {
Ok(Self {
min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
sort_order: LexOrdering::from_ref(sort_order),
sort_order: sort_order.clone(),
})
}

Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,10 @@ fn analyze_immediate_sort_removal(
let sort_input = sort_exec.input();
// If this sort is unnecessary, we should remove it:
if sort_input.equivalence_properties().ordering_satisfy(
&sort_exec
sort_exec
.properties()
.output_ordering()
.cloned()
.unwrap_or_default(),
.unwrap_or(LexOrdering::empty()),
) {
node.plan = if !sort_exec.preserve_partitioning()
&& sort_input.output_partitioning().partition_count() > 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,10 @@ pub(crate) fn replace_with_order_preserving_variants(
.plan
.equivalence_properties()
.ordering_satisfy(
&requirements
requirements
.plan
.output_ordering()
.cloned()
.unwrap_or_default(),
.unwrap_or(LexOrdering::empty()),
)
{
for child in alternate_plan.children.iter_mut() {
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,8 @@ fn try_pushdown_requirements_to_join(

let (new_left_ordering, new_right_ordering) = match push_side {
JoinSide::Left => {
let left_eq_properties = left_eq_properties
.clone()
.with_reorder(LexOrdering::from_ref(sort_expr));
let left_eq_properties =
left_eq_properties.clone().with_reorder(sort_expr.clone());
if left_eq_properties
.ordering_satisfy_requirement(&left_requirement.unwrap_or_default())
{
Expand All @@ -385,9 +384,8 @@ fn try_pushdown_requirements_to_join(
}
}
JoinSide::Right => {
let right_eq_properties = right_eq_properties
.clone()
.with_reorder(LexOrdering::from_ref(sort_expr));
let right_eq_properties =
right_eq_properties.clone().with_reorder(sort_expr.clone());
if right_eq_properties
.ordering_satisfy_requirement(&right_requirement.unwrap_or_default())
{
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl AggregateUDFImpl for ArrayAgg {
OrderSensitiveArrayAggAccumulator::try_new(
&data_type,
&ordering_dtypes,
LexOrdering::from_ref(acc_args.ordering_req),
acc_args.ordering_req.clone(),
acc_args.is_reversed,
)
.map(|acc| Box::new(acc) as _)
Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl AggregateUDFImpl for FirstValue {
FirstValueAccumulator::try_new(
acc_args.return_type,
&ordering_dtypes,
LexOrdering::from_ref(acc_args.ordering_req),
acc_args.ordering_req.clone(),
acc_args.ignore_nulls,
)
.map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
Expand Down Expand Up @@ -455,7 +455,7 @@ impl AggregateUDFImpl for LastValue {
LastValueAccumulator::try_new(
acc_args.return_type,
&ordering_dtypes,
LexOrdering::from_ref(acc_args.ordering_req),
acc_args.ordering_req.clone(),
acc_args.ignore_nulls,
)
.map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl AggregateUDFImpl for NthValueAgg {
n,
&data_type,
&ordering_dtypes,
LexOrdering::from_ref(acc_args.ordering_req),
acc_args.ordering_req.clone(),
)
.map(|acc| Box::new(acc) as _)
}
Expand Down
13 changes: 8 additions & 5 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::fmt;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::ops::{Deref, Index, Range, RangeFrom, RangeTo};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use std::vec::IntoIter;

use arrow::compute::kernels::sort::{SortColumn, SortOptions};
Expand Down Expand Up @@ -352,18 +352,21 @@ impl AsRef<LexOrdering> for LexOrdering {
}
}

static EMPTY_ORDER: OnceLock<LexOrdering> = OnceLock::new();

impl LexOrdering {
// Creates a new [`LexOrdering`] from a vector
pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
Self { inner }
}

pub fn capacity(&self) -> usize {
self.inner.capacity()
/// Return an empty LexOrdering (no expressions)
pub fn empty() -> &'static LexOrdering {
EMPTY_ORDER.get_or_init(LexOrdering::default)
}

pub fn from_ref(lex_ordering_ref: &LexOrdering) -> Self {
Self::new(lex_ordering_ref.to_vec())
pub fn capacity(&self) -> usize {
self.inner.capacity()
}

pub fn clear(&mut self) {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl PlainAggregateWindowExpr {
Self {
aggregate,
partition_by: partition_by.to_vec(),
order_by: LexOrdering::from_ref(order_by),
order_by: order_by.clone(),
window_frame,
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/built_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl BuiltInWindowExpr {
Self {
expr,
partition_by: partition_by.to_vec(),
order_by: LexOrdering::from_ref(order_by),
order_by: order_by.clone(),
window_frame,
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/sliding_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl SlidingAggregateWindowExpr {
Self {
aggregate,
partition_by: partition_by.to_vec(),
order_by: LexOrdering::from_ref(order_by),
order_by: order_by.clone(),
window_frame,
}
}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ fn offset_ordering(
options: sort_expr.options,
})
.collect(),
_ => LexOrdering::from_ref(ordering),
_ => ordering.clone(),
}
}

Expand All @@ -503,7 +503,7 @@ pub fn calculate_join_output_ordering(
if join_type == JoinType::Inner && probe_side == Some(JoinSide::Left) {
replace_on_columns_of_right_ordering(
on_columns,
&mut LexOrdering::from_ref(right_ordering),
&mut right_ordering.clone(),
)
.ok()?;
merge_vectors(
Expand All @@ -512,15 +512,15 @@ pub fn calculate_join_output_ordering(
.as_ref(),
)
} else {
LexOrdering::from_ref(left_ordering)
left_ordering.clone()
}
}
[false, true] => {
// Special case, we can prefix ordering of left side with the ordering of right side.
if join_type == JoinType::Inner && probe_side == Some(JoinSide::Right) {
replace_on_columns_of_right_ordering(
on_columns,
&mut LexOrdering::from_ref(right_ordering),
&mut right_ordering.clone(),
)
.ok()?;
merge_vectors(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ impl DisplayAs for RepartitionExec {
}

if let Some(sort_exprs) = self.sort_exprs() {
write!(f, ", sort_exprs={}", LexOrdering::from_ref(sort_exprs))?;
write!(f, ", sort_exprs={}", sort_exprs.clone())?;
}
Ok(())
}
Expand Down
5 changes: 1 addition & 4 deletions datafusion/physical-plan/src/sorts/streaming_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use arrow_array::*;
use datafusion_common::{internal_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::sync::OnceLock;

macro_rules! primitive_merge_helper {
($t:ty, $($v:ident),+) => {
Expand Down Expand Up @@ -63,14 +62,12 @@ pub struct StreamingMergeBuilder<'a> {
enable_round_robin_tie_breaker: bool,
}

static EMPTY_ORDER: OnceLock<LexOrdering> = OnceLock::new();

impl<'a> Default for StreamingMergeBuilder<'a> {
fn default() -> Self {
Self {
streams: vec![],
schema: None,
expressions: EMPTY_ORDER.get_or_init(LexOrdering::default),
expressions: LexOrdering::empty(),
metrics: None,
batch_size: None,
fetch: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ mod tests {
Arc::new(BuiltInWindowExpr::new(
last_value_func,
&[],
LexOrdering::default().as_ref(),
&LexOrdering::default(),
Arc::new(WindowFrame::new_bounds(
WindowFrameUnits::Rows,
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
Expand All @@ -1563,7 +1563,7 @@ mod tests {
Arc::new(BuiltInWindowExpr::new(
nth_value_func1,
&[],
LexOrdering::default().as_ref(),
&LexOrdering::default(),
Arc::new(WindowFrame::new_bounds(
WindowFrameUnits::Rows,
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
Expand All @@ -1574,7 +1574,7 @@ mod tests {
Arc::new(BuiltInWindowExpr::new(
nth_value_func2,
&[],
LexOrdering::default().as_ref(),
&LexOrdering::default(),
Arc::new(WindowFrame::new_bounds(
WindowFrameUnits::Rows,
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
Expand Down
Loading

0 comments on commit a134a34

Please sign in to comment.