Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Remove IR info from DSL #18712

Merged
merged 3 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ impl LazyFileListReader for LazyJsonLineReader {
Ok(LazyFrame::from(DslPlan::Scan {
sources: Arc::new(Mutex::new(self.sources.to_dsl(false))),
file_info: Arc::new(RwLock::new(None)),
predicate: None,
file_options,
scan_type,
}))
Expand Down
14 changes: 1 addition & 13 deletions crates/polars-plan/src/plans/builder_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use polars_io::HiveOptions;
#[cfg(any(feature = "parquet", feature = "csv", feature = "ipc"))]
use polars_io::RowIndex;

use crate::constants::UNLIMITED_CACHE;
#[cfg(feature = "python")]
use crate::prelude::python_udf::PythonFunction;
use crate::prelude::*;
Expand Down Expand Up @@ -63,7 +62,6 @@ impl DslBuilder {
is_expanded: true,
})),
file_info: Arc::new(RwLock::new(Some(file_info))),
predicate: None,
file_options,
scan_type: FileScan::Anonymous {
function,
Expand Down Expand Up @@ -106,7 +104,6 @@ impl DslBuilder {
Ok(DslPlan::Scan {
sources: Arc::new(Mutex::new(sources)),
file_info: Arc::new(RwLock::new(None)),
predicate: None,
file_options: options,
scan_type: FileScan::Parquet {
options: ParquetOptions {
Expand Down Expand Up @@ -148,7 +145,6 @@ impl DslBuilder {
glob: true,
include_file_paths,
},
predicate: None,
scan_type: FileScan::Ipc {
options,
cloud_options,
Expand Down Expand Up @@ -190,7 +186,6 @@ impl DslBuilder {
sources: Arc::new(Mutex::new(sources)),
file_info: Arc::new(RwLock::new(None)),
file_options: options,
predicate: None,
scan_type: FileScan::Csv {
options: read_options,
cloud_options,
Expand All @@ -202,12 +197,7 @@ impl DslBuilder {
pub fn cache(self) -> Self {
let input = Arc::new(self.0);
let id = input.as_ref() as *const DslPlan as usize;
DslPlan::Cache {
input,
id,
cache_hits: UNLIMITED_CACHE,
}
.into()
DslPlan::Cache { input, id }.into()
}

pub fn drop(self, to_drop: Vec<Selector>, strict: bool) -> Self {
Expand Down Expand Up @@ -321,8 +311,6 @@ impl DslBuilder {
DslPlan::DataFrameScan {
df: Arc::new(df),
schema,
output_schema: None,
filter: None,
}
.into()
}
Expand Down
26 changes: 6 additions & 20 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
DslPlan::Scan {
sources,
file_info,
predicate,
mut file_options,
mut scan_type,
} => {
Expand Down Expand Up @@ -276,9 +275,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
file_info: resolved_file_info,
hive_parts,
output_schema: None,
predicate: predicate
.map(|expr| to_expr_ir(expr, ctxt.expr_arena))
.transpose()?,
predicate: None,
scan_type,
file_options,
}
Expand Down Expand Up @@ -374,18 +371,11 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_input!(slice)))?;
IR::Slice { input, offset, len }
},
DslPlan::DataFrameScan {
DslPlan::DataFrameScan { df, schema } => IR::DataFrameScan {
df,
schema,
output_schema,
filter: selection,
} => IR::DataFrameScan {
df,
schema,
output_schema,
filter: selection
.map(|expr| to_expr_ir(expr, ctxt.expr_arena))
.transpose()?,
output_schema: None,
filter: None,
},
DslPlan::Select {
expr,
Expand Down Expand Up @@ -488,17 +478,13 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult

return run_conversion(lp, ctxt, "sort");
},
DslPlan::Cache {
input,
id,
cache_hits,
} => {
DslPlan::Cache { input, id } => {
let input =
to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_input!(cache)))?;
IR::Cache {
input,
id,
cache_hits,
cache_hits: crate::constants::UNLIMITED_CACHE,
}
},
DslPlan::GroupBy {
Expand Down
22 changes: 6 additions & 16 deletions crates/polars-plan/src/plans/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl IR {
sources,
file_info,
hive_parts: _,
predicate,
predicate: _,
scan_type,
output_schema: _,
file_options: options,
Expand All @@ -63,7 +63,6 @@ impl IR {
is_expanded: true,
})),
file_info: Arc::new(RwLock::new(Some(file_info))),
predicate: predicate.map(|e| e.to_expr(expr_arena)),
scan_type,
file_options: options,
},
Expand Down Expand Up @@ -109,14 +108,9 @@ impl IR {
IR::DataFrameScan {
df,
schema,
output_schema,
filter: selection,
} => DslPlan::DataFrameScan {
df,
schema,
output_schema,
filter: selection.map(|e| e.to_expr(expr_arena)),
},
output_schema: _,
filter: _,
} => DslPlan::DataFrameScan { df, schema },
IR::Select {
expr,
input,
Expand Down Expand Up @@ -170,14 +164,10 @@ impl IR {
IR::Cache {
input,
id,
cache_hits,
cache_hits: _,
} => {
let input = Arc::new(convert_to_lp(input, lp_arena));
DslPlan::Cache {
input,
id,
cache_hits,
}
DslPlan::Cache { input, id }
},
IR::GroupBy {
input,
Expand Down
19 changes: 4 additions & 15 deletions crates/polars-plan/src/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub struct DslScanSources {
pub is_expanded: bool,
}

// https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum DslPlan {
#[cfg(feature = "python")]
Expand All @@ -75,11 +74,7 @@ pub enum DslPlan {
predicate: Expr,
},
/// Cache the input at this point in the LP
Cache {
input: Arc<DslPlan>,
id: usize,
cache_hits: u32,
},
Cache { input: Arc<DslPlan>, id: usize },
Scan {
sources: Arc<Mutex<DslScanSources>>,
// Option as this is mostly materialized on the IR phase.
Expand All @@ -88,7 +83,6 @@ pub enum DslPlan {
// are used as base of different queries in a loop. That way
// the expensive schema resolving is cached.
file_info: Arc<RwLock<Option<FileInfo>>>,
predicate: Option<Expr>,
file_options: FileScanOptions,
scan_type: FileScan,
},
Expand All @@ -97,9 +91,6 @@ pub enum DslPlan {
DataFrameScan {
df: Arc<DataFrame>,
schema: SchemaRef,
// schema of the projected file
output_schema: Option<SchemaRef>,
filter: Option<Expr>,
},
/// Polars' `select` operation, this can mean projection, but also full data access.
Select {
Expand Down Expand Up @@ -196,9 +187,9 @@ impl Clone for DslPlan {
#[cfg(feature = "python")]
Self::PythonScan { options } => Self::PythonScan { options: options.clone() },
Self::Filter { input, predicate } => Self::Filter { input: input.clone(), predicate: predicate.clone() },
Self::Cache { input, id, cache_hits } => Self::Cache { input: input.clone(), id: id.clone(), cache_hits: cache_hits.clone() },
Self::Scan { sources, file_info, predicate, file_options, scan_type } => Self::Scan { sources: sources.clone(), file_info: file_info.clone(), predicate: predicate.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() },
Self::DataFrameScan { df, schema, output_schema, filter: selection } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), output_schema: output_schema.clone(), filter: selection.clone() },
Self::Cache { input, id } => Self::Cache { input: input.clone(), id: id.clone() },
Self::Scan { sources, file_info, file_options, scan_type } => Self::Scan { sources: sources.clone(), file_info: file_info.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() },
Self::DataFrameScan { df, schema, } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), },
Self::Select { expr, input, options } => Self::Select { expr: expr.clone(), input: input.clone(), options: options.clone() },
Self::GroupBy { input, keys, aggs, apply, maintain_order, options } => Self::GroupBy { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() },
Self::Join { input_left, input_right, left_on, right_on, predicates, options } => Self::Join { input_left: input_left.clone(), input_right: input_right.clone(), left_on: left_on.clone(), right_on: right_on.clone(), options: options.clone(), predicates: predicates.clone() },
Expand All @@ -223,8 +214,6 @@ impl Default for DslPlan {
DslPlan::DataFrameScan {
df: Arc::new(df),
schema: Arc::new(schema),
output_schema: None,
filter: None,
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,6 @@ def serialize(

>>> lf = pl.LazyFrame({"a": [1, 2, 3]}).sum()
>>> bytes = lf.serialize()
>>> bytes # doctest: +ELLIPSIS
b'\xa1kMapFunction\xa2einput\xa1mDataFrameScan\xa4bdf\xa1gcolumns\x81\xa4d...'

The bytes can later be deserialized back into a LazyFrame.

Expand Down
Loading