From 70d91f20b494c20c7beb360ed19e1cefc43de3f4 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Fri, 1 Mar 2024 15:49:12 +0100 Subject: [PATCH 01/35] initialize sql catalog crate --- crates/catalog/sql/Cargo.toml | 51 ++++ crates/catalog/sql/README.md | 21 ++ crates/catalog/sql/src/catalog.rs | 396 ++++++++++++++++++++++++++++++ crates/catalog/sql/src/lib.rs | 23 ++ 4 files changed, 491 insertions(+) create mode 100644 crates/catalog/sql/Cargo.toml create mode 100644 crates/catalog/sql/README.md create mode 100644 crates/catalog/sql/src/catalog.rs create mode 100644 crates/catalog/sql/src/lib.rs diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml new file mode 100644 index 000000000..5aa9bc35a --- /dev/null +++ b/crates/catalog/sql/Cargo.toml @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-catalog-sql" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Rust Sql Catalog" +repository = { workspace = true } +license = { workspace = true } +keywords = ["iceberg", "sql", "catalog"] + +[dependencies] +# async-trait = { workspace = true } +async-trait = { workspace = true } +chrono = { workspace = true } +iceberg = { workspace = true } +log = "0.4.20" +serde = { workspace = true } +serde_derive = { workspace = true } +serde_json = { workspace = true } +typed-builder = { workspace = true } +urlencoding = { workspace = true } +uuid = { workspace = true, features = ["v4"] } +dashmap = "5.5.3" +futures.workspace = true +sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } +url.workspace = true +opendal.workspace = true + +[dev-dependencies] +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +tokio = { workspace = true } diff --git a/crates/catalog/sql/README.md b/crates/catalog/sql/README.md new file mode 100644 index 000000000..35f70a6b5 --- /dev/null +++ b/crates/catalog/sql/README.md @@ -0,0 +1,21 @@ + + +# Apache Iceberg Sql Catalog Official Native Rust Implementation + diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs new file mode 100644 index 000000000..4ec56f27a --- /dev/null +++ b/crates/catalog/sql/src/catalog.rs @@ -0,0 +1,396 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use dashmap::DashMap; +use futures::lock::Mutex; +use sqlx::{ + any::{install_default_drivers, AnyConnectOptions, AnyRow}, + AnyConnection, ConnectOptions, Connection, Row, +}; +use std::collections::HashMap; + +use iceberg::{ + spec::TableMetadata, table::Table, Catalog, Error, ErrorKind, Namespace, NamespaceIdent, + Result, TableCommit, TableCreation, TableIdent, +}; +use opendal::Operator; +use uuid::Uuid; + +#[derive(Debug)] +/// Sql catalog implementation. +pub struct SqlCatalog { + name: String, + connection: Arc>, + operator: Operator, + cache: Arc>, +} + +// impl SqlCatalog { +// pub async fn new(url: &str, name: &str, operator: Operator) -> Result { +// install_default_drivers(); + +// let mut connection = +// AnyConnectOptions::connect(&AnyConnectOptions::from_url(&url.try_into()?)?).await?; + +// connection +// .transaction(|txn| { +// Box::pin(async move { +// sqlx::query( +// "create table if not exists iceberg_tables ( +// catalog_name text not null, +// table_namespace text not null, +// table_name text not null, +// metadata_location text not null, +// previous_metadata_location text, +// primary key (catalog_name, table_namespace, table_name) +// );", +// ) +// .execute(&mut **txn) +// .await +// }) +// }) +// .await?; + +// connection +// .transaction(|txn| { +// Box::pin(async move { +// sqlx::query( +// "create table if not exists iceberg_namespace_properties ( +// catalog_name text not null, +// namespace text not null, +// property_key text, +// property_value text, +// primary key (catalog_name, namespace, property_key) +// );", +// ) +// .execute(&mut **txn) +// .await +// }) +// }) +// .await?; + +// Ok(SqlCatalog { +// name: name.to_owned(), +// connection: Arc::new(Mutex::new(connection)), +// operator, +// cache: Arc::new(DashMap::new()), +// }) +// } +// } + +// #[derive(Debug)] +// struct TableRef { +// table_namespace: String, +// table_name: String, +// metadata_location: String, +// _previous_metadata_location: Option, +// } + +// fn query_map(row: &AnyRow) -> std::result::Result { +// Ok(TableRef { +// table_namespace: row.try_get(0)?, +// table_name: row.try_get(1)?, +// metadata_location: row.try_get(2)?, +// _previous_metadata_location: row.try_get::(3).map(Some).or_else(|err| { +// if let sqlx::Error::ColumnDecode { +// index: _, +// source: _, +// } = err +// { +// Ok(None) +// } else { +// Err(err) +// } +// })?, +// }) +// } + +#[async_trait] +impl Catalog for SqlCatalog { + async fn list_namespaces( + &self, + _parent: Option<&NamespaceIdent>, + ) -> Result> { + // let mut connection = self.connection.lock().await; + // let rows = connection.transaction(|txn|{ + // let name = self.name.clone(); + // Box::pin(async move { + // sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await + // })}).await.map_err(Error::from)?; + // let iter = rows.iter().map(|row| row.try_get::(0)); + + // Ok(iter + // .map(|x| { + // x.and_then(|y| { + // Namespace::try_new(&y.split('.').map(ToString::to_string).collect::>()) + // .map_err(|err| sqlx::Error::Decode(Box::new(err))) + // }) + // }) + // .collect::>() + // .map_err(Error::from)?) + todo!() + } + async fn create_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result { + todo!() + } + + async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { + todo!() + } + + async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result { + todo!() + } + + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result<()> { + todo!() + } + + async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { + todo!() + } + + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { + // let mut connection = self.connection.lock().await; + // let rows = connection.transaction(|txn|{ + // let name = self.name.clone(); + // let namespace = namespace.to_string(); + // Box::pin(async move { + // sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';",&name, &namespace)).fetch_all(&mut **txn).await + // })}).await.map_err(Error::from)?; + // let iter = rows.iter().map(query_map); + todo!() + + // Ok(iter + // .map(|x| { + // x.and_then(|y| { + // TableIdent::parse(&(y.table_namespace.to_string() + "." + &y.table_name)) + // .map_err(|err| sqlx::Error::Decode(Box::new(err))) + // }) + // }) + // .collect::>() + // .map_err(Error::from)?) + } + + async fn stat_table(&self, identifier: &TableIdent) -> Result { + // let mut connection = self.connection.lock().await; + // let rows = connection.transaction(|txn|{ + // let catalog_name = self.name.clone(); + // let namespace = identifier.namespace().to_string(); + // let name = identifier.name().to_string(); + // Box::pin(async move { + // sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, + // &namespace, + // &name)).fetch_all(&mut **txn).await + // })}).await.map_err(Error::from)?; + // let mut iter = rows.iter().map(query_map); + + // Ok(iter.next().is_some()) + todo!() + } + + async fn drop_table(&self, identifier: &TableIdent) -> Result<()> { + // let mut connection = self.connection.lock().await; + // connection.transaction(|txn|{ + // let catalog_name = self.name.clone(); + // let namespace = identifier.namespace().to_string(); + // let name = identifier.name().to_string(); + // Box::pin(async move { + // sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, + // &namespace, + // &name)).execute(&mut **txn).await + // })}).await.map_err(Error::from)?; + Ok(()) + } + + async fn load_table(&self, identifier: &TableIdent) -> Result { + // let path = { + // let mut connection = self.connection.lock().await; + // let row = connection.transaction(|txn|{ + // let catalog_name = self.name.clone(); + // let namespace = identifier.namespace().to_string(); + // let name = identifier.name().to_string(); + // Box::pin(async move { + // sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, + // &namespace, + // &name)).fetch_one(&mut **txn).await + // })}).await.map_err(Error::from)?; + // let row = query_map(&row).map_err(Error::from)?; + + // row.metadata_location + // }; + todo!() + // let bytes = &self + // .operator + // .get(&strip_prefix(&path).as_str().into()) + // .await? + // .bytes() + // .await?; + // let metadata: TabularMetadata = serde_json::from_str(std::str::from_utf8(bytes)?)?; + // self.cache + // .insert(identifier.clone(), (path.clone(), metadata.clone())); + // match metadata { + // TabularMetadata::Table(metadata) => Ok(Tabular::Table( + // Table::new(identifier.clone(), self.clone(), metadata).await?, + // )), + // TabularMetadata::View(metadata) => Ok(Tabular::View( + // View::new(identifier.clone(), self.clone(), metadata).await?, + // )), + // TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView( + // MaterializedView::new(identifier.clone(), self.clone(), metadata).await?, + // )), + // } + } + + async fn create_table( + &self, + namespace: &NamespaceIdent, + creation: TableCreation, + ) -> Result
{ + // Create metadata + // let location = metadata.location.to_string(); + + // let uuid = Uuid::new_v4(); + // let version = &metadata.last_sequence_number; + // let metadata_json = serde_json::to_string(&metadata)?; + // let metadata_location = location + // + "/metadata/" + // + &version.to_string() + // + "-" + // + &uuid.to_string() + // + ".metadata.json"; + // operator + // .put( + // &strip_prefix(&metadata_location).into(), + // metadata_json.into(), + // ) + // .await?; + // { + // let mut connection = self.connection.lock().await; + // connection.transaction(|txn|{ + // let catalog_name = self.name.clone(); + // let namespace = identifier.namespace().to_string(); + // let name = identifier.name().to_string(); + // let metadata_location = metadata_location.to_string(); + // Box::pin(async move { + // sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&mut **txn).await + // })}).await.map_err(Error::from)?; + // } + // self.clone() + // .load_tabular(&identifier) + // .await + todo!() + } + + async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { + todo!() + } + + async fn update_table(&self, commit: TableCommit) -> Result
{ + todo!() + } +} + +// #[cfg(test)] +// pub mod tests { +// use iceberg_rust::{ +// catalog::{identifier::TableIdent, namespace::Namespace, Catalog}, +// spec::{ +// schema::Schema, +// types::{PrimitiveType, StructField, StructType, Type}, +// }, +// table::table_builder::TableBuilder, +// }; +// use operator::{memory::InMemory, ObjectStore}; +// use std::sync::Arc; + +// use crate::SqlCatalog; + +// #[tokio::test] +// async fn test_create_update_drop_table() { +// let operator: Arc = Arc::new(InMemory::new()); +// let catalog: Arc = Arc::new( +// SqlCatalog::new("sqlite://", "test", operator) +// .await +// .unwrap(), +// ); +// let identifier = TableIdent::parse("load_table.table3").unwrap(); +// let schema = Schema::builder() +// .with_schema_id(1) +// .with_identifier_field_ids(vec![1, 2]) +// .with_fields( +// StructType::builder() +// .with_struct_field(StructField { +// id: 1, +// name: "one".to_string(), +// required: false, +// field_type: Type::Primitive(PrimitiveType::String), +// doc: None, +// }) +// .with_struct_field(StructField { +// id: 2, +// name: "two".to_string(), +// required: false, +// field_type: Type::Primitive(PrimitiveType::String), +// doc: None, +// }) +// .build() +// .unwrap(), +// ) +// .build() +// .unwrap(); + +// let mut builder = TableBuilder::new(&identifier, catalog.clone()) +// .expect("Failed to create table builder."); +// builder +// .location("/") +// .with_schema((1, schema)) +// .current_schema_id(1); +// let mut table = builder.build().await.expect("Failed to create table."); + +// let exists = Arc::clone(&catalog) +// .table_exists(&identifier) +// .await +// .expect("Table doesn't exist"); +// assert!(exists); + +// let tables = catalog +// .clone() +// .list_tables( +// &Namespace::try_new(&["load_table".to_owned()]) +// .expect("Failed to create namespace"), +// ) +// .await +// .expect("Failed to list Tables"); +// assert_eq!(tables[0].to_string(), "load_table.table3".to_owned()); + +// let namespaces = catalog +// .clone() +// .list_namespaces(None) +// .await +// .expect("Failed to list namespaces"); +// assert_eq!(namespaces[0].to_string(), "load_table"); + +// let transaction = table.new_transaction(None); +// transaction.commit().await.expect("Transaction failed."); + +// catalog +// .drop_table(&identifier) +// .await +// .expect("Failed to drop table."); + +// let exists = Arc::clone(&catalog) +// .table_exists(&identifier) +// .await +// .expect("Table exists failed"); +// assert!(!exists); +// } +// } diff --git a/crates/catalog/sql/src/lib.rs b/crates/catalog/sql/src/lib.rs new file mode 100644 index 000000000..023fe7ab2 --- /dev/null +++ b/crates/catalog/sql/src/lib.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg REST API implementation. + +#![deny(missing_docs)] + +mod catalog; +pub use catalog::*; From 2df82ae778ad0e84a824a6f1aa3de3d5e2b043e6 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 4 Mar 2024 16:05:16 +0100 Subject: [PATCH 02/35] implement basic sql catalog functionality --- crates/catalog/sql/Cargo.toml | 3 +- crates/catalog/sql/src/catalog.rs | 625 +++++++++++----------- crates/catalog/sql/src/error.rs | 27 + crates/catalog/sql/src/lib.rs | 1 + crates/iceberg/src/spec/table_metadata.rs | 30 +- 5 files changed, 370 insertions(+), 316 deletions(-) create mode 100644 crates/catalog/sql/src/error.rs diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 5aa9bc35a..0c3d7ffb5 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -29,7 +29,7 @@ license = { workspace = true } keywords = ["iceberg", "sql", "catalog"] [dependencies] -# async-trait = { workspace = true } +anyhow = "1" async-trait = { workspace = true } chrono = { workspace = true } iceberg = { workspace = true } @@ -48,4 +48,5 @@ opendal.workspace = true [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +tempfile = { workspace = true } tokio = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 4ec56f27a..cec5b3f8e 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -1,8 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::sync::Arc; use async_trait::async_trait; use dashmap::DashMap; -use futures::lock::Mutex; +use futures::{lock::Mutex, AsyncReadExt, AsyncWriteExt}; use sqlx::{ any::{install_default_drivers, AnyConnectOptions, AnyRow}, AnyConnection, ConnectOptions, Connection, Row, @@ -10,100 +27,107 @@ use sqlx::{ use std::collections::HashMap; use iceberg::{ - spec::TableMetadata, table::Table, Catalog, Error, ErrorKind, Namespace, NamespaceIdent, - Result, TableCommit, TableCreation, TableIdent, + io::FileIO, spec::TableMetadata, table::Table, Catalog, Error, ErrorKind, Namespace, + NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; -use opendal::Operator; use uuid::Uuid; +use crate::error::from_sqlx_error; + #[derive(Debug)] /// Sql catalog implementation. pub struct SqlCatalog { name: String, connection: Arc>, - operator: Operator, + storage: FileIO, cache: Arc>, } -// impl SqlCatalog { -// pub async fn new(url: &str, name: &str, operator: Operator) -> Result { -// install_default_drivers(); - -// let mut connection = -// AnyConnectOptions::connect(&AnyConnectOptions::from_url(&url.try_into()?)?).await?; - -// connection -// .transaction(|txn| { -// Box::pin(async move { -// sqlx::query( -// "create table if not exists iceberg_tables ( -// catalog_name text not null, -// table_namespace text not null, -// table_name text not null, -// metadata_location text not null, -// previous_metadata_location text, -// primary key (catalog_name, table_namespace, table_name) -// );", -// ) -// .execute(&mut **txn) -// .await -// }) -// }) -// .await?; - -// connection -// .transaction(|txn| { -// Box::pin(async move { -// sqlx::query( -// "create table if not exists iceberg_namespace_properties ( -// catalog_name text not null, -// namespace text not null, -// property_key text, -// property_value text, -// primary key (catalog_name, namespace, property_key) -// );", -// ) -// .execute(&mut **txn) -// .await -// }) -// }) -// .await?; - -// Ok(SqlCatalog { -// name: name.to_owned(), -// connection: Arc::new(Mutex::new(connection)), -// operator, -// cache: Arc::new(DashMap::new()), -// }) -// } -// } - -// #[derive(Debug)] -// struct TableRef { -// table_namespace: String, -// table_name: String, -// metadata_location: String, -// _previous_metadata_location: Option, -// } - -// fn query_map(row: &AnyRow) -> std::result::Result { -// Ok(TableRef { -// table_namespace: row.try_get(0)?, -// table_name: row.try_get(1)?, -// metadata_location: row.try_get(2)?, -// _previous_metadata_location: row.try_get::(3).map(Some).or_else(|err| { -// if let sqlx::Error::ColumnDecode { -// index: _, -// source: _, -// } = err -// { -// Ok(None) -// } else { -// Err(err) -// } -// })?, -// }) -// } +impl SqlCatalog { + /// Create new sql catalog instance + pub async fn new(url: &str, name: &str, storage: FileIO) -> Result { + install_default_drivers(); + + let mut connection = AnyConnectOptions::connect( + &AnyConnectOptions::from_url(&url.try_into()?).map_err(from_sqlx_error)?, + ) + .await + .map_err(from_sqlx_error)?; + + connection + .transaction(|txn| { + Box::pin(async move { + sqlx::query( + "create table if not exists iceberg_tables ( + catalog_name text not null, + table_namespace text not null, + table_name text not null, + metadata_location text not null, + previous_metadata_location text, + primary key (catalog_name, table_namespace, table_name) + );", + ) + .execute(&mut **txn) + .await + }) + }) + .await + .map_err(from_sqlx_error)?; + + connection + .transaction(|txn| { + Box::pin(async move { + sqlx::query( + "create table if not exists iceberg_namespace_properties ( + catalog_name text not null, + namespace text not null, + property_key text, + property_value text, + primary key (catalog_name, namespace, property_key) + );", + ) + .execute(&mut **txn) + .await + }) + }) + .await + .map_err(from_sqlx_error)?; + + Ok(SqlCatalog { + name: name.to_owned(), + connection: Arc::new(Mutex::new(connection)), + storage, + cache: Arc::new(DashMap::new()), + }) + } +} + +#[derive(Debug)] +struct TableRef { + table_namespace: String, + table_name: String, + metadata_location: String, + _previous_metadata_location: Option, +} + +fn query_map(row: &AnyRow) -> std::result::Result { + Ok(TableRef { + table_namespace: row.try_get(0)?, + table_name: row.try_get(1)?, + metadata_location: row.try_get(2)?, + _previous_metadata_location: row.try_get::(3).map(Some).or_else(|err| { + if let sqlx::Error::ColumnDecode { + index: _, + source: _, + } = err + { + Ok(None) + } else { + Err(err) + } + })?, + }) +} #[async_trait] impl Catalog for SqlCatalog { @@ -111,25 +135,27 @@ impl Catalog for SqlCatalog { &self, _parent: Option<&NamespaceIdent>, ) -> Result> { - // let mut connection = self.connection.lock().await; - // let rows = connection.transaction(|txn|{ - // let name = self.name.clone(); - // Box::pin(async move { - // sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await - // })}).await.map_err(Error::from)?; - // let iter = rows.iter().map(|row| row.try_get::(0)); - - // Ok(iter - // .map(|x| { - // x.and_then(|y| { - // Namespace::try_new(&y.split('.').map(ToString::to_string).collect::>()) - // .map_err(|err| sqlx::Error::Decode(Box::new(err))) - // }) - // }) - // .collect::>() - // .map_err(Error::from)?) - todo!() + let mut connection = self.connection.lock().await; + let rows = connection.transaction(|txn|{ + let name = self.name.clone(); + Box::pin(async move { + sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await + })}).await.map_err(from_sqlx_error)?; + let iter = rows.iter().map(|row| row.try_get::(0)); + + Ok(iter + .map(|x| { + x.and_then(|y| { + NamespaceIdent::from_vec( + y.split('.').map(ToString::to_string).collect::>(), + ) + .map_err(|err| sqlx::Error::Decode(Box::new(err))) + }) + }) + .collect::>() + .map_err(from_sqlx_error)?) } + async fn create_namespace( &self, _namespace: &NamespaceIdent, @@ -159,95 +185,85 @@ impl Catalog for SqlCatalog { } async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { - // let mut connection = self.connection.lock().await; - // let rows = connection.transaction(|txn|{ - // let name = self.name.clone(); - // let namespace = namespace.to_string(); - // Box::pin(async move { - // sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';",&name, &namespace)).fetch_all(&mut **txn).await - // })}).await.map_err(Error::from)?; - // let iter = rows.iter().map(query_map); - todo!() - - // Ok(iter - // .map(|x| { - // x.and_then(|y| { - // TableIdent::parse(&(y.table_namespace.to_string() + "." + &y.table_name)) - // .map_err(|err| sqlx::Error::Decode(Box::new(err))) - // }) - // }) - // .collect::>() - // .map_err(Error::from)?) + let mut connection = self.connection.lock().await; + let rows = connection.transaction(|txn|{ + let name = self.name.clone(); + let namespace = namespace.encode_in_url(); + Box::pin(async move { + sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';",&name, &namespace)).fetch_all(&mut **txn).await + })}).await.map_err(from_sqlx_error)?; + let iter = rows.iter().map(query_map); + + Ok(iter + .map(|x| { + x.and_then(|y| { + let namespace = NamespaceIdent::from_vec( + y.table_namespace + .split('.') + .map(ToString::to_string) + .collect::>(), + ) + .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; + Ok(TableIdent::new(namespace, y.table_name)) + }) + }) + .collect::>() + .map_err(from_sqlx_error)?) } async fn stat_table(&self, identifier: &TableIdent) -> Result { - // let mut connection = self.connection.lock().await; - // let rows = connection.transaction(|txn|{ - // let catalog_name = self.name.clone(); - // let namespace = identifier.namespace().to_string(); - // let name = identifier.name().to_string(); - // Box::pin(async move { - // sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, - // &namespace, - // &name)).fetch_all(&mut **txn).await - // })}).await.map_err(Error::from)?; - // let mut iter = rows.iter().map(query_map); - - // Ok(iter.next().is_some()) - todo!() + let mut connection = self.connection.lock().await; + let rows = connection.transaction(|txn|{ + let catalog_name = self.name.clone(); + let namespace = identifier.namespace().encode_in_url(); + let name = identifier.name().to_string(); + Box::pin(async move { + sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, + &namespace, + &name)).fetch_all(&mut **txn).await + })}).await.map_err(from_sqlx_error)?; + let mut iter = rows.iter().map(query_map); + + Ok(iter.next().is_some()) } - async fn drop_table(&self, identifier: &TableIdent) -> Result<()> { - // let mut connection = self.connection.lock().await; - // connection.transaction(|txn|{ - // let catalog_name = self.name.clone(); - // let namespace = identifier.namespace().to_string(); - // let name = identifier.name().to_string(); - // Box::pin(async move { - // sqlx::query(&format!("delete from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, - // &namespace, - // &name)).execute(&mut **txn).await - // })}).await.map_err(Error::from)?; - Ok(()) + async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> { + todo!() } async fn load_table(&self, identifier: &TableIdent) -> Result
{ - // let path = { - // let mut connection = self.connection.lock().await; - // let row = connection.transaction(|txn|{ - // let catalog_name = self.name.clone(); - // let namespace = identifier.namespace().to_string(); - // let name = identifier.name().to_string(); - // Box::pin(async move { - // sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, - // &namespace, - // &name)).fetch_one(&mut **txn).await - // })}).await.map_err(Error::from)?; - // let row = query_map(&row).map_err(Error::from)?; - - // row.metadata_location - // }; - todo!() - // let bytes = &self - // .operator - // .get(&strip_prefix(&path).as_str().into()) - // .await? - // .bytes() - // .await?; - // let metadata: TabularMetadata = serde_json::from_str(std::str::from_utf8(bytes)?)?; - // self.cache - // .insert(identifier.clone(), (path.clone(), metadata.clone())); - // match metadata { - // TabularMetadata::Table(metadata) => Ok(Tabular::Table( - // Table::new(identifier.clone(), self.clone(), metadata).await?, - // )), - // TabularMetadata::View(metadata) => Ok(Tabular::View( - // View::new(identifier.clone(), self.clone(), metadata).await?, - // )), - // TabularMetadata::MaterializedView(metadata) => Ok(Tabular::MaterializedView( - // MaterializedView::new(identifier.clone(), self.clone(), metadata).await?, - // )), - // } + let metadata_location = { + let mut connection = self.connection.lock().await; + let row = connection.transaction(|txn|{ + let catalog_name = self.name.clone(); + let namespace = identifier.namespace().encode_in_url(); + let name = identifier.name().to_string(); + Box::pin(async move { + sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, + &namespace, + &name)).fetch_one(&mut **txn).await + })}).await.map_err(from_sqlx_error)?; + let row = query_map(&row).map_err(from_sqlx_error)?; + + row.metadata_location + }; + let file = self.storage.new_input(&metadata_location)?; + + let mut json = String::new(); + file.reader().await?.read_to_string(&mut json).await?; + + let metadata: TableMetadata = serde_json::from_str(&json)?; + + self.cache + .insert(identifier.clone(), (metadata_location, metadata.clone())); + + let table = Table::builder() + .file_io(self.storage.clone()) + .identifier(identifier.clone()) + .metadata(metadata) + .build(); + + Ok(table) } async fn create_table( @@ -255,142 +271,127 @@ impl Catalog for SqlCatalog { namespace: &NamespaceIdent, creation: TableCreation, ) -> Result
{ - // Create metadata - // let location = metadata.location.to_string(); - - // let uuid = Uuid::new_v4(); - // let version = &metadata.last_sequence_number; - // let metadata_json = serde_json::to_string(&metadata)?; - // let metadata_location = location - // + "/metadata/" - // + &version.to_string() - // + "-" - // + &uuid.to_string() - // + ".metadata.json"; - // operator - // .put( - // &strip_prefix(&metadata_location).into(), - // metadata_json.into(), - // ) - // .await?; - // { - // let mut connection = self.connection.lock().await; - // connection.transaction(|txn|{ - // let catalog_name = self.name.clone(); - // let namespace = identifier.namespace().to_string(); - // let name = identifier.name().to_string(); - // let metadata_location = metadata_location.to_string(); - // Box::pin(async move { - // sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&mut **txn).await - // })}).await.map_err(Error::from)?; - // } - // self.clone() - // .load_tabular(&identifier) - // .await - todo!() + let location = creation.location.ok_or(Error::new( + ErrorKind::DataInvalid, + "Table creation with the Sql catalog requires a location.", + ))?; + + let uuid = Uuid::new_v4(); + let metadata_location = + location.clone() + "/metadata/" + "0-" + &uuid.to_string() + ".metadata.json"; + + let metadata = TableMetadata::builder() + .location(location) + .current_schema_id(creation.schema.schema_id()) + .last_column_id(creation.schema.highest_field_id()) + .schemas(HashMap::from_iter(vec![( + creation.schema.schema_id(), + creation.schema.into(), + )])) + .partition_specs(HashMap::new()) + .last_partition_id(0) + .current_snapshot_id(-1) + .sort_orders(HashMap::new()) + .default_sort_order_id(0) + .build(); + + let file = self.storage.new_output(&metadata_location)?; + file.writer() + .await? + .write_all(&serde_json::to_vec(&metadata)?) + .await?; + { + let mut connection = self.connection.lock().await; + connection.transaction(|txn|{ + let catalog_name = self.name.clone(); + let namespace = namespace.encode_in_url(); + let name = creation.name.clone(); + let metadata_location = metadata_location.to_string(); + Box::pin(async move { + sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&mut **txn).await + })}).await.map_err(from_sqlx_error)?; + } + Ok(Table::builder() + .file_io(self.storage.clone()) + .metadata_location(metadata_location) + .identifier(TableIdent::new(namespace.clone(), creation.name)) + .metadata(metadata) + .build()) } async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { todo!() } - async fn update_table(&self, commit: TableCommit) -> Result
{ + async fn update_table(&self, _commit: TableCommit) -> Result
{ todo!() } } -// #[cfg(test)] -// pub mod tests { -// use iceberg_rust::{ -// catalog::{identifier::TableIdent, namespace::Namespace, Catalog}, -// spec::{ -// schema::Schema, -// types::{PrimitiveType, StructField, StructType, Type}, -// }, -// table::table_builder::TableBuilder, -// }; -// use operator::{memory::InMemory, ObjectStore}; -// use std::sync::Arc; - -// use crate::SqlCatalog; - -// #[tokio::test] -// async fn test_create_update_drop_table() { -// let operator: Arc = Arc::new(InMemory::new()); -// let catalog: Arc = Arc::new( -// SqlCatalog::new("sqlite://", "test", operator) -// .await -// .unwrap(), -// ); -// let identifier = TableIdent::parse("load_table.table3").unwrap(); -// let schema = Schema::builder() -// .with_schema_id(1) -// .with_identifier_field_ids(vec![1, 2]) -// .with_fields( -// StructType::builder() -// .with_struct_field(StructField { -// id: 1, -// name: "one".to_string(), -// required: false, -// field_type: Type::Primitive(PrimitiveType::String), -// doc: None, -// }) -// .with_struct_field(StructField { -// id: 2, -// name: "two".to_string(), -// required: false, -// field_type: Type::Primitive(PrimitiveType::String), -// doc: None, -// }) -// .build() -// .unwrap(), -// ) -// .build() -// .unwrap(); - -// let mut builder = TableBuilder::new(&identifier, catalog.clone()) -// .expect("Failed to create table builder."); -// builder -// .location("/") -// .with_schema((1, schema)) -// .current_schema_id(1); -// let mut table = builder.build().await.expect("Failed to create table."); - -// let exists = Arc::clone(&catalog) -// .table_exists(&identifier) -// .await -// .expect("Table doesn't exist"); -// assert!(exists); - -// let tables = catalog -// .clone() -// .list_tables( -// &Namespace::try_new(&["load_table".to_owned()]) -// .expect("Failed to create namespace"), -// ) -// .await -// .expect("Failed to list Tables"); -// assert_eq!(tables[0].to_string(), "load_table.table3".to_owned()); - -// let namespaces = catalog -// .clone() -// .list_namespaces(None) -// .await -// .expect("Failed to list namespaces"); -// assert_eq!(namespaces[0].to_string(), "load_table"); - -// let transaction = table.new_transaction(None); -// transaction.commit().await.expect("Transaction failed."); - -// catalog -// .drop_table(&identifier) -// .await -// .expect("Failed to drop table."); - -// let exists = Arc::clone(&catalog) -// .table_exists(&identifier) -// .await -// .expect("Table exists failed"); -// assert!(!exists); -// } -// } +#[cfg(test)] +pub mod tests { + + use iceberg::{ + io::FileIOBuilder, + spec::{NestedField, PrimitiveType, Schema, SortOrder, Type, UnboundPartitionSpec}, + Catalog, NamespaceIdent, TableCreation, TableIdent, + }; + use tempfile::TempDir; + + use crate::SqlCatalog; + + #[tokio::test] + async fn test_create_update_drop_table() { + let dir = TempDir::new().unwrap(); + let storage = FileIOBuilder::new_fs_io().build().unwrap(); + + let catalog = SqlCatalog::new("sqlite://", "iceberg", storage) + .await + .unwrap(); + + let namespace = NamespaceIdent::new("test".to_owned()); + + let identifier = TableIdent::new(namespace.clone(), "table1".to_owned()); + + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(1, "one", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(2, "two", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let creation = TableCreation::builder() + .name("table1".to_owned()) + .location(dir.path().to_str().unwrap().to_owned() + "/warehouse/table1") + .schema(schema) + .partition_spec(UnboundPartitionSpec::default()) + .sort_order(SortOrder::default()) + .build(); + + catalog.create_table(&namespace, creation).await.unwrap(); + + let exists = catalog + .stat_table(&identifier) + .await + .expect("Table doesn't exist"); + assert!(exists); + + let tables = catalog + .list_tables(&namespace) + .await + .expect("Failed to list Tables"); + assert_eq!(tables[0].name(), "table1".to_owned()); + + let namespaces = catalog + .list_namespaces(None) + .await + .expect("Failed to list namespaces"); + assert_eq!(namespaces[0].encode_in_url(), "test"); + + let table = catalog.load_table(&identifier).await.unwrap(); + + assert!(table.metadata().location().ends_with("/warehouse/table1")) + } +} diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs new file mode 100644 index 000000000..c6b998beb --- /dev/null +++ b/crates/catalog/sql/src/error.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iceberg::{Error, ErrorKind}; + +/// Format an sqlx error into iceberg error. +pub fn from_sqlx_error(error: sqlx::Error) -> Error { + Error::new( + ErrorKind::Unexpected, + "operation failed for hitting io error".to_string(), + ) + .with_source(error) +} diff --git a/crates/catalog/sql/src/lib.rs b/crates/catalog/sql/src/lib.rs index 023fe7ab2..cfacac3d9 100644 --- a/crates/catalog/sql/src/lib.rs +++ b/crates/catalog/sql/src/lib.rs @@ -20,4 +20,5 @@ #![deny(missing_docs)] mod catalog; +mod error; pub use catalog::*; diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 4892c2623..ae175de18 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -22,7 +22,12 @@ use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use std::cmp::Ordering; use std::fmt::{Display, Formatter}; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; +use typed_builder::TypedBuilder; use uuid::Uuid; use super::{ @@ -47,21 +52,27 @@ pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0; /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; -#[derive(Debug, PartialEq, Deserialize, Eq, Clone)] +#[derive(Debug, PartialEq, Deserialize, Eq, Clone, TypedBuilder)] #[serde(try_from = "TableMetadataEnum")] /// Fields for the version 2 of the table metadata. /// /// We assume that this data structure is always valid, so we will panic when invalid error happens. /// We check the validity of this data structure when constructing. pub struct TableMetadata { + #[builder(default)] /// Integer Version for the format. pub(crate) format_version: FormatVersion, + #[builder(default_code = "Uuid::new_v4()")] /// A UUID that identifies the table pub(crate) table_uuid: Uuid, /// Location tables base location pub(crate) location: String, + #[builder(default)] /// The tables highest sequence number pub(crate) last_sequence_number: i64, + #[builder( + default_code = "SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros() as i64" + )] /// Timestamp in milliseconds from the unix epoch when the table was last updated. pub(crate) last_updated_ms: i64, /// An integer; the highest assigned column ID for the table. @@ -72,22 +83,27 @@ pub struct TableMetadata { pub(crate) current_schema_id: i32, /// A list of partition specs, stored as full partition spec objects. pub(crate) partition_specs: HashMap, + #[builder(default = DEFAULT_SPEC_ID)] /// ID of the “current” spec that writers should use by default. pub(crate) default_spec_id: i32, /// An integer; the highest assigned partition field ID across all partition specs for the table. pub(crate) last_partition_id: i32, + #[builder(default)] ///A string to string map of table properties. This is used to control settings that /// affect reading and writing and is not intended to be used for arbitrary metadata. /// For example, commit.retry.num-retries is used to control the number of commit retries. pub(crate) properties: HashMap, + #[builder(setter(strip_option))] /// long ID of the current table snapshot; must be the same as the current /// ID of the main branch in refs. pub(crate) current_snapshot_id: Option, + #[builder(default)] ///A list of valid snapshots. Valid snapshots are snapshots for which all /// data files exist in the file system. A data file must not be deleted /// from the file system until the last snapshot in which it was listed is /// garbage collected. pub(crate) snapshots: HashMap, + #[builder(default)] /// A list (optional) of timestamp and snapshot ID pairs that encodes changes /// to the current snapshot for the table. Each time the current-snapshot-id /// is changed, a new entry should be added with the last-updated-ms @@ -95,6 +111,7 @@ pub struct TableMetadata { /// the list of valid snapshots, all entries before a snapshot that has /// expired should be removed. pub(crate) snapshot_log: Vec, + #[builder(default)] /// A list (optional) of timestamp and metadata file location pairs /// that encodes changes to the previous metadata files for the table. @@ -103,13 +120,14 @@ pub struct TableMetadata { /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. pub(crate) metadata_log: Vec, - /// A list of sort orders, stored as full sort order objects. pub(crate) sort_orders: HashMap, + #[builder(default = DEFAULT_SORT_ORDER_ID)] /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. pub(crate) default_sort_order_id: i64, + #[builder(default)] ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id @@ -895,6 +913,12 @@ impl Display for FormatVersion { } } +impl Default for FormatVersion { + fn default() -> Self { + FormatVersion::V2 + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] /// Encodes changes to the previous metadata files for the table From 8de33738ff6ecee6a76e0a1fb52b29ce7aac9a2f Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 4 Mar 2024 16:20:01 +0100 Subject: [PATCH 03/35] fix clippy warnings --- crates/iceberg/src/spec/table_metadata.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index ae175de18..b57110e57 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -885,10 +885,12 @@ pub(super) mod _serde { #[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)] #[repr(u8)] /// Iceberg format version +#[derive(Default)] pub enum FormatVersion { /// Iceberg spec version 1 V1 = 1u8, /// Iceberg spec version 2 + #[default] V2 = 2u8, } @@ -913,11 +915,7 @@ impl Display for FormatVersion { } } -impl Default for FormatVersion { - fn default() -> Self { - FormatVersion::V2 - } -} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] From e73fce27b195bc08669e89c124998cf12311f34b Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 4 Mar 2024 16:23:30 +0100 Subject: [PATCH 04/35] fix format --- crates/iceberg/src/spec/table_metadata.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index b57110e57..36961a7ea 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -915,8 +915,6 @@ impl Display for FormatVersion { } } - - #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] /// Encodes changes to the previous metadata files for the table From caeba8fbef7c1355d4ae670257e33617674d2acd Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 4 Mar 2024 16:29:04 +0100 Subject: [PATCH 05/35] fix cargo sort --- crates/catalog/sql/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 0c3d7ffb5..ddd121ead 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -41,10 +41,10 @@ typed-builder = { workspace = true } urlencoding = { workspace = true } uuid = { workspace = true, features = ["v4"] } dashmap = "5.5.3" -futures.workspace = true +futures = { workspace = true } sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } -url.workspace = true -opendal.workspace = true +url = { workspace = true } +opendal = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } From da3aac632f4bff85e7c07e7a6194950041773c14 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 4 Mar 2024 16:49:11 +0100 Subject: [PATCH 06/35] fix ordering --- crates/catalog/sql/Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index ddd121ead..01a123f89 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -32,19 +32,19 @@ keywords = ["iceberg", "sql", "catalog"] anyhow = "1" async-trait = { workspace = true } chrono = { workspace = true } +dashmap = "5.5.3" +futures = { workspace = true } iceberg = { workspace = true } log = "0.4.20" +opendal = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } typed-builder = { workspace = true } +url = { workspace = true } urlencoding = { workspace = true } uuid = { workspace = true, features = ["v4"] } -dashmap = "5.5.3" -futures = { workspace = true } sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } -url = { workspace = true } -opendal = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } From f9580344d69e6eef4a280f604515f0d064444f5e Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 4 Mar 2024 17:05:46 +0100 Subject: [PATCH 07/35] fix ordering --- crates/catalog/sql/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 01a123f89..1c1c7244f 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -40,11 +40,11 @@ opendal = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } +sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } typed-builder = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } uuid = { workspace = true, features = ["v4"] } -sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } From 08d0e861959e163a726d4dd755bd518edae3f3f6 Mon Sep 17 00:00:00 2001 From: hpal Date: Wed, 10 Apr 2024 02:33:42 -0700 Subject: [PATCH 08/35] fix connection pool issue for sql catalog --- crates/catalog/sql/Cargo.toml | 4 +- crates/catalog/sql/src/catalog.rs | 83 +++++++++++++++++-------------- 2 files changed, 47 insertions(+), 40 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index f727ad837..893a47cbe 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -40,7 +40,7 @@ opendal = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -sqlx = { version = "0.7.2", features = ["tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } +sqlx = { version = "0.7.2", features = ["tls-rustls", "any", "sqlite", "postgres", "mysql", "migrate"], default-features = false } typed-builder = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } @@ -48,6 +48,6 @@ uuid = { workspace = true, features = ["v4"] } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } -sqlx = { version = "0.7.2", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "postgres", "mysql"], default-features = false } +sqlx = { version = "0.7.2", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "postgres", "mysql","migrate"], default-features = false } tempfile = { workspace = true } tokio = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index f812b5b10..baccf40f3 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -20,11 +20,10 @@ use std::sync::Arc; use async_trait::async_trait; use dashmap::DashMap; use futures::{AsyncReadExt, AsyncWriteExt}; -use sqlx::{ - any::{install_default_drivers, AnyRow}, - AnyPool, Connection, Row, -}; +use sqlx::{any::{install_default_drivers, AnyRow}, AnyPool, Connection, Database, Row}; use std::collections::HashMap; +use sqlx::any::AnyPoolOptions; +use sqlx::migrate::MigrateDatabase; use iceberg::{ io::FileIO, @@ -34,6 +33,9 @@ use iceberg::{ TableIdent, }; use uuid::Uuid; +use std::time::Duration; +use opendal::Scheme::Sqlite; +use sqlx::Error::Migrate; use crate::error::from_sqlx_error; @@ -51,15 +53,20 @@ impl SqlCatalog { pub async fn new(url: &str, name: &str, storage: FileIO) -> Result { install_default_drivers(); - let pool = AnyPool::connect(url).await.map_err(from_sqlx_error)?; + if !sqlx::Sqlite::database_exists(&url).await.map_err(from_sqlx_error)? { + sqlx::Sqlite::create_database(&url).await.map_err(from_sqlx_error)?; + } - let mut connection = pool.acquire().await.map_err(from_sqlx_error)?; + let pool = AnyPoolOptions::new() + .max_connections(20) //configurable? + .idle_timeout(Duration::from_secs(20)) + .test_before_acquire(true) + .connect(url) + .await + .map_err(from_sqlx_error)?; - connection - .transaction(|txn| { - Box::pin(async move { - sqlx::query( - "create table if not exists iceberg_tables ( + sqlx::query( + "create table if not exists iceberg_tables ( catalog_name text not null, table_namespace text not null, table_name text not null, @@ -67,34 +74,21 @@ impl SqlCatalog { previous_metadata_location text, primary key (catalog_name, table_namespace, table_name) );", - ) - .fetch_all(&mut **txn) - .await - }) - }) - .await - .map_err(from_sqlx_error)?; + ) + .execute(&pool) + .await.map_err(from_sqlx_error)?; - connection - .transaction(|txn| { - Box::pin(async move { - sqlx::query( - "create table if not exists iceberg_namespace_properties ( + sqlx::query( + "create table if not exists iceberg_namespace_properties ( catalog_name text not null, namespace text not null, property_key text, property_value text, primary key (catalog_name, namespace, property_key) );", - ) - .fetch_all(&mut **txn) - .await - }) - }) - .await - .map_err(from_sqlx_error)?; - - connection.close().await.map_err(from_sqlx_error)?; + ) + .execute(&pool) + .await.map_err(from_sqlx_error)?; Ok(SqlCatalog { name: name.to_owned(), @@ -216,7 +210,7 @@ impl Catalog for SqlCatalog { let catalog_name = self.name.clone(); let namespace = identifier.namespace().encode_in_url(); let name = identifier.name().to_string(); - let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ? and table_name = ?;").bind(&catalog_name).bind(&namespace).bind(&name).fetch_all(&self.connection).await.map_err(from_sqlx_error)?; + let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables")/* where catalog_name = ? and table_namespace = ? and table_name = ?;").bind(&catalog_name).bind(&namespace).bind(&name)*/.fetch_all(&self.connection).await.map_err(from_sqlx_error)?; let mut iter = rows.iter().map(query_map); Ok(iter.next().is_some()) @@ -282,8 +276,17 @@ impl Catalog for SqlCatalog { let namespace = namespace.encode_in_url(); let name = name.clone(); let metadata_location = metadata_location.to_string(); - sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);").bind(&catalog_name).bind(&namespace).bind(&name).bind(&metadata_location).execute(&self.connection).await.map_err(from_sqlx_error)?; + let identifier = TableIdent::new( NamespaceIdent::new(namespace.clone()), name.to_owned()); + if !self.table_exists(&identifier).await? { + //check if table exists, may be update then, otherwise unique constraint violation + sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);").bind(&catalog_name).bind(&namespace).bind(&name).bind(&metadata_location).execute(&self.connection).await.map_err(from_sqlx_error)?; + } + else { + //update + println!("update is not implemented") + } } + Ok(Table::builder() .file_io(self.storage.clone()) .metadata_location(metadata_location) @@ -303,7 +306,7 @@ impl Catalog for SqlCatalog { #[cfg(test)] pub mod tests { - + use std::thread::sleep; use iceberg::{ io::FileIOBuilder, spec::{NestedField, PrimitiveType, Schema, Type}, @@ -318,7 +321,7 @@ pub mod tests { let dir = TempDir::new().unwrap(); let storage = FileIOBuilder::new_fs_io().build().unwrap(); - let catalog = SqlCatalog::new("sqlite://", "iceberg", storage) + let catalog = SqlCatalog::new("sqlite://iceberg", "iceberg", storage) .await .unwrap(); @@ -361,8 +364,12 @@ pub mod tests { .expect("Failed to list namespaces"); assert_eq!(namespaces[0].encode_in_url(), "test"); - let table = catalog.load_table(&identifier).await.unwrap(); + //load table points to a /var location - check why + + // let table = catalog.load_table(&identifier).await.unwrap(); + + // assert!(table.metadata().location().ends_with("/warehouse/table1")) - assert!(table.metadata().location().ends_with("/warehouse/table1")) + //need to tear down the database and tables } } From 8a10ad1a57ee932946687288cc2b8f9ab3cf6396 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Tue, 12 Mar 2024 18:10:52 +0100 Subject: [PATCH 09/35] use prepared statements --- crates/catalog/sql/Cargo.toml | 6 +- crates/catalog/sql/src/catalog.rs | 112 +++++++++++------------------- 2 files changed, 42 insertions(+), 76 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 1c1c7244f..5440935b1 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -29,18 +29,18 @@ license = { workspace = true } keywords = ["iceberg", "sql", "catalog"] [dependencies] -anyhow = "1" +anyhow = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } dashmap = "5.5.3" futures = { workspace = true } iceberg = { workspace = true } -log = "0.4.20" +log = { workspace = true } opendal = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } +sqlx = { version = "0.7.2", features = ["tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } typed-builder = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index cec5b3f8e..fc4296be9 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -19,10 +19,10 @@ use std::sync::Arc; use async_trait::async_trait; use dashmap::DashMap; -use futures::{lock::Mutex, AsyncReadExt, AsyncWriteExt}; +use futures::{AsyncReadExt, AsyncWriteExt}; use sqlx::{ - any::{install_default_drivers, AnyConnectOptions, AnyRow}, - AnyConnection, ConnectOptions, Connection, Row, + any::{install_default_drivers, AnyRow}, + AnyPool, Row, }; use std::collections::HashMap; @@ -38,7 +38,7 @@ use crate::error::from_sqlx_error; /// Sql catalog implementation. pub struct SqlCatalog { name: String, - connection: Arc>, + connection: AnyPool, storage: FileIO, cache: Arc>, } @@ -48,17 +48,10 @@ impl SqlCatalog { pub async fn new(url: &str, name: &str, storage: FileIO) -> Result { install_default_drivers(); - let mut connection = AnyConnectOptions::connect( - &AnyConnectOptions::from_url(&url.try_into()?).map_err(from_sqlx_error)?, - ) - .await - .map_err(from_sqlx_error)?; + let pool = AnyPool::connect(url).await.map_err(from_sqlx_error)?; - connection - .transaction(|txn| { - Box::pin(async move { - sqlx::query( - "create table if not exists iceberg_tables ( + sqlx::query( + "create table if not exists iceberg_tables ( catalog_name text not null, table_namespace text not null, table_name text not null, @@ -66,36 +59,27 @@ impl SqlCatalog { previous_metadata_location text, primary key (catalog_name, table_namespace, table_name) );", - ) - .execute(&mut **txn) - .await - }) - }) - .await - .map_err(from_sqlx_error)?; + ) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; - connection - .transaction(|txn| { - Box::pin(async move { - sqlx::query( - "create table if not exists iceberg_namespace_properties ( + sqlx::query( + "create table if not exists iceberg_namespace_properties ( catalog_name text not null, namespace text not null, property_key text, property_value text, primary key (catalog_name, namespace, property_key) );", - ) - .execute(&mut **txn) - .await - }) - }) - .await - .map_err(from_sqlx_error)?; + ) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; Ok(SqlCatalog { name: name.to_owned(), - connection: Arc::new(Mutex::new(connection)), + connection: pool, storage, cache: Arc::new(DashMap::new()), }) @@ -135,12 +119,14 @@ impl Catalog for SqlCatalog { &self, _parent: Option<&NamespaceIdent>, ) -> Result> { - let mut connection = self.connection.lock().await; - let rows = connection.transaction(|txn|{ - let name = self.name.clone(); - Box::pin(async move { - sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await - })}).await.map_err(from_sqlx_error)?; + let name = self.name.clone(); + let rows = sqlx::query( + "select distinct table_namespace from iceberg_tables where catalog_name = ?;", + ) + .bind(&name) + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error)?; let iter = rows.iter().map(|row| row.try_get::(0)); Ok(iter @@ -185,13 +171,9 @@ impl Catalog for SqlCatalog { } async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { - let mut connection = self.connection.lock().await; - let rows = connection.transaction(|txn|{ - let name = self.name.clone(); - let namespace = namespace.encode_in_url(); - Box::pin(async move { - sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';",&name, &namespace)).fetch_all(&mut **txn).await - })}).await.map_err(from_sqlx_error)?; + let name = self.name.clone(); + let namespace = namespace.encode_in_url(); + let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ?;").bind(&name).bind(&namespace).fetch_all(&self.connection).await.map_err(from_sqlx_error)?; let iter = rows.iter().map(query_map); Ok(iter @@ -212,16 +194,10 @@ impl Catalog for SqlCatalog { } async fn stat_table(&self, identifier: &TableIdent) -> Result { - let mut connection = self.connection.lock().await; - let rows = connection.transaction(|txn|{ - let catalog_name = self.name.clone(); - let namespace = identifier.namespace().encode_in_url(); - let name = identifier.name().to_string(); - Box::pin(async move { - sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, - &namespace, - &name)).fetch_all(&mut **txn).await - })}).await.map_err(from_sqlx_error)?; + let catalog_name = self.name.clone(); + let namespace = identifier.namespace().encode_in_url(); + let name = identifier.name().to_string(); + let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ? and table_name = ?;").bind(&catalog_name).bind(&namespace).bind(&name).fetch_all(&self.connection).await.map_err(from_sqlx_error)?; let mut iter = rows.iter().map(query_map); Ok(iter.next().is_some()) @@ -233,16 +209,10 @@ impl Catalog for SqlCatalog { async fn load_table(&self, identifier: &TableIdent) -> Result
{ let metadata_location = { - let mut connection = self.connection.lock().await; - let row = connection.transaction(|txn|{ let catalog_name = self.name.clone(); let namespace = identifier.namespace().encode_in_url(); let name = identifier.name().to_string(); - Box::pin(async move { - sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, - &namespace, - &name)).fetch_one(&mut **txn).await - })}).await.map_err(from_sqlx_error)?; + let row = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ? and table_name = ?;").bind(&catalog_name).bind(&namespace).bind(&name).fetch_one(&self.connection).await.map_err(from_sqlx_error)?; let row = query_map(&row).map_err(from_sqlx_error)?; row.metadata_location @@ -301,15 +271,11 @@ impl Catalog for SqlCatalog { .write_all(&serde_json::to_vec(&metadata)?) .await?; { - let mut connection = self.connection.lock().await; - connection.transaction(|txn|{ - let catalog_name = self.name.clone(); - let namespace = namespace.encode_in_url(); - let name = creation.name.clone(); - let metadata_location = metadata_location.to_string(); - Box::pin(async move { - sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&mut **txn).await - })}).await.map_err(from_sqlx_error)?; + let catalog_name = self.name.clone(); + let namespace = namespace.encode_in_url(); + let name = creation.name.clone(); + let metadata_location = metadata_location.to_string(); + sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);").bind(&catalog_name).bind(&namespace).bind(&name).bind(&metadata_location).execute(&self.connection).await.map_err(from_sqlx_error)?; } Ok(Table::builder() .file_io(self.storage.clone()) From 8556d3bd047bedfca459ce2f1ad6073729fcd873 Mon Sep 17 00:00:00 2001 From: hpal Date: Wed, 10 Apr 2024 10:44:05 -0700 Subject: [PATCH 10/35] move the sqllite database creation part inside test case. --- crates/catalog/sql/Cargo.toml | 2 +- crates/catalog/sql/src/catalog.rs | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 893a47cbe..5f0992657 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -40,7 +40,7 @@ opendal = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -sqlx = { version = "0.7.2", features = ["tls-rustls", "any", "sqlite", "postgres", "mysql", "migrate"], default-features = false } +sqlx = { version = "0.7.2", features = ["tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } typed-builder = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index baccf40f3..c6448b9dc 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -23,7 +23,6 @@ use futures::{AsyncReadExt, AsyncWriteExt}; use sqlx::{any::{install_default_drivers, AnyRow}, AnyPool, Connection, Database, Row}; use std::collections::HashMap; use sqlx::any::AnyPoolOptions; -use sqlx::migrate::MigrateDatabase; use iceberg::{ io::FileIO, @@ -53,10 +52,6 @@ impl SqlCatalog { pub async fn new(url: &str, name: &str, storage: FileIO) -> Result { install_default_drivers(); - if !sqlx::Sqlite::database_exists(&url).await.map_err(from_sqlx_error)? { - sqlx::Sqlite::create_database(&url).await.map_err(from_sqlx_error)?; - } - let pool = AnyPoolOptions::new() .max_connections(20) //configurable? .idle_timeout(Duration::from_secs(20)) @@ -315,16 +310,25 @@ pub mod tests { use tempfile::TempDir; use crate::SqlCatalog; + use sqlx::migrate::MigrateDatabase; #[tokio::test] async fn test_create_update_drop_table() { let dir = TempDir::new().unwrap(); let storage = FileIOBuilder::new_fs_io().build().unwrap(); - let catalog = SqlCatalog::new("sqlite://iceberg", "iceberg", storage) + //name of the database should be part of the url. usually for sqllite it creates or opens one if (.db found) + let sqlLiteUrl = "sqlite://iceberg"; + + if !sqlx::Sqlite::database_exists(&sqlLiteUrl).await.unwrap() { + sqlx::Sqlite::create_database(&sqlLiteUrl).await.unwrap(); + } + + let catalog = SqlCatalog::new(sqlLiteUrl, "iceberg", storage) .await .unwrap(); + let namespace = NamespaceIdent::new("test".to_owned()); let identifier = TableIdent::new(namespace.clone(), "table1".to_owned()); @@ -370,6 +374,7 @@ pub mod tests { // assert!(table.metadata().location().ends_with("/warehouse/table1")) - //need to tear down the database and tables + //tear down the database and tables + sqlx::Sqlite::drop_database(&sqlLiteUrl).await.unwrap(); } } From 0ce0fda1b03ae6d3e7080f752ec8012d6b517eae Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Wed, 13 Mar 2024 11:25:08 +0100 Subject: [PATCH 11/35] run test --- crates/catalog/sql/Cargo.toml | 1 + crates/catalog/sql/src/catalog.rs | 44 +++++++++++++++++++++---------- crates/catalog/sql/src/error.rs | 2 +- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 5440935b1..f727ad837 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -48,5 +48,6 @@ uuid = { workspace = true, features = ["v4"] } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +sqlx = { version = "0.7.2", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "postgres", "mysql"], default-features = false } tempfile = { workspace = true } tokio = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index fc4296be9..c47bc5514 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -22,7 +22,7 @@ use dashmap::DashMap; use futures::{AsyncReadExt, AsyncWriteExt}; use sqlx::{ any::{install_default_drivers, AnyRow}, - AnyPool, Row, + AnyPool, Connection, Row, }; use std::collections::HashMap; @@ -50,32 +50,48 @@ impl SqlCatalog { let pool = AnyPool::connect(url).await.map_err(from_sqlx_error)?; - sqlx::query( - "create table if not exists iceberg_tables ( + let mut connection = pool.acquire().await.map_err(from_sqlx_error)?; + + connection + .transaction(|txn| { + Box::pin(async move { + sqlx::query( + "create table if not exists iceberg_tables ( catalog_name text not null, table_namespace text not null, table_name text not null, - metadata_location text not null, + metadata_location text, previous_metadata_location text, primary key (catalog_name, table_namespace, table_name) );", - ) - .execute(&pool) - .await - .map_err(from_sqlx_error)?; + ) + .fetch_all(&mut **txn) + .await + }) + }) + .await + .map_err(from_sqlx_error)?; - sqlx::query( - "create table if not exists iceberg_namespace_properties ( + connection + .transaction(|txn| { + Box::pin(async move { + sqlx::query( + "create table if not exists iceberg_namespace_properties ( catalog_name text not null, namespace text not null, property_key text, property_value text, primary key (catalog_name, namespace, property_key) );", - ) - .execute(&pool) - .await - .map_err(from_sqlx_error)?; + ) + .fetch_all(&mut **txn) + .await + }) + }) + .await + .map_err(from_sqlx_error)?; + + connection.close().await.map_err(from_sqlx_error)?; Ok(SqlCatalog { name: name.to_owned(), diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs index c6b998beb..90bba1f05 100644 --- a/crates/catalog/sql/src/error.rs +++ b/crates/catalog/sql/src/error.rs @@ -21,7 +21,7 @@ use iceberg::{Error, ErrorKind}; pub fn from_sqlx_error(error: sqlx::Error) -> Error { Error::new( ErrorKind::Unexpected, - "operation failed for hitting io error".to_string(), + "operation failed for hitting sqlx error".to_string(), ) .with_source(error) } From a9bdb81b9f1f52bc03a0fe4e6d31f05eb538ce26 Mon Sep 17 00:00:00 2001 From: hpal Date: Wed, 10 Apr 2024 13:25:10 -0700 Subject: [PATCH 12/35] style fix and few additional logic removal and add todo check. --- crates/catalog/sql/src/catalog.rs | 94 +++++++++++++++++-------------- 1 file changed, 51 insertions(+), 43 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c6448b9dc..185cc89c7 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -20,9 +20,12 @@ use std::sync::Arc; use async_trait::async_trait; use dashmap::DashMap; use futures::{AsyncReadExt, AsyncWriteExt}; -use sqlx::{any::{install_default_drivers, AnyRow}, AnyPool, Connection, Database, Row}; -use std::collections::HashMap; use sqlx::any::AnyPoolOptions; +use sqlx::{ + any::{install_default_drivers, AnyRow}, + AnyPool, Row, +}; +use std::collections::HashMap; use iceberg::{ io::FileIO, @@ -31,10 +34,8 @@ use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; -use uuid::Uuid; use std::time::Duration; -use opendal::Scheme::Sqlite; -use sqlx::Error::Migrate; +use uuid::Uuid; use crate::error::from_sqlx_error; @@ -62,28 +63,30 @@ impl SqlCatalog { sqlx::query( "create table if not exists iceberg_tables ( - catalog_name text not null, - table_namespace text not null, - table_name text not null, - metadata_location text, - previous_metadata_location text, - primary key (catalog_name, table_namespace, table_name) - );", - ) - .execute(&pool) - .await.map_err(from_sqlx_error)?; - - sqlx::query( + catalog_name text not null, + table_namespace text not null, + table_name text not null, + metadata_location text, + previous_metadata_location text, + primary key (catalog_name, table_namespace, table_name) + );", + ) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; + + sqlx::query( "create table if not exists iceberg_namespace_properties ( - catalog_name text not null, - namespace text not null, - property_key text, - property_value text, - primary key (catalog_name, namespace, property_key) - );", - ) - .execute(&pool) - .await.map_err(from_sqlx_error)?; + catalog_name text not null, + namespace text not null, + property_key text, + property_value text, + primary key (catalog_name, namespace, property_key) + );", + ) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; Ok(SqlCatalog { name: name.to_owned(), @@ -205,7 +208,13 @@ impl Catalog for SqlCatalog { let catalog_name = self.name.clone(); let namespace = identifier.namespace().encode_in_url(); let name = identifier.name().to_string(); - let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables")/* where catalog_name = ? and table_namespace = ? and table_name = ?;").bind(&catalog_name).bind(&namespace).bind(&name)*/.fetch_all(&self.connection).await.map_err(from_sqlx_error)?; + let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ? and table_name = ?;") + .bind(&catalog_name) + .bind(&namespace) + .bind(&name) + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error)?; let mut iter = rows.iter().map(query_map); Ok(iter.next().is_some()) @@ -271,15 +280,16 @@ impl Catalog for SqlCatalog { let namespace = namespace.encode_in_url(); let name = name.clone(); let metadata_location = metadata_location.to_string(); - let identifier = TableIdent::new( NamespaceIdent::new(namespace.clone()), name.to_owned()); - if !self.table_exists(&identifier).await? { - //check if table exists, may be update then, otherwise unique constraint violation - sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);").bind(&catalog_name).bind(&namespace).bind(&name).bind(&metadata_location).execute(&self.connection).await.map_err(from_sqlx_error)?; - } - else { - //update - println!("update is not implemented") - } + + //TODO do we need to check if table exists,then update or delete and insert, otherwise unique constraint violation occurs + sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);") + .bind(&catalog_name) + .bind(&namespace) + .bind(&name) + .bind(&metadata_location) + .execute(&self.connection) + .await + .map_err(from_sqlx_error)?; } Ok(Table::builder() @@ -301,7 +311,6 @@ impl Catalog for SqlCatalog { #[cfg(test)] pub mod tests { - use std::thread::sleep; use iceberg::{ io::FileIOBuilder, spec::{NestedField, PrimitiveType, Schema, Type}, @@ -318,17 +327,16 @@ pub mod tests { let storage = FileIOBuilder::new_fs_io().build().unwrap(); //name of the database should be part of the url. usually for sqllite it creates or opens one if (.db found) - let sqlLiteUrl = "sqlite://iceberg"; + let sql_lite_url = "sqlite://iceberg"; - if !sqlx::Sqlite::database_exists(&sqlLiteUrl).await.unwrap() { - sqlx::Sqlite::create_database(&sqlLiteUrl).await.unwrap(); + if !sqlx::Sqlite::database_exists(sql_lite_url).await.unwrap() { + sqlx::Sqlite::create_database(sql_lite_url).await.unwrap(); } - let catalog = SqlCatalog::new(sqlLiteUrl, "iceberg", storage) + let catalog = SqlCatalog::new(sql_lite_url, "iceberg", storage) .await .unwrap(); - let namespace = NamespaceIdent::new("test".to_owned()); let identifier = TableIdent::new(namespace.clone(), "table1".to_owned()); @@ -375,6 +383,6 @@ pub mod tests { // assert!(table.metadata().location().ends_with("/warehouse/table1")) //tear down the database and tables - sqlx::Sqlite::drop_database(&sqlLiteUrl).await.unwrap(); + sqlx::Sqlite::drop_database(sql_lite_url).await.unwrap(); } } From 769f8f54e8a347b1421d091da0e10921a4acaf8e Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 8 Apr 2024 21:06:31 +0200 Subject: [PATCH 13/35] rebase --- crates/catalog/sql/src/catalog.rs | 37 ++++++++--------------- crates/iceberg/src/spec/table_metadata.rs | 23 ++------------ 2 files changed, 15 insertions(+), 45 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c47bc5514..f812b5b10 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -27,8 +27,11 @@ use sqlx::{ use std::collections::HashMap; use iceberg::{ - io::FileIO, spec::TableMetadata, table::Table, Catalog, Error, ErrorKind, Namespace, - NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, + io::FileIO, + spec::{TableMetadata, TableMetadataBuilder}, + table::Table, + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, + TableIdent, }; use uuid::Uuid; @@ -209,7 +212,7 @@ impl Catalog for SqlCatalog { .map_err(from_sqlx_error)?) } - async fn stat_table(&self, identifier: &TableIdent) -> Result { + async fn table_exists(&self, identifier: &TableIdent) -> Result { let catalog_name = self.name.clone(); let namespace = identifier.namespace().encode_in_url(); let name = identifier.name().to_string(); @@ -257,29 +260,17 @@ impl Catalog for SqlCatalog { namespace: &NamespaceIdent, creation: TableCreation, ) -> Result
{ - let location = creation.location.ok_or(Error::new( + let location = creation.location.as_ref().ok_or(Error::new( ErrorKind::DataInvalid, "Table creation with the Sql catalog requires a location.", ))?; + let name = creation.name.clone(); let uuid = Uuid::new_v4(); let metadata_location = location.clone() + "/metadata/" + "0-" + &uuid.to_string() + ".metadata.json"; - let metadata = TableMetadata::builder() - .location(location) - .current_schema_id(creation.schema.schema_id()) - .last_column_id(creation.schema.highest_field_id()) - .schemas(HashMap::from_iter(vec![( - creation.schema.schema_id(), - creation.schema.into(), - )])) - .partition_specs(HashMap::new()) - .last_partition_id(0) - .current_snapshot_id(-1) - .sort_orders(HashMap::new()) - .default_sort_order_id(0) - .build(); + let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?; let file = self.storage.new_output(&metadata_location)?; file.writer() @@ -289,14 +280,14 @@ impl Catalog for SqlCatalog { { let catalog_name = self.name.clone(); let namespace = namespace.encode_in_url(); - let name = creation.name.clone(); + let name = name.clone(); let metadata_location = metadata_location.to_string(); sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);").bind(&catalog_name).bind(&namespace).bind(&name).bind(&metadata_location).execute(&self.connection).await.map_err(from_sqlx_error)?; } Ok(Table::builder() .file_io(self.storage.clone()) .metadata_location(metadata_location) - .identifier(TableIdent::new(namespace.clone(), creation.name)) + .identifier(TableIdent::new(namespace.clone(), name)) .metadata(metadata) .build()) } @@ -315,7 +306,7 @@ pub mod tests { use iceberg::{ io::FileIOBuilder, - spec::{NestedField, PrimitiveType, Schema, SortOrder, Type, UnboundPartitionSpec}, + spec::{NestedField, PrimitiveType, Schema, Type}, Catalog, NamespaceIdent, TableCreation, TableIdent, }; use tempfile::TempDir; @@ -348,14 +339,12 @@ pub mod tests { .name("table1".to_owned()) .location(dir.path().to_str().unwrap().to_owned() + "/warehouse/table1") .schema(schema) - .partition_spec(UnboundPartitionSpec::default()) - .sort_order(SortOrder::default()) .build(); catalog.create_table(&namespace, creation).await.unwrap(); let exists = catalog - .stat_table(&identifier) + .table_exists(&identifier) .await .expect("Table doesn't exist"); assert!(exists); diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 36961a7ea..b907b0556 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -22,12 +22,7 @@ use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use std::cmp::Ordering; use std::fmt::{Display, Formatter}; -use std::{ - collections::HashMap, - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; -use typed_builder::TypedBuilder; +use std::{collections::HashMap, sync::Arc}; use uuid::Uuid; use super::{ @@ -52,27 +47,21 @@ pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0; /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; -#[derive(Debug, PartialEq, Deserialize, Eq, Clone, TypedBuilder)] +#[derive(Debug, PartialEq, Deserialize, Eq, Clone)] #[serde(try_from = "TableMetadataEnum")] /// Fields for the version 2 of the table metadata. /// /// We assume that this data structure is always valid, so we will panic when invalid error happens. /// We check the validity of this data structure when constructing. pub struct TableMetadata { - #[builder(default)] /// Integer Version for the format. pub(crate) format_version: FormatVersion, - #[builder(default_code = "Uuid::new_v4()")] /// A UUID that identifies the table pub(crate) table_uuid: Uuid, /// Location tables base location pub(crate) location: String, - #[builder(default)] /// The tables highest sequence number pub(crate) last_sequence_number: i64, - #[builder( - default_code = "SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros() as i64" - )] /// Timestamp in milliseconds from the unix epoch when the table was last updated. pub(crate) last_updated_ms: i64, /// An integer; the highest assigned column ID for the table. @@ -83,27 +72,22 @@ pub struct TableMetadata { pub(crate) current_schema_id: i32, /// A list of partition specs, stored as full partition spec objects. pub(crate) partition_specs: HashMap, - #[builder(default = DEFAULT_SPEC_ID)] /// ID of the “current” spec that writers should use by default. pub(crate) default_spec_id: i32, /// An integer; the highest assigned partition field ID across all partition specs for the table. pub(crate) last_partition_id: i32, - #[builder(default)] ///A string to string map of table properties. This is used to control settings that /// affect reading and writing and is not intended to be used for arbitrary metadata. /// For example, commit.retry.num-retries is used to control the number of commit retries. pub(crate) properties: HashMap, - #[builder(setter(strip_option))] /// long ID of the current table snapshot; must be the same as the current /// ID of the main branch in refs. pub(crate) current_snapshot_id: Option, - #[builder(default)] ///A list of valid snapshots. Valid snapshots are snapshots for which all /// data files exist in the file system. A data file must not be deleted /// from the file system until the last snapshot in which it was listed is /// garbage collected. pub(crate) snapshots: HashMap, - #[builder(default)] /// A list (optional) of timestamp and snapshot ID pairs that encodes changes /// to the current snapshot for the table. Each time the current-snapshot-id /// is changed, a new entry should be added with the last-updated-ms @@ -111,7 +95,6 @@ pub struct TableMetadata { /// the list of valid snapshots, all entries before a snapshot that has /// expired should be removed. pub(crate) snapshot_log: Vec, - #[builder(default)] /// A list (optional) of timestamp and metadata file location pairs /// that encodes changes to the previous metadata files for the table. @@ -122,12 +105,10 @@ pub struct TableMetadata { pub(crate) metadata_log: Vec, /// A list of sort orders, stored as full sort order objects. pub(crate) sort_orders: HashMap, - #[builder(default = DEFAULT_SORT_ORDER_ID)] /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. pub(crate) default_sort_order_id: i64, - #[builder(default)] ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id From 4dc95c35e88f498cf417c92135e0d6b47e62f696 Mon Sep 17 00:00:00 2001 From: himadripal Date: Wed, 17 Apr 2024 00:29:52 -0700 Subject: [PATCH 14/35] create sqlconfig, fix rest of the tests and remove todo --- crates/catalog/sql/src/catalog.rs | 68 +++++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 18 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 185cc89c7..a95b2554b 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -35,10 +35,21 @@ use iceberg::{ TableIdent, }; use std::time::Duration; +use typed_builder::TypedBuilder; use uuid::Uuid; use crate::error::from_sqlx_error; +/// Sql catalog config +#[derive(Debug, TypedBuilder)] +pub struct SqlCatalogConfig { + url: String, + name: String, + warehouse: String, + #[builder(default)] + props: HashMap, +} + #[derive(Debug)] /// Sql catalog implementation. pub struct SqlCatalog { @@ -50,14 +61,29 @@ pub struct SqlCatalog { impl SqlCatalog { /// Create new sql catalog instance - pub async fn new(url: &str, name: &str, storage: FileIO) -> Result { + pub async fn new(config: SqlCatalogConfig) -> Result { install_default_drivers(); + let max_connections: u32 = config + .props + .get("pool.max-connections") + .map(|v| v.parse().unwrap()) + .unwrap_or(10); + let idle_timeout: u64 = config + .props + .get("pool.idle-timeout") + .map(|v| v.parse().unwrap()) + .unwrap_or(10); + let test_before_acquire: bool = config + .props + .get("pool.test-before-acquire") + .map(|v| v.parse().unwrap()) + .unwrap_or(true); let pool = AnyPoolOptions::new() - .max_connections(20) //configurable? - .idle_timeout(Duration::from_secs(20)) - .test_before_acquire(true) - .connect(url) + .max_connections(max_connections) + .idle_timeout(Duration::from_secs(idle_timeout)) + .test_before_acquire(test_before_acquire) + .connect(&config.url) .await .map_err(from_sqlx_error)?; @@ -88,10 +114,14 @@ impl SqlCatalog { .await .map_err(from_sqlx_error)?; + let file_io = FileIO::from_path(&config.warehouse)? + .with_props(&config.props) + .build()?; + Ok(SqlCatalog { - name: name.to_owned(), + name: config.name.to_owned(), connection: pool, - storage, + storage: file_io, cache: Arc::new(DashMap::new()), }) } @@ -281,7 +311,6 @@ impl Catalog for SqlCatalog { let name = name.clone(); let metadata_location = metadata_location.to_string(); - //TODO do we need to check if table exists,then update or delete and insert, otherwise unique constraint violation occurs sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);") .bind(&catalog_name) .bind(&namespace) @@ -312,19 +341,18 @@ impl Catalog for SqlCatalog { #[cfg(test)] pub mod tests { use iceberg::{ - io::FileIOBuilder, spec::{NestedField, PrimitiveType, Schema, Type}, Catalog, NamespaceIdent, TableCreation, TableIdent, }; use tempfile::TempDir; - use crate::SqlCatalog; + use crate::{SqlCatalog, SqlCatalogConfig}; use sqlx::migrate::MigrateDatabase; #[tokio::test] async fn test_create_update_drop_table() { - let dir = TempDir::new().unwrap(); - let storage = FileIOBuilder::new_fs_io().build().unwrap(); + let dir = TempDir::with_prefix("sql-test").unwrap(); + let warehouse_root = dir.path().to_str().unwrap(); //name of the database should be part of the url. usually for sqllite it creates or opens one if (.db found) let sql_lite_url = "sqlite://iceberg"; @@ -333,9 +361,13 @@ pub mod tests { sqlx::Sqlite::create_database(sql_lite_url).await.unwrap(); } - let catalog = SqlCatalog::new(sql_lite_url, "iceberg", storage) - .await - .unwrap(); + let config = SqlCatalogConfig::builder() + .url(sql_lite_url.to_string()) + .name("iceberg".to_string()) + .warehouse(warehouse_root.to_owned()) + .build(); + + let catalog = SqlCatalog::new(config).await.unwrap(); let namespace = NamespaceIdent::new("test".to_owned()); @@ -352,7 +384,7 @@ pub mod tests { let creation = TableCreation::builder() .name("table1".to_owned()) - .location(dir.path().to_str().unwrap().to_owned() + "/warehouse/table1") + .location(warehouse_root.to_owned() + "/warehouse/table1") .schema(schema) .build(); @@ -378,9 +410,9 @@ pub mod tests { //load table points to a /var location - check why - // let table = catalog.load_table(&identifier).await.unwrap(); + let table = catalog.load_table(&identifier).await.unwrap(); - // assert!(table.metadata().location().ends_with("/warehouse/table1")) + assert!(table.metadata().location().ends_with("/warehouse/table1")); //tear down the database and tables sqlx::Sqlite::drop_database(sql_lite_url).await.unwrap(); From 0e7b2ea03518deb441dcb3aa1c5d94084dde6e7c Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Wed, 24 Apr 2024 21:00:17 +0200 Subject: [PATCH 15/35] use varchar --- crates/catalog/sql/Cargo.toml | 4 ++-- crates/catalog/sql/src/catalog.rs | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 5f0992657..40fba3f1d 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -40,7 +40,7 @@ opendal = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -sqlx = { version = "0.7.2", features = ["tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } +sqlx = { version = "0.7.4", features = ["tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } typed-builder = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } @@ -48,6 +48,6 @@ uuid = { workspace = true, features = ["v4"] } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } -sqlx = { version = "0.7.2", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "postgres", "mysql","migrate"], default-features = false } +sqlx = { version = "0.7.4", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "postgres", "mysql","migrate"], default-features = false } tempfile = { workspace = true } tokio = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index a95b2554b..96a4e62aa 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -89,11 +89,11 @@ impl SqlCatalog { sqlx::query( "create table if not exists iceberg_tables ( - catalog_name text not null, - table_namespace text not null, - table_name text not null, - metadata_location text, - previous_metadata_location text, + catalog_name varchar(255) not null, + table_namespace varchar(255) not null, + table_name varchar(255) not null, + metadata_location varchar(255), + previous_metadata_location varchar(255), primary key (catalog_name, table_namespace, table_name) );", ) @@ -103,10 +103,10 @@ impl SqlCatalog { sqlx::query( "create table if not exists iceberg_namespace_properties ( - catalog_name text not null, - namespace text not null, - property_key text, - property_value text, + catalog_name varchar(255) not null, + namespace varchar(255) not null, + property_key varchar(255), + property_value varchar(255), primary key (catalog_name, namespace, property_key) );", ) From 01c9e43c3b546f895f5583e8b24ae139b7970da9 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Thu, 25 Apr 2024 09:26:53 +0200 Subject: [PATCH 16/35] use static strings for sql identifiers --- crates/catalog/sql/src/catalog.rs | 144 ++++++++++++++++++++++++------ 1 file changed, 119 insertions(+), 25 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 96a4e62aa..c95070327 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -40,6 +40,13 @@ use uuid::Uuid; use crate::error::from_sqlx_error; +static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables"; +static CATALOG_NAME: &str = "catalog_name"; +static TABLE_NAME: &str = "table_name"; +static TABLE_NAMESPACE: &str = "table_namespace"; +static METADATA_LOCATION_PROP: &str = "metadata_locaion"; +static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; + /// Sql catalog config #[derive(Debug, TypedBuilder)] pub struct SqlCatalogConfig { @@ -88,14 +95,26 @@ impl SqlCatalog { .map_err(from_sqlx_error)?; sqlx::query( - "create table if not exists iceberg_tables ( - catalog_name varchar(255) not null, - table_namespace varchar(255) not null, - table_name varchar(255) not null, - metadata_location varchar(255), - previous_metadata_location varchar(255), - primary key (catalog_name, table_namespace, table_name) - );", + &("create table if not exists ".to_string() + + CATALOG_TABLE_VIEW_NAME + + " (" + + CATALOG_NAME + + " varchar(255) not null," + + TABLE_NAMESPACE + + " varchar(255) not null," + + TABLE_NAME + + " varchar(255) not null," + + METADATA_LOCATION_PROP + + " varchar(255)," + + PREVIOUS_METADATA_LOCATION_PROP + + " varchar(255), primary key (" + + CATALOG_NAME + + ", " + + TABLE_NAMESPACE + + ", " + + TABLE_NAME + + ") + );"), ) .execute(&pool) .await @@ -214,7 +233,28 @@ impl Catalog for SqlCatalog { async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { let name = self.name.clone(); let namespace = namespace.encode_in_url(); - let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ?;").bind(&name).bind(&namespace).fetch_all(&self.connection).await.map_err(from_sqlx_error)?; + let rows = sqlx::query( + &("select ".to_string() + + TABLE_NAMESPACE + + ", " + + TABLE_NAME + + ", " + + METADATA_LOCATION_PROP + + ", " + + PREVIOUS_METADATA_LOCATION_PROP + + " from " + + CATALOG_TABLE_VIEW_NAME + + " where " + + CATALOG_NAME + + " = ? and " + + TABLE_NAMESPACE + + "= ?;"), + ) + .bind(&name) + .bind(&namespace) + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error)?; let iter = rows.iter().map(query_map); Ok(iter @@ -238,13 +278,31 @@ impl Catalog for SqlCatalog { let catalog_name = self.name.clone(); let namespace = identifier.namespace().encode_in_url(); let name = identifier.name().to_string(); - let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ? and table_name = ?;") - .bind(&catalog_name) - .bind(&namespace) - .bind(&name) - .fetch_all(&self.connection) - .await - .map_err(from_sqlx_error)?; + let rows = sqlx::query( + &("select ".to_string() + + TABLE_NAMESPACE + + ", " + + TABLE_NAME + + ", " + + METADATA_LOCATION_PROP + + ", " + + PREVIOUS_METADATA_LOCATION_PROP + + " from " + + CATALOG_TABLE_VIEW_NAME + + " where " + + CATALOG_NAME + + " = ? and " + + TABLE_NAMESPACE + + " = ? and " + + TABLE_NAME + + " = ?;"), + ) + .bind(&catalog_name) + .bind(&namespace) + .bind(&name) + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error)?; let mut iter = rows.iter().map(query_map); Ok(iter.next().is_some()) @@ -259,7 +317,31 @@ impl Catalog for SqlCatalog { let catalog_name = self.name.clone(); let namespace = identifier.namespace().encode_in_url(); let name = identifier.name().to_string(); - let row = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ? and table_name = ?;").bind(&catalog_name).bind(&namespace).bind(&name).fetch_one(&self.connection).await.map_err(from_sqlx_error)?; + let row = sqlx::query( + &("select ".to_string() + + TABLE_NAMESPACE + + ", " + + TABLE_NAME + + ", " + + METADATA_LOCATION_PROP + + ", " + + PREVIOUS_METADATA_LOCATION_PROP + + " from " + + CATALOG_TABLE_VIEW_NAME + + " where " + + CATALOG_NAME + + " = ? and " + + TABLE_NAMESPACE + + " = ? and " + + TABLE_NAME + + " = ?;"), + ) + .bind(&catalog_name) + .bind(&namespace) + .bind(&name) + .fetch_one(&self.connection) + .await + .map_err(from_sqlx_error)?; let row = query_map(&row).map_err(from_sqlx_error)?; row.metadata_location @@ -311,14 +393,26 @@ impl Catalog for SqlCatalog { let name = name.clone(); let metadata_location = metadata_location.to_string(); - sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);") - .bind(&catalog_name) - .bind(&namespace) - .bind(&name) - .bind(&metadata_location) - .execute(&self.connection) - .await - .map_err(from_sqlx_error)?; + sqlx::query( + &("insert into ".to_string() + + CATALOG_TABLE_VIEW_NAME + + " (" + + CATALOG_NAME + + ", " + + TABLE_NAMESPACE + + ", " + + TABLE_NAME + + ", " + + METADATA_LOCATION_PROP + + ") values (?, ?, ?, ?);"), + ) + .bind(&catalog_name) + .bind(&namespace) + .bind(&name) + .bind(&metadata_location) + .execute(&self.connection) + .await + .map_err(from_sqlx_error)?; } Ok(Table::builder() From fead32ce02da535664c8d277e735d3a5a0fe7257 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Thu, 25 Apr 2024 09:36:57 +0200 Subject: [PATCH 17/35] fix namespace encoding --- crates/catalog/sql/src/catalog.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c95070327..b1675c367 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -46,6 +46,7 @@ static TABLE_NAME: &str = "table_name"; static TABLE_NAMESPACE: &str = "table_namespace"; static METADATA_LOCATION_PROP: &str = "metadata_locaion"; static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; +static RECORD_TYPE: &str = "iceberg_type"; /// Sql catalog config #[derive(Debug, TypedBuilder)] @@ -107,7 +108,9 @@ impl SqlCatalog { + METADATA_LOCATION_PROP + " varchar(255)," + PREVIOUS_METADATA_LOCATION_PROP - + " varchar(255), primary key (" + + " varchar(255)," + + RECORD_TYPE + + " varchar(5), primary key (" + CATALOG_NAME + ", " + TABLE_NAMESPACE @@ -232,7 +235,7 @@ impl Catalog for SqlCatalog { async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { let name = self.name.clone(); - let namespace = namespace.encode_in_url(); + let namespace = namespace.join("."); let rows = sqlx::query( &("select ".to_string() + TABLE_NAMESPACE From d18393dfedc27b1806427ccb714b05e2f1847aaa Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Thu, 25 Apr 2024 09:40:44 +0200 Subject: [PATCH 18/35] fix typo --- crates/catalog/sql/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index b1675c367..ee6562ffc 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -44,7 +44,7 @@ static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables"; static CATALOG_NAME: &str = "catalog_name"; static TABLE_NAME: &str = "table_name"; static TABLE_NAMESPACE: &str = "table_namespace"; -static METADATA_LOCATION_PROP: &str = "metadata_locaion"; +static METADATA_LOCATION_PROP: &str = "metadata_location"; static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; static RECORD_TYPE: &str = "iceberg_type"; From 0b1141cc60ebd498a806dbc558e3c99b8def8b0d Mon Sep 17 00:00:00 2001 From: JanKaul Date: Thu, 25 Apr 2024 09:41:57 +0200 Subject: [PATCH 19/35] Fix heading Co-authored-by: Renjie Liu --- crates/catalog/sql/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/lib.rs b/crates/catalog/sql/src/lib.rs index cfacac3d9..6861dab3f 100644 --- a/crates/catalog/sql/src/lib.rs +++ b/crates/catalog/sql/src/lib.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Iceberg REST API implementation. +//! Iceberg sql catalog implementation. #![deny(missing_docs)] From 80e22d526334ea57cb9b2475615a6547c1479928 Mon Sep 17 00:00:00 2001 From: JanKaul Date: Fri, 26 Apr 2024 20:43:53 +0200 Subject: [PATCH 20/35] Update crates/catalog/sql/src/catalog.rs Co-authored-by: Renjie Liu --- crates/catalog/sql/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index ee6562ffc..d904c1a8f 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -63,7 +63,7 @@ pub struct SqlCatalogConfig { pub struct SqlCatalog { name: String, connection: AnyPool, - storage: FileIO, + fileio: FileIO, cache: Arc>, } From 193da0bc153d8988e7ff1a656c37773c85b92a2e Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Fri, 26 Apr 2024 20:56:12 +0200 Subject: [PATCH 21/35] rename fileio --- crates/catalog/sql/src/catalog.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index d904c1a8f..13a7742d9 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -143,7 +143,7 @@ impl SqlCatalog { Ok(SqlCatalog { name: config.name.to_owned(), connection: pool, - storage: file_io, + fileio: file_io, cache: Arc::new(DashMap::new()), }) } @@ -349,7 +349,7 @@ impl Catalog for SqlCatalog { row.metadata_location }; - let file = self.storage.new_input(&metadata_location)?; + let file = self.fileio.new_input(&metadata_location)?; let mut json = String::new(); file.reader().await?.read_to_string(&mut json).await?; @@ -360,7 +360,7 @@ impl Catalog for SqlCatalog { .insert(identifier.clone(), (metadata_location, metadata.clone())); let table = Table::builder() - .file_io(self.storage.clone()) + .file_io(self.fileio.clone()) .identifier(identifier.clone()) .metadata(metadata) .build(); @@ -385,7 +385,7 @@ impl Catalog for SqlCatalog { let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?; - let file = self.storage.new_output(&metadata_location)?; + let file = self.fileio.new_output(&metadata_location)?; file.writer() .await? .write_all(&serde_json::to_vec(&metadata)?) @@ -419,7 +419,7 @@ impl Catalog for SqlCatalog { } Ok(Table::builder() - .file_io(self.storage.clone()) + .file_io(self.fileio.clone()) .metadata_location(metadata_location) .identifier(TableIdent::new(namespace.clone(), name)) .metadata(metadata) From 0318576add5e0a2425791e7a54cb8ff43b874e93 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Fri, 26 Apr 2024 21:00:39 +0200 Subject: [PATCH 22/35] simplify conversion from string to NamespaceIdent --- crates/catalog/sql/src/catalog.rs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 13a7742d9..5b9ee6007 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -195,10 +195,8 @@ impl Catalog for SqlCatalog { Ok(iter .map(|x| { x.and_then(|y| { - NamespaceIdent::from_vec( - y.split('.').map(ToString::to_string).collect::>(), - ) - .map_err(|err| sqlx::Error::Decode(Box::new(err))) + NamespaceIdent::from_strs(y.split(".")) + .map_err(|err| sqlx::Error::Decode(Box::new(err))) }) }) .collect::>() @@ -263,13 +261,8 @@ impl Catalog for SqlCatalog { Ok(iter .map(|x| { x.and_then(|y| { - let namespace = NamespaceIdent::from_vec( - y.table_namespace - .split('.') - .map(ToString::to_string) - .collect::>(), - ) - .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; + let namespace = NamespaceIdent::from_strs(y.table_namespace.split(".")) + .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; Ok(TableIdent::new(namespace, y.table_name)) }) }) @@ -279,7 +272,7 @@ impl Catalog for SqlCatalog { async fn table_exists(&self, identifier: &TableIdent) -> Result { let catalog_name = self.name.clone(); - let namespace = identifier.namespace().encode_in_url(); + let namespace = identifier.namespace().join("."); let name = identifier.name().to_string(); let rows = sqlx::query( &("select ".to_string() From ab657932a8e66209d164e8c036fb0576c2ca40d7 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Fri, 26 Apr 2024 21:02:26 +0200 Subject: [PATCH 23/35] use uri for database connection --- crates/catalog/sql/src/catalog.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 5b9ee6007..cc2f6b3bf 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -51,7 +51,7 @@ static RECORD_TYPE: &str = "iceberg_type"; /// Sql catalog config #[derive(Debug, TypedBuilder)] pub struct SqlCatalogConfig { - url: String, + uri: String, name: String, warehouse: String, #[builder(default)] @@ -91,7 +91,7 @@ impl SqlCatalog { .max_connections(max_connections) .idle_timeout(Duration::from_secs(idle_timeout)) .test_before_acquire(test_before_acquire) - .connect(&config.url) + .connect(&config.uri) .await .map_err(from_sqlx_error)?; @@ -445,14 +445,14 @@ pub mod tests { let warehouse_root = dir.path().to_str().unwrap(); //name of the database should be part of the url. usually for sqllite it creates or opens one if (.db found) - let sql_lite_url = "sqlite://iceberg"; + let sql_lite_uri = "sqlite://iceberg"; - if !sqlx::Sqlite::database_exists(sql_lite_url).await.unwrap() { - sqlx::Sqlite::create_database(sql_lite_url).await.unwrap(); + if !sqlx::Sqlite::database_exists(sql_lite_uri).await.unwrap() { + sqlx::Sqlite::create_database(sql_lite_uri).await.unwrap(); } let config = SqlCatalogConfig::builder() - .url(sql_lite_url.to_string()) + .uri(sql_lite_uri.to_string()) .name("iceberg".to_string()) .warehouse(warehouse_root.to_owned()) .build(); @@ -505,6 +505,6 @@ pub mod tests { assert!(table.metadata().location().ends_with("/warehouse/table1")); //tear down the database and tables - sqlx::Sqlite::drop_database(sql_lite_url).await.unwrap(); + sqlx::Sqlite::drop_database(sql_lite_uri).await.unwrap(); } } From 214274ac7acb5ebdc3848ed9b5e3472575c4131a Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Fri, 26 Apr 2024 21:08:28 +0200 Subject: [PATCH 24/35] use features for sqlx database support --- crates/catalog/sql/Cargo.toml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 40fba3f1d..8e03cf6e6 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -40,7 +40,7 @@ opendal = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -sqlx = { version = "0.7.4", features = ["tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } +sqlx = { version = "0.7.4", features = ["tls-rustls", "any" ], default-features = false } typed-builder = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } @@ -48,6 +48,11 @@ uuid = { workspace = true, features = ["v4"] } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } -sqlx = { version = "0.7.4", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "postgres", "mysql","migrate"], default-features = false } +sqlx = { version = "0.7.4", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "migrate"], default-features = false } tempfile = { workspace = true } tokio = { workspace = true } + +[features] +sqlite = ["sqlx/sqlite"] +postgres = ["sqlx/postgres"] +mysql = ["sqlx/mysql"] From 23db91193233896291bf62f847481e6798578a9d Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Fri, 26 Apr 2024 21:13:08 +0200 Subject: [PATCH 25/35] use statics for pool default values --- crates/catalog/sql/src/catalog.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index cc2f6b3bf..401a7655b 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -48,6 +48,10 @@ static METADATA_LOCATION_PROP: &str = "metadata_location"; static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; static RECORD_TYPE: &str = "iceberg_type"; +static MAX_CONNECTIONS: u32 = 10; +static IDLE_TIMEOUT: u64 = 10; +static TEST_BEFORE_AQUIRE = true; + /// Sql catalog config #[derive(Debug, TypedBuilder)] pub struct SqlCatalogConfig { @@ -75,17 +79,17 @@ impl SqlCatalog { .props .get("pool.max-connections") .map(|v| v.parse().unwrap()) - .unwrap_or(10); + .unwrap_or(MAX_CONNECTIONS); let idle_timeout: u64 = config .props .get("pool.idle-timeout") .map(|v| v.parse().unwrap()) - .unwrap_or(10); + .unwrap_or(IDLE_TIMEOUT); let test_before_acquire: bool = config .props .get("pool.test-before-acquire") .map(|v| v.parse().unwrap()) - .unwrap_or(true); + .unwrap_or(TEST_BEFORE_AQUIRE); let pool = AnyPoolOptions::new() .max_connections(max_connections) From 75bf2f3e6b110af720325b5a41ad8ccc19cbe091 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Fri, 26 Apr 2024 21:14:29 +0200 Subject: [PATCH 26/35] move derive Default --- crates/iceberg/src/spec/table_metadata.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index b907b0556..837d290c7 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -863,10 +863,9 @@ pub(super) mod _serde { } } -#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy, Default)] #[repr(u8)] /// Iceberg format version -#[derive(Default)] pub enum FormatVersion { /// Iceberg spec version 1 V1 = 1u8, From 841f91603335d94da8d71b5f2ae52a39099fbb13 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Fri, 26 Apr 2024 21:20:58 +0200 Subject: [PATCH 27/35] create new tempdir for every test run --- crates/catalog/sql/src/catalog.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 401a7655b..f4df30527 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -50,7 +50,7 @@ static RECORD_TYPE: &str = "iceberg_type"; static MAX_CONNECTIONS: u32 = 10; static IDLE_TIMEOUT: u64 = 10; -static TEST_BEFORE_AQUIRE = true; +static TEST_BEFORE_AQUIRE: bool = true; /// Sql catalog config #[derive(Debug, TypedBuilder)] @@ -186,11 +186,11 @@ impl Catalog for SqlCatalog { &self, _parent: Option<&NamespaceIdent>, ) -> Result> { - let name = self.name.clone(); + let name = &self.name; let rows = sqlx::query( "select distinct table_namespace from iceberg_tables where catalog_name = ?;", ) - .bind(&name) + .bind(name) .fetch_all(&self.connection) .await .map_err(from_sqlx_error)?; @@ -445,7 +445,7 @@ pub mod tests { #[tokio::test] async fn test_create_update_drop_table() { - let dir = TempDir::with_prefix("sql-test").unwrap(); + let dir = TempDir::new().unwrap(); let warehouse_root = dir.path().to_str().unwrap(); //name of the database should be part of the url. usually for sqllite it creates or opens one if (.db found) From fe1ea0c33ad4d7512dd7035bfb2306bb348fe401 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Fri, 26 Apr 2024 21:45:43 +0200 Subject: [PATCH 28/35] use table record type --- crates/catalog/sql/src/catalog.rs | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index f4df30527..96d6bd94e 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -47,6 +47,7 @@ static TABLE_NAMESPACE: &str = "table_namespace"; static METADATA_LOCATION_PROP: &str = "metadata_location"; static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; static RECORD_TYPE: &str = "iceberg_type"; +static TABLE_RECORD_TYPE: &str = "TABLE"; static MAX_CONNECTIONS: u32 = 10; static IDLE_TIMEOUT: u64 = 10; @@ -121,7 +122,7 @@ impl SqlCatalog { + ", " + TABLE_NAME + ") - );"), + );"), ) .execute(&pool) .await @@ -253,7 +254,13 @@ impl Catalog for SqlCatalog { + CATALOG_NAME + " = ? and " + TABLE_NAMESPACE - + "= ?;"), + + " = ? and (" + + RECORD_TYPE + + " = '" + + TABLE_RECORD_TYPE + + "' or " + + RECORD_TYPE + + " is null);"), ) .bind(&name) .bind(&namespace) @@ -295,7 +302,13 @@ impl Catalog for SqlCatalog { + TABLE_NAMESPACE + " = ? and " + TABLE_NAME - + " = ?;"), + + " = ? and (" + + RECORD_TYPE + + " = '" + + TABLE_RECORD_TYPE + + "' or " + + RECORD_TYPE + + " is null);"), ) .bind(&catalog_name) .bind(&namespace) @@ -334,7 +347,13 @@ impl Catalog for SqlCatalog { + TABLE_NAMESPACE + " = ? and " + TABLE_NAME - + " = ?;"), + + " = ? and (" + + RECORD_TYPE + + " = '" + + TABLE_RECORD_TYPE + + "' or " + + RECORD_TYPE + + " is null);"), ) .bind(&catalog_name) .bind(&namespace) @@ -445,7 +464,7 @@ pub mod tests { #[tokio::test] async fn test_create_update_drop_table() { - let dir = TempDir::new().unwrap(); + let dir = TempDir::with_prefix("sql-test").unwrap(); let warehouse_root = dir.path().to_str().unwrap(); //name of the database should be part of the url. usually for sqllite it creates or opens one if (.db found) From 8851e3ce1b7c1b502906a78b78a7fe3662b798ff Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 29 Apr 2024 21:07:27 +0200 Subject: [PATCH 29/35] use parent for list namespaces --- crates/catalog/sql/src/catalog.rs | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 96d6bd94e..afc4d58f0 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -185,16 +185,26 @@ fn query_map(row: &AnyRow) -> std::result::Result { impl Catalog for SqlCatalog { async fn list_namespaces( &self, - _parent: Option<&NamespaceIdent>, + parent: Option<&NamespaceIdent>, ) -> Result> { let name = &self.name; - let rows = sqlx::query( - "select distinct table_namespace from iceberg_tables where catalog_name = ?;", - ) - .bind(name) - .fetch_all(&self.connection) - .await - .map_err(from_sqlx_error)?; + let rows = match parent { + None => sqlx::query( + "select distinct table_namespace from iceberg_tables where catalog_name = ?;", + ) + .bind(name) + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error)?, + Some(parent) => sqlx::query( + "select distinct table_namespace from iceberg_tables where catalog_name = ? and table_namespace like ?%;", + ) + .bind(name) + .bind(parent.join(".")) + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error)?, + }; let iter = rows.iter().map(|row| row.try_get::(0)); Ok(iter From f1d05827de14f93289877b262a63fbdb4dc61408 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 29 Apr 2024 21:11:09 +0200 Subject: [PATCH 30/35] fix typo --- crates/catalog/sql/src/catalog.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index afc4d58f0..49e114746 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -51,7 +51,7 @@ static TABLE_RECORD_TYPE: &str = "TABLE"; static MAX_CONNECTIONS: u32 = 10; static IDLE_TIMEOUT: u64 = 10; -static TEST_BEFORE_AQUIRE: bool = true; +static TEST_BEFORE_ACQUIRE: bool = true; /// Sql catalog config #[derive(Debug, TypedBuilder)] @@ -90,7 +90,7 @@ impl SqlCatalog { .props .get("pool.test-before-acquire") .map(|v| v.parse().unwrap()) - .unwrap_or(TEST_BEFORE_AQUIRE); + .unwrap_or(TEST_BEFORE_ACQUIRE); let pool = AnyPoolOptions::new() .max_connections(max_connections) From 4967b7604675110c541d4be3a7827678ebefc756 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 29 Apr 2024 21:12:24 +0200 Subject: [PATCH 31/35] fix clippy warning --- crates/catalog/sql/src/catalog.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 49e114746..b5f5dd09b 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -210,7 +210,7 @@ impl Catalog for SqlCatalog { Ok(iter .map(|x| { x.and_then(|y| { - NamespaceIdent::from_strs(y.split(".")) + NamespaceIdent::from_strs(y.split('.')) .map_err(|err| sqlx::Error::Decode(Box::new(err))) }) }) @@ -282,7 +282,7 @@ impl Catalog for SqlCatalog { Ok(iter .map(|x| { x.and_then(|y| { - let namespace = NamespaceIdent::from_strs(y.table_namespace.split(".")) + let namespace = NamespaceIdent::from_strs(y.table_namespace.split('.')) .map_err(|err| sqlx::Error::Decode(Box::new(err)))?; Ok(TableIdent::new(namespace, y.table_name)) }) From 6975123b8d9ed3d58a21b431536ce470438d8b14 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Mon, 29 Apr 2024 21:29:29 +0200 Subject: [PATCH 32/35] use statics for namespace sql --- crates/catalog/sql/src/catalog.rs | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index b5f5dd09b..51e201c2c 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -49,6 +49,11 @@ static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; static RECORD_TYPE: &str = "iceberg_type"; static TABLE_RECORD_TYPE: &str = "TABLE"; +static NAMESPACE_PROPERTIES_TABLE_NAME: &str = "iceberg_namespace_properties"; +static NAMESPACE_NAME: &str = "namespace"; +static NAMESPACE_PROPERTY_KEY: &str = "property_key"; +static NAMESPACE_PROPERTY_VALUE: &str = "property_value"; + static MAX_CONNECTIONS: u32 = 10; static IDLE_TIMEOUT: u64 = 10; static TEST_BEFORE_ACQUIRE: bool = true; @@ -129,13 +134,23 @@ impl SqlCatalog { .map_err(from_sqlx_error)?; sqlx::query( - "create table if not exists iceberg_namespace_properties ( - catalog_name varchar(255) not null, - namespace varchar(255) not null, - property_key varchar(255), - property_value varchar(255), - primary key (catalog_name, namespace, property_key) - );", + &("create table if not exists ".to_owned() + + NAMESPACE_PROPERTIES_TABLE_NAME + + " ( " + + CATALOG_NAME + + " varchar(255) not null, " + + NAMESPACE_NAME + + " varchar(255) not null, " + + NAMESPACE_PROPERTY_KEY + + " varchar(255), " + + NAMESPACE_PROPERTY_VALUE + + " varchar(255), primary key (" + + CATALOG_NAME + + ", " + + NAMESPACE_NAME + + ", " + + NAMESPACE_PROPERTY_KEY + + ") );"), ) .execute(&pool) .await From bef93b2a37bc12024335e82b08f78900184dea93 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Thu, 20 Jun 2024 11:36:19 +0200 Subject: [PATCH 33/35] create namespace --- crates/catalog/sql/src/catalog.rs | 40 ++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 51e201c2c..1cfaf42ea 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -235,10 +235,37 @@ impl Catalog for SqlCatalog { async fn create_namespace( &self, - _namespace: &NamespaceIdent, - _properties: HashMap, + namespace: &NamespaceIdent, + properties: HashMap, ) -> Result { - todo!() + { + let catalog_name = self.name.clone(); + let namespace = namespace.encode_in_url(); + + let query_string = "insert into ".to_string() + + NAMESPACE_PROPERTIES_TABLE_NAME + + " (" + + CATALOG_NAME + + ", " + + TABLE_NAMESPACE + + ", " + + NAMESPACE_PROPERTY_KEY + + ", " + + NAMESPACE_PROPERTY_VALUE + + ") values (?, ?, ?, ?);"; + for (key, value) in properties.iter() { + sqlx::query(&query_string) + .bind(&catalog_name) + .bind(&namespace) + .bind(&key) + .bind(&value) + .execute(&self.connection) + .await + .map_err(from_sqlx_error)?; + } + } + + Ok(Namespace::with_properties(namespace.clone(), properties)) } async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { @@ -478,6 +505,8 @@ impl Catalog for SqlCatalog { #[cfg(test)] pub mod tests { + use std::collections::HashMap; + use iceberg::{ spec::{NestedField, PrimitiveType, Schema, Type}, Catalog, NamespaceIdent, TableCreation, TableIdent, @@ -509,6 +538,11 @@ pub mod tests { let namespace = NamespaceIdent::new("test".to_owned()); + catalog + .create_namespace(&namespace, HashMap::new()) + .await + .unwrap(); + let identifier = TableIdent::new(namespace.clone(), "table1".to_owned()); let schema = Schema::builder() From 4bf7e2f63b7ca2fcf42948847a8a72362367283d Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Thu, 20 Jun 2024 11:52:52 +0200 Subject: [PATCH 34/35] remove cache --- crates/catalog/sql/Cargo.toml | 1 - crates/catalog/sql/src/catalog.rs | 8 -------- 2 files changed, 9 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 8e03cf6e6..ddba25397 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -32,7 +32,6 @@ keywords = ["iceberg", "sql", "catalog"] anyhow = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } -dashmap = "5.5.3" futures = { workspace = true } iceberg = { workspace = true } log = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 1cfaf42ea..a57d928d0 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -15,10 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - use async_trait::async_trait; -use dashmap::DashMap; use futures::{AsyncReadExt, AsyncWriteExt}; use sqlx::any::AnyPoolOptions; use sqlx::{ @@ -74,7 +71,6 @@ pub struct SqlCatalog { name: String, connection: AnyPool, fileio: FileIO, - cache: Arc>, } impl SqlCatalog { @@ -164,7 +160,6 @@ impl SqlCatalog { name: config.name.to_owned(), connection: pool, fileio: file_io, - cache: Arc::new(DashMap::new()), }) } } @@ -424,9 +419,6 @@ impl Catalog for SqlCatalog { let metadata: TableMetadata = serde_json::from_str(&json)?; - self.cache - .insert(identifier.clone(), (metadata_location, metadata.clone())); - let table = Table::builder() .file_io(self.fileio.clone()) .identifier(identifier.clone()) From 75898b5b099620e2d22fb295b188ae1231c43d3f Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Thu, 20 Jun 2024 21:33:58 +0200 Subject: [PATCH 35/35] fix list_namespaces --- crates/catalog/sql/src/catalog.rs | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index a57d928d0..e96502653 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -200,14 +200,28 @@ impl Catalog for SqlCatalog { let name = &self.name; let rows = match parent { None => sqlx::query( - "select distinct table_namespace from iceberg_tables where catalog_name = ?;", + &("select distinct ".to_string() + + NAMESPACE_NAME + + " from " + + NAMESPACE_PROPERTIES_TABLE_NAME + + " where " + + CATALOG_NAME + + " = ?;"), ) .bind(name) .fetch_all(&self.connection) .await .map_err(from_sqlx_error)?, Some(parent) => sqlx::query( - "select distinct table_namespace from iceberg_tables where catalog_name = ? and table_namespace like ?%;", + &("select distinct ".to_string() + + NAMESPACE_NAME + + " from " + + NAMESPACE_PROPERTIES_TABLE_NAME + + " where " + + CATALOG_NAME + + " = ? and " + + NAMESPACE_NAME + + "table_namespace like ?%;"), ) .bind(name) .bind(parent.join(".")) @@ -242,12 +256,20 @@ impl Catalog for SqlCatalog { + " (" + CATALOG_NAME + ", " - + TABLE_NAMESPACE + + NAMESPACE_NAME + ", " + NAMESPACE_PROPERTY_KEY + ", " + NAMESPACE_PROPERTY_VALUE + ") values (?, ?, ?, ?);"; + sqlx::query(&query_string) + .bind(&catalog_name) + .bind(&namespace) + .bind(&None::) + .bind(&None::) + .execute(&self.connection) + .await + .map_err(from_sqlx_error)?; for (key, value) in properties.iter() { sqlx::query(&query_string) .bind(&catalog_name)