Skip to content

Commit

Permalink
partial schema cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 28, 2024
1 parent 92babdd commit a7b4dfd
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 32 deletions.
62 changes: 37 additions & 25 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ impl IntoLazy for LazyFrame {
}
}

struct CachedArena {
lp_arena: Arena<IR>,
expr_arena: Arena<AExpr>,
}

/// Lazy abstraction over an eager `DataFrame`.
/// It really is an abstraction over a logical plan. The methods of this struct will incrementally
/// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)).
Expand All @@ -95,6 +90,11 @@ impl From<DslPlan> for LazyFrame {
}
}

struct CachedArena {
lp_arena: Arena<IR>,
expr_arena: Arena<AExpr>,
}

impl LazyFrame {
/// Get a handle to the schema — a map from column names to data types — of the current
/// `LazyFrame` computation.
Expand All @@ -119,8 +119,11 @@ impl LazyFrame {
let schema = lp_arena.get(node).schema(&lp_arena).into_owned();

// Cache the logical plan and the arenas, so that the next schema call is cheap.
self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())};
dbg!("set", lp_arena.len());
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: lp_arena.version(),
};
*cached_arenas = Some(CachedArena {
lp_arena,
expr_arena,
Expand All @@ -131,13 +134,13 @@ impl LazyFrame {
Some(arenas) => {
match self.logical_plan {
// We have got arenas and don't need to convert the DSL.
DslPlan::IR {node: Some(node), ..} => {
Ok(arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned())
},
DslPlan::IR {
node: Some(node), ..
} => Ok(arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned()),
_ => {
// We have arenas, but still need to convert (parts of) the DSL.
let node = to_alp(
Expand All @@ -148,13 +151,20 @@ impl LazyFrame {
true,
)?;

dbg!("set", arenas.lp_arena.len());
let schema = arenas.lp_arena.get(node).schema(&arenas.lp_arena).into_owned();
let schema = arenas
.lp_arena
.get(node)
.schema(&arenas.lp_arena)
.into_owned();
// Cache the logical plan so that the next schema call is cheap.
self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())};
self.logical_plan = DslPlan::IR {
node: Some(node),
dsl: Arc::new(self.logical_plan.clone()),
version: arenas.lp_arena.version(),
};

Ok(schema)
}
},
}
},
}
Expand Down Expand Up @@ -590,7 +600,13 @@ impl LazyFrame {

pub fn to_alp(mut self) -> PolarsResult<IRPlan> {
let (mut lp_arena, mut expr_arena) = self.get_arenas();
let node = to_alp(self.logical_plan, &mut expr_arena, &mut lp_arena, true, true)?;
let node = to_alp(
self.logical_plan,
&mut expr_arena,
&mut lp_arena,
true,
true,
)?;
let plan = IRPlan::new(node, lp_arena, expr_arena);
Ok(plan)
}
Expand Down Expand Up @@ -655,12 +671,8 @@ impl LazyFrame {

fn get_arenas(&mut self) -> (Arena<IR>, Arena<AExpr>) {
match self.cached_arena.lock().unwrap().as_mut() {
Some(arenas) => {
(arenas.lp_arena.clone(), arenas.expr_arena.clone())
},
None => {
(Arena::with_capacity(16), Arena::with_capacity(16))
}
Some(arenas) => (arenas.lp_arena.clone(), arenas.expr_arena.clone()),
None => (Arena::with_capacity(16), Arena::with_capacity(16)),
}
}

Expand Down
6 changes: 2 additions & 4 deletions crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ pub fn to_alp_impl(

Ok(lp_node)
}
dbg!(lp_arena.len());

let v = match lp {
DslPlan::Scan {
Expand Down Expand Up @@ -566,9 +565,8 @@ pub fn to_alp_impl(
.map_err(|e| e.context(failed_input!(sink)))?;
IR::Sink { input, payload }
},
DslPlan::IR { node, dsl } => {
dbg!("IR", lp_arena.len());
return if let Some(node) = node {
DslPlan::IR { node, dsl, version } => {
return if let (true, Some(node)) = (version == lp_arena.version(), node) {
Ok(node)
} else {
to_alp_impl(owned(dsl), expr_arena, lp_arena, convert)
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-plan/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ pub enum DslPlan {
IR {
#[cfg_attr(feature = "serde", serde(skip))]
node: Option<Node>,
version: u32,
// Keep the original Dsl around as we need that for serialization.
dsl: Arc<DslPlan>,
},
Expand Down Expand Up @@ -201,7 +202,7 @@ impl Clone for DslPlan {
Self::HConcat { inputs, options } => Self::HConcat { inputs: inputs.clone(), options: options.clone() },
Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() },
Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() },
Self::IR {node, dsl} => Self::IR {node: *node, dsl: dsl.clone()}
Self::IR {node, dsl, version} => Self::IR {node: *node, dsl: dsl.clone(), version: *version}
}
}
}
Expand Down
17 changes: 15 additions & 2 deletions crates/polars-utils/src/arena.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::{AtomicU32, Ordering};

use crate::error::*;
use crate::slice::GetSaferUnchecked;

Expand Down Expand Up @@ -27,8 +29,11 @@ impl Default for Node {
}
}

static ARENA_VERSION: AtomicU32 = AtomicU32::new(0);

#[derive(Clone)]
pub struct Arena<T> {
version: u32,
items: Vec<T>,
}

Expand All @@ -41,6 +46,11 @@ impl<T> Default for Arena<T> {
/// Simple Arena implementation
/// Allocates memory and stores items in a `Vec`. Only deallocates when the arena itself is dropped.
impl<T> Arena<T> {
#[inline]
pub fn version(&self) -> u32 {
self.version
}

pub fn add(&mut self, val: T) -> Node {
let idx = self.items.len();
self.items.push(val);
Expand All @@ -60,12 +70,16 @@ impl<T> Arena<T> {
}

pub fn new() -> Self {
Arena { items: vec![] }
Arena {
items: vec![],
version: ARENA_VERSION.fetch_add(1, Ordering::Relaxed),
}
}

pub fn with_capacity(cap: usize) -> Self {
Arena {
items: Vec::with_capacity(cap),
version: ARENA_VERSION.fetch_add(1, Ordering::Relaxed),
}
}

Expand All @@ -79,7 +93,6 @@ impl<T> Arena<T> {

#[inline]
pub fn get(&self, idx: Node) -> &T {
dbg!(idx.0);
self.items.get(idx.0).unwrap()
}

Expand Down

0 comments on commit a7b4dfd

Please sign in to comment.