diff --git a/crates/polars-lazy/src/tests/cse.rs b/crates/polars-lazy/src/tests/cse.rs index 0f2dd8b79d34..e0bb92c387cb 100644 --- a/crates/polars-lazy/src/tests/cse.rs +++ b/crates/polars-lazy/src/tests/cse.rs @@ -10,6 +10,14 @@ fn cached_before_root(q: LazyFrame) { } } +fn count_caches(q: LazyFrame) -> usize { + let (node, lp_arena, _) = q.to_alp_optimized().unwrap(); + (&lp_arena) + .iter(node) + .filter(|(_node, lp)| matches!(lp, ALogicalPlan::Cache { .. })) + .count() +} + #[test] fn test_cse_self_joins() -> PolarsResult<()> { let lf = scan_foods_ipc(); @@ -312,3 +320,37 @@ fn test_cse_columns_projections() -> PolarsResult<()> { Ok(()) } + +#[test] +fn test_cse_prune_scan_filter_difference() -> PolarsResult<()> { + let lf = scan_foods_ipc(); + let lf = lf.with_column(col("category").str().to_uppercase()); + + let pred = col("fats_g").gt(2.0); + + // If filter are the same, we can cache + let q = lf + .clone() + .filter(pred.clone()) + .left_join(lf.clone().filter(pred), col("fats_g"), col("fats_g")) + .with_comm_subplan_elim(true); + cached_before_root(q); + + // If the filters are different the caches are removed. + let q = lf + .clone() + .filter(col("fats_g").gt(2.0)) + .clone() + .left_join( + lf.filter(col("fats_g").gt(1.0)), + col("fats_g"), + col("fats_g"), + ) + .with_comm_subplan_elim(true); + + // Check that the caches are removed and that both predicates have been pushed down instead. + assert_eq!(count_caches(q.clone()), 0); + assert!(predicate_at_scan(q)); + + Ok(()) +} diff --git a/crates/polars-plan/src/dsl/expr.rs b/crates/polars-plan/src/dsl/expr.rs index cf7cc2a31fbc..cb88fc45be55 100644 --- a/crates/polars-plan/src/dsl/expr.rs +++ b/crates/polars-plan/src/dsl/expr.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; pub use super::expr_dyn_fn::*; use crate::prelude::*; -#[derive(PartialEq, Clone)] +#[derive(PartialEq, Clone, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum AggExpr { Min { @@ -220,22 +220,61 @@ impl Hash for Expr { std::mem::discriminant(function).hash(state); options.hash(state); }, + Expr::Gather { + expr, + idx, + returns_scalar, + } => { + expr.hash(state); + idx.hash(state); + returns_scalar.hash(state); + }, // already hashed by discriminant Expr::Wildcard | Expr::Len => {}, - #[allow(unreachable_code)] - _ => { - // the panic checks if we hit this - #[cfg(debug_assertions)] - { - todo!("IMPLEMENT") - } - // TODO! derive. This is only a temporary fix - // Because PartialEq will have a lot of `false`, e.g. on Function - // Types, this may lead to many file reads, as we use predicate comparison - // to check if we can cache a file - let s = format!("{self:?}"); - s.hash(state) + Expr::SortBy { + expr, + by, + descending, + } => { + expr.hash(state); + by.hash(state); + descending.hash(state); + }, + Expr::Agg(input) => input.hash(state), + Expr::Explode(input) => input.hash(state), + Expr::Window { + function, + partition_by, + options, + } => { + function.hash(state); + partition_by.hash(state); + options.hash(state); + }, + Expr::Slice { + input, + offset, + length, + } => { + input.hash(state); + offset.hash(state); + length.hash(state); + }, + Expr::Exclude(input, excl) => { + input.hash(state); + excl.hash(state); + }, + Expr::RenameAlias { function: _, expr } => expr.hash(state), + Expr::AnonymousFunction { + input, + function: _, + output_type: _, + options, + } => { + input.hash(state); + options.hash(state); }, + Expr::SubPlan(_, names) => names.hash(state), } } } diff --git a/crates/polars-plan/src/logical_plan/optimizer/cache_states.rs b/crates/polars-plan/src/logical_plan/optimizer/cache_states.rs index 6b2e2b175c9b..6f7a865daade 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/cache_states.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/cache_states.rs @@ -5,117 +5,252 @@ use super::*; fn get_upper_projections( parent: Node, lp_arena: &Arena, - expr_arena: &Arena, -) -> Option>> { + names_scratch: &mut Vec, +) -> bool { let parent = lp_arena.get(parent); use ALogicalPlan::*; // During projection pushdown all accumulated. match parent { - Projection { expr, .. } => { - let mut out = Vec::with_capacity(expr.len()); - for e in expr { - out.extend(aexpr_to_leaf_names_iter(e.node(), expr_arena)); - } - Some(out) - }, SimpleProjection { columns, .. } => { - let out = columns - .iter_names() - .map(|s| ColumnName::from(s.as_str())) - .collect(); - Some(out) + let iter = columns.iter_names().map(|s| ColumnName::from(s.as_str())); + names_scratch.extend(iter); + false + }, + Selection { .. } => true, + // Only filter and projection nodes are allowed, any other node we stop. + _ => false, + } +} + +fn get_upper_predicates( + parent: Node, + lp_arena: &Arena, + expr_arena: &mut Arena, + predicate_scratch: &mut Vec, +) -> bool { + let parent = lp_arena.get(parent); + + use ALogicalPlan::*; + match parent { + Selection { predicate, .. } => { + let expr = predicate.to_expr(expr_arena); + predicate_scratch.push(expr); + false }, - // other - _ => None, + SimpleProjection { .. } => true, + // Only filter and projection nodes are allowed, any other node we stop. + _ => false, } } -/// This will ensure that all equal caches communicate the amount of columns +type TwoParents = [Option; 2]; + +/// 1. This will ensure that all equal caches communicate the amount of columns /// they need to project. +/// 2. +/// - This will ensure we apply predicate in the subtrees below the caches. +/// - If the predicate above the cache is the same for all matching caches that filter will be applied +/// as well. +/// +/// # Example +/// Consider this tree, where `SUB-TREE` is duplicate and can be cached. +/// +/// +/// Tree +/// | +/// | +/// |--------------------|-------------------| +/// | | +/// SUB-TREE SUB-TREE +/// +/// STEPS: +/// - 1 CSE will run and will insert cache nodes +/// +/// Tree +/// | +/// | +/// |--------------------|-------------------| +/// | | +/// | CACHE 0 | CACHE 0 +/// | | +/// SUB-TREE SUB-TREE +/// +/// - 2 predicate and projection pushdown will run and will insert optional FILTER and PROJECTION above the caches +/// +/// Tree +/// | +/// | +/// |--------------------|-------------------| +/// | FILTER (optional) | FILTER (optional) +/// | PROJ (optional) | PROJ (optional) +/// | | +/// | CACHE 0 | CACHE 0 +/// | | +/// SUB-TREE SUB-TREE +/// +/// # Projection optimization +/// The union of the projection is determined and the projection will be pushed down. +/// +/// Tree +/// | +/// | +/// |--------------------|-------------------| +/// | FILTER (optional) | FILTER (optional) +/// | CACHE 0 | CACHE 0 +/// | | +/// SUB-TREE SUB-TREE +/// UNION PROJ (optional) UNION PROJ (optional) +/// +/// # Filter optimization +/// Depending on the predicates the predicate pushdown optimization will run. +/// Possible cases: +/// - NO FILTERS: run predicate pd from the cache nodes -> finish +/// - Above the filters the caches are the same -> run predicate pd from the filter node -> finish +/// - There is a cache without predicates above the cache node -> run predicate form the cache nodes -> finish +/// - The predicates above the cache nodes are all different -> remove the cache nodes -> finish pub(super) fn set_cache_states( root: Node, lp_arena: &mut Arena, expr_arena: &mut Arena, scratch: &mut Vec, - has_caches: bool, + verbose: bool, ) -> PolarsResult<()> { let mut stack = Vec::with_capacity(4); + let mut names_scratch = vec![]; + let mut predicates_scratch = vec![]; scratch.clear(); stack.clear(); - // Per cache id holds: - // - a Vec: with children of the node - // - a Set: with the union of projected column names. - // - a Set: with the union of hstack column names. + #[derive(Default)] + struct Value { + // All the children of the cache per cache-id. + children: Vec, + parents: Vec, + cache_nodes: Vec, + // Union over projected names. + names_union: PlHashSet, + // Union over predicates. + predicate_union: PlHashMap, + } let mut cache_schema_and_children = BTreeMap::new(); - stack.push((root, None, None, None)); + // Stack frame + #[derive(Default, Copy, Clone)] + struct Frame { + current: Node, + cache_id: Option, + parent: TwoParents, + previous_cache: Option, + } + let init = Frame { + current: root, + ..Default::default() + }; + + stack.push(init); // # First traversal. // Collect the union of columns per cache id. // And find the cache parents. - while let Some((current_node, mut cache_id, mut parent, mut previous_cache)) = stack.pop() { - let lp = lp_arena.get(current_node); + while let Some(mut frame) = stack.pop() { + let lp = lp_arena.get(frame.current); lp.copy_inputs(scratch); use ALogicalPlan::*; match lp { // don't allow parallelism as caches need each others work // also self-referencing plans can deadlock on the files they lock - Join { options, .. } if has_caches && options.allow_parallel => { - if let Join { options, .. } = lp_arena.get_mut(current_node) { + Join { options, .. } if options.allow_parallel => { + if let Join { options, .. } = lp_arena.get_mut(frame.current) { let options = Arc::make_mut(options); options.allow_parallel = false; } }, // don't allow parallelism as caches need each others work // also self-referencing plans can deadlock on the files they lock - Union { options, .. } if has_caches && options.parallel => { - if let Union { options, .. } = lp_arena.get_mut(current_node) { + Union { options, .. } if options.parallel => { + if let Union { options, .. } = lp_arena.get_mut(frame.current) { options.parallel = false; } }, Cache { input, id, .. } => { - if let Some(cache_id) = cache_id { - previous_cache = Some(cache_id) + if let Some(cache_id) = frame.cache_id { + frame.previous_cache = Some(cache_id) } - if let Some(parent_node) = parent { - // projection pushdown has already run and blocked on cache nodes + if frame.parent[0].is_some() { + // Projection pushdown has already run and blocked on cache nodes // the pushed down columns are projected just above this cache // if there were no pushed down column, we just take the current // nodes schema // we never want to naively take parents, as a join or aggregate for instance // change the schema - let (children, union_names) = cache_schema_and_children + let v = cache_schema_and_children .entry(*id) - .or_insert_with(|| (Vec::new(), PlHashSet::new())); - children.push(*input); + .or_insert_with(Value::default); + v.children.push(*input); + v.parents.push(frame.parent); + v.cache_nodes.push(frame.current); - if let Some(names) = get_upper_projections(parent_node, lp_arena, expr_arena) { - union_names.extend(names); + let mut found_columns = false; + + for parent_node in frame.parent.into_iter().flatten() { + let keep_going = + get_upper_projections(parent_node, lp_arena, &mut names_scratch); + if !names_scratch.is_empty() { + found_columns = true; + v.names_union.extend(names_scratch.drain(..)); + } + // We stop early as we want to find the first projection node above the cache. + if !keep_going { + break; + } } + + for parent_node in frame.parent.into_iter().flatten() { + let keep_going = get_upper_predicates( + parent_node, + lp_arena, + expr_arena, + &mut predicates_scratch, + ); + if !predicates_scratch.is_empty() { + for pred in predicates_scratch.drain(..) { + let count = v.predicate_union.entry(pred).or_insert(0); + *count += 1; + } + } + // We stop early as we want to find the first predicate node above the cache. + if !keep_going { + break; + } + } + // There was no explicit projection and we must take // all columns - else { + if !found_columns { let schema = lp.schema(lp_arena); - union_names.extend( + v.names_union.extend( schema .iter_names() .map(|name| ColumnName::from(name.as_str())), ); } } - cache_id = Some(*id); + frame.cache_id = Some(*id); }, _ => {}, } - parent = Some(current_node); + // Shift parents. + frame.parent[1] = frame.parent[0]; + frame.parent[0] = Some(frame.current); for n in scratch.iter() { - stack.push((*n, cache_id, parent, previous_cache)) + let mut new_frame = frame; + new_frame.current = *n; + stack.push(new_frame); } scratch.clear(); } @@ -127,11 +262,48 @@ pub(super) fn set_cache_states( // back to the cache node again if !cache_schema_and_children.is_empty() { let mut proj_pd = ProjectionPushDown::new(); - let pred_pd = PredicatePushDown::new(Default::default()); - for (_cache_id, (children, columns)) in cache_schema_and_children { - if !columns.is_empty() { - for child in children { - let columns = &columns; + let pred_pd = PredicatePushDown::new(Default::default()).block_at_cache(false); + for (_cache_id, v) in cache_schema_and_children { + // # CHECK IF WE NEED TO REMOVE CACHES + // If we encounter multiple predicates we remove the cache nodes completely as we don't + // want to loose predicate pushdown in favor of scan sharing. + if v.predicate_union.len() > 1 { + if verbose { + eprintln!("cache nodes will be removed because predicates don't match") + } + for ((&child, cache), parents) in + v.children.iter().zip(v.cache_nodes).zip(v.parents) + { + // Remove the cache and assign the child the cache location. + lp_arena.swap(child, cache); + + // Restart predicate and projection pushdown from most top parent. + // This to ensure we continue the optimization where it was blocked initially. + // We pick up the blocked filter and projection. + let mut node = cache; + for p_node in parents.into_iter().flatten() { + if matches!( + lp_arena.get(p_node), + ALogicalPlan::Selection { .. } | ALogicalPlan::SimpleProjection { .. } + ) { + node = p_node + } else { + break; + } + } + + let lp = lp_arena.take(node); + let lp = proj_pd.optimize(lp, lp_arena, expr_arena)?; + let lp = pred_pd.optimize(lp, lp_arena, expr_arena)?; + lp_arena.replace(node, lp); + } + return Ok(()); + } + + // # RUN PROJECTION PUSHDOWN + if !v.names_union.is_empty() { + for &child in &v.children { + let columns = &v.names_union; let child_lp = lp_arena.take(child); // Make sure we project in the order of the schema @@ -152,7 +324,6 @@ pub(super) fn set_cache_states( .build(); let lp = proj_pd.optimize(lp, lp_arena, expr_arena)?; - let lp = pred_pd.optimize(lp, lp_arena, expr_arena)?; // Remove the projection added by the optimization. let lp = if let ALogicalPlan::Projection { input, .. } | ALogicalPlan::SimpleProjection { input, .. } = lp @@ -163,8 +334,46 @@ pub(super) fn set_cache_states( }; lp_arena.replace(child, lp); } + } else { + // No upper projections to include, run projection pushdown from cache node. + for &child in &v.children { + let child_lp = lp_arena.take(child); + let lp = proj_pd.optimize(child_lp, lp_arena, expr_arena)?; + lp_arena.replace(child, lp); + } + } + + // # RUN PREDICATE PUSHDOWN + // Run this after projection pushdown, otherwise the predicate columns will not be projected. + + // - If all predicates of parent are the same we will restart predicate pushdown from the parent FILTER node. + // - Otherwise we will start predicate pushdown from the cache node. + let allow_parent_predicate_pushdown = v.predicate_union.len() == 1 && { + let (_pred, count) = v.predicate_union.iter().next().unwrap(); + *count == v.children.len() as u32 + }; + + for (&child, parents) in v.children.iter().zip(v.parents) { + if allow_parent_predicate_pushdown { + let node = get_filter_node(parents, lp_arena) + .expect("expected filter; this is an optimizer bug"); + let start_lp = lp_arena.take(node); + let lp = pred_pd.optimize(start_lp, lp_arena, expr_arena)?; + lp_arena.replace(node, lp); + } else { + let child_lp = lp_arena.take(child); + let lp = pred_pd.optimize(child_lp, lp_arena, expr_arena)?; + lp_arena.replace(child, lp); + } } } } Ok(()) } + +fn get_filter_node(parents: TwoParents, lp_arena: &Arena) -> Option { + parents + .into_iter() + .flatten() + .find(|&parent| matches!(lp_arena.get(parent), ALogicalPlan::Selection { .. })) +} diff --git a/crates/polars-plan/src/logical_plan/optimizer/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/mod.rs index e9921cbd495c..49c764c7c54a 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/mod.rs @@ -90,7 +90,7 @@ pub fn optimize( scratch: &mut Vec, hive_partition_eval: HiveEval<'_>, ) -> PolarsResult { - #[cfg(feature = "cse")] + #[allow(dead_code)] let verbose = verbose(); // get toggle values let predicate_pushdown = opt_state.predicate_pushdown; @@ -137,7 +137,7 @@ pub fn optimize( } #[cfg(feature = "cse")] - let cse_plan_changed = + let _cse_plan_changed = if comm_subplan_elim && members.has_joins_or_unions && members.has_duplicate_scans() { if verbose { eprintln!("found multiple sources; run comm_subplan_elim") @@ -153,7 +153,7 @@ pub fn optimize( false }; #[cfg(not(feature = "cse"))] - let cse_plan_changed = false; + let _cse_plan_changed = false; // Should be run before predicate pushdown. if projection_pushdown { @@ -211,7 +211,7 @@ pub fn optimize( lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top)?; if members.has_joins_or_unions && members.has_cache { - cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, cse_plan_changed)?; + cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, verbose)?; } // This one should run (nearly) last as this modifies the projections @@ -227,7 +227,7 @@ pub fn optimize( // Make sure that we do that once slice pushdowd and predicate pushdown are done. // At that moment the file fingerprints are finished. #[cfg(any(feature = "cse", feature = "parquet", feature = "ipc", feature = "csv"))] - if agg_scan_projection && !cse_plan_changed { + if agg_scan_projection && !_cse_plan_changed { // We do this so that expressions, created by the pushdown optimizations, are simplified . // We must clean up the predicates, because the agg_scan_projection // uses them in the hashtable to determine duplicates. @@ -240,7 +240,7 @@ pub fn optimize( } #[cfg(feature = "cse")] - if cse_plan_changed { + if _cse_plan_changed { // this must run after cse cse::decrement_file_counters_by_cache_hits(lp_top, lp_arena, expr_arena, 0, scratch); } diff --git a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs index 6d73d18d5de1..0b1e9dbd4e93 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs @@ -21,6 +21,7 @@ pub type HiveEval<'a> = pub struct PredicatePushDown<'a> { hive_partition_eval: HiveEval<'a>, verbose: bool, + block_at_cache: bool, } impl<'a> PredicatePushDown<'a> { @@ -28,9 +29,15 @@ impl<'a> PredicatePushDown<'a> { Self { hive_partition_eval, verbose: verbose(), + block_at_cache: true, } } + pub(crate) fn block_at_cache(mut self, toggle: bool) -> Self { + self.block_at_cache = toggle; + self + } + fn optional_apply_predicate( &self, lp: ALogicalPlan, @@ -622,7 +629,13 @@ impl<'a> PredicatePushDown<'a> { self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena) }, // Caches will run predicate push-down in the `cache_states` run. - Cache { .. } => self.no_pushdown(lp, acc_predicates, lp_arena, expr_arena), + Cache { .. } => { + if self.block_at_cache { + self.no_pushdown(lp, acc_predicates, lp_arena, expr_arena) + } else { + self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false) + } + }, #[cfg(feature = "python")] PythonScan { mut options, @@ -692,7 +705,7 @@ impl<'a> PredicatePushDown<'a> { } } - pub fn optimize( + pub(crate) fn optimize( &self, logical_plan: ALogicalPlan, lp_arena: &mut Arena,