From c2e26c58348c5b1e3d3545afd72749fc5994f1d4 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Wed, 16 Oct 2024 12:07:37 +0200 Subject: [PATCH 1/4] Set up IR update function for Polars Cloud --- crates/polars-python/Cargo.toml | 2 +- crates/polars-python/src/cloud.rs | 79 ++++++++++++++++++++++++++++++- py-polars/src/lib.rs | 3 ++ 3 files changed, 81 insertions(+), 3 deletions(-) diff --git a/crates/polars-python/Cargo.toml b/crates/polars-python/Cargo.toml index 16423dc4c15a..02602272b9d7 100644 --- a/crates/polars-python/Cargo.toml +++ b/crates/polars-python/Cargo.toml @@ -231,7 +231,7 @@ optimizations = [ "streaming", ] -polars_cloud = ["polars/polars_cloud"] +polars_cloud = ["polars/polars_cloud", "polars/ir_serde"] # also includes simd nightly = ["polars/nightly"] diff --git a/crates/polars-python/src/cloud.rs b/crates/polars-python/src/cloud.rs index dacca675c551..3eead24c73ae 100644 --- a/crates/polars-python/src/cloud.rs +++ b/crates/polars-python/src/cloud.rs @@ -1,7 +1,14 @@ -use pyo3::prelude::*; -use pyo3::types::PyBytes; +use std::io::Cursor; + +use polars_core::error::{polars_err, to_compute_err, PolarsResult}; +use polars_plan::plans::{AExpr, IRPlan, IR}; +use polars_plan::prelude::{Arena, Node}; +use pyo3::intern; +use pyo3::prelude::{PyAnyMethods, PyModule, Python, *}; +use pyo3::types::{IntoPyDict, PyBytes}; use crate::error::PyPolarsErr; +use crate::lazyframe::visit::NodeTraverser; use crate::PyLazyFrame; #[pyfunction] @@ -11,3 +18,71 @@ pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult { Ok(PyBytes::new_bound(py, &bytes).to_object(py)) } + +/// Update a serialized `IRPlan` for use with the GPU engine. +/// +/// This is done as a Python function because the `NodeTraverser` class created for this purpose +/// must exactly match the one expected by the `cudf_polars` package. +#[pyfunction] +pub fn _update_ir_plan_for_gpu(ir_plan_ser: Vec, py: Python) -> PyResult { + // Deserialize into IRPlan. + let reader = Cursor::new(ir_plan_ser); + let mut ir_plan = ciborium::from_reader::(reader) + .map_err(to_compute_err) + .map_err(PyPolarsErr::from)?; + + // Edit for use with GPU engine. + gpu_post_opt( + py, + ir_plan.lp_top, + &mut ir_plan.lp_arena, + &mut ir_plan.expr_arena, + ) + .map_err(PyPolarsErr::from)?; + + // Serialize the result. + let mut writer = Vec::new(); + ciborium::into_writer(&ir_plan, &mut writer) + .map_err(to_compute_err) + .map_err(PyPolarsErr::from)?; + + Ok(PyBytes::new_bound(py, &writer).to_object(py)) +} + +/// Prepare the IR for execution by the Polars GPU engine. +fn gpu_post_opt( + py: Python, + root: Node, + lp_arena: &mut Arena, + expr_arena: &mut Arena, +) -> PolarsResult<()> { + // Get cuDF Python function. + let cudf = PyModule::import_bound(py, intern!(py, "cudf_polars")).unwrap(); + let lambda = cudf.getattr(intern!(py, "execute_with_cudf")).unwrap(); + + // Define cuDF config. + let polars = PyModule::import_bound(py, intern!(py, "polars")).unwrap(); + let engine = polars.getattr(intern!(py, "GPUEngine")).unwrap(); + let kwargs = [("raise_on_fail", true)].into_py_dict_bound(py); + let engine = engine.call((), Some(&kwargs)).unwrap(); + + // Define node traverser. + let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena)); + + // Get a copy of the arenas. + let arenas = nt.get_arenas(); + + // Pass the node visitor which allows the Python callback to replace parts of the query plan. + // Remove "cuda" or specify better once we have multiple post-opt callbacks. + let kwargs = [("config", engine)].into_py_dict_bound(py); + lambda + .call((nt,), Some(&kwargs)) + .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?; + + // Unpack the arena's. + // At this point the `nt` is useless. + std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap()); + std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap()); + + Ok(()) +} diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs index 1c645738102a..67603e6d93bb 100644 --- a/py-polars/src/lib.rs +++ b/py-polars/src/lib.rs @@ -377,6 +377,9 @@ fn polars(py: Python, m: &Bound) -> PyResult<()> { #[cfg(feature = "polars_cloud")] m.add_wrapped(wrap_pyfunction!(cloud::prepare_cloud_plan)) .unwrap(); + #[cfg(feature = "polars_cloud")] + m.add_wrapped(wrap_pyfunction!(cloud::_update_ir_plan_for_gpu)) + .unwrap(); // Build info m.add("__version__", env!("CARGO_PKG_VERSION"))?; From 290675c081f2767516ded5a09420013aa2c4ceb6 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Mon, 21 Oct 2024 18:05:37 +0200 Subject: [PATCH 2/4] Debug statements --- crates/polars-python/src/cloud.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/polars-python/src/cloud.rs b/crates/polars-python/src/cloud.rs index 3eead24c73ae..2886b694047c 100644 --- a/crates/polars-python/src/cloud.rs +++ b/crates/polars-python/src/cloud.rs @@ -25,12 +25,16 @@ pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult { /// must exactly match the one expected by the `cudf_polars` package. #[pyfunction] pub fn _update_ir_plan_for_gpu(ir_plan_ser: Vec, py: Python) -> PyResult { + eprintln!("Update function called"); + // Deserialize into IRPlan. let reader = Cursor::new(ir_plan_ser); let mut ir_plan = ciborium::from_reader::(reader) .map_err(to_compute_err) .map_err(PyPolarsErr::from)?; + eprintln!("Deserialized"); + // Edit for use with GPU engine. gpu_post_opt( py, @@ -40,12 +44,16 @@ pub fn _update_ir_plan_for_gpu(ir_plan_ser: Vec, py: Python) -> PyResult Date: Mon, 21 Oct 2024 18:29:12 +0200 Subject: [PATCH 3/4] Update to execute on the Python side --- Cargo.lock | 2 ++ crates/polars-python/Cargo.toml | 2 ++ crates/polars-python/src/cloud.rs | 23 +++++++++++++++-------- py-polars/src/lib.rs | 2 +- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd2c572785fb..f1b9bd7da534 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3174,8 +3174,10 @@ dependencies = [ "polars", "polars-core", "polars-error", + "polars-expr", "polars-io", "polars-lazy", + "polars-mem-engine", "polars-ops", "polars-parquet", "polars-plan", diff --git a/crates/polars-python/Cargo.toml b/crates/polars-python/Cargo.toml index 02602272b9d7..0bb9f3424edc 100644 --- a/crates/polars-python/Cargo.toml +++ b/crates/polars-python/Cargo.toml @@ -11,8 +11,10 @@ description = "Enable running Polars workloads in Python" [dependencies] polars-core = { workspace = true, features = ["python"] } polars-error = { workspace = true } +polars-expr = { workspace = true } polars-io = { workspace = true } polars-lazy = { workspace = true, features = ["python"] } +polars-mem-engine = { workspace = true } polars-ops = { workspace = true, features = ["bitwise"] } polars-parquet = { workspace = true, optional = true } polars-plan = { workspace = true } diff --git a/crates/polars-python/src/cloud.rs b/crates/polars-python/src/cloud.rs index 2886b694047c..31653e19d5f6 100644 --- a/crates/polars-python/src/cloud.rs +++ b/crates/polars-python/src/cloud.rs @@ -1,6 +1,8 @@ use std::io::Cursor; use polars_core::error::{polars_err, to_compute_err, PolarsResult}; +use polars_expr::state::ExecutionState; +use polars_mem_engine::create_physical_plan; use polars_plan::plans::{AExpr, IRPlan, IR}; use polars_plan::prelude::{Arena, Node}; use pyo3::intern; @@ -9,7 +11,7 @@ use pyo3::types::{IntoPyDict, PyBytes}; use crate::error::PyPolarsErr; use crate::lazyframe::visit::NodeTraverser; -use crate::PyLazyFrame; +use crate::{PyDataFrame, PyLazyFrame}; #[pyfunction] pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult { @@ -24,7 +26,7 @@ pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult { /// This is done as a Python function because the `NodeTraverser` class created for this purpose /// must exactly match the one expected by the `cudf_polars` package. #[pyfunction] -pub fn _update_ir_plan_for_gpu(ir_plan_ser: Vec, py: Python) -> PyResult { +pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec, py: Python) -> PyResult { eprintln!("Update function called"); // Deserialize into IRPlan. @@ -46,15 +48,20 @@ pub fn _update_ir_plan_for_gpu(ir_plan_ser: Vec, py: Python) -> PyResult) -> PyResult<()> { m.add_wrapped(wrap_pyfunction!(cloud::prepare_cloud_plan)) .unwrap(); #[cfg(feature = "polars_cloud")] - m.add_wrapped(wrap_pyfunction!(cloud::_update_ir_plan_for_gpu)) + m.add_wrapped(wrap_pyfunction!(cloud::_execute_ir_plan_with_gpu)) .unwrap(); // Build info From db137a86928b4ca1927f28611d30ed22bace51fe Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Mon, 21 Oct 2024 21:42:59 +0200 Subject: [PATCH 4/4] Clean up debug statements --- crates/polars-python/src/cloud.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/crates/polars-python/src/cloud.rs b/crates/polars-python/src/cloud.rs index 31653e19d5f6..39410a6fa7a1 100644 --- a/crates/polars-python/src/cloud.rs +++ b/crates/polars-python/src/cloud.rs @@ -21,22 +21,18 @@ pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult { Ok(PyBytes::new_bound(py, &bytes).to_object(py)) } -/// Update a serialized `IRPlan` for use with the GPU engine. +/// Take a serialized `IRPlan` and execute it on the GPU engine. /// /// This is done as a Python function because the `NodeTraverser` class created for this purpose /// must exactly match the one expected by the `cudf_polars` package. #[pyfunction] pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec, py: Python) -> PyResult { - eprintln!("Update function called"); - // Deserialize into IRPlan. let reader = Cursor::new(ir_plan_ser); let mut ir_plan = ciborium::from_reader::(reader) .map_err(to_compute_err) .map_err(PyPolarsErr::from)?; - eprintln!("Deserialized"); - // Edit for use with GPU engine. gpu_post_opt( py, @@ -46,21 +42,17 @@ pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec, py: Python) -> PyResult