diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 535a557b0c02..ab1f1bc1835c 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -619,10 +619,6 @@ impl LazyFrame { // The new streaming engine can't deal with the way the common // subexpression elimination adds length-incorrect with_columns. opt_state &= !OptFlags::COMM_SUBEXPR_ELIM; - - // The new streaming engine can't yet deal with the cache nodes - // introduced by common subplan elimination. - opt_state &= !OptFlags::COMM_SUBPLAN_ELIM; } let lp_top = optimize( diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index bad63a0e767b..b9b74c9295f1 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -86,6 +86,7 @@ pub fn lower_ir( phys_sm: &mut SlotMap, schema_cache: &mut PlHashMap>, expr_cache: &mut ExprCache, + cache_nodes: &mut PlHashMap, ) -> PolarsResult { // Helper macro to simplify recursive calls. macro_rules! lower_ir { @@ -97,6 +98,7 @@ pub fn lower_ir( phys_sm, schema_cache, expr_cache, + cache_nodes, ) }; } @@ -431,7 +433,18 @@ pub fn lower_ir( #[cfg(feature = "python")] IR::PythonScan { .. } => todo!(), - IR::Cache { .. } => todo!(), + + IR::Cache { input, id, cache_hits: _ } => { + let id = *id; + if let Some(cached) = cache_nodes.get(&id) { + return Ok(*cached); + } + + let phys_input = lower_ir!(*input)?; + cache_nodes.insert(id, phys_input); + return Ok(phys_input); + }, + IR::GroupBy { input, keys, diff --git a/crates/polars-stream/src/physical_plan/mod.rs b/crates/polars-stream/src/physical_plan/mod.rs index c5f679c7fe56..217131b18b03 100644 --- a/crates/polars-stream/src/physical_plan/mod.rs +++ b/crates/polars-stream/src/physical_plan/mod.rs @@ -253,6 +253,7 @@ pub fn build_physical_plan( ) -> PolarsResult { let mut schema_cache = PlHashMap::with_capacity(ir_arena.len()); let mut expr_cache = ExprCache::with_capacity(expr_arena.len()); + let mut cache_nodes = PlHashMap::new(); let phys_root = lower_ir::lower_ir( root, ir_arena, @@ -260,6 +261,7 @@ pub fn build_physical_plan( phys_sm, &mut schema_cache, &mut expr_cache, + &mut cache_nodes, )?; let mut referenced = SecondaryMap::with_capacity(phys_sm.capacity()); insert_multiplexers(phys_root, phys_sm, &mut referenced);