Skip to content

Commit 14beb79

Browse files
Move DataSink to datasource and add session crate (#15371)
* session * clippy * fmt * session * minor * Update README.md * doc * fmt * doc --------- Co-authored-by: berkaysynnada <[email protected]>
1 parent 923bfb7 commit 14beb79

File tree

38 files changed

+299
-121
lines changed

38 files changed

+299
-121
lines changed

Cargo.lock

+31-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ members = [
4747
"datafusion/proto/gen",
4848
"datafusion/proto-common",
4949
"datafusion/proto-common/gen",
50+
"datafusion/session",
5051
"datafusion/sql",
5152
"datafusion/sqllogictest",
5253
"datafusion/substrait",
@@ -136,6 +137,7 @@ datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", versio
136137
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "46.0.1" }
137138
datafusion-proto = { path = "datafusion/proto", version = "46.0.1" }
138139
datafusion-proto-common = { path = "datafusion/proto-common", version = "46.0.1" }
140+
datafusion-session = { path = "datafusion/session", version = "46.0.1" }
139141
datafusion-sql = { path = "datafusion/sql", version = "46.0.1" }
140142
doc-comment = "0.3"
141143
env_logger = "0.11"

datafusion/catalog-listing/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ datafusion-expr = { workspace = true }
4141
datafusion-physical-expr = { workspace = true }
4242
datafusion-physical-expr-common = { workspace = true }
4343
datafusion-physical-plan = { workspace = true }
44+
datafusion-session = { workspace = true }
4445
futures = { workspace = true }
4546
log = { workspace = true }
4647
object_store = { workspace = true }

datafusion/catalog/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@ async-trait = { workspace = true }
3636
dashmap = { workspace = true }
3737
datafusion-common = { workspace = true }
3838
datafusion-common-runtime = { workspace = true }
39+
datafusion-datasource = { workspace = true }
3940
datafusion-execution = { workspace = true }
4041
datafusion-expr = { workspace = true }
4142
datafusion-physical-expr = { workspace = true }
4243
datafusion-physical-plan = { workspace = true }
44+
datafusion-session = { workspace = true }
4345
datafusion-sql = { workspace = true }
4446
futures = { workspace = true }
4547
itertools = { workspace = true }

datafusion/catalog/src/lib.rs

+18-12
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,32 @@
3131
//! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`]
3232
//! * Listing schema: [`listing_schema`]
3333
34+
pub mod cte_worktable;
35+
pub mod default_table_source;
36+
pub mod information_schema;
37+
pub mod listing_schema;
3438
pub mod memory;
35-
pub use memory::{
36-
MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
37-
};
39+
pub mod stream;
40+
pub mod streaming;
41+
pub mod view;
42+
3843
mod r#async;
3944
mod catalog;
4045
mod dynamic_file;
41-
pub mod information_schema;
42-
pub mod listing_schema;
4346
mod schema;
44-
mod session;
4547
mod table;
48+
4649
pub use catalog::*;
50+
pub use datafusion_session::Session;
4751
pub use dynamic_file::catalog::*;
52+
pub use memory::{
53+
MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
54+
};
4855
pub use r#async::*;
4956
pub use schema::*;
50-
pub use session::*;
5157
pub use table::*;
52-
pub mod cte_worktable;
53-
pub mod default_table_source;
54-
pub mod stream;
55-
pub mod streaming;
56-
pub mod view;
58+
59+
// For backwards compatibility: re-export `Session` from `datafusion_session`
60+
mod session {
61+
pub use datafusion_session::Session;
62+
}

datafusion/catalog/src/stream.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ use arrow::array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
3030
use arrow::datatypes::SchemaRef;
3131
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
3232
use datafusion_common_runtime::SpawnedTask;
33+
use datafusion_datasource::sink::{DataSink, DataSinkExec};
3334
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3435
use datafusion_expr::dml::InsertOp;
3536
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
3637
use datafusion_physical_expr::create_ordering;
37-
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
3838
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
3939
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
4040
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};

datafusion/catalog/src/table.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ pub trait TableProvider: Debug + Sync + Send {
288288
/// See [`DataSinkExec`] for the common pattern of inserting a
289289
/// stream of `RecordBatch`es as files to an ObjectStore.
290290
///
291-
/// [`DataSinkExec`]: datafusion_physical_plan::insert::DataSinkExec
291+
/// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
292292
async fn insert_into(
293293
&self,
294294
_state: &dyn Session,

datafusion/core/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ datafusion-physical-expr = { workspace = true }
123123
datafusion-physical-expr-common = { workspace = true }
124124
datafusion-physical-optimizer = { workspace = true }
125125
datafusion-physical-plan = { workspace = true }
126+
datafusion-session = { workspace = true }
126127
datafusion-sql = { workspace = true }
127128
flate2 = { version = "1.1.0", optional = true }
128129
futures = { workspace = true }

datafusion/core/src/datasource/dynamic_file.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,17 @@
2020
2121
use std::sync::Arc;
2222

23-
use async_trait::async_trait;
24-
use datafusion_catalog::{SessionStore, UrlTableFactory};
25-
use datafusion_common::plan_datafusion_err;
26-
2723
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
2824
use crate::datasource::TableProvider;
2925
use crate::error::Result;
3026
use crate::execution::context::SessionState;
3127

28+
use datafusion_catalog::UrlTableFactory;
29+
use datafusion_common::plan_datafusion_err;
30+
use datafusion_session::SessionStore;
31+
32+
use async_trait::async_trait;
33+
3234
/// [DynamicListTableFactory] is a factory that can create a [ListingTable] from the given url.
3335
#[derive(Default, Debug)]
3436
pub struct DynamicListTableFactory {

datafusion/core/src/datasource/file_format/arrow.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ use datafusion_common_runtime::{JoinSet, SpawnedTask};
5050
use datafusion_datasource::display::FileGroupDisplay;
5151
use datafusion_datasource::file::FileSource;
5252
use datafusion_datasource::file_scan_config::FileScanConfig;
53+
use datafusion_datasource::sink::{DataSink, DataSinkExec};
5354
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
5455
use datafusion_expr::dml::InsertOp;
5556
use datafusion_physical_expr::PhysicalExpr;
5657
use datafusion_physical_expr_common::sort_expr::LexRequirement;
57-
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
5858

5959
use async_trait::async_trait;
6060
use bytes::Bytes;

datafusion/core/src/datasource/memory.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use std::sync::Arc;
2525
use crate::datasource::{TableProvider, TableType};
2626
use crate::error::Result;
2727
use crate::logical_expr::Expr;
28-
use crate::physical_plan::insert::{DataSink, DataSinkExec};
2928
use crate::physical_plan::repartition::RepartitionExec;
3029
use crate::physical_plan::{
3130
common, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
@@ -39,6 +38,7 @@ use datafusion_catalog::Session;
3938
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
4039
use datafusion_common_runtime::JoinSet;
4140
pub use datafusion_datasource::memory::MemorySourceConfig;
41+
use datafusion_datasource::sink::{DataSink, DataSinkExec};
4242
pub use datafusion_datasource::source::DataSourceExec;
4343
use datafusion_execution::TaskContext;
4444
use datafusion_expr::dml::InsertOp;

datafusion/core/src/datasource/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub use datafusion_catalog::default_table_source;
4343
pub use datafusion_catalog::stream;
4444
pub use datafusion_catalog::view;
4545
pub use datafusion_datasource::schema_adapter;
46+
pub use datafusion_datasource::sink;
4647
pub use datafusion_datasource::source;
4748
pub use datafusion_execution::object_store;
4849
pub use datafusion_physical_expr::create_ordering;

datafusion/core/src/execution/context/mod.rs

+18-17
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717

1818
//! [`SessionContext`] API for registering data sources and executing queries
1919
20-
use datafusion_catalog::memory::MemorySchemaProvider;
21-
use datafusion_catalog::MemoryCatalogProvider;
2220
use std::collections::HashSet;
2321
use std::fmt::Debug;
2422
use std::sync::{Arc, Weak};
2523

2624
use super::options::ReadOptions;
25+
use crate::datasource::dynamic_file::DynamicListTableFactory;
26+
use crate::execution::session_state::SessionStateBuilder;
2727
use crate::{
2828
catalog::listing_schema::ListingSchemaProvider,
2929
catalog::{
@@ -49,39 +49,40 @@ use crate::{
4949
variable::{VarProvider, VarType},
5050
};
5151

52+
// backwards compatibility
53+
pub use crate::execution::session_state::SessionState;
54+
5255
use arrow::datatypes::{Schema, SchemaRef};
5356
use arrow::record_batch::RecordBatch;
57+
use datafusion_catalog::memory::MemorySchemaProvider;
58+
use datafusion_catalog::MemoryCatalogProvider;
59+
use datafusion_catalog::{
60+
DynamicFileCatalog, TableFunction, TableFunctionImpl, UrlTableFactory,
61+
};
62+
use datafusion_common::config::ConfigOptions;
5463
use datafusion_common::{
5564
config::{ConfigExtension, TableOptions},
5665
exec_datafusion_err, exec_err, not_impl_err, plan_datafusion_err, plan_err,
5766
tree_node::{TreeNodeRecursion, TreeNodeVisitor},
5867
DFSchema, ParamValues, ScalarValue, SchemaReference, TableReference,
5968
};
69+
pub use datafusion_execution::config::SessionConfig;
6070
use datafusion_execution::registry::SerializerRegistry;
71+
pub use datafusion_execution::TaskContext;
72+
pub use datafusion_expr::execution_props::ExecutionProps;
6173
use datafusion_expr::{
6274
expr_rewriter::FunctionRewrite,
6375
logical_plan::{DdlStatement, Statement},
6476
planner::ExprPlanner,
6577
Expr, UserDefinedLogicalNode, WindowUDF,
6678
};
67-
68-
// backwards compatibility
69-
pub use crate::execution::session_state::SessionState;
70-
71-
use crate::datasource::dynamic_file::DynamicListTableFactory;
72-
use crate::execution::session_state::SessionStateBuilder;
73-
use async_trait::async_trait;
74-
use chrono::{DateTime, Utc};
75-
use datafusion_catalog::{
76-
DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl, UrlTableFactory,
77-
};
78-
use datafusion_common::config::ConfigOptions;
79-
pub use datafusion_execution::config::SessionConfig;
80-
pub use datafusion_execution::TaskContext;
81-
pub use datafusion_expr::execution_props::ExecutionProps;
8279
use datafusion_optimizer::analyzer::type_coercion::TypeCoercion;
8380
use datafusion_optimizer::Analyzer;
8481
use datafusion_optimizer::{AnalyzerRule, OptimizerRule};
82+
use datafusion_session::SessionStore;
83+
84+
use async_trait::async_trait;
85+
use chrono::{DateTime, Utc};
8586
use object_store::ObjectStore;
8687
use parking_lot::RwLock;
8788
use url::Url;

datafusion/core/src/execution/session_state.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
3333
use datafusion_catalog::information_schema::{
3434
InformationSchemaProvider, INFORMATION_SCHEMA,
3535
};
36-
use datafusion_catalog::MemoryCatalogProviderList;
3736

3837
use arrow::datatypes::{DataType, SchemaRef};
39-
use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
38+
use datafusion_catalog::MemoryCatalogProviderList;
39+
use datafusion_catalog::{TableFunction, TableFunctionImpl};
4040
use datafusion_common::alias::AliasGenerator;
4141
use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
4242
use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
@@ -68,6 +68,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
6868
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
6969
use datafusion_physical_optimizer::PhysicalOptimizerRule;
7070
use datafusion_physical_plan::ExecutionPlan;
71+
use datafusion_session::Session;
7172
use datafusion_sql::parser::{DFParserBuilder, Statement};
7273
use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel};
7374

datafusion/core/src/lib.rs

+5
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,8 @@
649649
//!
650650
//! * [datafusion_common]: Common traits and types
651651
//! * [datafusion_catalog]: Catalog APIs such as [`SchemaProvider`] and [`CatalogProvider`]
652+
//! * [datafusion_datasource]: File and Data IO such as [`FileSource`] and [`DataSink`]
653+
//! * [datafusion_session]: [`Session`] and related structures
652654
//! * [datafusion_execution]: State and structures needed for execution
653655
//! * [datafusion_expr]: [`LogicalPlan`], [`Expr`] and related logical planning structure
654656
//! * [datafusion_functions]: Scalar function packages
@@ -664,6 +666,9 @@
664666
//!
665667
//! [`SchemaProvider`]: datafusion_catalog::SchemaProvider
666668
//! [`CatalogProvider`]: datafusion_catalog::CatalogProvider
669+
//! [`Session`]: datafusion_session::Session
670+
//! [`FileSource`]: datafusion_datasource::file::FileSource
671+
//! [`DataSink`]: datafusion_datasource::sink::DataSink
667672
//!
668673
//! ## Citing DataFusion in Academic Papers
669674
//!

datafusion/datasource-avro/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ datafusion-execution = { workspace = true }
4343
datafusion-physical-expr = { workspace = true }
4444
datafusion-physical-expr-common = { workspace = true }
4545
datafusion-physical-plan = { workspace = true }
46+
datafusion-session = { workspace = true }
4647
futures = { workspace = true }
4748
num-traits = { version = "0.2" }
4849
object_store = { workspace = true }

0 commit comments

Comments
 (0)