Skip to content

feat: expand ListingSchemaProvider to support register and deregister table #3150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

Nordalf
Copy link
Contributor

@Nordalf Nordalf commented Jan 21, 2025

Description

The ListingSchemaProvider was missing the implementations of the register and deregister traits from the SchemaProvider. These are now implemented, and to meet requirements from remote object stores, the DeltaTableBuilder is now used to pass along the storage options.

The tables property has been modified to instead of having DashMap<String, String>, it is now using DashMap<String, Arc<dyn TableProvider>> to support downcast_ref and mutability elsewhere.

Inspiration from: Datafusion ListingSchemaProvider

Related Issue(s)

There is no related issues. I have a clone of this with extra modifications in my own project. I find it useful to have when adding a new schema and wanted to share this with others 👍

Documentation

None

@github-actions github-actions bot added the binding/rust Issues for the Rust crate label Jan 21, 2025
@Nordalf
Copy link
Contributor Author

Nordalf commented Jan 21, 2025

Open for discussion on this one. Have had this locally for a long time and wanted to share the branch. I use this quite a lot in my project and others may find it useful to their project.

@ion-elgreco
Copy link
Collaborator

@hntd187 can you take a look? :)

Copy link
Collaborator

@hntd187 hntd187 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm supportive of this change, but I would like some changes to the internals.

@Nordalf
Copy link
Contributor Author

Nordalf commented Jan 21, 2025

Thanks for the great comments @roeap and @hntd187. I will take a look and handle them tomorrow 🚀

@Nordalf
Copy link
Contributor Author

Nordalf commented Feb 17, 2025

Thanks for the great comments @roeap and @hntd187. I will take a look and handle them tomorrow 🚀

This did not age well but hopefully I get some time this week to check out. I did figure out that DashMap already handles locking.

@ion-elgreco
Copy link
Collaborator

@Nordalf is this ready?

@Nordalf
Copy link
Contributor Author

Nordalf commented Mar 2, 2025

@ion-elgreco - to me it is ready and I have just squashed the commits 👍

The latest changes resolves the comments but those changes may require a quick look 😄

@Nordalf Nordalf force-pushed the feat/support-register-deregister-listing-schema-provider branch from 34ee8dd to 33f03b9 Compare March 2, 2025 07:28
@Nordalf
Copy link
Contributor Author

Nordalf commented Apr 7, 2025

@ion-elgreco and @rtyler - This PR should be ready

@ion-elgreco
Copy link
Collaborator

@Nordalf there are some failed tests

@Nordalf
Copy link
Contributor Author

Nordalf commented Apr 7, 2025

@Nordalf there are some failed tests

Oh damn... taking a look at it when I get home later

@Nordalf
Copy link
Contributor Author

Nordalf commented Apr 9, 2025

Looking into fixing the tests now.

Edit: Fixed and rebased

…incl. func to load a table at a later point

Signed-off-by: Alexander Falk <[email protected]>
@Nordalf Nordalf force-pushed the feat/support-register-deregister-listing-schema-provider branch from c7b2d14 to 983aedd Compare April 9, 2025 08:37
Copy link
Collaborator

@roeap roeap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few comment.

The general Issue we are facing is that we right now eagerly load the state of the table. THings will get easier once #3137 and we have done some follow up work ...

Comment on lines +151 to +154
if !self.table_exist(name.as_str()) {
self.tables.insert(name, table.clone());
}
Ok(Some(table))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the sematics here are a little bit different, i.e. we should raise if the table already exists.

https://github.com/apache/datafusion/blob/784df33f8930f91eada0d67aa5acc25a4c25cea2/datafusion/catalog/src/schema.rs#L57-L69

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right on this one. I will raise the error but should the table be returned if registered then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change the code later to:

        if self.table_exist(name.as_str()) {
            return exec_err!("The table {name} already exists");
        };
        Ok(self.tables.insert(name, table))

Comment on lines +140 to +143
let Some(provider) = self.tables.get(name).map(|t| t.clone()) else {
return Ok(None);
};
let provider =
open_table_with_storage_options(location, self.storage_options.0.clone()).await?;
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
Ok(Some(provider))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make sure we are returning an up to date table here. Datafusion internally will not make any calls to update the log data o.a. so if any operations happend (including the ones we may have done in the same DF session) we would serve a stale snapshot.

Comment on lines +96 to 113

/// Tables are not initialized but have a reference setup. To initialize the delta
/// table, the `load()` function must be called on the delta table. This function helps with
/// that and ensures the DashMap is updated
pub async fn load_table(&self, table_name: &str) -> datafusion::common::Result<()> {
if let Some(mut table) = self.tables.get_mut(&table_name.to_string()) {
if let Some(delta_table) = table.value().as_any().downcast_ref::<DeltaTable>() {
// If table has not yet been loaded, we remove it from the tables map and add it again
if delta_table.state.is_none() {
let mut delta_table = delta_table.clone();
delta_table.load().await?;
*table = Arc::from(delta_table);
}
}
}

Ok(())
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the table function is the right place to create an up to date snapshot. see comment below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So an idea could be to move some of the logic down to the table function and then ensure it is updated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that idea - if this is what you meant, @roeap

Copy link
Contributor Author

@Nordalf Nordalf Apr 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roeap - what do you think of just moving the code to the table func and remove the state.is_none() check. Then it will become:

    async fn table(
        &self,
        name: &str,
    ) -> datafusion::common::Result<Option<Arc<dyn TableProvider>>> {
        if let Some(mut table) = self.tables.get_mut(&name.to_string()) {
            if let Some(delta_table) = table.value().as_any().downcast_ref::<DeltaTable>() {

                let mut delta_table = delta_table.clone();
                delta_table.update().await?;
                *table = Arc::from(delta_table);
            }
             return Ok(Some(Arc::clone(table.value())));

        }
        Ok(None)
    }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work ... eventually we may just want to do that in the scan method of the table provider, since right now we are kind-of locking in the version for every provider.

But that would go a bit to far for this PR ...

@rtyler rtyler marked this pull request as draft May 4, 2025 04:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
binding/rust Issues for the Rust crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants