
Commit e5fd1eb

init

Lordworms committed Nov 14, 2024
1 parent 345117b commit e5fd1eb
Showing 34 changed files with 1,293 additions and 37 deletions.
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
@@ -636,6 +636,10 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false

/// When set to true, DataFusion will try to push the build side statistics
/// of a hash join down to the probe side scan
pub dynamic_join_pushdown: bool, default = true
}
}
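A quick way to exercise the new flag from user code; a minimal sketch, assuming the option registers under `datafusion.optimizer` like its neighbors in this namespace:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // The option defaults to true per the hunk above; disable it explicitly.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.dynamic_join_pushdown", false);
    let ctx = SessionContext::new_with_config(config);

    // Equivalent SQL form, assuming the key is exposed like other options.
    ctx.sql("SET datafusion.optimizer.dynamic_join_pushdown = true")
        .await?;
    Ok(())
}
```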

28 changes: 22 additions & 6 deletions datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -21,11 +21,6 @@
//! Note: Most traits here need to be marked `Sync + Send` to be
//! compliant with the `SendableRecordBatchStream` trait.

use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector;
use crate::datasource::physical_plan::{FileMeta, FileScanConfig};
@@ -34,13 +29,19 @@ use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
use crate::physical_plan::RecordBatchStream;
use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::instant::Instant;
use datafusion_common::ScalarValue;

use datafusion_physical_plan::joins::DynamicFilterInfo;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt, Stream, StreamExt};
@@ -95,6 +96,8 @@ pub struct FileStream<F: FileOpener> {
baseline_metrics: BaselineMetrics,
/// Describes the behavior of the `FileStream` if file opening or scanning fails
on_error: OnError,
/// Dynamic join filters pushed down from a hash join build side, if any
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
}

/// Represents the state of the next `FileOpenFuture`. Since we need to poll
@@ -272,6 +275,7 @@ impl<F: FileOpener> FileStream<F> {
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
baseline_metrics: BaselineMetrics::new(metrics, partition),
on_error: OnError::Fail,
dynamic_filters: None,
})
}

@@ -283,6 +287,14 @@ impl<F: FileOpener> FileStream<F> {
self.on_error = on_error;
self
}
/// Set the dynamic filters used to prune batches produced by this stream
pub fn with_dynamic_filter(
mut self,
dynamic_filter: Option<Arc<DynamicFilterInfo>>,
) -> Self {
self.dynamic_filters = dynamic_filter;
self
}

/// Begin opening the next file in parallel while decoding the current file in FileStream.
///
@@ -390,7 +402,11 @@ impl<F: FileOpener> FileStream<F> {
}
}
match ready!(reader.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
Some(Ok(mut batch)) => {
// If a dynamic filter is ready, use it to filter the batch before emitting it
if let Some(dynamic_filters) = &self.dynamic_filters {
batch = dynamic_filters.filter_batch(&batch)?
}
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
let result = self
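`DynamicFilterInfo::filter_batch` is defined elsewhere in this commit, so as a mental model only, here is a sketch of filtering a `RecordBatch` against build-side bounds with arrow's filter kernel; the function name and the min/max strategy are illustrative assumptions, not the commit's implementation:

```rust
use std::sync::Arc;

use arrow::array::{BooleanArray, Int32Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Keep only rows whose join key falls inside [min, max], mimicking a
/// min/max filter derived from a hash join's build side.
fn apply_min_max_filter(
    batch: &RecordBatch,
    key_col: usize,
    min: i32,
    max: i32,
) -> Result<RecordBatch, ArrowError> {
    let keys = batch
        .column(key_col)
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("sketch assumes an Int32 key column");
    let mask: BooleanArray = keys
        .iter()
        .map(|v| v.map(|v| v >= min && v <= max))
        .collect();
    filter_record_batch(batch, &mask)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 5, 9, 12]))],
    )?;
    let filtered = apply_min_max_filter(&batch, 0, 4, 10)?;
    assert_eq!(filtered.num_rows(), 2); // rows 5 and 9 survive
    Ok(())
}
```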
49 changes: 44 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -42,6 +42,8 @@ use crate::{
use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};

use datafusion_physical_plan::joins::DynamicFilterInfo;
use datafusion_physical_plan::Metric;
use itertools::Itertools;
use log::debug;

@@ -282,6 +284,8 @@ pub struct ParquetExec {
table_parquet_options: TableParquetOptions,
/// Optional user defined schema adapter
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Dynamic filters (e.g. join filters pushed down from a hash join's build side)
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
}

impl From<ParquetExec> for ParquetExecBuilder {
@@ -291,7 +295,6 @@ impl From<ParquetExec> for ParquetExecBuilder {
}

/// [`ParquetExecBuilder`], builder for [`ParquetExec`].
///
/// See example on [`ParquetExec`].
pub struct ParquetExecBuilder {
file_scan_config: FileScanConfig,
@@ -463,6 +466,7 @@ impl ParquetExecBuilder {
cache,
table_parquet_options,
schema_adapter_factory,
dynamic_filters: None,
}
}
}
@@ -515,6 +519,7 @@ impl ParquetExec {
cache: _,
table_parquet_options,
schema_adapter_factory,
..
} = self;
ParquetExecBuilder {
file_scan_config: base_config,
@@ -711,10 +716,9 @@ impl DisplayAs for ParquetExec {
)
})
.unwrap_or_default();

write!(f, "ParquetExec: ")?;
self.base_config.fmt_as(t, f)?;
write!(f, "{}{}", predicate_string, pruning_predicate_string,)
write!(f, "{}{}", predicate_string, pruning_predicate_string)
}
}
}
@@ -798,7 +802,16 @@ impl ExecutionPlan for ParquetExec {
.schema_adapter_factory
.clone()
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));

if let Some(dynamic_filter) = &self.dynamic_filters {
let (final_expr, name) =
dynamic_filter.final_predicate(self.predicate.clone());
if final_expr.is_some() {
self.metrics.register(Arc::new(Metric::new(
datafusion_physical_plan::metrics::MetricValue::DynamicFilter(name),
None,
)));
}
}
let opener = ParquetOpener {
partition_index,
projection: Arc::from(projection),
@@ -819,7 +832,8 @@
};

let stream =
FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?;
FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?
.with_dynamic_filter(self.dynamic_filters.clone());

Ok(Box::pin(stream))
}
@@ -862,8 +876,33 @@ impl ExecutionPlan for ParquetExec {
cache: self.cache.clone(),
table_parquet_options: self.table_parquet_options.clone(),
schema_adapter_factory: self.schema_adapter_factory.clone(),
dynamic_filters: self.dynamic_filters.clone(),
}))
}

fn support_dynamic_filter(&self) -> bool {
true
}

fn with_dynamic_filter(
&self,
dynamic_filters: Option<Arc<DynamicFilterInfo>>,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Ok(Some(Arc::new(ParquetExec {
base_config: self.base_config.clone(),
projected_statistics: self.projected_statistics.clone(),
metrics: self.metrics.clone(),
predicate: self.predicate.clone(),
pruning_predicate: self.pruning_predicate.clone(),
page_pruning_predicate: self.page_pruning_predicate.clone(),
metadata_size_hint: self.metadata_size_hint,
parquet_file_reader_factory: self.parquet_file_reader_factory.clone(),
cache: self.cache.clone(),
table_parquet_options: self.table_parquet_options.clone(),
schema_adapter_factory: self.schema_adapter_factory.clone(),
dynamic_filters,
})))
}
}

fn should_enable_page_index(
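`support_dynamic_filter` and `with_dynamic_filter` form the opt-in surface this commit adds to `ExecutionPlan` (presumably with conservative defaults in the trait, defined in one of the other changed files). A self-contained toy analogue of the pattern, using a stand-in trait rather than DataFusion's real one:

```rust
use std::sync::Arc;

// Stand-ins so the sketch compiles on its own; the real types live in
// datafusion_physical_plan and DataFusion's ExecutionPlan trait.
#[derive(Debug, Default)]
struct DynamicFilterInfo;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// Toy analogue of the two methods this commit adds to ExecutionPlan.
trait Scan {
    /// Whether this operator can accept dynamic join filters at all.
    fn support_dynamic_filter(&self) -> bool {
        false // conservative default: most operators ignore them
    }

    /// Rebuild the node carrying the filters; Ok(None) means "unchanged".
    fn with_dynamic_filter(
        &self,
        _filters: Option<Arc<DynamicFilterInfo>>,
    ) -> Result<Option<Arc<dyn Scan>>> {
        Ok(None)
    }
}

#[derive(Debug, Default)]
struct MyScan {
    dynamic_filters: Option<Arc<DynamicFilterInfo>>,
}

impl Scan for MyScan {
    fn support_dynamic_filter(&self) -> bool {
        true
    }

    fn with_dynamic_filter(
        &self,
        filters: Option<Arc<DynamicFilterInfo>>,
    ) -> Result<Option<Arc<dyn Scan>>> {
        // Return a copy of the node that carries the filters.
        Ok(Some(Arc::new(MyScan { dynamic_filters: filters })))
    }
}

fn main() -> Result<()> {
    let scan = MyScan::default();
    if scan.support_dynamic_filter() {
        let _new_scan = scan.with_dynamic_filter(Some(Arc::new(DynamicFilterInfo)))?;
    }
    Ok(())
}
```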
116 changes: 116 additions & 0 deletions datafusion/core/src/physical_optimizer/join_filter_pushdown.rs
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Push dynamic join filters down to scan execution plans, if any exist

use std::sync::Arc;

use crate::datasource::physical_plan::ParquetExec;
use crate::physical_plan::ExecutionPlan;
use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec};
use datafusion_common::tree_node::{Transformed, TransformedResult};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::joins::DynamicFilterInfo;

/// This rule pushes hash join build side statistics down to the probe side scan
#[derive(Default, Debug)]
pub struct JoinFilterPushdown {}

impl JoinFilterPushdown {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}

impl PhysicalOptimizerRule for JoinFilterPushdown {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if !config.optimizer.dynamic_join_pushdown {
return Ok(plan);
}
optimize_impl(plan, &mut None).data()
}

fn name(&self) -> &str {
"JoinFilterPushdown"
}

fn schema_check(&self) -> bool {
true
}
}

fn optimize_impl(
plan: Arc<dyn ExecutionPlan>,
join_filters: &mut Option<Arc<DynamicFilterInfo>>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
if let Some(hashjoin_exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
join_filters.clone_from(&hashjoin_exec.dynamic_filters_pushdown);
let new_right = optimize_impl(hashjoin_exec.right.clone(), join_filters)?;
if new_right.transformed {
let new_hash_join = HashJoinExec::try_new(
hashjoin_exec.left().clone(),
new_right.data,
hashjoin_exec.on.clone(),
hashjoin_exec.filter().cloned(),
hashjoin_exec.join_type(),
hashjoin_exec.projection.clone(),
*hashjoin_exec.partition_mode(),
hashjoin_exec.null_equals_null(),
)?
.with_dynamic_filter(hashjoin_exec.dynamic_filters_pushdown.clone())?
.map_or(plan, |f| f);
return Ok(Transformed::yes(new_hash_join));
}
Ok(Transformed::no(plan))
} else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
if let Some(dynamic_filters) = join_filters {
let final_exec = parquet_exec
.clone()
.with_dynamic_filter(Some(dynamic_filters.clone()))?;
if let Some(plan) = final_exec {
return Ok(Transformed::yes(plan));
} else {
return Ok(Transformed::no(plan));
}
}
Ok(Transformed::no(plan))
} else {
let children = plan.children();
let mut new_children = Vec::with_capacity(children.len());
let mut transformed = false;

for child in children {
let new_child = optimize_impl(child.clone(), join_filters)?;
if new_child.transformed {
transformed = true;
}
new_children.push(new_child.data);
}

if transformed {
let new_plan = plan.with_new_children(new_children)?;
Ok(Transformed::yes(new_plan))
} else {
Ok(Transformed::no(plan))
}
}
}
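The recursion above leans on `Transformed` bookkeeping: every child reports whether it changed, and a parent rebuilds itself only if one did. A minimal runnable sketch of the same pattern on a toy tree, depending only on `datafusion-common`:

```rust
use datafusion_common::tree_node::Transformed;

#[derive(Debug, Clone)]
enum Node {
    Leaf(i64),
    Sum(Vec<Node>),
}

// Double every leaf; mirror the transformed-flag bookkeeping from the rule.
fn rewrite(node: Node) -> Transformed<Node> {
    match node {
        Node::Leaf(v) => Transformed::yes(Node::Leaf(v * 2)),
        Node::Sum(children) => {
            let mut transformed = false;
            let mut new_children = Vec::with_capacity(children.len());
            for child in children {
                let t = rewrite(child);
                transformed |= t.transformed;
                new_children.push(t.data);
            }
            if transformed {
                Transformed::yes(Node::Sum(new_children))
            } else {
                Transformed::no(Node::Sum(new_children))
            }
        }
    }
}

fn main() {
    let tree = Node::Sum(vec![Node::Leaf(1), Node::Leaf(2)]);
    let rewritten = rewrite(tree);
    assert!(rewritten.transformed);
    println!("{:?}", rewritten.data); // Sum([Leaf(2), Leaf(4)])
}
```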
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_optimizer/join_selection.rs
@@ -387,7 +387,7 @@ fn try_collect_left(
{
Ok(Some(swap_hash_join(hash_join, PartitionMode::CollectLeft)?))
} else {
Ok(Some(Arc::new(HashJoinExec::try_new(
Ok(HashJoinExec::try_new(
Arc::clone(left),
Arc::clone(right),
hash_join.on().to_vec(),
@@ -396,7 +396,8 @@
hash_join.projection.clone(),
PartitionMode::CollectLeft,
hash_join.null_equals_null(),
)?)))
)?
.with_dynamic_filter(hash_join.dynamic_filters_pushdown.clone())?)
}
}
(true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
@@ -24,6 +24,8 @@
pub mod coalesce_batches;
pub mod enforce_distribution;
pub mod enforce_sorting;
#[cfg(feature = "parquet")]
pub mod join_filter_pushdown;
pub mod join_selection;
pub mod optimizer;
pub mod projection_pushdown;
9 changes: 6 additions & 3 deletions datafusion/core/src/physical_optimizer/optimizer.rs
@@ -17,9 +17,8 @@

//! Physical optimizer traits

use datafusion_physical_optimizer::PhysicalOptimizerRule;
use std::sync::Arc;

#[cfg(feature = "parquet")]
use super::join_filter_pushdown::JoinFilterPushdown;
use super::projection_pushdown::ProjectionPushdown;
use super::update_aggr_exprs::OptimizeAggregateOrder;
use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
@@ -33,6 +32,8 @@ use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggr
use crate::physical_optimizer::output_requirements::OutputRequirements;
use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
use crate::physical_optimizer::topk_aggregation::TopKAggregation;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use std::sync::Arc;

/// A rule-based physical optimizer.
#[derive(Clone, Debug)]
@@ -112,6 +113,8 @@ impl PhysicalOptimizer {
// given query plan; i.e. it only acts as a final
// gatekeeping rule.
Arc::new(SanityCheckPlan::new()),
#[cfg(feature = "parquet")]
Arc::new(JoinFilterPushdown::new()),
];

Self::with_rules(rules)