diff --git a/datafusion/__init__.py b/datafusion/__init__.py
index df53b396a..6c6c270a0 100644
--- a/datafusion/__init__.py
+++ b/datafusion/__init__.py
@@ -33,6 +33,7 @@
     SessionConfig,
     RuntimeConfig,
     ScalarUDF,
+    SQLOptions,
 )
 
 from .common import (
@@ -96,6 +97,7 @@
     "DataFrame",
     "SessionContext",
     "SessionConfig",
+    "SQLOptions",
     "RuntimeConfig",
     "Expr",
     "AggregateUDF",
diff --git a/datafusion/context.py b/datafusion/context.py
index 30602402c..c5c6d7965 100644
--- a/datafusion/context.py
+++ b/datafusion/context.py
@@ -18,6 +18,7 @@
 from abc import ABC, abstractmethod
 from typing import Any, Dict, List
 
+from datafusion import SQLOptions
 from datafusion.common import SqlSchema, SqlTable
 
 
@@ -140,3 +141,8 @@ def explain(self, sql):
     @abstractmethod
     def sql(self, sql):
         pass
+
+    @abstractmethod
+    def sql_with_options(self, sql, options: SQLOptions):
+        """Run ``sql``, restricted to the statement kinds ``options`` allows."""
+        pass
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 97bff9bb9..ce9b502ce 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -27,6 +27,7 @@
     SessionConfig,
     RuntimeConfig,
     DataFrame,
+    SQLOptions,
 )
 import pytest
 
@@ -395,3 +396,31 @@ def test_read_parquet(ctx):
 def test_read_avro(ctx):
     csv_df = ctx.read_avro(path="testing/data/avro/alltypes_plain.avro")
     csv_df.show()
+
+
+def test_create_sql_options():
+    """SQLOptions can be constructed without arguments."""
+    SQLOptions()
+
+
+def test_sql_with_options_no_ddl(ctx):
+    """A DDL statement is rejected when DDL is disallowed."""
+    options = SQLOptions().with_allow_ddl(False)
+    sql = "CREATE TABLE IF NOT EXISTS valuetable AS VALUES(1,'HELLO'),(12,'DATAFUSION')"
+    with pytest.raises(Exception, match="DDL"):
+        ctx.sql_with_options(sql, options=options)
+
+
+def test_sql_with_options_no_dml(ctx):
+    """A DML statement is rejected when DML is disallowed."""
+    table_name = "t"
+    options = SQLOptions().with_allow_dml(False)
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([1, 2, 3]), pa.array([4, 5, 6])],
+        names=["a", "b"],
+    )
+    dataset = ds.dataset([batch])
+    ctx.register_dataset(table_name, dataset)
+    sql = f'INSERT INTO "{table_name}" VALUES (1, 2), (2, 3);'
+    with pytest.raises(Exception, match="DML"):
+        ctx.sql_with_options(sql, options=options)
diff --git a/src/context.rs b/src/context.rs
index f34fbce81..825d3e321 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -45,7 +45,9 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
 use datafusion::datasource::MemTable;
 use datafusion::datasource::TableProvider;
-use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
+use datafusion::execution::context::{
+    SQLOptions, SessionConfig, SessionContext, SessionState, TaskContext,
+};
 use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool};
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
@@ -210,6 +212,43 @@ impl PyRuntimeConfig {
     }
 }
 
+/// `PySQLOptions` allows you to specify options for SQL execution.
+#[pyclass(name = "SQLOptions", module = "datafusion", subclass)]
+#[derive(Clone)]
+pub struct PySQLOptions {
+    pub options: SQLOptions,
+}
+
+impl From<SQLOptions> for PySQLOptions {
+    fn from(options: SQLOptions) -> Self {
+        Self { options }
+    }
+}
+
+#[pymethods]
+impl PySQLOptions {
+    #[new]
+    fn new() -> Self {
+        let options = SQLOptions::new();
+        Self { options }
+    }
+
+    /// Should DDL data definition commands (e.g. `CREATE TABLE`) be run? Defaults to `true`.
+    pub fn with_allow_ddl(&self, allow: bool) -> Self {
+        Self::from(self.options.with_allow_ddl(allow))
+    }
+
+    /// Should DML data modification commands (e.g. `INSERT` and `COPY`) be run? Defaults to `true`.
+    pub fn with_allow_dml(&self, allow: bool) -> Self {
+        Self::from(self.options.with_allow_dml(allow))
+    }
+
+    /// Should statements such as `SET VARIABLE` and `BEGIN TRANSACTION` be run? Defaults to `true`.
+    pub fn with_allow_statements(&self, allow: bool) -> Self {
+        Self::from(self.options.with_allow_statements(allow))
+    }
+}
+
 /// `PySessionContext` is able to plan and execute DataFusion plans.
 /// It has a powerful optimizer, a physical planner for local execution, and a
 /// multi-threaded execution engine to perform the execution.
@@ -285,6 +324,21 @@ impl PySessionContext {
         Ok(PyDataFrame::new(df))
     }
 
+    /// Builds a `PyDataFrame` for `query`, passing `options` to the underlying context.
+    ///
+    /// When `options` is `None`, a fresh `SQLOptions::new()` is used instead.
+    pub fn sql_with_options(
+        &mut self,
+        query: &str,
+        options: Option<PySQLOptions>,
+        py: Python,
+    ) -> PyResult<PyDataFrame> {
+        let options = options.map_or_else(SQLOptions::new, |opts| opts.options);
+        let result = self.ctx.sql_with_options(query, options);
+        let df = wait_for_future(py, result).map_err(DataFusionError::from)?;
+        Ok(PyDataFrame::new(df))
+    }
+
     pub fn create_dataframe(
         &mut self,
         partitions: PyArrowType>>,
diff --git a/src/lib.rs b/src/lib.rs
index 49c325a53..a696ebff4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -85,6 +85,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::<context::PySQLOptions>()?;
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;