Skip to content

Commit f3b1141

Browse files
jayzhan-synnadaberkaysynnadaozankabak
authored
Replace execution_mode with emission_type and boundedness (#13823)
* feat: update execution modes and add bitflags dependency - Introduced `Incremental` execution mode alongside existing modes in the DataFusion execution plan. - Updated various execution plans to utilize the new `Incremental` mode where applicable, enhancing streaming capabilities. - Added `bitflags` dependency to `Cargo.toml` for better management of execution modes. - Adjusted execution mode handling in multiple files to ensure compatibility with the new structure. * add exec API Signed-off-by: Jay Zhan <[email protected]> * replace done but has stackoverflow Signed-off-by: Jay Zhan <[email protected]> * exec API done Signed-off-by: Jay Zhan <[email protected]> * Refactor execution plan properties to remove execution mode - Removed the `ExecutionMode` parameter from `PlanProperties` across multiple physical plan implementations. - Updated related functions to utilize the new structure, ensuring compatibility with the changes. - Adjusted comments and cleaned up imports to reflect the removal of execution mode handling. This refactor simplifies the execution plan properties and enhances maintainability. * Refactor execution plan to remove `ExecutionMode` and introduce `EmissionType` - Removed the `ExecutionMode` parameter from `PlanProperties` and related implementations across multiple files. - Introduced `EmissionType` to better represent the output characteristics of execution plans. - Updated functions and tests to reflect the new structure, ensuring compatibility and enhancing maintainability. - Cleaned up imports and adjusted comments accordingly. This refactor simplifies the execution plan properties and improves the clarity of memory handling in execution plans. * fix test Signed-off-by: Jay Zhan <[email protected]> * Refactor join handling and emission type logic - Updated test cases in `sanity_checker.rs` to reflect changes in expected outcomes for bounded and unbounded joins, ensuring accurate test coverage. - Simplified the `is_pipeline_breaking` method in `execution_plan.rs` to clarify the conditions under which a plan is considered pipeline-breaking. - Enhanced the emission type determination logic in `execution_plan.rs` to prioritize `Final` over `Both` and `Incremental`, improving clarity in execution plan behavior. - Adjusted join type handling in `hash_join.rs` to classify `Right` joins as `Incremental`, allowing for immediate row emission. These changes improve the accuracy of tests and the clarity of execution plan properties. * Implement emission type for execution plans - Updated multiple execution plan implementations to replace `unimplemented!()` with `EmissionType::Incremental`, ensuring that the emission type is correctly defined for various plans. - This change enhances the clarity and functionality of the execution plans by explicitly specifying their emission behavior. These updates contribute to a more robust execution plan framework within the DataFusion project. * Enhance join type documentation and refine emission type logic - Updated the `JoinType` enum in `join_type.rs` to include detailed descriptions for each join type, improving clarity on their behavior and expected results. - Modified the emission type logic in `hash_join.rs` to ensure that `Right` and `RightAnti` joins are classified as `Incremental`, allowing for immediate row emission when applicable. These changes improve the documentation and functionality of join operations within the DataFusion project. * Refactor emission type logic in join and sort execution plans - Updated the emission type determination in `SortMergeJoinExec` and `SymmetricHashJoinExec` to utilize the `emission_type_from_children` function, enhancing the accuracy of emission behavior based on input characteristics. - Clarified comments in `sort.rs` regarding the conditions under which results are emitted, emphasizing the relationship between input sorting and emission type. - These changes improve the clarity and functionality of the execution plans within the DataFusion project, ensuring more robust handling of emission types. * Refactor emission type handling in execution plans - Updated the `emission_type_from_children` function to accept an iterator instead of a slice, enhancing flexibility in how child execution plans are passed. - Modified the `SymmetricHashJoinExec` implementation to utilize the new function signature, improving code clarity and maintainability. These changes streamline the emission type determination process within the DataFusion project, contributing to a more robust execution plan framework. * Enhance execution plan properties with boundedness and emission type - Introduced `boundedness` and `pipeline_behavior` methods to the `ExecutionPlanProperties` trait, improving the handling of execution plan characteristics. - Updated the `CsvExec`, `SortExec`, and related implementations to utilize the new methods for determining boundedness and emission behavior. - Refactored the `ensure_distribution` function to use the new boundedness logic, enhancing clarity in distribution decisions. - These changes contribute to a more robust and maintainable execution plan framework within the DataFusion project. * Refactor execution plans to enhance boundedness and emission type handling - Updated multiple execution plan implementations to incorporate `Boundedness` and `EmissionType`, improving the clarity and functionality of execution plans. - Replaced instances of `unimplemented!()` with appropriate emission types, ensuring that plans correctly define their output behavior. - Refactored the `PlanProperties` structure to utilize the new boundedness logic, enhancing decision-making in execution plans. - These changes contribute to a more robust and maintainable execution plan framework within the DataFusion project. * Refactor memory handling in execution plans - Updated the condition for checking memory requirements in execution plans from `has_finite_memory()` to `boundedness().requires_finite_memory()`, improving clarity in memory management. - This change enhances the robustness of execution plans within the DataFusion project by ensuring more accurate assessments of memory constraints. * Refactor boundedness checks in execution plans - Updated conditions for checking boundedness in various execution plans to use `is_unbounded()` instead of `requires_finite_memory()`, enhancing clarity in memory management. - Adjusted the `PlanProperties` structure to reflect these changes, ensuring more accurate assessments of memory constraints across the DataFusion project. - These modifications contribute to a more robust and maintainable execution plan framework, improving the handling of boundedness in execution strategies. * Remove TODO comment regarding unbounded execution plans in `UnboundedExec` implementation - Eliminated the outdated comment suggesting a switch to unbounded execution with finite memory, streamlining the code and improving clarity. - This change contributes to a cleaner and more maintainable codebase within the DataFusion project. * Refactor execution plan boundedness and emission type handling - Updated the `is_pipeline_breaking` method to use `requires_finite_memory()` for improved clarity in determining pipeline behavior. - Enhanced the `Boundedness` enum to include detailed documentation on memory requirements for unbounded streams. - Refactored `compute_properties` methods in `GlobalLimitExec` and `LocalLimitExec` to directly use the input's boundedness, simplifying the logic. - Adjusted emission type determination in `NestedLoopJoinExec` to utilize the `emission_type_from_children` function, ensuring accurate output behavior based on input characteristics. These changes contribute to a more robust and maintainable execution plan framework within the DataFusion project, improving clarity and functionality in handling boundedness and emission types. * Refactor emission type and boundedness handling in execution plans - Removed the `OptionalEmissionType` struct from `plan_properties.rs`, simplifying the codebase. - Updated the `is_pipeline_breaking` function in `execution_plan.rs` for improved readability by formatting the condition across multiple lines. - Adjusted the `GlobalLimitExec` implementation in `limit.rs` to directly use the input's boundedness, enhancing clarity in memory management. These changes contribute to a more streamlined and maintainable execution plan framework within the DataFusion project, improving the handling of emission types and boundedness. * Refactor GlobalLimitExec and LocalLimitExec to enhance boundedness handling - Updated the `compute_properties` methods in both `GlobalLimitExec` and `LocalLimitExec` to replace `EmissionType::Final` with `Boundedness::Bounded`, reflecting that limit operations always produce a finite number of rows. - Changed the input's boundedness reference to `pipeline_behavior()` for improved clarity in execution plan properties. These changes contribute to a more streamlined and maintainable execution plan framework within the DataFusion project, enhancing the handling of boundedness in limit operations. * Review Part1 * Update sanity_checker.rs * addressing reviews * Review Part 1 * Update datafusion/physical-plan/src/execution_plan.rs * Update datafusion/physical-plan/src/execution_plan.rs * Shorten imports * Enhance documentation for JoinType and Boundedness enums - Improved descriptions for the Inner and Full join types in join_type.rs to clarify their behavior and examples. - Added explanations regarding the boundedness of output streams and memory requirements in execution_plan.rs, including specific examples for operators like Median and Min/Max. --------- Signed-off-by: Jay Zhan <[email protected]> Co-authored-by: berkaysynnada <[email protected]> Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent 95d296c commit f3b1141

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+748
-457
lines changed

datafusion-cli/src/exec.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ use crate::{
3333
};
3434

3535
use datafusion::common::instant::Instant;
36-
use datafusion::common::plan_datafusion_err;
36+
use datafusion::common::{plan_datafusion_err, plan_err};
3737
use datafusion::config::ConfigFileType;
3838
use datafusion::datasource::listing::ListingTableUrl;
3939
use datafusion::error::{DataFusionError, Result};
4040
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
41+
use datafusion::physical_plan::execution_plan::EmissionType;
4142
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
4243
use datafusion::sql::parser::{DFParser, Statement};
4344
use datafusion::sql::sqlparser::dialect::dialect_from_str;
@@ -234,10 +235,19 @@ pub(super) async fn exec_and_print(
234235
let df = ctx.execute_logical_plan(plan).await?;
235236
let physical_plan = df.create_physical_plan().await?;
236237

237-
if physical_plan.execution_mode().is_unbounded() {
238+
if physical_plan.boundedness().is_unbounded() {
239+
if physical_plan.pipeline_behavior() == EmissionType::Final {
240+
return plan_err!(
241+
"The given query can generate a valid result only once \
242+
the source finishes, but the source is unbounded"
243+
);
244+
}
245+
// As the input stream comes, we can generate results.
246+
// However, memory safety is not guaranteed.
238247
let stream = execute_stream(physical_plan, task_ctx.clone())?;
239248
print_options.print_stream(stream, now).await?;
240249
} else {
250+
// Bounded stream; collected results are printed after all input consumed.
241251
let schema = physical_plan.schema();
242252
let results = collect(physical_plan, task_ctx.clone()).await?;
243253
adjusted.into_inner().print_batches(schema, &results, now)?;

datafusion-examples/examples/custom_datasource.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ use datafusion::error::Result;
3030
use datafusion::execution::context::TaskContext;
3131
use datafusion::logical_expr::LogicalPlanBuilder;
3232
use datafusion::physical_expr::EquivalenceProperties;
33+
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
3334
use datafusion::physical_plan::memory::MemoryStream;
3435
use datafusion::physical_plan::{
35-
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
36-
Partitioning, PlanProperties, SendableRecordBatchStream,
36+
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
37+
PlanProperties, SendableRecordBatchStream,
3738
};
3839
use datafusion::prelude::*;
3940

@@ -214,7 +215,8 @@ impl CustomExec {
214215
PlanProperties::new(
215216
eq_properties,
216217
Partitioning::UnknownPartitioning(1),
217-
ExecutionMode::Bounded,
218+
EmissionType::Incremental,
219+
Boundedness::Bounded,
218220
)
219221
}
220222
}

datafusion/common/src/join_type.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,30 @@ use crate::{DataFusionError, Result};
2828
/// Join type
2929
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
3030
pub enum JoinType {
31-
/// Inner Join
31+
/// Inner Join - Returns only rows where there is a matching value in both tables based on the join condition.
32+
/// For example, if joining table A and B on A.id = B.id, only rows where A.id equals B.id will be included.
33+
/// All columns from both tables are returned for the matching rows. Non-matching rows are excluded entirely.
3234
Inner,
33-
/// Left Join
35+
/// Left Join - Returns all rows from the left table and matching rows from the right table.
36+
/// If no match, NULL values are returned for columns from the right table.
3437
Left,
35-
/// Right Join
38+
/// Right Join - Returns all rows from the right table and matching rows from the left table.
39+
/// If no match, NULL values are returned for columns from the left table.
3640
Right,
37-
/// Full Join
41+
/// Full Join (also called Full Outer Join) - Returns all rows from both tables, matching rows where possible.
42+
/// When a row from either table has no match in the other table, the missing columns are filled with NULL values.
43+
/// For example, if table A has row X with no match in table B, the result will contain row X with NULL values for all of table B's columns.
44+
/// This join type preserves all records from both tables, making it useful when you need to see all data regardless of matches.
3845
Full,
39-
/// Left Semi Join
46+
/// Left Semi Join - Returns rows from the left table that have matching rows in the right table.
47+
/// Only columns from the left table are returned.
4048
LeftSemi,
41-
/// Right Semi Join
49+
/// Right Semi Join - Returns rows from the right table that have matching rows in the left table.
50+
/// Only columns from the right table are returned.
4251
RightSemi,
43-
/// Left Anti Join
52+
/// Left Anti Join - Returns rows from the left table that do not have a matching row in the right table.
4453
LeftAnti,
45-
/// Right Anti Join
54+
/// Right Anti Join - Returns rows from the right table that do not have a matching row in the left table.
4655
RightAnti,
4756
/// Left Mark join
4857
///

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ use datafusion_common::config::ConfigOptions;
3838
use datafusion_common::Statistics;
3939
use datafusion_execution::TaskContext;
4040
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
41-
use datafusion_physical_plan::{ExecutionMode, PlanProperties};
41+
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
42+
use datafusion_physical_plan::PlanProperties;
4243

4344
use futures::StreamExt;
4445
use itertools::Itertools;
@@ -97,7 +98,8 @@ impl ArrowExec {
9798
PlanProperties::new(
9899
eq_properties,
99100
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
100-
ExecutionMode::Bounded, // Execution Mode
101+
EmissionType::Incremental,
102+
Boundedness::Bounded,
101103
)
102104
}
103105

datafusion/core/src/datasource/physical_plan/avro.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@ use super::FileScanConfig;
2424
use crate::error::Result;
2525
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
2626
use crate::physical_plan::{
27-
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
28-
PlanProperties, SendableRecordBatchStream, Statistics,
27+
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
28+
SendableRecordBatchStream, Statistics,
2929
};
3030

3131
use arrow::datatypes::SchemaRef;
3232
use datafusion_execution::TaskContext;
3333
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
34+
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
3435

3536
/// Execution plan for scanning Avro data source
3637
#[derive(Debug, Clone)]
@@ -81,7 +82,8 @@ impl AvroExec {
8182
PlanProperties::new(
8283
eq_properties,
8384
Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
84-
ExecutionMode::Bounded, // Execution Mode
85+
EmissionType::Incremental,
86+
Boundedness::Bounded,
8587
)
8688
}
8789
}

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ use crate::datasource::physical_plan::FileMeta;
3333
use crate::error::{DataFusionError, Result};
3434
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
3535
use crate::physical_plan::{
36-
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
37-
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
36+
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
37+
PlanProperties, SendableRecordBatchStream, Statistics,
3838
};
3939

4040
use arrow::csv;
@@ -43,6 +43,7 @@ use datafusion_common::config::ConfigOptions;
4343
use datafusion_execution::TaskContext;
4444
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
4545

46+
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
4647
use futures::{StreamExt, TryStreamExt};
4748
use object_store::buffered::BufWriter;
4849
use object_store::{GetOptions, GetResultPayload, ObjectStore};
@@ -327,7 +328,8 @@ impl CsvExec {
327328
PlanProperties::new(
328329
eq_properties,
329330
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
330-
ExecutionMode::Bounded, // Execution Mode
331+
EmissionType::Incremental,
332+
Boundedness::Bounded,
331333
)
332334
}
333335

datafusion/core/src/datasource/physical_plan/json.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,15 @@ use crate::datasource::physical_plan::FileMeta;
3333
use crate::error::{DataFusionError, Result};
3434
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
3535
use crate::physical_plan::{
36-
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
37-
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
36+
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
37+
PlanProperties, SendableRecordBatchStream, Statistics,
3838
};
3939

4040
use arrow::json::ReaderBuilder;
4141
use arrow::{datatypes::SchemaRef, json};
4242
use datafusion_execution::TaskContext;
4343
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
44+
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
4445

4546
use futures::{StreamExt, TryStreamExt};
4647
use object_store::buffered::BufWriter;
@@ -107,7 +108,8 @@ impl NdJsonExec {
107108
PlanProperties::new(
108109
eq_properties,
109110
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
110-
ExecutionMode::Bounded, // Execution Mode
111+
EmissionType::Incremental,
112+
Boundedness::Bounded,
111113
)
112114
}
113115

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@ use crate::{
3434
physical_optimizer::pruning::PruningPredicate,
3535
physical_plan::{
3636
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
37-
DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
37+
DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
3838
SendableRecordBatchStream, Statistics,
3939
},
4040
};
4141

4242
use arrow::datatypes::SchemaRef;
4343
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
44+
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
4445

4546
use itertools::Itertools;
4647
use log::debug;
@@ -654,13 +655,11 @@ impl ParquetExec {
654655
orderings: &[LexOrdering],
655656
file_config: &FileScanConfig,
656657
) -> PlanProperties {
657-
// Equivalence Properties
658-
let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
659-
660658
PlanProperties::new(
661-
eq_properties,
659+
EquivalenceProperties::new_with_orderings(schema, orderings),
662660
Self::output_partitioning_helper(file_config), // Output Partitioning
663-
ExecutionMode::Bounded, // Execution Mode
661+
EmissionType::Incremental,
662+
Boundedness::Bounded,
664663
)
665664
}
666665

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,13 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
5252
use datafusion_physical_expr::{
5353
physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
5454
};
55+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5556
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
5657
use datafusion_physical_optimizer::PhysicalOptimizerRule;
58+
use datafusion_physical_plan::execution_plan::EmissionType;
5759
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
5860
use datafusion_physical_plan::ExecutionPlanProperties;
5961

60-
use datafusion_physical_expr_common::sort_expr::LexOrdering;
6162
use itertools::izip;
6263

6364
/// The `EnforceDistribution` rule ensures that distribution requirements are
@@ -1161,12 +1162,17 @@ fn ensure_distribution(
11611162
let should_use_estimates = config
11621163
.execution
11631164
.use_row_number_estimates_to_optimize_partitioning;
1164-
let is_unbounded = dist_context.plan.execution_mode().is_unbounded();
1165+
let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
1166+
&& matches!(
1167+
dist_context.plan.pipeline_behavior(),
1168+
EmissionType::Incremental | EmissionType::Both
1169+
);
11651170
// Use order preserving variants either of the conditions true
11661171
// - it is desired according to config
11671172
// - when plan is unbounded
1173+
// - when it is pipeline friendly (can incrementally produce results)
11681174
let order_preserving_variants_desirable =
1169-
is_unbounded || config.optimizer.prefer_existing_sort;
1175+
unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
11701176

11711177
// Remove unnecessary repartition from the physical plan if any
11721178
let DistributionContext {
@@ -1459,7 +1465,8 @@ pub(crate) mod tests {
14591465
PlanProperties::new(
14601466
input.equivalence_properties().clone(), // Equivalence Properties
14611467
input.output_partitioning().clone(), // Output Partitioning
1462-
input.execution_mode(), // Execution Mode
1468+
input.pipeline_behavior(), // Pipeline Behavior
1469+
input.boundedness(), // Boundedness
14631470
)
14641471
}
14651472
}

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ fn replace_with_partial_sort(
214214
let plan_any = plan.as_any();
215215
if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
216216
let child = Arc::clone(sort_plan.children()[0]);
217-
if !child.execution_mode().is_unbounded() {
217+
if !child.boundedness().is_unbounded() {
218218
return Ok(plan);
219219
}
220220

datafusion/core/src/physical_optimizer/join_selection.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use datafusion_physical_expr::expressions::Column;
4343
use datafusion_physical_expr::PhysicalExpr;
4444
use datafusion_physical_expr_common::sort_expr::LexOrdering;
4545
use datafusion_physical_optimizer::PhysicalOptimizerRule;
46+
use datafusion_physical_plan::execution_plan::EmissionType;
4647

4748
/// The [`JoinSelection`] rule tries to modify a given plan so that it can
4849
/// accommodate infinite sources and optimize joins in the plan according to
@@ -516,7 +517,8 @@ fn statistical_join_selection_subrule(
516517
pub type PipelineFixerSubrule =
517518
dyn Fn(Arc<dyn ExecutionPlan>, &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>>;
518519

519-
/// Converts a hash join to a symmetric hash join in the case of infinite inputs on both sides.
520+
/// Converts a hash join to a symmetric hash join if both its inputs are
521+
/// unbounded and incremental.
520522
///
521523
/// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
522524
/// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
@@ -537,10 +539,18 @@ fn hash_join_convert_symmetric_subrule(
537539
) -> Result<Arc<dyn ExecutionPlan>> {
538540
// Check if the current plan node is a HashJoinExec.
539541
if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
540-
let left_unbounded = hash_join.left.execution_mode().is_unbounded();
541-
let right_unbounded = hash_join.right.execution_mode().is_unbounded();
542-
// Process only if both left and right sides are unbounded.
543-
if left_unbounded && right_unbounded {
542+
let left_unbounded = hash_join.left.boundedness().is_unbounded();
543+
let left_incremental = matches!(
544+
hash_join.left.pipeline_behavior(),
545+
EmissionType::Incremental | EmissionType::Both
546+
);
547+
let right_unbounded = hash_join.right.boundedness().is_unbounded();
548+
let right_incremental = matches!(
549+
hash_join.right.pipeline_behavior(),
550+
EmissionType::Incremental | EmissionType::Both
551+
);
552+
// Process only if both left and right sides are unbounded and incrementally emit.
553+
if left_unbounded && right_unbounded & left_incremental & right_incremental {
544554
// Determine the partition mode based on configuration.
545555
let mode = if config_options.optimizer.repartition_joins {
546556
StreamJoinPartitionMode::Partitioned
@@ -669,8 +679,8 @@ fn hash_join_swap_subrule(
669679
_config_options: &ConfigOptions,
670680
) -> Result<Arc<dyn ExecutionPlan>> {
671681
if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
672-
if hash_join.left.execution_mode().is_unbounded()
673-
&& !hash_join.right.execution_mode().is_unbounded()
682+
if hash_join.left.boundedness().is_unbounded()
683+
&& !hash_join.right.boundedness().is_unbounded()
674684
&& matches!(
675685
*hash_join.join_type(),
676686
JoinType::Inner
@@ -2025,12 +2035,12 @@ mod hash_join_tests {
20252035
assert_eq!(
20262036
(
20272037
t.case.as_str(),
2028-
if left.execution_mode().is_unbounded() {
2038+
if left.boundedness().is_unbounded() {
20292039
SourceType::Unbounded
20302040
} else {
20312041
SourceType::Bounded
20322042
},
2033-
if right.execution_mode().is_unbounded() {
2043+
if right.boundedness().is_unbounded() {
20342044
SourceType::Unbounded
20352045
} else {
20362046
SourceType::Bounded

datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
2929

3030
use datafusion_common::config::ConfigOptions;
3131
use datafusion_common::tree_node::Transformed;
32+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
3233
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
34+
use datafusion_physical_plan::execution_plan::EmissionType;
3335
use datafusion_physical_plan::tree_node::PlanContext;
3436
use datafusion_physical_plan::ExecutionPlanProperties;
3537

36-
use datafusion_physical_expr_common::sort_expr::LexOrdering;
3738
use itertools::izip;
3839

3940
/// For a given `plan`, this object carries the information one needs from its
@@ -246,7 +247,8 @@ pub(crate) fn replace_with_order_preserving_variants(
246247
// For unbounded cases, we replace with the order-preserving variant in any
247248
// case, as doing so helps fix the pipeline. Also replace if config allows.
248249
let use_order_preserving_variant = config.optimizer.prefer_existing_sort
249-
|| !requirements.plan.execution_mode().pipeline_friendly();
250+
|| (requirements.plan.boundedness().is_unbounded()
251+
&& requirements.plan.pipeline_behavior() == EmissionType::Final);
250252

251253
// Create an alternate plan with order-preserving variants:
252254
let mut alternate_plan = plan_with_order_preserving_variants(

0 commit comments

Comments
 (0)