diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs new file mode 100644 index 000000000000..0886d4749d72 --- /dev/null +++ b/datafusion/ffi/src/catalog_provider.rs @@ -0,0 +1,338 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, ffi::c_void, sync::Arc}; + +use abi_stable::{ + std_types::{ROption, RResult, RString, RVec}, + StableAbi, +}; +use datafusion::catalog::{CatalogProvider, SchemaProvider}; +use tokio::runtime::Handle; + +use crate::{ + df_result, rresult_return, + schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider}, +}; + +use datafusion::error::Result; + +/// A stable struct for sharing [`CatalogProvider`] across FFI boundaries. 
+#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_CatalogProvider { + pub schema_names: unsafe extern "C" fn(provider: &Self) -> RVec, + + pub schema: unsafe extern "C" fn( + provider: &Self, + name: RString, + ) -> ROption, + + pub register_schema: + unsafe extern "C" fn( + provider: &Self, + name: RString, + schema: &FFI_SchemaProvider, + ) -> RResult, RString>, + + pub deregister_schema: + unsafe extern "C" fn( + provider: &Self, + name: RString, + cascade: bool, + ) -> RResult, RString>, + + /// Used to create a clone of the catalog provider. This should + /// only need to be called by the receiver of the provider. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Return the major DataFusion version number of this provider. + pub version: unsafe extern "C" fn() -> u64, + + /// Internal data. This is only to be accessed by the provider of the catalog. + /// A [`ForeignCatalogProvider`] should never attempt to access this data. 
+ pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_CatalogProvider {} +unsafe impl Sync for FFI_CatalogProvider {} + +struct ProviderPrivateData { + provider: Arc, + runtime: Option, +} + +impl FFI_CatalogProvider { + unsafe fn inner(&self) -> &Arc { + let private_data = self.private_data as *const ProviderPrivateData; + &(*private_data).provider + } + + unsafe fn runtime(&self) -> Option { + let private_data = self.private_data as *const ProviderPrivateData; + (*private_data).runtime.clone() + } +} + +unsafe extern "C" fn schema_names_fn_wrapper( + provider: &FFI_CatalogProvider, +) -> RVec { + let names = provider.inner().schema_names(); + names.into_iter().map(|s| s.into()).collect() +} + +unsafe extern "C" fn schema_fn_wrapper( + provider: &FFI_CatalogProvider, + name: RString, +) -> ROption { + let maybe_schema = provider.inner().schema(name.as_str()); + maybe_schema + .map(|schema| FFI_SchemaProvider::new(schema, provider.runtime())) + .into() +} + +unsafe extern "C" fn register_schema_fn_wrapper( + provider: &FFI_CatalogProvider, + name: RString, + schema: &FFI_SchemaProvider, +) -> RResult, RString> { + let runtime = provider.runtime(); + let provider = provider.inner(); + let schema = Arc::new(ForeignSchemaProvider::from(schema)); + + let returned_schema = + rresult_return!(provider.register_schema(name.as_str(), schema)) + .map(|schema| FFI_SchemaProvider::new(schema, runtime)) + .into(); + + RResult::ROk(returned_schema) +} + +unsafe extern "C" fn deregister_schema_fn_wrapper( + provider: &FFI_CatalogProvider, + name: RString, + cascade: bool, +) -> RResult, RString> { + let runtime = provider.runtime(); + let provider = provider.inner(); + + let maybe_schema = + rresult_return!(provider.deregister_schema(name.as_str(), cascade)); + + RResult::ROk( + maybe_schema + .map(|schema| FFI_SchemaProvider::new(schema, runtime)) + .into(), + ) +} + +unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProvider) { + let private_data = 
Box::from_raw(provider.private_data as *mut ProviderPrivateData); + drop(private_data); +} + +unsafe extern "C" fn clone_fn_wrapper( + provider: &FFI_CatalogProvider, +) -> FFI_CatalogProvider { + let old_private_data = provider.private_data as *const ProviderPrivateData; + let runtime = (*old_private_data).runtime.clone(); + + let private_data = Box::into_raw(Box::new(ProviderPrivateData { + provider: Arc::clone(&(*old_private_data).provider), + runtime, + })) as *mut c_void; + + FFI_CatalogProvider { + schema_names: schema_names_fn_wrapper, + schema: schema_fn_wrapper, + register_schema: register_schema_fn_wrapper, + deregister_schema: deregister_schema_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + } +} + +impl Drop for FFI_CatalogProvider { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl FFI_CatalogProvider { + /// Creates a new [`FFI_CatalogProvider`]. + pub fn new( + provider: Arc, + runtime: Option, + ) -> Self { + let private_data = Box::new(ProviderPrivateData { provider, runtime }); + + Self { + schema_names: schema_names_fn_wrapper, + schema: schema_fn_wrapper, + register_schema: register_schema_fn_wrapper, + deregister_schema: deregister_schema_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data: Box::into_raw(private_data) as *mut c_void, + } + } +} + +/// This wrapper struct exists on the receiver side of the FFI interface, so it has +/// no guarantees about being able to access the data in `private_data`. Any functions +/// defined on this struct must only use the stable functions provided in +/// FFI_CatalogProvider to interact with the foreign catalog provider. 
+#[derive(Debug)] +pub struct ForeignCatalogProvider(FFI_CatalogProvider); + +unsafe impl Send for ForeignCatalogProvider {} +unsafe impl Sync for ForeignCatalogProvider {} + +impl From<&FFI_CatalogProvider> for ForeignCatalogProvider { + fn from(provider: &FFI_CatalogProvider) -> Self { + Self(provider.clone()) + } +} + +impl Clone for FFI_CatalogProvider { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +impl CatalogProvider for ForeignCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + unsafe { + (self.0.schema_names)(&self.0) + .into_iter() + .map(|s| s.into()) + .collect() + } + } + + fn schema(&self, name: &str) -> Option> { + unsafe { + let maybe_provider: Option = + (self.0.schema)(&self.0, name.into()).into(); + + maybe_provider.map(|provider| { + Arc::new(ForeignSchemaProvider(provider)) as Arc + }) + } + } + + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> Result>> { + unsafe { + let schema = match schema.as_any().downcast_ref::() { + Some(s) => &s.0, + None => &FFI_SchemaProvider::new(schema, None), + }; + let returned_schema: Option = + df_result!((self.0.register_schema)(&self.0, name.into(), schema))? + .into(); + + Ok(returned_schema + .map(|s| Arc::new(ForeignSchemaProvider(s)) as Arc)) + } + } + + fn deregister_schema( + &self, + name: &str, + cascade: bool, + ) -> Result>> { + unsafe { + let returned_schema: Option = + df_result!((self.0.deregister_schema)(&self.0, name.into(), cascade))? 
+ .into(); + + Ok(returned_schema + .map(|s| Arc::new(ForeignSchemaProvider(s)) as Arc)) + } + } +} + +#[cfg(test)] +mod tests { + use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider}; + + use super::*; + + #[test] + fn test_round_trip_ffi_catalog_provider() { + let prior_schema = Arc::new(MemorySchemaProvider::new()); + + let catalog = Arc::new(MemoryCatalogProvider::new()); + assert!(catalog + .as_ref() + .register_schema("prior_schema", prior_schema) + .unwrap() + .is_none()); + + let ffi_catalog = FFI_CatalogProvider::new(catalog, None); + + let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into(); + + let prior_schema_names = foreign_catalog.schema_names(); + assert_eq!(prior_schema_names.len(), 1); + assert_eq!(prior_schema_names[0], "prior_schema"); + + // Replace an existing schema with one of the same name + let returned_schema = foreign_catalog + .register_schema("prior_schema", Arc::new(MemorySchemaProvider::new())) + .expect("Unable to register schema"); + assert!(returned_schema.is_some()); + assert_eq!(foreign_catalog.schema_names().len(), 1); + + // Add a new schema name + let returned_schema = foreign_catalog + .register_schema("second_schema", Arc::new(MemorySchemaProvider::new())) + .expect("Unable to register schema"); + assert!(returned_schema.is_none()); + assert_eq!(foreign_catalog.schema_names().len(), 2); + + // Remove a schema + let returned_schema = foreign_catalog + .deregister_schema("prior_schema", false) + .expect("Unable to deregister schema"); + assert!(returned_schema.is_some()); + assert_eq!(foreign_catalog.schema_names().len(), 1); + + // Retrieve non-existent schema + let returned_schema = foreign_catalog.schema("prior_schema"); + assert!(returned_schema.is_none()); + + // Retrieve valid schema + let returned_schema = foreign_catalog.schema("second_schema"); + assert!(returned_schema.is_some()); + } +} diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 4eabf91d892a..90a21c75cc95 
100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -24,10 +24,12 @@ #![deny(clippy::clone_on_ref_ptr)] pub mod arrow_wrappers; +pub mod catalog_provider; pub mod execution_plan; pub mod insert_op; pub mod plan_properties; pub mod record_batch_stream; +pub mod schema_provider; pub mod session_config; pub mod table_provider; pub mod table_source; diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs new file mode 100644 index 000000000000..6e5a590e1a09 --- /dev/null +++ b/datafusion/ffi/src/schema_provider.rs @@ -0,0 +1,385 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, ffi::c_void, sync::Arc}; + +use abi_stable::{ + std_types::{ROption, RResult, RString, RVec}, + StableAbi, +}; +use async_ffi::{FfiFuture, FutureExt}; +use async_trait::async_trait; +use datafusion::{ + catalog::{SchemaProvider, TableProvider}, + error::DataFusionError, +}; +use tokio::runtime::Handle; + +use crate::{ + df_result, rresult_return, + table_provider::{FFI_TableProvider, ForeignTableProvider}, +}; + +use datafusion::error::Result; + +/// A stable struct for sharing [`SchemaProvider`] across FFI boundaries. 
+#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_SchemaProvider { + pub owner_name: ROption, + + pub table_names: unsafe extern "C" fn(provider: &Self) -> RVec, + + pub table: unsafe extern "C" fn( + provider: &Self, + name: RString, + ) -> FfiFuture< + RResult, RString>, + >, + + pub register_table: + unsafe extern "C" fn( + provider: &Self, + name: RString, + table: FFI_TableProvider, + ) -> RResult, RString>, + + pub deregister_table: + unsafe extern "C" fn( + provider: &Self, + name: RString, + ) -> RResult, RString>, + + pub table_exist: unsafe extern "C" fn(provider: &Self, name: RString) -> bool, + + /// Used to create a clone of the schema provider. This should + /// only need to be called by the receiver of the provider. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Return the major DataFusion version number of this provider. + pub version: unsafe extern "C" fn() -> u64, + + /// Internal data. This is only to be accessed by the provider of the schema. + /// A [`ForeignSchemaProvider`] should never attempt to access this data. 
+ pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_SchemaProvider {} +unsafe impl Sync for FFI_SchemaProvider {} + +struct ProviderPrivateData { + provider: Arc, + runtime: Option, +} + +impl FFI_SchemaProvider { + unsafe fn inner(&self) -> &Arc { + let private_data = self.private_data as *const ProviderPrivateData; + &(*private_data).provider + } + + unsafe fn runtime(&self) -> Option { + let private_data = self.private_data as *const ProviderPrivateData; + (*private_data).runtime.clone() + } +} + +unsafe extern "C" fn table_names_fn_wrapper( + provider: &FFI_SchemaProvider, +) -> RVec { + let provider = provider.inner(); + + let table_names = provider.table_names(); + table_names.into_iter().map(|s| s.into()).collect() +} + +unsafe extern "C" fn table_fn_wrapper( + provider: &FFI_SchemaProvider, + name: RString, +) -> FfiFuture, RString>> { + let runtime = provider.runtime(); + let provider = Arc::clone(provider.inner()); + + async move { + let table = rresult_return!(provider.table(name.as_str()).await) + .map(|t| FFI_TableProvider::new(t, true, runtime)) + .into(); + + RResult::ROk(table) + } + .into_ffi() +} + +unsafe extern "C" fn register_table_fn_wrapper( + provider: &FFI_SchemaProvider, + name: RString, + table: FFI_TableProvider, +) -> RResult, RString> { + let runtime = provider.runtime(); + let provider = provider.inner(); + + let table = Arc::new(ForeignTableProvider(table)); + + let returned_table = rresult_return!(provider.register_table(name.into(), table)) + .map(|t| FFI_TableProvider::new(t, true, runtime)); + + RResult::ROk(returned_table.into()) +} + +unsafe extern "C" fn deregister_table_fn_wrapper( + provider: &FFI_SchemaProvider, + name: RString, +) -> RResult, RString> { + let runtime = provider.runtime(); + let provider = provider.inner(); + + let returned_table = rresult_return!(provider.deregister_table(name.as_str())) + .map(|t| FFI_TableProvider::new(t, true, runtime)); + + RResult::ROk(returned_table.into()) +} + +unsafe 
extern "C" fn table_exist_fn_wrapper( + provider: &FFI_SchemaProvider, + name: RString, +) -> bool { + provider.inner().table_exist(name.as_str()) +} + +unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SchemaProvider) { + let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); + drop(private_data); +} + +unsafe extern "C" fn clone_fn_wrapper( + provider: &FFI_SchemaProvider, +) -> FFI_SchemaProvider { + let old_private_data = provider.private_data as *const ProviderPrivateData; + let runtime = (*old_private_data).runtime.clone(); + + let private_data = Box::into_raw(Box::new(ProviderPrivateData { + provider: Arc::clone(&(*old_private_data).provider), + runtime, + })) as *mut c_void; + + FFI_SchemaProvider { + owner_name: provider.owner_name.clone(), + table_names: table_names_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + table: table_fn_wrapper, + register_table: register_table_fn_wrapper, + deregister_table: deregister_table_fn_wrapper, + table_exist: table_exist_fn_wrapper, + } +} + +impl Drop for FFI_SchemaProvider { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl FFI_SchemaProvider { + /// Creates a new [`FFI_SchemaProvider`]. 
+ pub fn new( + provider: Arc, + runtime: Option, + ) -> Self { + let owner_name = provider.owner_name().map(|s| s.into()).into(); + let private_data = Box::new(ProviderPrivateData { provider, runtime }); + + Self { + owner_name, + table_names: table_names_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data: Box::into_raw(private_data) as *mut c_void, + table: table_fn_wrapper, + register_table: register_table_fn_wrapper, + deregister_table: deregister_table_fn_wrapper, + table_exist: table_exist_fn_wrapper, + } + } +} + +/// This wrapper struct exists on the receiver side of the FFI interface, so it has +/// no guarantees about being able to access the data in `private_data`. Any functions +/// defined on this struct must only use the stable functions provided in +/// FFI_SchemaProvider to interact with the foreign schema provider. +#[derive(Debug)] +pub struct ForeignSchemaProvider(pub FFI_SchemaProvider); + +unsafe impl Send for ForeignSchemaProvider {} +unsafe impl Sync for ForeignSchemaProvider {} + +impl From<&FFI_SchemaProvider> for ForeignSchemaProvider { + fn from(provider: &FFI_SchemaProvider) -> Self { + Self(provider.clone()) + } +} + +impl Clone for FFI_SchemaProvider { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +#[async_trait] +impl SchemaProvider for ForeignSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn owner_name(&self) -> Option<&str> { + let name: Option<&RString> = self.0.owner_name.as_ref().into(); + name.map(|s| s.as_str()) + } + + fn table_names(&self) -> Vec { + unsafe { + (self.0.table_names)(&self.0) + .into_iter() + .map(|s| s.into()) + .collect() + } + } + + async fn table( + &self, + name: &str, + ) -> Result>, DataFusionError> { + unsafe { + let table: Option = + df_result!((self.0.table)(&self.0, name.into()).await)?.into(); + + let table = table.as_ref().map(|t| { + Arc::new(ForeignTableProvider::from(t)) as Arc + }); + + 
Ok(table) + } + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> Result>> { + unsafe { + let ffi_table = match table.as_any().downcast_ref::() { + Some(t) => t.0.clone(), + None => FFI_TableProvider::new(table, true, None), + }; + + let returned_provider: Option = + df_result!((self.0.register_table)(&self.0, name.into(), ffi_table))? + .into(); + + Ok(returned_provider + .map(|t| Arc::new(ForeignTableProvider(t)) as Arc)) + } + } + + fn deregister_table(&self, name: &str) -> Result>> { + let returned_provider: Option = unsafe { + df_result!((self.0.deregister_table)(&self.0, name.into()))?.into() + }; + + Ok(returned_provider + .map(|t| Arc::new(ForeignTableProvider(t)) as Arc)) + } + + /// Returns true if the table exists in the schema provider, false otherwise. + fn table_exist(&self, name: &str) -> bool { + unsafe { (self.0.table_exist)(&self.0, name.into()) } + } +} + +#[cfg(test)] +mod tests { + use arrow::datatypes::Schema; + use datafusion::{catalog::MemorySchemaProvider, datasource::empty::EmptyTable}; + + use super::*; + + fn empty_table() -> Arc { + Arc::new(EmptyTable::new(Arc::new(Schema::empty()))) + } + + #[tokio::test] + async fn test_round_trip_ffi_schema_provider() { + let schema_provider = Arc::new(MemorySchemaProvider::new()); + assert!(schema_provider + .as_ref() + .register_table("prior_table".to_string(), empty_table()) + .unwrap() + .is_none()); + + let ffi_schema_provider = FFI_SchemaProvider::new(schema_provider, None); + + let foreign_schema_provider: ForeignSchemaProvider = + (&ffi_schema_provider).into(); + + let prior_table_names = foreign_schema_provider.table_names(); + assert_eq!(prior_table_names.len(), 1); + assert_eq!(prior_table_names[0], "prior_table"); + + // Replacing an existing table with one of the same name generates an error + let returned_schema = foreign_schema_provider + .register_table("prior_table".to_string(), empty_table()); + assert!(returned_schema.is_err()); + 
assert_eq!(foreign_schema_provider.table_names().len(), 1); + + // Add a new table + let returned_schema = foreign_schema_provider + .register_table("second_table".to_string(), empty_table()) + .expect("Unable to register table"); + assert!(returned_schema.is_none()); + assert_eq!(foreign_schema_provider.table_names().len(), 2); + + // Remove a table + let returned_schema = foreign_schema_provider + .deregister_table("prior_table") + .expect("Unable to deregister table"); + assert!(returned_schema.is_some()); + assert_eq!(foreign_schema_provider.table_names().len(), 1); + + // Retrieve non-existent table + let returned_schema = foreign_schema_provider + .table("prior_table") + .await + .expect("Unable to query table"); + assert!(returned_schema.is_none()); + assert!(!foreign_schema_provider.table_exist("prior_table")); + + // Retrieve valid table + let returned_schema = foreign_schema_provider + .table("second_table") + .await + .expect("Unable to query table"); + assert!(returned_schema.is_some()); + assert!(foreign_schema_provider.table_exist("second_table")); + } +} diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 0b4080abcb55..a7391a85031e 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -382,7 +382,7 @@ impl FFI_TableProvider { /// defined on this struct must only use the stable functions provided in /// FFI_TableProvider to interact with the foreign table provider. 
#[derive(Debug)] -pub struct ForeignTableProvider(FFI_TableProvider); +pub struct ForeignTableProvider(pub FFI_TableProvider); unsafe impl Send for ForeignTableProvider {} unsafe impl Sync for ForeignTableProvider {} diff --git a/datafusion/ffi/src/tests/catalog.rs b/datafusion/ffi/src/tests/catalog.rs new file mode 100644 index 000000000000..f4293adb41b9 --- /dev/null +++ b/datafusion/ffi/src/tests/catalog.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provides example catalog and schema providers that are used to +//! test sharing a `CatalogProvider` across the FFI boundary. `FixedSchemaProvider` +//! wraps a `MemorySchemaProvider` that is prepopulated with a `purchases` table +//! and only accepts tables named `sales` or `purchases`. `FixedCatalogProvider` +//! wraps a `MemoryCatalogProvider` that is prepopulated with an `apple` schema +//! and only accepts schemas named `apple`, `banana`, `cherry`, or `date`. +//! `create_catalog_provider` exposes a `FixedCatalogProvider` through +//! `FFI_CatalogProvider`, passing `None` for the tokio runtime. This code serves +//! as an integration test of the catalog and schema provider FFI wrappers. 
+ +use std::{any::Any, fmt::Debug, sync::Arc}; + +use crate::catalog_provider::FFI_CatalogProvider; +use arrow::datatypes::Schema; +use async_trait::async_trait; +use datafusion::{ + catalog::{ + CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider, + TableProvider, + }, + common::exec_err, + datasource::MemTable, + error::{DataFusionError, Result}, +}; + +/// This schema provider is intended only for unit tests. It is prepopulated with one +/// table, `purchases`, and only allows tables named `sales` and `purchases`. +#[derive(Debug)] +pub struct FixedSchemaProvider { + inner: MemorySchemaProvider, +} + +pub fn fruit_table() -> Arc { + use arrow::datatypes::{DataType, Field}; + use datafusion::common::record_batch; + + let schema = Arc::new(Schema::new(vec![ + Field::new("units", DataType::Int32, true), + Field::new("price", DataType::Float64, true), + ])); + + let partitions = vec![ + record_batch!( + ("units", Int32, vec![10, 20, 30]), + ("price", Float64, vec![1.0, 2.0, 5.0]) + ) + .unwrap(), + record_batch!( + ("units", Int32, vec![5, 7]), + ("price", Float64, vec![1.5, 2.5]) + ) + .unwrap(), + ]; + + Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap()) +} + +impl Default for FixedSchemaProvider { + fn default() -> Self { + let inner = MemorySchemaProvider::new(); + + let table = fruit_table(); + + let _ = inner + .register_table("purchases".to_string(), table) + .unwrap(); + + Self { inner } + } +} + +#[async_trait] +impl SchemaProvider for FixedSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.inner.table_names() + } + + async fn table( + &self, + name: &str, + ) -> Result>, DataFusionError> { + self.inner.table(name).await + } + + fn table_exist(&self, name: &str) -> bool { + self.inner.table_exist(name) + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> Result>> { + if name.as_str() != "sales" && name.as_str() != "purchases" { + return exec_err!( + 
"FixedSchemaProvider only provides two tables: sales and purchases" + ); + } + + self.inner.register_table(name, table) + } + + fn deregister_table(&self, name: &str) -> Result>> { + self.inner.deregister_table(name) + } +} + +/// This catalog provider is intended only for unit tests. It is prepopulated with one +/// schema, `apple`, and only allows schemas named `apple`, `banana`, `cherry`, or `date`. +#[derive(Debug)] +pub struct FixedCatalogProvider { + inner: MemoryCatalogProvider, +} + +impl Default for FixedCatalogProvider { + fn default() -> Self { + let inner = MemoryCatalogProvider::new(); + + let _ = inner.register_schema("apple", Arc::new(FixedSchemaProvider::default())); + + Self { inner } + } +} + +impl CatalogProvider for FixedCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.inner.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + self.inner.schema(name) + } + + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> Result>> { + if !["apple", "banana", "cherry", "date"].contains(&name) { + return exec_err!("FixedCatalogProvider only provides four schemas: apple, banana, cherry, date"); + } + + self.inner.register_schema(name, schema) + } + + fn deregister_schema( + &self, + name: &str, + cascade: bool, + ) -> Result>> { + self.inner.deregister_schema(name, cascade) + } +} + +pub(crate) extern "C" fn create_catalog_provider() -> FFI_CatalogProvider { + let catalog_provider = Arc::new(FixedCatalogProvider::default()); + FFI_CatalogProvider::new(catalog_provider, None) +} diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs index 5a471cb8fe43..4b4a29276d9a 100644 --- a/datafusion/ffi/src/tests/mod.rs +++ b/datafusion/ffi/src/tests/mod.rs @@ -25,6 +25,9 @@ use abi_stable::{ sabi_types::VersionStrings, StableAbi, }; +use catalog::create_catalog_provider; + +use crate::catalog_provider::FFI_CatalogProvider; use super::{table_provider::FFI_TableProvider, 
udf::FFI_ScalarUDF}; use arrow::array::RecordBatch; @@ -37,6 +40,7 @@ use sync_provider::create_sync_table_provider; use udf_udaf_udwf::create_ffi_abs_func; mod async_provider; +pub mod catalog; mod sync_provider; mod udf_udaf_udwf; @@ -47,6 +51,9 @@ mod udf_udaf_udwf; /// both the module loading program and library that implements the /// module. pub struct ForeignLibraryModule { + /// Construct an opinionated catalog provider + pub create_catalog: extern "C" fn() -> FFI_CatalogProvider, + /// Constructs the table provider pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider, @@ -95,6 +102,7 @@ extern "C" fn construct_table_provider(synchronous: bool) -> FFI_TableProvider { /// This defines the entry point for using the module. pub fn get_foreign_library_module() -> ForeignLibraryModuleRef { ForeignLibraryModule { + create_catalog: create_catalog_provider, create_table: construct_table_provider, create_scalar_udf: create_ffi_abs_func, version: super::version, diff --git a/datafusion/ffi/tests/ffi_integration.rs b/datafusion/ffi/tests/ffi_integration.rs index 84e120df4299..f610f12c8244 100644 --- a/datafusion/ffi/tests/ffi_integration.rs +++ b/datafusion/ffi/tests/ffi_integration.rs @@ -25,6 +25,7 @@ mod tests { use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::ScalarUDF; use datafusion::prelude::{col, SessionContext}; + use datafusion_ffi::catalog_provider::ForeignCatalogProvider; use datafusion_ffi::table_provider::ForeignTableProvider; use datafusion_ffi::tests::{create_record_batch, ForeignLibraryModuleRef}; use datafusion_ffi::udf::ForeignScalarUDF; @@ -179,4 +180,30 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_catalog() -> Result<()> { + let module = get_module()?; + + let ffi_catalog = + module + .create_catalog() + .ok_or(DataFusionError::NotImplemented( + "External catalog provider failed to implement create_catalog" + .to_string(), + ))?(); + let foreign_catalog: ForeignCatalogProvider = 
(&ffi_catalog).into(); + + let ctx = SessionContext::default(); + let _ = ctx.register_catalog("fruit", Arc::new(foreign_catalog)); + + let df = ctx.table("fruit.apple.purchases").await?; + + let results = df.collect().await?; + + assert!(!results.is_empty()); + assert!(results[0].num_rows() != 0); + + Ok(()) + } }