Skip to content

Commit 8a7b66d

Browse files
devinjdangelofindepi
authored andcommitted
readability refactor (apache#10788)
1 parent a00f5d6 commit 8a7b66d

File tree

2 files changed

+102
-87
lines changed

2 files changed

+102
-87
lines changed

datafusion/sql/src/unparser/plan.rs

Lines changed: 83 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError, Result};
1919
use datafusion_expr::{
20-
expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
20+
expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, Projection,
2121
};
2222
use sqlparser::ast::{self, SetExpr};
2323

@@ -131,6 +131,87 @@ impl Unparser<'_> {
131131
Ok(ast::SetExpr::Select(Box::new(select_builder.build()?)))
132132
}
133133

134+
/// Reconstructs a SELECT SQL statement from a logical plan by unprojecting column expressions
135+
/// found in a [Projection] node. This requires scanning the plan tree for relevant Aggregate
136+
/// and Window nodes and matching column expressions to the appropriate agg or window expressions.
137+
fn reconstruct_select_statement(
138+
&self,
139+
plan: &LogicalPlan,
140+
p: &Projection,
141+
select: &mut SelectBuilder,
142+
) -> Result<()> {
143+
match find_agg_node_within_select(plan, None, true) {
144+
Some(AggVariant::Aggregate(agg)) => {
145+
let items = p
146+
.expr
147+
.iter()
148+
.map(|proj_expr| {
149+
let unproj = unproject_agg_exprs(proj_expr, agg)?;
150+
self.select_item_to_sql(&unproj)
151+
})
152+
.collect::<Result<Vec<_>>>()?;
153+
154+
select.projection(items);
155+
select.group_by(ast::GroupByExpr::Expressions(
156+
agg.group_expr
157+
.iter()
158+
.map(|expr| self.expr_to_sql(expr))
159+
.collect::<Result<Vec<_>>>()?,
160+
));
161+
}
162+
Some(AggVariant::Window(window)) => {
163+
let items = p
164+
.expr
165+
.iter()
166+
.map(|proj_expr| {
167+
let unproj = unproject_window_exprs(proj_expr, &window)?;
168+
self.select_item_to_sql(&unproj)
169+
})
170+
.collect::<Result<Vec<_>>>()?;
171+
172+
select.projection(items);
173+
}
174+
None => {
175+
let items = p
176+
.expr
177+
.iter()
178+
.map(|e| self.select_item_to_sql(e))
179+
.collect::<Result<Vec<_>>>()?;
180+
select.projection(items);
181+
}
182+
}
183+
Ok(())
184+
}
185+
186+
fn projection_to_sql(
187+
&self,
188+
plan: &LogicalPlan,
189+
p: &Projection,
190+
query: &mut Option<QueryBuilder>,
191+
select: &mut SelectBuilder,
192+
relation: &mut RelationBuilder,
193+
) -> Result<()> {
194+
// A second projection implies a derived tablefactor
195+
if !select.already_projected() {
196+
self.reconstruct_select_statement(plan, p, select)?;
197+
self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
198+
} else {
199+
let mut derived_builder = DerivedRelationBuilder::default();
200+
derived_builder.lateral(false).alias(None).subquery({
201+
let inner_statment = self.plan_to_sql(plan)?;
202+
if let ast::Statement::Query(inner_query) = inner_statment {
203+
inner_query
204+
} else {
205+
return internal_err!(
206+
"Subquery must be a Query, but found {inner_statment:?}"
207+
);
208+
}
209+
});
210+
relation.derived(derived_builder);
211+
Ok(())
212+
}
213+
}
214+
134215
fn select_to_sql_recursively(
135216
&self,
136217
plan: &LogicalPlan,
@@ -159,74 +240,7 @@ impl Unparser<'_> {
159240
Ok(())
160241
}
161242
LogicalPlan::Projection(p) => {
162-
// A second projection implies a derived tablefactor
163-
if !select.already_projected() {
164-
// Special handling when projecting an agregation plan
165-
if let Some(aggvariant) =
166-
find_agg_node_within_select(plan, None, true)
167-
{
168-
match aggvariant {
169-
AggVariant::Aggregate(agg) => {
170-
let items = p
171-
.expr
172-
.iter()
173-
.map(|proj_expr| {
174-
let unproj = unproject_agg_exprs(proj_expr, agg)?;
175-
self.select_item_to_sql(&unproj)
176-
})
177-
.collect::<Result<Vec<_>>>()?;
178-
179-
select.projection(items);
180-
select.group_by(ast::GroupByExpr::Expressions(
181-
agg.group_expr
182-
.iter()
183-
.map(|expr| self.expr_to_sql(expr))
184-
.collect::<Result<Vec<_>>>()?,
185-
));
186-
}
187-
AggVariant::Window(window) => {
188-
let items = p
189-
.expr
190-
.iter()
191-
.map(|proj_expr| {
192-
let unproj =
193-
unproject_window_exprs(proj_expr, &window)?;
194-
self.select_item_to_sql(&unproj)
195-
})
196-
.collect::<Result<Vec<_>>>()?;
197-
198-
select.projection(items);
199-
}
200-
}
201-
} else {
202-
let items = p
203-
.expr
204-
.iter()
205-
.map(|e| self.select_item_to_sql(e))
206-
.collect::<Result<Vec<_>>>()?;
207-
select.projection(items);
208-
}
209-
self.select_to_sql_recursively(
210-
p.input.as_ref(),
211-
query,
212-
select,
213-
relation,
214-
)
215-
} else {
216-
let mut derived_builder = DerivedRelationBuilder::default();
217-
derived_builder.lateral(false).alias(None).subquery({
218-
let inner_statment = self.plan_to_sql(plan)?;
219-
if let ast::Statement::Query(inner_query) = inner_statment {
220-
inner_query
221-
} else {
222-
return internal_err!(
223-
"Subquery must be a Query, but found {inner_statment:?}"
224-
);
225-
}
226-
});
227-
relation.derived(derived_builder);
228-
Ok(())
229-
}
243+
self.projection_to_sql(plan, p, query, select, relation)
230244
}
231245
LogicalPlan::Filter(filter) => {
232246
if let Some(AggVariant::Aggregate(agg)) =

datafusion/sql/src/unparser/utils.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -46,29 +46,30 @@ pub(crate) fn find_agg_node_within_select<'a>(
4646
} else {
4747
input.first()?
4848
};
49+
4950
// Agg nodes explicitly return immediately with a single node
5051
// Window nodes accumulate in a vec until encountering a TableScan or 2nd projection
51-
if let LogicalPlan::Aggregate(agg) = input {
52-
Some(AggVariant::Aggregate(agg))
53-
} else if let LogicalPlan::Window(window) = input {
54-
prev_windows = match &mut prev_windows {
55-
Some(AggVariant::Window(windows)) => {
56-
windows.push(window);
52+
match input {
53+
LogicalPlan::Aggregate(agg) => Some(AggVariant::Aggregate(agg)),
54+
LogicalPlan::Window(window) => {
55+
prev_windows = match &mut prev_windows {
56+
Some(AggVariant::Window(windows)) => {
57+
windows.push(window);
58+
prev_windows
59+
}
60+
_ => Some(AggVariant::Window(vec![window])),
61+
};
62+
find_agg_node_within_select(input, prev_windows, already_projected)
63+
}
64+
LogicalPlan::Projection(_) => {
65+
if already_projected {
5766
prev_windows
67+
} else {
68+
find_agg_node_within_select(input, prev_windows, true)
5869
}
59-
_ => Some(AggVariant::Window(vec![window])),
60-
};
61-
find_agg_node_within_select(input, prev_windows, already_projected)
62-
} else if let LogicalPlan::TableScan(_) = input {
63-
prev_windows
64-
} else if let LogicalPlan::Projection(_) = input {
65-
if already_projected {
66-
prev_windows
67-
} else {
68-
find_agg_node_within_select(input, prev_windows, true)
6970
}
70-
} else {
71-
find_agg_node_within_select(input, prev_windows, already_projected)
71+
LogicalPlan::TableScan(_) => prev_windows,
72+
_ => find_agg_node_within_select(input, prev_windows, already_projected),
7273
}
7374
}
7475

0 commit comments

Comments
 (0)