From 4048c3abc63b1b2d471684de1ebbcc5ce599a89c Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 29 May 2024 11:50:41 +0200 Subject: [PATCH] perf: Cache arena's (and conversion) in SQL context (#16566) --- crates/polars-lazy/src/frame/cached_arenas.rs | 114 ++++++++++++++++++ crates/polars-lazy/src/frame/mod.rs | 88 +------------- crates/polars-sql/src/context.rs | 36 ++++-- crates/polars-utils/src/arena.rs | 3 +- 4 files changed, 144 insertions(+), 97 deletions(-) create mode 100644 crates/polars-lazy/src/frame/cached_arenas.rs diff --git a/crates/polars-lazy/src/frame/cached_arenas.rs b/crates/polars-lazy/src/frame/cached_arenas.rs new file mode 100644 index 000000000000..ecca97c06ac5 --- /dev/null +++ b/crates/polars-lazy/src/frame/cached_arenas.rs @@ -0,0 +1,114 @@ +use super::*; + +pub(crate) struct CachedArena { + lp_arena: Arena, + expr_arena: Arena, +} + +impl LazyFrame { + pub fn set_cached_arena(&self, lp_arena: Arena, expr_arena: Arena) { + let mut cached = self.cached_arena.lock().unwrap(); + *cached = Some(CachedArena { + lp_arena, + expr_arena, + }); + } + + pub fn schema_with_arenas( + &mut self, + lp_arena: &mut Arena, + expr_arena: &mut Arena, + ) -> PolarsResult { + let node = to_alp(self.logical_plan.clone(), expr_arena, lp_arena, false, true)?; + + let schema = lp_arena.get(node).schema(lp_arena).into_owned(); + // Cache the logical plan so that next schema call is cheap. + self.logical_plan = DslPlan::IR { + node: Some(node), + dsl: Arc::new(self.logical_plan.clone()), + version: lp_arena.version(), + }; + Ok(schema) + } + + /// Get a handle to the schema — a map from column names to data types — of the current + /// `LazyFrame` computation. + /// + /// Returns an `Err` if the logical plan has already encountered an error (i.e., if + /// `self.collect()` would fail), `Ok` otherwise. 
+ pub fn schema(&mut self) -> PolarsResult { + let mut cached_arenas = self.cached_arena.lock().unwrap(); + + match &mut *cached_arenas { + None => { + let mut lp_arena = Default::default(); + let mut expr_arena = Default::default(); + // Code duplication because of the borrow checker. :( + let node = to_alp( + self.logical_plan.clone(), + &mut expr_arena, + &mut lp_arena, + false, + true, + )?; + + let schema = lp_arena.get(node).schema(&lp_arena).into_owned(); + // Cache the logical plan so that next schema call is cheap. + self.logical_plan = DslPlan::IR { + node: Some(node), + dsl: Arc::new(self.logical_plan.clone()), + version: lp_arena.version(), + }; + *cached_arenas = Some(CachedArena { + lp_arena, + expr_arena, + }); + + Ok(schema) + }, + Some(arenas) => { + match self.logical_plan { + // We have got arenas and don't need to convert the DSL. + DslPlan::IR { + node: Some(node), .. + } => Ok(arenas + .lp_arena + .get(node) + .schema(&arenas.lp_arena) + .into_owned()), + _ => { + // We have got arenas, but still need to convert (parts) of the DSL. + // Code duplication because of the borrow checker. :( + let node = to_alp( + self.logical_plan.clone(), + &mut arenas.expr_arena, + &mut arenas.lp_arena, + false, + true, + )?; + + let schema = arenas + .lp_arena + .get(node) + .schema(&arenas.lp_arena) + .into_owned(); + // Cache the logical plan so that next schema call is cheap. 
+ self.logical_plan = DslPlan::IR { + node: Some(node), + dsl: Arc::new(self.logical_plan.clone()), + version: arenas.lp_arena.version(), + }; + Ok(schema) + }, + } + }, + } + } + + pub(super) fn get_arenas(&mut self) -> (Arena, Arena) { + match self.cached_arena.lock().unwrap().as_mut() { + Some(arenas) => (arenas.lp_arena.clone(), arenas.expr_arena.clone()), + None => (Arena::with_capacity(16), Arena::with_capacity(16)), + } + } +} diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index fb79c19cf7c3..66d12dc18b8a 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -2,6 +2,7 @@ #[cfg(feature = "python")] mod python; +mod cached_arenas; mod err; #[cfg(not(target_arch = "wasm32"))] mod exitable; @@ -36,6 +37,7 @@ pub use polars_plan::frame::{AllowedOptimizations, OptState}; use polars_plan::global::FETCH_ROWS; use smartstring::alias::String as SmartString; +use crate::frame::cached_arenas::CachedArena; use crate::physical_plan::executors::Executor; use crate::physical_plan::planner::{ create_physical_expr, create_physical_plan, ExpressionConversionState, @@ -90,11 +92,6 @@ impl From for LazyFrame { } } -pub(crate) struct CachedArena { - lp_arena: Arena, - expr_arena: Arena, -} - impl LazyFrame { pub(crate) fn from_inner( logical_plan: DslPlan, @@ -108,80 +105,6 @@ impl LazyFrame { } } - /// Get a handle to the schema — a map from column names to data types — of the current - /// `LazyFrame` computation. - /// - /// Returns an `Err` if the logical plan has already encountered an error (i.e., if - /// `self.collect()` would fail), `Ok` otherwise. 
- pub fn schema(&mut self) -> PolarsResult { - let mut cached_arenas = self.cached_arena.lock().unwrap(); - - match &mut *cached_arenas { - None => { - let mut lp_arena = Default::default(); - let mut expr_arena = Default::default(); - let node = to_alp( - self.logical_plan.clone(), - &mut expr_arena, - &mut lp_arena, - false, - true, - )?; - - let schema = lp_arena.get(node).schema(&lp_arena).into_owned(); - - // Cache the logical plan and the arenas, so that next schema call is cheap. - self.logical_plan = DslPlan::IR { - node: Some(node), - dsl: Arc::new(self.logical_plan.clone()), - version: lp_arena.version(), - }; - *cached_arenas = Some(CachedArena { - lp_arena, - expr_arena, - }); - - Ok(schema) - }, - Some(arenas) => { - match self.logical_plan { - // We have got arenas and don't need to convert the DSL. - DslPlan::IR { - node: Some(node), .. - } => Ok(arenas - .lp_arena - .get(node) - .schema(&arenas.lp_arena) - .into_owned()), - _ => { - // We have got arenas, but still need to convert (parts) of the DSL. - let node = to_alp( - self.logical_plan.clone(), - &mut arenas.expr_arena, - &mut arenas.lp_arena, - false, - true, - )?; - - let schema = arenas - .lp_arena - .get(node) - .schema(&arenas.lp_arena) - .into_owned(); - // Cache the logical plan so that next schema call is cheap. - self.logical_plan = DslPlan::IR { - node: Some(node), - dsl: Arc::new(self.logical_plan.clone()), - version: arenas.lp_arena.version(), - }; - - Ok(schema) - }, - } - }, - } - } - pub(crate) fn get_plan_builder(self) -> DslBuilder { DslBuilder::from(self.logical_plan) } @@ -681,13 +604,6 @@ impl LazyFrame { Ok(lp_top) } - fn get_arenas(&mut self) -> (Arena, Arena) { - match self.cached_arena.lock().unwrap().as_mut() { - Some(arenas) => (arenas.lp_arena.clone(), arenas.expr_arena.clone()), - None => (Arena::with_capacity(16), Arena::with_capacity(16)), - } - } - fn prepare_collect_post_opt

( mut self, check_sink: bool, diff --git a/crates/polars-sql/src/context.rs b/crates/polars-sql/src/context.rs index 0b2de194a83a..995f9df5af43 100644 --- a/crates/polars-sql/src/context.rs +++ b/crates/polars-sql/src/context.rs @@ -24,6 +24,8 @@ pub struct SQLContext { pub(crate) function_registry: Arc, cte_map: RefCell>, aliases: RefCell>, + lp_arena: Arena, + expr_arena: Arena, } impl Default for SQLContext { @@ -33,6 +35,8 @@ impl Default for SQLContext { table_map: Default::default(), cte_map: Default::default(), aliases: Default::default(), + lp_arena: Default::default(), + expr_arena: Default::default(), } } } @@ -111,11 +115,18 @@ impl SQLContext { .parse_statements() .map_err(to_compute_err)?; polars_ensure!(ast.len() == 1, ComputeError: "One and only one statement at a time please"); - let res = self.execute_statement(ast.first().unwrap()); + let res = self.execute_statement(ast.first().unwrap())?; + + // Ensure the result uses the proper arenas. + // This will instantiate new arenas with a new version. + let lp_arena = std::mem::take(&mut self.lp_arena); + let expr_arena = std::mem::take(&mut self.expr_arena); + res.set_cached_arena(lp_arena, expr_arena); + // Every execution should clear the CTE map. self.cte_map.borrow_mut().clear(); self.aliases.borrow_mut().clear(); - res + Ok(res) } /// add a function registry to the SQLContext @@ -283,7 +294,12 @@ impl SQLContext { None => { let tbl = table_name.to_string(); if let Some(lf) = self.table_map.get_mut(&tbl) { - *lf = DataFrame::from(lf.schema().unwrap().as_ref()).lazy(); + *lf = DataFrame::from( + lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena) + .unwrap() + .as_ref(), + ) + .lazy(); Ok(lf.clone()) } else { polars_bail!(ComputeError: "table '{}' does not exist", tbl); @@ -369,7 +385,7 @@ impl SQLContext { let mut contains_wildcard_exclude = false; // Filter expression. 
- let schema = Some(lf.schema()?); + let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?); if let Some(expr) = select_stmt.selection.as_ref() { let mut filter_expression = parse_sql_expr(expr, self, schema.as_deref())?; lf = self.process_subqueries(lf, vec![&mut filter_expression]); @@ -473,7 +489,7 @@ impl SQLContext { if query.order_by.is_empty() { lf.select(projections) } else if !contains_wildcard { - let schema = lf.schema()?; + let schema = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?; let mut column_names = schema.get_names(); let mut retained_names = PlHashSet::new(); @@ -538,7 +554,7 @@ impl SQLContext { lf = self.process_order_by(lf, &query.order_by)?; // Apply optional 'having' clause, post-aggregation. - let schema = Some(lf.schema()?); + let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?); match select_stmt.having.as_ref() { Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?), None => lf, @@ -550,7 +566,7 @@ impl SQLContext { Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any), Some(Distinct::On(exprs)) => { // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760 - let schema = Some(lf.schema()?); + let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?); let cols = exprs .iter() .map(|e| { @@ -702,7 +718,7 @@ impl SQLContext { let mut by = Vec::with_capacity(ob.len()); let mut descending = Vec::with_capacity(ob.len()); - let schema = Some(lf.schema()?); + let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?); for ob in ob { by.push(parse_sql_expr(&ob.expr, self, schema.as_deref())?); descending.push(!ob.asc.unwrap_or(true)); @@ -731,7 +747,7 @@ impl SQLContext { !contains_wildcard, ComputeError: "group_by error: can't process wildcard in group_by" ); - let schema_before = lf.schema()?; + let schema_before = lf.schema_with_arenas(&mut 
self.lp_arena, &mut self.expr_arena)?; let group_by_keys_schema = expressions_to_schema(group_by_keys, &schema_before, Context::Default)?; @@ -856,7 +872,7 @@ impl SQLContext { tbl_name ) })?; - let schema = lf.schema()?; + let schema = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?; cols(schema.iter_names()) }, e => polars_bail!( diff --git a/crates/polars-utils/src/arena.rs b/crates/polars-utils/src/arena.rs index ccd09b66171e..4a0021c2a9e8 100644 --- a/crates/polars-utils/src/arena.rs +++ b/crates/polars-utils/src/arena.rs @@ -157,7 +157,8 @@ impl Arena { } pub fn clear(&mut self) { - self.items.clear() + self.items.clear(); + self.version = ARENA_VERSION.fetch_add(1, Ordering::Relaxed); } }