Skip to content

Commit

Permalink
Move OutputRequirements to datafusion-physical-optimizer crate
Browse files Browse the repository at this point in the history
  • Loading branch information
xinlifoobar committed Jul 21, 2024
1 parent 5da7ab3 commit 3c36a9a
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
use std::fmt::Debug;
use std::sync::Arc;

use super::output_requirements::OutputRequirementExec;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
Expand Down Expand Up @@ -56,6 +55,7 @@ use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAg
use datafusion_physical_plan::ExecutionPlanProperties;

use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use itertools::izip;

/// The `EnforceDistribution` rule ensures that distribution requirements are
Expand Down Expand Up @@ -1290,7 +1290,7 @@ pub(crate) mod tests {
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec};
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
use crate::physical_optimizer::output_requirements::OutputRequirements;
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
use crate::physical_optimizer::test_utils::{
check_integrity, coalesce_partitions_exec, repartition_exec,
};
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub mod enforce_sorting;
pub mod join_selection;
pub mod limited_distinct_aggregation;
pub mod optimizer;
pub mod output_requirements;
pub mod projection_pushdown;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ workspace = true

[dependencies]
datafusion-common = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
#![deny(clippy::clone_on_ref_ptr)]

mod optimizer;
pub mod output_requirements;

pub use optimizer::PhysicalOptimizerRule;
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@

use std::sync::Arc;

use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_execution::TaskContext;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream};

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};

use crate::PhysicalOptimizerRule;

/// This rule either adds or removes [`OutputRequirements`]s to/from the physical
/// plan according to its `mode` attribute, which is set by the constructors
/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
Expand Down Expand Up @@ -86,15 +88,15 @@ enum RuleMode {
///
/// See [`OutputRequirements`] for more details
#[derive(Debug)]
pub(crate) struct OutputRequirementExec {
pub struct OutputRequirementExec {
input: Arc<dyn ExecutionPlan>,
order_requirement: Option<LexRequirement>,
dist_requirement: Distribution,
cache: PlanProperties,
}

impl OutputRequirementExec {
pub(crate) fn new(
pub fn new(
input: Arc<dyn ExecutionPlan>,
requirements: Option<LexRequirement>,
dist_requirement: Distribution,
Expand All @@ -108,7 +110,7 @@ impl OutputRequirementExec {
}
}

pub(crate) fn input(&self) -> Arc<dyn ExecutionPlan> {
pub fn input(&self) -> Arc<dyn ExecutionPlan> {
self.input.clone()
}

Expand Down Expand Up @@ -179,8 +181,8 @@ impl ExecutionPlan for OutputRequirementExec {
fn execute(
&self,
_partition: usize,
_context: Arc<crate::execution::context::TaskContext>,
) -> Result<crate::physical_plan::SendableRecordBatchStream> {
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unreachable!();
}

Expand Down

0 comments on commit 3c36a9a

Please sign in to comment.