-
Notifications
You must be signed in to change notification settings - Fork 155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sql catalog #229
Sql catalog #229
Conversation
crates/catalog/sql/src/catalog.rs
Outdated
let rows = connection.transaction(|txn|{ | ||
let name = self.name.clone(); | ||
Box::pin(async move { | ||
sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should you care about SQL injections ? Or the catalog / namespace / table names are assumed to be safe ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, it's better to use prepare statement here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are several points with this implementation:
- If
parent
isNone
, we should list all namespaces. - We should also count namespaces in
iceberg_namespace_properties
- We should list only sub namespaces.
See java implementation here.
} | ||
|
||
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> { | ||
let metadata_location = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check the cache first? Given that it's inserted later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also the insertion should not blind, we need to check its version first. My suggestion is to remove the cache for now so that things don't get too complicated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cache is only intended for update_table
. Optimistically assuming that the metadata_location hasn't been changed since loading the table, the metadata and metadata_location from the cache can directly be used to perform the update. This way the database has to be queried only once for the optimistic case.
If the metadata_location changed, the update method has to be more involved.
I would not use the cache for loading tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking maybe we should have a standalone data structure for caching, just like CachingCatalog in java
crates/catalog/sql/src/catalog.rs
Outdated
Box::pin(async move { | ||
sqlx::query( | ||
"create table if not exists iceberg_namespace_properties ( | ||
catalog_name text not null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
VARCHAR(255)
here. Maybe we can copy&paste SQL from java iceberg?
+ CATALOG_NAME
+ " VARCHAR(255) NOT NULL,"
+ NAMESPACE_NAME
+ " VARCHAR(255) NOT NULL,"
+ NAMESPACE_PROPERTY_KEY
+ " VARCHAR(255),"
+ NAMESPACE_PROPERTY_VALUE
+ " VARCHAR(1000),"
crates/catalog/sql/src/catalog.rs
Outdated
let rows = connection.transaction(|txn|{ | ||
let name = self.name.clone(); | ||
Box::pin(async move { | ||
sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, it's better to use prepare statement here.
crates/catalog/sql/src/catalog.rs
Outdated
let rows = connection.transaction(|txn|{ | ||
let name = self.name.clone(); | ||
Box::pin(async move { | ||
sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are several points with this implementation:
- If
parent
isNone
, we should list all namespaces. - We should also count namespaces in
iceberg_namespace_properties
- We should list only sub namespaces.
See java implementation here.
crates/catalog/sql/src/catalog.rs
Outdated
y.table_namespace | ||
.split('.') | ||
.map(ToString::to_string) | ||
.collect::<Vec<_>>(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about extract this to a common method in NamespaceIdent
?
} | ||
|
||
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> { | ||
let metadata_location = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also the insertion should not blind, we need to check its version first. My suggestion is to remove the cache for now so that things don't get too complicated.
@@ -44,21 +49,27 @@ pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0; | |||
/// Reference to [`TableMetadata`]. | |||
pub type TableMetadataRef = Arc<TableMetadata>; | |||
|
|||
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)] | |||
#[derive(Debug, PartialEq, Deserialize, Eq, Clone, TypedBuilder)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We had a discussion in this pr about the table metadata builder. I have concern on this derived builder since it's error prone and not easy to review. TableMetadataBuilder
will be heavily used by transaction api and we will need to do a lot of check for in it. I would suggest to maintain this struct manually, what do you think?
cc @JanKaul Is this pr ready for review or you need to do more updates? |
I have to add a couple of more changes. I'll notify you when I'm finished. |
@JanKaul WDYT? I think this PR is ready for review, I can add the update and delete in a separate PR. |
Cool, I'll take a look first. |
Thank you all for your helpful comments. I think the PR is ready for review again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @JanKaul for this pr, we moved a huge step forward! I think there are some places we can improve a little to make it more robust.
crates/catalog/sql/src/catalog.rs
Outdated
name: String, | ||
connection: AnyPool, | ||
storage: FileIO, | ||
cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm hesitating to add cache here, maybe we can add sth like CachedCatalog
in java so that all catalog implementations could benefit from it?
Ok(table) | ||
} | ||
|
||
async fn create_table( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some things missing here:
- We should first check namespace exists
- The location is optional, it should use warehouse's subdir as location
I would suggest to refer to python's implementation,
fix connection pool issue for sql catalog
create sqlconfig, fix rest of the tests and remove todo
Co-authored-by: Renjie Liu <[email protected]>
Co-authored-by: Renjie Liu <[email protected]>
the bind placeholder |
Thanks for bringing this up, I haven't thought about it. It's weird though, the test is using sqlite which it's supposed to not work with ? according to the docs. |
This PR implements the basic operations for a Sql catalog. The implementation uses the
sqlx
crate which enables Postgres, MySQL and Sqlite.The
update_table
method is to be implemented later.