Skip to content

Commit

Permalink
Generic FlightTableFactory with a default FlightSqlDriver
Browse files Browse the repository at this point in the history
  • Loading branch information
ccciudatu committed Aug 19, 2024
1 parent d47bf9c commit 8b49989
Show file tree
Hide file tree
Showing 8 changed files with 1,225 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ Cargo.lock
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea/
37 changes: 37 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[package]
name = "datafusion-table-provider-flight"
version = "0.1.0"
edition = "2021"

[dependencies]
arrow-array = "52.2.0"
arrow-flight = { version = "52.2.0", features = ["flight-sql-experimental", "tls"] }
arrow-schema = { version = "52.2.0", features = ["serde"] }
async-trait = "0.1.81"
base64 = "0.22.1"
bytes = "1.7.1"
datafusion = "41.0.0"
datafusion-expr = "41.0.0"
datafusion-physical-expr = "41.0.0"
datafusion-physical-plan = "41.0.0"
datafusion-proto = "41.0.0"
futures = "0.3.30"
prost = "0.12" # pinned for arrow-flight compat
serde = { version = "1.0.208", features = ["derive"] }
serde_json = "1.0.125"
tokio = { version = "1.36", features = [
"macros",
"rt",
"sync",
"rt-multi-thread",
"parking_lot",
"fs",
] }
tonic = "0.11" # pinned for arrow-flight compat

[dev-dependencies]
tokio-stream = { version = "0.1.15", features = ["net"] }

[[example]]
name = "flight-sql"
path = "examples/flight-sql.rs"
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
# datafusion-table-provider-flightsql
FlightSQL TableProvider for DataFusion
# DataFusion TableProviderFactory for Arrow Flight
A generic `FlightTableFactory` that can integrate any Arrow Flight RPC service
as a `TableProviderFactory`. It relies on a `FlightDriver` trait implementation to
handle the `GetFlightInfo` call and all its prerequisites.

## Flight SQL
This crate includes a `FlightSqlDriver` that has been tested with
[Ballista](https://github.com/apache/datafusion-ballista),
[Dremio](https://github.com/dremio/dremio-oss) and
[ROAPI](https://github.com/roapi/roapi).
56 changes: 56 additions & 0 deletions examples/flight-sql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::prelude::SessionContext;
use datafusion_table_provider_flight::sql::FlightSqlDriver;
use datafusion_table_provider_flight::FlightTableFactory;
use std::sync::Arc;

/// Registers a `FlightTableFactory` backed by the default `FlightSqlDriver` under the
/// `FLIGHT_SQL` format name, declares the external table `trip_data` on top of a
/// remote query, and prints an aggregation computed over that table.
///
/// The example expects a Flight SQL endpoint at `http://localhost:32010` that
/// exposes a `taxi` table. Prerequisites:
/// ```text
/// $ brew install roapi
/// $ roapi -t taxi=https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
/// ```
#[tokio::main]
async fn main() -> datafusion::common::Result<()> {
    // DDL that binds `trip_data` to the remote query.
    let create_table = r#"
CREATE EXTERNAL TABLE trip_data STORED AS FLIGHT_SQL
LOCATION 'http://localhost:32010'
OPTIONS (
'flight.sql.query' 'SELECT * FROM taxi'
)
"#;
    // Aggregation executed against the external table.
    let report_query = r#"
SELECT "VendorID", COUNT(*), SUM(passenger_count), SUM(total_amount)
FROM trip_data
GROUP BY "VendorID"
ORDER BY COUNT(*) DESC
"#;

    let ctx = SessionContext::new();

    // Make `STORED AS FLIGHT_SQL` resolvable by this session.
    let factory = Arc::new(FlightTableFactory::new(Arc::new(
        FlightSqlDriver::default(),
    )));
    ctx.state_ref()
        .write()
        .table_factories_mut()
        .insert("FLIGHT_SQL".into(), factory);

    // Only the side effect (table registration) matters; the returned frame is dropped.
    let _ = ctx.sql(create_table).await?;

    let report = ctx.sql(report_query).await?;
    report.show().await
}
67 changes: 67 additions & 0 deletions src/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Flight plan codecs

use std::sync::Arc;

use datafusion::common::DataFusionError;
use datafusion_expr::registry::FunctionRegistry;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;

use crate::exec::{FlightConfig, FlightExec};

/// Physical extension codec for FlightExec.
///
/// The codec carries no state of its own. Its `PhysicalExtensionCodec`
/// implementation in this file encodes a `FlightExec` node as the JSON form of
/// its `FlightConfig`, and decodes such JSON back into a `FlightExec`.
#[derive(Clone, Debug, Default)]
pub struct FlightPhysicalCodec {}

impl PhysicalExtensionCodec for FlightPhysicalCodec {
    /// Rebuilds a `FlightExec` from the JSON-encoded `FlightConfig` held in `buf`.
    ///
    /// # Errors
    /// * `DataFusionError::Internal` when `inputs` is not empty; this is checked
    ///   before `buf` is parsed.
    /// * `DataFusionError::External` wrapping the `serde_json` error when `buf`
    ///   cannot be deserialized into a `FlightConfig`.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
        // Reject any child plans up front, before touching the payload.
        if !inputs.is_empty() {
            return Err(DataFusionError::Internal(
                "FlightExec is not supposed to have any inputs".into(),
            ));
        }

        let config = serde_json::from_slice::<FlightConfig>(buf)
            .map_err(|err| DataFusionError::External(Box::new(err)))?;
        let plan: Arc<dyn ExecutionPlan> = Arc::new(FlightExec::from(config));
        Ok(plan)
    }

    /// Appends the JSON encoding of the node's `FlightConfig` to `buf`.
    ///
    /// Nothing is written to `buf` unless serialization succeeds.
    ///
    /// # Errors
    /// * `DataFusionError::Internal` when `node` is not a `FlightExec`.
    /// * `DataFusionError::External` wrapping the `serde_json` error when the
    ///   configuration cannot be serialized.
    fn try_encode(
        &self,
        node: Arc<dyn ExecutionPlan>,
        buf: &mut Vec<u8>,
    ) -> datafusion::common::Result<()> {
        let Some(exec) = node.as_any().downcast_ref::<FlightExec>() else {
            return Err(DataFusionError::Internal(
                "This codec only supports the FlightExec physical plan".into(),
            ));
        };

        let encoded = serde_json::to_vec(exec.config())
            .map_err(|err| DataFusionError::External(Box::new(err)))?;
        buf.extend_from_slice(&encoded);
        Ok(())
    }
}
Loading

0 comments on commit 8b49989

Please sign in to comment.