Skip to content

Commit

Permalink
Merge pull request #1 from itamarst/poc
Browse files Browse the repository at this point in the history
Get the PoC to a state where the new test passes
  • Loading branch information
tmct authored Oct 30, 2024
2 parents 3200da6 + 93c4ccb commit 62a4b3b
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 23 deletions.
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ fn create_physical_plan_impl(
e,
Context::Default,
expr_arena,
&options.schema,
&options.schema.get_schema()?,
&mut state,
)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ fn get_schema(lp_arena: &Arena<IR>, lp_node: Node) -> Cow<'_, SchemaRef> {
let inputs = get_input(lp_arena, lp_node);
if inputs.is_empty() {
// Files don't have an input, so we must take their schema.
Cow::Borrowed(lp_arena.get(lp_node).scan_schema())
Cow::Owned(lp_arena.get(lp_node).scan_schema())
} else {
let input = inputs[0];
lp_arena.get(input).schema(lp_arena)
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/ir/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<'a> IRDotDisplay<'a> {
PythonPredicate::None => "none".to_string(),
};
let with_columns = NumColumns(options.with_columns.as_ref().map(|s| s.as_ref()));
let total_columns = options.schema.len();
let total_columns = options.schema.get_schema().unwrap().len();

write_label(f, id, |f| {
write!(
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/ir/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl<'a> IRDisplay<'a> {
match self.root() {
#[cfg(feature = "python")]
PythonScan { options } => {
let total_columns = options.schema.len();
let total_columns = options.schema.get_schema().unwrap().len();
let n_columns = options
.with_columns
.as_ref()
Expand Down
21 changes: 7 additions & 14 deletions crates/polars-plan/src/plans/ir/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use super::*;
impl IR {
/// Get the schema of the logical plan node but don't take projections into account at the scan
/// level. This ensures we can apply the predicate
pub(crate) fn scan_schema(&self) -> &SchemaRef {
pub(crate) fn scan_schema(&self) -> SchemaRef {
use IR::*;
match self {
Scan { file_info, .. } => &file_info.schema,
Scan { file_info, .. } => file_info.schema.clone(),
#[cfg(feature = "python")]
PythonScan { options, .. } => &options.schema,
PythonScan { options, .. } => options.schema.get_schema().unwrap(),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -51,7 +51,7 @@ impl IR {
use IR::*;
let schema = match self {
#[cfg(feature = "python")]
PythonScan { options } => &options.schema,
PythonScan { options } => return Some(Cow::Owned(options.schema.get_schema().unwrap())),
DataFrameScan { schema, .. } => schema,
Scan { file_info, .. } => &file_info.schema,
node => {
Expand All @@ -68,7 +68,7 @@ impl IR {
use IR::*;
let schema = match self {
#[cfg(feature = "python")]
PythonScan { options } => options.output_schema.as_ref().unwrap_or(&options.schema),
PythonScan { options } => return Cow::Owned(options.output_schema.as_ref().map(|s| s.clone()).unwrap_or_else(|| options.schema.get_schema().unwrap())),
Union { inputs, .. } => return arena.get(inputs[0]).schema(arena),
HConcat { schema, .. } => schema,
Cache { input, .. } => return arena.get(*input).schema(arena),
Expand Down Expand Up @@ -123,16 +123,9 @@ impl IR {
let schema = match arena.get(node) {
#[cfg(feature = "python")]
PythonScan {
options: PythonOptions { schema: PySchemaSource::SchemaRef(schema), output_schema, .. }
} =>
output_schema
.as_ref()
.unwrap_or(schema)
.clone(),
PythonScan {
options: PythonOptions { schema: PySchemaSource::PythonFunction(schemaFn), output_schema, .. }
options: PythonOptions { schema, .. }
} =>
panic!("TODO invoke py function and get a schema"),
schema.get_schema().unwrap(), // TODO better error handling
Union { inputs, .. } => IR::schema_with_cache(inputs[0], arena, cache),
HConcat { schema, .. } => schema.clone(),
Cache { input, .. }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ impl ProjectionPushDown {
Some(Arc::new(update_scan_schema(
&acc_projections,
expr_arena,
&options.schema,
&options.schema.get_schema().unwrap(),
true,
)?))
};
Expand Down
28 changes: 25 additions & 3 deletions crates/polars-plan/src/plans/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,37 @@ pub struct LogicalPlanUdfOptions {
pub fmt_str: &'static str,
}

#[derive(Clone, PartialEq, Eq, Debug, Default)]
#[derive(Clone, PartialEq, Eq, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg(feature = "python")]
pub enum PySchemaSource {
#[default]
SchemaRef(SchemaRef),
PythonFunction(Py),
PythonFunction(PythonFunction),
}

#[cfg(feature = "python")]
impl Default for PySchemaSource {
    /// The default source is the `SchemaRef` variant wrapping `SchemaRef::default()`.
    fn default() -> Self {
        let inner: SchemaRef = Default::default();
        PySchemaSource::SchemaRef(inner)
    }
}

#[cfg(feature = "python")]
impl PySchemaSource {
    /// Resolve this source into a concrete [`SchemaRef`].
    ///
    /// * `SchemaRef` variant: returns a clone of the stored schema.
    /// * `PythonFunction` variant: acquires the GIL, calls the wrapped Python callable with no
    ///   arguments, and interprets the integer it returns as the address of a boxed `Schema`.
    ///   Ownership of that allocation is taken over by this call.
    ///
    /// The Python callable is invoked on every call; nothing is cached here.
    ///
    /// # Errors
    ///
    /// Returns a `ComputeError` if the Python callable raises, if its return value cannot be
    /// extracted as a `usize`, or if the returned address is null.
    pub fn get_schema(&self) -> PolarsResult<SchemaRef> {
        use pyo3::prelude::*;

        match self {
            Self::SchemaRef(schema) => Ok(schema.clone()),
            Self::PythonFunction(schema_fn) => {
                // Hold the GIL only for the Python call itself; the schema is plain Rust data.
                let schema_addr = Python::with_gil(|py| -> PyResult<usize> {
                    schema_fn.0.call0(py)?.extract(py)
                })
                .map_err(|err| {
                    PolarsError::ComputeError(
                        format!("failed to obtain schema from Python function: {err}").into(),
                    )
                })?;

                // A null pointer can never come from `Box::into_raw`; reject it rather than
                // invoking undefined behaviour below.
                if schema_addr == 0 {
                    return Err(PolarsError::ComputeError(
                        "Python schema function returned a null schema address".into(),
                    ));
                }

                // SAFETY: this relies on the Python callable returning an address obtained from
                // `Box::into_raw(Box<Schema>)` that has not been reclaimed yet (as
                // `PyLazyFrame::_load_schema` produces). That contract cannot be checked here;
                // any other non-null value is undefined behaviour.
                let schema = unsafe { Box::from_raw(schema_addr as *mut Schema) };
                Ok(Arc::new(*schema))
            },
        }
    }
}

#[derive(Clone, PartialEq, Eq, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
Expand Down
12 changes: 12 additions & 0 deletions crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,18 @@ impl PyLazyFrame {
Ok(LazyFrame::scan_from_python_function(schema, scan_fn, pyarrow).into())
}

/// Build a `Schema` from `(name, dtype)` pairs and return the address of its `Box<Schema>`.
///
/// The box is released with `Box::into_raw`, so the schema is not freed by this function;
/// the returned `usize` is that raw pointer's address.
#[staticmethod]
fn _load_schema(schema: Vec<(PyBackedStr, Wrap<DataType>)>) -> PyResult<usize> {
    // TODO: duplicates the schema-building code below; clean it up later.
    let fields = schema
        .into_iter()
        .map(|(name, dt)| Field::new((&*name).into(), dt.0));
    let boxed = Box::new(Schema::from_iter(fields));
    Ok(Box::into_raw(boxed) as usize)
}

#[staticmethod]
fn scan_from_python_function_deferred_schema(
schema_fn: PyObject,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-python/src/lazyframe/visitor/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult<PyObject> {
PythonScanSource::Pyarrow => "pyarrow",
PythonScanSource::Cuda => "cuda",
PythonScanSource::IOPlugin => "io_plugin",
PythonScanSource::IOPluginDeferredSchema => "deferred_io_plugin",
};

PythonScan {
Expand Down
9 changes: 8 additions & 1 deletion py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,15 @@ def _scan_python_function(
) -> LazyFrame:
self = cls.__new__(cls)
if callable(schema):

def convert_schema():
    """
    Call the user-supplied ``schema`` callable and pass its result to Rust.

    Only ``Mapping`` results are handled: their items are passed as a list to
    ``PyLazyFrame._load_schema`` and that call's return value is returned.
    Any other result type raises ``NotImplementedError``.
    """
    resolved = schema()
    if not isinstance(resolved, Mapping):
        raise NotImplementedError("TODO support other things?")
    return PyLazyFrame._load_schema(list(resolved.items()))

self._ldf = PyLazyFrame.scan_from_python_function_deferred_schema(
schema, scan_fn
convert_schema, scan_fn
)
elif isinstance(schema, Mapping):
self._ldf = PyLazyFrame.scan_from_python_function_pl_schema(
Expand Down

0 comments on commit 62a4b3b

Please sign in to comment.