
Commit f379719

feat: add AsyncCatalogProvider helpers for asynchronous catalogs (#13800)
* Add asynchronous catalog traits to help users that have asynchronous catalogs
* Apply clippy suggestions
* Address PR reviews
* Remove allow_unused exceptions
* Update remote catalog example to demonstrate new helper structs
* Move schema_name / catalog_name parameters into resolve function and out of trait
1 parent ac9584e commit f379719
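
To make the new helper concrete, here is a minimal sketch distilled from the commit description above and the example diff below. Only `AsyncSchemaProvider`, its async `table` method, and the `resolve(references, config, catalog_name, schema_name)` call are taken from this commit; `MyAsyncCatalogAdapter` and the stubbed lookup are hypothetical stand-ins for whatever asynchronous catalog client you already have.

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::TableProvider;
use datafusion::common::Result;
use datafusion_catalog::AsyncSchemaProvider;

/// Hypothetical adapter over an asynchronous catalog client.
struct MyAsyncCatalogAdapter;

#[async_trait]
impl AsyncSchemaProvider for MyAsyncCatalogAdapter {
    /// The only method that has to be implemented: an async table lookup.
    async fn table(&self, _name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        // A real adapter would query the remote catalog here and wrap the
        // returned schema in a TableProvider; this stub reports no tables.
        Ok(None)
    }
}

// At planning time the adapter is resolved once per query into a cached,
// synchronous SchemaProvider that can be registered as usual, e.g.:
//
//     let references = state.resolve_table_references(&statement)?;
//     let resolved_schema = adapter
//         .resolve(&references, state.config(), "datafusion", "remote_schema")
//         .await?;
//     ctx.catalog("datafusion").unwrap()
//         .register_schema("remote_schema", resolved_schema)?;
```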


4 files changed, +796 −150 lines changed


datafusion-examples/examples/remote_catalog.rs

+44 −150
@@ -32,46 +32,38 @@
 use arrow::array::record_batch;
 use arrow_schema::{Field, Fields, Schema, SchemaRef};
 use async_trait::async_trait;
-use datafusion::catalog::{SchemaProvider, TableProvider};
-use datafusion::common::DataFusionError;
+use datafusion::catalog::TableProvider;
 use datafusion::common::Result;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{DataFrame, SessionContext};
-use datafusion_catalog::Session;
-use datafusion_common::{
-    assert_batches_eq, internal_datafusion_err, plan_err, HashMap, TableReference,
-};
+use datafusion_catalog::{AsyncSchemaProvider, Session};
+use datafusion_common::{assert_batches_eq, internal_datafusion_err, plan_err};
 use datafusion_expr::{Expr, TableType};
 use futures::TryStreamExt;
 use std::any::Any;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;

 #[tokio::main]
 async fn main() -> Result<()> {
     // As always, we create a session context to interact with DataFusion
     let ctx = SessionContext::new();

     // Make a connection to the remote catalog, asynchronously, and configure it
-    let remote_catalog_interface = RemoteCatalogInterface::connect().await?;
+    let remote_catalog_interface = Arc::new(RemoteCatalogInterface::connect().await?);

-    // Register a SchemaProvider for tables in a schema named "remote_schema".
-    //
-    // This will let DataFusion query tables such as
-    // `datafusion.remote_schema.remote_table`
-    let remote_schema: Arc<dyn SchemaProvider> =
-        Arc::new(RemoteSchema::new(Arc::new(remote_catalog_interface)));
-    ctx.catalog("datafusion")
-        .ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))?
-        .register_schema("remote_schema", Arc::clone(&remote_schema))?;
+    // Create an adapter to provide the AsyncSchemaProvider interface to DataFusion
+    // based on our remote catalog interface
+    let remote_catalog_adapter = RemoteCatalogDatafusionAdapter(remote_catalog_interface);

     // Here is a query that selects data from a table in the remote catalog.
     let sql = "SELECT * from remote_schema.remote_table";

     // The `SessionContext::sql` interface is async, but it does not
-    // support asynchronous access to catalogs, so the following query errors.
+    // support asynchronous access to catalogs, so we cannot register our schema provider
+    // directly and the following query fails to find our table.
     let results = ctx.sql(sql).await;
     assert_eq!(
         results.unwrap_err().to_string(),
@@ -91,27 +83,26 @@ async fn main() -> Result<()> {
     // `remote_schema.remote_table`)
     let references = state.resolve_table_references(&statement)?;

-    // Call `load_tables` to load information from the remote catalog for each
-    // of the referenced tables. Best practice is to fetch the the information
-    // for all tables required by the query once (rather than one per table) to
-    // minimize network overhead
-    let table_names = references.iter().filter_map(|r| {
-        if refers_to_schema("datafusion", "remote_schema", r) {
-            Some(r.table())
-        } else {
-            None
-        }
-    });
-    remote_schema
-        .as_any()
-        .downcast_ref::<RemoteSchema>()
-        .expect("correct types")
-        .load_tables(table_names)
+    // Now we can asynchronously resolve the table references to get a cached catalog
+    // that we can use for our query
+    let resolved_catalog = remote_catalog_adapter
+        .resolve(&references, state.config(), "datafusion", "remote_schema")
         .await?;

-    // Now continue planing the query after having fetched the remote table and
-    // it can run as normal
-    let plan = state.statement_to_plan(statement).await?;
+    // This resolved catalog only makes sense for this query and so we create a clone
+    // of the session context with the resolved catalog
+    let query_ctx = ctx.clone();
+
+    query_ctx
+        .catalog("datafusion")
+        .ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))?
+        .register_schema("remote_schema", resolved_catalog)?;
+
+    // We can now continue planning the query with this new query-specific context that
+    // contains our cached catalog
+    let query_state = query_ctx.state();
+
+    let plan = query_state.statement_to_plan(statement).await?;
     let results = DataFrame::new(state, plan).collect().await?;
     assert_batches_eq!(
         [
@@ -145,9 +136,9 @@ impl RemoteCatalogInterface {
     }

     /// Fetches information for a specific table
-    pub async fn table_info(&self, name: &str) -> Result<SchemaRef> {
+    pub async fn table_info(&self, name: &str) -> Result<Option<SchemaRef>> {
         if name != "remote_table" {
-            return plan_err!("Remote table not found: {}", name);
+            return Ok(None);
         }

         // In this example, we'll model a remote table with columns "id" and
@@ -159,7 +150,7 @@ impl RemoteCatalogInterface {
             Field::new("id", arrow::datatypes::DataType::Int32, false),
             Field::new("name", arrow::datatypes::DataType::Utf8, false),
         ]));
-        Ok(Arc::new(schema))
+        Ok(Some(Arc::new(schema)))
     }

     /// Fetches data for a table from a remote data source
@@ -186,95 +177,22 @@ impl RemoteCatalogInterface {
     }
 }

-/// Implements the DataFusion Catalog API interface for tables
+/// Implements an async version of the DataFusion SchemaProvider API for tables
 /// stored in a remote catalog.
-#[derive(Debug)]
-struct RemoteSchema {
-    /// Connection with the remote catalog
-    remote_catalog_interface: Arc<RemoteCatalogInterface>,
-    /// Local cache of tables that have been preloaded from the remote
-    /// catalog
-    tables: Mutex<HashMap<String, Arc<dyn TableProvider>>>,
-}
-
-impl RemoteSchema {
-    /// Create a new RemoteSchema
-    pub fn new(remote_catalog_interface: Arc<RemoteCatalogInterface>) -> Self {
-        Self {
-            remote_catalog_interface,
-            tables: Mutex::new(HashMap::new()),
-        }
-    }
-
-    /// Load information for the specified tables from the remote source into
-    /// the local cached copy.
-    pub async fn load_tables(
-        &self,
-        references: impl IntoIterator<Item = &str>,
-    ) -> Result<()> {
-        for table_name in references {
-            if !self.table_exist(table_name) {
-                // Fetch information about the table from the remote catalog
-                //
-                // Note that a real remote catalog interface could return more
-                // information, but at the minimum, DataFusion requires the
-                // table's schema for planing.
-                let schema = self.remote_catalog_interface.table_info(table_name).await?;
-                let remote_table = RemoteTable::new(
-                    Arc::clone(&self.remote_catalog_interface),
-                    table_name,
-                    schema,
-                );
-
-                // Add the table to our local cached list
-                self.tables
-                    .lock()
-                    .expect("mutex invalid")
-                    .insert(table_name.to_string(), Arc::new(remote_table));
-            };
-        }
-        Ok(())
-    }
-}
+struct RemoteCatalogDatafusionAdapter(Arc<RemoteCatalogInterface>);

-/// Implement the DataFusion Catalog API for [`RemoteSchema`]
 #[async_trait]
-impl SchemaProvider for RemoteSchema {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn table_names(&self) -> Vec<String> {
-        // Note this API is not async so we can't directly call the RemoteCatalogInterface
-        // instead we use the cached list of loaded tables
-        self.tables
-            .lock()
-            .expect("mutex valid")
-            .keys()
-            .cloned()
-            .collect()
-    }
-
-    // While this API is actually `async` and thus could consult a remote
-    // catalog directly it is more efficient to use a local cached copy instead,
-    // which is what we model in this example
-    async fn table(
-        &self,
-        name: &str,
-    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
-        // Look for any pre-loaded tables
-        let table = self
-            .tables
-            .lock()
-            .expect("mutex valid")
-            .get(name)
-            .map(Arc::clone);
-        Ok(table)
-    }
-
-    fn table_exist(&self, name: &str) -> bool {
-        // Look for any pre-loaded tables, note this function is also `async`
-        self.tables.lock().expect("mutex valid").contains_key(name)
+impl AsyncSchemaProvider for RemoteCatalogDatafusionAdapter {
+    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+        // Fetch information about the table from the remote catalog
+        //
+        // Note that a real remote catalog interface could return more
+        // information, but at the minimum, DataFusion requires the
+        // table's schema for planing.
+        Ok(self.0.table_info(name).await?.map(|schema| {
+            Arc::new(RemoteTable::new(Arc::clone(&self.0), name, schema))
+                as Arc<dyn TableProvider>
+        }))
     }
 }

@@ -343,27 +261,3 @@ impl TableProvider for RemoteTable {
         )?))
     }
 }
-
-/// Return true if this `table_reference` might be for a table in the specified
-/// catalog and schema.
-fn refers_to_schema(
-    catalog_name: &str,
-    schema_name: &str,
-    table_reference: &TableReference,
-) -> bool {
-    // Check the references are in the correct catalog and schema
-    // references like foo.bar.baz
-    if let Some(catalog) = table_reference.catalog() {
-        if catalog != catalog_name {
-            return false;
-        }
-    }
-    // references like bar.baz
-    if let Some(schema) = table_reference.schema() {
-        if schema != schema_name {
-            return false;
-        }
-    }
-
-    true
-}

datafusion/catalog/Cargo.toml

+3
@@ -36,5 +36,8 @@ datafusion-expr = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 parking_lot = { workspace = true }

+[dev-dependencies]
+tokio = { workspace = true }
+
 [lints]
 workspace = true
