diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 03e48a059183..9f1cfb194bfd 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -15,7 +15,7 @@ pub mod pivot; feature = "json" ))] use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; pub use anonymous_scan::*; #[cfg(feature = "csv")] @@ -55,6 +55,7 @@ impl IntoLazy for DataFrame { LazyFrame { logical_plan: lp, opt_state: Default::default(), + cached_arena: Default::default(), } } } @@ -65,6 +66,11 @@ impl IntoLazy for LazyFrame { } } +struct CachedArena { + lp_arena: Arena, + expr_arena: Arena, +} + /// Lazy abstraction over an eager `DataFrame`. /// It really is an abstraction over a logical plan. The methods of this struct will incrementally /// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)). @@ -73,6 +79,7 @@ impl IntoLazy for LazyFrame { pub struct LazyFrame { pub logical_plan: DslPlan, pub(crate) opt_state: OptState, + cached_arena: Arc>>, } impl From for LazyFrame { @@ -83,6 +90,7 @@ impl From for LazyFrame { file_caching: true, ..Default::default() }, + cached_arena: Default::default(), } } } @@ -93,8 +101,63 @@ impl LazyFrame { /// /// Returns an `Err` if the logical plan has already encountered an error (i.e., if /// `self.collect()` would fail), `Ok` otherwise. - pub fn schema(&self) -> PolarsResult { - self.logical_plan.compute_schema() + pub fn schema(&mut self) -> PolarsResult { + let mut cached_arenas = self.cached_arena.lock().unwrap(); + + match &mut *cached_arenas { + None => { + let mut lp_arena = Default::default(); + let mut expr_arena = Default::default(); + let node = to_alp( + self.logical_plan.clone(), + &mut expr_arena, + &mut lp_arena, + false, + true, + )?; + + let schema = lp_arena.get(node).schema(&lp_arena).into_owned(); + + // Cache the logical plan and the arenas, so that next schema call is cheap. + self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())}; + dbg!("set", lp_arena.len()); + *cached_arenas = Some(CachedArena { + lp_arena, + expr_arena, + }); + + Ok(schema) + }, + Some(arenas) => { + match self.logical_plan { + // We have got arenas and don't need to convert the DSL. + DslPlan::IR {node: Some(node), ..} => { + Ok(arenas + .lp_arena + .get(node) + .schema(&arenas.lp_arena) + .into_owned()) + }, + _ => { + // We have got arenas, but still need to convert (parts) of the DSL. + let node = to_alp( + self.logical_plan.clone(), + &mut arenas.expr_arena, + &mut arenas.lp_arena, + false, + true, + )?; + + dbg!("set", arenas.lp_arena.len()); + let schema = arenas.lp_arena.get(node).schema(&arenas.lp_arena).into_owned(); + // Cache the logical plan so that next schema call is cheap. + self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())}; + + Ok(schema) + } + } + }, + } } pub(crate) fn get_plan_builder(self) -> DslBuilder { @@ -109,6 +172,7 @@ impl LazyFrame { LazyFrame { logical_plan, opt_state, + cached_arena: Default::default(), } } @@ -516,17 +580,19 @@ impl LazyFrame { self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false) } - pub fn to_alp_optimized(self) -> PolarsResult { - let mut lp_arena = Arena::with_capacity(16); - let mut expr_arena = Arena::with_capacity(16); + pub fn to_alp_optimized(mut self) -> PolarsResult { + let (mut lp_arena, mut expr_arena) = self.get_arenas(); let node = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?; Ok(IRPlan::new(node, lp_arena, expr_arena)) } - pub fn to_alp(self) -> PolarsResult { - self.logical_plan.to_alp() + pub fn to_alp(mut self) -> PolarsResult { + let (mut lp_arena, mut expr_arena) = self.get_arenas(); + let node = to_alp(self.logical_plan, &mut expr_arena, &mut lp_arena, true, true)?; + let plan = IRPlan::new(node, lp_arena, expr_arena); + Ok(plan) } pub(crate) fn optimize_with_scratch( @@ -587,16 +653,27 @@ impl LazyFrame { Ok(lp_top) } + fn get_arenas(&mut self) -> (Arena, Arena) { + match self.cached_arena.lock().unwrap().as_mut() { + Some(arenas) => { + (arenas.lp_arena.clone(), arenas.expr_arena.clone()) + }, + None => { + (Arena::with_capacity(16), Arena::with_capacity(16)) + } + } + } + fn prepare_collect_post_opt

( - self, + mut self, check_sink: bool, post_opt: P, ) -> PolarsResult<(ExecutionState, Box, bool)> where P: Fn(Node, &mut Arena, &mut Arena) -> PolarsResult<()>, { - let mut expr_arena = Arena::with_capacity(16); - let mut lp_arena = Arena::with_capacity(16); + let (mut lp_arena, mut expr_arena) = self.get_arenas(); + let mut scratch = vec![]; let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?; @@ -935,7 +1012,7 @@ impl LazyFrame { /// *group_by_dynamic* #[cfg(feature = "dynamic_group_by")] pub fn rolling>( - self, + mut self, index_column: Expr, group_by: E, mut options: RollingGroupOptions, @@ -980,7 +1057,7 @@ impl LazyFrame { /// with a ordinary group_by on these keys. #[cfg(feature = "dynamic_group_by")] pub fn group_by_dynamic>( - self, + mut self, index_column: Expr, group_by: E, mut options: DynamicGroupOptions, @@ -1672,6 +1749,7 @@ impl From for LazyFrame { Self { logical_plan: lgb.logical_plan, opt_state: lgb.opt_state, + cached_arena: Default::default(), } } } diff --git a/crates/polars-lazy/src/tests/optimization_checks.rs b/crates/polars-lazy/src/tests/optimization_checks.rs index 2bb0a0727dd3..293496c52f6b 100644 --- a/crates/polars-lazy/src/tests/optimization_checks.rs +++ b/crates/polars-lazy/src/tests/optimization_checks.rs @@ -485,7 +485,7 @@ fn test_with_column_prune() -> PolarsResult<()> { }); // whole `with_columns` pruned - let q = df.lazy().with_column(col("c0")).select([col("c1")]); + let mut q = df.lazy().with_column(col("c0")).select([col("c1")]); let lp = q.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap(); diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs index 86754f11d02a..24c0aa4b3539 100644 --- a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs @@ -79,6 +79,7 @@ pub fn to_alp_impl( Ok(lp_node) } + dbg!(lp_arena.len()); let v = match lp { DslPlan::Scan { @@ -565,6 +566,14 @@ pub fn to_alp_impl( .map_err(|e| e.context(failed_input!(sink)))?; IR::Sink { input, payload } }, + DslPlan::IR { node, dsl } => { + dbg!("IR", lp_arena.len()); + return if let Some(node) = node { + Ok(node) + } else { + to_alp_impl(owned(dsl), expr_arena, lp_arena, convert) + } + }, }; Ok(lp_arena.add(v)) } diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index a52115fcd01d..e0ebebb1a212 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -167,6 +167,12 @@ pub enum DslPlan { input: Arc, payload: SinkType, }, + IR { + #[cfg_attr(feature = "serde", serde(skip))] + node: Option, + // Keep the original Dsl around as we need that for serialization. + dsl: Arc, + }, } impl Clone for DslPlan { @@ -195,6 +201,7 @@ impl Clone for DslPlan { Self::HConcat { inputs, options } => Self::HConcat { inputs: inputs.clone(), options: options.clone() }, Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() }, Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() }, + Self::IR {node, dsl} => Self::IR {node: *node, dsl: dsl.clone()} } } } diff --git a/crates/polars-plan/src/logical_plan/schema.rs b/crates/polars-plan/src/logical_plan/schema.rs index f3e3eabddf19..e589f95a8b64 100644 --- a/crates/polars-plan/src/logical_plan/schema.rs +++ b/crates/polars-plan/src/logical_plan/schema.rs @@ -18,22 +18,10 @@ impl DslPlan { /// Compute the schema. This requires conversion to [`IR`] and type-resolving. pub fn compute_schema(&self) -> PolarsResult { - let opt_state = OptState { - eager: true, - type_coercion: true, - simplify_expr: false, - ..Default::default() - }; - let mut lp_arena = Default::default(); - let node = optimize( - self.clone(), - opt_state, - &mut lp_arena, - &mut Default::default(), - &mut Default::default(), - Default::default(), - )?; + let mut expr_arena = Default::default(); + let node = to_alp(self.clone(), &mut expr_arena, &mut lp_arena, false, true)?; + Ok(lp_arena.get(node).schema(&lp_arena).into_owned()) } } diff --git a/crates/polars-sql/src/context.rs b/crates/polars-sql/src/context.rs index 46e1c055f645..0b2de194a83a 100644 --- a/crates/polars-sql/src/context.rs +++ b/crates/polars-sql/src/context.rs @@ -694,7 +694,11 @@ impl SQLContext { Ok((tbl_name, lf)) } - fn process_order_by(&mut self, lf: LazyFrame, ob: &[OrderByExpr]) -> PolarsResult { + fn process_order_by( + &mut self, + mut lf: LazyFrame, + ob: &[OrderByExpr], + ) -> PolarsResult { let mut by = Vec::with_capacity(ob.len()); let mut descending = Vec::with_capacity(ob.len()); @@ -718,7 +722,7 @@ impl SQLContext { fn process_group_by( &mut self, - lf: LazyFrame, + mut lf: LazyFrame, contains_wildcard: bool, group_by_keys: &[Expr], projections: &[Expr], @@ -846,7 +850,7 @@ impl SQLContext { let idents = idents.as_slice(); let e = match idents { [tbl_name] => { - let lf = self.table_map.get(&tbl_name.value).ok_or_else(|| { + let lf = self.table_map.get_mut(&tbl_name.value).ok_or_else(|| { polars_err!( ComputeError: "no table named '{}' found", tbl_name diff --git a/crates/polars-sql/src/sql_expr.rs b/crates/polars-sql/src/sql_expr.rs index 480b0cce23c3..c18b79eb4f87 100644 --- a/crates/polars-sql/src/sql_expr.rs +++ b/crates/polars-sql/src/sql_expr.rs @@ -338,7 +338,7 @@ impl SQLExprVisitor<'_> { fn visit_compound_identifier(&self, idents: &[Ident]) -> PolarsResult { match idents { [tbl_name, column_name] => { - let lf = self + let mut lf = self .ctx .get_table_from_current_scope(&tbl_name.value) .ok_or_else(|| { diff --git a/crates/polars-utils/src/arena.rs b/crates/polars-utils/src/arena.rs index a0073b6598cf..2e4505c3b060 100644 --- a/crates/polars-utils/src/arena.rs +++ b/crates/polars-utils/src/arena.rs @@ -79,6 +79,7 @@ impl Arena { #[inline] pub fn get(&self, idx: Node) -> &T { + dbg!(idx.0); self.items.get(idx.0).unwrap() } diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index ec5461398c28..7aa96c9640a9 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -33,7 +33,7 @@ pub struct PyLazyFrame { } impl PyLazyFrame { - fn get_schema(&self) -> PyResult { + fn get_schema(&mut self) -> PyResult { let schema = self.ldf.schema().map_err(PyPolarsErr::from)?; Ok(schema) } @@ -1165,13 +1165,13 @@ impl PyLazyFrame { self.ldf.clone().into() } - fn columns(&self, py: Python) -> PyResult { + fn columns(&mut self, py: Python) -> PyResult { let schema = self.get_schema()?; let iter = schema.iter_names().map(|s| s.as_str()); Ok(PyList::new_bound(py, iter).to_object(py)) } - fn dtypes(&self, py: Python) -> PyResult { + fn dtypes(&mut self, py: Python) -> PyResult { let schema = self.get_schema()?; let iter = schema .iter_dtypes() @@ -1179,7 +1179,7 @@ impl PyLazyFrame { Ok(PyList::new_bound(py, iter).to_object(py)) } - fn schema(&self, py: Python) -> PyResult { + fn schema(&mut self, py: Python) -> PyResult { let schema = self.get_schema()?; let schema_dict = PyDict::new_bound(py); @@ -1195,7 +1195,7 @@ impl PyLazyFrame { self.ldf.clone().unnest(columns).into() } - fn width(&self) -> PyResult { + fn width(&mut self) -> PyResult { Ok(self.get_schema()?.len()) }