Skip to content

Commit bd289a5

Browse files
committed
Encapsulate create table/view construction
1 parent e7d9504 commit bd289a5

File tree

8 files changed

+624
-158
lines changed

8 files changed

+624
-158
lines changed

datafusion/core/src/catalog_common/listing_schema.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ use std::sync::{Arc, Mutex};
2525
use crate::catalog::{SchemaProvider, TableProvider, TableProviderFactory};
2626
use crate::execution::context::SessionState;
2727

28-
use datafusion_common::{
29-
Constraints, DFSchema, DataFusionError, HashMap, TableReference,
30-
};
28+
use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference};
3129
use datafusion_expr::CreateExternalTable;
3230

3331
use async_trait::async_trait;
@@ -131,21 +129,12 @@ impl ListingSchemaProvider {
131129
.factory
132130
.create(
133131
state,
134-
&CreateExternalTable {
135-
schema: Arc::new(DFSchema::empty()),
136-
name,
137-
location: table_url,
138-
file_type: self.format.clone(),
139-
table_partition_cols: vec![],
140-
if_not_exists: false,
141-
temporary: false,
142-
definition: None,
143-
order_exprs: vec![],
144-
unbounded: false,
145-
options: Default::default(),
146-
constraints: Constraints::empty(),
147-
column_defaults: Default::default(),
148-
},
132+
&CreateExternalTable::builder()
133+
.schema(Arc::new(DFSchema::empty()))
134+
.name(name)
135+
.location(table_url)
136+
.file_type(self.format.clone())
137+
.build()?,
149138
)
150139
.await?;
151140
let _ =

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,10 @@ mod tests {
176176
datasource::file_format::csv::CsvFormat, execution::context::SessionContext,
177177
};
178178

179-
use datafusion_common::{Constraints, DFSchema, TableReference};
179+
use datafusion_common::{DFSchema, TableReference};
180180

181181
#[tokio::test]
182-
async fn test_create_using_non_std_file_ext() {
182+
async fn test_create_using_non_std_file_ext() -> Result<()> {
183183
let csv_file = tempfile::Builder::new()
184184
.prefix("foo")
185185
.suffix(".tbl")
@@ -190,32 +190,25 @@ mod tests {
190190
let context = SessionContext::new();
191191
let state = context.state();
192192
let name = TableReference::bare("foo");
193-
let cmd = CreateExternalTable {
194-
name,
195-
location: csv_file.path().to_str().unwrap().to_string(),
196-
file_type: "csv".to_string(),
197-
schema: Arc::new(DFSchema::empty()),
198-
table_partition_cols: vec![],
199-
if_not_exists: false,
200-
temporary: false,
201-
definition: None,
202-
order_exprs: vec![],
203-
unbounded: false,
204-
options: HashMap::from([("format.has_header".into(), "true".into())]),
205-
constraints: Constraints::empty(),
206-
column_defaults: HashMap::new(),
207-
};
193+
let cmd = CreateExternalTable::builder()
194+
.name(name)
195+
.location(csv_file.path().to_str().unwrap().to_string())
196+
.file_type("csv".to_string())
197+
.schema(Arc::new(DFSchema::empty()))
198+
.options(HashMap::from([("format.has_header".into(), "true".into())]))
199+
.build()?;
208200
let table_provider = factory.create(&state, &cmd).await.unwrap();
209201
let listing_table = table_provider
210202
.as_any()
211203
.downcast_ref::<ListingTable>()
212204
.unwrap();
213205
let listing_options = listing_table.options();
214206
assert_eq!(".tbl", listing_options.file_extension);
207+
Ok(())
215208
}
216209

217210
#[tokio::test]
218-
async fn test_create_using_non_std_file_ext_csv_options() {
211+
async fn test_create_using_non_std_file_ext_csv_options() -> Result<()> {
219212
let csv_file = tempfile::Builder::new()
220213
.prefix("foo")
221214
.suffix(".tbl")
@@ -230,21 +223,13 @@ mod tests {
230223
let mut options = HashMap::new();
231224
options.insert("format.schema_infer_max_rec".to_owned(), "1000".to_owned());
232225
options.insert("format.has_header".into(), "true".into());
233-
let cmd = CreateExternalTable {
234-
name,
235-
location: csv_file.path().to_str().unwrap().to_string(),
236-
file_type: "csv".to_string(),
237-
schema: Arc::new(DFSchema::empty()),
238-
table_partition_cols: vec![],
239-
if_not_exists: false,
240-
temporary: false,
241-
definition: None,
242-
order_exprs: vec![],
243-
unbounded: false,
244-
options,
245-
constraints: Constraints::empty(),
246-
column_defaults: HashMap::new(),
247-
};
226+
let cmd = CreateExternalTable::builder()
227+
.name(name)
228+
.location(csv_file.path().to_str().unwrap().to_string())
229+
.file_type("csv".to_string())
230+
.schema(Arc::new(DFSchema::empty()))
231+
.options(options)
232+
.build()?;
248233
let table_provider = factory.create(&state, &cmd).await.unwrap();
249234
let listing_table = table_provider
250235
.as_any()
@@ -257,5 +242,6 @@ mod tests {
257242
assert_eq!(csv_options.schema_infer_max_rec, Some(1000));
258243
let listing_options = listing_table.options();
259244
assert_eq!(".tbl", listing_options.file_extension);
245+
Ok(())
260246
}
261247
}

datafusion/core/src/execution/context/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ use crate::{
4141
logical_expr::ScalarUDF,
4242
logical_expr::{
4343
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
44-
CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
45-
DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, SetVariable,
46-
TableType, UNNAMED_TABLE,
44+
CreateMemoryTable, CreateMemoryTableFields, CreateView, CreateViewFields,
45+
DropCatalogSchema, DropFunction, DropTable, DropView, Execute, LogicalPlan,
46+
LogicalPlanBuilder, Prepare, SetVariable, TableType, UNNAMED_TABLE,
4747
},
4848
physical_expr::PhysicalExpr,
4949
physical_plan::ExecutionPlan,
@@ -792,15 +792,15 @@ impl SessionContext {
792792
}
793793

794794
async fn create_memory_table(&self, cmd: CreateMemoryTable) -> Result<DataFrame> {
795-
let CreateMemoryTable {
795+
let CreateMemoryTableFields {
796796
name,
797797
input,
798798
if_not_exists,
799799
or_replace,
800800
constraints,
801801
column_defaults,
802802
temporary,
803-
} = cmd;
803+
} = cmd.into_fields();
804804

805805
let input = Arc::unwrap_or_clone(input);
806806
let input = self.state().optimize(&input)?;
@@ -852,13 +852,13 @@ impl SessionContext {
852852
}
853853

854854
async fn create_view(&self, cmd: CreateView) -> Result<DataFrame> {
855-
let CreateView {
855+
let CreateViewFields {
856856
name,
857857
input,
858858
or_replace,
859859
definition,
860860
temporary,
861-
} = cmd;
861+
} = cmd.into_fields();
862862

863863
let view = self.table(name.clone()).await;
864864

0 commit comments

Comments
 (0)