Skip to content

Commit

Permalink
perf: Re-enable common subplan elim for new-streaming engine (#20443)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Dec 25, 2024
1 parent d27f7b2 commit ffc5538
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
4 changes: 0 additions & 4 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,10 +619,6 @@ impl LazyFrame {
// The new streaming engine can't deal with the way the common
// subexpression elimination adds length-incorrect with_columns.
opt_state &= !OptFlags::COMM_SUBEXPR_ELIM;

// The new streaming engine can't yet deal with the cache nodes
// introduced by common subplan elimination.
opt_state &= !OptFlags::COMM_SUBPLAN_ELIM;
}

let lp_top = optimize(
Expand Down
19 changes: 18 additions & 1 deletion crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub fn lower_ir(
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
schema_cache: &mut PlHashMap<Node, Arc<Schema>>,
expr_cache: &mut ExprCache,
cache_nodes: &mut PlHashMap<usize, PhysNodeKey>,
) -> PolarsResult<PhysNodeKey> {
// Helper macro to simplify recursive calls.
macro_rules! lower_ir {
Expand All @@ -97,6 +98,7 @@ pub fn lower_ir(
phys_sm,
schema_cache,
expr_cache,
cache_nodes,
)
};
}
Expand Down Expand Up @@ -431,7 +433,22 @@ pub fn lower_ir(

#[cfg(feature = "python")]
IR::PythonScan { .. } => todo!(),
IR::Cache { .. } => todo!(),

IR::Cache {
input,
id,
cache_hits: _,
} => {
let id = *id;
if let Some(cached) = cache_nodes.get(&id) {
return Ok(*cached);
}

let phys_input = lower_ir!(*input)?;
cache_nodes.insert(id, phys_input);
return Ok(phys_input);
},

IR::GroupBy {
input,
keys,
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,15 @@ pub fn build_physical_plan(
) -> PolarsResult<PhysNodeKey> {
let mut schema_cache = PlHashMap::with_capacity(ir_arena.len());
let mut expr_cache = ExprCache::with_capacity(expr_arena.len());
let mut cache_nodes = PlHashMap::new();
let phys_root = lower_ir::lower_ir(
root,
ir_arena,
expr_arena,
phys_sm,
&mut schema_cache,
&mut expr_cache,
&mut cache_nodes,
)?;
let mut referenced = SecondaryMap::with_capacity(phys_sm.capacity());
insert_multiplexers(phys_root, phys_sm, &mut referenced);
Expand Down
1 change: 1 addition & 0 deletions py-polars/tests/unit/lazyframe/test_lazyframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,7 @@ def test_lazy_cache_same_key() -> None:
assert_frame_equal(result, expected)


@pytest.mark.may_fail_auto_streaming
def test_lazy_cache_hit(monkeypatch: Any, capfd: Any) -> None:
monkeypatch.setenv("POLARS_VERBOSE", "1")

Expand Down

0 comments on commit ffc5538

Please sign in to comment.