diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/functions/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/functions/mod.rs index c34f7abdedc3..742804c1db3a 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/functions/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/functions/mod.rs @@ -23,7 +23,7 @@ pub(super) fn process_functions( swapping, schema: _, } => { - let clear = !ctx.acc_projections.is_empty(); + let clear = ctx.has_pushed_down(); process_rename( &mut ctx.acc_projections, &mut ctx.projected_names, @@ -65,7 +65,7 @@ pub(super) fn process_functions( process_unpivot(proj_pd, lp, args, input, ctx, lp_arena, expr_arena) }, _ => { - if function.allow_projection_pd() && !ctx.acc_projections.is_empty() { + if function.allow_projection_pd() && ctx.has_pushed_down() { let original_acc_projection_len = ctx.acc_projections.len(); // add columns needed for the function. diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/functions/unpivot.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/functions/unpivot.rs index e959853859f7..a84e62b514de 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/functions/unpivot.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/functions/unpivot.rs @@ -43,7 +43,7 @@ pub(super) fn process_unpivot( expr_arena, ) }); - let ctx = ProjectionContext::new(acc_projections, projected_names, ctx.projections_seen); + let ctx = ProjectionContext::new(acc_projections, projected_names, ctx.inner); proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?; diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/group_by.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/group_by.rs index a4bcedf0d732..7bf9ac41ef62 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/group_by.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/group_by.rs @@ -32,7 +32,7 @@ pub(super) fn process_group_by( let builder = IRBuilder::new(input, expr_arena, lp_arena); Ok(proj_pd.finish_node_simple_projection(&ctx.acc_projections, builder)) } else { - let has_pushed_down = !ctx.acc_projections.is_empty(); + let has_pushed_down = ctx.has_pushed_down(); // TODO! remove unnecessary vec alloc. let (mut acc_projections, _local_projections, mut names) = split_acc_projections( @@ -46,7 +46,7 @@ pub(super) fn process_group_by( let projected_aggs = aggs .into_iter() .filter(|agg| { - if has_pushed_down && ctx.projections_seen > 0 { + if has_pushed_down && ctx.inner.projections_seen > 0 { ctx.projected_names.contains(agg.output_name()) } else { true @@ -75,7 +75,7 @@ pub(super) fn process_group_by( let node = expr_arena.add(AExpr::Column(options.index_column.clone())); add_expr_to_accumulated(node, &mut acc_projections, &mut names, expr_arena); } - let ctx = ProjectionContext::new(acc_projections, names, ctx.projections_seen); + let ctx = ProjectionContext::new(acc_projections, names, ctx.inner); proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?; diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/hconcat.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/hconcat.rs index 607826321b7f..b3e3c5ace987 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/hconcat.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/hconcat.rs @@ -36,7 +36,7 @@ pub(super) fn process_hconcat( input_names.insert(name); } } - let ctx = ProjectionContext::new(input_pushdown, input_names, ctx.projections_seen); + let ctx = ProjectionContext::new(input_pushdown, input_names, ctx.inner); proj_pd.pushdown_and_assign(*input, ctx, lp_arena, expr_arena)?; } diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/hstack.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/hstack.rs index 86f0aca5183a..b459628a31b1 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/hstack.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/hstack.rs @@ -10,7 +10,7 @@ pub(super) fn process_hstack( lp_arena: &mut Arena, expr_arena: &mut Arena, ) -> PolarsResult { - if !ctx.acc_projections.is_empty() { + if ctx.has_pushed_down() { let mut pruned_with_cols = Vec::with_capacity(exprs.len()); // Check if output names are used upstream @@ -57,7 +57,7 @@ pub(super) fn process_hstack( true, // expands_schema ); - let ctx = ProjectionContext::new(acc_projections, names, ctx.projections_seen); + let ctx = ProjectionContext::new(acc_projections, names, ctx.inner); proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?; let lp = IRBuilder::new(input, expr_arena, lp_arena) diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs index d8f5d1bf2555..f6882aadb340 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs @@ -65,7 +65,7 @@ pub(super) fn process_asof_join( // left_on = "a", right_on = "b // will remove the name "b" (it is "a" now). That columns should therefore not // be added to a local projection. - if !ctx.acc_projections.is_empty() { + if ctx.has_pushed_down() { let schema_left = lp_arena.get(input_left).schema(lp_arena); let schema_right = lp_arena.get(input_right).schema(lp_arena); @@ -160,8 +160,8 @@ pub(super) fn process_asof_join( } } - let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.projections_seen); - let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.projections_seen); + let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.inner); + let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.inner); proj_pd.pushdown_and_assign(input_left, ctx_left, lp_arena, expr_arena)?; proj_pd.pushdown_and_assign(input_right, ctx_right, lp_arena, expr_arena)?; @@ -223,7 +223,7 @@ pub(super) fn process_join( // left_on = "a", right_on = "b // will remove the name "b" (it is "a" now). That columns should therefore not // be added to a local projection. - if !ctx.acc_projections.is_empty() { + if ctx.has_pushed_down() { let schema_left = lp_arena.get(input_left).schema(lp_arena); let schema_right = lp_arena.get(input_right).schema(lp_arena); @@ -339,8 +339,8 @@ pub(super) fn process_join( } } - let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.projections_seen); - let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.projections_seen); + let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.inner); + let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.inner); proj_pd.pushdown_and_assign(input_left, ctx_left, lp_arena, expr_arena)?; proj_pd.pushdown_and_assign(input_right, ctx_right, lp_arena, expr_arena)?; diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs index b7ddd136dd2d..c5073f8accd0 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs @@ -27,36 +27,38 @@ use crate::prelude::optimizer::projection_pushdown::rename::process_rename; use crate::prelude::*; use crate::utils::aexpr_to_leaf_names; +#[derive(Default, Copy, Clone)] +struct ProjectionCopyState { + projections_seen: usize, + count_star: bool, +} + #[derive(Clone, Default)] struct ProjectionContext { acc_projections: Vec, projected_names: PlHashSet, - projections_seen: usize, + inner: ProjectionCopyState, } impl ProjectionContext { fn new( acc_projections: Vec, projected_names: PlHashSet, - projections_seen: usize, + inner: ProjectionCopyState, ) -> Self { Self { acc_projections, projected_names, - projections_seen, + inner, } } - fn reset(&self, reset_count: bool) -> ProjectionContext { - Self { - acc_projections: Default::default(), - projected_names: Default::default(), - projections_seen: if reset_count { - 0 - } else { - self.projections_seen - }, - } + /// If this is `true`, other nodes should add the columns + /// they need to the push down state + fn has_pushed_down(&self) -> bool { + // count star also acts like a pushdown as we will select a single column at the source + // when there were no other projections. + !self.acc_projections.is_empty() || self.inner.count_star } fn process_count_star_at_scan(&mut self, schema: &Schema, expr_arena: &mut Arena) { @@ -230,7 +232,8 @@ impl ProjectionPushDown { .iter() .map(|&node| { let alp = lp_arena.take(node); - let alp = self.push_down(alp, ctx.reset(false), lp_arena, expr_arena)?; + let ctx = ProjectionContext::new(Default::default(), Default::default(), ctx.inner); + let alp = self.push_down(alp, ctx, lp_arena, expr_arena)?; lp_arena.replace(node, alp); Ok(node) }) @@ -384,7 +387,7 @@ impl ProjectionPushDown { if self.is_count_star { ctx.process_count_star_at_scan(&schema, expr_arena); } - if !ctx.acc_projections.is_empty() { + if ctx.has_pushed_down() { output_schema = Some(Arc::new(update_scan_schema( &ctx.acc_projections, expr_arena, @@ -659,7 +662,7 @@ impl ProjectionPushDown { slice, sort_options, } => { - if !ctx.acc_projections.is_empty() { + if ctx.has_pushed_down() { // Make sure that the column(s) used for the sort is projected by_column.iter().for_each(|node| { add_expr_to_accumulated( @@ -681,7 +684,7 @@ impl ProjectionPushDown { }, Distinct { input, options } => { // make sure that the set of unique columns is projected - if !ctx.acc_projections.is_empty() { + if ctx.has_pushed_down() { if let Some(subset) = options.subset.as_ref() { subset.iter().for_each(|name| { add_str_to_accumulated( @@ -709,7 +712,7 @@ impl ProjectionPushDown { Ok(Distinct { input, options }) }, Filter { predicate, input } => { - if !ctx.acc_projections.is_empty() { + if ctx.has_pushed_down() { // make sure that the filter column is projected add_expr_to_accumulated( predicate.node(), diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/projection.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/projection.rs index 6dd94f21dcef..6be0c26b860d 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/projection.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/projection.rs @@ -22,9 +22,65 @@ pub(super) fn process_projection( // as there would be no projections and we would read // the whole file while we only want the count if exprs.len() == 1 && is_count(exprs[0].node(), expr_arena) { + //let input_schema = lp_arena.get(input).schema(lp_arena); + //let expr = exprs[0].node(); + //let expr = if input_schema.is_empty() { + // // If the input schema is empty, we should just project + // // ourselves + // exprs[0].node() + //} else { + // // Select the last column projection. + // let (last_name, _) = input_schema.try_get_at_index(input_schema.len() - 1)?; + // + // let name = match lp_arena.get(input) { + // IR::Select { expr: exprs, .. } | IR::HStack { exprs, .. } => (|| { + // for e in exprs { + // if !e.is_scalar(expr_arena) { + // return e.output_name(); + // } + // } + // + // last_name + // })(), + // + // IR::Scan { + // file_info, + // output_schema, + // .. + // } => { + // let schema = output_schema.as_ref().unwrap_or(&file_info.schema); + // // NOTE: the first can be the inserted index column, so that might not work + // let (last_name, _) = schema.try_get_at_index(schema.len() - 1)?; + // last_name + // }, + // + // IR::DataFrameScan { + // schema, + // output_schema, + // .. + // } => { + // // NOTE: the first can be the inserted index column, so that might not work + // let schema = output_schema.as_ref().unwrap_or(schema); + // let (last_name, _) = schema.try_get_at_index(schema.len() - 1)?; + // last_name + // }, + // + // _ => last_name, + // }; + // + // expr_arena.add(AExpr::Column(name.clone())) + //}; + // Clear all accumulated projections since we only project a single column from this level. ctx.acc_projections.clear(); ctx.projected_names.clear(); + ctx.inner.count_star = true; + //add_expr_to_accumulated( + // expr, + // &mut ctx.acc_projections, + // &mut ctx.projected_names, + // expr_arena, + //); local_projection.push(exprs.pop().unwrap()); proj_pd.is_count_star = true; } else { @@ -112,7 +168,7 @@ pub(super) fn process_projection( } } - ctx.projections_seen += 1; + ctx.inner.projections_seen += 1; proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?; let builder = IRBuilder::new(input, expr_arena, lp_arena); diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/semi_anti_join.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/semi_anti_join.rs index d3f06b5d2a55..419f75695a17 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/semi_anti_join.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/semi_anti_join.rs @@ -56,8 +56,8 @@ pub(super) fn process_semi_anti_join( } } - let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.projections_seen); - let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.projections_seen); + let ctx_left = ProjectionContext::new(pushdown_left, names_left, ctx.inner); + let ctx_right = ProjectionContext::new(pushdown_right, names_right, ctx.inner); proj_pd.pushdown_and_assign(input_left, ctx_left, lp_arena, expr_arena)?; proj_pd.pushdown_and_assign(input_right, ctx_right, lp_arena, expr_arena)?; diff --git a/py-polars/tests/unit/test_projections.py b/py-polars/tests/unit/test_projections.py index 58e425cd9806..6bf3afb55657 100644 --- a/py-polars/tests/unit/test_projections.py +++ b/py-polars/tests/unit/test_projections.py @@ -536,7 +536,7 @@ def test_projection_empty_frame_len_16904() -> None: q = df.select(pl.len()) - assert "PROJECT */0" in q.explain() + assert "PROJECT 0/0" in q.explain() expect = pl.DataFrame({"len": [0]}, schema_overrides={"len": pl.UInt32()}) assert_frame_equal(q.collect(), expect) @@ -630,3 +630,19 @@ def test_select_len_20337() -> None: q = q.with_row_index("foo") assert q.select(pl.len()).collect().item() == 3 + + +def test_filter_count_projection_20902() -> None: + lineitem_ldf = pl.LazyFrame( + { + "l_partkey": [1], + "l_quantity": [1], + "l_extendedprice": [1], + } + ) + assert ( + "PROJECT 1/3" + in lineitem_ldf.filter(pl.col("l_partkey").is_between(10, 20)) + .select(pl.len()) + .explain() + )