Skip to content

Commit

Permalink
run test
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Apr 8, 2024
1 parent d87edea commit 042fd03
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
1 change: 1 addition & 0 deletions crates/catalog/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,6 @@ uuid = { workspace = true, features = ["v4"] }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
sqlx = { version = "0.7.2", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "postgres", "mysql"], default-features = false }
tempfile = { workspace = true }
tokio = { workspace = true }
44 changes: 30 additions & 14 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use dashmap::DashMap;
use futures::{AsyncReadExt, AsyncWriteExt};
use sqlx::{
any::{install_default_drivers, AnyRow},
AnyPool, Row,
AnyPool, Connection, Row,
};
use std::collections::HashMap;

Expand Down Expand Up @@ -50,32 +50,48 @@ impl SqlCatalog {

let pool = AnyPool::connect(url).await.map_err(from_sqlx_error)?;

sqlx::query(
"create table if not exists iceberg_tables (
let mut connection = pool.acquire().await.map_err(from_sqlx_error)?;

connection
.transaction(|txn| {
Box::pin(async move {
sqlx::query(
"create table if not exists iceberg_tables (
catalog_name text not null,
table_namespace text not null,
table_name text not null,
metadata_location text not null,
metadata_location text,
previous_metadata_location text,
primary key (catalog_name, table_namespace, table_name)
);",
)
.execute(&pool)
.await
.map_err(from_sqlx_error)?;
)
.fetch_all(&mut **txn)
.await
})
})
.await
.map_err(from_sqlx_error)?;

sqlx::query(
"create table if not exists iceberg_namespace_properties (
connection
.transaction(|txn| {
Box::pin(async move {
sqlx::query(
"create table if not exists iceberg_namespace_properties (
catalog_name text not null,
namespace text not null,
property_key text,
property_value text,
primary key (catalog_name, namespace, property_key)
);",
)
.execute(&pool)
.await
.map_err(from_sqlx_error)?;
)
.fetch_all(&mut **txn)
.await
})
})
.await
.map_err(from_sqlx_error)?;

connection.close().await.map_err(from_sqlx_error)?;

Ok(SqlCatalog {
name: name.to_owned(),
Expand Down
2 changes: 1 addition & 1 deletion crates/catalog/sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use iceberg::{Error, ErrorKind};
pub fn from_sqlx_error(error: sqlx::Error) -> Error {
Error::new(
ErrorKind::Unexpected,
"operation failed for hitting io error".to_string(),
"operation failed for hitting sqlx error".to_string(),
)
.with_source(error)
}

0 comments on commit 042fd03

Please sign in to comment.