Skip to content

Commit 3438b35

Browse files
authored
Move wildcard expansions to the analyzer (#11681)
* allow qualified wildcard in the logical plan * move wildcard expansions to the analyzer * fix fmt * fix the view tests * expand wildcard for schema * fix for union query * cargo fmt clippy * move wildcard expanding tests to expand_wildcard_rule.rs * coercion the expanded wildcard expression in union * remove debug message * move wildcard options to logical plan * remove unused function * add the doc for expression function * fix cargo check * fix cargo fmt * fix test * extract expand_exprlist * expand wildcard for functional_dependencies * refine the doc * fix tests * fix expand exclude and except * remove unused import * fix check and update function * fix check * throw the error when exprlist to field * fix functional_dependency and exclude * fix projection_schema * fix the window functions * fix clippy and support unparsing wildcard * fix clippy and fmt * add the doc for util functions * fix unique expression check for projection * cargo fmt * move test and solve dependency issue * address review comments * add the fail reason * enhance the doc * add more doc
1 parent e8ac93a commit 3438b35

File tree

27 files changed

+1057
-254
lines changed

27 files changed

+1057
-254
lines changed

datafusion/core/src/datasource/view.rs

+38-6
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,19 @@
1919
2020
use std::{any::Any, sync::Arc};
2121

22-
use arrow::datatypes::SchemaRef;
23-
use async_trait::async_trait;
24-
use datafusion_catalog::Session;
25-
use datafusion_common::Column;
26-
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
27-
2822
use crate::{
2923
error::Result,
3024
logical_expr::{Expr, LogicalPlan},
3125
physical_plan::ExecutionPlan,
3226
};
27+
use arrow::datatypes::SchemaRef;
28+
use async_trait::async_trait;
29+
use datafusion_catalog::Session;
30+
use datafusion_common::config::ConfigOptions;
31+
use datafusion_common::Column;
32+
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
33+
use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
34+
use datafusion_optimizer::Analyzer;
3335

3436
use crate::datasource::{TableProvider, TableType};
3537

@@ -50,6 +52,7 @@ impl ViewTable {
5052
logical_plan: LogicalPlan,
5153
definition: Option<String>,
5254
) -> Result<Self> {
55+
let logical_plan = Self::apply_required_rule(logical_plan)?;
5356
let table_schema = logical_plan.schema().as_ref().to_owned().into();
5457

5558
let view = Self {
@@ -61,6 +64,15 @@ impl ViewTable {
6164
Ok(view)
6265
}
6366

67+
fn apply_required_rule(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
68+
let options = ConfigOptions::default();
69+
Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]).execute_and_check(
70+
logical_plan,
71+
&options,
72+
|_, _| {},
73+
)
74+
}
75+
6476
/// Get definition ref
6577
pub fn definition(&self) -> Option<&String> {
6678
self.definition.as_ref()
@@ -232,6 +244,26 @@ mod tests {
232244

233245
assert_batches_eq!(expected, &results);
234246

247+
let view_sql =
248+
"CREATE VIEW replace_xyz AS SELECT * REPLACE (column1*2 as column1) FROM xyz";
249+
session_ctx.sql(view_sql).await?.collect().await?;
250+
251+
let results = session_ctx
252+
.sql("SELECT * FROM replace_xyz")
253+
.await?
254+
.collect()
255+
.await?;
256+
257+
let expected = [
258+
"+---------+---------+---------+",
259+
"| column1 | column2 | column3 |",
260+
"+---------+---------+---------+",
261+
"| 2 | 2 | 3 |",
262+
"| 8 | 5 | 6 |",
263+
"+---------+---------+---------+",
264+
];
265+
266+
assert_batches_eq!(expected, &results);
235267
Ok(())
236268
}
237269

datafusion/core/src/execution/context/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,6 @@ impl SessionContext {
718718
}
719719
(_, Err(_)) => {
720720
let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?);
721-
722721
self.register_table(name, table)?;
723722
self.return_empty_dataframe()
724723
}

datafusion/expr/src/expr.rs

+211-10
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ use datafusion_common::{
4141
internal_err, not_impl_err, plan_err, Column, DFSchema, Result, ScalarValue,
4242
TableReference,
4343
};
44-
use sqlparser::ast::NullTreatment;
44+
use sqlparser::ast::{
45+
display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
46+
NullTreatment, RenameSelectItem, ReplaceSelectElement,
47+
};
4548

4649
/// Represents logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
4750
///
@@ -315,7 +318,10 @@ pub enum Expr {
315318
///
316319
/// This expr has to be resolved to a list of columns before translating logical
317320
/// plan into physical plan.
318-
Wildcard { qualifier: Option<TableReference> },
321+
Wildcard {
322+
qualifier: Option<TableReference>,
323+
options: WildcardOptions,
324+
},
319325
/// List of grouping set expressions. Only valid in the context of an aggregate
320326
/// GROUP BY expression list
321327
GroupingSet(GroupingSet),
@@ -970,6 +976,89 @@ impl GroupingSet {
970976
}
971977
}
972978

979+
/// Additional options for wildcards, e.g. Snowflake `EXCLUDE`/`RENAME` and Bigquery `EXCEPT`.
980+
#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
981+
pub struct WildcardOptions {
982+
/// `[ILIKE...]`.
983+
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
984+
pub ilike: Option<IlikeSelectItem>,
985+
/// `[EXCLUDE...]`.
986+
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
987+
pub exclude: Option<ExcludeSelectItem>,
988+
/// `[EXCEPT...]`.
989+
/// BigQuery syntax: <https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#select_except>
990+
/// Clickhouse syntax: <https://clickhouse.com/docs/en/sql-reference/statements/select#except>
991+
pub except: Option<ExceptSelectItem>,
992+
/// `[REPLACE]`
993+
/// BigQuery syntax: <https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#select_replace>
994+
/// Clickhouse syntax: <https://clickhouse.com/docs/en/sql-reference/statements/select#replace>
995+
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
996+
pub replace: Option<PlannedReplaceSelectItem>,
997+
/// `[RENAME ...]`.
998+
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
999+
pub rename: Option<RenameSelectItem>,
1000+
}
1001+
1002+
impl WildcardOptions {
1003+
pub fn with_replace(self, replace: PlannedReplaceSelectItem) -> Self {
1004+
WildcardOptions {
1005+
ilike: self.ilike,
1006+
exclude: self.exclude,
1007+
except: self.except,
1008+
replace: Some(replace),
1009+
rename: self.rename,
1010+
}
1011+
}
1012+
}
1013+
1014+
impl Display for WildcardOptions {
1015+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1016+
if let Some(ilike) = &self.ilike {
1017+
write!(f, " {ilike}")?;
1018+
}
1019+
if let Some(exclude) = &self.exclude {
1020+
write!(f, " {exclude}")?;
1021+
}
1022+
if let Some(except) = &self.except {
1023+
write!(f, " {except}")?;
1024+
}
1025+
if let Some(replace) = &self.replace {
1026+
write!(f, " {replace}")?;
1027+
}
1028+
if let Some(rename) = &self.rename {
1029+
write!(f, " {rename}")?;
1030+
}
1031+
Ok(())
1032+
}
1033+
}
1034+
1035+
/// The planned expressions for `REPLACE`
1036+
#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
1037+
pub struct PlannedReplaceSelectItem {
1038+
/// The original ast nodes
1039+
pub items: Vec<ReplaceSelectElement>,
1040+
/// The expression planned from the ast nodes. They will be used when expanding the wildcard.
1041+
pub planned_expressions: Vec<Expr>,
1042+
}
1043+
1044+
impl Display for PlannedReplaceSelectItem {
1045+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1046+
write!(f, "REPLACE")?;
1047+
write!(f, " ({})", display_comma_separated(&self.items))?;
1048+
Ok(())
1049+
}
1050+
}
1051+
1052+
impl PlannedReplaceSelectItem {
1053+
pub fn items(&self) -> &[ReplaceSelectElement] {
1054+
&self.items
1055+
}
1056+
1057+
pub fn expressions(&self) -> &[Expr] {
1058+
&self.planned_expressions
1059+
}
1060+
}
1061+
9731062
/// Fixed seed for the hashing so that Ords are consistent across runs
9741063
const SEED: ahash::RandomState = ahash::RandomState::with_seeds(0, 0, 0, 0);
9751064

@@ -1720,8 +1809,9 @@ impl Expr {
17201809
Expr::ScalarSubquery(subquery) => {
17211810
subquery.hash(hasher);
17221811
}
1723-
Expr::Wildcard { qualifier } => {
1812+
Expr::Wildcard { qualifier, options } => {
17241813
qualifier.hash(hasher);
1814+
options.hash(hasher);
17251815
}
17261816
Expr::GroupingSet(grouping_set) => {
17271817
mem::discriminant(grouping_set).hash(hasher);
@@ -2242,9 +2332,9 @@ impl fmt::Display for Expr {
22422332
write!(f, "{expr} IN ([{}])", expr_vec_fmt!(list))
22432333
}
22442334
}
2245-
Expr::Wildcard { qualifier } => match qualifier {
2246-
Some(qualifier) => write!(f, "{qualifier}.*"),
2247-
None => write!(f, "*"),
2335+
Expr::Wildcard { qualifier, options } => match qualifier {
2336+
Some(qualifier) => write!(f, "{qualifier}.*{options}"),
2337+
None => write!(f, "*{options}"),
22482338
},
22492339
Expr::GroupingSet(grouping_sets) => match grouping_sets {
22502340
GroupingSet::Rollup(exprs) => {
@@ -2543,9 +2633,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
25432633
Expr::Sort { .. } => {
25442634
internal_err!("Create physical name does not support sort expression")
25452635
}
2546-
Expr::Wildcard { .. } => {
2547-
internal_err!("Create physical name does not support wildcard")
2548-
}
2636+
Expr::Wildcard { qualifier, options } => match qualifier {
2637+
Some(qualifier) => Ok(format!("{}.*{}", qualifier, options)),
2638+
None => Ok(format!("*{}", options)),
2639+
},
25492640
Expr::Placeholder(_) => {
25502641
internal_err!("Create physical name does not support placeholder")
25512642
}
@@ -2558,7 +2649,12 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
25582649
#[cfg(test)]
25592650
mod test {
25602651
use crate::expr_fn::col;
2561-
use crate::{case, lit, ColumnarValue, ScalarUDF, ScalarUDFImpl, Volatility};
2652+
use crate::{
2653+
case, lit, qualified_wildcard, wildcard, wildcard_with_options, ColumnarValue,
2654+
ScalarUDF, ScalarUDFImpl, Volatility,
2655+
};
2656+
use sqlparser::ast;
2657+
use sqlparser::ast::{Ident, IdentWithAlias};
25622658
use std::any::Any;
25632659

25642660
#[test]
@@ -2859,4 +2955,109 @@ mod test {
28592955
);
28602956
assert_eq!(find_df_window_func("not_exist"), None)
28612957
}
2958+
2959+
#[test]
2960+
fn test_display_wildcard() {
2961+
assert_eq!(format!("{}", wildcard()), "*");
2962+
assert_eq!(format!("{}", qualified_wildcard("t1")), "t1.*");
2963+
assert_eq!(
2964+
format!(
2965+
"{}",
2966+
wildcard_with_options(wildcard_options(
2967+
Some(IlikeSelectItem {
2968+
pattern: "c1".to_string()
2969+
}),
2970+
None,
2971+
None,
2972+
None,
2973+
None
2974+
))
2975+
),
2976+
"* ILIKE 'c1'"
2977+
);
2978+
assert_eq!(
2979+
format!(
2980+
"{}",
2981+
wildcard_with_options(wildcard_options(
2982+
None,
2983+
Some(ExcludeSelectItem::Multiple(vec![
2984+
Ident::from("c1"),
2985+
Ident::from("c2")
2986+
])),
2987+
None,
2988+
None,
2989+
None
2990+
))
2991+
),
2992+
"* EXCLUDE (c1, c2)"
2993+
);
2994+
assert_eq!(
2995+
format!(
2996+
"{}",
2997+
wildcard_with_options(wildcard_options(
2998+
None,
2999+
None,
3000+
Some(ExceptSelectItem {
3001+
first_element: Ident::from("c1"),
3002+
additional_elements: vec![Ident::from("c2")]
3003+
}),
3004+
None,
3005+
None
3006+
))
3007+
),
3008+
"* EXCEPT (c1, c2)"
3009+
);
3010+
assert_eq!(
3011+
format!(
3012+
"{}",
3013+
wildcard_with_options(wildcard_options(
3014+
None,
3015+
None,
3016+
None,
3017+
Some(PlannedReplaceSelectItem {
3018+
items: vec![ReplaceSelectElement {
3019+
expr: ast::Expr::Identifier(Ident::from("c1")),
3020+
column_name: Ident::from("a1"),
3021+
as_keyword: false
3022+
}],
3023+
planned_expressions: vec![]
3024+
}),
3025+
None
3026+
))
3027+
),
3028+
"* REPLACE (c1 a1)"
3029+
);
3030+
assert_eq!(
3031+
format!(
3032+
"{}",
3033+
wildcard_with_options(wildcard_options(
3034+
None,
3035+
None,
3036+
None,
3037+
None,
3038+
Some(RenameSelectItem::Multiple(vec![IdentWithAlias {
3039+
ident: Ident::from("c1"),
3040+
alias: Ident::from("a1")
3041+
}]))
3042+
))
3043+
),
3044+
"* RENAME (c1 AS a1)"
3045+
)
3046+
}
3047+
3048+
fn wildcard_options(
3049+
opt_ilike: Option<IlikeSelectItem>,
3050+
opt_exclude: Option<ExcludeSelectItem>,
3051+
opt_except: Option<ExceptSelectItem>,
3052+
opt_replace: Option<PlannedReplaceSelectItem>,
3053+
opt_rename: Option<RenameSelectItem>,
3054+
) -> WildcardOptions {
3055+
WildcardOptions {
3056+
ilike: opt_ilike,
3057+
exclude: opt_exclude,
3058+
except: opt_except,
3059+
replace: opt_replace,
3060+
rename: opt_rename,
3061+
}
3062+
}
28623063
}

0 commit comments

Comments
 (0)