Skip to content

Commit

Permalink
perf: Re-enable common subplan elim for new-streaming engine
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp committed Dec 24, 2024
1 parent 0ed4b4c commit e48614e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
4 changes: 0 additions & 4 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,10 +619,6 @@ impl LazyFrame {
// The new streaming engine can't deal with the way the common
// subexpression elimination adds length-incorrect with_columns.
opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;

// The new streaming engine can't yet deal with the cache nodes
// introduced by common subplan elimination.
opt_state &= !OptFlags::COMM_SUBPLAN_ELIM;
}

let lp_top = optimize(
Expand Down
15 changes: 14 additions & 1 deletion crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub fn lower_ir(
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
schema_cache: &mut PlHashMap<Node, Arc<Schema>>,
expr_cache: &mut ExprCache,
cache_nodes: &mut PlHashMap<usize, PhysNodeKey>,
) -> PolarsResult<PhysNodeKey> {
// Helper macro to simplify recursive calls.
macro_rules! lower_ir {
Expand All @@ -97,6 +98,7 @@ pub fn lower_ir(
phys_sm,
schema_cache,
expr_cache,
cache_nodes,
)
};
}
Expand Down Expand Up @@ -431,7 +433,18 @@ pub fn lower_ir(

#[cfg(feature = "python")]
IR::PythonScan { .. } => todo!(),
IR::Cache { .. } => todo!(),

IR::Cache { input, id, cache_hits: _ } => {
let id = *id;
if let Some(cached) = cache_nodes.get(&id) {
return Ok(*cached);
}

let phys_input = lower_ir!(*input)?;
cache_nodes.insert(id, phys_input);
return Ok(phys_input);
},

IR::GroupBy {
input,
keys,
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,15 @@ pub fn build_physical_plan(
) -> PolarsResult<PhysNodeKey> {
let mut schema_cache = PlHashMap::with_capacity(ir_arena.len());
let mut expr_cache = ExprCache::with_capacity(expr_arena.len());
let mut cache_nodes = PlHashMap::new();
let phys_root = lower_ir::lower_ir(
root,
ir_arena,
expr_arena,
phys_sm,
&mut schema_cache,
&mut expr_cache,
&mut cache_nodes,
)?;
let mut referenced = SecondaryMap::with_capacity(phys_sm.capacity());
insert_multiplexers(phys_root, phys_sm, &mut referenced);
Expand Down

0 comments on commit e48614e

Please sign in to comment.