Skip to content

Commit

Permalink
feat: Add functionality for supporting the GPU engine on Polars Cloud (
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego authored Oct 22, 2024
1 parent 94a58ff commit b9fd730
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 4 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/polars-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ description = "Enable running Polars workloads in Python"
[dependencies]
polars-core = { workspace = true, features = ["python"] }
polars-error = { workspace = true }
polars-expr = { workspace = true }
polars-io = { workspace = true }
polars-lazy = { workspace = true, features = ["python"] }
polars-mem-engine = { workspace = true }
polars-ops = { workspace = true, features = ["bitwise"] }
polars-parquet = { workspace = true, optional = true }
polars-plan = { workspace = true }
Expand Down Expand Up @@ -231,7 +233,7 @@ optimizations = [
"streaming",
]

polars_cloud = ["polars/polars_cloud"]
polars_cloud = ["polars/polars_cloud", "polars/ir_serde"]

# also includes simd
nightly = ["polars/nightly"]
Expand Down
88 changes: 85 additions & 3 deletions crates/polars-python/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use std::io::Cursor;

use polars_core::error::{polars_err, to_compute_err, PolarsResult};
use polars_expr::state::ExecutionState;
use polars_mem_engine::create_physical_plan;
use polars_plan::plans::{AExpr, IRPlan, IR};
use polars_plan::prelude::{Arena, Node};
use pyo3::intern;
use pyo3::prelude::{PyAnyMethods, PyModule, Python, *};
use pyo3::types::{IntoPyDict, PyBytes};

use crate::error::PyPolarsErr;
use crate::PyLazyFrame;
use crate::lazyframe::visit::NodeTraverser;
use crate::{PyDataFrame, PyLazyFrame};

#[pyfunction]
pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult<PyObject> {
Expand All @@ -11,3 +20,76 @@ pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult<PyObject> {

Ok(PyBytes::new_bound(py, &bytes).to_object(py))
}

/// Take a serialized `IRPlan` and execute it on the GPU engine.
///
/// This is done as a Python function because the `NodeTraverser` class created for this purpose
/// must exactly match the one expected by the `cudf_polars` package.
#[pyfunction]
pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec<u8>, py: Python) -> PyResult<PyDataFrame> {
// Deserialize into IRPlan.
let reader = Cursor::new(ir_plan_ser);
let mut ir_plan = ciborium::from_reader::<IRPlan, _>(reader)
.map_err(to_compute_err)
.map_err(PyPolarsErr::from)?;

// Edit for use with GPU engine.
gpu_post_opt(
py,
ir_plan.lp_top,
&mut ir_plan.lp_arena,
&mut ir_plan.expr_arena,
)
.map_err(PyPolarsErr::from)?;

// Convert to physical plan.
let mut physical_plan =
create_physical_plan(ir_plan.lp_top, &mut ir_plan.lp_arena, &ir_plan.expr_arena)
.map_err(PyPolarsErr::from)?;

// Execute the plan.
let mut state = ExecutionState::new();
let df = physical_plan
.execute(&mut state)
.map_err(PyPolarsErr::from)?;

Ok(df.into())
}

/// Prepare the IR for execution by the Polars GPU engine.
fn gpu_post_opt(
py: Python,
root: Node,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<()> {
// Get cuDF Python function.
let cudf = PyModule::import_bound(py, intern!(py, "cudf_polars")).unwrap();
let lambda = cudf.getattr(intern!(py, "execute_with_cudf")).unwrap();

// Define cuDF config.
let polars = PyModule::import_bound(py, intern!(py, "polars")).unwrap();
let engine = polars.getattr(intern!(py, "GPUEngine")).unwrap();
let kwargs = [("raise_on_fail", true)].into_py_dict_bound(py);
let engine = engine.call((), Some(&kwargs)).unwrap();

// Define node traverser.
let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

// Get a copy of the arenas.
let arenas = nt.get_arenas();

// Pass the node visitor which allows the Python callback to replace parts of the query plan.
// Remove "cuda" or specify better once we have multiple post-opt callbacks.
let kwargs = [("config", engine)].into_py_dict_bound(py);
lambda
.call((nt,), Some(&kwargs))
.map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

// Unpack the arena's.
// At this point the `nt` is useless.
std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

Ok(())
}
3 changes: 3 additions & 0 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,9 @@ fn polars(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
#[cfg(feature = "polars_cloud")]
m.add_wrapped(wrap_pyfunction!(cloud::prepare_cloud_plan))
.unwrap();
#[cfg(feature = "polars_cloud")]
m.add_wrapped(wrap_pyfunction!(cloud::_execute_ir_plan_with_gpu))
.unwrap();

// Build info
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
Expand Down

0 comments on commit b9fd730

Please sign in to comment.