Skip to content

Commit

Permalink
perf: Cache arena's (and conversion) in SQL context (#16566)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored May 29, 2024
1 parent 10ea42b commit 4048c3a
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 97 deletions.
114 changes: 114 additions & 0 deletions crates/polars-lazy/src/frame/cached_arenas.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use super::*;

pub(crate) struct CachedArena {
lp_arena: Arena<IR>,
expr_arena: Arena<AExpr>,
}

impl LazyFrame {
pub fn set_cached_arena(&self, lp_arena: Arena<IR>, expr_arena: Arena<AExpr>) {
let mut cached = self.cached_arena.lock().unwrap();
*cached = Some(CachedArena {
lp_arena,
expr_arena,
});
}

pub fn schema_with_arenas(
&mut self,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<SchemaRef> {
let node = to_alp(self.logical_plan.clone(), expr_arena, lp_arena, false, true)?;

let schema = lp_arena.get(node).schema(lp_arena).into_owned();
// Cache the logical plan so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: lp_arena.version(),
};
Ok(schema)
}

/// Get a handle to the schema — a map from column names to data types — of the current
/// `LazyFrame` computation.
///
/// Returns an `Err` if the logical plan has already encountered an error (i.e., if
/// `self.collect()` would fail), `Ok` otherwise.
pub fn schema(&mut self) -> PolarsResult<SchemaRef> {
let mut cached_arenas = self.cached_arena.lock().unwrap();

match &mut *cached_arenas {
None => {
let mut lp_arena = Default::default();
let mut expr_arena = Default::default();
// Code duplication because of bchk. :(
let node = to_alp(
self.logical_plan.clone(),
&mut expr_arena,
&mut lp_arena,
false,
true,
)?;

let schema = lp_arena.get(node).schema(&lp_arena).into_owned();
// Cache the logical plan so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: lp_arena.version(),
};
*cached_arenas = Some(CachedArena {
lp_arena,
expr_arena,
});

Ok(schema)
},
Some(arenas) => {
match self.logical_plan {
// We have got arenas and don't need to convert the DSL.
DslPlan::IR {
node: Some(node), ..
} => Ok(arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned()),
_ => {
// We have got arenas, but still need to convert (parts) of the DSL.
// Code duplication because of bchk. :(
let node = to_alp(
self.logical_plan.clone(),
&mut arenas.expr_arena,
&mut arenas.lp_arena,
false,
true,
)?;

let schema = arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned();
// Cache the logical plan so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: arenas.lp_arena.version(),
};
Ok(schema)
},
}
},
}
}

pub(super) fn get_arenas(&mut self) -> (Arena<IR>, Arena<AExpr>) {
match self.cached_arena.lock().unwrap().as_mut() {
Some(arenas) => (arenas.lp_arena.clone(), arenas.expr_arena.clone()),
None => (Arena::with_capacity(16), Arena::with_capacity(16)),
}
}
}
88 changes: 2 additions & 86 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#[cfg(feature = "python")]
mod python;

mod cached_arenas;
mod err;
#[cfg(not(target_arch = "wasm32"))]
mod exitable;
Expand Down Expand Up @@ -36,6 +37,7 @@ pub use polars_plan::frame::{AllowedOptimizations, OptState};
use polars_plan::global::FETCH_ROWS;
use smartstring::alias::String as SmartString;

use crate::frame::cached_arenas::CachedArena;
use crate::physical_plan::executors::Executor;
use crate::physical_plan::planner::{
create_physical_expr, create_physical_plan, ExpressionConversionState,
Expand Down Expand Up @@ -90,11 +92,6 @@ impl From<DslPlan> for LazyFrame {
}
}

pub(crate) struct CachedArena {
lp_arena: Arena<IR>,
expr_arena: Arena<AExpr>,
}

impl LazyFrame {
pub(crate) fn from_inner(
logical_plan: DslPlan,
Expand All @@ -108,80 +105,6 @@ impl LazyFrame {
}
}

/// Get a handle to the schema — a map from column names to data types — of the current
/// `LazyFrame` computation.
///
/// Returns an `Err` if the logical plan has already encountered an error (i.e., if
/// `self.collect()` would fail), `Ok` otherwise.
pub fn schema(&mut self) -> PolarsResult<SchemaRef> {
let mut cached_arenas = self.cached_arena.lock().unwrap();

match &mut *cached_arenas {
None => {
let mut lp_arena = Default::default();
let mut expr_arena = Default::default();
let node = to_alp(
self.logical_plan.clone(),
&mut expr_arena,
&mut lp_arena,
false,
true,
)?;

let schema = lp_arena.get(node).schema(&lp_arena).into_owned();

// Cache the logical plan and the arenas, so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: lp_arena.version(),
};
*cached_arenas = Some(CachedArena {
lp_arena,
expr_arena,
});

Ok(schema)
},
Some(arenas) => {
match self.logical_plan {
// We have got arenas and don't need to convert the DSL.
DslPlan::IR {
node: Some(node), ..
} => Ok(arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned()),
_ => {
// We have got arenas, but still need to convert (parts) of the DSL.
let node = to_alp(
self.logical_plan.clone(),
&mut arenas.expr_arena,
&mut arenas.lp_arena,
false,
true,
)?;

let schema = arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned();
// Cache the logical plan so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: arenas.lp_arena.version(),
};

Ok(schema)
},
}
},
}
}

pub(crate) fn get_plan_builder(self) -> DslBuilder {
DslBuilder::from(self.logical_plan)
}
Expand Down Expand Up @@ -681,13 +604,6 @@ impl LazyFrame {
Ok(lp_top)
}

fn get_arenas(&mut self) -> (Arena<IR>, Arena<AExpr>) {
match self.cached_arena.lock().unwrap().as_mut() {
Some(arenas) => (arenas.lp_arena.clone(), arenas.expr_arena.clone()),
None => (Arena::with_capacity(16), Arena::with_capacity(16)),
}
}

fn prepare_collect_post_opt<P>(
mut self,
check_sink: bool,
Expand Down
36 changes: 26 additions & 10 deletions crates/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct SQLContext {
pub(crate) function_registry: Arc<dyn FunctionRegistry>,
cte_map: RefCell<PlHashMap<String, LazyFrame>>,
aliases: RefCell<PlHashMap<String, String>>,
lp_arena: Arena<IR>,
expr_arena: Arena<AExpr>,
}

impl Default for SQLContext {
Expand All @@ -33,6 +35,8 @@ impl Default for SQLContext {
table_map: Default::default(),
cte_map: Default::default(),
aliases: Default::default(),
lp_arena: Default::default(),
expr_arena: Default::default(),
}
}
}
Expand Down Expand Up @@ -111,11 +115,18 @@ impl SQLContext {
.parse_statements()
.map_err(to_compute_err)?;
polars_ensure!(ast.len() == 1, ComputeError: "One and only one statement at a time please");
let res = self.execute_statement(ast.first().unwrap());
let res = self.execute_statement(ast.first().unwrap())?;

// Ensure the result uses the proper arenas.
// This will instantiate new arenas with a new version.
let lp_arena = std::mem::take(&mut self.lp_arena);
let expr_arena = std::mem::take(&mut self.expr_arena);
res.set_cached_arena(lp_arena, expr_arena);

// Every execution should clear the CTE map.
self.cte_map.borrow_mut().clear();
self.aliases.borrow_mut().clear();
res
Ok(res)
}

/// add a function registry to the SQLContext
Expand Down Expand Up @@ -283,7 +294,12 @@ impl SQLContext {
None => {
let tbl = table_name.to_string();
if let Some(lf) = self.table_map.get_mut(&tbl) {
*lf = DataFrame::from(lf.schema().unwrap().as_ref()).lazy();
*lf = DataFrame::from(
lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
.unwrap()
.as_ref(),
)
.lazy();
Ok(lf.clone())
} else {
polars_bail!(ComputeError: "table '{}' does not exist", tbl);
Expand Down Expand Up @@ -369,7 +385,7 @@ impl SQLContext {
let mut contains_wildcard_exclude = false;

// Filter expression.
let schema = Some(lf.schema()?);
let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
if let Some(expr) = select_stmt.selection.as_ref() {
let mut filter_expression = parse_sql_expr(expr, self, schema.as_deref())?;
lf = self.process_subqueries(lf, vec![&mut filter_expression]);
Expand Down Expand Up @@ -473,7 +489,7 @@ impl SQLContext {
if query.order_by.is_empty() {
lf.select(projections)
} else if !contains_wildcard {
let schema = lf.schema()?;
let schema = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?;
let mut column_names = schema.get_names();
let mut retained_names = PlHashSet::new();

Expand Down Expand Up @@ -538,7 +554,7 @@ impl SQLContext {
lf = self.process_order_by(lf, &query.order_by)?;

// Apply optional 'having' clause, post-aggregation.
let schema = Some(lf.schema()?);
let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
match select_stmt.having.as_ref() {
Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
None => lf,
Expand All @@ -550,7 +566,7 @@ impl SQLContext {
Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
Some(Distinct::On(exprs)) => {
// TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
let schema = Some(lf.schema()?);
let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
let cols = exprs
.iter()
.map(|e| {
Expand Down Expand Up @@ -702,7 +718,7 @@ impl SQLContext {
let mut by = Vec::with_capacity(ob.len());
let mut descending = Vec::with_capacity(ob.len());

let schema = Some(lf.schema()?);
let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
for ob in ob {
by.push(parse_sql_expr(&ob.expr, self, schema.as_deref())?);
descending.push(!ob.asc.unwrap_or(true));
Expand Down Expand Up @@ -731,7 +747,7 @@ impl SQLContext {
!contains_wildcard,
ComputeError: "group_by error: can't process wildcard in group_by"
);
let schema_before = lf.schema()?;
let schema_before = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?;
let group_by_keys_schema =
expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;

Expand Down Expand Up @@ -856,7 +872,7 @@ impl SQLContext {
tbl_name
)
})?;
let schema = lf.schema()?;
let schema = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?;
cols(schema.iter_names())
},
e => polars_bail!(
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-utils/src/arena.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ impl<T> Arena<T> {
}

pub fn clear(&mut self) {
self.items.clear()
self.items.clear();
self.version = ARENA_VERSION.fetch_add(1, Ordering::Relaxed);
}
}

Expand Down

0 comments on commit 4048c3a

Please sign in to comment.