From a7b4dfdcd98f7661c7d6bce2013059880b94a675 Mon Sep 17 00:00:00 2001 From: ritchie Date: Tue, 28 May 2024 14:05:52 +0200 Subject: [PATCH] partial schema cache --- crates/polars-lazy/src/frame/mod.rs | 62 +++++++++++-------- .../src/logical_plan/conversion/dsl_to_ir.rs | 6 +- crates/polars-plan/src/logical_plan/mod.rs | 3 +- crates/polars-utils/src/arena.rs | 17 ++++- 4 files changed, 56 insertions(+), 32 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 9f1cfb194bfd..66a757721872 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -66,11 +66,6 @@ impl IntoLazy for LazyFrame { } } -struct CachedArena { - lp_arena: Arena, - expr_arena: Arena, -} - /// Lazy abstraction over an eager `DataFrame`. /// It really is an abstraction over a logical plan. The methods of this struct will incrementally /// modify a logical plan until output is requested (via [`collect`](crate::frame::LazyFrame::collect)). @@ -95,6 +90,11 @@ impl From for LazyFrame { } } +struct CachedArena { + lp_arena: Arena, + expr_arena: Arena, +} + impl LazyFrame { /// Get a handle to the schema — a map from column names to data types — of the current /// `LazyFrame` computation. @@ -119,8 +119,11 @@ impl LazyFrame { let schema = lp_arena.get(node).schema(&lp_arena).into_owned(); // Cache the logical plan and the arenas, so that next schema call is cheap. - self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())}; - dbg!("set", lp_arena.len()); + self.logical_plan = DslPlan::IR { + node: Some(node), + dsl: Arc::new(self.logical_plan.clone()), + version: lp_arena.version(), + }; *cached_arenas = Some(CachedArena { lp_arena, expr_arena, @@ -131,13 +134,13 @@ impl LazyFrame { Some(arenas) => { match self.logical_plan { // We have got arenas and don't need to convert the DSL. - DslPlan::IR {node: Some(node), ..} => { - Ok(arenas - .lp_arena - .get(node) - .schema(&arenas.lp_arena) - .into_owned()) - }, + DslPlan::IR { + node: Some(node), .. + } => Ok(arenas + .lp_arena + .get(node) + .schema(&arenas.lp_arena) + .into_owned()), _ => { // We have got arenas, but still need to convert (parts) of the DSL. let node = to_alp( @@ -148,13 +151,20 @@ impl LazyFrame { true, )?; - dbg!("set", arenas.lp_arena.len()); - let schema = arenas.lp_arena.get(node).schema(&arenas.lp_arena).into_owned(); + let schema = arenas + .lp_arena + .get(node) + .schema(&arenas.lp_arena) + .into_owned(); // Cache the logical plan so that next schema call is cheap. - self.logical_plan = DslPlan::IR {node: Some(node), dsl: Arc::new(self.logical_plan.clone())}; + self.logical_plan = DslPlan::IR { + node: Some(node), + dsl: Arc::new(self.logical_plan.clone()), + version: arenas.lp_arena.version(), + }; Ok(schema) - } + }, } }, } @@ -590,7 +600,13 @@ impl LazyFrame { pub fn to_alp(mut self) -> PolarsResult { let (mut lp_arena, mut expr_arena) = self.get_arenas(); - let node = to_alp(self.logical_plan, &mut expr_arena, &mut lp_arena, true, true)?; + let node = to_alp( + self.logical_plan, + &mut expr_arena, + &mut lp_arena, + true, + true, + )?; let plan = IRPlan::new(node, lp_arena, expr_arena); Ok(plan) } @@ -655,12 +671,8 @@ impl LazyFrame { fn get_arenas(&mut self) -> (Arena, Arena) { match self.cached_arena.lock().unwrap().as_mut() { - Some(arenas) => { - (arenas.lp_arena.clone(), arenas.expr_arena.clone()) - }, - None => { - (Arena::with_capacity(16), Arena::with_capacity(16)) - } + Some(arenas) => (arenas.lp_arena.clone(), arenas.expr_arena.clone()), + None => (Arena::with_capacity(16), Arena::with_capacity(16)), } } diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs index 24c0aa4b3539..786d22f06d37 100644 --- a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs @@ -79,7 +79,6 @@ pub fn to_alp_impl( Ok(lp_node) } - dbg!(lp_arena.len()); let v = match lp { DslPlan::Scan { @@ -566,9 +565,8 @@ pub fn to_alp_impl( .map_err(|e| e.context(failed_input!(sink)))?; IR::Sink { input, payload } }, - DslPlan::IR { node, dsl } => { - dbg!("IR", lp_arena.len()); - return if let Some(node) = node { + DslPlan::IR { node, dsl, version } => { + return if let (true, Some(node)) = (version == lp_arena.version(), node) { Ok(node) } else { to_alp_impl(owned(dsl), expr_arena, lp_arena, convert) diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index e0ebebb1a212..313d6cbe6644 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -170,6 +170,7 @@ pub enum DslPlan { IR { #[cfg_attr(feature = "serde", serde(skip))] node: Option, + version: u32, // Keep the original Dsl around as we need that for serialization. dsl: Arc, }, @@ -201,7 +202,7 @@ impl Clone for DslPlan { Self::HConcat { inputs, options } => Self::HConcat { inputs: inputs.clone(), options: options.clone() }, Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() }, Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() }, - Self::IR {node, dsl} => Self::IR {node: *node, dsl: dsl.clone()} + Self::IR {node, dsl, version} => Self::IR {node: *node, dsl: dsl.clone(), version: *version} } } } diff --git a/crates/polars-utils/src/arena.rs b/crates/polars-utils/src/arena.rs index 2e4505c3b060..ccd09b66171e 100644 --- a/crates/polars-utils/src/arena.rs +++ b/crates/polars-utils/src/arena.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::{AtomicU32, Ordering}; + use crate::error::*; use crate::slice::GetSaferUnchecked; @@ -27,8 +29,11 @@ impl Default for Node { } } +static ARENA_VERSION: AtomicU32 = AtomicU32::new(0); + #[derive(Clone)] pub struct Arena { + version: u32, items: Vec, } @@ -41,6 +46,11 @@ impl Default for Arena { /// Simple Arena implementation /// Allocates memory and stores item in a Vec. Only deallocates when being dropped itself. impl Arena { + #[inline] + pub fn version(&self) -> u32 { + self.version + } + pub fn add(&mut self, val: T) -> Node { let idx = self.items.len(); self.items.push(val); @@ -60,12 +70,16 @@ impl Arena { } pub fn new() -> Self { - Arena { items: vec![] } + Arena { + items: vec![], + version: ARENA_VERSION.fetch_add(1, Ordering::Relaxed), + } } pub fn with_capacity(cap: usize) -> Self { Arena { items: Vec::with_capacity(cap), + version: ARENA_VERSION.fetch_add(1, Ordering::Relaxed), } } @@ -79,7 +93,6 @@ impl Arena { #[inline] pub fn get(&self, idx: Node) -> &T { - dbg!(idx.0); self.items.get(idx.0).unwrap() }