From b9fd7305b3a5f937ed3efb16d8378faddd1077f8 Mon Sep 17 00:00:00 2001 From: Stijn de Gooijer Date: Tue, 22 Oct 2024 19:21:31 +0200 Subject: [PATCH] feat: Add functionality for supporting the GPU engine on Polars Cloud (#19362) --- Cargo.lock | 2 + crates/polars-python/Cargo.toml | 4 +- crates/polars-python/src/cloud.rs | 88 +++++++++++++++++++++++++++++-- py-polars/src/lib.rs | 3 ++ 4 files changed, 93 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd2c572785fb..f1b9bd7da534 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3174,8 +3174,10 @@ dependencies = [ "polars", "polars-core", "polars-error", + "polars-expr", "polars-io", "polars-lazy", + "polars-mem-engine", "polars-ops", "polars-parquet", "polars-plan", diff --git a/crates/polars-python/Cargo.toml b/crates/polars-python/Cargo.toml index 16423dc4c15a..0bb9f3424edc 100644 --- a/crates/polars-python/Cargo.toml +++ b/crates/polars-python/Cargo.toml @@ -11,8 +11,10 @@ description = "Enable running Polars workloads in Python" [dependencies] polars-core = { workspace = true, features = ["python"] } polars-error = { workspace = true } +polars-expr = { workspace = true } polars-io = { workspace = true } polars-lazy = { workspace = true, features = ["python"] } +polars-mem-engine = { workspace = true } polars-ops = { workspace = true, features = ["bitwise"] } polars-parquet = { workspace = true, optional = true } polars-plan = { workspace = true } @@ -231,7 +233,7 @@ optimizations = [ "streaming", ] -polars_cloud = ["polars/polars_cloud"] +polars_cloud = ["polars/polars_cloud", "polars/ir_serde"] # also includes simd nightly = ["polars/nightly"] diff --git a/crates/polars-python/src/cloud.rs b/crates/polars-python/src/cloud.rs index dacca675c551..39410a6fa7a1 100644 --- a/crates/polars-python/src/cloud.rs +++ b/crates/polars-python/src/cloud.rs @@ -1,8 +1,17 @@ -use pyo3::prelude::*; -use pyo3::types::PyBytes; +use std::io::Cursor; + +use polars_core::error::{polars_err, to_compute_err, 
PolarsResult}; +use polars_expr::state::ExecutionState; +use polars_mem_engine::create_physical_plan; +use polars_plan::plans::{AExpr, IRPlan, IR}; +use polars_plan::prelude::{Arena, Node}; +use pyo3::intern; +use pyo3::prelude::{PyAnyMethods, PyModule, Python, *}; +use pyo3::types::{IntoPyDict, PyBytes}; use crate::error::PyPolarsErr; -use crate::PyLazyFrame; +use crate::lazyframe::visit::NodeTraverser; +use crate::{PyDataFrame, PyLazyFrame}; #[pyfunction] pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult { @@ -11,3 +20,76 @@ pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult { Ok(PyBytes::new_bound(py, &bytes).to_object(py)) } + +/// Take a serialized `IRPlan` and execute it on the GPU engine. +/// +/// This is done as a Python function because the `NodeTraverser` class created for this purpose +/// must exactly match the one expected by the `cudf_polars` package. +#[pyfunction] +pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec, py: Python) -> PyResult { + // Deserialize into IRPlan. + let reader = Cursor::new(ir_plan_ser); + let mut ir_plan = ciborium::from_reader::(reader) + .map_err(to_compute_err) + .map_err(PyPolarsErr::from)?; + + // Edit for use with GPU engine. + gpu_post_opt( + py, + ir_plan.lp_top, + &mut ir_plan.lp_arena, + &mut ir_plan.expr_arena, + ) + .map_err(PyPolarsErr::from)?; + + // Convert to physical plan. + let mut physical_plan = + create_physical_plan(ir_plan.lp_top, &mut ir_plan.lp_arena, &ir_plan.expr_arena) + .map_err(PyPolarsErr::from)?; + + // Execute the plan. + let mut state = ExecutionState::new(); + let df = physical_plan + .execute(&mut state) + .map_err(PyPolarsErr::from)?; + + Ok(df.into()) +} + +/// Prepare the IR for execution by the Polars GPU engine. +fn gpu_post_opt( + py: Python, + root: Node, + lp_arena: &mut Arena, + expr_arena: &mut Arena, +) -> PolarsResult<()> { + // Get cuDF Python function. 
+ let cudf = PyModule::import_bound(py, intern!(py, "cudf_polars")).unwrap(); + let lambda = cudf.getattr(intern!(py, "execute_with_cudf")).unwrap(); + + // Define cuDF config. + let polars = PyModule::import_bound(py, intern!(py, "polars")).unwrap(); + let engine = polars.getattr(intern!(py, "GPUEngine")).unwrap(); + let kwargs = [("raise_on_fail", true)].into_py_dict_bound(py); + let engine = engine.call((), Some(&kwargs)).unwrap(); + + // Define node traverser. + let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena)); + + // Get a copy of the arenas. + let arenas = nt.get_arenas(); + + // Pass the node visitor which allows the Python callback to replace parts of the query plan. + // Remove "cuda" or specify better once we have multiple post-opt callbacks. + let kwargs = [("config", engine)].into_py_dict_bound(py); + lambda + .call((nt,), Some(&kwargs)) + .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?; + + // Unpack the arenas. + // At this point the `nt` is useless. + std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap()); + std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap()); + + Ok(()) +} diff --git a/py-polars/src/lib.rs b/py-polars/src/lib.rs index 1c645738102a..c4cbab12f056 100644 --- a/py-polars/src/lib.rs +++ b/py-polars/src/lib.rs @@ -377,6 +377,9 @@ fn polars(py: Python, m: &Bound) -> PyResult<()> { #[cfg(feature = "polars_cloud")] m.add_wrapped(wrap_pyfunction!(cloud::prepare_cloud_plan)) .unwrap(); + #[cfg(feature = "polars_cloud")] + m.add_wrapped(wrap_pyfunction!(cloud::_execute_ir_plan_with_gpu)) + .unwrap(); // Build info m.add("__version__", env!("CARGO_PKG_VERSION"))?;