@@ -26,6 +26,7 @@ use crate::{
26
26
} ,
27
27
print_options:: PrintOptions ,
28
28
} ;
29
+ use datafusion:: sql:: { parser:: DFParser , sqlparser:: dialect:: dialect_from_str} ;
29
30
use datafusion:: {
30
31
datasource:: listing:: ListingTableUrl ,
31
32
error:: { DataFusionError , Result } ,
@@ -192,18 +193,29 @@ async fn exec_and_print(
192
193
let now = Instant :: now ( ) ;
193
194
194
195
let sql = unescape_input ( & sql) ?;
195
- let plan = ctx. state ( ) . create_logical_plan ( & sql) . await ?;
196
- let df = match & plan {
197
- LogicalPlan :: Ddl ( DdlStatement :: CreateExternalTable ( cmd) ) => {
198
- create_external_table ( ctx, cmd) . await ?;
199
- ctx. execute_logical_plan ( plan) . await ?
200
- }
201
- _ => ctx. execute_logical_plan ( plan) . await ?,
202
- } ;
203
-
204
- let results = df. collect ( ) . await ?;
205
- print_options. print_batches ( & results, now) ?;
196
+ let task_ctx = ctx. task_ctx ( ) ;
197
+ let dialect = & task_ctx. session_config ( ) . options ( ) . sql_parser . dialect ;
198
+ let dialect = dialect_from_str ( dialect) . ok_or_else ( || {
199
+ DataFusionError :: Plan ( format ! (
200
+ "Unsupported SQL dialect: {dialect}. Available dialects: \
201
+ Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
202
+ MsSQL, ClickHouse, BigQuery, Ansi."
203
+ ) )
204
+ } ) ?;
205
+ let statements = DFParser :: parse_sql_with_dialect ( & sql, dialect. as_ref ( ) ) ?;
206
+ for statement in statements {
207
+ let plan = ctx. state ( ) . statement_to_plan ( statement) . await ?;
208
+ let df = match & plan {
209
+ LogicalPlan :: Ddl ( DdlStatement :: CreateExternalTable ( cmd) ) => {
210
+ create_external_table ( ctx, cmd) . await ?;
211
+ ctx. execute_logical_plan ( plan) . await ?
212
+ }
213
+ _ => ctx. execute_logical_plan ( plan) . await ?,
214
+ } ;
206
215
216
+ let results = df. collect ( ) . await ?;
217
+ print_options. print_batches ( & results, now) ?;
218
+ }
207
219
Ok ( ( ) )
208
220
}
209
221
0 commit comments