Skip to content

Commit

Permalink
WIP: schema cache [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 28, 2024
1 parent c56a3cd commit 92babdd
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 38 deletions.
104 changes: 91 additions & 13 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub mod pivot;
feature = "json"
))]
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

pub use anonymous_scan::*;
#[cfg(feature = "csv")]
Expand Down Expand Up @@ -55,6 +55,7 @@ impl IntoLazy for DataFrame {
LazyFrame {
logical_plan: lp,
opt_state: Default::default(),
cached_arena: Default::default(),
}
}
}
Expand All @@ -65,6 +66,11 @@ impl IntoLazy for LazyFrame {
}
}

struct CachedArena {
lp_arena: Arena<IR>,
expr_arena: Arena<AExpr>,
}

/// Lazy abstraction over an eager `DataFrame`.
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
Expand All @@ -73,6 +79,7 @@ impl IntoLazy for LazyFrame {
pub struct LazyFrame {
pub logical_plan: DslPlan,
pub(crate) opt_state: OptState,
cached_arena: Arc<Mutex<Option<CachedArena>>>,
}

impl From<DslPlan> for LazyFrame {
Expand All @@ -83,6 +90,7 @@ impl From<DslPlan> for LazyFrame {
file_caching: true,
..Default::default()
},
cached_arena: Default::default(),
}
}
}
Expand All @@ -93,8 +101,63 @@ impl LazyFrame {
///
/// Returns an `Err` if the logical plan has already encountered an error (i.e., if
/// `self.collect()` would fail), `Ok` otherwise.
pub fn schema(&self) -> PolarsResult<SchemaRef> {
self.logical_plan.compute_schema()
pub fn schema(&mut self) -> PolarsResult<SchemaRef> {
let mut cached_arenas = self.cached_arena.lock().unwrap();

match &mut *cached_arenas {
None => {
let mut lp_arena = Default::default();
let mut expr_arena = Default::default();
let node = to_alp(
self.logical_plan.clone(),
&mut expr_arena,
&mut lp_arena,
false,
true,
)?;

let schema = lp_arena.get(node).schema(&lp_arena).into_owned();

// Cache the logical plan and the arenas, so that next schema call is cheap.
self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())};
dbg!("set", lp_arena.len());
*cached_arenas = Some(CachedArena {
lp_arena,
expr_arena,
});

Ok(schema)
},
Some(arenas) => {
match self.logical_plan {
// We have got arenas and don't need to convert the DSL.
DslPlan::IR {node: Some(node), ..} => {
Ok(arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned())
},
_ => {
// We have got arenas, but still need to convert (parts) of the DSL.
let node = to_alp(
self.logical_plan.clone(),
&mut arenas.expr_arena,
&mut arenas.lp_arena,
false,
true,
)?;

dbg!("set", arenas.lp_arena.len());
let schema = arenas.lp_arena.get(node).schema(&arenas.lp_arena).into_owned();
// Cache the logical plan so that next schema call is cheap.
self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())};

Ok(schema)
}
}
},
}
}

pub(crate) fn get_plan_builder(self) -> DslBuilder {
Expand All @@ -109,6 +172,7 @@ impl LazyFrame {
LazyFrame {
logical_plan,
opt_state,
cached_arena: Default::default(),
}
}

Expand Down Expand Up @@ -516,17 +580,19 @@ impl LazyFrame {
self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
}

pub fn to_alp_optimized(self) -> PolarsResult<IRPlan> {
let mut lp_arena = Arena::with_capacity(16);
let mut expr_arena = Arena::with_capacity(16);
pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
let (mut lp_arena, mut expr_arena) = self.get_arenas();
let node =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?;

Ok(IRPlan::new(node, lp_arena, expr_arena))
}

pub fn to_alp(self) -> PolarsResult<IRPlan> {
self.logical_plan.to_alp()
pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
let (mut lp_arena, mut expr_arena) = self.get_arenas();
let node = to_alp(self.logical_plan, &mut expr_arena, &mut lp_arena, true, true)?;
let plan = IRPlan::new(node, lp_arena, expr_arena);
Ok(plan)
}

pub(crate) fn optimize_with_scratch(
Expand Down Expand Up @@ -587,16 +653,27 @@ impl LazyFrame {
Ok(lp_top)
}

fn get_arenas(&mut self) -> (Arena<IR>, Arena<AExpr>) {
match self.cached_arena.lock().unwrap().as_mut() {
Some(arenas) => {
(arenas.lp_arena.clone(), arenas.expr_arena.clone())
},
None => {
(Arena::with_capacity(16), Arena::with_capacity(16))
}
}
}

fn prepare_collect_post_opt<P>(
self,
mut self,
check_sink: bool,
post_opt: P,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
where
P: Fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<()>,
{
let mut expr_arena = Arena::with_capacity(16);
let mut lp_arena = Arena::with_capacity(16);
let (mut lp_arena, mut expr_arena) = self.get_arenas();

let mut scratch = vec![];
let lp_top =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;
Expand Down Expand Up @@ -935,7 +1012,7 @@ impl LazyFrame {
/// *group_by_dynamic*
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
self,
mut self,
index_column: Expr,
group_by: E,
mut options: RollingGroupOptions,
Expand Down Expand Up @@ -980,7 +1057,7 @@ impl LazyFrame {
/// with a ordinary group_by on these keys.
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
self,
mut self,
index_column: Expr,
group_by: E,
mut options: DynamicGroupOptions,
Expand Down Expand Up @@ -1672,6 +1749,7 @@ impl From<LazyGroupBy> for LazyFrame {
Self {
logical_plan: lgb.logical_plan,
opt_state: lgb.opt_state,
cached_arena: Default::default(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ fn test_with_column_prune() -> PolarsResult<()> {
});

// whole `with_columns` pruned
let q = df.lazy().with_column(col("c0")).select([col("c1")]);
let mut q = df.lazy().with_column(col("c0")).select([col("c1")]);

let lp = q.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();

Expand Down
9 changes: 9 additions & 0 deletions crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub fn to_alp_impl(

Ok(lp_node)
}
dbg!(lp_arena.len());

let v = match lp {
DslPlan::Scan {
Expand Down Expand Up @@ -565,6 +566,14 @@ pub fn to_alp_impl(
.map_err(|e| e.context(failed_input!(sink)))?;
IR::Sink { input, payload }
},
DslPlan::IR { node, dsl } => {
dbg!("IR", lp_arena.len());
return if let Some(node) = node {
Ok(node)
} else {
to_alp_impl(owned(dsl), expr_arena, lp_arena, convert)
}
},
};
Ok(lp_arena.add(v))
}
Expand Down
7 changes: 7 additions & 0 deletions crates/polars-plan/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ pub enum DslPlan {
input: Arc<DslPlan>,
payload: SinkType,
},
IR {
#[cfg_attr(feature = "serde", serde(skip))]
node: Option<Node>,
// Keep the original Dsl around as we need that for serialization.
dsl: Arc<DslPlan>,
},
}

impl Clone for DslPlan {
Expand Down Expand Up @@ -195,6 +201,7 @@ impl Clone for DslPlan {
Self::HConcat { inputs, options } => Self::HConcat { inputs: inputs.clone(), options: options.clone() },
Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() },
Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() },
Self::IR {node, dsl} => Self::IR {node: *node, dsl: dsl.clone()}
}
}
}
Expand Down
18 changes: 3 additions & 15 deletions crates/polars-plan/src/logical_plan/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,10 @@ impl DslPlan {

/// Compute the schema. This requires conversion to [`IR`] and type-resolving.
pub fn compute_schema(&self) -> PolarsResult<SchemaRef> {
let opt_state = OptState {
eager: true,
type_coercion: true,
simplify_expr: false,
..Default::default()
};

let mut lp_arena = Default::default();
let node = optimize(
self.clone(),
opt_state,
&mut lp_arena,
&mut Default::default(),
&mut Default::default(),
Default::default(),
)?;
let mut expr_arena = Default::default();
let node = to_alp(self.clone(), &mut expr_arena, &mut lp_arena, false, true)?;

Ok(lp_arena.get(node).schema(&lp_arena).into_owned())
}
}
Expand Down
10 changes: 7 additions & 3 deletions crates/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,11 @@ impl SQLContext {
Ok((tbl_name, lf))
}

fn process_order_by(&mut self, lf: LazyFrame, ob: &[OrderByExpr]) -> PolarsResult<LazyFrame> {
fn process_order_by(
&mut self,
mut lf: LazyFrame,
ob: &[OrderByExpr],
) -> PolarsResult<LazyFrame> {
let mut by = Vec::with_capacity(ob.len());
let mut descending = Vec::with_capacity(ob.len());

Expand All @@ -718,7 +722,7 @@ impl SQLContext {

fn process_group_by(
&mut self,
lf: LazyFrame,
mut lf: LazyFrame,
contains_wildcard: bool,
group_by_keys: &[Expr],
projections: &[Expr],
Expand Down Expand Up @@ -846,7 +850,7 @@ impl SQLContext {
let idents = idents.as_slice();
let e = match idents {
[tbl_name] => {
let lf = self.table_map.get(&tbl_name.value).ok_or_else(|| {
let lf = self.table_map.get_mut(&tbl_name.value).ok_or_else(|| {
polars_err!(
ComputeError: "no table named '{}' found",
tbl_name
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-sql/src/sql_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl SQLExprVisitor<'_> {
fn visit_compound_identifier(&self, idents: &[Ident]) -> PolarsResult<Expr> {
match idents {
[tbl_name, column_name] => {
let lf = self
let mut lf = self
.ctx
.get_table_from_current_scope(&tbl_name.value)
.ok_or_else(|| {
Expand Down
1 change: 1 addition & 0 deletions crates/polars-utils/src/arena.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl<T> Arena<T> {

#[inline]
pub fn get(&self, idx: Node) -> &T {
dbg!(idx.0);
self.items.get(idx.0).unwrap()
}

Expand Down
10 changes: 5 additions & 5 deletions py-polars/src/lazyframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct PyLazyFrame {
}

impl PyLazyFrame {
fn get_schema(&self) -> PyResult<SchemaRef> {
fn get_schema(&mut self) -> PyResult<SchemaRef> {
let schema = self.ldf.schema().map_err(PyPolarsErr::from)?;
Ok(schema)
}
Expand Down Expand Up @@ -1165,21 +1165,21 @@ impl PyLazyFrame {
self.ldf.clone().into()
}

fn columns(&self, py: Python) -> PyResult<PyObject> {
fn columns(&mut self, py: Python) -> PyResult<PyObject> {
let schema = self.get_schema()?;
let iter = schema.iter_names().map(|s| s.as_str());
Ok(PyList::new_bound(py, iter).to_object(py))
}

fn dtypes(&self, py: Python) -> PyResult<PyObject> {
fn dtypes(&mut self, py: Python) -> PyResult<PyObject> {
let schema = self.get_schema()?;
let iter = schema
.iter_dtypes()
.map(|dt| Wrap(dt.clone()).to_object(py));
Ok(PyList::new_bound(py, iter).to_object(py))
}

fn schema(&self, py: Python) -> PyResult<PyObject> {
fn schema(&mut self, py: Python) -> PyResult<PyObject> {
let schema = self.get_schema()?;
let schema_dict = PyDict::new_bound(py);

Expand All @@ -1195,7 +1195,7 @@ impl PyLazyFrame {
self.ldf.clone().unnest(columns).into()
}

fn width(&self) -> PyResult<usize> {
fn width(&mut self) -> PyResult<usize> {
Ok(self.get_schema()?.len())
}

Expand Down

0 comments on commit 92babdd

Please sign in to comment.