
Commit

cleanup
Signed-off-by: jayzhan211 <[email protected]>
jayzhan211 committed Sep 1, 2023
1 parent a025fef commit 72273a5
Showing 11 changed files with 47 additions and 337 deletions.
1 change: 0 additions & 1 deletion datafusion/common/src/scalar.rs
@@ -3183,7 +3183,6 @@ mod tests {
use std::cmp::Ordering;
use std::sync::Arc;

- use crate::Result;
use arrow::compute::kernels;
use arrow::compute::{concat, is_null};
use arrow::datatypes::ArrowPrimitiveType;
13 changes: 6 additions & 7 deletions datafusion/core/src/physical_planner.rs
@@ -30,8 +30,8 @@ use crate::datasource::source_as_provider;
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_expr::{
- Aggregate, EmptyRelation, Join, Projection, Sort, SubqueryAlias, TableScan, Unnest,
- Window,
+ Aggregate, EmptyRelation, Join, Projection, Sort, SubqueryAlias, TableScan,
+ Unnest as UnnestPlan, Window,
};
use crate::logical_expr::{
CrossJoin, Expr, LogicalPlan, Partitioning as LogicalPartitioning, PlanType,
@@ -79,8 +79,9 @@ use datafusion_common::{
use datafusion_expr::expr::{
self, AggregateFunction, AggregateUDF, Alias, Between, BinaryExpr, Cast,
GetFieldAccess, GetIndexedField, GroupingSet, InList, Like, ScalarUDF, TryCast,
- Unnest as UnnestExpr, WindowFunction,
+ Unnest, WindowFunction,
};
+
use datafusion_expr::expr_rewriter::{unalias, unnormalize_cols};
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{DmlStatement, StringifiedPlan, WriteOp};
@@ -212,7 +213,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {

Ok(name)
}
- Expr::Unnest(UnnestExpr { array_exprs, .. }) => {
+ Expr::Unnest(Unnest { array_exprs, .. }) => {
create_function_physical_name("unnest", false, array_exprs)
}
Expr::ScalarFunction(func) => {
@@ -443,11 +444,9 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
match self.handle_explain(logical_plan, session_state).await? {
Some(plan) => Ok(plan),
None => {
- // println!("create init plan");
let plan = self
.create_initial_plan(logical_plan, session_state)
.await?;
- // println!("create init plan done");
self.optimize_internal(plan, session_state, |_, _| {})
}
}
@@ -1217,7 +1216,7 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
- LogicalPlan::Unnest(Unnest { input, column, schema, options }) => {
+ LogicalPlan::Unnest(UnnestPlan { input, column, schema, options }) => {
let input = self.create_initial_plan(input, session_state).await?;
let column_exec = schema.index_of_column(column)
.map(|idx| Column::new(&column.name, idx))?;
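Both the expression payload and the logical plan node are named Unnest, which is why the plan-side import above is now aliased to UnnestPlan while the expression keeps the plain name. A minimal sketch of how the two coexist after this commit; the import paths are assumed from the diff, not copied from it:

use datafusion_expr::expr::Unnest;
use datafusion_expr::logical_plan::Unnest as UnnestPlan;
use datafusion_expr::{Expr, LogicalPlan};

// With the aliases chosen here, the two same-named types can be matched
// side by side without clashing.
fn classify(expr: &Expr, plan: &LogicalPlan) -> (bool, bool) {
    let is_unnest_expr = matches!(expr, Expr::Unnest(Unnest { .. }));
    let is_unnest_plan = matches!(plan, LogicalPlan::Unnest(UnnestPlan { .. }));
    (is_unnest_expr, is_unnest_plan)
}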
14 changes: 6 additions & 8 deletions datafusion/expr/src/expr.rs
@@ -1177,10 +1177,9 @@ impl fmt::Display for Expr {
write!(f, " NULLS LAST")
}
}
- Expr::Unnest(Unnest {
- array_exprs,
- ..
- }) => fmt_function(f, "unnest", false, array_exprs, false),
+ Expr::Unnest(Unnest { array_exprs, .. }) => {
+ fmt_function(f, "unnest", false, array_exprs, false)
+ }
Expr::ScalarFunction(func) => {
fmt_function(f, &func.fun.to_string(), false, &func.args, true)
}
@@ -1514,10 +1513,9 @@ fn create_name(e: &Expr) -> Result<String> {
}
}
}
- Expr::Unnest(Unnest {
- array_exprs,
- ..
- }) => create_function_name("unnest", false, array_exprs),
+ Expr::Unnest(Unnest { array_exprs, .. }) => {
+ create_function_name("unnest", false, array_exprs)
+ }
Expr::ScalarFunction(func) => {
create_function_name(&func.fun.to_string(), false, &func.args)
}
5 changes: 1 addition & 4 deletions datafusion/expr/src/expr_schema.rs
@@ -88,10 +88,7 @@ impl ExprSchemable for Expr {
.collect::<Result<Vec<_>>>()?;
Ok((fun.return_type)(&data_types)?.as_ref().clone())
}
- Expr::Unnest(Unnest {
- array_exprs,
- ..
- }) => {
+ Expr::Unnest(Unnest { array_exprs, .. }) => {
let data_types = array_exprs
.iter()
.map(|e| e.get_type(schema))
20 changes: 10 additions & 10 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -27,8 +27,8 @@ use crate::expr_rewriter::{
use crate::type_coercion::binary::comparison_coercion;
use crate::utils::{columnize_expr, compare_sort_expr};
use crate::{
- and, binary_expr, expr, BuiltInWindowFunction, DmlStatement, Operator,
- WindowFrame, WriteOp,
+ and, binary_expr, expr, BuiltInWindowFunction, DmlStatement, Operator, WindowFrame,
+ WriteOp,
};
use crate::{col, ident, WindowFunction};
use crate::{
@@ -1173,10 +1173,10 @@ impl LogicalPlanBuilder {
pub fn unnest_arrays(
self,
array_exprs: Vec<Expr>,
- options: UnnestOptions,
+ array_options: Vec<UnnestOptions>,
) -> Result<Self> {
let (unnest_plans, columns_name) =
- build_unnest_plans(self.plan.clone(), array_exprs.clone(), options.clone())?;
+ build_unnest_plans(self.plan.clone(), array_exprs.clone(), array_options)?;

Self::join_unnest_plans(unnest_plans, columns_name)
}
@@ -1626,23 +1626,23 @@ pub fn unnest_with_options(
fn build_unnest_plans(
input: LogicalPlan,
array_exprs: Vec<Expr>,
- options: UnnestOptions,
+ array_options: Vec<UnnestOptions>,
) -> Result<(Vec<LogicalPlan>, Vec<String>)> {
- let array_exprs = align_arrays_with_nulls(&array_exprs)?;
+ let array_expression = align_arrays_with_nulls(&array_exprs)?;

let project_plan_builder =
- LogicalPlanBuilder::from(input.clone()).project(array_exprs.clone())?;
+ LogicalPlanBuilder::from(input.clone()).project(array_expression.clone())?;

let mut unnest_plans = vec![];
let mut columns_name = vec![];

- for array_expr in array_exprs.iter() {
- let column = array_expr.display_name()?;
+ for (expr, options) in array_expression.iter().zip(array_options.into_iter()) {
+ let column = expr.display_name()?;
columns_name.push(column.clone());

let unnest_plan = project_plan_builder
.clone()
- .unnest_column_with_options(column, options.clone())?
+ .unnest_column_with_options(column, options)?
.build()?;
unnest_plans.push(unnest_plan);
}
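The builder change above swaps the single UnnestOptions argument for one options value per array expression, which build_unnest_plans now zips with the aligned expressions. A minimal sketch of a caller using the new signature; the column names are invented and only the APIs visible in this diff are assumed:

use datafusion_common::{Result, UnnestOptions};
use datafusion_expr::col;
use datafusion_expr::logical_plan::builder::LogicalPlanBuilder;
use datafusion_expr::LogicalPlan;

// Unnest two array columns, each with its own (here default) options.
fn unnest_two_columns(input: LogicalPlan) -> Result<LogicalPlan> {
    let array_exprs = vec![col("tags"), col("scores")];
    let array_options = vec![UnnestOptions::default(); array_exprs.len()];

    LogicalPlanBuilder::from(input)
        .unnest_arrays(array_exprs, array_options)?
        .build()
}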
8 changes: 6 additions & 2 deletions datafusion/proto/src/logical_plan/to_proto.rs
@@ -34,7 +34,9 @@ use arrow::datatypes::{
DataType, Field, IntervalMonthDayNanoType, IntervalUnit, Schema, SchemaRef, TimeUnit,
UnionMode,
};
- use datafusion_common::{Column, DFField, DFSchemaRef, OwnedTableReference, ScalarValue};
+ use datafusion_common::{
+ not_impl_err, Column, DFField, DFSchemaRef, OwnedTableReference, ScalarValue,
+ };
use datafusion_expr::expr::{
self, Alias, Between, BinaryExpr, Cast, GetFieldAccess, GetIndexedField, GroupingSet,
InList, Like, Placeholder, ScalarFunction, ScalarUDF, Sort,
@@ -741,7 +743,9 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
.to_string(),
))
}
- Expr::Unnest(..) => todo!("Unnest not supported"),
+ Expr::Unnest(..) => {
+ return not_impl_err!("try_from() for Unnest is not implemented")
+ }
Expr::ScalarFunction(ScalarFunction { fun, args }) => {
let fun: protobuf::ScalarFunction = fun.try_into()?;
let args: Vec<Self> = args
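The serializer above now surfaces a recoverable error instead of panicking through todo!(). A tiny sketch of that behaviour in isolation, assuming only the not_impl_err! macro imported above from datafusion_common:

use datafusion_common::{not_impl_err, DataFusionError, Result};

// not_impl_err! evaluates to Err(DataFusionError::NotImplemented(..)),
// so the caller can match on it rather than hitting a panic.
fn serialize_unnest_placeholder() -> Result<()> {
    not_impl_err!("try_from() for Unnest is not implemented")
}

fn main() {
    match serialize_unnest_placeholder() {
        Err(DataFusionError::NotImplemented(msg)) => println!("not implemented: {msg}"),
        other => println!("unexpected: {other:?}"),
    }
}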
1 change: 1 addition & 0 deletions datafusion/sql/src/query.rs
@@ -73,6 +73,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Each `WITH` block can change the column names in the last
// projection (e.g. "WITH table(t1, t2) AS SELECT 1, 2").
let logical_plan = self.apply_table_alias(logical_plan, cte.alias)?;
+
planner_context.insert_cte(cte_name, logical_plan);
}
}
10 changes: 7 additions & 3 deletions datafusion/sql/src/relation/mod.rs
@@ -62,13 +62,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.plan_table_with_joins(*table_with_joins, planner_context)?,
alias,
),

// TODO: Support UnnestOptions
TableFactor::UNNEST {
alias,
array_exprs,
with_offset: _,
with_offset_alias: _,
} => {
- let options: UnnestOptions = Default::default();
+ // TODO: Support UnnestOptions
+ let array_options: Vec<UnnestOptions> =
+ vec![UnnestOptions::default(); array_exprs.len()];

// If column aliases are not supplied, then for a function returning a base data type,
// the column name is also the same as the function name.
@@ -77,12 +81,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
alias.columns = vec![alias.name.clone()];
}
(
- self.plan_unnest(array_exprs, planner_context, options)?,
+ self.plan_unnest(array_exprs, planner_context, array_options)?,
Some(alias),
)
} else {
(
- self.plan_unnest(array_exprs, planner_context, options)?,
+ self.plan_unnest(array_exprs, planner_context, array_options)?,
None,
)
}
4 changes: 2 additions & 2 deletions datafusion/sql/src/relation/unnest.rs
@@ -27,7 +27,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
array_exprs: Vec<SQLExpr>,
planner_context: &mut PlannerContext,
- options: UnnestOptions,
+ array_options: Vec<UnnestOptions>,
) -> Result<LogicalPlan> {
// No pre-defined schema for Unnest
let schema = DFSchema::empty();
@@ -40,7 +40,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = LogicalPlanBuilder::empty(true).build()?;

LogicalPlanBuilder::from(plan)
- .unnest_arrays(exprs, options)?
+ .unnest_arrays(exprs, array_options)?
.build()
}
}
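At the SQL layer, plan_unnest now receives one default UnnestOptions per array expression when UNNEST appears as a table factor. A rough end-to-end illustration of the query shape this code path targets; it assumes the branch's UNNEST support and a tokio runtime, and is not taken from the commit itself:

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // UNNEST in the FROM clause; each array expression gets UnnestOptions::default().
    let df = ctx
        .sql("SELECT * FROM UNNEST(make_array(1, 2, 3))")
        .await?;
    df.show().await?;
    Ok(())
}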
18 changes: 8 additions & 10 deletions datafusion/sql/src/select.rs
@@ -17,6 +17,7 @@

use std::collections::HashSet;
use std::sync::Arc;
+ use std::vec;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::utils::{
@@ -25,10 +26,9 @@ use crate::utils::{
};

use datafusion_common::{
- get_target_functional_dependencies, not_impl_err, plan_err, DFSchemaRef,
+ get_target_functional_dependencies, not_impl_err, plan_err, Column, DFSchemaRef,
DataFusionError, Result,
};
- use datafusion_common::{Column, UnnestOptions};
use datafusion_expr::expr::{Alias, Unnest};
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
@@ -318,18 +318,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
empty_from: bool,
planner_context: &mut PlannerContext,
) -> Result<Vec<Expr>> {
- let select_exprs = projection
+ projection
.into_iter()
.map(|expr| self.sql_select_to_rex(expr, plan, empty_from, planner_context))
.flat_map(|result| match result {
Ok(vec) => vec.into_iter().map(Ok).collect(),
Err(err) => vec![Err(err)],
})
- .collect::<Result<Vec<Expr>>>();
-
- // TODO: Align unnest expr here
-
- select_exprs
+ .collect::<Result<Vec<Expr>>>()
}

/// Generate a relational expression from a select SQL expression
@@ -688,6 +684,7 @@ fn get_updated_group_by_exprs(

// convert Expr::Unnest to LogicalPlan::Unnest
fn process_unnest_expr(input: LogicalPlan, select_exprs: &[Expr]) -> Result<LogicalPlan> {
+ let mut array_options = vec![];
let mut array_exprs_to_unnest = vec![];
for expr in select_exprs.iter() {
if let Expr::Unnest(Unnest {
Expand All @@ -696,22 +693,23 @@ fn process_unnest_expr(input: LogicalPlan, select_exprs: &[Expr]) -> Result<Logi
}) = expr
{
array_exprs_to_unnest.push(array_exprs[0].clone());
+ array_options.push(options.clone());
} else if let Expr::Alias(Alias { expr, .. }) = expr {
if let Expr::Unnest(Unnest {
array_exprs,
options,
}) = expr.as_ref()
{
array_exprs_to_unnest.push(array_exprs[0].clone());
+ array_options.push(options.clone());
}
}
}
if array_exprs_to_unnest.is_empty() {
Ok(input)
} else {
- let options: UnnestOptions = Default::default();
LogicalPlanBuilder::from(input)
- .unnest_arrays(array_exprs_to_unnest, options)?
+ .unnest_arrays(array_exprs_to_unnest, array_options)?
.build()
}
}