Skip to content

Commit

Permalink
Update to execute on the Python side
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego committed Oct 21, 2024
1 parent 290675c commit 7047891
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/polars-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ description = "Enable running Polars workloads in Python"
[dependencies]
polars-core = { workspace = true, features = ["python"] }
polars-error = { workspace = true }
polars-expr = { workspace = true }
polars-io = { workspace = true }
polars-lazy = { workspace = true, features = ["python"] }
polars-mem-engine = { workspace = true }
polars-ops = { workspace = true, features = ["bitwise"] }
polars-parquet = { workspace = true, optional = true }
polars-plan = { workspace = true }
Expand Down
23 changes: 15 additions & 8 deletions crates/polars-python/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::io::Cursor;

use polars_core::error::{polars_err, to_compute_err, PolarsResult};
use polars_expr::state::ExecutionState;
use polars_mem_engine::create_physical_plan;
use polars_plan::plans::{AExpr, IRPlan, IR};
use polars_plan::prelude::{Arena, Node};
use pyo3::intern;
Expand All @@ -9,7 +11,7 @@ use pyo3::types::{IntoPyDict, PyBytes};

use crate::error::PyPolarsErr;
use crate::lazyframe::visit::NodeTraverser;
use crate::PyLazyFrame;
use crate::{PyDataFrame, PyLazyFrame};

#[pyfunction]
pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult<PyObject> {
Expand All @@ -24,7 +26,7 @@ pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python) -> PyResult<PyObject> {
/// This is done as a Python function because the `NodeTraverser` class created for this purpose
/// must exactly match the one expected by the `cudf_polars` package.
#[pyfunction]
pub fn _update_ir_plan_for_gpu(ir_plan_ser: Vec<u8>, py: Python) -> PyResult<PyObject> {
pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec<u8>, py: Python) -> PyResult<PyDataFrame> {
eprintln!("Update function called");

// Deserialize into IRPlan.
Expand All @@ -46,15 +48,20 @@ pub fn _update_ir_plan_for_gpu(ir_plan_ser: Vec<u8>, py: Python) -> PyResult<PyO

eprintln!("Updated");

// Serialize the result.
let mut writer = Vec::new();
ciborium::into_writer(&ir_plan, &mut writer)
.map_err(to_compute_err)
let mut physical_plan =
create_physical_plan(ir_plan.lp_top, &mut ir_plan.lp_arena, &ir_plan.expr_arena)
.map_err(PyPolarsErr::from)?;

eprintln!("Physical plan created");

let mut state = ExecutionState::new();
let df = physical_plan
.execute(&mut state)
.map_err(PyPolarsErr::from)?;

eprintln!("Re-serialized");
eprintln!("Executed");

Ok(PyBytes::new_bound(py, &writer).to_object(py))
Ok(df.into())
}

/// Prepare the IR for execution by the Polars GPU engine.
Expand Down
2 changes: 1 addition & 1 deletion py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ fn polars(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(cloud::prepare_cloud_plan))
.unwrap();
#[cfg(feature = "polars_cloud")]
m.add_wrapped(wrap_pyfunction!(cloud::_update_ir_plan_for_gpu))
m.add_wrapped(wrap_pyfunction!(cloud::_execute_ir_plan_with_gpu))
.unwrap();

// Build info
Expand Down

0 comments on commit 7047891

Please sign in to comment.