Skip to content

Commit

Permalink
Apply projection to Statistics in FilterExec (apache#13187)
Browse files Browse the repository at this point in the history
* Apply projection to `Statistics` in `FilterExec`

* Use Statistics::project in HashJoin
  • Loading branch information
alamb authored Nov 3, 2024
1 parent a9d4d52 commit 85f92ef
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 12 deletions.
20 changes: 20 additions & 0 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,26 @@ impl Statistics {
self
}

/// Returns these statistics with the column statistics reordered and/or
/// filtered according to the given column indices.
///
/// For example, given statistics for columns `{"a", "b", "c"}`, projecting
/// to `vec![2, 1]` yields statistics for columns `{"c", "b"}`. A `None`
/// projection leaves the statistics untouched.
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
    if let Some(indices) = projection {
        // todo: it would be nice to avoid cloning column statistics if
        // possible (e.g. if the projection did not contain duplicates)
        self.column_statistics = indices
            .iter()
            .map(|&col_idx| self.column_statistics[col_idx].clone())
            .collect();
    }
    self
}

/// Calculates the statistics after `fetch` and `skip` operations apply.
/// Here, `self` denotes per-partition statistics. Use the `n_partitions`
/// parameter to compute global statistics in a multi-partition setting.
Expand Down
7 changes: 6 additions & 1 deletion datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,12 @@ impl ExecutionPlan for FilterExec {
/// The output statistics of a filtering operation can be estimated if the
/// predicate's selectivity value can be determined for the incoming data.
fn statistics(&self) -> Result<Statistics> {
    // Estimate statistics for the filter's full (pre-projection) output.
    let stats = Self::statistics_helper(
        &self.input,
        self.predicate(),
        self.default_selectivity,
    )?;
    // Narrow the column statistics to the projected columns (if any) so
    // the reported statistics match this node's actual output schema.
    Ok(stats.project(self.projection.as_ref()))
}

fn cardinality_effect(&self) -> CardinalityEffect {
Expand Down
13 changes: 2 additions & 11 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,24 +785,15 @@ impl ExecutionPlan for HashJoinExec {
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
let mut stats = estimate_join_statistics(
let stats = estimate_join_statistics(
Arc::clone(&self.left),
Arc::clone(&self.right),
self.on.clone(),
&self.join_type,
&self.join_schema,
)?;
// Project statistics if there is a projection
if let Some(projection) = &self.projection {
stats.column_statistics = stats
.column_statistics
.into_iter()
.enumerate()
.filter(|(i, _)| projection.contains(i))
.map(|(_, s)| s)
.collect();
}
Ok(stats)
Ok(stats.project(self.projection.as_ref()))
}
}

Expand Down
49 changes: 49 additions & 0 deletions datafusion/sqllogictest/test_files/parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,52 @@ FixedSizeBinary(16) 0166ce1d46129ad104fa4990c6057c91

statement ok
DROP TABLE test_non_utf8_binary;


## Tests for https://github.com/apache/datafusion/issues/13186
# Regression coverage for applying the projection to `Statistics` in
# `FilterExec` (see linked issue): run the same windowed/filtered query
# against an in-memory table and a parquet-backed table (which carries
# file-level statistics) and expect identical results.
statement ok
create table cpu (time timestamp, usage_idle float, usage_user float, cpu int);

statement ok
insert into cpu values ('1970-01-01 00:00:00', 1.0, 2.0, 3);

# must put it into a parquet file to get statistics
statement ok
copy (select * from cpu) to 'test_files/scratch/parquet/cpu.parquet';

# Run queries against parquet files
statement ok
create external table cpu_parquet
stored as parquet
location 'test_files/scratch/parquet/cpu.parquet';

# Double filtering
#
# Expect 1 row for both queries
query PI
select time, rn
from (
select time, row_number() OVER (ORDER BY usage_idle, time) as rn
from cpu
where cpu = 3
) where rn > 0;
----
1970-01-01T00:00:00 1

# Same query, but against the parquet-backed table with statistics
query PI
select time, rn
from (
select time, row_number() OVER (ORDER BY usage_idle, time) as rn
from cpu_parquet
where cpu = 3
) where rn > 0;
----
1970-01-01T00:00:00 1


# Clean up
statement ok
drop table cpu;

statement ok
drop table cpu_parquet;

0 comments on commit 85f92ef

Please sign in to comment.