Skip to content

Commit 020b8fc

Browse files
Implement StreamTable and StreamTableProvider (#7994) (#8021)
* Implement FIFO using extension points (#7994) * Clippy * Rename to StreamTable and make public * Add StreamEncoding * Rework sort order * Fix logical conflicts * Format * Add DefaultTableProvider * Fix doc * Fix project sort keys and CSV headers * Respect batch size on read * Tests are updated * Resolving clippy --------- Co-authored-by: metesynnada <[email protected]>
1 parent 77a6326 commit 020b8fc

File tree

11 files changed

+553
-190
lines changed

11 files changed

+553
-190
lines changed

datafusion/core/src/datasource/listing/table.rs

+5-32
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use super::PartitionedFile;
2626
#[cfg(feature = "parquet")]
2727
use crate::datasource::file_format::parquet::ParquetFormat;
2828
use crate::datasource::{
29+
create_ordering,
2930
file_format::{
3031
arrow::ArrowFormat,
3132
avro::AvroFormat,
@@ -40,15 +41,13 @@ use crate::datasource::{
4041
TableProvider, TableType,
4142
};
4243
use crate::logical_expr::TableProviderFilterPushDown;
43-
use crate::physical_plan;
4444
use crate::{
4545
error::{DataFusionError, Result},
4646
execution::context::SessionState,
4747
logical_expr::Expr,
4848
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
4949
};
5050

51-
use arrow::compute::SortOptions;
5251
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
5352
use arrow_schema::Schema;
5453
use datafusion_common::{
@@ -57,10 +56,9 @@ use datafusion_common::{
5756
};
5857
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
5958
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
60-
use datafusion_expr::expr::Sort;
6159
use datafusion_optimizer::utils::conjunction;
6260
use datafusion_physical_expr::{
63-
create_physical_expr, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
61+
create_physical_expr, LexOrdering, PhysicalSortRequirement,
6462
};
6563

6664
use async_trait::async_trait;
@@ -677,34 +675,7 @@ impl ListingTable {
677675

678676
/// If file_sort_order is specified, creates the appropriate physical expressions
679677
fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
680-
let mut all_sort_orders = vec![];
681-
682-
for exprs in &self.options.file_sort_order {
683-
// Construct PhsyicalSortExpr objects from Expr objects:
684-
let sort_exprs = exprs
685-
.iter()
686-
.map(|expr| {
687-
if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr {
688-
if let Expr::Column(col) = expr.as_ref() {
689-
let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?;
690-
Ok(PhysicalSortExpr {
691-
expr,
692-
options: SortOptions {
693-
descending: !asc,
694-
nulls_first: *nulls_first,
695-
},
696-
})
697-
} else {
698-
plan_err!("Expected single column references in output_ordering, got {expr}")
699-
}
700-
} else {
701-
plan_err!("Expected Expr::Sort in output_ordering, but got {expr}")
702-
}
703-
})
704-
.collect::<Result<Vec<_>>>()?;
705-
all_sort_orders.push(sort_exprs);
706-
}
707-
Ok(all_sort_orders)
678+
create_ordering(&self.table_schema, &self.options.file_sort_order)
708679
}
709680
}
710681

@@ -1040,9 +1011,11 @@ mod tests {
10401011

10411012
use arrow::datatypes::{DataType, Schema};
10421013
use arrow::record_batch::RecordBatch;
1014+
use arrow_schema::SortOptions;
10431015
use datafusion_common::stats::Precision;
10441016
use datafusion_common::{assert_contains, GetExt, ScalarValue};
10451017
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
1018+
use datafusion_physical_expr::PhysicalSortExpr;
10461019
use rstest::*;
10471020
use tempfile::TempDir;
10481021

datafusion/core/src/datasource/listing_table_factory.rs

+2-7
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,13 @@ use datafusion_expr::CreateExternalTable;
4444
use async_trait::async_trait;
4545

4646
/// A `TableProviderFactory` capable of creating new `ListingTable`s
47+
#[derive(Debug, Default)]
4748
pub struct ListingTableFactory {}
4849

4950
impl ListingTableFactory {
5051
/// Creates a new `ListingTableFactory`
5152
pub fn new() -> Self {
52-
Self {}
53-
}
54-
}
55-
56-
impl Default for ListingTableFactory {
57-
fn default() -> Self {
58-
Self::new()
53+
Self::default()
5954
}
6055
}
6156

datafusion/core/src/datasource/mod.rs

+44
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub mod memory;
2929
pub mod physical_plan;
3030
pub mod provider;
3131
mod statistics;
32+
pub mod stream;
3233
pub mod streaming;
3334
pub mod view;
3435

@@ -43,3 +44,46 @@ pub use self::provider::TableProvider;
4344
pub use self::view::ViewTable;
4445
pub use crate::logical_expr::TableType;
4546
pub use statistics::get_statistics_with_limit;
47+
48+
use arrow_schema::{Schema, SortOptions};
49+
use datafusion_common::{plan_err, DataFusionError, Result};
50+
use datafusion_expr::Expr;
51+
use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr};
52+
53+
fn create_ordering(
54+
schema: &Schema,
55+
sort_order: &[Vec<Expr>],
56+
) -> Result<Vec<LexOrdering>> {
57+
let mut all_sort_orders = vec![];
58+
59+
for exprs in sort_order {
60+
// Construct PhysicalSortExpr objects from Expr objects:
61+
let mut sort_exprs = vec![];
62+
for expr in exprs {
63+
match expr {
64+
Expr::Sort(sort) => match sort.expr.as_ref() {
65+
Expr::Column(col) => match expressions::col(&col.name, schema) {
66+
Ok(expr) => {
67+
sort_exprs.push(PhysicalSortExpr {
68+
expr,
69+
options: SortOptions {
70+
descending: !sort.asc,
71+
nulls_first: sort.nulls_first,
72+
},
73+
});
74+
}
75+
// Cannot find expression in the projected_schema, stop iterating
76+
// since rest of the orderings are violated
77+
Err(_) => break,
78+
}
79+
expr => return plan_err!("Expected single column references in output_ordering, got {expr}"),
80+
}
81+
expr => return plan_err!("Expected Expr::Sort in output_ordering, but got {expr}"),
82+
}
83+
}
84+
if !sort_exprs.is_empty() {
85+
all_sort_orders.push(sort_exprs);
86+
}
87+
}
88+
Ok(all_sort_orders)
89+
}

datafusion/core/src/datasource/provider.rs

+40
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use datafusion_expr::{CreateExternalTable, LogicalPlan};
2626
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
2727

2828
use crate::arrow::datatypes::SchemaRef;
29+
use crate::datasource::listing_table_factory::ListingTableFactory;
30+
use crate::datasource::stream::StreamTableFactory;
2931
use crate::error::Result;
3032
use crate::execution::context::SessionState;
3133
use crate::logical_expr::Expr;
@@ -214,3 +216,41 @@ pub trait TableProviderFactory: Sync + Send {
214216
cmd: &CreateExternalTable,
215217
) -> Result<Arc<dyn TableProvider>>;
216218
}
219+
220+
/// The default [`TableProviderFactory`]
221+
///
222+
/// If [`CreateExternalTable`] is unbounded calls [`StreamTableFactory::create`],
223+
/// otherwise calls [`ListingTableFactory::create`]
224+
#[derive(Debug, Default)]
225+
pub struct DefaultTableFactory {
226+
stream: StreamTableFactory,
227+
listing: ListingTableFactory,
228+
}
229+
230+
impl DefaultTableFactory {
231+
/// Creates a new [`DefaultTableFactory`]
232+
pub fn new() -> Self {
233+
Self::default()
234+
}
235+
}
236+
237+
#[async_trait]
238+
impl TableProviderFactory for DefaultTableFactory {
239+
async fn create(
240+
&self,
241+
state: &SessionState,
242+
cmd: &CreateExternalTable,
243+
) -> Result<Arc<dyn TableProvider>> {
244+
let mut unbounded = cmd.unbounded;
245+
for (k, v) in &cmd.options {
246+
if k.eq_ignore_ascii_case("unbounded") && v.eq_ignore_ascii_case("true") {
247+
unbounded = true
248+
}
249+
}
250+
251+
match unbounded {
252+
true => self.stream.create(state, cmd).await,
253+
false => self.listing.create(state, cmd).await,
254+
}
255+
}
256+
}

0 commit comments

Comments
 (0)