Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Cache arena's (and conversion) in SQL context #16566

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions crates/polars-lazy/src/frame/cached_arenas.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use super::*;

pub(crate) struct CachedArena {
lp_arena: Arena<IR>,
expr_arena: Arena<AExpr>,
}

impl LazyFrame {
pub fn set_cached_arena(&self, lp_arena: Arena<IR>, expr_arena: Arena<AExpr>) {
let mut cached = self.cached_arena.lock().unwrap();
*cached = Some(CachedArena {
lp_arena,
expr_arena,
});
}

pub fn schema_with_arenas(
&mut self,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<SchemaRef> {
let node = to_alp(self.logical_plan.clone(), expr_arena, lp_arena, false, true)?;

let schema = lp_arena.get(node).schema(lp_arena).into_owned();
// Cache the logical plan so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: lp_arena.version(),
};
Ok(schema)
}

/// Get a handle to the schema — a map from column names to data types — of the current
/// `LazyFrame` computation.
///
/// Returns an `Err` if the logical plan has already encountered an error (i.e., if
/// `self.collect()` would fail), `Ok` otherwise.
pub fn schema(&mut self) -> PolarsResult<SchemaRef> {
let mut cached_arenas = self.cached_arena.lock().unwrap();

match &mut *cached_arenas {
None => {
let mut lp_arena = Default::default();
let mut expr_arena = Default::default();
// Code duplication because of bchk. :(
let node = to_alp(
self.logical_plan.clone(),
&mut expr_arena,
&mut lp_arena,
false,
true,
)?;

let schema = lp_arena.get(node).schema(&lp_arena).into_owned();
// Cache the logical plan so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: lp_arena.version(),
};
*cached_arenas = Some(CachedArena {
lp_arena,
expr_arena,
});

Ok(schema)
},
Some(arenas) => {
match self.logical_plan {
// We have got arenas and don't need to convert the DSL.
DslPlan::IR {
node: Some(node), ..
} => Ok(arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned()),
_ => {
// We have got arenas, but still need to convert (parts) of the DSL.
// Code duplication because of bchk. :(
let node = to_alp(
self.logical_plan.clone(),
&mut arenas.expr_arena,
&mut arenas.lp_arena,
false,
true,
)?;

let schema = arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned();
// Cache the logical plan so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: arenas.lp_arena.version(),
};
Ok(schema)
},
}
},
}
}

pub(super) fn get_arenas(&mut self) -> (Arena<IR>, Arena<AExpr>) {
match self.cached_arena.lock().unwrap().as_mut() {
Some(arenas) => (arenas.lp_arena.clone(), arenas.expr_arena.clone()),
None => (Arena::with_capacity(16), Arena::with_capacity(16)),
}
}
}
88 changes: 2 additions & 86 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#[cfg(feature = "python")]
mod python;

mod cached_arenas;
mod err;
#[cfg(not(target_arch = "wasm32"))]
mod exitable;
Expand Down Expand Up @@ -36,6 +37,7 @@ pub use polars_plan::frame::{AllowedOptimizations, OptState};
use polars_plan::global::FETCH_ROWS;
use smartstring::alias::String as SmartString;

use crate::frame::cached_arenas::CachedArena;
use crate::physical_plan::executors::Executor;
use crate::physical_plan::planner::{
create_physical_expr, create_physical_plan, ExpressionConversionState,
Expand Down Expand Up @@ -90,11 +92,6 @@ impl From<DslPlan> for LazyFrame {
}
}

pub(crate) struct CachedArena {
lp_arena: Arena<IR>,
expr_arena: Arena<AExpr>,
}

impl LazyFrame {
pub(crate) fn from_inner(
logical_plan: DslPlan,
Expand All @@ -108,80 +105,6 @@ impl LazyFrame {
}
}

/// Get a handle to the schema — a map from column names to data types — of the current
/// `LazyFrame` computation.
///
/// Returns an `Err` if the logical plan has already encountered an error (i.e., if
/// `self.collect()` would fail), `Ok` otherwise.
pub fn schema(&mut self) -> PolarsResult<SchemaRef> {
let mut cached_arenas = self.cached_arena.lock().unwrap();

match &mut *cached_arenas {
None => {
let mut lp_arena = Default::default();
let mut expr_arena = Default::default();
let node = to_alp(
self.logical_plan.clone(),
&mut expr_arena,
&mut lp_arena,
false,
true,
)?;

let schema = lp_arena.get(node).schema(&lp_arena).into_owned();

// Cache the logical plan and the arenas, so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: lp_arena.version(),
};
*cached_arenas = Some(CachedArena {
lp_arena,
expr_arena,
});

Ok(schema)
},
Some(arenas) => {
match self.logical_plan {
// We have got arenas and don't need to convert the DSL.
DslPlan::IR {
node: Some(node), ..
} => Ok(arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned()),
_ => {
// We have got arenas, but still need to convert (parts) of the DSL.
let node = to_alp(
self.logical_plan.clone(),
&mut arenas.expr_arena,
&mut arenas.lp_arena,
false,
true,
)?;

let schema = arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned();
// Cache the logical plan so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: arenas.lp_arena.version(),
};

Ok(schema)
},
}
},
}
}

pub(crate) fn get_plan_builder(self) -> DslBuilder {
DslBuilder::from(self.logical_plan)
}
Expand Down Expand Up @@ -681,13 +604,6 @@ impl LazyFrame {
Ok(lp_top)
}

fn get_arenas(&mut self) -> (Arena<IR>, Arena<AExpr>) {
match self.cached_arena.lock().unwrap().as_mut() {
Some(arenas) => (arenas.lp_arena.clone(), arenas.expr_arena.clone()),
None => (Arena::with_capacity(16), Arena::with_capacity(16)),
}
}

fn prepare_collect_post_opt<P>(
mut self,
check_sink: bool,
Expand Down
36 changes: 26 additions & 10 deletions crates/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct SQLContext {
pub(crate) function_registry: Arc<dyn FunctionRegistry>,
cte_map: RefCell<PlHashMap<String, LazyFrame>>,
aliases: RefCell<PlHashMap<String, String>>,
lp_arena: Arena<IR>,
expr_arena: Arena<AExpr>,
}

impl Default for SQLContext {
Expand All @@ -33,6 +35,8 @@ impl Default for SQLContext {
table_map: Default::default(),
cte_map: Default::default(),
aliases: Default::default(),
lp_arena: Default::default(),
expr_arena: Default::default(),
}
}
}
Expand Down Expand Up @@ -111,11 +115,18 @@ impl SQLContext {
.parse_statements()
.map_err(to_compute_err)?;
polars_ensure!(ast.len() == 1, ComputeError: "One and only one statement at a time please");
let res = self.execute_statement(ast.first().unwrap());
let res = self.execute_statement(ast.first().unwrap())?;

// Ensure the result uses the proper arenas.
// This will instantiate new arenas with a new version.
let lp_arena = std::mem::take(&mut self.lp_arena);
let expr_arena = std::mem::take(&mut self.expr_arena);
res.set_cached_arena(lp_arena, expr_arena);

// Every execution should clear the CTE map.
self.cte_map.borrow_mut().clear();
self.aliases.borrow_mut().clear();
res
Ok(res)
}

/// add a function registry to the SQLContext
Expand Down Expand Up @@ -283,7 +294,12 @@ impl SQLContext {
None => {
let tbl = table_name.to_string();
if let Some(lf) = self.table_map.get_mut(&tbl) {
*lf = DataFrame::from(lf.schema().unwrap().as_ref()).lazy();
*lf = DataFrame::from(
lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
.unwrap()
.as_ref(),
)
.lazy();
Ok(lf.clone())
} else {
polars_bail!(ComputeError: "table '{}' does not exist", tbl);
Expand Down Expand Up @@ -369,7 +385,7 @@ impl SQLContext {
let mut contains_wildcard_exclude = false;

// Filter expression.
let schema = Some(lf.schema()?);
let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
if let Some(expr) = select_stmt.selection.as_ref() {
let mut filter_expression = parse_sql_expr(expr, self, schema.as_deref())?;
lf = self.process_subqueries(lf, vec![&mut filter_expression]);
Expand Down Expand Up @@ -473,7 +489,7 @@ impl SQLContext {
if query.order_by.is_empty() {
lf.select(projections)
} else if !contains_wildcard {
let schema = lf.schema()?;
let schema = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?;
let mut column_names = schema.get_names();
let mut retained_names = PlHashSet::new();

Expand Down Expand Up @@ -538,7 +554,7 @@ impl SQLContext {
lf = self.process_order_by(lf, &query.order_by)?;

// Apply optional 'having' clause, post-aggregation.
let schema = Some(lf.schema()?);
let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
match select_stmt.having.as_ref() {
Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
None => lf,
Expand All @@ -550,7 +566,7 @@ impl SQLContext {
Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
Some(Distinct::On(exprs)) => {
// TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
let schema = Some(lf.schema()?);
let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
let cols = exprs
.iter()
.map(|e| {
Expand Down Expand Up @@ -702,7 +718,7 @@ impl SQLContext {
let mut by = Vec::with_capacity(ob.len());
let mut descending = Vec::with_capacity(ob.len());

let schema = Some(lf.schema()?);
let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
for ob in ob {
by.push(parse_sql_expr(&ob.expr, self, schema.as_deref())?);
descending.push(!ob.asc.unwrap_or(true));
Expand Down Expand Up @@ -731,7 +747,7 @@ impl SQLContext {
!contains_wildcard,
ComputeError: "group_by error: can't process wildcard in group_by"
);
let schema_before = lf.schema()?;
let schema_before = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?;
let group_by_keys_schema =
expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;

Expand Down Expand Up @@ -856,7 +872,7 @@ impl SQLContext {
tbl_name
)
})?;
let schema = lf.schema()?;
let schema = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?;
cols(schema.iter_names())
},
e => polars_bail!(
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-utils/src/arena.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ impl<T> Arena<T> {
}

pub fn clear(&mut self) {
self.items.clear()
self.items.clear();
self.version = ARENA_VERSION.fetch_add(1, Ordering::Relaxed);
}
}

Expand Down