Skip to content

Commit c527c3e

Browse files
dharanadfindepi
authored andcommitted
Implement user defined planner for create_struct & create_named_struct (apache#11273)
* add UserDefinedSQLPlanner for create struct * fix linting * add create name struct user defined sql planner * simplify * refactor * refactor * remove named_struct from functions * formatting * revert 953ad31 * update docs
1 parent 42944c5 commit c527c3e

File tree

4 files changed

+75
-56
lines changed

4 files changed

+75
-56
lines changed

datafusion/expr/src/planner.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,21 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
139139
fn plan_extract(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
140140
Ok(PlannerResult::Original(args))
141141
}
142+
143+
/// Plans a struct `struct(expression1[, ..., expression_n])`
144+
/// literal based on the given input expressions.
145+
/// This function takes a vector of expressions and a boolean flag indicating whether
146+
/// the struct uses the optional name
147+
///
148+
/// Returns a `PlannerResult` containing either the planned struct expressions or the original
149+
/// input expressions if planning is not possible.
150+
fn plan_struct_literal(
151+
&self,
152+
args: Vec<Expr>,
153+
_is_named_struct: bool,
154+
) -> Result<PlannerResult<Vec<Expr>>> {
155+
Ok(PlannerResult::Original(args))
156+
}
142157
}
143158

144159
/// An operator with two arguments to plan

datafusion/functions/src/core/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
9393
nvl(),
9494
nvl2(),
9595
arrow_typeof(),
96-
r#struct(),
9796
named_struct(),
9897
get_field(),
9998
coalesce(),

datafusion/functions/src/core/planner.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
use datafusion_common::DFSchema;
1919
use datafusion_common::Result;
20+
use datafusion_expr::expr::ScalarFunction;
2021
use datafusion_expr::planner::{PlannerResult, RawDictionaryExpr, UserDefinedSQLPlanner};
22+
use datafusion_expr::Expr;
2123

2224
use super::named_struct;
2325

@@ -37,4 +39,21 @@ impl UserDefinedSQLPlanner for CoreFunctionPlanner {
3739
}
3840
Ok(PlannerResult::Planned(named_struct().call(args)))
3941
}
42+
43+
fn plan_struct_literal(
44+
&self,
45+
args: Vec<Expr>,
46+
is_named_struct: bool,
47+
) -> Result<PlannerResult<Vec<Expr>>> {
48+
Ok(PlannerResult::Planned(Expr::ScalarFunction(
49+
ScalarFunction::new_udf(
50+
if is_named_struct {
51+
crate::core::named_struct()
52+
} else {
53+
crate::core::r#struct()
54+
},
55+
args,
56+
),
57+
)))
58+
}
4059
}

datafusion/sql/src/expr/mod.rs

Lines changed: 41 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use datafusion_expr::planner::PlannerResult;
2121
use datafusion_expr::planner::RawDictionaryExpr;
2222
use datafusion_expr::planner::RawFieldAccessExpr;
2323
use sqlparser::ast::{
24-
CastKind, DictionaryField, Expr as SQLExpr, Subscript, TrimWhereField, Value,
24+
CastKind, DictionaryField, Expr as SQLExpr, StructField, Subscript, TrimWhereField,
25+
Value,
2526
};
2627

2728
use datafusion_common::{
@@ -597,7 +598,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
597598
}
598599

599600
SQLExpr::Struct { values, fields } => {
600-
self.parse_struct(values, fields, schema, planner_context)
601+
self.parse_struct(schema, planner_context, values, fields)
601602
}
602603
SQLExpr::Position { expr, r#in } => {
603604
self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
@@ -629,6 +630,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
629630
}
630631
}
631632

633+
/// Parses a struct(..) expression and plans it creation
634+
fn parse_struct(
635+
&self,
636+
schema: &DFSchema,
637+
planner_context: &mut PlannerContext,
638+
values: Vec<sqlparser::ast::Expr>,
639+
fields: Vec<StructField>,
640+
) -> Result<Expr> {
641+
if !fields.is_empty() {
642+
return not_impl_err!("Struct fields are not supported yet");
643+
}
644+
let is_named_struct = values
645+
.iter()
646+
.any(|value| matches!(value, SQLExpr::Named { .. }));
647+
648+
let mut create_struct_args = if is_named_struct {
649+
self.create_named_struct_expr(values, schema, planner_context)?
650+
} else {
651+
self.create_struct_expr(values, schema, planner_context)?
652+
};
653+
654+
for planner in self.planners.iter() {
655+
match planner.plan_struct_literal(create_struct_args, is_named_struct)? {
656+
PlannerResult::Planned(expr) => return Ok(expr),
657+
PlannerResult::Original(args) => create_struct_args = args,
658+
}
659+
}
660+
not_impl_err!("Struct not supported by UserDefinedExtensionPlanners: {create_struct_args:?}")
661+
}
662+
632663
fn sql_position_to_expr(
633664
&self,
634665
substr_expr: SQLExpr,
@@ -683,37 +714,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
683714
not_impl_err!("Unsupported dictionary literal: {raw_expr:?}")
684715
}
685716

686-
/// Parses a struct(..) expression
687-
fn parse_struct(
688-
&self,
689-
values: Vec<SQLExpr>,
690-
fields: Vec<sqlparser::ast::StructField>,
691-
input_schema: &DFSchema,
692-
planner_context: &mut PlannerContext,
693-
) -> Result<Expr> {
694-
if !fields.is_empty() {
695-
return not_impl_err!("Struct fields are not supported yet");
696-
}
697-
698-
if values
699-
.iter()
700-
.any(|value| matches!(value, SQLExpr::Named { .. }))
701-
{
702-
self.create_named_struct(values, input_schema, planner_context)
703-
} else {
704-
self.create_struct(values, input_schema, planner_context)
705-
}
706-
}
707-
708717
// Handles a call to struct(...) where the arguments are named. For example
709718
// `struct (v as foo, v2 as bar)` by creating a call to the `named_struct` function
710-
fn create_named_struct(
719+
fn create_named_struct_expr(
711720
&self,
712721
values: Vec<SQLExpr>,
713722
input_schema: &DFSchema,
714723
planner_context: &mut PlannerContext,
715-
) -> Result<Expr> {
716-
let args = values
724+
) -> Result<Vec<Expr>> {
725+
Ok(values
717726
.into_iter()
718727
.enumerate()
719728
.map(|(i, value)| {
@@ -742,47 +751,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
742751
.collect::<Result<Vec<_>>>()?
743752
.into_iter()
744753
.flatten()
745-
.collect();
746-
747-
let named_struct_func = self
748-
.context_provider
749-
.get_function_meta("named_struct")
750-
.ok_or_else(|| {
751-
internal_datafusion_err!("Unable to find expected 'named_struct' function")
752-
})?;
753-
754-
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
755-
named_struct_func,
756-
args,
757-
)))
754+
.collect())
758755
}
759756

760757
// Handles a call to struct(...) where the arguments are not named. For example
761758
// `struct (v, v2)` by creating a call to the `struct` function
762759
// which will create a struct with fields named `c0`, `c1`, etc.
763-
fn create_struct(
760+
fn create_struct_expr(
764761
&self,
765762
values: Vec<SQLExpr>,
766763
input_schema: &DFSchema,
767764
planner_context: &mut PlannerContext,
768-
) -> Result<Expr> {
769-
let args = values
765+
) -> Result<Vec<Expr>> {
766+
values
770767
.into_iter()
771768
.map(|value| {
772769
self.sql_expr_to_logical_expr(value, input_schema, planner_context)
773770
})
774-
.collect::<Result<Vec<_>>>()?;
775-
let struct_func = self
776-
.context_provider
777-
.get_function_meta("struct")
778-
.ok_or_else(|| {
779-
internal_datafusion_err!("Unable to find expected 'struct' function")
780-
})?;
781-
782-
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
783-
struct_func,
784-
args,
785-
)))
771+
.collect::<Result<Vec<_>>>()
786772
}
787773

788774
fn sql_in_list_to_expr(

0 commit comments

Comments
 (0)