Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Partial schema cache. #16549

Merged
merged 3 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions crates/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
);

let mut opt_state = lf.opt_state;
let cached_arenas = lf.cached_arena.clone();

let mut lps = Vec::with_capacity(inputs.len());
lps.push(lf.logical_plan);
Expand All @@ -34,9 +35,7 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
}

let lp = DslPlan::Union { inputs: lps, args };
let mut lf = LazyFrame::from(lp);
lf.opt_state = opt_state;
Ok(lf)
Ok(LazyFrame::from_inner(lp, opt_state, cached_arenas))
}

#[cfg(feature = "diagonal_concat")]
Expand All @@ -56,9 +55,12 @@ pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
args: UnionArgs,
) -> PolarsResult<LazyFrame> {
let lfs = inputs.as_ref();
let mut opt_state = lfs.first().map(|lf| lf.opt_state).ok_or_else(
|| polars_err!(NoData: "Require at least one LazyFrame for horizontal concatenation"),
)?;
let (mut opt_state, cached_arena) = lfs
.first()
.map(|lf| (lf.opt_state, lf.cached_arena.clone()))
.ok_or_else(
|| polars_err!(NoData: "Require at least one LazyFrame for horizontal concatenation"),
)?;

for lf in &lfs[1..] {
// ensure we enable file caching if any lf has it enabled
Expand All @@ -72,10 +74,7 @@ pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
inputs: lfs.iter().map(|lf| lf.logical_plan.clone()).collect(),
options,
};
let mut lf = LazyFrame::from(lp);
lf.opt_state = opt_state;

Ok(lf)
Ok(LazyFrame::from_inner(lp, opt_state, cached_arena))
}

/// Concat multiple [`LazyFrame`]s vertically.
Expand Down
128 changes: 115 additions & 13 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub mod pivot;
feature = "json"
))]
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

pub use anonymous_scan::*;
#[cfg(feature = "csv")]
Expand Down Expand Up @@ -55,6 +55,7 @@ impl IntoLazy for DataFrame {
LazyFrame {
logical_plan: lp,
opt_state: Default::default(),
cached_arena: Default::default(),
}
}
}
Expand All @@ -73,6 +74,7 @@ impl IntoLazy for LazyFrame {
pub struct LazyFrame {
pub logical_plan: DslPlan,
pub(crate) opt_state: OptState,
pub(crate) cached_arena: Arc<Mutex<Option<CachedArena>>>,
}

impl From<DslPlan> for LazyFrame {
Expand All @@ -83,18 +85,101 @@ impl From<DslPlan> for LazyFrame {
file_caching: true,
..Default::default()
},
cached_arena: Default::default(),
}
}
}

pub(crate) struct CachedArena {
lp_arena: Arena<IR>,
expr_arena: Arena<AExpr>,
}

impl LazyFrame {
pub(crate) fn from_inner(
logical_plan: DslPlan,
opt_state: OptState,
cached_arena: Arc<Mutex<Option<CachedArena>>>,
) -> Self {
Self {
logical_plan,
opt_state,
cached_arena,
}
}

/// Get a handle to the schema — a map from column names to data types — of the current
/// `LazyFrame` computation.
///
/// Returns an `Err` if the logical plan has already encountered an error (i.e., if
/// `self.collect()` would fail), `Ok` otherwise.
pub fn schema(&self) -> PolarsResult<SchemaRef> {
self.logical_plan.compute_schema()
pub fn schema(&mut self) -> PolarsResult<SchemaRef> {
let mut cached_arenas = self.cached_arena.lock().unwrap();

match &mut *cached_arenas {
None => {
let mut lp_arena = Default::default();
let mut expr_arena = Default::default();
let node = to_alp(
self.logical_plan.clone(),
&mut expr_arena,
&mut lp_arena,
false,
true,
)?;

let schema = lp_arena.get(node).schema(&lp_arena).into_owned();

// Cache the logical plan and the arenas, so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: lp_arena.version(),
};
*cached_arenas = Some(CachedArena {
lp_arena,
expr_arena,
});

Ok(schema)
},
Some(arenas) => {
match self.logical_plan {
// We have got arenas and don't need to convert the DSL.
DslPlan::IR {
node: Some(node), ..
} => Ok(arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned()),
_ => {
// We have got arenas, but still need to convert (parts) of the DSL.
let node = to_alp(
self.logical_plan.clone(),
&mut arenas.expr_arena,
&mut arenas.lp_arena,
false,
true,
)?;

let schema = arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned();
// Cache the logical plan so that next schema call is cheap.
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: arenas.lp_arena.version(),
};

Ok(schema)
},
}
},
}
}

pub(crate) fn get_plan_builder(self) -> DslBuilder {
Expand All @@ -109,6 +194,7 @@ impl LazyFrame {
LazyFrame {
logical_plan,
opt_state,
cached_arena: Default::default(),
}
}

Expand Down Expand Up @@ -516,17 +602,25 @@ impl LazyFrame {
self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
}

pub fn to_alp_optimized(self) -> PolarsResult<IRPlan> {
let mut lp_arena = Arena::with_capacity(16);
let mut expr_arena = Arena::with_capacity(16);
pub fn to_alp_optimized(mut self) -> PolarsResult<IRPlan> {
let (mut lp_arena, mut expr_arena) = self.get_arenas();
let node =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?;

Ok(IRPlan::new(node, lp_arena, expr_arena))
}

pub fn to_alp(self) -> PolarsResult<IRPlan> {
self.logical_plan.to_alp()
pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
let (mut lp_arena, mut expr_arena) = self.get_arenas();
let node = to_alp(
self.logical_plan,
&mut expr_arena,
&mut lp_arena,
true,
true,
)?;
let plan = IRPlan::new(node, lp_arena, expr_arena);
Ok(plan)
}

pub(crate) fn optimize_with_scratch(
Expand Down Expand Up @@ -587,16 +681,23 @@ impl LazyFrame {
Ok(lp_top)
}

fn get_arenas(&mut self) -> (Arena<IR>, Arena<AExpr>) {
match self.cached_arena.lock().unwrap().as_mut() {
Some(arenas) => (arenas.lp_arena.clone(), arenas.expr_arena.clone()),
None => (Arena::with_capacity(16), Arena::with_capacity(16)),
}
}

fn prepare_collect_post_opt<P>(
self,
mut self,
check_sink: bool,
post_opt: P,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
where
P: Fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<()>,
{
let mut expr_arena = Arena::with_capacity(16);
let mut lp_arena = Arena::with_capacity(16);
let (mut lp_arena, mut expr_arena) = self.get_arenas();

let mut scratch = vec![];
let lp_top =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;
Expand Down Expand Up @@ -935,7 +1036,7 @@ impl LazyFrame {
/// *group_by_dynamic*
#[cfg(feature = "dynamic_group_by")]
pub fn rolling<E: AsRef<[Expr]>>(
self,
mut self,
index_column: Expr,
group_by: E,
mut options: RollingGroupOptions,
Expand Down Expand Up @@ -980,7 +1081,7 @@ impl LazyFrame {
/// with a ordinary group_by on these keys.
#[cfg(feature = "dynamic_group_by")]
pub fn group_by_dynamic<E: AsRef<[Expr]>>(
self,
mut self,
index_column: Expr,
group_by: E,
mut options: DynamicGroupOptions,
Expand Down Expand Up @@ -1672,6 +1773,7 @@ impl From<LazyGroupBy> for LazyFrame {
Self {
logical_plan: lgb.logical_plan,
opt_state: lgb.opt_state,
cached_arena: Default::default(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ fn test_with_column_prune() -> PolarsResult<()> {
});

// whole `with_columns` pruned
let q = df.lazy().with_column(col("c0")).select([col("c1")]);
let mut q = df.lazy().with_column(col("c0")).select([col("c1")]);

let lp = q.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();

Expand Down
7 changes: 7 additions & 0 deletions crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,13 @@ pub fn to_alp_impl(
.map_err(|e| e.context(failed_input!(sink)))?;
IR::Sink { input, payload }
},
DslPlan::IR { node, dsl, version } => {
return if let (true, Some(node)) = (version == lp_arena.version(), node) {
Ok(node)
} else {
to_alp_impl(owned(dsl), expr_arena, lp_arena, convert)
}
},
};
Ok(lp_arena.add(v))
}
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-plan/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ pub enum DslPlan {
input: Arc<DslPlan>,
payload: SinkType,
},
IR {
#[cfg_attr(feature = "serde", serde(skip))]
node: Option<Node>,
version: u32,
// Keep the original Dsl around as we need that for serialization.
dsl: Arc<DslPlan>,
},
}

impl Clone for DslPlan {
Expand Down Expand Up @@ -195,6 +202,7 @@ impl Clone for DslPlan {
Self::HConcat { inputs, options } => Self::HConcat { inputs: inputs.clone(), options: options.clone() },
Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() },
Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() },
Self::IR {node, dsl, version} => Self::IR {node: *node, dsl: dsl.clone(), version: *version}
}
}
}
Expand Down
18 changes: 3 additions & 15 deletions crates/polars-plan/src/logical_plan/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,10 @@ impl DslPlan {

/// Compute the schema. This requires conversion to [`IR`] and type-resolving.
pub fn compute_schema(&self) -> PolarsResult<SchemaRef> {
let opt_state = OptState {
eager: true,
type_coercion: true,
simplify_expr: false,
..Default::default()
};

let mut lp_arena = Default::default();
let node = optimize(
self.clone(),
opt_state,
&mut lp_arena,
&mut Default::default(),
&mut Default::default(),
Default::default(),
)?;
let mut expr_arena = Default::default();
let node = to_alp(self.clone(), &mut expr_arena, &mut lp_arena, false, true)?;

Ok(lp_arena.get(node).schema(&lp_arena).into_owned())
}
}
Expand Down
10 changes: 7 additions & 3 deletions crates/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,11 @@ impl SQLContext {
Ok((tbl_name, lf))
}

fn process_order_by(&mut self, lf: LazyFrame, ob: &[OrderByExpr]) -> PolarsResult<LazyFrame> {
fn process_order_by(
&mut self,
mut lf: LazyFrame,
ob: &[OrderByExpr],
) -> PolarsResult<LazyFrame> {
let mut by = Vec::with_capacity(ob.len());
let mut descending = Vec::with_capacity(ob.len());

Expand All @@ -718,7 +722,7 @@ impl SQLContext {

fn process_group_by(
&mut self,
lf: LazyFrame,
mut lf: LazyFrame,
contains_wildcard: bool,
group_by_keys: &[Expr],
projections: &[Expr],
Expand Down Expand Up @@ -846,7 +850,7 @@ impl SQLContext {
let idents = idents.as_slice();
let e = match idents {
[tbl_name] => {
let lf = self.table_map.get(&tbl_name.value).ok_or_else(|| {
let lf = self.table_map.get_mut(&tbl_name.value).ok_or_else(|| {
polars_err!(
ComputeError: "no table named '{}' found",
tbl_name
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-sql/src/sql_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl SQLExprVisitor<'_> {
fn visit_compound_identifier(&self, idents: &[Ident]) -> PolarsResult<Expr> {
match idents {
[tbl_name, column_name] => {
let lf = self
let mut lf = self
.ctx
.get_table_from_current_scope(&tbl_name.value)
.ok_or_else(|| {
Expand Down
Loading