Skip to content

Commit 72705a3

Browse files
authored
Add CatalogProvider and SchemaProvider to FFI Crate (#15280)
* initial commit for schema and catalog providers * Add unit tests * Update docstrings * Add integration test
1 parent 4780a11 commit 72705a3

File tree

7 files changed

+944
-1
lines changed

7 files changed

+944
-1
lines changed
+338
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::{any::Any, ffi::c_void, sync::Arc};
19+
20+
use abi_stable::{
21+
std_types::{ROption, RResult, RString, RVec},
22+
StableAbi,
23+
};
24+
use datafusion::catalog::{CatalogProvider, SchemaProvider};
25+
use tokio::runtime::Handle;
26+
27+
use crate::{
28+
df_result, rresult_return,
29+
schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider},
30+
};
31+
32+
use datafusion::error::Result;
33+
34+
/// A stable struct for sharing [`CatalogProvider`] across FFI boundaries.
35+
#[repr(C)]
36+
#[derive(Debug, StableAbi)]
37+
#[allow(non_camel_case_types)]
38+
pub struct FFI_CatalogProvider {
39+
pub schema_names: unsafe extern "C" fn(provider: &Self) -> RVec<RString>,
40+
41+
pub schema: unsafe extern "C" fn(
42+
provider: &Self,
43+
name: RString,
44+
) -> ROption<FFI_SchemaProvider>,
45+
46+
pub register_schema:
47+
unsafe extern "C" fn(
48+
provider: &Self,
49+
name: RString,
50+
schema: &FFI_SchemaProvider,
51+
) -> RResult<ROption<FFI_SchemaProvider>, RString>,
52+
53+
pub deregister_schema:
54+
unsafe extern "C" fn(
55+
provider: &Self,
56+
name: RString,
57+
cascade: bool,
58+
) -> RResult<ROption<FFI_SchemaProvider>, RString>,
59+
60+
/// Used to create a clone on the provider of the execution plan. This should
61+
/// only need to be called by the receiver of the plan.
62+
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
63+
64+
/// Release the memory of the private data when it is no longer being used.
65+
pub release: unsafe extern "C" fn(arg: &mut Self),
66+
67+
/// Return the major DataFusion version number of this provider.
68+
pub version: unsafe extern "C" fn() -> u64,
69+
70+
/// Internal data. This is only to be accessed by the provider of the plan.
71+
/// A [`ForeignCatalogProvider`] should never attempt to access this data.
72+
pub private_data: *mut c_void,
73+
}
74+
75+
unsafe impl Send for FFI_CatalogProvider {}
76+
unsafe impl Sync for FFI_CatalogProvider {}
77+
78+
struct ProviderPrivateData {
79+
provider: Arc<dyn CatalogProvider + Send>,
80+
runtime: Option<Handle>,
81+
}
82+
83+
impl FFI_CatalogProvider {
84+
unsafe fn inner(&self) -> &Arc<dyn CatalogProvider + Send> {
85+
let private_data = self.private_data as *const ProviderPrivateData;
86+
&(*private_data).provider
87+
}
88+
89+
unsafe fn runtime(&self) -> Option<Handle> {
90+
let private_data = self.private_data as *const ProviderPrivateData;
91+
(*private_data).runtime.clone()
92+
}
93+
}
94+
95+
unsafe extern "C" fn schema_names_fn_wrapper(
96+
provider: &FFI_CatalogProvider,
97+
) -> RVec<RString> {
98+
let names = provider.inner().schema_names();
99+
names.into_iter().map(|s| s.into()).collect()
100+
}
101+
102+
unsafe extern "C" fn schema_fn_wrapper(
103+
provider: &FFI_CatalogProvider,
104+
name: RString,
105+
) -> ROption<FFI_SchemaProvider> {
106+
let maybe_schema = provider.inner().schema(name.as_str());
107+
maybe_schema
108+
.map(|schema| FFI_SchemaProvider::new(schema, provider.runtime()))
109+
.into()
110+
}
111+
112+
unsafe extern "C" fn register_schema_fn_wrapper(
113+
provider: &FFI_CatalogProvider,
114+
name: RString,
115+
schema: &FFI_SchemaProvider,
116+
) -> RResult<ROption<FFI_SchemaProvider>, RString> {
117+
let runtime = provider.runtime();
118+
let provider = provider.inner();
119+
let schema = Arc::new(ForeignSchemaProvider::from(schema));
120+
121+
let returned_schema =
122+
rresult_return!(provider.register_schema(name.as_str(), schema))
123+
.map(|schema| FFI_SchemaProvider::new(schema, runtime))
124+
.into();
125+
126+
RResult::ROk(returned_schema)
127+
}
128+
129+
unsafe extern "C" fn deregister_schema_fn_wrapper(
130+
provider: &FFI_CatalogProvider,
131+
name: RString,
132+
cascade: bool,
133+
) -> RResult<ROption<FFI_SchemaProvider>, RString> {
134+
let runtime = provider.runtime();
135+
let provider = provider.inner();
136+
137+
let maybe_schema =
138+
rresult_return!(provider.deregister_schema(name.as_str(), cascade));
139+
140+
RResult::ROk(
141+
maybe_schema
142+
.map(|schema| FFI_SchemaProvider::new(schema, runtime))
143+
.into(),
144+
)
145+
}
146+
147+
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProvider) {
148+
let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData);
149+
drop(private_data);
150+
}
151+
152+
unsafe extern "C" fn clone_fn_wrapper(
153+
provider: &FFI_CatalogProvider,
154+
) -> FFI_CatalogProvider {
155+
let old_private_data = provider.private_data as *const ProviderPrivateData;
156+
let runtime = (*old_private_data).runtime.clone();
157+
158+
let private_data = Box::into_raw(Box::new(ProviderPrivateData {
159+
provider: Arc::clone(&(*old_private_data).provider),
160+
runtime,
161+
})) as *mut c_void;
162+
163+
FFI_CatalogProvider {
164+
schema_names: schema_names_fn_wrapper,
165+
schema: schema_fn_wrapper,
166+
register_schema: register_schema_fn_wrapper,
167+
deregister_schema: deregister_schema_fn_wrapper,
168+
clone: clone_fn_wrapper,
169+
release: release_fn_wrapper,
170+
version: super::version,
171+
private_data,
172+
}
173+
}
174+
175+
impl Drop for FFI_CatalogProvider {
176+
fn drop(&mut self) {
177+
unsafe { (self.release)(self) }
178+
}
179+
}
180+
181+
impl FFI_CatalogProvider {
182+
/// Creates a new [`FFI_CatalogProvider`].
183+
pub fn new(
184+
provider: Arc<dyn CatalogProvider + Send>,
185+
runtime: Option<Handle>,
186+
) -> Self {
187+
let private_data = Box::new(ProviderPrivateData { provider, runtime });
188+
189+
Self {
190+
schema_names: schema_names_fn_wrapper,
191+
schema: schema_fn_wrapper,
192+
register_schema: register_schema_fn_wrapper,
193+
deregister_schema: deregister_schema_fn_wrapper,
194+
clone: clone_fn_wrapper,
195+
release: release_fn_wrapper,
196+
version: super::version,
197+
private_data: Box::into_raw(private_data) as *mut c_void,
198+
}
199+
}
200+
}
201+
202+
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
203+
/// no guarantees about being able to access the data in `private_data`. Any functions
204+
/// defined on this struct must only use the stable functions provided in
205+
/// FFI_CatalogProvider to interact with the foreign table provider.
206+
#[derive(Debug)]
207+
pub struct ForeignCatalogProvider(FFI_CatalogProvider);
208+
209+
unsafe impl Send for ForeignCatalogProvider {}
210+
unsafe impl Sync for ForeignCatalogProvider {}
211+
212+
impl From<&FFI_CatalogProvider> for ForeignCatalogProvider {
213+
fn from(provider: &FFI_CatalogProvider) -> Self {
214+
Self(provider.clone())
215+
}
216+
}
217+
218+
impl Clone for FFI_CatalogProvider {
219+
fn clone(&self) -> Self {
220+
unsafe { (self.clone)(self) }
221+
}
222+
}
223+
224+
impl CatalogProvider for ForeignCatalogProvider {
225+
fn as_any(&self) -> &dyn Any {
226+
self
227+
}
228+
229+
fn schema_names(&self) -> Vec<String> {
230+
unsafe {
231+
(self.0.schema_names)(&self.0)
232+
.into_iter()
233+
.map(|s| s.into())
234+
.collect()
235+
}
236+
}
237+
238+
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
239+
unsafe {
240+
let maybe_provider: Option<FFI_SchemaProvider> =
241+
(self.0.schema)(&self.0, name.into()).into();
242+
243+
maybe_provider.map(|provider| {
244+
Arc::new(ForeignSchemaProvider(provider)) as Arc<dyn SchemaProvider>
245+
})
246+
}
247+
}
248+
249+
fn register_schema(
250+
&self,
251+
name: &str,
252+
schema: Arc<dyn SchemaProvider>,
253+
) -> Result<Option<Arc<dyn SchemaProvider>>> {
254+
unsafe {
255+
let schema = match schema.as_any().downcast_ref::<ForeignSchemaProvider>() {
256+
Some(s) => &s.0,
257+
None => &FFI_SchemaProvider::new(schema, None),
258+
};
259+
let returned_schema: Option<FFI_SchemaProvider> =
260+
df_result!((self.0.register_schema)(&self.0, name.into(), schema))?
261+
.into();
262+
263+
Ok(returned_schema
264+
.map(|s| Arc::new(ForeignSchemaProvider(s)) as Arc<dyn SchemaProvider>))
265+
}
266+
}
267+
268+
fn deregister_schema(
269+
&self,
270+
name: &str,
271+
cascade: bool,
272+
) -> Result<Option<Arc<dyn SchemaProvider>>> {
273+
unsafe {
274+
let returned_schema: Option<FFI_SchemaProvider> =
275+
df_result!((self.0.deregister_schema)(&self.0, name.into(), cascade))?
276+
.into();
277+
278+
Ok(returned_schema
279+
.map(|s| Arc::new(ForeignSchemaProvider(s)) as Arc<dyn SchemaProvider>))
280+
}
281+
}
282+
}
283+
284+
#[cfg(test)]
285+
mod tests {
286+
use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
287+
288+
use super::*;
289+
290+
#[test]
291+
fn test_round_trip_ffi_catalog_provider() {
292+
let prior_schema = Arc::new(MemorySchemaProvider::new());
293+
294+
let catalog = Arc::new(MemoryCatalogProvider::new());
295+
assert!(catalog
296+
.as_ref()
297+
.register_schema("prior_schema", prior_schema)
298+
.unwrap()
299+
.is_none());
300+
301+
let ffi_catalog = FFI_CatalogProvider::new(catalog, None);
302+
303+
let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into();
304+
305+
let prior_schema_names = foreign_catalog.schema_names();
306+
assert_eq!(prior_schema_names.len(), 1);
307+
assert_eq!(prior_schema_names[0], "prior_schema");
308+
309+
// Replace an existing schema with one of the same name
310+
let returned_schema = foreign_catalog
311+
.register_schema("prior_schema", Arc::new(MemorySchemaProvider::new()))
312+
.expect("Unable to register schema");
313+
assert!(returned_schema.is_some());
314+
assert_eq!(foreign_catalog.schema_names().len(), 1);
315+
316+
// Add a new schema name
317+
let returned_schema = foreign_catalog
318+
.register_schema("second_schema", Arc::new(MemorySchemaProvider::new()))
319+
.expect("Unable to register schema");
320+
assert!(returned_schema.is_none());
321+
assert_eq!(foreign_catalog.schema_names().len(), 2);
322+
323+
// Remove a schema
324+
let returned_schema = foreign_catalog
325+
.deregister_schema("prior_schema", false)
326+
.expect("Unable to deregister schema");
327+
assert!(returned_schema.is_some());
328+
assert_eq!(foreign_catalog.schema_names().len(), 1);
329+
330+
// Retrieve non-existant schema
331+
let returned_schema = foreign_catalog.schema("prior_schema");
332+
assert!(returned_schema.is_none());
333+
334+
// Retrieve valid schema
335+
let returned_schema = foreign_catalog.schema("second_schema");
336+
assert!(returned_schema.is_some());
337+
}
338+
}

datafusion/ffi/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525
#![deny(clippy::clone_on_ref_ptr)]
2626

2727
pub mod arrow_wrappers;
28+
pub mod catalog_provider;
2829
pub mod execution_plan;
2930
pub mod insert_op;
3031
pub mod plan_properties;
3132
pub mod record_batch_stream;
33+
pub mod schema_provider;
3234
pub mod session_config;
3335
pub mod table_provider;
3436
pub mod table_source;

0 commit comments

Comments
 (0)