Skip to content

Commit dc6fc68

Browse files
JasonLi-cn authored and alamb committed
feat: support unnest in GROUP BY clause (apache#11469)
* feat: support group by unnest
* pass slt
* refactor: mv process_group_by_unnest into try_process_unnest
* chore: add some documentation comments and tests
* Avoid cloning input
* use consistent field names

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent a09bb05 commit dc6fc68

File tree

2 files changed

+249
-3
lines changed

2 files changed

+249
-3
lines changed

datafusion/sql/src/select.rs

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,20 @@ use crate::utils::{
2626
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
2727
};
2828

29+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
2930
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
3031
use datafusion_common::{Column, UnnestOptions};
3132
use datafusion_expr::expr::Alias;
3233
use datafusion_expr::expr_rewriter::{
3334
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols,
3435
};
36+
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
3537
use datafusion_expr::utils::{
3638
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
3739
find_aggregate_exprs, find_window_exprs,
3840
};
3941
use datafusion_expr::{
40-
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
42+
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
4143
};
4244
use sqlparser::ast::{
4345
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
@@ -297,6 +299,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
297299
input: LogicalPlan,
298300
select_exprs: Vec<Expr>,
299301
) -> Result<LogicalPlan> {
302+
// Try process group by unnest
303+
let input = self.try_process_aggregate_unnest(input)?;
304+
300305
let mut intermediate_plan = input;
301306
let mut intermediate_select_exprs = select_exprs;
302307
// Each expr in select_exprs can contains multiple unnest stage
@@ -354,6 +359,117 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
354359
.build()
355360
}
356361

362+
/// Rewrites `unnest` expressions that appear as GROUP BY keys of an
/// aggregate plan.
///
/// Behaviour by plan variant:
/// - `LogicalPlan::Aggregate`: the aggregate is handed to
///   `try_process_group_by_unnest`, and a new aggregate is built over the
///   returned input using the returned group-by expressions and the
///   original aggregate expressions.
/// - `LogicalPlan::Filter`: the filter's input is processed recursively and
///   the filter itself is kept on top
///   (presumably this covers a HAVING filter over an aggregate; verify against callers).
/// - Any other plan is returned unchanged.
///
/// # Errors
/// Propagates errors from `try_process_group_by_unnest` and from
/// rebuilding the aggregate with `LogicalPlanBuilder`.
fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
363+
match input {
364+
LogicalPlan::Aggregate(agg) => {
365+
// `agg` is moved into `try_process_group_by_unnest` below, so keep a
// copy of the aggregate expressions for rebuilding the node.
let agg_expr = agg.aggr_expr.clone();
366+
let (new_input, new_group_by_exprs) =
367+
self.try_process_group_by_unnest(agg)?;
368+
LogicalPlanBuilder::from(new_input)
369+
.aggregate(new_group_by_exprs, agg_expr)?
370+
.build()
371+
}
372+
LogicalPlan::Filter(mut filter) => {
373+
// Only the input is rewritten; the filter node is reused as is.
filter.input = Arc::new(
374+
self.try_process_aggregate_unnest(unwrap_arc(filter.input))?,
375+
);
376+
Ok(LogicalPlan::Filter(filter))
377+
}
378+
// Nothing to rewrite for any other plan shape.
_ => Ok(input),
379+
}
380+
}
381+
382+
/// Try converting `Unnest(Expr)` in the group-by expressions of an
/// aggregate into `Unnest`/`Projection` plan nodes placed below it.
383+
/// Returns the new input plan and the group-by expressions to use when
/// rebuilding the `Aggregate`. If no unnest is found, the original input
/// and group-by expressions are returned as they were.
///
/// # Errors
/// Propagates errors from `transform_bottom_unnest` and from the
/// `LogicalPlanBuilder` calls.
///
/// # Panics
/// Panics via `expect` if walking an aggregate expression returns an
/// error; the closure used always returns `Ok`.
384+
fn try_process_group_by_unnest(
385+
&self,
386+
agg: Aggregate,
387+
) -> Result<(LogicalPlan, Vec<Expr>)> {
388+
// Columns referenced by the aggregate expressions. Filled in the first
// time an unnest is found, then reused by later loop iterations.
let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;
389+
390+
let Aggregate {
391+
input,
392+
group_expr,
393+
aggr_expr,
394+
..
395+
} = agg;
396+
397+
// Process unnest in the group-by expressions; the input of the aggregate
// is rewritten. For example:
398+
// for example:
399+
//
400+
// ```
401+
// Aggregate: groupBy=[[UNNEST(Column(Column { relation: Some(Bare { table: "tab" }), name: "array_col" }))]], aggr=[[]]
402+
// TableScan: tab
403+
// ```
404+
//
405+
// will be transformed into
406+
//
407+
// ```
408+
// Aggregate: groupBy=[[unnest(tab.array_col)]], aggr=[[]]
409+
// Unnest: lists[unnest(tab.array_col)] structs[]
410+
// Projection: tab.array_col AS unnest(tab.array_col)
411+
// TableScan: tab
412+
// ```
413+
let mut intermediate_plan = unwrap_arc(input);
414+
let mut intermediate_select_exprs = group_expr;
415+
416+
// Each iteration adds one Projection + Unnest pair below the aggregate.
// The loop ends when `transform_bottom_unnest` reports no unnest columns.
loop {
417+
let mut unnest_columns = vec![];
418+
let mut inner_projection_exprs = vec![];
419+
420+
let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
421+
.iter()
422+
.map(|expr| {
423+
transform_bottom_unnest(
424+
&intermediate_plan,
425+
&mut unnest_columns,
426+
&mut inner_projection_exprs,
427+
expr,
428+
)
429+
})
430+
.collect::<Result<Vec<_>>>()?
431+
.into_iter()
432+
.flatten()
433+
.collect();
434+
435+
if unnest_columns.is_empty() {
436+
// No unnest left: `outer_projection_exprs` from this pass is dropped
// and the expressions from the previous pass are returned below.
break;
437+
} else {
438+
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
439+
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);
440+
441+
// The projection under the Unnest must also carry the columns the
// aggregate expressions reference, not only the group-by inputs.
// NOTE(review): this is a HashSet, so the order in which these
// columns reach `project` is unspecified; confirm nothing
// downstream depends on it.
let mut projection_exprs = match &aggr_expr_using_columns {
442+
Some(exprs) => (*exprs).clone(),
443+
None => {
444+
let mut columns = HashSet::new();
445+
for expr in &aggr_expr {
446+
expr.apply(|expr| {
447+
if let Expr::Column(c) = expr {
448+
columns.insert(Expr::Column(c.clone()));
449+
}
450+
Ok(TreeNodeRecursion::Continue)
451+
})
452+
// As the closure always returns Ok, this "can't" error
453+
.expect("Unexpected error");
454+
}
455+
// Cache the collected columns for later iterations.
aggr_expr_using_columns = Some(columns.clone());
456+
columns
457+
}
458+
};
459+
projection_exprs.extend(inner_projection_exprs);
460+
461+
intermediate_plan = LogicalPlanBuilder::from(intermediate_plan)
462+
.project(projection_exprs)?
463+
.unnest_columns_with_options(columns, unnest_options)?
464+
.build()?;
465+
466+
// The rewritten expressions become the input of the next pass.
intermediate_select_exprs = outer_projection_exprs;
467+
}
468+
}
469+
470+
Ok((intermediate_plan, intermediate_select_exprs))
471+
}
472+
357473
fn plan_selection(
358474
&self,
359475
selection: Option<SQLExpr>,

datafusion/sqllogictest/test_files/unnest.slt

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,6 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1
500500
query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them.
501501
select unnest(column1), unnest(column1) from unnest_table;
502502

503-
statement ok
504-
drop table unnest_table;
505503

506504
## unnest list followed by unnest struct
507505
query ???
@@ -557,3 +555,135 @@ physical_plan
557555
06)----------UnnestExec
558556
07)------------ProjectionExec: expr=[column3@0 as unnest(recursive_unnest_table.column3), column3@0 as column3]
559557
08)--------------MemoryExec: partitions=1, partition_sizes=[1]
558+
559+
## group by unnest
560+
561+
### without agg exprs
562+
query I
563+
select unnest(column1) c1 from unnest_table group by c1 order by c1;
564+
----
565+
1
566+
2
567+
3
568+
4
569+
5
570+
6
571+
12
572+
573+
query II
574+
select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, c2 order by c1, c2;
575+
----
576+
1 7
577+
2 NULL
578+
3 NULL
579+
4 8
580+
5 9
581+
6 11
582+
12 NULL
583+
NULL 10
584+
NULL 12
585+
NULL 42
586+
NULL NULL
587+
588+
query III
589+
select unnest(column1) c1, unnest(column2) c2, column3 c3 from unnest_table group by c1, c2, c3 order by c1, c2, c3;
590+
----
591+
1 7 1
592+
2 NULL 1
593+
3 NULL 1
594+
4 8 2
595+
5 9 2
596+
6 11 3
597+
12 NULL NULL
598+
NULL 10 2
599+
NULL 12 3
600+
NULL 42 NULL
601+
NULL NULL NULL
602+
603+
### with agg exprs
604+
605+
query IIII
606+
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(1) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
607+
----
608+
1 7 1 1
609+
2 NULL 1 1
610+
3 NULL 1 1
611+
4 8 2 1
612+
5 9 2 1
613+
6 11 3 1
614+
12 NULL NULL 1
615+
NULL 10 2 1
616+
NULL 12 3 1
617+
NULL 42 NULL 1
618+
NULL NULL NULL 1
619+
620+
query IIII
621+
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
622+
----
623+
1 7 1 1
624+
2 NULL 1 1
625+
3 NULL 1 1
626+
4 8 2 1
627+
5 9 2 1
628+
6 11 3 0
629+
12 NULL NULL 0
630+
NULL 10 2 1
631+
NULL 12 3 0
632+
NULL 42 NULL 0
633+
NULL NULL NULL 0
634+
635+
query IIIII
636+
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4), sum(column3) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
637+
----
638+
1 7 1 1 1
639+
2 NULL 1 1 1
640+
3 NULL 1 1 1
641+
4 8 2 1 2
642+
5 9 2 1 2
643+
6 11 3 0 3
644+
12 NULL NULL 0 NULL
645+
NULL 10 2 1 2
646+
NULL 12 3 0 3
647+
NULL 42 NULL 0 NULL
648+
NULL NULL NULL 0 NULL
649+
650+
query II
651+
select unnest(column1), count(*) from unnest_table group by unnest(column1) order by unnest(column1) desc;
652+
----
653+
12 1
654+
6 1
655+
5 1
656+
4 1
657+
3 1
658+
2 1
659+
1 1
660+
661+
### group by recursive unnest list
662+
663+
query ?
664+
select unnest(unnest(column2)) c2 from recursive_unnest_table group by c2 order by c2;
665+
----
666+
[1]
667+
[1, 1]
668+
[2]
669+
[3, 4]
670+
[5]
671+
[7, 8]
672+
[, 6]
673+
NULL
674+
675+
query ?I
676+
select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table group by c2 order by c2;
677+
----
678+
[1] 1
679+
[1, 1] 1
680+
[2] 1
681+
[3, 4] 1
682+
[5] 1
683+
[7, 8] 1
684+
[, 6] 1
685+
NULL 1
686+
687+
### TODO: group by unnest struct
688+
query error DataFusion error: Error during planning: Projection references non\-aggregate values
689+
select unnest(column1) c1 from nested_unnest_table group by c1.c0;

0 commit comments

Comments
 (0)