Skip to content

Reuse bulk serialization helpers for protobuf #12179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 60 additions & 145 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
},
};

use crate::protobuf::{proto_error, FromProtoError, ToProtoError};
use crate::protobuf::{proto_error, ToProtoError};
use arrow::datatypes::{DataType, Schema, SchemaRef};
#[cfg(feature = "parquet")]
use datafusion::datasource::file_format::parquet::ParquetFormat;
Expand Down Expand Up @@ -66,11 +66,10 @@ use datafusion_expr::{
};
use datafusion_expr::{AggregateUDF, Unnest};

use self::to_proto::{serialize_expr, serialize_exprs};
use prost::bytes::BufMut;
use prost::Message;

use self::to_proto::serialize_expr;

pub mod file_formats;
pub mod from_proto;
pub mod to_proto;
Expand Down Expand Up @@ -273,13 +272,7 @@ impl AsLogicalPlan for LogicalPlanNode {
values
.values_list
.chunks_exact(n_cols)
.map(|r| {
r.iter()
.map(|expr| {
from_proto::parse_expr(expr, ctx, extension_codec)
})
.collect::<Result<Vec<_>, FromProtoError>>()
})
.map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| e.into())
}?;
Expand All @@ -288,11 +281,8 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::Projection(projection) => {
let input: LogicalPlan =
into_logical_plan!(projection.input, ctx, extension_codec)?;
let expr: Vec<Expr> = projection
.expr
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<_>, _>>()?;
let expr: Vec<Expr> =
from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;

let new_proj = project(input, expr)?;
match projection.optional_alias.as_ref() {
Expand Down Expand Up @@ -324,26 +314,17 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::Window(window) => {
let input: LogicalPlan =
into_logical_plan!(window.input, ctx, extension_codec)?;
let window_expr = window
.window_expr
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<Expr>, _>>()?;
let window_expr =
from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
LogicalPlanBuilder::from(input).window(window_expr)?.build()
}
LogicalPlanType::Aggregate(aggregate) => {
let input: LogicalPlan =
into_logical_plan!(aggregate.input, ctx, extension_codec)?;
let group_expr = aggregate
.group_expr
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<Expr>, _>>()?;
let aggr_expr = aggregate
.aggr_expr
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<Expr>, _>>()?;
let group_expr =
from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
let aggr_expr =
from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
LogicalPlanBuilder::from(input)
.aggregate(group_expr, aggr_expr)?
.build()
Expand All @@ -361,20 +342,16 @@ impl AsLogicalPlan for LogicalPlanNode {
projection = Some(column_indices);
}

let filters = scan
.filters
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<_>, _>>()?;
let filters =
from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;

let mut all_sort_orders = vec![];
for order in &scan.file_sort_order {
let file_sort_order = order
.logical_expr_nodes
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<_>, _>>()?;
all_sort_orders.push(file_sort_order)
all_sort_orders.push(from_proto::parse_exprs(
&order.logical_expr_nodes,
ctx,
extension_codec,
)?)
}

let file_format: Arc<dyn FileFormat> =
Expand Down Expand Up @@ -475,11 +452,8 @@ impl AsLogicalPlan for LogicalPlanNode {
projection = Some(column_indices);
}

let filters = scan
.filters
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<_>, _>>()?;
let filters =
from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;

let table_name =
from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
Expand All @@ -502,11 +476,8 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::Sort(sort) => {
let input: LogicalPlan =
into_logical_plan!(sort.input, ctx, extension_codec)?;
let sort_expr: Vec<Expr> = sort
.expr
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<Expr>, _>>()?;
let sort_expr: Vec<Expr> =
from_proto::parse_exprs(&sort.expr, ctx, extension_codec)?;
LogicalPlanBuilder::from(input).sort(sort_expr)?.build()
}
LogicalPlanType::Repartition(repartition) => {
Expand All @@ -525,12 +496,7 @@ impl AsLogicalPlan for LogicalPlanNode {
hash_expr: pb_hash_expr,
partition_count,
}) => Partitioning::Hash(
pb_hash_expr
.iter()
.map(|expr| {
from_proto::parse_expr(expr, ctx, extension_codec)
})
.collect::<Result<Vec<_>, _>>()?,
from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
*partition_count as usize,
),
PartitionMethod::RoundRobin(partition_count) => {
Expand Down Expand Up @@ -570,12 +536,11 @@ impl AsLogicalPlan for LogicalPlanNode {

let mut order_exprs = vec![];
for expr in &create_extern_table.order_exprs {
let order_expr = expr
.logical_expr_nodes
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<Expr>, _>>()?;
order_exprs.push(order_expr)
order_exprs.push(from_proto::parse_exprs(
&expr.logical_expr_nodes,
ctx,
extension_codec,
)?);
}

let mut column_defaults =
Expand Down Expand Up @@ -693,16 +658,10 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
}
LogicalPlanType::Join(join) => {
let left_keys: Vec<Expr> = join
.left_join_key
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<_>, _>>()?;
let right_keys: Vec<Expr> = join
.right_join_key
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<_>, _>>()?;
let left_keys: Vec<Expr> =
from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
let right_keys: Vec<Expr> =
from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
let join_type =
protobuf::JoinType::try_from(join.join_type).map_err(|_| {
proto_error(format!(
Expand Down Expand Up @@ -804,27 +763,20 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::DistinctOn(distinct_on) => {
let input: LogicalPlan =
into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
let on_expr = distinct_on
.on_expr
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<Expr>, _>>()?;
let select_expr = distinct_on
.select_expr
.iter()
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
.collect::<Result<Vec<Expr>, _>>()?;
let on_expr =
from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
let select_expr = from_proto::parse_exprs(
&distinct_on.select_expr,
ctx,
extension_codec,
)?;
let sort_expr = match distinct_on.sort_expr.len() {
0 => None,
_ => Some(
distinct_on
.sort_expr
.iter()
.map(|expr| {
from_proto::parse_expr(expr, ctx, extension_codec)
})
.collect::<Result<Vec<Expr>, _>>()?,
),
_ => Some(from_proto::parse_exprs(
&distinct_on.sort_expr,
ctx,
extension_codec,
)?),
};
LogicalPlanBuilder::from(input)
.distinct_on(on_expr, select_expr, sort_expr)?
Expand Down Expand Up @@ -943,11 +895,8 @@ impl AsLogicalPlan for LogicalPlanNode {
} else {
values[0].len()
} as u64;
let values_list = values
.iter()
.flatten()
.map(|v| serialize_expr(v, extension_codec))
.collect::<Result<Vec<_>, _>>()?;
let values_list =
serialize_exprs(values.iter().flatten(), extension_codec)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Values(
protobuf::ValuesNode {
Expand Down Expand Up @@ -982,10 +931,8 @@ impl AsLogicalPlan for LogicalPlanNode {
};
let schema: protobuf::Schema = schema.as_ref().try_into()?;

let filters: Vec<protobuf::LogicalExprNode> = filters
.iter()
.map(|filter| serialize_expr(filter, extension_codec))
.collect::<Result<Vec<_>, _>>()?;
let filters: Vec<protobuf::LogicalExprNode> =
serialize_exprs(filters, extension_codec)?;

if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
let any = listing_table.options().format.as_any();
Expand Down Expand Up @@ -1037,10 +984,7 @@ impl AsLogicalPlan for LogicalPlanNode {
let mut exprs_vec: Vec<LogicalExprNodeCollection> = vec![];
for order in &options.file_sort_order {
let expr_vec = LogicalExprNodeCollection {
logical_expr_nodes: order
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, ToProtoError>>()?,
logical_expr_nodes: serialize_exprs(order, extension_codec)?,
};
exprs_vec.push(expr_vec);
}
Expand Down Expand Up @@ -1118,10 +1062,7 @@ impl AsLogicalPlan for LogicalPlanNode {
extension_codec,
)?,
)),
expr: expr
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, ToProtoError>>()?,
expr: serialize_exprs(expr, extension_codec)?,
optional_alias: None,
},
))),
Expand Down Expand Up @@ -1173,22 +1114,13 @@ impl AsLogicalPlan for LogicalPlanNode {
)?;
let sort_expr = match sort_expr {
None => vec![],
Some(sort_expr) => sort_expr
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, _>>()?,
Some(sort_expr) => serialize_exprs(sort_expr, extension_codec)?,
};
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
protobuf::DistinctOnNode {
on_expr: on_expr
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, _>>()?,
select_expr: select_expr
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, _>>()?,
on_expr: serialize_exprs(on_expr, extension_codec)?,
select_expr: serialize_exprs(select_expr, extension_codec)?,
sort_expr,
input: Some(Box::new(input)),
},
Expand All @@ -1207,10 +1139,7 @@ impl AsLogicalPlan for LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Window(Box::new(
protobuf::WindowNode {
input: Some(Box::new(input)),
window_expr: window_expr
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, _>>()?,
window_expr: serialize_exprs(window_expr, extension_codec)?,
},
))),
})
Expand All @@ -1230,14 +1159,8 @@ impl AsLogicalPlan for LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
protobuf::AggregateNode {
input: Some(Box::new(input)),
group_expr: group_expr
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, _>>()?,
aggr_expr: aggr_expr
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, _>>()?,
group_expr: serialize_exprs(group_expr, extension_codec)?,
aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
},
))),
})
Expand Down Expand Up @@ -1335,10 +1258,8 @@ impl AsLogicalPlan for LogicalPlanNode {
input.as_ref(),
extension_codec,
)?;
let selection_expr: Vec<protobuf::LogicalExprNode> = expr
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, ToProtoError>>()?;
let selection_expr: Vec<protobuf::LogicalExprNode> =
serialize_exprs(expr, extension_codec)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
protobuf::SortNode {
Expand Down Expand Up @@ -1367,10 +1288,7 @@ impl AsLogicalPlan for LogicalPlanNode {
let pb_partition_method = match partitioning_scheme {
Partitioning::Hash(exprs, partition_count) => {
PartitionMethod::Hash(protobuf::HashRepartition {
hash_expr: exprs
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, ToProtoError>>()?,
hash_expr: serialize_exprs(exprs, extension_codec)?,
partition_count: *partition_count as u64,
})
}
Expand Down Expand Up @@ -1419,10 +1337,7 @@ impl AsLogicalPlan for LogicalPlanNode {
let mut converted_order_exprs: Vec<LogicalExprNodeCollection> = vec![];
for order in order_exprs {
let temp = LogicalExprNodeCollection {
logical_expr_nodes: order
.iter()
.map(|expr| serialize_expr(expr, extension_codec))
.collect::<Result<Vec<_>, ToProtoError>>()?,
logical_expr_nodes: serialize_exprs(order, extension_codec)?,
};
converted_order_exprs.push(temp);
}
Expand Down