Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add functionality for supporting the GPU engine on Polars Cloud #19362

Merged
merged 4 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/polars-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ description = "Enable running Polars workloads in Python"
[dependencies]
polars-core = { workspace = true, features = ["python"] }
polars-error = { workspace = true }
polars-expr = { workspace = true }
polars-io = { workspace = true }
polars-lazy = { workspace = true, features = ["python"] }
polars-mem-engine = { workspace = true }
polars-ops = { workspace = true, features = ["bitwise"] }
polars-parquet = { workspace = true, optional = true }
polars-plan = { workspace = true }
Expand Down Expand Up @@ -231,7 +233,7 @@ optimizations = [
"streaming",
]

polars_cloud = ["polars/polars_cloud"]
polars_cloud = ["polars/polars_cloud", "polars/ir_serde"]

# also includes simd
nightly = ["polars/nightly"]
Expand Down
88 changes: 85 additions & 3 deletions crates/polars-python/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use std::io::Cursor;

use polars_core::error::{polars_err, to_compute_err, PolarsResult};
use polars_expr::state::ExecutionState;
use polars_mem_engine::create_physical_plan;
use polars_plan::plans::{AExpr, IRPlan, IR};
use polars_plan::prelude::{Arena, Node};
use pyo3::intern;
use pyo3::prelude::{PyAnyMethods, PyModule, Python, *};
use pyo3::types::{IntoPyDict, PyBytes};

use crate::error::PyPolarsErr;
use crate::PyLazyFrame;
use crate::lazyframe::visit::NodeTraverser;
use crate::{PyDataFrame, PyLazyFrame};

#[pyfunction]
pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult<PyObject> {
Expand All @@ -11,3 +20,76 @@ pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult<PyObject> {

Ok(PyBytes::new_bound(py, &bytes).to_object(py))
}

/// Take a serialized `IRPlan` and execute it on the GPU engine.
///
/// This is done as a Python function because the `NodeTraverser` class created for this purpose
/// must exactly match the one expected by the `cudf_polars` package.
#[pyfunction]
pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec<u8>, py: Python) -> PyResult<PyDataFrame> {
// Deserialize into IRPlan.
let reader = Cursor::new(ir_plan_ser);
let mut ir_plan = ciborium::from_reader::<IRPlan, _>(reader)
.map_err(to_compute_err)
.map_err(PyPolarsErr::from)?;

// Edit for use with GPU engine.
gpu_post_opt(
py,
ir_plan.lp_top,
&mut ir_plan.lp_arena,
&mut ir_plan.expr_arena,
)
.map_err(PyPolarsErr::from)?;

// Convert to physical plan.
let mut physical_plan =
create_physical_plan(ir_plan.lp_top, &mut ir_plan.lp_arena, &ir_plan.expr_arena)
.map_err(PyPolarsErr::from)?;

// Execute the plan.
let mut state = ExecutionState::new();
let df = physical_plan
.execute(&mut state)
.map_err(PyPolarsErr::from)?;

Ok(df.into())
}

/// Prepare the IR for execution by the Polars GPU engine.
fn gpu_post_opt(
py: Python,
root: Node,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<()> {
// Get cuDF Python function.
let cudf = PyModule::import_bound(py, intern!(py, "cudf_polars")).unwrap();
let lambda = cudf.getattr(intern!(py, "execute_with_cudf")).unwrap();

// Define cuDF config.
let polars = PyModule::import_bound(py, intern!(py, "polars")).unwrap();
let engine = polars.getattr(intern!(py, "GPUEngine")).unwrap();
let kwargs = [("raise_on_fail", true)].into_py_dict_bound(py);
let engine = engine.call((), Some(&kwargs)).unwrap();

// Define node traverser.
let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

// Get a copy of the arenas.
let arenas = nt.get_arenas();

// Pass the node visitor which allows the Python callback to replace parts of the query plan.
// Remove "cuda" or specify better once we have multiple post-opt callbacks.
let kwargs = [("config", engine)].into_py_dict_bound(py);
lambda
.call((nt,), Some(&kwargs))
.map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

// Unpack the arena's.
// At this point the `nt` is useless.
std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

Ok(())
}
3 changes: 3 additions & 0 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,9 @@ fn polars(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
#[cfg(feature = "polars_cloud")]
m.add_wrapped(wrap_pyfunction!(cloud::prepare_cloud_plan))
.unwrap();
#[cfg(feature = "polars_cloud")]
m.add_wrapped(wrap_pyfunction!(cloud::_execute_ir_plan_with_gpu))
.unwrap();

// Build info
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
Expand Down
Loading