Skip to content

Commit 9dd2cfc

Browse files
authored
Do not push down Sorts if it violates the sort requirements (#11678)
* Do not push down Sorts if it violates the sort requirements * Test for pushing through orders
1 parent 0f554fa commit 9dd2cfc

File tree

3 files changed

+186
-1
lines changed

3 files changed

+186
-1
lines changed

datafusion/core/src/physical_optimizer/enforce_sorting.rs

+64
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,7 @@ mod tests {
621621
limit_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_sorted,
622622
repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
623623
sort_preserving_merge_exec, spr_repartition_exec, union_exec,
624+
RequirementsTestExec,
624625
};
625626
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
626627
use crate::prelude::{SessionConfig, SessionContext};
@@ -2346,4 +2347,67 @@ mod tests {
23462347
assert_optimized!(expected_input, expected_no_change, physical_plan, true);
23472348
Ok(())
23482349
}
2350+
2351+
// Checks that a sort on `b` is left in place above an operator that requires
// its input to be sorted on `a`: the expected optimized plan is identical to
// the input plan.
#[tokio::test]
2352+
async fn test_push_with_required_input_ordering_prohibited() -> Result<()> {
2353+
// SortExec: expr=[b] <-- can't push this down
2354+
// RequiredInputOrder expr=[a] <-- this requires input sorted by a, and preserves the input order
2355+
// SortExec: expr=[a]
2356+
// MemoryExec
2357+
let schema = create_test_schema3()?;
2358+
let sort_exprs_a = vec![sort_expr("a", &schema)];
2359+
let sort_exprs_b = vec![sort_expr("b", &schema)];
2360+
let plan = memory_exec(&schema);
2361+
let plan = sort_exec(sort_exprs_a.clone(), plan);
2362+
// test operator: requires its input sorted on `a` and reports that it maintains input order
let plan = RequirementsTestExec::new(plan)
2363+
.with_required_input_ordering(sort_exprs_a)
2364+
.with_maintains_input_order(true)
2365+
.into_arc();
2366+
// top-level sort on `b`, which differs from the `a` requirement below it
let plan = sort_exec(sort_exprs_b, plan);
2367+
2368+
let expected_input = [
2369+
"SortExec: expr=[b@1 ASC], preserve_partitioning=[false]",
2370+
" RequiredInputOrderingExec",
2371+
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2372+
" MemoryExec: partitions=1, partition_sizes=[0]",
2373+
];
2374+
// should not be able to push sorts: the plan is expected to come back unchanged
2375+
let expected_no_change = expected_input;
2376+
assert_optimized!(expected_input, expected_no_change, plan, true);
2377+
Ok(())
2378+
}
2379+
2380+
// Test the case where the pushed-down sort ([a, b]) is compatible with the
// required input ordering ([a]), so the sort can be pushed through the
// operator. The expected plan has no sort above `RequiredInputOrderingExec`
// and a single `SortExec` on [a, b] below it.
2381+
#[tokio::test]
2382+
async fn test_push_with_required_input_ordering_allowed() -> Result<()> {
2383+
// SortExec: expr=[a,b] <-- can push this down (as it is compatible with the required input ordering)
2384+
// RequiredInputOrder expr=[a] <-- this requires input sorted by a, and preserves the input order
2385+
// SortExec: expr=[a]
2386+
// MemoryExec
2387+
let schema = create_test_schema3()?;
2388+
let sort_exprs_a = vec![sort_expr("a", &schema)];
2389+
let sort_exprs_ab = vec![sort_expr("a", &schema), sort_expr("b", &schema)];
2390+
let plan = memory_exec(&schema);
2391+
let plan = sort_exec(sort_exprs_a.clone(), plan);
2392+
// test operator: requires its input sorted on `a` and reports that it maintains input order
let plan = RequirementsTestExec::new(plan)
2393+
.with_required_input_ordering(sort_exprs_a)
2394+
.with_maintains_input_order(true)
2395+
.into_arc();
2396+
// top-level sort on [a, b], whose leading expression matches the `a` requirement
let plan = sort_exec(sort_exprs_ab, plan);
2397+
2398+
let expected_input = [
2399+
"SortExec: expr=[a@0 ASC,b@1 ASC], preserve_partitioning=[false]",
2400+
" RequiredInputOrderingExec",
2401+
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2402+
" MemoryExec: partitions=1, partition_sizes=[0]",
2403+
];
2404+
// should be able to push sorts
2405+
let expected = [
2406+
"RequiredInputOrderingExec",
2407+
" SortExec: expr=[a@0 ASC,b@1 ASC], preserve_partitioning=[false]",
2408+
" MemoryExec: partitions=1, partition_sizes=[0]",
2409+
];
2410+
assert_optimized!(expected_input, expected, plan, true);
2411+
Ok(())
2412+
}
23492413
}

datafusion/core/src/physical_optimizer/sort_pushdown.rs

+24
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ fn pushdown_requirement_to_children(
176176
|| plan.as_any().is::<ProjectionExec>()
177177
|| is_limit(plan)
178178
|| plan.as_any().is::<HashJoinExec>()
179+
|| pushdown_would_violate_requirements(parent_required, plan.as_ref())
179180
{
180181
// If the current plan is a leaf node or cannot maintain any of the input ordering, the requirements cannot be pushed down.
181182
// For RepartitionExec, we always choose not to push down the sort requirements, even though a RepartitionExec(input_partition=1) could maintain input ordering.
@@ -211,6 +212,29 @@ fn pushdown_requirement_to_children(
211212
// TODO: Add support for Projection push down
212213
}
213214

215+
/// Return true if pushing the parent's sort requirements through a node would
/// violate that node's own input sorting requirements.
216+
///
/// * `parent_required` - the sort requirement that is being pushed down
/// * `child` - the plan the requirement would be pushed through; its
///   `required_input_ordering()` entries are checked one by one
///
/// An entry that is `None` never counts as a violation. An entry that is
/// `Some` counts as a violation when every (child, parent) pair produced by
/// zipping the two requirement lists is incompatible.
///
/// NOTE(review): `zip` stops at the shorter list and `all` on an empty
/// iterator is true, so an empty `Some` requirement (or an empty
/// `parent_required`) is reported as a violation -- confirm this is intended.
217+
fn pushdown_would_violate_requirements(
218+
parent_required: LexRequirementRef,
219+
child: &dyn ExecutionPlan,
220+
) -> bool {
221+
child
222+
.required_input_ordering()
223+
.iter()
224+
.any(|child_required| {
225+
let Some(child_required) = child_required.as_ref() else {
226+
// no requirements, so pushing down would not violate anything
227+
return false;
228+
};
229+
// check if the plan's requirements would still be satisfied if we pushed
230+
// down the parent requirements: compare the two lists position by position
231+
child_required
232+
.iter()
233+
.zip(parent_required.iter())
234+
.all(|(c, p)| !c.compatible(p))
235+
})
236+
}
237+
214238
/// Determine children requirements:
215239
/// - If children requirements are more specific, do not push down parent
216240
/// requirements.

datafusion/core/src/physical_optimizer/test_utils.rs

+98-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
//! Collection of testing utility functions that are leveraged by the query optimizer rules
1919
20+
use std::any::Any;
21+
use std::fmt::Formatter;
2022
use std::sync::Arc;
2123

2224
use crate::datasource::listing::PartitionedFile;
@@ -47,10 +49,14 @@ use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
4749
use datafusion_functions_aggregate::count::count_udaf;
4850
use datafusion_physical_expr::expressions::col;
4951
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
50-
use datafusion_physical_plan::displayable;
5152
use datafusion_physical_plan::tree_node::PlanContext;
53+
use datafusion_physical_plan::{
54+
displayable, DisplayAs, DisplayFormatType, PlanProperties,
55+
};
5256

5357
use async_trait::async_trait;
58+
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
59+
use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement;
5460

5561
async fn register_current_csv(
5662
ctx: &SessionContext,
@@ -354,6 +360,97 @@ pub fn sort_exec(
354360
Arc::new(SortExec::new(sort_exprs, input))
355361
}
356362

363+
/// A test [`ExecutionPlan`] whose requirements can be configured.
///
/// It wraps a single input plan and lets a test choose the required input
/// ordering and the `maintains_input_order` flag that the plan reports.
364+
#[derive(Debug)]
365+
pub struct RequirementsTestExec {
366+
/// Sort expressions reported (converted to requirements) by `required_input_ordering`
required_input_ordering: Vec<PhysicalSortExpr>,
367+
/// Value reported for the single input by `maintains_input_order`
maintains_input_order: bool,
368+
/// The only child of this plan
input: Arc<dyn ExecutionPlan>,
369+
}
370+
371+
// Constructor and builder-style setters for `RequirementsTestExec`.
impl RequirementsTestExec {
372+
/// Creates a new plan wrapping `input`, with an empty required input
/// ordering and `maintains_input_order` set to `true`.
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
373+
Self {
374+
required_input_ordering: vec![],
375+
maintains_input_order: true,
376+
input,
377+
}
378+
}
379+
380+
/// Sets the required input ordering and returns the updated plan
381+
pub fn with_required_input_ordering(
382+
mut self,
383+
required_input_ordering: Vec<PhysicalSortExpr>,
384+
) -> Self {
385+
self.required_input_ordering = required_input_ordering;
386+
self
387+
}
388+
389+
/// Sets the `maintains_input_order` flag and returns the updated plan
390+
pub fn with_maintains_input_order(mut self, maintains_input_order: bool) -> Self {
391+
self.maintains_input_order = maintains_input_order;
392+
self
393+
}
394+
395+
/// Consumes this plan and returns it as an `Arc<dyn ExecutionPlan>`
396+
pub fn into_arc(self) -> Arc<dyn ExecutionPlan> {
397+
Arc::new(self)
398+
}
399+
}
400+
401+
// Display implementation: the plan is always rendered as the fixed string
// "RequiredInputOrderingExec"; the requested format type is ignored.
impl DisplayAs for RequirementsTestExec {
402+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
403+
write!(f, "RequiredInputOrderingExec")
404+
}
405+
}
406+
407+
// `ExecutionPlan` implementation for the test plan: it reports the configured
// requirements, forwards plan properties to its input, and cannot be executed.
impl ExecutionPlan for RequirementsTestExec {
408+
// Fixed name, matching the string written by `fmt_as`
fn name(&self) -> &str {
409+
"RequiredInputOrderingExec"
410+
}
411+
412+
fn as_any(&self) -> &dyn Any {
413+
self
414+
}
415+
416+
// Plan properties are taken unchanged from the input plan
fn properties(&self) -> &PlanProperties {
417+
self.input.properties()
418+
}
419+
420+
// One entry for the single input, built from the configured sort expressions.
// NOTE(review): the entry is always `Some`, even when the configured list is
// empty -- confirm callers treat an empty requirement as "no requirement".
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
421+
let requirement =
422+
PhysicalSortRequirement::from_sort_exprs(&self.required_input_ordering);
423+
vec![Some(requirement)]
424+
}
425+
426+
// One entry for the single input, holding the configured flag
fn maintains_input_order(&self) -> Vec<bool> {
427+
vec![self.maintains_input_order]
428+
}
429+
430+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
431+
vec![&self.input]
432+
}
433+
434+
// Rebuilds the plan around the new child, keeping the configured settings.
// Panics (via `assert_eq!`) unless exactly one child is supplied.
fn with_new_children(
435+
self: Arc<Self>,
436+
children: Vec<Arc<dyn ExecutionPlan>>,
437+
) -> Result<Arc<dyn ExecutionPlan>> {
438+
assert_eq!(children.len(), 1);
439+
Ok(RequirementsTestExec::new(children[0].clone())
440+
.with_required_input_ordering(self.required_input_ordering.clone())
441+
.with_maintains_input_order(self.maintains_input_order)
442+
.into_arc())
443+
}
444+
445+
// Execution is not supported: calling this panics via `unimplemented!`
fn execute(
446+
&self,
447+
_partition: usize,
448+
_context: Arc<TaskContext>,
449+
) -> Result<SendableRecordBatchStream> {
450+
unimplemented!("Test exec does not support execution")
451+
}
452+
}
453+
357454
/// A [`PlanContext`] object is susceptible to being left in an inconsistent state after
358455
/// untested mutable operations. It is crucial that there be no discrepancies between a plan
359456
/// associated with the root node and the plan generated after traversing all nodes

0 commit comments

Comments
 (0)