diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index afed5dd37535..3d80b617626a 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -24,7 +24,6 @@ use std::fmt::Debug; use std::sync::Arc; -use super::output_requirements::OutputRequirementExec; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::utils::{ @@ -56,6 +55,7 @@ use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAg use datafusion_physical_plan::ExecutionPlanProperties; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_optimizer::output_requirements::OutputRequirementExec; use itertools::izip; /// The `EnforceDistribution` rule ensures that distribution requirements are @@ -1290,7 +1290,7 @@ pub(crate) mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec}; use crate::physical_optimizer::enforce_sorting::EnforceSorting; - use crate::physical_optimizer::output_requirements::OutputRequirements; + use datafusion_physical_optimizer::output_requirements::OutputRequirements; use crate::physical_optimizer::test_utils::{ check_integrity, coalesce_partitions_exec, repartition_exec, }; diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 582f340151ae..a0c9c3697744 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -29,7 +29,6 @@ pub mod enforce_sorting; pub mod join_selection; pub mod limited_distinct_aggregation; pub mod optimizer; -pub mod output_requirements; pub mod projection_pushdown; pub mod pruning; pub mod replace_with_order_preserving_variants; diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index 9c0ee61da52a..125ea6acc77f 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -33,4 +33,6 @@ workspace = true [dependencies] datafusion-common = { workspace = true, default-features = true } +datafusion-execution = { workspace = true } +datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index c5a49216f5fd..6b9df7cad5c8 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -18,5 +18,6 @@ #![deny(clippy::clone_on_ref_ptr)] mod optimizer; +pub mod output_requirements; pub use optimizer::PhysicalOptimizerRule; diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs similarity index 95% rename from datafusion/core/src/physical_optimizer/output_requirements.rs rename to datafusion/physical-optimizer/src/output_requirements.rs index cb9a0cb90e6c..d85daee7c00a 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -24,17 +24,19 @@ use std::sync::Arc; -use crate::physical_plan::sorts::sort::SortExec; -use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; +use datafusion_execution::TaskContext; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Result, Statistics}; use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement}; -use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties}; +use crate::PhysicalOptimizerRule; + /// This rule either adds or removes [`OutputRequirements`]s to/from the physical /// plan according to its `mode` attribute, which is set by the constructors /// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of @@ -86,7 +88,7 @@ enum RuleMode { /// /// See [`OutputRequirements`] for more details #[derive(Debug)] -pub(crate) struct OutputRequirementExec { +pub struct OutputRequirementExec { input: Arc, order_requirement: Option, dist_requirement: Distribution, @@ -94,7 +96,7 @@ pub(crate) struct OutputRequirementExec { } impl OutputRequirementExec { - pub(crate) fn new( + pub fn new( input: Arc, requirements: Option, dist_requirement: Distribution, @@ -108,7 +110,7 @@ impl OutputRequirementExec { } } - pub(crate) fn input(&self) -> Arc { + pub fn input(&self) -> Arc { self.input.clone() } @@ -179,8 +181,8 @@ impl ExecutionPlan for OutputRequirementExec { fn execute( &self, _partition: usize, - _context: Arc, - ) -> Result { + _context: Arc, + ) -> Result { unreachable!(); }