16
16
// under the License.
17
17
18
18
use datafusion:: error:: Result ;
19
-
19
+ use datafusion :: logical_expr :: sqlparser :: ast :: Statement ;
20
20
use datafusion:: prelude:: * ;
21
21
use datafusion:: sql:: unparser:: expr_to_sql;
22
+ use datafusion_common:: DFSchemaRef ;
23
+ use datafusion_expr:: {
24
+ Extension , LogicalPlan , LogicalPlanBuilder , UserDefinedLogicalNode ,
25
+ UserDefinedLogicalNodeCore ,
26
+ } ;
27
+ use datafusion_sql:: unparser:: ast:: {
28
+ DerivedRelationBuilder , QueryBuilder , RelationBuilder , SelectBuilder ,
29
+ } ;
22
30
use datafusion_sql:: unparser:: dialect:: CustomDialectBuilder ;
31
+ use datafusion_sql:: unparser:: extension_unparser:: UserDefinedLogicalNodeUnparser ;
32
+ use datafusion_sql:: unparser:: extension_unparser:: {
33
+ UnparseToStatementResult , UnparseWithinStatementResult ,
34
+ } ;
23
35
use datafusion_sql:: unparser:: { plan_to_sql, Unparser } ;
36
+ use std:: fmt;
37
+ use std:: sync:: Arc ;
24
38
25
39
/// This example demonstrates the programmatic construction of SQL strings using
26
40
/// the DataFusion Expr [`Expr`] and LogicalPlan [`LogicalPlan`] API.
@@ -44,6 +58,10 @@ use datafusion_sql::unparser::{plan_to_sql, Unparser};
44
58
///
45
59
/// 5. [`round_trip_plan_to_sql_demo`]: Create a logical plan from a SQL string, modify it using the
46
60
/// DataFrames API and convert it back to a sql string.
61
+ ///
62
+ /// 6. [`unparse_my_logical_plan_as_statement`]: Create a custom logical plan and unparse it as a statement.
63
+ ///
64
+ /// 7. [`unparse_my_logical_plan_as_subquery`]: Create a custom logical plan and unparse it as a subquery.
47
65
48
66
#[ tokio:: main]
49
67
async fn main ( ) -> Result < ( ) > {
@@ -53,6 +71,8 @@ async fn main() -> Result<()> {
53
71
simple_expr_to_sql_demo_escape_mysql_style ( ) ?;
54
72
simple_plan_to_sql_demo ( ) . await ?;
55
73
round_trip_plan_to_sql_demo ( ) . await ?;
74
+ unparse_my_logical_plan_as_statement ( ) . await ?;
75
+ unparse_my_logical_plan_as_subquery ( ) . await ?;
56
76
Ok ( ( ) )
57
77
}
58
78
@@ -152,3 +172,144 @@ async fn round_trip_plan_to_sql_demo() -> Result<()> {
152
172
153
173
Ok ( ( ) )
154
174
}
175
+
176
+ #[ derive( Debug , PartialEq , Eq , Hash , PartialOrd ) ]
177
+ struct MyLogicalPlan {
178
+ input : LogicalPlan ,
179
+ }
180
+
181
+ impl UserDefinedLogicalNodeCore for MyLogicalPlan {
182
+ fn name ( & self ) -> & str {
183
+ "MyLogicalPlan"
184
+ }
185
+
186
+ fn inputs ( & self ) -> Vec < & LogicalPlan > {
187
+ vec ! [ & self . input]
188
+ }
189
+
190
+ fn schema ( & self ) -> & DFSchemaRef {
191
+ self . input . schema ( )
192
+ }
193
+
194
+ fn expressions ( & self ) -> Vec < Expr > {
195
+ vec ! [ ]
196
+ }
197
+
198
+ fn fmt_for_explain ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
199
+ write ! ( f, "MyLogicalPlan" )
200
+ }
201
+
202
+ fn with_exprs_and_inputs (
203
+ & self ,
204
+ _exprs : Vec < Expr > ,
205
+ inputs : Vec < LogicalPlan > ,
206
+ ) -> Result < Self > {
207
+ Ok ( MyLogicalPlan {
208
+ input : inputs. into_iter ( ) . next ( ) . unwrap ( ) ,
209
+ } )
210
+ }
211
+ }
212
+
213
+ struct PlanToStatement { }
214
+ impl UserDefinedLogicalNodeUnparser for PlanToStatement {
215
+ fn unparse_to_statement (
216
+ & self ,
217
+ node : & dyn UserDefinedLogicalNode ,
218
+ unparser : & Unparser ,
219
+ ) -> Result < UnparseToStatementResult > {
220
+ if let Some ( plan) = node. as_any ( ) . downcast_ref :: < MyLogicalPlan > ( ) {
221
+ let input = unparser. plan_to_sql ( & plan. input ) ?;
222
+ Ok ( UnparseToStatementResult :: Modified ( input) )
223
+ } else {
224
+ Ok ( UnparseToStatementResult :: Unmodified )
225
+ }
226
+ }
227
+ }
228
+
229
+ /// This example demonstrates how to unparse a custom logical plan as a statement.
230
+ /// The custom logical plan is a simple extension of the logical plan that reads from a parquet file.
231
+ /// It can be unparse as a statement that reads from the same parquet file.
232
+ async fn unparse_my_logical_plan_as_statement ( ) -> Result < ( ) > {
233
+ let ctx = SessionContext :: new ( ) ;
234
+ let testdata = datafusion:: test_util:: parquet_test_data ( ) ;
235
+ let inner_plan = ctx
236
+ . read_parquet (
237
+ & format ! ( "{testdata}/alltypes_plain.parquet" ) ,
238
+ ParquetReadOptions :: default ( ) ,
239
+ )
240
+ . await ?
241
+ . select_columns ( & [ "id" , "int_col" , "double_col" , "date_string_col" ] ) ?
242
+ . into_unoptimized_plan ( ) ;
243
+
244
+ let node = Arc :: new ( MyLogicalPlan { input : inner_plan } ) ;
245
+
246
+ let my_plan = LogicalPlan :: Extension ( Extension { node } ) ;
247
+ let unparser =
248
+ Unparser :: default ( ) . with_extension_unparsers ( vec ! [ Arc :: new( PlanToStatement { } ) ] ) ;
249
+ let sql = unparser. plan_to_sql ( & my_plan) ?. to_string ( ) ;
250
+ assert_eq ! (
251
+ sql,
252
+ r#"SELECT "?table?".id, "?table?".int_col, "?table?".double_col, "?table?".date_string_col FROM "?table?""#
253
+ ) ;
254
+ Ok ( ( ) )
255
+ }
256
+
257
+ struct PlanToSubquery { }
258
+ impl UserDefinedLogicalNodeUnparser for PlanToSubquery {
259
+ fn unparse (
260
+ & self ,
261
+ node : & dyn UserDefinedLogicalNode ,
262
+ unparser : & Unparser ,
263
+ _query : & mut Option < & mut QueryBuilder > ,
264
+ _select : & mut Option < & mut SelectBuilder > ,
265
+ relation : & mut Option < & mut RelationBuilder > ,
266
+ ) -> Result < UnparseWithinStatementResult > {
267
+ if let Some ( plan) = node. as_any ( ) . downcast_ref :: < MyLogicalPlan > ( ) {
268
+ let Statement :: Query ( input) = unparser. plan_to_sql ( & plan. input ) ? else {
269
+ return Ok ( UnparseWithinStatementResult :: Unmodified ) ;
270
+ } ;
271
+ let mut derived_builder = DerivedRelationBuilder :: default ( ) ;
272
+ derived_builder. subquery ( input) ;
273
+ derived_builder. lateral ( false ) ;
274
+ if let Some ( rel) = relation {
275
+ rel. derived ( derived_builder) ;
276
+ }
277
+ }
278
+ Ok ( UnparseWithinStatementResult :: Modified )
279
+ }
280
+ }
281
+
282
+ /// This example demonstrates how to unparse a custom logical plan as a subquery.
283
+ /// The custom logical plan is a simple extension of the logical plan that reads from a parquet file.
284
+ /// It can be unparse as a subquery that reads from the same parquet file, with some columns projected.
285
+ async fn unparse_my_logical_plan_as_subquery ( ) -> Result < ( ) > {
286
+ let ctx = SessionContext :: new ( ) ;
287
+ let testdata = datafusion:: test_util:: parquet_test_data ( ) ;
288
+ let inner_plan = ctx
289
+ . read_parquet (
290
+ & format ! ( "{testdata}/alltypes_plain.parquet" ) ,
291
+ ParquetReadOptions :: default ( ) ,
292
+ )
293
+ . await ?
294
+ . select_columns ( & [ "id" , "int_col" , "double_col" , "date_string_col" ] ) ?
295
+ . into_unoptimized_plan ( ) ;
296
+
297
+ let node = Arc :: new ( MyLogicalPlan { input : inner_plan } ) ;
298
+
299
+ let my_plan = LogicalPlan :: Extension ( Extension { node } ) ;
300
+ let plan = LogicalPlanBuilder :: from ( my_plan)
301
+ . project ( vec ! [
302
+ col( "id" ) . alias( "my_id" ) ,
303
+ col( "int_col" ) . alias( "my_int" ) ,
304
+ ] ) ?
305
+ . build ( ) ?;
306
+ let unparser =
307
+ Unparser :: default ( ) . with_extension_unparsers ( vec ! [ Arc :: new( PlanToSubquery { } ) ] ) ;
308
+ let sql = unparser. plan_to_sql ( & plan) ?. to_string ( ) ;
309
+ assert_eq ! (
310
+ sql,
311
+ "SELECT \" ?table?\" .id AS my_id, \" ?table?\" .int_col AS my_int FROM \
312
+ (SELECT \" ?table?\" .id, \" ?table?\" .int_col, \" ?table?\" .double_col, \" ?table?\" .date_string_col FROM \" ?table?\" )",
313
+ ) ;
314
+ Ok ( ( ) )
315
+ }
0 commit comments