Commit ea0686b

Window Functions Order Conservation -- Follow-up On Set Monotonicity (#14813)
* case 0 dbg
* dbg case 4
* dbg case 9
* Update enforce_sorting.rs
* dbg case 10-11
* dbg case 19
* dbg 24
* dbg 48
* final
* Update enforce_sorting.rs
* clippy
* fix the existing test
* Update ordering.rs
* clean-up
* simplify partial constantness
* Update ordering.rs
* Review

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent f2cdc14 commit ea0686b

8 files changed: +1596 -353 lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

+1,304 -239 lines changed (large diff; not rendered by default)

datafusion/core/tests/physical_optimizer/test_utils.rs

+7 -61 lines changed
@@ -37,13 +37,13 @@ use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
-use datafusion_functions_aggregate::average::avg_udaf;
 use datafusion_functions_aggregate::count::count_udaf;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 use datafusion_physical_expr::expressions::col;
 use datafusion_physical_expr::{expressions, PhysicalExpr};
-use datafusion_physical_expr_common::sort_expr::LexRequirement;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_expr_common::sort_expr::{
+    LexOrdering, LexRequirement, PhysicalSortExpr,
+};
 use datafusion_physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggregation;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::aggregates::{
@@ -62,11 +62,10 @@ use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
 use datafusion_physical_plan::tree_node::PlanContext;
 use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::windows::{create_window_expr, BoundedWindowAggExec};
-use datafusion_physical_plan::ExecutionPlan;
 use datafusion_physical_plan::{
-    displayable, DisplayAs, DisplayFormatType, PlanProperties,
+    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, InputOrderMode,
+    Partitioning, PlanProperties,
 };
-use datafusion_physical_plan::{InputOrderMode, Partitioning};
 
 /// Create a non sorted parquet exec
 pub fn parquet_exec(schema: &SchemaRef) -> Arc<DataSourceExec> {
@@ -128,17 +127,6 @@ pub fn create_test_schema3() -> Result<SchemaRef> {
     Ok(schema)
 }
 
-// Generate a schema which consists of 5 columns (a, b, c, d, e) of Uint64
-pub fn create_test_schema4() -> Result<SchemaRef> {
-    let a = Field::new("a", DataType::UInt64, true);
-    let b = Field::new("b", DataType::UInt64, false);
-    let c = Field::new("c", DataType::UInt64, true);
-    let d = Field::new("d", DataType::UInt64, false);
-    let e = Field::new("e", DataType::Int64, false);
-    let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
-    Ok(schema)
-}
-
 pub fn sort_merge_join_exec(
     left: Arc<dyn ExecutionPlan>,
     right: Arc<dyn ExecutionPlan>,
@@ -207,33 +195,20 @@ pub fn bounded_window_exec(
     col_name: &str,
     sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
     input: Arc<dyn ExecutionPlan>,
-) -> Arc<dyn ExecutionPlan> {
-    bounded_window_exec_with_partition(col_name, sort_exprs, &[], input, false)
-}
-
-pub fn bounded_window_exec_with_partition(
-    col_name: &str,
-    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
-    partition_by: &[Arc<dyn PhysicalExpr>],
-    input: Arc<dyn ExecutionPlan>,
-    should_reverse: bool,
 ) -> Arc<dyn ExecutionPlan> {
     let sort_exprs: LexOrdering = sort_exprs.into_iter().collect();
     let schema = input.schema();
-    let mut window_expr = create_window_expr(
+    let window_expr = create_window_expr(
         &WindowFunctionDefinition::AggregateUDF(count_udaf()),
         "count".to_owned(),
         &[col(col_name, &schema).unwrap()],
-        partition_by,
+        &[],
         sort_exprs.as_ref(),
         Arc::new(WindowFrame::new(Some(false))),
         schema.as_ref(),
         false,
     )
     .unwrap();
-    if should_reverse {
-        window_expr = window_expr.get_reverse_expr().unwrap();
-    }
 
     Arc::new(
         BoundedWindowAggExec::try_new(
@@ -246,35 +221,6 @@ pub fn bounded_window_exec_with_partition(
     )
 }
 
-pub fn bounded_window_exec_non_set_monotonic(
-    col_name: &str,
-    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
-    input: Arc<dyn ExecutionPlan>,
-) -> Arc<dyn ExecutionPlan> {
-    let sort_exprs: LexOrdering = sort_exprs.into_iter().collect();
-    let schema = input.schema();
-
-    Arc::new(
-        BoundedWindowAggExec::try_new(
-            vec![create_window_expr(
-                &WindowFunctionDefinition::AggregateUDF(avg_udaf()),
-                "avg".to_owned(),
-                &[col(col_name, &schema).unwrap()],
-                &[],
-                sort_exprs.as_ref(),
-                Arc::new(WindowFrame::new(Some(false))),
-                schema.as_ref(),
-                false,
-            )
-            .unwrap()],
-            Arc::clone(&input),
-            InputOrderMode::Sorted,
-            false,
-        )
-        .unwrap(),
-    )
-}
-
 pub fn filter_exec(
     predicate: Arc<dyn PhysicalExpr>,
     input: Arc<dyn ExecutionPlan>,

datafusion/physical-expr-common/src/sort_expr.rs

+5 lines changed
@@ -361,6 +361,11 @@ impl LexOrdering {
         self.inner.clear()
     }
 
+    /// Takes ownership of the actual vector of `PhysicalSortExpr`s in the LexOrdering.
+    pub fn take_exprs(self) -> Vec<PhysicalSortExpr> {
+        self.inner
+    }
+
     /// Returns `true` if the LexOrdering contains `expr`
     pub fn contains(&self, expr: &PhysicalSortExpr) -> bool {
         self.inner.contains(expr)
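
The new `take_exprs` method consumes the `LexOrdering` and hands back its inner `Vec<PhysicalSortExpr>` without cloning. Below is a minimal sketch of how a caller might use it; the `reverse_ordering` helper is hypothetical and only relies on the public `options` field of `PhysicalSortExpr` and on `LexOrdering` being collectable from an iterator, both of which appear elsewhere in this diff:

use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};

// Hypothetical helper: take ownership of the sort expressions, flip every
// sort direction, and rebuild the ordering without per-element clones.
fn reverse_ordering(ordering: LexOrdering) -> LexOrdering {
    ordering
        .take_exprs()
        .into_iter()
        .map(|mut sort_expr: PhysicalSortExpr| {
            sort_expr.options.descending = !sort_expr.options.descending;
            sort_expr
        })
        .collect()
}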

datafusion/physical-expr/src/equivalence/class.rs

+23 -20 lines changed
@@ -456,17 +456,19 @@ impl EquivalenceGroup {
     /// The expression is replaced with the first expression in the equivalence
     /// class it matches with (if any).
     pub fn normalize_expr(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
-        Arc::clone(&expr)
-            .transform(|expr| {
-                for cls in self.iter() {
-                    if cls.contains(&expr) {
-                        return Ok(Transformed::yes(cls.canonical_expr().unwrap()));
-                    }
+        expr.transform(|expr| {
+            for cls in self.iter() {
+                if cls.contains(&expr) {
+                    // The unwrap below is safe because the guard above ensures
+                    // that the class is not empty.
+                    return Ok(Transformed::yes(cls.canonical_expr().unwrap()));
                 }
-                Ok(Transformed::no(expr))
-            })
-            .data()
-            .unwrap_or(expr)
+            }
+            Ok(Transformed::no(expr))
+        })
+        .data()
+        .unwrap()
+        // The unwrap above is safe because the closure always returns `Ok`.
     }
 
     /// Normalizes the given sort expression according to this group.
@@ -585,20 +587,21 @@ impl EquivalenceGroup {
             (new_class.len() > 1).then_some(EquivalenceClass::new(new_class))
         });
 
-        // the key is the source expression and the value is the EquivalenceClass that contains the target expression of the source expression.
-        let mut new_classes: IndexMap<Arc<dyn PhysicalExpr>, EquivalenceClass> =
-            IndexMap::new();
-        mapping.iter().for_each(|(source, target)| {
-            // We need to find equivalent projected expressions.
-            // e.g. table with columns [a,b,c] and a == b, projection: [a+c, b+c].
-            // To conclude that a + c == b + c we firsty normalize all source expressions
-            // in the mapping, then merge all equivalent expressions into the classes.
+        // The key is the source expression, and the value is the equivalence
+        // class that contains the corresponding target expression.
+        let mut new_classes: IndexMap<_, _> = IndexMap::new();
+        for (source, target) in mapping.iter() {
+            // We need to find equivalent projected expressions. For example,
+            // consider a table with columns `[a, b, c]` with `a` == `b`, and
+            // projection `[a + c, b + c]`. To conclude that `a + c == b + c`,
+            // we first normalize all source expressions in the mapping, then
+            // merge all equivalent expressions into the classes.
             let normalized_expr = self.normalize_expr(Arc::clone(source));
             new_classes
                 .entry(normalized_expr)
                 .or_insert_with(EquivalenceClass::new_empty)
                 .push(Arc::clone(target));
-        });
+        }
         // Only add equivalence classes with at least two members as singleton
         // equivalence classes are meaningless.
        let new_classes = new_classes
@@ -642,7 +645,7 @@ impl EquivalenceGroup {
         // are equal in the resulting table.
         if join_type == &JoinType::Inner {
             for (lhs, rhs) in on.iter() {
-                let new_lhs = Arc::clone(lhs) as _;
+                let new_lhs = Arc::clone(lhs);
                // Rewrite rhs to point to the right side of the join:
                let new_rhs = Arc::clone(rhs)
                    .transform(|expr| {
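
The projection hunk above reasons that, given `a == b`, normalizing source expressions lets the projected expressions `a + c` and `b + c` fall into the same equivalence class. The following self-contained toy (plain strings and std maps, not the actual `EquivalenceGroup`/`IndexMap` machinery) sketches that grouping step:

use std::collections::BTreeMap;

fn main() {
    // Table [a, b, c] with a == b; `a` is the canonical representative of `b`.
    let canonical = BTreeMap::from([("b", "a")]);
    // Projection mapping: (source expression, projected target expression).
    let mapping = [("a + c", "col_0"), ("b + c", "col_1")];

    // Group targets by the normalized source, mirroring project_classes():
    // every operand of a source expression is rewritten to its canonical form
    // before the expression is used as a grouping key.
    let mut classes: BTreeMap<String, Vec<&str>> = BTreeMap::new();
    for (source, target) in mapping {
        let normalized = source
            .split(" + ")
            .map(|op| *canonical.get(op).unwrap_or(&op))
            .collect::<Vec<_>>()
            .join(" + ");
        classes.entry(normalized).or_default().push(target);
    }

    // Both projected columns end up in the same class, so col_0 == col_1,
    // i.e. a + c == b + c holds in the projected table.
    assert_eq!(classes["a + c"], vec!["col_0", "col_1"]);
}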

datafusion/physical-expr/src/equivalence/ordering.rs

+78 lines changed
@@ -22,7 +22,9 @@ use std::vec::IntoIter;
 
 use crate::equivalence::add_offset_to_expr;
 use crate::{LexOrdering, PhysicalExpr};
+
 use arrow::compute::SortOptions;
+use datafusion_common::HashSet;
 
 /// An `OrderingEquivalenceClass` object keeps track of different alternative
 /// orderings than can describe a schema. For example, consider the following table:
@@ -234,6 +236,82 @@ impl OrderingEquivalenceClass {
         }
         None
     }
+
+    /// Checks whether the given expression is partially constant according to
+    /// this ordering equivalence class.
+    ///
+    /// This function determines whether `expr` appears in at least one combination
+    /// of `descending` and `nulls_first` options that indicate partial constantness
+    /// in a lexicographical ordering. Specifically, an expression is considered
+    /// a partial constant in this context if its `SortOptions` satisfies either
+    /// of the following conditions:
+    /// - It is `descending` with `nulls_first` and _also_ `ascending` with
+    ///   `nulls_last`, OR
+    /// - It is `descending` with `nulls_last` and _also_ `ascending` with
+    ///   `nulls_first`.
+    ///
+    /// The equivalence mechanism primarily uses `ConstExpr`s to represent globally
+    /// constant expressions. However, some expressions may only be partially
+    /// constant within a lexicographical ordering. This function helps identify
+    /// such cases. If an expression is constant within a prefix ordering, it is
+    /// added as a constant during `ordering_satisfy_requirement()` iterations
+    /// after the corresponding prefix requirement is satisfied.
+    ///
+    /// ### Example Scenarios
+    ///
+    /// In these scenarios, we assume that all expressions share the same sort
+    /// properties.
+    ///
+    /// #### Case 1: Sort Requirement `[a, c]`
+    ///
+    /// **Existing Orderings:** `[[a, b, c], [a, d]]`, **Constants:** `[]`
+    /// 1. `ordering_satisfy_single()` returns `true` because the requirement
+    ///    `a` is satisfied by `[a, b, c].first()`.
+    /// 2. `a` is added as a constant for the next iteration.
+    /// 3. The normalized orderings become `[[b, c], [d]]`.
+    /// 4. `ordering_satisfy_single()` returns `false` for `c`, as neither
+    ///    `[b, c]` nor `[d]` satisfies `c`.
+    ///
+    /// #### Case 2: Sort Requirement `[a, d]`
+    ///
+    /// **Existing Orderings:** `[[a, b, c], [a, d]]`, **Constants:** `[]`
+    /// 1. `ordering_satisfy_single()` returns `true` because the requirement
+    ///    `a` is satisfied by `[a, b, c].first()`.
+    /// 2. `a` is added as a constant for the next iteration.
+    /// 3. The normalized orderings become `[[b, c], [d]]`.
+    /// 4. `ordering_satisfy_single()` returns `true` for `d`, as `[d]` satisfies
+    ///    `d`.
+    ///
+    /// ### Future Improvements
+    ///
+    /// This function may become unnecessary if any of the following improvements
+    /// are implemented:
+    /// 1. `SortOptions` supports encoding constantness information.
+    /// 2. `EquivalenceProperties` gains `FunctionalDependency` awareness, eliminating
+    ///    the need for `Constant` and `Constraints`.
+    pub fn is_expr_partial_const(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
+        let mut constantness_defining_pairs = [
+            HashSet::from([(false, false), (true, true)]),
+            HashSet::from([(false, true), (true, false)]),
+        ];
+
+        for ordering in self.iter() {
+            if let Some(leading_ordering) = ordering.first() {
+                if leading_ordering.expr.eq(expr) {
+                    let opt = (
+                        leading_ordering.options.descending,
+                        leading_ordering.options.nulls_first,
+                    );
+                    constantness_defining_pairs[0].remove(&opt);
+                    constantness_defining_pairs[1].remove(&opt);
+                }
+            }
+        }
+
+        constantness_defining_pairs
+            .iter()
+            .any(|pair| pair.is_empty())
+    }
 }
 
 /// Convert the `OrderingEquivalenceClass` into an iterator of LexOrderings
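
To make the two conditions above concrete: if the leading orderings that start with `expr` cover both an ascending variant and its mirror-image descending variant, the expression cannot actually vary within the data. Here is a small self-contained restatement of the check using plain `(descending, nulls_first)` tuples instead of `PhysicalSortExpr`s (a sketch of the logic, not the DataFusion API itself):

use std::collections::HashSet;

// Each entry of `opts` is the (descending, nulls_first) pair of one leading
// ordering that starts with the expression under test.
fn is_partial_const(opts: &[(bool, bool)]) -> bool {
    // If the data is simultaneously ordered in two mirror-image ways on the
    // same leading expression, that expression can only take a single value
    // (possibly NULL) within the relevant scope, i.e. it is partially constant.
    let mut defining_pairs = [
        HashSet::from([(false, false), (true, true)]),
        HashSet::from([(false, true), (true, false)]),
    ];
    for opt in opts {
        defining_pairs[0].remove(opt);
        defining_pairs[1].remove(opt);
    }
    defining_pairs.iter().any(|pair| pair.is_empty())
}

fn main() {
    // `x ASC NULLS LAST` and `x DESC NULLS FIRST` both hold: partially constant.
    assert!(is_partial_const(&[(false, false), (true, true)]));
    // A single leading ordering says nothing about constantness.
    assert!(!is_partial_const(&[(false, false)]));
}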

datafusion/physical-expr/src/equivalence/properties.rs

+5 -5 lines changed
@@ -1464,12 +1464,12 @@ fn update_properties(
     let normalized_expr = eq_properties
         .eq_group
         .normalize_expr(Arc::clone(&node.expr));
-    if eq_properties.is_expr_constant(&normalized_expr) {
-        node.data.sort_properties = SortProperties::Singleton;
-    } else if let Some(options) = eq_properties
-        .normalized_oeq_class()
-        .get_options(&normalized_expr)
+    let oeq_class = eq_properties.normalized_oeq_class();
+    if eq_properties.is_expr_constant(&normalized_expr)
+        || oeq_class.is_expr_partial_const(&normalized_expr)
     {
+        node.data.sort_properties = SortProperties::Singleton;
+    } else if let Some(options) = oeq_class.get_options(&normalized_expr) {
         node.data.sort_properties = SortProperties::Ordered(options);
     }
     Ok(Transformed::yes(node))

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

+15 -2 lines changed
@@ -50,7 +50,7 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 /// of the parent node as its data.
 ///
 /// [`EnforceSorting`]: crate::enforce_sorting::EnforceSorting
-#[derive(Default, Clone)]
+#[derive(Default, Clone, Debug)]
 pub struct ParentRequirements {
     ordering_requirement: Option<LexRequirement>,
     fetch: Option<usize>,
@@ -191,7 +191,20 @@ fn pushdown_requirement_to_children(
                     .then(|| LexRequirement::new(request_child.to_vec()));
                 Ok(Some(vec![req]))
             }
-            RequirementsCompatibility::Compatible(adjusted) => Ok(Some(vec![adjusted])),
+            RequirementsCompatibility::Compatible(adjusted) => {
+                // If parent requirements are more specific than output ordering
+                // of the window plan, then we can deduce that the parent expects
+                // an ordering from the columns created by window functions. If
+                // that's the case, we block the pushdown of sort operation.
+                if !plan
+                    .equivalence_properties()
+                    .ordering_satisfy_requirement(parent_required)
+                {
+                    return Ok(None);
+                }
+
+                Ok(Some(vec![adjusted]))
+            }
             RequirementsCompatibility::NonCompatible => Ok(None),
         }
     } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
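
The guard added in the `Compatible` arm blocks pushdown when the parent's requirement is not already satisfied by the window operator's own output ordering, i.e. when the parent orders on columns that the window functions themselves produce. A simplified, self-contained model of that decision (column names instead of `PhysicalSortExpr`s, and a prefix check standing in for `ordering_satisfy_requirement`, which really consults equivalence properties):

// Simplified model, not the DataFusion API: orderings are lists of column
// names, and "requirement satisfied" is approximated by a prefix check.
fn can_push_sort_below_window(parent_required: &[&str], window_output: &[&str]) -> bool {
    parent_required.len() <= window_output.len()
        && window_output
            .iter()
            .zip(parent_required)
            .all(|(provided, required)| provided == required)
}

fn main() {
    // Parent only needs [a]; the window operator already emits [a, b] order,
    // so pushing the sort below the window operator is harmless.
    assert!(can_push_sort_below_window(&["a"], &["a", "b"]));
    // Parent needs [a, count], where `count` is produced by the window
    // function itself; the sort must stay above the window, so block pushdown.
    assert!(!can_push_sort_below_window(&["a", "count"], &["a"]));
}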
