Skip to content

Commit

Permalink
fix: Ensure 'CachedSchema' doesn't get synced between plans (#15661)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Apr 15, 2024
1 parent f800001 commit 2eb575b
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 6 deletions.
5 changes: 3 additions & 2 deletions crates/polars-plan/src/logical_plan/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ mod schema;
use std::borrow::Cow;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use polars_core::prelude::*;
use schema::CachedSchema;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use smartstring::alias::String as SmartString;
Expand All @@ -23,8 +25,6 @@ use crate::dsl::python_udf::PythonFunction;
use crate::logical_plan::functions::merge_sorted::merge_sorted;
use crate::prelude::*;

type CachedSchema = Arc<Mutex<Option<SchemaRef>>>;

#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum FunctionNode {
Expand Down Expand Up @@ -96,6 +96,7 @@ pub enum FunctionNode {
RowIndex {
name: Arc<str>,
// Might be cached.
#[cfg_attr(feature = "serde", serde(skip))]
schema: CachedSchema,
offset: Option<IdxSize>,
},
Expand Down
26 changes: 26 additions & 0 deletions crates/polars-plan/src/logical_plan/functions/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,29 @@ fn row_index_schema(
*guard = Some(schema_ref.clone());
schema_ref
}

/// Interior-mutable slot holding a lazily computed schema.
///
/// This is intentionally *not* an `Arc<Mutex<_>>`: caches should live in different query
/// plans, so cloning must never share the lock. [`Clone`] is therefore implemented by hand
/// as a deep clone that copies the current value into a brand-new `Mutex`.
#[derive(Default)]
pub struct CachedSchema(Mutex<Option<SchemaRef>>);

impl AsRef<Mutex<Option<SchemaRef>>> for CachedSchema {
    /// Borrows the wrapped mutex.
    fn as_ref(&self) -> &Mutex<Option<SchemaRef>> {
        &self.0
    }
}

impl Deref for CachedSchema {
    type Target = Mutex<Option<SchemaRef>>;

    /// Exposes the wrapped mutex so callers can invoke `lock()` directly on the cache.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Clone for CachedSchema {
    /// Returns an independent cache seeded with the value currently stored in `self`.
    ///
    /// The two caches do not share a lock: writing to (or clearing) one never affects
    /// the other.
    ///
    /// A poisoned lock is tolerated instead of panicking. The slot only ever holds a
    /// complete `Option<SchemaRef>` that is replaced wholesale, so a panic in another
    /// thread cannot leave it half-written and the stored value is still safe to copy.
    fn clone(&self) -> Self {
        let guard = self
            .0
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        Self(Mutex::new((*guard).clone()))
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ pub(super) fn process_functions(
)
},
_ => {
let lp = IR::MapFunction {
input,
function: function.clone(),
};
if function.allow_projection_pd() && !acc_projections.is_empty() {
let original_acc_projection_len = acc_projections.len();

Expand All @@ -109,6 +105,10 @@ pub(super) fn process_functions(

// Remove the cached schema
function.clear_cached_schema();
let lp = IR::MapFunction {
input,
function: function.clone(),
};

if local_projections.is_empty() {
Ok(lp)
Expand All @@ -127,6 +127,10 @@ pub(super) fn process_functions(
}
}
} else {
let lp = IR::MapFunction {
input,
function: function.clone(),
};
// restart projection pushdown
proj_pd.no_pushdown_restart_opt(
lp,
Expand Down
11 changes: 11 additions & 0 deletions py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,3 +408,14 @@ def test_projection_drop_with_series_lit_14382() -> None:
"b_name": ["b"],
"b_old": [7],
}


def test_cached_schema_15651() -> None:
    """Collecting a diverging sub-plan must leave the parent plan's columns correct."""
    base = (
        pl.LazyFrame({"col1": [1], "col2": [2], "col3": [3]})
        .with_row_index()
        .filter(~pl.col("col1").is_null())
    )
    # Branch a sub-plan off `base` and run it with projection pushdown enabled;
    # its result is irrelevant and deliberately discarded.
    base.select(pl.len()).collect(projection_pushdown=True)

    # The "cached" columns reported by `base` must still match what it produces.
    assert base.columns == base.collect().columns

0 comments on commit 2eb575b

Please sign in to comment.