From 330e4e8e8c4e3d31b1a2cd8fed5df398485045c5 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Fri, 23 Aug 2024 10:03:27 +0200 Subject: [PATCH 01/12] delete redundant github workflows & commitlint --- .github/workflows/check.yml | 13 ------------- .github/workflows/pull-request.yml | 30 ------------------------------ .github/workflows/test.yml | 4 ++-- commitlint.config.js | 8 -------- 4 files changed, 2 insertions(+), 53 deletions(-) delete mode 100644 .github/workflows/check.yml delete mode 100644 .github/workflows/pull-request.yml delete mode 100644 commitlint.config.js diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml deleted file mode 100644 index fe66d27..0000000 --- a/.github/workflows/check.yml +++ /dev/null @@ -1,13 +0,0 @@ -name: Check - -on: [push, pull_request] - -jobs: - formatting: - name: Formatting - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-node@v4 - - run: npm install prettier prettier-plugin-toml - - run: npx prettier --check --no-config . diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml deleted file mode 100644 index 62da82f..0000000 --- a/.github/workflows/pull-request.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: Pull Request - -on: - pull_request_target: - types: - - opened - - reopened - - edited - - synchronize - -jobs: - conventional-commits: - name: Conventional Commits - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-node@v4 - - run: npm install @commitlint/config-conventional - - run: npx commitlint <<< $CONVENTIONAL_COMMIT - env: - CONVENTIONAL_COMMIT: | - ${{ github.event.pull_request.title }} - - ${{ github.event.pull_request.body }} - - if: failure() - run: - echo "Datafusion-federation follows the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/) for release automation. - The PR title and body are used as the merge commit message. 
- - Please update your PR title to match the specification." >> $GITHUB_STEP_SUMMARY diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8e548ee..0792434 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -55,7 +55,7 @@ jobs: - uses: arduino/setup-protoc@v3 with: repo-token: ${{ secrets.GITHUB_TOKEN }} - - run: cargo clippy -- -Dwarnings + - run: cargo clippy -- -D warnings package: name: Package @@ -69,5 +69,5 @@ jobs: - uses: arduino/setup-protoc@v3 with: repo-token: ${{ secrets.GITHUB_TOKEN }} - - run: cargo build + - run: cargo build --all - run: cargo package -p datafusion-federation --allow-dirty diff --git a/commitlint.config.js b/commitlint.config.js deleted file mode 100644 index 0a2216d..0000000 --- a/commitlint.config.js +++ /dev/null @@ -1,8 +0,0 @@ -module.exports = { - extends: ["@commitlint/config-conventional"], - rules: { - "body-max-line-length": [0, "always", Infinity], - "footer-max-line-length": [0, "always", Infinity], - "header-max-length": [0, "always", Infinity], - }, -}; From f7182a3d45a5401e8b1976c46727593c5e6c3784 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 22 Aug 2024 16:13:52 +0200 Subject: [PATCH 02/12] update deps && use datafusion 41 --- Cargo.toml | 12 +++--------- datafusion-federation/src/lib.rs | 7 +++---- datafusion-federation/src/optimizer.rs | 8 ++++---- datafusion-federation/src/plan_node.rs | 8 ++++++++ datafusion-federation/src/table_provider.rs | 6 +++--- sources/flight-sql/Cargo.toml | 14 ++++++++------ sources/sql/Cargo.toml | 14 +++++--------- sources/sql/src/connectorx/executor.rs | 1 + sources/sql/src/lib.rs | 6 +++++- sources/sql/src/schema.rs | 3 +-- 10 files changed, 41 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 744de0b..09d77a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,23 +3,17 @@ resolver = "2" members = [ "datafusion-federation", - "examples", "sources/sql", "sources/flight-sql", ] -[patch.crates-io] -# connectorx = { path 
= "../connector-x/connectorx" } -# datafusion = { path = "../arrow-datafusion/datafusion/core" } - [workspace.package] version = "0.1.3" edition = "2021" license = "MIT" readme = "README.md" - [workspace.dependencies] -async-trait = "0.1.77" -datafusion = "37.0.0" -datafusion-substrait = "37.0.0" +async-trait = "0.1.81" +datafusion = "41.0.0" +datafusion-substrait = "41.0.0" diff --git a/datafusion-federation/src/lib.rs b/datafusion-federation/src/lib.rs index b6bd949..81de605 100644 --- a/datafusion-federation/src/lib.rs +++ b/datafusion-federation/src/lib.rs @@ -5,7 +5,7 @@ use std::{ }; use datafusion::{ - execution::context::{SessionContext, SessionState}, + execution::session_state::{SessionState, SessionStateBuilder}, optimizer::{optimizer::Optimizer, OptimizerRule}, }; @@ -18,12 +18,11 @@ mod plan_node; pub use plan_node::*; pub fn default_session_state() -> SessionState { - let df_state = SessionContext::new().state(); - let rules = default_optimizer_rules(); - df_state + SessionStateBuilder::new() .with_optimizer_rules(rules) .with_query_planner(Arc::new(FederatedQueryPlanner::new())) + .build() } pub fn default_optimizer_rules() -> Vec> { diff --git a/datafusion-federation/src/optimizer.rs b/datafusion-federation/src/optimizer.rs index 4afac32..d0fc24b 100644 --- a/datafusion-federation/src/optimizer.rs +++ b/datafusion-federation/src/optimizer.rs @@ -258,7 +258,7 @@ impl FederationOptimizerRule { }; // If this is the root plan node; federate the entire plan - let optimized = optimizer.optimize(plan, _config, |_, _| {})?; + let optimized = optimizer.optimize(plan.clone(), _config, |_, _| {})?; return Ok((Some(optimized), ScanResult::None)); } @@ -296,7 +296,7 @@ impl FederationOptimizerRule { // Replace the input with the federated counterpart let wrapped = wrap_projection(original_input)?; - let optimized = optimizer.optimize(&wrapped, _config, |_, _| {})?; + let optimized = optimizer.optimize(wrapped, _config, |_, _| {})?; Ok(optimized) }) @@ -398,9 
+398,9 @@ fn wrap_projection(plan: LogicalPlan) -> Result { _ => { let expr = plan .schema() - .fields() + .columns() .iter() - .map(|f| Expr::Column(f.qualified_column())) + .map(|c| Expr::Column(c.clone())) .collect::>(); Ok(LogicalPlan::Projection(Projection::try_new( expr, diff --git a/datafusion-federation/src/plan_node.rs b/datafusion-federation/src/plan_node.rs index 35a9306..c81b152 100644 --- a/datafusion-federation/src/plan_node.rs +++ b/datafusion-federation/src/plan_node.rs @@ -65,6 +65,14 @@ impl UserDefinedLogicalNodeCore for FederatedPlanNode { planner: self.planner.clone(), } } + + /// XXX should consider something else here ? + fn with_exprs_and_inputs(&self, _exprs: Vec, _inputs: Vec) -> Result { + Ok(Self { + plan: self.plan.clone(), + planner: self.planner.clone(), + }) + } } #[derive(Default)] diff --git a/datafusion-federation/src/table_provider.rs b/datafusion-federation/src/table_provider.rs index a1acc30..b820b6e 100644 --- a/datafusion-federation/src/table_provider.rs +++ b/datafusion-federation/src/table_provider.rs @@ -3,10 +3,10 @@ use std::{any::Any, sync::Arc}; use async_trait::async_trait; use datafusion::{ arrow::datatypes::SchemaRef, + catalog::Session, common::Constraints, datasource::TableProvider, error::{DataFusionError, Result}, - execution::context::SessionState, logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableSource, TableType}, physical_plan::ExecutionPlan, }; @@ -106,7 +106,7 @@ impl TableProvider for FederatedTableProviderAdaptor { // with a virtual TableProvider that provides federation for a sub-plan. 
async fn scan( &self, - state: &SessionState, + state: &dyn Session, projection: Option<&Vec>, filters: &[Expr], limit: Option, @@ -122,7 +122,7 @@ impl TableProvider for FederatedTableProviderAdaptor { async fn insert_into( &self, - _state: &SessionState, + _state: &dyn Session, input: Arc, overwrite: bool, ) -> Result> { diff --git a/sources/flight-sql/Cargo.toml b/sources/flight-sql/Cargo.toml index f778dfc..a7ed673 100644 --- a/sources/flight-sql/Cargo.toml +++ b/sources/flight-sql/Cargo.toml @@ -3,7 +3,6 @@ name = "datafusion-federation-flight-sql" version.workspace = true edition.workspace = true license.workspace = true -readme.workspace = true [lib] name = "datafusion_federation_flight_sql" @@ -13,11 +12,14 @@ path = "src/lib.rs" async-trait.workspace = true datafusion.workspace = true datafusion-substrait.workspace = true + +# XXX use the release verion on crates.io datafusion-federation.path = "../../datafusion-federation" datafusion-federation-sql.path = "../sql" + futures = "0.3.30" -tonic = {version="0.11.0", features=["tls"] } -prost = "0.12.3" -arrow = "51.0.0" -arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] } -log = "0.4.20" +tonic = {version="0.12.0", features=["tls"] } +prost = "0.13" +arrow = "52.0.0" +arrow-flight = { version = "52.0.0", features = ["flight-sql-experimental"] } +log = "0.4.22" diff --git a/sources/sql/Cargo.toml b/sources/sql/Cargo.toml index bc89d95..31d8218 100644 --- a/sources/sql/Cargo.toml +++ b/sources/sql/Cargo.toml @@ -3,7 +3,6 @@ name = "datafusion-federation-sql" version.workspace = true edition.workspace = true license.workspace = true -readme.workspace = true [lib] name = "datafusion_federation_sql" @@ -11,14 +10,11 @@ path = "src/lib.rs" [dependencies] async-trait.workspace = true -# connectorx = { version = "0.3.2", features = ["src_sqlite"] } -# https://github.com/sfu-db/connector-x/pull/555 -connectorx = { git = "https://github.com/devinjdangelo/connector-x.git", features = [ - 
"dst_arrow", - "src_sqlite" -] } datafusion.workspace = true + +# XXX use the release verion on crates.io datafusion-federation.path = "../../datafusion-federation" -# derive_builder = "0.13.0" + +connectorx = { version = "0.3.3" , features = ["dst_arrow", "src_sqlite"] } futures = "0.3.30" -tokio = "1.35.1" +tokio = "1.39" diff --git a/sources/sql/src/connectorx/executor.rs b/sources/sql/src/connectorx/executor.rs index fc5ea3d..033f39c 100644 --- a/sources/sql/src/connectorx/executor.rs +++ b/sources/sql/src/connectorx/executor.rs @@ -53,6 +53,7 @@ impl SQLExecutor for CXExecutor { fn compute_context(&self) -> Option { Some(self.context.clone()) } + fn execute(&self, sql: &str, schema: SchemaRef) -> Result { let conn = self.conn.clone(); let query: CXQuery = sql.into(); diff --git a/sources/sql/src/lib.rs b/sources/sql/src/lib.rs index 7619186..217d3d7 100644 --- a/sources/sql/src/lib.rs +++ b/sources/sql/src/lib.rs @@ -162,6 +162,10 @@ impl DisplayAs for VirtualExecutionPlan { } impl ExecutionPlan for VirtualExecutionPlan { + fn name(&self) -> &str { + "sql_federation_exec" + } + fn as_any(&self) -> &dyn Any { self } @@ -170,7 +174,7 @@ impl ExecutionPlan for VirtualExecutionPlan { self.schema() } - fn children(&self) -> Vec> { + fn children(&self) -> Vec<&Arc> { vec![] } diff --git a/sources/sql/src/schema.rs b/sources/sql/src/schema.rs index c780f23..83d6e08 100644 --- a/sources/sql/src/schema.rs +++ b/sources/sql/src/schema.rs @@ -2,8 +2,7 @@ use async_trait::async_trait; use datafusion::logical_expr::{TableSource, TableType}; use datafusion::{ - arrow::datatypes::SchemaRef, catalog::schema::SchemaProvider, datasource::TableProvider, - error::Result, + arrow::datatypes::SchemaRef, catalog::SchemaProvider, datasource::TableProvider, error::Result, }; use futures::future::join_all; use std::{any::Any, sync::Arc}; From 05519e8e38f9d3543d53bfe9bb6095f27b2907fb Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 22 Aug 2024 16:26:41 +0200 Subject: [PATCH 03/12] 
remove connectorx from sources/sql crate --- sources/sql/README.md | 4 + sources/sql/src/connectorx/executor.rs | 126 ------------------------- sources/sql/src/connectorx/mod.rs | 2 - sources/sql/src/lib.rs | 1 - 4 files changed, 4 insertions(+), 129 deletions(-) create mode 100644 sources/sql/README.md delete mode 100644 sources/sql/src/connectorx/executor.rs delete mode 100644 sources/sql/src/connectorx/mod.rs diff --git a/sources/sql/README.md b/sources/sql/README.md new file mode 100644 index 0000000..def0668 --- /dev/null +++ b/sources/sql/README.md @@ -0,0 +1,4 @@ + + +This will be move to +[datafusion-table-providers](https://github.com/datafusion-contrib/datafusion-table-providers) repository diff --git a/sources/sql/src/connectorx/executor.rs b/sources/sql/src/connectorx/executor.rs deleted file mode 100644 index 033f39c..0000000 --- a/sources/sql/src/connectorx/executor.rs +++ /dev/null @@ -1,126 +0,0 @@ -use async_trait::async_trait; -use connectorx::{ - destinations::arrow::ArrowDestinationError, - errors::{ConnectorXError, ConnectorXOutError}, - prelude::{get_arrow, CXQuery, SourceConn, SourceType}, -}; -use datafusion::{ - arrow::datatypes::{Field, Schema, SchemaRef}, - error::{DataFusionError, Result}, - physical_plan::{ - stream::RecordBatchStreamAdapter, EmptyRecordBatchStream, SendableRecordBatchStream, - }, - sql::sqlparser::dialect::{Dialect, GenericDialect, PostgreSqlDialect, SQLiteDialect}, -}; -use futures::executor::block_on; -use std::sync::Arc; -use tokio::task; - -use crate::executor::SQLExecutor; - -pub struct CXExecutor { - context: String, - conn: SourceConn, -} - -impl CXExecutor { - pub fn new(dsn: String) -> Result { - let conn = SourceConn::try_from(dsn.as_str()).map_err(cx_error_to_df)?; - Ok(Self { context: dsn, conn }) - } - - pub fn new_with_conn(conn: SourceConn) -> Self { - Self { - context: conn.conn.to_string(), - conn, - } - } - - pub fn context(&mut self, context: String) { - self.context = context; - } -} - -fn 
cx_error_to_df(err: ConnectorXError) -> DataFusionError { - DataFusionError::External(format!("ConnectorX: {err:?}").into()) -} - -#[async_trait] -impl SQLExecutor for CXExecutor { - fn name(&self) -> &str { - "connector_x_executor" - } - fn compute_context(&self) -> Option { - Some(self.context.clone()) - } - - fn execute(&self, sql: &str, schema: SchemaRef) -> Result { - let conn = self.conn.clone(); - let query: CXQuery = sql.into(); - - let mut dst = block_on(task::spawn_blocking(move || -> Result<_, _> { - get_arrow(&conn, None, &[query.clone()]).map_err(cx_out_error_to_df) - })) - .map_err(|err| DataFusionError::External(err.to_string().into()))??; - let stream = if let Some(batch) = dst.record_batch().map_err(cx_dst_error_to_df)? { - futures::stream::once(async move { Ok(batch) }) - } else { - return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::new( - Schema::empty(), - )))); - }; - - Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) - } - - async fn table_names(&self) -> Result> { - Err(DataFusionError::NotImplemented( - "connector_x source: table inference not implemented".to_string(), - )) - } - - async fn get_table_schema(&self, table_name: &str) -> Result { - let conn = self.conn.clone(); - let query: CXQuery = format!("select * from {table_name} limit 1") - .as_str() - .into(); - - let dst = get_arrow(&conn, None, &[query.clone()]).map_err(cx_out_error_to_df)?; - let schema = schema_to_lowercase(dst.arrow_schema()); - Ok(schema) - } - - fn dialect(&self) -> Arc { - match &self.conn.ty { - SourceType::Postgres => Arc::new(PostgreSqlDialect {}), - SourceType::SQLite => Arc::new(SQLiteDialect {}), - _ => Arc::new(GenericDialect {}), - } - } -} - -fn cx_dst_error_to_df(err: ArrowDestinationError) -> DataFusionError { - DataFusionError::External(format!("ConnectorX failed to run query: {err:?}").into()) -} - -/// Get the schema with lowercase field names -fn schema_to_lowercase(schema: SchemaRef) -> SchemaRef { - // DF needs lower case schema - 
let lower_fields: Vec<_> = schema - .fields - .iter() - .map(|f| { - Field::new( - f.name().to_ascii_lowercase(), - f.data_type().clone(), - f.is_nullable(), - ) - }) - .collect(); - - Arc::new(Schema::new(lower_fields)) -} - -fn cx_out_error_to_df(err: ConnectorXOutError) -> DataFusionError { - DataFusionError::External(format!("ConnectorX failed to run query: {err:?}").into()) -} diff --git a/sources/sql/src/connectorx/mod.rs b/sources/sql/src/connectorx/mod.rs deleted file mode 100644 index 600069a..0000000 --- a/sources/sql/src/connectorx/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod executor; -pub use executor::*; diff --git a/sources/sql/src/lib.rs b/sources/sql/src/lib.rs index 217d3d7..917fa8d 100644 --- a/sources/sql/src/lib.rs +++ b/sources/sql/src/lib.rs @@ -20,7 +20,6 @@ use datafusion_federation::{FederatedPlanNode, FederationPlanner, FederationProv mod schema; pub use schema::*; -pub mod connectorx; mod executor; pub use executor::*; From 3408e797b90eefbce0bf2f6e03a74e717cb75ef0 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Thu, 22 Aug 2024 17:09:50 +0200 Subject: [PATCH 04/12] flight-sql: fix up dependencie issues --- sources/flight-sql/Cargo.toml | 6 +-- sources/flight-sql/src/server/service.rs | 47 ++++++++++++------------ sources/flight-sql/src/server/state.rs | 3 +- 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/sources/flight-sql/Cargo.toml b/sources/flight-sql/Cargo.toml index a7ed673..07c39e7 100644 --- a/sources/flight-sql/Cargo.toml +++ b/sources/flight-sql/Cargo.toml @@ -18,8 +18,8 @@ datafusion-federation.path = "../../datafusion-federation" datafusion-federation-sql.path = "../sql" futures = "0.3.30" -tonic = {version="0.12.0", features=["tls"] } -prost = "0.13" +tonic = {version="0.11.0", features=["tls", "transport", "codegen", "prost"] } +prost = "0.12.3" arrow = "52.0.0" -arrow-flight = { version = "52.0.0", features = ["flight-sql-experimental"] } +arrow-flight = { version = "52.2.0", features = 
["flight-sql-experimental"] } log = "0.4.22" diff --git a/sources/flight-sql/src/server/service.rs b/sources/flight-sql/src/server/service.rs index c49cf32..afa4a5f 100644 --- a/sources/flight-sql/src/server/service.rs +++ b/sources/flight-sql/src/server/service.rs @@ -1,14 +1,11 @@ -use arrow::datatypes::SchemaRef; -use arrow::error::ArrowError; -use arrow::ipc::writer::IpcWriteOptions; -use arrow_flight::encode::FlightDataEncoderBuilder; -use arrow_flight::error::FlightError; -use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; -use arrow_flight::sql::server::{ - FlightSqlService as ArrowFlightSqlService, PeekableFlightDataStream, -}; +use std::pin::Pin; +use std::sync::Arc; + +use arrow::{datatypes::SchemaRef, error::ArrowError, ipc::writer::IpcWriteOptions}; use arrow_flight::sql::{ - self, ActionBeginSavepointRequest, ActionBeginSavepointResult, ActionBeginTransactionRequest, + self, + server::{FlightSqlService as ArrowFlightSqlService, PeekableFlightDataStream}, + ActionBeginSavepointRequest, ActionBeginSavepointResult, ActionBeginTransactionRequest, ActionBeginTransactionResult, ActionCancelQueryRequest, ActionCancelQueryResult, ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, ActionCreatePreparedSubstraitPlanRequest, @@ -16,26 +13,30 @@ use arrow_flight::sql::{ CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo, CommandPreparedStatementQuery, CommandPreparedStatementUpdate, - CommandStatementQuery, CommandStatementSubstraitPlan, CommandStatementUpdate, SqlInfo, - TicketStatementQuery, + CommandStatementQuery, CommandStatementSubstraitPlan, CommandStatementUpdate, + DoPutPreparedStatementResult, SqlInfo, TicketStatementQuery, }; use arrow_flight::{ + encode::FlightDataEncoderBuilder, + error::FlightError, + 
flight_service_server::{FlightService, FlightServiceServer}, Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage, SchemaAsIpc, Ticket, }; -use datafusion::common::arrow::datatypes::Schema; -use datafusion::dataframe::DataFrame; -use datafusion::error::{DataFusionError, Result as DataFusionResult}; -use datafusion::execution::context::{SQLOptions, SessionContext, SessionState}; -use datafusion::logical_expr::LogicalPlan; -use datafusion::physical_plan::SendableRecordBatchStream; -use datafusion_substrait::logical_plan::consumer::from_substrait_plan; -use datafusion_substrait::serializer::deserialize_bytes; +use datafusion::{ + common::arrow::datatypes::Schema, + dataframe::DataFrame, + error::{DataFusionError, Result as DataFusionResult}, + execution::context::{SQLOptions, SessionContext, SessionState}, + logical_expr::LogicalPlan, + physical_plan::SendableRecordBatchStream, +}; +use datafusion_substrait::{ + logical_plan::consumer::from_substrait_plan, serializer::deserialize_bytes, +}; use futures::{Stream, StreamExt, TryStreamExt}; use log::info; use prost::bytes::Bytes; -use std::pin::Pin; -use std::sync::Arc; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; @@ -601,7 +602,7 @@ impl ArrowFlightSqlService for FlightSqlService { &self, _query: CommandPreparedStatementQuery, request: Request, - ) -> Result::DoPutStream>> { + ) -> Result { info!("do_put_prepared_statement_query"); let (_, _) = self.new_context(request)?; diff --git a/sources/flight-sql/src/server/state.rs b/sources/flight-sql/src/server/state.rs index 0b4ec09..e28ffdd 100644 --- a/sources/flight-sql/src/server/state.rs +++ b/sources/flight-sql/src/server/state.rs @@ -1,9 +1,10 @@ +use std::fmt::Display; + use arrow_flight::{ error::FlightError, sql::{self, Any, Command}, }; use prost::{bytes::Bytes, Message}; -use std::fmt::Display; pub type Result = std::result::Result; From a514a6cd4734de1ab1a0529deacc2eab1b21eb4d 
Mon Sep 17 00:00:00 2001 From: hozan23 Date: Fri, 23 Aug 2024 09:45:32 +0200 Subject: [PATCH 05/12] move sources/sql code to datafusion-federation crate and add sql feature --- Cargo.toml | 3 +-- datafusion-federation/Cargo.toml | 5 +++++ datafusion-federation/src/lib.rs | 18 ++++++++++-------- .../src/sql}/executor.rs | 0 .../src/sql/mod.rs | 15 +++++++-------- .../src/sql}/schema.rs | 11 +++++------ 6 files changed, 28 insertions(+), 24 deletions(-) rename {sources/sql/src => datafusion-federation/src/sql}/executor.rs (100%) rename sources/sql/src/lib.rs => datafusion-federation/src/sql/mod.rs (95%) rename {sources/sql/src => datafusion-federation/src/sql}/schema.rs (96%) diff --git a/Cargo.toml b/Cargo.toml index 09d77a8..f149159 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,8 +3,7 @@ resolver = "2" members = [ "datafusion-federation", - "sources/sql", - "sources/flight-sql", + # "sources/flight-sql", ] [workspace.package] diff --git a/datafusion-federation/Cargo.toml b/datafusion-federation/Cargo.toml index 83e2723..fa6bd74 100644 --- a/datafusion-federation/Cargo.toml +++ b/datafusion-federation/Cargo.toml @@ -10,10 +10,15 @@ description = "Datafusion federation." 
name = "datafusion_federation" path = "src/lib.rs" +[features] +sql = ["futures"] + [dependencies] async-trait.workspace = true datafusion.workspace = true +futures = { version = "0.3.30", optional = true } + [package.metadata.docs.rs] # Whether to pass `--all-features` to Cargo (default: false) diff --git a/datafusion-federation/src/lib.rs b/datafusion-federation/src/lib.rs index 81de605..012c62d 100644 --- a/datafusion-federation/src/lib.rs +++ b/datafusion-federation/src/lib.rs @@ -1,5 +1,11 @@ -use core::fmt; +mod optimizer; +mod plan_node; +mod table_provider; +#[cfg(feature = "sql")] +pub mod sql; + use std::{ + fmt, hash::{Hash, Hasher}, sync::Arc, }; @@ -9,13 +15,9 @@ use datafusion::{ optimizer::{optimizer::Optimizer, OptimizerRule}, }; -mod optimizer; -pub use optimizer::*; -mod table_provider; -pub use table_provider::*; - -mod plan_node; -pub use plan_node::*; +pub use optimizer::{get_table_source, FederationOptimizerRule}; +pub use plan_node::{FederatedPlanNode, FederatedQueryPlanner, FederationPlanner}; +pub use table_provider::{FederatedTableProviderAdaptor, FederatedTableSource}; pub fn default_session_state() -> SessionState { let rules = default_optimizer_rules(); diff --git a/sources/sql/src/executor.rs b/datafusion-federation/src/sql/executor.rs similarity index 100% rename from sources/sql/src/executor.rs rename to datafusion-federation/src/sql/executor.rs diff --git a/sources/sql/src/lib.rs b/datafusion-federation/src/sql/mod.rs similarity index 95% rename from sources/sql/src/lib.rs rename to datafusion-federation/src/sql/mod.rs index 917fa8d..d37f2b5 100644 --- a/sources/sql/src/lib.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -1,6 +1,9 @@ -use core::fmt; -use std::{any::Any, sync::Arc, vec}; +mod executor; +mod schema; + +use std::{any::Any, fmt, sync::Arc, vec}; +use crate::{FederatedPlanNode, FederationPlanner, FederationProvider}; use async_trait::async_trait; use datafusion::{ arrow::datatypes::{Schema, SchemaRef}, @@ -15,13 +18,9 
@@ use datafusion::{ }, sql::unparser::plan_to_sql, }; -use datafusion_federation::{FederatedPlanNode, FederationPlanner, FederationProvider}; -mod schema; -pub use schema::*; - -mod executor; -pub use executor::*; +pub use executor::{SQLExecutor, SQLExecutorRef}; +pub use schema::{MultiSchemaProvider, SQLSchemaProvider, SQLTableSource}; // #[macro_use] // extern crate derive_builder; diff --git a/sources/sql/src/schema.rs b/datafusion-federation/src/sql/schema.rs similarity index 96% rename from sources/sql/src/schema.rs rename to datafusion-federation/src/sql/schema.rs index 83d6e08..cb35ee6 100644 --- a/sources/sql/src/schema.rs +++ b/datafusion-federation/src/sql/schema.rs @@ -1,18 +1,17 @@ -use async_trait::async_trait; +use std::{any::Any, sync::Arc}; +use async_trait::async_trait; use datafusion::logical_expr::{TableSource, TableType}; use datafusion::{ arrow::datatypes::SchemaRef, catalog::SchemaProvider, datasource::TableProvider, error::Result, }; use futures::future::join_all; -use std::{any::Any, sync::Arc}; -use datafusion_federation::{ - FederatedTableProviderAdaptor, FederatedTableSource, FederationProvider, +use crate::{ + sql::SQLFederationProvider, FederatedTableProviderAdaptor, FederatedTableSource, + FederationProvider, }; -use crate::SQLFederationProvider; - pub struct SQLSchemaProvider { // provider: Arc, tables: Vec>, From b699eeca1ec71a07b86b9ab1f8b7f1211e5badc8 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Fri, 23 Aug 2024 10:00:34 +0200 Subject: [PATCH 06/12] move sources/flight-sql to datafusion-flight-sql-server --- Cargo.toml | 2 +- datafusion-federation/src/lib.rs | 2 +- datafusion-federation/src/sql/mod.rs | 3 ++- .../Cargo.toml | 17 ++++++++++------ .../src/executor/mod.rs | 6 +++--- .../src/lib.rs | 0 .../src/server/mod.rs | 0 .../src/server/service.rs | 0 .../src/server/session.rs | 0 .../src/server/state.rs | 0 sources/sql/Cargo.toml | 20 ------------------- sources/sql/README.md | 4 ---- 12 files changed, 18 insertions(+), 36 
deletions(-) rename {sources/flight-sql => datafusion-flight-sql-server}/Cargo.toml (60%) rename {sources/flight-sql => datafusion-flight-sql-server}/src/executor/mod.rs (98%) rename {sources/flight-sql => datafusion-flight-sql-server}/src/lib.rs (100%) rename {sources/flight-sql => datafusion-flight-sql-server}/src/server/mod.rs (100%) rename {sources/flight-sql => datafusion-flight-sql-server}/src/server/service.rs (100%) rename {sources/flight-sql => datafusion-flight-sql-server}/src/server/session.rs (100%) rename {sources/flight-sql => datafusion-flight-sql-server}/src/server/state.rs (100%) delete mode 100644 sources/sql/Cargo.toml delete mode 100644 sources/sql/README.md diff --git a/Cargo.toml b/Cargo.toml index f149159..53ee52c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ resolver = "2" members = [ "datafusion-federation", - # "sources/flight-sql", + "datafusion-flight-sql-server", ] [workspace.package] diff --git a/datafusion-federation/src/lib.rs b/datafusion-federation/src/lib.rs index 012c62d..b3eaff3 100644 --- a/datafusion-federation/src/lib.rs +++ b/datafusion-federation/src/lib.rs @@ -1,8 +1,8 @@ mod optimizer; mod plan_node; -mod table_provider; #[cfg(feature = "sql")] pub mod sql; +mod table_provider; use std::{ fmt, diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs index d37f2b5..e19950d 100644 --- a/datafusion-federation/src/sql/mod.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -3,7 +3,6 @@ mod schema; use std::{any::Any, fmt, sync::Arc, vec}; -use crate::{FederatedPlanNode, FederationPlanner, FederationProvider}; use async_trait::async_trait; use datafusion::{ arrow::datatypes::{Schema, SchemaRef}, @@ -22,6 +21,8 @@ use datafusion::{ pub use executor::{SQLExecutor, SQLExecutorRef}; pub use schema::{MultiSchemaProvider, SQLSchemaProvider, SQLTableSource}; +use crate::{FederatedPlanNode, FederationPlanner, FederationProvider}; + // #[macro_use] // extern crate derive_builder; diff --git 
a/sources/flight-sql/Cargo.toml b/datafusion-flight-sql-server/Cargo.toml similarity index 60% rename from sources/flight-sql/Cargo.toml rename to datafusion-flight-sql-server/Cargo.toml index 07c39e7..8e466e2 100644 --- a/sources/flight-sql/Cargo.toml +++ b/datafusion-flight-sql-server/Cargo.toml @@ -1,24 +1,29 @@ [package] -name = "datafusion-federation-flight-sql" +name = "datafusion-flight-sql-server" version.workspace = true edition.workspace = true license.workspace = true [lib] -name = "datafusion_federation_flight_sql" +name = "datafusion_flight_sql_server" path = "src/lib.rs" [dependencies] async-trait.workspace = true datafusion.workspace = true datafusion-substrait.workspace = true - # XXX use the release verion on crates.io -datafusion-federation.path = "../../datafusion-federation" -datafusion-federation-sql.path = "../sql" +datafusion-federation = { path = "../datafusion-federation", features = [ + "sql", +] } futures = "0.3.30" -tonic = {version="0.11.0", features=["tls", "transport", "codegen", "prost"] } +tonic = { version = "0.11.0", features = [ + "tls", + "transport", + "codegen", + "prost", +] } prost = "0.12.3" arrow = "52.0.0" arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] } diff --git a/sources/flight-sql/src/executor/mod.rs b/datafusion-flight-sql-server/src/executor/mod.rs similarity index 98% rename from sources/flight-sql/src/executor/mod.rs rename to datafusion-flight-sql-server/src/executor/mod.rs index a5c5a38..930939d 100644 --- a/sources/flight-sql/src/executor/mod.rs +++ b/datafusion-flight-sql-server/src/executor/mod.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use arrow::{datatypes::SchemaRef, error::ArrowError}; use arrow_flight::sql::client::FlightSqlServiceClient; use async_trait::async_trait; @@ -6,10 +8,8 @@ use datafusion::{ physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}, sql::sqlparser::dialect::{Dialect, GenericDialect}, }; -use 
datafusion_federation_sql::SQLExecutor; +use datafusion_federation::sql::SQLExecutor; use futures::TryStreamExt; - -use std::sync::Arc; use tonic::transport::Channel; pub struct FlightSQLExecutor { diff --git a/sources/flight-sql/src/lib.rs b/datafusion-flight-sql-server/src/lib.rs similarity index 100% rename from sources/flight-sql/src/lib.rs rename to datafusion-flight-sql-server/src/lib.rs diff --git a/sources/flight-sql/src/server/mod.rs b/datafusion-flight-sql-server/src/server/mod.rs similarity index 100% rename from sources/flight-sql/src/server/mod.rs rename to datafusion-flight-sql-server/src/server/mod.rs diff --git a/sources/flight-sql/src/server/service.rs b/datafusion-flight-sql-server/src/server/service.rs similarity index 100% rename from sources/flight-sql/src/server/service.rs rename to datafusion-flight-sql-server/src/server/service.rs diff --git a/sources/flight-sql/src/server/session.rs b/datafusion-flight-sql-server/src/server/session.rs similarity index 100% rename from sources/flight-sql/src/server/session.rs rename to datafusion-flight-sql-server/src/server/session.rs diff --git a/sources/flight-sql/src/server/state.rs b/datafusion-flight-sql-server/src/server/state.rs similarity index 100% rename from sources/flight-sql/src/server/state.rs rename to datafusion-flight-sql-server/src/server/state.rs diff --git a/sources/sql/Cargo.toml b/sources/sql/Cargo.toml deleted file mode 100644 index 31d8218..0000000 --- a/sources/sql/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "datafusion-federation-sql" -version.workspace = true -edition.workspace = true -license.workspace = true - -[lib] -name = "datafusion_federation_sql" -path = "src/lib.rs" - -[dependencies] -async-trait.workspace = true -datafusion.workspace = true - -# XXX use the release verion on crates.io -datafusion-federation.path = "../../datafusion-federation" - -connectorx = { version = "0.3.3" , features = ["dst_arrow", "src_sqlite"] } -futures = "0.3.30" -tokio = 
"1.39" diff --git a/sources/sql/README.md b/sources/sql/README.md deleted file mode 100644 index def0668..0000000 --- a/sources/sql/README.md +++ /dev/null @@ -1,4 +0,0 @@ - - -This will be move to -[datafusion-table-providers](https://github.com/datafusion-contrib/datafusion-table-providers) repository From d9449e5ea0cc13afdf71118216a2141188566bec Mon Sep 17 00:00:00 2001 From: hozan23 Date: Fri, 23 Aug 2024 10:59:20 +0200 Subject: [PATCH 07/12] "create datafusion-flight-sql-table-provider crate & move the executor from datafusion-flight-sql-server" --- Cargo.toml | 1 + README.md | 3 +++ datafusion-flight-sql-server/Cargo.toml | 5 +++- .../examples/flight-sql.rs | 13 ++++------ .../examples/test.csv | 4 ++++ datafusion-flight-sql-server/src/lib.rs | 5 ++-- .../src/server/mod.rs | 6 ----- .../src/{server => }/service.rs | 5 ++-- .../src/{server => }/session.rs | 0 .../src/{server => }/state.rs | 0 .../Cargo.toml | 24 +++++++++++++++++++ .../src/lib.rs | 0 12 files changed, 46 insertions(+), 20 deletions(-) rename {examples => datafusion-flight-sql-server}/examples/flight-sql.rs (88%) create mode 100644 datafusion-flight-sql-server/examples/test.csv delete mode 100644 datafusion-flight-sql-server/src/server/mod.rs rename datafusion-flight-sql-server/src/{server => }/service.rs (99%) rename datafusion-flight-sql-server/src/{server => }/session.rs (100%) rename datafusion-flight-sql-server/src/{server => }/state.rs (100%) create mode 100644 datafusion-flight-sql-table-provider/Cargo.toml rename datafusion-flight-sql-server/src/executor/mod.rs => datafusion-flight-sql-table-provider/src/lib.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index 53ee52c..83ca62e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ resolver = "2" members = [ "datafusion-federation", "datafusion-flight-sql-server", + "datafusion-flight-sql-table-provider", ] [workspace.package] diff --git a/README.md b/README.md index 028db87..ed7fdef 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,9 
@@ The goal of this repo is to allow [DataFusion](https://github.com/apache/arrow-datafusion) to resolve queries across remote query engines while pushing down as much compute as possible down. + +> :warning: **All the examples are deprecated for now** + Check out [the examples](./examples/) to get a feel for how it works. Potential use-cases: diff --git a/datafusion-flight-sql-server/Cargo.toml b/datafusion-flight-sql-server/Cargo.toml index 8e466e2..ede6979 100644 --- a/datafusion-flight-sql-server/Cargo.toml +++ b/datafusion-flight-sql-server/Cargo.toml @@ -9,7 +9,6 @@ name = "datafusion_flight_sql_server" path = "src/lib.rs" [dependencies] -async-trait.workspace = true datafusion.workspace = true datafusion-substrait.workspace = true # XXX use the release verion on crates.io @@ -28,3 +27,7 @@ prost = "0.12.3" arrow = "52.0.0" arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] } log = "0.4.22" + +[dev-dependencies] +tokio = { version = "1.39.3", features = ["full"] } +datafusion-flight-sql-table-provider = { path = "../datafusion-flight-sql-table-provider" } diff --git a/examples/examples/flight-sql.rs b/datafusion-flight-sql-server/examples/flight-sql.rs similarity index 88% rename from examples/examples/flight-sql.rs rename to datafusion-flight-sql-server/examples/flight-sql.rs index e0899cb..021227a 100644 --- a/examples/examples/flight-sql.rs +++ b/datafusion-flight-sql-server/examples/flight-sql.rs @@ -2,15 +2,16 @@ use std::{sync::Arc, time::Duration}; use arrow_flight::sql::client::FlightSqlServiceClient; use datafusion::{ - catalog::schema::SchemaProvider, + catalog::SchemaProvider, error::{DataFusionError, Result}, execution::{ context::{SessionContext, SessionState}, options::CsvReadOptions, }, }; -use datafusion_federation_flight_sql::{executor::FlightSQLExecutor, server::FlightSqlService}; -use datafusion_federation_sql::{SQLFederationProvider, SQLSchemaProvider}; +use datafusion_federation::sql::{SQLFederationProvider, 
SQLSchemaProvider}; +use datafusion_flight_sql_server::service::FlightSqlService; +use datafusion_flight_sql_table_provider::FlightSQLExecutor; use tokio::time::sleep; use tonic::transport::Endpoint; @@ -19,11 +20,7 @@ async fn main() -> Result<()> { let dsn: String = "0.0.0.0:50051".to_string(); let remote_ctx = SessionContext::new(); remote_ctx - .register_csv( - "test", - "./examples/examples/test.csv", - CsvReadOptions::new(), - ) + .register_csv("test", "./examples/test.csv", CsvReadOptions::new()) .await?; // Remote context diff --git a/datafusion-flight-sql-server/examples/test.csv b/datafusion-flight-sql-server/examples/test.csv new file mode 100644 index 0000000..811d276 --- /dev/null +++ b/datafusion-flight-sql-server/examples/test.csv @@ -0,0 +1,4 @@ +foo,bar +a,1 +b,2 +c,3 \ No newline at end of file diff --git a/datafusion-flight-sql-server/src/lib.rs b/datafusion-flight-sql-server/src/lib.rs index a8795c9..f7f6eac 100644 --- a/datafusion-flight-sql-server/src/lib.rs +++ b/datafusion-flight-sql-server/src/lib.rs @@ -1,2 +1,3 @@ -pub mod executor; -pub mod server; +pub mod service; +pub mod session; +pub mod state; diff --git a/datafusion-flight-sql-server/src/server/mod.rs b/datafusion-flight-sql-server/src/server/mod.rs deleted file mode 100644 index 0c054b9..0000000 --- a/datafusion-flight-sql-server/src/server/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -mod service; -pub use service::*; -mod state; -pub use state::*; -mod session; -pub use session::*; diff --git a/datafusion-flight-sql-server/src/server/service.rs b/datafusion-flight-sql-server/src/service.rs similarity index 99% rename from datafusion-flight-sql-server/src/server/service.rs rename to datafusion-flight-sql-server/src/service.rs index afa4a5f..8a85132 100644 --- a/datafusion-flight-sql-server/src/server/service.rs +++ b/datafusion-flight-sql-server/src/service.rs @@ -1,5 +1,4 @@ -use std::pin::Pin; -use std::sync::Arc; +use std::{pin::Pin, sync::Arc}; use arrow::{datatypes::SchemaRef, 
error::ArrowError, ipc::writer::IpcWriteOptions}; use arrow_flight::sql::{ @@ -40,8 +39,8 @@ use prost::bytes::Bytes; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; +use super::session::{SessionStateProvider, StaticSessionStateProvider}; use super::state::{CommandTicket, QueryHandle}; -use super::{SessionStateProvider, StaticSessionStateProvider}; type Result = std::result::Result; diff --git a/datafusion-flight-sql-server/src/server/session.rs b/datafusion-flight-sql-server/src/session.rs similarity index 100% rename from datafusion-flight-sql-server/src/server/session.rs rename to datafusion-flight-sql-server/src/session.rs diff --git a/datafusion-flight-sql-server/src/server/state.rs b/datafusion-flight-sql-server/src/state.rs similarity index 100% rename from datafusion-flight-sql-server/src/server/state.rs rename to datafusion-flight-sql-server/src/state.rs diff --git a/datafusion-flight-sql-table-provider/Cargo.toml b/datafusion-flight-sql-table-provider/Cargo.toml new file mode 100644 index 0000000..4302133 --- /dev/null +++ b/datafusion-flight-sql-table-provider/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "datafusion-flight-sql-table-provider" +version.workspace = true +edition.workspace = true +license.workspace = true +readme.workspace = true + +[dependencies] +async-trait.workspace = true +datafusion.workspace = true +# XXX use the release verion on crates.io +datafusion-federation = { path = "../datafusion-federation", features = [ + "sql", +] } + +futures = "0.3.30" +tonic = { version = "0.11.0", features = [ + "tls", + "transport", + "codegen", + "prost", +] } +arrow = "52.0.0" +arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental"] } diff --git a/datafusion-flight-sql-server/src/executor/mod.rs b/datafusion-flight-sql-table-provider/src/lib.rs similarity index 100% rename from datafusion-flight-sql-server/src/executor/mod.rs rename to datafusion-flight-sql-table-provider/src/lib.rs From 
ac84a5023b0cf71ec1be5675e2adcf027085fe82 Mon Sep 17 00:00:00 2001 From: hozan23 Date: Fri, 23 Aug 2024 11:27:17 +0200 Subject: [PATCH 08/12] override supports_rewrite for FederationOptimizerRule and SQLFederationOptimizerRule --- datafusion-federation/src/optimizer.rs | 6 ++++++ datafusion-federation/src/sql/mod.rs | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/datafusion-federation/src/optimizer.rs b/datafusion-federation/src/optimizer.rs index d0fc24b..8f4cf04 100644 --- a/datafusion-federation/src/optimizer.rs +++ b/datafusion-federation/src/optimizer.rs @@ -35,6 +35,12 @@ impl OptimizerRule for FederationOptimizerRule { fn name(&self) -> &str { "federation_optimizer_rule" } + + /// XXX + /// Does this rule support rewriting owned plans (rather than by reference)? + fn supports_rewrite(&self) -> bool { + false + } } enum ScanResult { diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs index e19950d..e68f8fa 100644 --- a/datafusion-federation/src/sql/mod.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -93,6 +93,12 @@ impl OptimizerRule for SQLFederationOptimizerRule { fn name(&self) -> &str { "federate_sql" } + + /// XXX + /// Does this rule support rewriting owned plans (rather than by reference)? 
+ fn supports_rewrite(&self) -> bool { + false + } } struct SQLFederationPlanner { executor: Arc, From 5d240d7584fc0ee7f1f305e5d732abc9c49c586e Mon Sep 17 00:00:00 2001 From: Michiel De Backker Date: Fri, 23 Aug 2024 14:56:37 +0200 Subject: [PATCH 09/12] Improve project overview --- README.md | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ed7fdef..9d39ffa 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,20 @@ [![crates.io](https://img.shields.io/crates/v/datafusion-federation.svg)](https://crates.io/crates/datafusion-federation) [![docs.rs](https://docs.rs/datafusion-federation/badge.svg)](https://docs.rs/datafusion-federation) -The goal of this repo is to allow [DataFusion](https://github.com/apache/arrow-datafusion) to resolve queries across remote query engines while pushing down as much compute as possible down. +DataFusion Federation allows [DataFusion](https://github.com/apache/arrow-datafusion) to execute (part of) a query plan by a remote execution engine. + ┌────────────────┐ + ┌────────────┐ │ Remote DBMS(s) │ + SQL Query ───> │ DataFusion │ ───> │ ( execution │ + └────────────┘ │ happens here ) │ + └────────────────┘ + +The goal is to allow resolving queries across remote query engines while pushing down as much compute as possible to the remote database(s). This allows execution to happen as close to the storage as possible. This concept is referred to as 'query federation'. + +> [!TIP] +> This repository implements the federation framework itself. If you want to connect to a specific database, check out the compatible providers available in [datafusion-contrib/datafusion-table-providers](https://github.com/datafusion-contrib/datafusion-table-providers/). + +#### Usage > :warning: **All the examples are deprecated for now** @@ -17,6 +29,71 @@ Potential use-cases: - DataFusion -> Flight SQL -> DataFusion - .. 
+#### Design concept + +Say you have a query plan as follows: + + ┌────────────┐ + │ Join │ + └────────────┘ + ▲ + ┌───────┴────────┐ + ┌────────────┐ ┌────────────┐ + │ Scan A │ │ Join │ + └────────────┘ └────────────┘ + ▲ + ┌───────┴────────┐ + ┌────────────┐ ┌────────────┐ + │ Scan B │ │ Scan C │ + └────────────┘ └────────────┘ + +DataFusion Federation will identify the largest possible sub-plans that +can be executed by an external database: + + ┌────────────┐ Optimizer pass + │ Join │ recognizes B and C + └────────────┘ are available in an + ▲ external database + ┌──────────────┴────────┐ + │ ┌ ─ ─ ─ ─ ─ ─ ┴ ─ ── ─ ─ ─ ─ ─┐ + ┌────────────┐ ┌────────────┐ │ + │ Scan A │ │ │ Join │ + └────────────┘ └────────────┘ │ + │ ▲ + ┌───────┴────────┐ │ + ┌────────────┐ ┌────────────┐ │ + ││ Scan B │ │ Scan C │ + └────────────┘ └────────────┘ │ + ─ ── ─ ─ ── ─ ─ ─ ─ ─ ─ ─ ── ─ ┘ + +The sub-plans are cut out and replaced by an opaque federation node in the plan: + + ┌────────────┐ + │ Join │ + └────────────┘ Rewritten Plan + ▲ + ┌────────┴───────────┐ + │ │ + ┌────────────┐ ┏━━━━━━━━━━━━━━━━━━┓ + │ Scan A │ ┃ Scan B+C ┃ + └────────────┘ ┃ (TableProvider ┃ + ┃ that can execute ┃ + ┃ sub-plan in an ┃ + ┃external database)┃ + ┗━━━━━━━━━━━━━━━━━━┛ + +Different databases may have different query languages and execution capabilities. To accommodate for this, we allow each 'federation provider' to self-determine what part of a sub-plan it will actually federate. This is done by letting each federation provider define its own optimizer rule. When a sub-plan is 'cut out' of the overall plan, it is first passed the federation provider's optimizer rule. This optimizer rule determines the part of the plan that is cut out, based based on the execution capabilities of the database it represents. + +#### Implementation + +A remote database is represented by the `FederationProvider` trait. 
To identify table scans that are available in the same database, they implement `FederatedTableSource` trait. This trait allows lookup of the corresponding `FederationProvider`. + +Identifying sub-plans to federate is done by the `FederationOptimizerRule`. This rule needs to be registered in your DataFusion SessionState. One easy way to do this is using `default_session_state`. To do its job, the `FederationOptimizerRule` currently requires that all TableProviders that need to be federated are `FederatedTableProviderAdaptor`s. The `FederatedTableProviderAdaptor` also has a fallback mechanism that allows implementations to fallback to a 'vanilla' TableProvider in case the `FederationOptimizerRule` isn't registered. + +The `FederationProvider` can provide a `compute_context`. This allows it to differentiate between multiple remote execution context of the same type. For example two different mysql instances, database schemas, access level, etc. The `FederationProvider` also returns the `Optimizer` that is allows it to self-determine what part of a sub-plan it can federate. + +The `sql` module implements a generic `FederationProvider` for SQL execution engines. A specific SQL engine implements the `SQLExecutor` trait for its engine specific execution. There are a number of compatible providers available in [datafusion-contrib/datafusion-table-providers](https://github.com/datafusion-contrib/datafusion-table-providers/). + #### Status The project is in alpha status. Contributions welcome; land a PR = commit access. 
From f4babeeac31d23f8316195e6132fa39ab6e01683 Mon Sep 17 00:00:00 2001 From: hozan23 <119854621+hozan23@users.noreply.github.com> Date: Sat, 24 Aug 2024 21:06:53 +0200 Subject: [PATCH 10/12] update README.md (#48) --- README.md | 84 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 9d39ffa..0553c83 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,11 @@ -## DataFusion Federation +# DataFusion Federation [![crates.io](https://img.shields.io/crates/v/datafusion-federation.svg)](https://crates.io/crates/datafusion-federation) [![docs.rs](https://docs.rs/datafusion-federation/badge.svg)](https://docs.rs/datafusion-federation) -DataFusion Federation allows [DataFusion](https://github.com/apache/arrow-datafusion) to execute (part of) a query plan by a remote execution engine. +DataFusion Federation allows +[DataFusion](https://github.com/apache/arrow-datafusion) to execute (part of) a +query plan by a remote execution engine. ┌────────────────┐ ┌────────────┐ │ Remote DBMS(s) │ @@ -11,25 +13,31 @@ DataFusion Federation allows [DataFusion](https://github.com/apache/arrow-datafu └────────────┘ │ happens here ) │ └────────────────┘ -The goal is to allow resolving queries across remote query engines while pushing down as much compute as possible to the remote database(s). This allows execution to happen as close to the storage as possible. This concept is referred to as 'query federation'. +The goal is to allow resolving queries across remote query engines while +pushing down as much compute as possible to the remote database(s). This allows +execution to happen as close to the storage as possible. This concept is +referred to as 'query federation'. > [!TIP] -> This repository implements the federation framework itself. 
If you want to connect to a specific database, check out the compatible providers available in [datafusion-contrib/datafusion-table-providers](https://github.com/datafusion-contrib/datafusion-table-providers/). +> This repository implements the federation framework itself. If you want to +> connect to a specific database, check out the compatible providers available +> in +> [datafusion-contrib/datafusion-table-providers](https://github.com/datafusion-contrib/datafusion-table-providers/). -#### Usage +## Usage > :warning: **All the examples are deprecated for now** Check out [the examples](./examples/) to get a feel for how it works. -Potential use-cases: +## Potential use-cases: - Querying across SQLite, MySQL, PostgreSQL, ... - Pushing down SQL or [Substrait](https://substrait.io/) plans. - DataFusion -> Flight SQL -> DataFusion - .. -#### Design concept +## Design concept Say you have a query plan as follows: @@ -50,9 +58,9 @@ Say you have a query plan as follows: DataFusion Federation will identify the largest possible sub-plans that can be executed by an external database: - ┌────────────┐ Optimizer pass - │ Join │ recognizes B and C - └────────────┘ are available in an + ┌────────────┐ Optimizer recognizes + │ Join │ that B and C are + └────────────┘ available in an ▲ external database ┌──────────────┴────────┐ │ ┌ ─ ─ ─ ─ ─ ─ ┴ ─ ── ─ ─ ─ ─ ─┐ @@ -82,21 +90,47 @@ The sub-plans are cut out and replaced by an opaque federation node in the plan: ┃external database)┃ ┗━━━━━━━━━━━━━━━━━━┛ -Different databases may have different query languages and execution capabilities. To accommodate for this, we allow each 'federation provider' to self-determine what part of a sub-plan it will actually federate. This is done by letting each federation provider define its own optimizer rule. When a sub-plan is 'cut out' of the overall plan, it is first passed the federation provider's optimizer rule. 
This optimizer rule determines the part of the plan that is cut out, based based on the execution capabilities of the database it represents. - -#### Implementation - -A remote database is represented by the `FederationProvider` trait. To identify table scans that are available in the same database, they implement `FederatedTableSource` trait. This trait allows lookup of the corresponding `FederationProvider`. - -Identifying sub-plans to federate is done by the `FederationOptimizerRule`. This rule needs to be registered in your DataFusion SessionState. One easy way to do this is using `default_session_state`. To do its job, the `FederationOptimizerRule` currently requires that all TableProviders that need to be federated are `FederatedTableProviderAdaptor`s. The `FederatedTableProviderAdaptor` also has a fallback mechanism that allows implementations to fallback to a 'vanilla' TableProvider in case the `FederationOptimizerRule` isn't registered. - -The `FederationProvider` can provide a `compute_context`. This allows it to differentiate between multiple remote execution context of the same type. For example two different mysql instances, database schemas, access level, etc. The `FederationProvider` also returns the `Optimizer` that is allows it to self-determine what part of a sub-plan it can federate. - -The `sql` module implements a generic `FederationProvider` for SQL execution engines. A specific SQL engine implements the `SQLExecutor` trait for its engine specific execution. There are a number of compatible providers available in [datafusion-contrib/datafusion-table-providers](https://github.com/datafusion-contrib/datafusion-table-providers/). - -#### Status - -The project is in alpha status. Contributions welcome; land a PR = commit access. +Different databases may have different query languages and execution +capabilities. To accommodate for this, we allow each 'federation provider' to +self-determine what part of a sub-plan it will actually federate. 
This is done +by letting each federation provider define its own optimizer rule. When a +sub-plan is 'cut out' of the overall plan, it is first passed to the federation +provider's optimizer rule. This optimizer rule determines the part of the plan +that is cut out, based on the execution capabilities of the database it +represents. + +## Implementation + +A remote database is represented by the `FederationProvider` trait. To identify +table scans that are available in the same database, they implement +the `FederatedTableSource` trait. This trait allows lookup of the corresponding +`FederationProvider`. + +Identifying sub-plans to federate is done by the `FederationOptimizerRule`. +This rule needs to be registered in your DataFusion SessionState. One easy way +to do this is using `default_session_state`. To do its job, the +`FederationOptimizerRule` currently requires that all TableProviders that need +to be federated are `FederatedTableProviderAdaptor`s. The +`FederatedTableProviderAdaptor` also has a fallback mechanism that allows +implementations to fall back to a 'vanilla' TableProvider in case the +`FederationOptimizerRule` isn't registered. + +The `FederationProvider` can provide a `compute_context`. This allows it to +differentiate between multiple remote execution contexts of the same type. For +example, two different MySQL instances, database schemas, access levels, etc. The +`FederationProvider` also returns the `Optimizer` that allows it to +self-determine what part of a sub-plan it can federate. + +The `sql` module implements a generic `FederationProvider` for SQL execution +engines. A specific SQL engine implements the `SQLExecutor` trait for its +engine-specific execution. There are a number of compatible providers available +in +[datafusion-contrib/datafusion-table-providers](https://github.com/datafusion-contrib/datafusion-table-providers/). + +## Status + +The project is in alpha status. Contributions welcome; land a PR = commit +access.
- [Docs (release)](https://docs.rs/datafusion-federation) - [Docs (main)](https://datafusion-contrib.github.io/datafusion-federation/) From 48dbea52e626d0ac316a747d07b75cf4d29c989d Mon Sep 17 00:00:00 2001 From: hozan23 <119854621+hozan23@users.noreply.github.com> Date: Mon, 26 Aug 2024 09:27:43 +0200 Subject: [PATCH 11/12] datafusion-fedeartion: Remove assert_eq macros and handle errors properly (#49) --- datafusion-federation/src/plan_node.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/datafusion-federation/src/plan_node.rs b/datafusion-federation/src/plan_node.rs index c81b152..5a8cd00 100644 --- a/datafusion-federation/src/plan_node.rs +++ b/datafusion-federation/src/plan_node.rs @@ -8,7 +8,7 @@ use std::{ use async_trait::async_trait; use datafusion::{ common::DFSchemaRef, - error::Result, + error::{DataFusionError, Result}, execution::context::{QueryPlanner, SessionState}, logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore}, physical_plan::ExecutionPlan, @@ -57,17 +57,14 @@ impl UserDefinedLogicalNodeCore for FederatedPlanNode { write!(f, "Federated\n {:?}", self.plan) } - fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self { - assert_eq!(inputs.len(), 0, "input size inconsistent"); - assert_eq!(exprs.len(), 0, "expression size inconsistent"); - Self { - plan: self.plan.clone(), - planner: self.planner.clone(), + fn with_exprs_and_inputs(&self, exprs: Vec, inputs: Vec) -> Result { + if !inputs.is_empty() { + return Err(DataFusionError::Plan("input size inconsistent".into())); + } + if !exprs.is_empty() { + return Err(DataFusionError::Plan("expression size inconsistent".into())); } - } - /// XXX should consider something else here ? 
- fn with_exprs_and_inputs(&self, _exprs: Vec, _inputs: Vec) -> Result { Ok(Self { plan: self.plan.clone(), planner: self.planner.clone(), @@ -149,8 +146,11 @@ impl ExtensionPlanner for FederatedPlanner { ) -> Result>> { let dc_node = node.as_any().downcast_ref::(); if let Some(fed_node) = dc_node { - assert_eq!(logical_inputs.len(), 0, "Inconsistent number of inputs"); - assert_eq!(physical_inputs.len(), 0, "Inconsistent number of inputs"); + if !logical_inputs.is_empty() || !physical_inputs.is_empty() { + return Err(DataFusionError::Plan( + "Inconsistent number of inputs".into(), + )); + } let fed_planner = fed_node.planner.clone(); let exec_plan = fed_planner.plan_federation(fed_node, session_state).await?; From c8fa466b011c5b59f440bb08ec80b676737f7bf4 Mon Sep 17 00:00:00 2001 From: hozan23 <119854621+hozan23@users.noreply.github.com> Date: Mon, 26 Aug 2024 17:59:24 +0200 Subject: [PATCH 12/12] Add example in datafusion-federation crate (#50) --- README.md | 5 +- datafusion-federation/Cargo.toml | 18 ++-- datafusion-federation/examples/df-csv.rs | 115 +++++++++++++++++++++++ datafusion-federation/examples/test.csv | 4 + 4 files changed, 133 insertions(+), 9 deletions(-) create mode 100644 datafusion-federation/examples/df-csv.rs create mode 100644 datafusion-federation/examples/test.csv diff --git a/README.md b/README.md index 0553c83..dcfeaa4 100644 --- a/README.md +++ b/README.md @@ -26,9 +26,8 @@ referred to as 'query federation'. ## Usage -> :warning: **All the examples are deprecated for now** - -Check out [the examples](./examples/) to get a feel for how it works. +Check out the [examples](./datafusion-federation/examples/) to get a feel for +how it works. ## Potential use-cases: diff --git a/datafusion-federation/Cargo.toml b/datafusion-federation/Cargo.toml index fa6bd74..6af448c 100644 --- a/datafusion-federation/Cargo.toml +++ b/datafusion-federation/Cargo.toml @@ -10,6 +10,12 @@ description = "Datafusion federation." 
name = "datafusion_federation" path = "src/lib.rs" +[package.metadata.docs.rs] +# Whether to pass `--all-features` to Cargo (default: false) +all-features = true +# Whether to pass `--no-default-features` to Cargo (default: false) +no-default-features = true + [features] sql = ["futures"] @@ -19,10 +25,10 @@ datafusion.workspace = true futures = { version = "0.3.30", optional = true } -[package.metadata.docs.rs] - -# Whether to pass `--all-features` to Cargo (default: false) -all-features = true +[dev-dependencies] +tokio = { version = "1.39.3", features = ["full"] } -# Whether to pass `--no-default-features` to Cargo (default: false) -no-default-features = true +[[example]] +name = "df-csv" +path = "examples/df-csv.rs" +required-features = ["sql"] diff --git a/datafusion-federation/examples/df-csv.rs b/datafusion-federation/examples/df-csv.rs new file mode 100644 index 0000000..24ac495 --- /dev/null +++ b/datafusion-federation/examples/df-csv.rs @@ -0,0 +1,115 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::{ + arrow::datatypes::SchemaRef, + catalog::SchemaProvider, + error::{DataFusionError, Result}, + execution::{ + context::{SessionContext, SessionState}, + options::CsvReadOptions, + }, + physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}, + sql::sqlparser::dialect::{Dialect, GenericDialect}, +}; +use datafusion_federation::sql::{SQLExecutor, SQLFederationProvider, SQLSchemaProvider}; +use futures::TryStreamExt; + +const CSV_PATH: &str = "./examples/test.csv"; +const TABLE_NAME: &str = "test"; + +#[tokio::main] +async fn main() -> Result<()> { + // Create a remote context + let remote_ctx = Arc::new(SessionContext::new()); + + // Registers a CSV file + remote_ctx + .register_csv(TABLE_NAME, CSV_PATH, CsvReadOptions::new()) + .await?; + let known_tables: Vec = [TABLE_NAME].iter().map(|&x| x.into()).collect(); + + // Register schema + let executor = Arc::new(InMemorySQLExecutor::new(remote_ctx)); + let provider 
= Arc::new(SQLFederationProvider::new(executor)); + let schema_provider = + Arc::new(SQLSchemaProvider::new_with_tables(provider, known_tables).await?); + + // Local context + let state = datafusion_federation::default_session_state(); + overwrite_default_schema(&state, schema_provider)?; + let ctx = SessionContext::new_with_state(state); + + // Run query + let query = r#"SELECT * from test"#; + let df = ctx.sql(query).await?; + + // let explain = df.clone().explain(true, false)?; + // explain.show().await?; + + df.show().await +} + +fn overwrite_default_schema(state: &SessionState, schema: Arc) -> Result<()> { + let options = &state.config().options().catalog; + let catalog = state + .catalog_list() + .catalog(options.default_catalog.as_str()) + .unwrap(); + + catalog.register_schema(options.default_schema.as_str(), schema)?; + + Ok(()) +} + +pub struct InMemorySQLExecutor { + session: Arc, +} + +impl InMemorySQLExecutor { + pub fn new(session: Arc) -> Self { + Self { session } + } +} + +#[async_trait] +impl SQLExecutor for InMemorySQLExecutor { + fn name(&self) -> &str { + "in_memory_sql_executor" + } + + fn compute_context(&self) -> Option { + None + } + + fn execute(&self, sql: &str, schema: SchemaRef) -> Result { + // Execute it using the remote datafusion session context + let future_stream = _execute(self.session.clone(), sql.to_string()); + let stream = futures::stream::once(future_stream).try_flatten(); + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema.clone(), + stream, + ))) + } + + async fn table_names(&self) -> Result> { + Err(DataFusionError::NotImplemented( + "table inference not implemented".to_string(), + )) + } + + async fn get_table_schema(&self, table_name: &str) -> Result { + let sql = format!("select * from {table_name} limit 1"); + let df = self.session.sql(&sql).await?; + let schema = df.schema().as_arrow().clone(); + Ok(Arc::new(schema)) + } + + fn dialect(&self) -> Arc { + Arc::new(GenericDialect {}) + } +} + +async fn _execute(ctx: 
Arc, sql: String) -> Result { + ctx.sql(&sql).await?.execute_stream().await +} diff --git a/datafusion-federation/examples/test.csv b/datafusion-federation/examples/test.csv new file mode 100644 index 0000000..811d276 --- /dev/null +++ b/datafusion-federation/examples/test.csv @@ -0,0 +1,4 @@ +foo,bar +a,1 +b,2 +c,3 \ No newline at end of file