Skip to content

Commit 9311f38

Browse files
author
Brent Gardner
committed
Can compile and run test
Failing on scheduler due to no factories Tests pass Back to "no object store available for delta-rs://home-bgardner-workspace" Switch back to git refs CI fixes Add roundtrip test Passing deltalake test Passing serde test Remove unrelated refactor Formatting Fix typo that was hard to debug CI fixes delta & ballista tests pass
1 parent 6bcee13 commit 9311f38

File tree

8 files changed

+331
-60
lines changed

8 files changed

+331
-60
lines changed

datafusion/core/src/datasource/datasource.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::error::Result;
2828
use crate::execution::context::SessionState;
2929
use crate::logical_expr::Expr;
3030
use crate::physical_plan::ExecutionPlan;
31+
use crate::prelude::SessionContext;
3132

3233
/// Source table
3334
#[async_trait]
@@ -80,5 +81,13 @@ pub trait TableProvider: Sync + Send {
8081
#[async_trait]
8182
pub trait TableProviderFactory: Sync + Send {
8283
/// Create a TableProvider given name and url
83-
async fn create(&self, name: &str, url: &str) -> Result<Arc<dyn TableProvider>>;
84+
async fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>>;
85+
86+
/// Create a TableProvider during execution with schema already known from planning
87+
fn with_schema(
88+
&self,
89+
ctx: &SessionContext,
90+
schema: SchemaRef,
91+
url: &str,
92+
) -> Result<Arc<dyn TableProvider>>;
8493
}

datafusion/core/src/execution/context.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -418,19 +418,18 @@ impl SessionContext {
418418
cmd: &CreateExternalTable,
419419
) -> Result<Arc<DataFrame>> {
420420
let state = self.state.read().clone();
421+
let file_type = cmd.file_type.to_lowercase();
421422
let factory = &state
422423
.runtime_env
423424
.table_factories
424-
.get(&cmd.file_type)
425+
.get(file_type.as_str())
425426
.ok_or_else(|| {
426427
DataFusionError::Execution(format!(
427428
"Unable to find factory for {}",
428429
cmd.file_type
429430
))
430431
})?;
431-
let table = (*factory)
432-
.create(cmd.name.as_str(), cmd.location.as_str())
433-
.await?;
432+
let table = (*factory).create(cmd.location.as_str()).await?;
434433
self.register_table(cmd.name.as_str(), table)?;
435434
let plan = LogicalPlanBuilder::empty(false).build()?;
436435
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))

datafusion/core/src/test_util.rs

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,20 @@
1717

1818
//! Utility functions to make testing DataFusion based crates easier
1919
20+
use std::any::Any;
2021
use std::collections::BTreeMap;
2122
use std::{env, error::Error, path::PathBuf, sync::Arc};
2223

23-
use crate::datasource::{empty::EmptyTable, provider_as_source};
24+
use crate::datasource::datasource::TableProviderFactory;
25+
use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
26+
use crate::execution::context::SessionState;
2427
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
28+
use crate::physical_plan::ExecutionPlan;
29+
use crate::prelude::SessionContext;
2530
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
31+
use async_trait::async_trait;
2632
use datafusion_common::DataFusionError;
33+
use datafusion_expr::{Expr, TableType};
2734

2835
/// Compares formatted output of a record batch with an expected
2936
/// vector of strings, with the result of pretty formatting record
@@ -317,6 +324,69 @@ pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
317324
Arc::new(schema)
318325
}
319326

327+
/// TableFactory for tests
328+
pub struct TestTableFactory {}
329+
330+
#[async_trait]
331+
impl TableProviderFactory for TestTableFactory {
332+
async fn create(
333+
&self,
334+
url: &str,
335+
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
336+
Ok(Arc::new(TestTableProvider {
337+
url: url.to_string(),
338+
}))
339+
}
340+
341+
fn with_schema(
342+
&self,
343+
_ctx: &SessionContext,
344+
_schema: SchemaRef,
345+
url: &str,
346+
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
347+
Ok(Arc::new(TestTableProvider {
348+
url: url.to_string(),
349+
}))
350+
}
351+
}
352+
353+
/// TableProvider for testing purposes
354+
pub struct TestTableProvider {
355+
/// URL of table files or folder
356+
pub url: String,
357+
}
358+
359+
impl TestTableProvider {}
360+
361+
#[async_trait]
362+
impl TableProvider for TestTableProvider {
363+
fn as_any(&self) -> &dyn Any {
364+
self
365+
}
366+
367+
fn schema(&self) -> SchemaRef {
368+
let schema = Schema::new(vec![
369+
Field::new("a", DataType::Int64, true),
370+
Field::new("b", DataType::Decimal128(15, 2), true),
371+
]);
372+
Arc::new(schema)
373+
}
374+
375+
fn table_type(&self) -> TableType {
376+
unimplemented!("TestTableProvider is a stub for testing.")
377+
}
378+
379+
async fn scan(
380+
&self,
381+
_ctx: &SessionState,
382+
_projection: &Option<Vec<usize>>,
383+
_filters: &[Expr],
384+
_limit: Option<usize>,
385+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
386+
unimplemented!("TestTableProvider is a stub for testing.")
387+
}
388+
}
389+
320390
#[cfg(test)]
321391
mod tests {
322392
use super::*;

datafusion/core/tests/sql/create_drop.rs

Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use async_trait::async_trait;
19-
use std::any::Any;
2018
use std::collections::HashMap;
2119
use std::io::Write;
2220

2321
use datafusion::datasource::datasource::TableProviderFactory;
24-
use datafusion::execution::context::SessionState;
2522
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
26-
use datafusion_expr::TableType;
23+
use datafusion::test_util::TestTableFactory;
2724
use tempfile::TempDir;
2825

2926
use super::*;
@@ -369,49 +366,11 @@ async fn create_pipe_delimited_csv_table() -> Result<()> {
369366
Ok(())
370367
}
371368

372-
struct TestTableProvider {}
373-
374-
impl TestTableProvider {}
375-
376-
#[async_trait]
377-
impl TableProvider for TestTableProvider {
378-
fn as_any(&self) -> &dyn Any {
379-
unimplemented!("TestTableProvider is a stub for testing.")
380-
}
381-
382-
fn schema(&self) -> SchemaRef {
383-
unimplemented!("TestTableProvider is a stub for testing.")
384-
}
385-
386-
fn table_type(&self) -> TableType {
387-
unimplemented!("TestTableProvider is a stub for testing.")
388-
}
389-
390-
async fn scan(
391-
&self,
392-
_ctx: &SessionState,
393-
_projection: &Option<Vec<usize>>,
394-
_filters: &[Expr],
395-
_limit: Option<usize>,
396-
) -> Result<Arc<dyn ExecutionPlan>> {
397-
unimplemented!("TestTableProvider is a stub for testing.")
398-
}
399-
}
400-
401-
struct TestTableFactory {}
402-
403-
#[async_trait]
404-
impl TableProviderFactory for TestTableFactory {
405-
async fn create(&self, _name: &str, _url: &str) -> Result<Arc<dyn TableProvider>> {
406-
Ok(Arc::new(TestTableProvider {}))
407-
}
408-
}
409-
410369
#[tokio::test]
411370
async fn create_custom_table() -> Result<()> {
412371
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
413372
HashMap::new();
414-
table_factories.insert("DELTATABLE".to_string(), Arc::new(TestTableFactory {}));
373+
table_factories.insert("deltatable".to_string(), Arc::new(TestTableFactory {}));
415374
let cfg = RuntimeConfig::new().with_table_factories(table_factories);
416375
let env = RuntimeEnv::new(cfg).unwrap();
417376
let ses = SessionConfig::new();

datafusion/proto/proto/datafusion.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ message LogicalPlanNode {
7070
CreateViewNode create_view = 22;
7171
DistinctNode distinct = 23;
7272
ViewTableScanNode view_scan = 24;
73+
CustomTableScanNode custom_scan = 25;
7374
}
7475
}
7576

@@ -118,6 +119,15 @@ message ViewTableScanNode {
118119
string definition = 5;
119120
}
120121

122+
// Logical Plan to Scan a CustomTableProvider registered at runtime
123+
message CustomTableScanNode {
124+
string table_name = 1;
125+
ProjectionColumns projection = 2;
126+
datafusion.Schema schema = 3;
127+
repeated datafusion.LogicalExprNode filters = 4;
128+
bytes custom_table_data = 5;
129+
}
130+
121131
message ProjectionNode {
122132
LogicalPlanNode input = 1;
123133
repeated datafusion.LogicalExprNode expr = 2;

datafusion/proto/src/bytes/mod.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818
//! Serialization / Deserialization to Bytes
1919
use crate::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
2020
use crate::{from_proto::parse_expr, protobuf};
21+
use arrow::datatypes::SchemaRef;
22+
use datafusion::datasource::TableProvider;
2123
use datafusion_common::{DataFusionError, Result};
2224
use datafusion_expr::{Expr, Extension, LogicalPlan};
2325
use prost::{
2426
bytes::{Bytes, BytesMut},
2527
Message,
2628
};
29+
use std::sync::Arc;
2730

2831
// Reexport Bytes which appears in the API
2932
use datafusion::execution::registry::FunctionRegistry;
@@ -180,6 +183,27 @@ impl LogicalExtensionCodec for DefaultExtensionCodec {
180183
"No extension codec provided".to_string(),
181184
))
182185
}
186+
187+
fn try_decode_table_provider(
188+
&self,
189+
_buf: &[u8],
190+
_schema: SchemaRef,
191+
_ctx: &SessionContext,
192+
) -> std::result::Result<Arc<dyn TableProvider>, DataFusionError> {
193+
Err(DataFusionError::NotImplemented(
194+
"No extension codec provided".to_string(),
195+
))
196+
}
197+
198+
fn try_encode_table_provider(
199+
&self,
200+
_node: Arc<dyn TableProvider>,
201+
_buf: &mut Vec<u8>,
202+
) -> std::result::Result<(), DataFusionError> {
203+
Err(DataFusionError::NotImplemented(
204+
"No extension codec provided".to_string(),
205+
))
206+
}
183207
}
184208

185209
#[cfg(test)]

0 commit comments

Comments
 (0)