Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
perf: Ensure count query select minimal columns
Browse files Browse the repository at this point in the history
ritchie46 committed Jan 26, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 0c416a3 commit bdaabd6
Showing 10 changed files with 112 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ pub(super) fn process_functions(
swapping,
schema: _,
} => {
let clear = !ctx.acc_projections.is_empty();
let clear = ctx.has_pushed_down();
process_rename(
&mut ctx.acc_projections,
&mut ctx.projected_names,
@@ -65,7 +65,7 @@ pub(super) fn process_functions(
process_unpivot(proj_pd, lp, args, input, ctx, lp_arena, expr_arena)
},
_ => {
if function.allow_projection_pd() && !ctx.acc_projections.is_empty() {
if function.allow_projection_pd() && ctx.has_pushed_down() {
let original_acc_projection_len = ctx.acc_projections.len();

// add columns needed for the function.
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ pub(super) fn process_unpivot(
expr_arena,
)
});
let ctx = ProjectionContext::new(acc_projections, projected_names, ctx.projections_seen);
let ctx = ProjectionContext::new(acc_projections, projected_names, ctx.inner);

proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;

Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ pub(super) fn process_group_by(
let builder = IRBuilder::new(input, expr_arena, lp_arena);
Ok(proj_pd.finish_node_simple_projection(&ctx.acc_projections, builder))
} else {
let has_pushed_down = !ctx.acc_projections.is_empty();
let has_pushed_down = ctx.has_pushed_down();

// TODO! remove unnecessary vec alloc.
let (mut acc_projections, _local_projections, mut names) = split_acc_projections(
@@ -46,7 +46,7 @@ pub(super) fn process_group_by(
let projected_aggs = aggs
.into_iter()
.filter(|agg| {
if has_pushed_down && ctx.projections_seen > 0 {
if has_pushed_down && ctx.inner.projections_seen > 0 {
ctx.projected_names.contains(agg.output_name())
} else {
true
@@ -75,7 +75,7 @@ pub(super) fn process_group_by(
let node = expr_arena.add(AExpr::Column(options.index_column.clone()));
add_expr_to_accumulated(node, &mut acc_projections, &mut names, expr_arena);
}
let ctx = ProjectionContext::new(acc_projections, names, ctx.projections_seen);
let ctx = ProjectionContext::new(acc_projections, names, ctx.inner);

proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;

Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ pub(super) fn process_hconcat(
input_names.insert(name);
}
}
let ctx = ProjectionContext::new(input_pushdown, input_names, ctx.projections_seen);
let ctx = ProjectionContext::new(input_pushdown, input_names, ctx.inner);
proj_pd.pushdown_and_assign(*input, ctx, lp_arena, expr_arena)?;
}

Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ pub(super) fn process_hstack(
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<IR> {
if !ctx.acc_projections.is_empty() {
if ctx.has_pushed_down() {
let mut pruned_with_cols = Vec::with_capacity(exprs.len());

// Check if output names are used upstream
@@ -57,7 +57,7 @@ pub(super) fn process_hstack(
true, // expands_schema
);

let ctx = ProjectionContext::new(acc_projections, names, ctx.projections_seen);
let ctx = ProjectionContext::new(acc_projections, names, ctx.inner);
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;

let lp = IRBuilder::new(input, expr_arena, lp_arena)
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ pub(super) fn process_asof_join(
// left_on = "a", right_on = "b"
// will remove the name "b" (it is "a" now). That column should therefore not
// be added to a local projection.
if !ctx.acc_projections.is_empty() {
if ctx.has_pushed_down() {
let schema_left = lp_arena.get(input_left).schema(lp_arena);
let schema_right = lp_arena.get(input_right).schema(lp_arena);

@@ -160,8 +160,8 @@ pub(super) fn process_asof_join(
}
}

let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.projections_seen);
let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.projections_seen);
let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.inner);
let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.inner);

proj_pd.pushdown_and_assign(input_left, ctx_left, lp_arena, expr_arena)?;
proj_pd.pushdown_and_assign(input_right, ctx_right, lp_arena, expr_arena)?;
@@ -223,7 +223,7 @@ pub(super) fn process_join(
// left_on = "a", right_on = "b"
// will remove the name "b" (it is "a" now). That column should therefore not
// be added to a local projection.
if !ctx.acc_projections.is_empty() {
if ctx.has_pushed_down() {
let schema_left = lp_arena.get(input_left).schema(lp_arena);
let schema_right = lp_arena.get(input_right).schema(lp_arena);

@@ -339,8 +339,8 @@ pub(super) fn process_join(
}
}

let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.projections_seen);
let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.projections_seen);
let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.inner);
let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.inner);

proj_pd.pushdown_and_assign(input_left, ctx_left, lp_arena, expr_arena)?;
proj_pd.pushdown_and_assign(input_right, ctx_right, lp_arena, expr_arena)?;
39 changes: 21 additions & 18 deletions crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
Original file line number Diff line number Diff line change
@@ -27,36 +27,38 @@ use crate::prelude::optimizer::projection_pushdown::rename::process_rename;
use crate::prelude::*;
use crate::utils::aexpr_to_leaf_names;

/// Small `Copy` state stored in `ProjectionContext::inner`; it is passed by value
/// (`ctx.inner`) into the `ProjectionContext::new` calls that build the contexts
/// for a node's inputs.
#[derive(Default, Copy, Clone)]
struct ProjectionCopyState {
/// Incremented by `process_projection` before it pushes down into its input.
projections_seen: usize,
/// Set to `true` by `process_projection` when the projection consists of a single
/// count expression. While set, `ProjectionContext::has_pushed_down` returns `true`
/// even if no columns have been accumulated.
count_star: bool,
}

#[derive(Clone, Default)]
struct ProjectionContext {
acc_projections: Vec<ColumnNode>,
projected_names: PlHashSet<PlSmallStr>,
projections_seen: usize,
inner: ProjectionCopyState,
}

impl ProjectionContext {
fn new(
acc_projections: Vec<ColumnNode>,
projected_names: PlHashSet<PlSmallStr>,
projections_seen: usize,
inner: ProjectionCopyState,
) -> Self {
Self {
acc_projections,
projected_names,
projections_seen,
inner,
}
}

fn reset(&self, reset_count: bool) -> ProjectionContext {
Self {
acc_projections: Default::default(),
projected_names: Default::default(),
projections_seen: if reset_count {
0
} else {
self.projections_seen
},
}
/// Whether a projection pushdown is active for this context.
///
/// If this is `true`, other nodes should add the columns
/// they need to the push down state.
///
/// Returns `true` when `acc_projections` is non-empty or when
/// `inner.count_star` is set.
fn has_pushed_down(&self) -> bool {
// count star also acts like a pushdown as we will select a single column at the source
// when there were no other projections.
!self.acc_projections.is_empty() || self.inner.count_star
}

fn process_count_star_at_scan(&mut self, schema: &Schema, expr_arena: &mut Arena<AExpr>) {
@@ -230,7 +232,8 @@ impl ProjectionPushDown {
.iter()
.map(|&node| {
let alp = lp_arena.take(node);
let alp = self.push_down(alp, ctx.reset(false), lp_arena, expr_arena)?;
let ctx = ProjectionContext::new(Default::default(), Default::default(), ctx.inner);
let alp = self.push_down(alp, ctx, lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)
})
@@ -384,7 +387,7 @@ impl ProjectionPushDown {
if self.is_count_star {
ctx.process_count_star_at_scan(&schema, expr_arena);
}
if !ctx.acc_projections.is_empty() {
if ctx.has_pushed_down() {
output_schema = Some(Arc::new(update_scan_schema(
&ctx.acc_projections,
expr_arena,
@@ -659,7 +662,7 @@ impl ProjectionPushDown {
slice,
sort_options,
} => {
if !ctx.acc_projections.is_empty() {
if ctx.has_pushed_down() {
// Make sure that the column(s) used for the sort is projected
by_column.iter().for_each(|node| {
add_expr_to_accumulated(
@@ -681,7 +684,7 @@ impl ProjectionPushDown {
},
Distinct { input, options } => {
// make sure that the set of unique columns is projected
if !ctx.acc_projections.is_empty() {
if ctx.has_pushed_down() {
if let Some(subset) = options.subset.as_ref() {
subset.iter().for_each(|name| {
add_str_to_accumulated(
@@ -709,7 +712,7 @@ impl ProjectionPushDown {
Ok(Distinct { input, options })
},
Filter { predicate, input } => {
if !ctx.acc_projections.is_empty() {
if ctx.has_pushed_down() {
// make sure that the filter column is projected
add_expr_to_accumulated(
predicate.node(),
Original file line number Diff line number Diff line change
@@ -22,9 +22,65 @@ pub(super) fn process_projection(
// as there would be no projections and we would read
// the whole file while we only want the count
if exprs.len() == 1 && is_count(exprs[0].node(), expr_arena) {
//let input_schema = lp_arena.get(input).schema(lp_arena);
//let expr = exprs[0].node();
//let expr = if input_schema.is_empty() {
// // If the input schema is empty, we should just project
// // ourselves
// exprs[0].node()
//} else {
// // Select the last column projection.
// let (last_name, _) = input_schema.try_get_at_index(input_schema.len() - 1)?;
//
// let name = match lp_arena.get(input) {
// IR::Select { expr: exprs, .. } | IR::HStack { exprs, .. } => (|| {
// for e in exprs {
// if !e.is_scalar(expr_arena) {
// return e.output_name();
// }
// }
//
// last_name
// })(),
//
// IR::Scan {
// file_info,
// output_schema,
// ..
// } => {
// let schema = output_schema.as_ref().unwrap_or(&file_info.schema);
// // NOTE: the first can be the inserted index column, so that might not work
// let (last_name, _) = schema.try_get_at_index(schema.len() - 1)?;
// last_name
// },
//
// IR::DataFrameScan {
// schema,
// output_schema,
// ..
// } => {
// // NOTE: the first can be the inserted index column, so that might not work
// let schema = output_schema.as_ref().unwrap_or(schema);
// let (last_name, _) = schema.try_get_at_index(schema.len() - 1)?;
// last_name
// },
//
// _ => last_name,
// };
//
// expr_arena.add(AExpr::Column(name.clone()))
//};

// Clear all accumulated projections since we only project a single column from this level.
ctx.acc_projections.clear();
ctx.projected_names.clear();
ctx.inner.count_star = true;
//add_expr_to_accumulated(
// expr,
// &mut ctx.acc_projections,
// &mut ctx.projected_names,
// expr_arena,
//);
local_projection.push(exprs.pop().unwrap());
proj_pd.is_count_star = true;
} else {
@@ -112,7 +168,7 @@ pub(super) fn process_projection(
}
}

ctx.projections_seen += 1;
ctx.inner.projections_seen += 1;
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;

let builder = IRBuilder::new(input, expr_arena, lp_arena);
Original file line number Diff line number Diff line change
@@ -56,8 +56,8 @@ pub(super) fn process_semi_anti_join(
}
}

let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.projections_seen);
let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.projections_seen);
let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.inner);
let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.inner);

proj_pd.pushdown_and_assign(input_left, ctx_left, lp_arena, expr_arena)?;
proj_pd.pushdown_and_assign(input_right, ctx_right, lp_arena, expr_arena)?;
18 changes: 17 additions & 1 deletion py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
@@ -536,7 +536,7 @@ def test_projection_empty_frame_len_16904() -> None:

q = df.select(pl.len())

assert "PROJECT */0" in q.explain()
assert "PROJECT 0/0" in q.explain()

expect = pl.DataFrame({"len": [0]}, schema_overrides={"len": pl.UInt32()})
assert_frame_equal(q.collect(), expect)
@@ -630,3 +630,19 @@ def test_select_len_20337() -> None:

q = q.with_row_index("foo")
assert q.select(pl.len()).collect().item() == 3


def test_filter_count_projection_20902() -> None:
# Builds a three-column LazyFrame, filters on `l_partkey`, then selects only
# `pl.len()`, and checks that the explained plan contains "PROJECT 1/3"
# (presumably: one of the three columns is projected at the source - confirm
# against the `explain()` output format). 20902 is taken from the test name.
lineitem_ldf = pl.LazyFrame(
{
"l_partkey": [1],
"l_quantity": [1],
"l_extendedprice": [1],
}
)
assert (
"PROJECT 1/3"
in lineitem_ldf.filter(pl.col("l_partkey").is_between(10, 20))
.select(pl.len())
.explain()
)

0 comments on commit bdaabd6

Please sign in to comment.