@@ -28,7 +28,11 @@ use crate::{
28
28
optimizer:: optimizer:: Optimizer ,
29
29
physical_optimizer:: optimizer:: { PhysicalOptimizer , PhysicalOptimizerRule } ,
30
30
} ;
31
- use datafusion_common:: { alias:: AliasGenerator , not_impl_err, plan_err} ;
31
+ use datafusion_common:: {
32
+ alias:: AliasGenerator ,
33
+ not_impl_err, plan_err,
34
+ tree_node:: { TreeNode , TreeNodeVisitor , VisitRecursion } ,
35
+ } ;
32
36
use datafusion_execution:: registry:: SerializerRegistry ;
33
37
use datafusion_expr:: {
34
38
logical_plan:: { DdlStatement , Statement } ,
@@ -163,35 +167,64 @@ where
163
167
/// * Register a custom data source that can be referenced from a SQL query.
164
168
/// * Execution a SQL query
165
169
///
170
+ /// # Example: DataFrame API
171
+ ///
166
172
/// The following example demonstrates how to use the context to execute a query against a CSV
167
173
/// data source using the DataFrame API:
168
174
///
169
175
/// ```
170
176
/// use datafusion::prelude::*;
171
- /// # use datafusion::error::Result;
177
+ /// # use datafusion::{ error::Result, assert_batches_eq} ;
172
178
/// # #[tokio::main]
173
179
/// # async fn main() -> Result<()> {
174
180
/// let ctx = SessionContext::new();
175
181
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
176
182
/// let df = df.filter(col("a").lt_eq(col("b")))?
177
183
/// .aggregate(vec![col("a")], vec![min(col("b"))])?
178
184
/// .limit(0, Some(100))?;
179
- /// let results = df.collect();
185
+ /// let results = df
186
+ /// .collect()
187
+ /// .await?;
188
+ /// assert_batches_eq!(
189
+ /// &[
190
+ /// "+---+----------------+",
191
+ /// "| a | MIN(?table?.b) |",
192
+ /// "+---+----------------+",
193
+ /// "| 1 | 2 |",
194
+ /// "+---+----------------+",
195
+ /// ],
196
+ /// &results
197
+ /// );
180
198
/// # Ok(())
181
199
/// # }
182
200
/// ```
183
201
///
202
+ /// # Example: SQL API
203
+ ///
184
204
/// The following example demonstrates how to execute the same query using SQL:
185
205
///
186
206
/// ```
187
207
/// use datafusion::prelude::*;
188
- ///
189
- /// # use datafusion::error::Result;
208
+ /// # use datafusion::{error::Result, assert_batches_eq};
190
209
/// # #[tokio::main]
191
210
/// # async fn main() -> Result<()> {
192
211
/// let mut ctx = SessionContext::new();
193
212
/// ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
194
- /// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;
213
+ /// let results = ctx
214
+ /// .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")
215
+ /// .await?
216
+ /// .collect()
217
+ /// .await?;
218
+ /// assert_batches_eq!(
219
+ /// &[
220
+ /// "+---+----------------+",
221
+ /// "| a | MIN(example.b) |",
222
+ /// "+---+----------------+",
223
+ /// "| 1 | 2 |",
224
+ /// "+---+----------------+",
225
+ /// ],
226
+ /// &results
227
+ /// );
195
228
/// # Ok(())
196
229
/// # }
197
230
/// ```
@@ -342,22 +375,82 @@ impl SessionContext {
342
375
self . state . read ( ) . config . clone ( )
343
376
}
344
377
345
- /// Creates a [`DataFrame`] that will execute a SQL query.
378
+ /// Creates a [`DataFrame`] from SQL query text .
346
379
///
347
380
/// Note: This API implements DDL statements such as `CREATE TABLE` and
348
381
/// `CREATE VIEW` and DML statements such as `INSERT INTO` with in-memory
349
- /// default implementations.
382
+ /// default implementations. See [`Self::sql_with_options`].
383
+ ///
384
+ /// # Example: Running SQL queries
385
+ ///
386
+ /// See the example on [`Self`]
350
387
///
351
- /// If this is not desirable, consider using [`SessionState::create_logical_plan()`] which
352
- /// does not mutate the state based on such statements.
388
+ /// # Example: Creating a Table with SQL
389
+ ///
390
+ /// ```
391
+ /// use datafusion::prelude::*;
392
+ /// # use datafusion::{error::Result, assert_batches_eq};
393
+ /// # #[tokio::main]
394
+ /// # async fn main() -> Result<()> {
395
+ /// let mut ctx = SessionContext::new();
396
+ /// ctx
397
+ /// .sql("CREATE TABLE foo (x INTEGER)")
398
+ /// .await?
399
+ /// .collect()
400
+ /// .await?;
401
+ /// assert!(ctx.table_exist("foo").unwrap());
402
+ /// # Ok(())
403
+ /// # }
404
+ /// ```
353
405
pub async fn sql ( & self , sql : & str ) -> Result < DataFrame > {
354
- // create a query planner
406
+ self . sql_with_options ( sql, SQLOptions :: new ( ) ) . await
407
+ }
408
+
409
+ /// Creates a [`DataFrame`] from SQL query text, first validating
410
+ /// that the queries are allowed by `options`
411
+ ///
412
+ /// # Example: Preventing Creating a Table with SQL
413
+ ///
414
+ /// If you want to avoid creating tables, or modifying data or the
415
+ /// session, set [`SQLOptions`] appropriately:
416
+ ///
417
+ /// ```
418
+ /// use datafusion::prelude::*;
419
+ /// # use datafusion::{error::Result};
420
+ /// # use datafusion::physical_plan::collect;
421
+ /// # #[tokio::main]
422
+ /// # async fn main() -> Result<()> {
423
+ /// let mut ctx = SessionContext::new();
424
+ /// let options = SQLOptions::new()
425
+ /// .with_allow_ddl(false);
426
+ /// let err = ctx.sql_with_options("CREATE TABLE foo (x INTEGER)", options)
427
+ /// .await
428
+ /// .unwrap_err();
429
+ /// assert_eq!(
430
+ /// err.to_string(),
431
+ /// "Error during planning: DDL not supported: CreateMemoryTable"
432
+ /// );
433
+ /// # Ok(())
434
+ /// # }
435
+ /// ```
436
+ pub async fn sql_with_options (
437
+ & self ,
438
+ sql : & str ,
439
+ options : SQLOptions ,
440
+ ) -> Result < DataFrame > {
355
441
let plan = self . state ( ) . create_logical_plan ( sql) . await ?;
442
+ options. verify_plan ( & plan) ?;
356
443
357
444
self . execute_logical_plan ( plan) . await
358
445
}
359
446
360
- /// Execute the [`LogicalPlan`], return a [`DataFrame`]
447
+ /// Execute the [`LogicalPlan`], return a [`DataFrame`]. This API
448
+ /// is not featured limited (so all SQL such as `CREATE TABLE` and
449
+ /// `COPY` will be run).
450
+ ///
451
+ /// If you wish to limit the type of plan that can be run from
452
+ /// SQL, see [`Self::sql_with_options`] and
453
+ /// [`SQLOptions::verify_plan`].
361
454
pub async fn execute_logical_plan ( & self , plan : LogicalPlan ) -> Result < DataFrame > {
362
455
match plan {
363
456
LogicalPlan :: Ddl ( ddl) => match ddl {
@@ -1304,7 +1397,7 @@ impl FunctionRegistry for SessionContext {
1304
1397
/// A planner used to add extensions to DataFusion logical and physical plans.
1305
1398
#[ async_trait]
1306
1399
pub trait QueryPlanner {
1307
- /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
1400
+ /// Given a `LogicalPlan`, create an [ `ExecutionPlan`] suitable for execution
1308
1401
async fn create_physical_plan (
1309
1402
& self ,
1310
1403
logical_plan : & LogicalPlan ,
@@ -1317,7 +1410,7 @@ struct DefaultQueryPlanner {}
1317
1410
1318
1411
#[ async_trait]
1319
1412
impl QueryPlanner for DefaultQueryPlanner {
1320
- /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
1413
+ /// Given a `LogicalPlan`, create an [ `ExecutionPlan`] suitable for execution
1321
1414
async fn create_physical_plan (
1322
1415
& self ,
1323
1416
logical_plan : & LogicalPlan ,
@@ -1628,7 +1721,8 @@ impl SessionState {
1628
1721
& mut self . table_factories
1629
1722
}
1630
1723
1631
- /// Convert a SQL string into an AST Statement
1724
+ /// Parse an SQL string into an DataFusion specific AST
1725
+ /// [`Statement`]. See [`SessionContext::sql`] for running queries.
1632
1726
pub fn sql_to_statement (
1633
1727
& self ,
1634
1728
sql : & str ,
@@ -1787,9 +1881,15 @@ impl SessionState {
1787
1881
query. statement_to_plan ( statement)
1788
1882
}
1789
1883
1790
- /// Creates a [`LogicalPlan`] from the provided SQL string
1884
+ /// Creates a [`LogicalPlan`] from the provided SQL string. This
1885
+ /// interface will plan any SQL DataFusion supports, including DML
1886
+ /// like `CREATE TABLE`, and `COPY` (which can write to local
1887
+ /// files.
1791
1888
///
1792
- /// See [`SessionContext::sql`] for a higher-level interface that also handles DDL
1889
+ /// See [`SessionContext::sql`] and
1890
+ /// [`SessionContext::sql_with_options`] for a higher-level
1891
+ /// interface that handles DDL and verification of allowed
1892
+ /// statements.
1793
1893
pub async fn create_logical_plan ( & self , sql : & str ) -> Result < LogicalPlan > {
1794
1894
let dialect = self . config . options ( ) . sql_parser . dialect . as_str ( ) ;
1795
1895
let statement = self . sql_to_statement ( sql, dialect) ?;
@@ -1870,7 +1970,11 @@ impl SessionState {
1870
1970
1871
1971
/// Creates a physical plan from a logical plan.
1872
1972
///
1873
- /// Note: this first calls [`Self::optimize`] on the provided plan
1973
+ /// Note: this first calls [`Self::optimize`] on the provided
1974
+ /// plan.
1975
+ ///
1976
+ /// This function will error for [`LogicalPlan`]s such as catalog
1977
+ /// DDL `CREATE TABLE` must be handled by another layer.
1874
1978
pub async fn create_physical_plan (
1875
1979
& self ,
1876
1980
logical_plan : & LogicalPlan ,
@@ -2095,6 +2199,92 @@ impl SerializerRegistry for EmptySerializerRegistry {
2095
2199
}
2096
2200
}
2097
2201
2202
+ /// Describes which SQL statements can be run.
2203
+ ///
2204
+ /// See [`SessionContext::sql_with_options`] for more details.
2205
+ #[ derive( Clone , Debug , Copy ) ]
2206
+ pub struct SQLOptions {
2207
+ /// See [`Self::with_allow_ddl`]
2208
+ allow_ddl : bool ,
2209
+ /// See [`Self::with_allow_dml`]
2210
+ allow_dml : bool ,
2211
+ /// See [`Self::with_allow_statements`]
2212
+ allow_statements : bool ,
2213
+ }
2214
+
2215
+ impl Default for SQLOptions {
2216
+ fn default ( ) -> Self {
2217
+ Self {
2218
+ allow_ddl : true ,
2219
+ allow_dml : true ,
2220
+ allow_statements : true ,
2221
+ }
2222
+ }
2223
+ }
2224
+
2225
+ impl SQLOptions {
2226
+ /// Create a new `SQLOptions` with default values
2227
+ pub fn new ( ) -> Self {
2228
+ Default :: default ( )
2229
+ }
2230
+
2231
+ /// Should DML data modification commands (e.g. `INSERT and COPY`) be run? Defaults to `true`.
2232
+ pub fn with_allow_ddl ( mut self , allow : bool ) -> Self {
2233
+ self . allow_ddl = allow;
2234
+ self
2235
+ }
2236
+
2237
+ /// Should DML data modification commands (e.g. `INSERT and COPY`) be run? Defaults to `true`
2238
+ pub fn with_allow_dml ( mut self , allow : bool ) -> Self {
2239
+ self . allow_dml = allow;
2240
+ self
2241
+ }
2242
+
2243
+ /// Should Statements such as (e.g. `SET VARIABLE and `BEGIN TRANSACTION` ...`) be run?. Defaults to `true`
2244
+ pub fn with_allow_statements ( mut self , allow : bool ) -> Self {
2245
+ self . allow_statements = allow;
2246
+ self
2247
+ }
2248
+
2249
+ /// Return an error if the [`LogicalPlan`] has any nodes that are
2250
+ /// incompatible with this [`SQLOptions`].
2251
+ pub fn verify_plan ( & self , plan : & LogicalPlan ) -> Result < ( ) > {
2252
+ plan. visit ( & mut BadPlanVisitor :: new ( self ) ) ?;
2253
+ Ok ( ( ) )
2254
+ }
2255
+ }
2256
+
2257
+ struct BadPlanVisitor < ' a > {
2258
+ options : & ' a SQLOptions ,
2259
+ }
2260
+ impl < ' a > BadPlanVisitor < ' a > {
2261
+ fn new ( options : & ' a SQLOptions ) -> Self {
2262
+ Self { options }
2263
+ }
2264
+ }
2265
+
2266
+ impl < ' a > TreeNodeVisitor for BadPlanVisitor < ' a > {
2267
+ type N = LogicalPlan ;
2268
+
2269
+ fn pre_visit ( & mut self , node : & Self :: N ) -> Result < VisitRecursion > {
2270
+ match node {
2271
+ LogicalPlan :: Ddl ( ddl) if !self . options . allow_ddl => {
2272
+ plan_err ! ( "DDL not supported: {}" , ddl. name( ) )
2273
+ }
2274
+ LogicalPlan :: Dml ( dml) if !self . options . allow_dml => {
2275
+ plan_err ! ( "DML not supported: {}" , dml. op)
2276
+ }
2277
+ LogicalPlan :: Copy ( _) if !self . options . allow_dml => {
2278
+ plan_err ! ( "DML not supported: COPY" )
2279
+ }
2280
+ LogicalPlan :: Statement ( stmt) if !self . options . allow_statements => {
2281
+ plan_err ! ( "Statement not supported: {}" , stmt. name( ) )
2282
+ }
2283
+ _ => Ok ( VisitRecursion :: Continue ) ,
2284
+ }
2285
+ }
2286
+ }
2287
+
2098
2288
#[ cfg( test) ]
2099
2289
mod tests {
2100
2290
use super :: * ;
@@ -2646,43 +2836,6 @@ mod tests {
2646
2836
Ok ( ( ) )
2647
2837
}
2648
2838
2649
- #[ tokio:: test]
2650
- async fn unsupported_sql_returns_error ( ) -> Result < ( ) > {
2651
- let ctx = SessionContext :: new ( ) ;
2652
- ctx. register_table ( "test" , test:: table_with_sequence ( 1 , 1 ) . unwrap ( ) )
2653
- . unwrap ( ) ;
2654
- let state = ctx. state ( ) ;
2655
-
2656
- // create view
2657
- let sql = "create view test_view as select * from test" ;
2658
- let plan = state. create_logical_plan ( sql) . await ;
2659
- let physical_plan = state. create_physical_plan ( & plan. unwrap ( ) ) . await ;
2660
- assert ! ( physical_plan. is_err( ) ) ;
2661
- assert_eq ! (
2662
- format!( "{}" , physical_plan. unwrap_err( ) ) ,
2663
- "This feature is not implemented: Unsupported logical plan: CreateView"
2664
- ) ;
2665
- // // drop view
2666
- let sql = "drop view test_view" ;
2667
- let plan = state. create_logical_plan ( sql) . await ;
2668
- let physical_plan = state. create_physical_plan ( & plan. unwrap ( ) ) . await ;
2669
- assert ! ( physical_plan. is_err( ) ) ;
2670
- assert_eq ! (
2671
- format!( "{}" , physical_plan. unwrap_err( ) ) ,
2672
- "This feature is not implemented: Unsupported logical plan: DropView"
2673
- ) ;
2674
- // // drop table
2675
- let sql = "drop table test" ;
2676
- let plan = state. create_logical_plan ( sql) . await ;
2677
- let physical_plan = state. create_physical_plan ( & plan. unwrap ( ) ) . await ;
2678
- assert ! ( physical_plan. is_err( ) ) ;
2679
- assert_eq ! (
2680
- format!( "{}" , physical_plan. unwrap_err( ) ) ,
2681
- "This feature is not implemented: Unsupported logical plan: DropTable"
2682
- ) ;
2683
- Ok ( ( ) )
2684
- }
2685
-
2686
2839
struct MyPhysicalPlanner { }
2687
2840
2688
2841
#[ async_trait]
0 commit comments