Skip to content

Commit 34c8285

Browse files
alambadriangb
andauthored
Improve doc comments of filter-pushdown-apis (#22)
* Improve doc comments * Apply suggestions from code review --------- Co-authored-by: Adrian Garcia Badaracco <[email protected]>
1 parent c78a590 commit 34c8285

File tree

3 files changed

+47
-26
lines changed

3 files changed

+47
-26
lines changed

datafusion/datasource/src/file.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ pub trait FileSource: Send + Sync {
9595
Ok(None)
9696
}
9797

98+
/// Push down filters to the file source if supported.
99+
///
100+
/// Returns `Ok(None)` by default. See [`ExecutionPlan::with_filter_pushdown_result`]
101+
/// for more details.
102+
///
103+
/// [`ExecutionPlan::with_filter_pushdown_result`]: datafusion_physical_plan::execution_plan::ExecutionPlan::with_filter_pushdown_result
98104
fn push_down_filters(
99105
&self,
100106
_filters: &[PhysicalExprRef],

datafusion/datasource/src/source.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion_physical_plan::{
3434

3535
use crate::file_scan_config::FileScanConfig;
3636
use datafusion_common::config::ConfigOptions;
37-
use datafusion_common::{Constraints, Statistics};
37+
use datafusion_common::{Constraints, Result, Statistics};
3838
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3939
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExprRef};
4040
use datafusion_physical_expr_common::sort_expr::LexOrdering;
@@ -54,7 +54,7 @@ pub trait DataSource: Send + Sync + Debug {
5454
&self,
5555
partition: usize,
5656
context: Arc<TaskContext>,
57-
) -> datafusion_common::Result<SendableRecordBatchStream>;
57+
) -> Result<SendableRecordBatchStream>;
5858
fn as_any(&self) -> &dyn Any;
5959
/// Format this source for display in explain plans
6060
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
@@ -65,13 +65,13 @@ pub trait DataSource: Send + Sync + Debug {
6565
_target_partitions: usize,
6666
_repartition_file_min_size: usize,
6767
_output_ordering: Option<LexOrdering>,
68-
) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
68+
) -> Result<Option<Arc<dyn DataSource>>> {
6969
Ok(None)
7070
}
7171

7272
fn output_partitioning(&self) -> Partitioning;
7373
fn eq_properties(&self) -> EquivalenceProperties;
74-
fn statistics(&self) -> datafusion_common::Result<Statistics>;
74+
fn statistics(&self) -> Result<Statistics>;
7575
/// Return a copy of this DataSource with a new fetch limit
7676
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
7777
fn fetch(&self) -> Option<usize>;
@@ -81,15 +81,25 @@ pub trait DataSource: Send + Sync + Debug {
8181
fn try_swapping_with_projection(
8282
&self,
8383
_projection: &ProjectionExec,
84-
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
85-
/// Push down filters from parent execution plans to this data source.
86-
/// This is expected to return Ok(None) if the filters cannot be pushed down.
87-
/// If they can be pushed down it should return a [`FilterPushdownResult`] containing the new
88-
/// data source and the support level for each filter (exact or inexact).
84+
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
85+
86+
/// Push down filters into this `DataSource`.
87+
///
88+
/// Returns `Ok(None)` if the filters cannot be evaluated within the
89+
/// `DataSource`.
90+
///
91+
/// If the filters can be evaluated by the `DataSource`,
92+
/// return a [`FilterPushdownResult`] containing an updated
93+
/// `DataSource` and the support level for each filter (exact or inexact).
94+
///
95+
/// Default implementation returns `Ok(None)`. See [`ExecutionPlan::with_filter_pushdown_result`]
96+
/// for more details.
97+
///
98+
/// [`ExecutionPlan::push_down_filters`]: datafusion_physical_plan::execution_plan::ExecutionPlan::with_filter_pushdown_result
8999
fn push_down_filters(
90100
&self,
91101
_filters: &[PhysicalExprRef],
92-
) -> datafusion_common::Result<Option<DataSourceFilterPushdownResult>> {
102+
) -> Result<Option<DataSourceFilterPushdownResult>>> {
93103
Ok(None)
94104
}
95105
}
@@ -146,15 +156,15 @@ impl ExecutionPlan for DataSourceExec {
146156
fn with_new_children(
147157
self: Arc<Self>,
148158
_: Vec<Arc<dyn ExecutionPlan>>,
149-
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
159+
) -> Result<Arc<dyn ExecutionPlan>> {
150160
Ok(self)
151161
}
152162

153163
fn repartitioned(
154164
&self,
155165
target_partitions: usize,
156166
config: &ConfigOptions,
157-
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
167+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
158168
let data_source = self.data_source.repartitioned(
159169
target_partitions,
160170
config.optimizer.repartition_file_min_size,
@@ -178,15 +188,15 @@ impl ExecutionPlan for DataSourceExec {
178188
&self,
179189
partition: usize,
180190
context: Arc<TaskContext>,
181-
) -> datafusion_common::Result<SendableRecordBatchStream> {
191+
) -> Result<SendableRecordBatchStream> {
182192
self.data_source.open(partition, context)
183193
}
184194

185195
fn metrics(&self) -> Option<MetricsSet> {
186196
Some(self.data_source.metrics().clone_inner())
187197
}
188198

189-
fn statistics(&self) -> datafusion_common::Result<Statistics> {
199+
fn statistics(&self) -> Result<Statistics> {
190200
self.data_source.statistics()
191201
}
192202

@@ -204,15 +214,15 @@ impl ExecutionPlan for DataSourceExec {
204214
fn try_swapping_with_projection(
205215
&self,
206216
projection: &ProjectionExec,
207-
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
217+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
208218
self.data_source.try_swapping_with_projection(projection)
209219
}
210220

211221
fn with_filter_pushdown_result(
212222
self: Arc<Self>,
213223
own_filters_result: &[FilterSupport],
214224
parent_filters_remaining: &[PhysicalExprRef],
215-
) -> datafusion_common::Result<Option<ExecutionPlanFilterPushdownResult>> {
225+
) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
216226
// We didn't give out any filters, this should be empty!
217227
assert!(own_filters_result.is_empty());
218228
// Forward filter pushdown to our data source.

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -469,27 +469,32 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
469469
}
470470

471471
/// Returns a set of filters that this operator owns but would like to be pushed down.
472-
/// For example, a `TopK` operator may produce dynamic filters that reference it's currrent state,
473-
/// while a `FilterExec` will just hand of the filters it has as is.
474-
/// The default implementation returns an empty vector.
475-
/// These filters are applied row-by row and any that return `false` or `NULL` will be
476-
/// filtered out and any that return `true` will be kept.
477-
/// The expressions returned **must** always return `true` or `false`;
478-
/// other truthy or falsy values are not allowed (e.g. `0`, `1`).
472+
///
473+
/// For example, a `TopK` operator may produce dynamic filters that
474+
/// reference it's current state, while a `FilterExec` will just hand of the
475+
/// filters it has as is.
476+
///
477+
/// The default implementation returns an empty vector. These filters are
478+
/// applied row-by row:
479+
/// 1. any that return `false` or `NULL` will be filtered out
480+
/// 2. any that return `true` will be kept.
481+
///
482+
/// The expressions returned **must** always be Boolean ( `true`, `false` or
483+
/// NULL); other truthy or falsy values are not allowed (e.g. `0`, `1`).
479484
///
480485
/// # Returns
481486
/// A vector of filters that this operator would like to push down.
482487
/// These should be treated as the split conjunction of a `WHERE` clause.
483488
/// That is, a query such as `WHERE a = 1 AND b = 2` would return two
484489
/// filters: `a = 1` and `b = 2`.
485-
/// They can always be assembled into a single filter using
486-
/// [`split_conjunction`][datafusion_physical_expr::split_conjunction].
490+
/// They can be combined into a single filter using
491+
/// [`conjunction`][datafusion_physical_expr::conjunction].
487492
fn filters_for_pushdown(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
488493
Ok(Vec::new())
489494
}
490495

491496
/// Checks which filters this node allows to be pushed down through it from a parent to a child.
492-
/// For example, a `ProjectionExec` node can allow filters that only refernece
497+
/// For example, a `ProjectionExec` node can allow filters that only reference
493498
/// columns it did not create through but filters that reference columns it is creating cannot be pushed down any further.
494499
/// That is, it only allows some filters through because it changes the schema of the data.
495500
/// Aggregation nodes may not allow any filters to be pushed down as they change the cardinality of the data.

0 commit comments

Comments
 (0)