Removing datafusion-python as a Rust dependency
timsaucer committed Oct 4, 2024
1 parent 3990555 · commit 6ff8c7c
Showing 9 changed files with 249 additions and 775 deletions.
891 changes: 178 additions & 713 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions Cargo.toml
@@ -29,15 +29,14 @@ rust-version = "1.62"
build = "build.rs"

[dependencies]
datafusion = { version = "41.0.0", features = ["pyarrow", "avro"] }
datafusion-proto = "41.0.0"
datafusion-python = "41.0.0"
datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] }
datafusion-proto = "42.0.0"
futures = "0.3"
glob = "0.3"
log = "0.4"
prost = "0.12"
prost-types = "0.12"
pyo3 = { version = "0.21", features = ["extension-module", "abi3", "abi3-py38"] }
prost = "0.13.1"
prost-types = "0.13.1"
pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] }
tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", "sync"] }
uuid = "1.2"

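The pyo3 bump from 0.21 to 0.22 is what drives the Bound<'_, PyAny> signatures and *_bound calls that recur through the Rust diffs below. As a minimal sketch of the Bound API pattern, assuming pyo3 0.22 (the function and names are illustrative, not code from this repo):

use pyo3::prelude::*;
use pyo3::types::PyList;

// Illustrative sketch: sum a Python list of ints via the Bound API.
// The older GIL-ref style was obj.as_ref(py).downcast::<PyList>().
fn sum_list(obj: &PyObject, py: Python<'_>) -> PyResult<i64> {
    let list = obj.downcast_bound::<PyList>(py)?;
    let mut total = 0i64;
    for item in list.iter() {
        total += item.extract::<i64>()?;
    }
    Ok(total)
}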
8 changes: 4 additions & 4 deletions build.rs
@@ -32,14 +32,14 @@ fn main() -> Result<(), String> {

// We don't include the proto files in releases so that downstreams
// do not need to have PROTOC included
if Path::new("src/proto/datafusion-ray.proto").exists() {
if Path::new("src/proto/datafusion_ray.proto").exists() {
println!("cargo:rerun-if-changed=src/proto/datafusion.proto");
println!("cargo:rerun-if-changed=src/proto/datafusion-ray.proto");
println!("cargo:rerun-if-changed=src/proto/datafusion_ray.proto");
tonic_build::configure()
.extern_path(".datafusion", "::datafusion_proto::protobuf")
.compile(&["src/proto/datafusion-ray.proto"], &["src/proto"])
.compile(&["src/proto/datafusion_ray.proto"], &["src/proto"])
.map_err(|e| format!("protobuf compilation failed: {e}"))?;
let generated_source_path = out.join("datafusion-ray.protobuf.rs");
let generated_source_path = out.join("datafusion_ray.protobuf.rs");
let code = std::fs::read_to_string(generated_source_path).unwrap();
let mut file = std::fs::OpenOptions::new()
.write(true)
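The rename from datafusion-ray.proto to datafusion_ray.proto keeps the file name in line with the datafusion_ray.protobuf package and its generated datafusion_ray.protobuf.rs output. For context, a sketch of the standard prost/tonic pattern for consuming that output (this repo copies the generated source back into the tree instead, per the OpenOptions write above, so its exact wiring may differ):

// Standard prost/tonic sketch: include the code that build.rs generated
// into OUT_DIR; the file is named after the proto package.
pub mod generated {
    pub mod protobuf {
        include!(concat!(env!("OUT_DIR"), "/datafusion_ray.protobuf.rs"));
    }
}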
4 changes: 2 additions & 2 deletions datafusion_ray/__init__.py
@@ -25,8 +25,8 @@
ExecutionGraph,
QueryStage,
execute_partition,
-serialize_execution_plan,
-deserialize_execution_plan,
+# serialize_execution_plan,
+# deserialize_execution_plan,
)
from .context import DatafusionRayContext

8 changes: 5 additions & 3 deletions datafusion_ray/tests/test_context.py
@@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.

import pytest
from datafusion_ray import Context
+from datafusion import SessionContext


def test():
-ctx = Context(1, False)
-ctx.register_csv('tips', 'examples/tips.csv', True)
+df_ctx = SessionContext()
+ctx = Context(df_ctx, False)
+df_ctx.register_csv("tips", "examples/tips.csv", has_header=True)
ctx.plan("SELECT * FROM tips")
84 changes: 47 additions & 37 deletions src/context.rs
@@ -37,7 +37,6 @@ use datafusion_proto::bytes::{
};
use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
use datafusion_proto::protobuf;
-use datafusion_python::physical_plan::PyExecutionPlan;
use futures::StreamExt;
use prost::{DecodeError, Message};
use pyo3::exceptions::PyRuntimeError;
@@ -56,6 +55,26 @@ pub struct PyContext {
use_ray_shuffle: bool,
}

+pub(crate) fn execution_plan_from_pyany(
+py_plan: &Bound<PyAny>,
+) -> PyResult<Arc<dyn ExecutionPlan>> {
+let py_proto = py_plan.call_method0("to_proto")?;
+let plan_bytes: &[u8] = py_proto.extract()?;
+let plan_node = protobuf::PhysicalPlanNode::try_decode(plan_bytes).map_err(|e| {
+PyRuntimeError::new_err(format!(
+"Unable to decode physical plan protobuf message: {}",
+e
+))
+})?;
+
+let codec = DefaultPhysicalExtensionCodec {};
+let runtime = RuntimeEnv::default();
+let registry = SessionContext::new();
+plan_node
+.try_into_physical_plan(&registry, &runtime, &codec)
+.map_err(|e| e.into())
+}

#[pymethods]
impl PyContext {
#[new]
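The execution_plan_from_pyany helper added above is the heart of the commit: rather than linking datafusion-python for its PyExecutionPlan wrapper, the plan now crosses the Python/Rust boundary as protobuf bytes produced by the Python-side to_proto method. For reference, a sketch of the inverse direction using the same datafusion-proto 42 APIs (assumed usage, not code from this commit):

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
use datafusion_proto::protobuf;

// Sketch: encode a physical plan into the same protobuf bytes that the
// Python side hands to execution_plan_from_pyany.
fn execution_plan_to_bytes(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<u8>> {
    let codec = DefaultPhysicalExtensionCodec {};
    let node = protobuf::PhysicalPlanNode::try_from_physical_plan(plan, &codec)?;
    let mut buf = Vec::new();
    node.try_encode(&mut buf)?;
    Ok(buf)
}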
@@ -117,20 +136,9 @@ impl PyContext {
// let df = wait_for_future(py, self.ctx.sql(sql))?;
let py_df = self.run_sql(sql, py)?;
let py_plan = py_df.call_method0(py, "execution_plan")?;
-let py_proto = py_plan.call_method0(py, "to_proto")?;
-let plan_bytes: &[u8] = py_proto.extract(py)?;
-let plan_node = protobuf::PhysicalPlanNode::decode(plan_bytes).map_err(|e| {
-PyRuntimeError::new_err(format!(
-"Unable to decode physical plan protobuf message: {}",
-e
-))
-})?;
-
-let codec = DefaultPhysicalExtensionCodec {};
-let runtime = RuntimeEnv::default();
-let registry = SessionContext::new();
-let plan = plan_node.try_into_physical_plan(&registry, &runtime, &codec)?;
+let py_plan = py_plan.bind(py);
+
+let plan = execution_plan_from_pyany(py_plan)?;
let graph = make_execution_graph(plan.clone(), self.use_ray_shuffle)?;

// debug logging
@@ -150,7 +158,7 @@ impl PyContext {
/// Execute a partition of a query plan. This will typically be executing a shuffle write and write the results to disk
pub fn execute_partition(
&self,
-plan: PyExecutionPlan,
+plan: &Bound<'_, PyAny>,
part: usize,
inputs: PyObject,
py: Python,
@@ -161,7 +169,7 @@

#[pyfunction]
pub fn execute_partition(
-plan: PyExecutionPlan,
+plan: &Bound<'_, PyAny>,
part: usize,
inputs: PyObject,
py: Python,
@@ -174,25 +182,25 @@ pub fn execute_partition(
}

// TODO(@lsf) change this to use pickle
-#[pyfunction]
-pub fn serialize_execution_plan(plan: PyExecutionPlan) -> PyResult<Vec<u8>> {
-let codec = ShuffleCodec {};
-Ok(physical_plan_to_bytes_with_extension_codec(plan.plan, &codec)?.to_vec())
-}
+// #[pyfunction]
+// pub fn serialize_execution_plan(plan: Py<PyAny>) -> PyResult<Vec<u8>> {
+// let codec = ShuffleCodec {};
+// Ok(physical_plan_to_bytes_with_extension_codec(plan.plan, &codec)?.to_vec())
+// }

-#[pyfunction]
-pub fn deserialize_execution_plan(bytes: Vec<u8>) -> PyResult<PyExecutionPlan> {
-let ctx = SessionContext::new();
-let codec = ShuffleCodec {};
-Ok(PyExecutionPlan::new(
-physical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?,
-))
-}
+// #[pyfunction]
+// pub fn deserialize_execution_plan(bytes: Vec<u8>) -> PyResult<PyExecutionPlan> {
+// let ctx = SessionContext::new();
+// let codec = ShuffleCodec {};
+// Ok(PyExecutionPlan::new(
+// physical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?,
+// ))
+// }

/// Iterate down an ExecutionPlan and set the input objects for RayShuffleReaderExec.
fn _set_inputs_for_ray_shuffle_reader(
plan: Arc<dyn ExecutionPlan>,
input_partitions: &PyList,
input_partitions: &Bound<'_, PyList>,
) -> Result<()> {
if let Some(reader_exec) = plan.as_any().downcast_ref::<RayShuffleReaderExec>() {
let exec_stage_id = reader_exec.stage_id;
@@ -218,8 +226,8 @@ fn _set_inputs_for_ray_shuffle_reader(
.map_err(|e| DataFusionError::Execution(format!("{}", e)))?
.extract::<usize>()
.map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
-let batch = RecordBatch::from_pyarrow(
-pytuple
+let batch = RecordBatch::from_pyarrow_bound(
+&pytuple
.get_item(2)
.map_err(|e| DataFusionError::Execution(format!("{}", e)))?,
)
@@ -238,7 +246,7 @@
/// write the results to disk, except for the final query stage, which will return the data.
/// inputs is a list of tuples of (stage_id, partition_id, bytes) for each input partition.
fn _execute_partition(
-plan: PyExecutionPlan,
+py_plan: &Bound<'_, PyAny>,
part: usize,
inputs: PyObject,
) -> Result<Vec<RecordBatch>> {
@@ -251,19 +259,21 @@ fn _execute_partition(
HashMap::new(),
Arc::new(RuntimeEnv::default()),
));

+let plan = execution_plan_from_pyany(py_plan)
+.map_err(|e| DataFusionError::Execution(e.to_string()))?;
Python::with_gil(|py| {
let input_partitions = inputs
-.as_ref(py)
-.downcast::<PyList>()
+.downcast_bound::<PyList>(py)
.map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
-_set_inputs_for_ray_shuffle_reader(plan.plan.clone(), input_partitions)
+_set_inputs_for_ray_shuffle_reader(plan.clone(), input_partitions)
})?;

// create a Tokio runtime to run the async code
let rt = Runtime::new().unwrap();

let fut: JoinHandle<Result<Vec<RecordBatch>>> = rt.spawn(async move {
-let mut stream = plan.plan.execute(part, ctx)?;
+let mut stream = plan.execute(part, ctx)?;
let mut results = vec![];
while let Some(result) = stream.next().await {
results.push(result?);
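Note that serialize_execution_plan and deserialize_execution_plan are commented out rather than ported. A bytes-based replacement that avoids PyExecutionPlan could look like the following sketch, reusing execution_plan_from_pyany and the ShuffleCodec; the ShuffleCodec import path and the error mapping are assumptions, not code from this commit:

use datafusion_proto::bytes::physical_plan_to_bytes_with_extension_codec;
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;

use crate::context::execution_plan_from_pyany;
use crate::shuffle::ShuffleCodec; // assumed path

// Sketch: accept any Python plan object exposing to_proto and return the
// ShuffleCodec-encoded plan bytes instead of a PyExecutionPlan wrapper.
#[pyfunction]
pub fn serialize_execution_plan(plan: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
    let plan = execution_plan_from_pyany(plan)?;
    let codec = ShuffleCodec {};
    physical_plan_to_bytes_with_extension_codec(plan, &codec)
        .map(|b| b.to_vec())
        .map_err(|e| PyRuntimeError::new_err(e.to_string()))
}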
7 changes: 4 additions & 3 deletions src/lib.rs
@@ -20,7 +20,8 @@ extern crate core;
use pyo3::prelude::*;

mod proto;
-use crate::context::{deserialize_execution_plan, execute_partition, serialize_execution_plan};
+// use crate::context::{deserialize_execution_plan, execute_partition, serialize_execution_plan};
+use crate::context::execute_partition;
pub use proto::generated::protobuf;

pub mod context;
@@ -37,7 +38,7 @@ fn _datafusion_ray_internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<planner::PyExecutionGraph>()?;
m.add_class::<query_stage::PyQueryStage>()?;
m.add_function(wrap_pyfunction!(execute_partition, m)?)?;
-m.add_function(wrap_pyfunction!(serialize_execution_plan, m)?)?;
-m.add_function(wrap_pyfunction!(deserialize_execution_plan, m)?)?;
+// m.add_function(wrap_pyfunction!(serialize_execution_plan, m)?)?;
+// m.add_function(wrap_pyfunction!(deserialize_execution_plan, m)?)?;
Ok(())
}
7 changes: 3 additions & 4 deletions src/query_stage.rs
@@ -20,7 +20,6 @@ use datafusion::error::Result;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use datafusion::prelude::SessionContext;
use datafusion_proto::bytes::physical_plan_from_bytes_with_extension_codec;
-use datafusion_python::physical_plan::PyExecutionPlan;
use pyo3::prelude::*;
use std::sync::Arc;

@@ -51,9 +50,9 @@ impl PyQueryStage {
self.stage.id
}

-pub fn get_execution_plan(&self) -> PyExecutionPlan {
-PyExecutionPlan::new(self.stage.plan.clone())
-}
+// pub fn get_execution_plan(&self) -> PyExecutionPlan {
+// PyExecutionPlan::new(self.stage.plan.clone())
+// }

pub fn get_child_stage_ids(&self) -> Vec<usize> {
self.stage.get_child_stage_ids()
4 changes: 1 addition & 3 deletions src/shuffle/codec.rs
@@ -186,9 +186,7 @@ fn encode_partitioning_scheme(partitioning: &Partitioning) -> Result<PhysicalHashRepartition>
Partitioning::Hash(expr, partition_count) => Ok(protobuf::PhysicalHashRepartition {
hash_expr: expr
.iter()
-.map(|expr| {
-serialize_physical_expr(expr.clone(), &DefaultPhysicalExtensionCodec {})
-})
+.map(|expr| serialize_physical_expr(expr, &DefaultPhysicalExtensionCodec {}))
.collect::<Result<Vec<_>, DataFusionError>>()?,
partition_count: *partition_count as u64,
}),
