From 6dc7753eac84c7950e41050a969d9f42e824b6d3 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 1 Apr 2025 22:32:57 -0400 Subject: [PATCH 1/2] feat: Add basic conflict detection --- crates/iceberg/src/table.rs | 44 +++++++++++++++++++++++---- crates/iceberg/src/transaction/mod.rs | 8 +++-- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index ebee670f4..39d4a89bd 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -19,32 +19,36 @@ use std::sync::Arc; +use futures::TryFutureExt; + use crate::arrow::ArrowReaderBuilder; use crate::inspect::MetadataTable; use crate::io::object_cache::ObjectCache; use crate::io::FileIO; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; -use crate::{Error, ErrorKind, Result, TableIdent}; +use crate::{Catalog, Error, ErrorKind, Result, TableIdent}; /// Builder to create table scan. -pub struct TableBuilder { +pub struct TableBuilder<'a> { file_io: Option, metadata_location: Option, metadata: Option, identifier: Option, + catalog: Option<&'a dyn Catalog>, readonly: bool, disable_cache: bool, cache_size_bytes: Option, } -impl TableBuilder { +impl<'a> TableBuilder<'a> { pub(crate) fn new() -> Self { Self { file_io: None, metadata_location: None, metadata: None, identifier: None, + catalog: None, readonly: false, disable_cache: false, cache_size_bytes: None, @@ -75,6 +79,12 @@ impl TableBuilder { self } + /// required - passes in the reference to the Catalog to use for the Table + pub fn catalog(mut self, catalog: &'a dyn Catalog) -> Self { + self.catalog = Some(catalog); + self + } + /// specifies if the Table is readonly or not (default not) pub fn readonly(mut self, readonly: bool) -> Self { self.readonly = readonly; @@ -102,6 +112,7 @@ impl TableBuilder { metadata_location, metadata, identifier, + catalog, readonly, disable_cache, cache_size_bytes, @@ -128,6 +139,13 @@ impl TableBuilder { )); }; + let Some(catalog) = catalog else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Catalog must be provided with TableBuilder.catalog()", + )); + }; + let object_cache = if disable_cache { Arc::new(ObjectCache::with_disabled_cache(file_io.clone())) } else if let Some(cache_size_bytes) = cache_size_bytes { @@ -144,6 +162,7 @@ impl TableBuilder { metadata_location, metadata, identifier, + catalog, readonly, object_cache, }) @@ -152,18 +171,19 @@ impl TableBuilder { /// Table represents a table in the catalog. #[derive(Debug, Clone)] -pub struct Table { +pub struct Table<'a> { file_io: FileIO, metadata_location: Option, metadata: TableMetadataRef, identifier: TableIdent, + catalog: &'a dyn Catalog, readonly: bool, object_cache: Arc, } -impl Table { +impl <'a> Table<'a> { /// Returns a TableBuilder to build a table - pub fn builder() -> TableBuilder { + pub fn builder<'b>() -> TableBuilder<'a> { TableBuilder::new() } @@ -216,6 +236,18 @@ impl Table { pub fn reader_builder(&self) -> ArrowReaderBuilder { ArrowReaderBuilder::new(self.file_io.clone()) } + + /// Returns latest table metadata and updates current table metadata + pub async fn refresh(&mut self) -> Result { + let table = self.catalog.load_table(self.identifier()).await.unwrap(); + let metadata: TableMetadata = (*table.metadata).clone(); + + self.metadata = Arc::new(metadata.clone()); + self.file_io = table.file_io.clone(); + self.metadata_location = table.metadata_location.clone(); + + Ok(metadata) + } } /// `StaticTable` is a read-only table struct that can be created from a metadata file or from `TableMetaData` without a catalog. diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index d3c7bc3f9..bee438924 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -37,14 +37,14 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat /// Table transaction. pub struct Transaction<'a> { - table: &'a Table, + table: &'a mut Table<'a>, updates: Vec, requirements: Vec, } impl<'a> Transaction<'a> { /// Creates a new transaction. - pub fn new(table: &'a Table) -> Self { + pub fn new(table: &'a mut Table) -> Self { Self { table, updates: vec![], @@ -127,12 +127,14 @@ impl<'a> Transaction<'a> { } /// Creates a fast append action. - pub fn fast_append( + pub async fn fast_append( self, commit_uuid: Option, key_metadata: Vec, ) -> Result> { let snapshot_id = self.generate_unique_snapshot_id(); + let _ = self.table.refresh().await?; + FastAppendAction::new( self, snapshot_id, From c03274126a7d4ca09752de8f2841cb92e9b29a67 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 8 Apr 2025 16:46:23 -0400 Subject: [PATCH 2/2] fixes --- crates/iceberg/src/table.rs | 18 ++++++++---------- crates/iceberg/src/transaction/mod.rs | 8 +++----- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 39d4a89bd..14679fe93 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -19,8 +19,6 @@ use std::sync::Arc; -use futures::TryFutureExt; - use crate::arrow::ArrowReaderBuilder; use crate::inspect::MetadataTable; use crate::io::object_cache::ObjectCache; @@ -30,18 +28,18 @@ use crate::spec::{TableMetadata, TableMetadataRef}; use crate::{Catalog, Error, ErrorKind, Result, TableIdent}; /// Builder to create table scan. -pub struct TableBuilder<'a> { +pub struct TableBuilder { file_io: Option, metadata_location: Option, metadata: Option, identifier: Option, - catalog: Option<&'a dyn Catalog>, + catalog: Option>, readonly: bool, disable_cache: bool, cache_size_bytes: Option, } -impl<'a> TableBuilder<'a> { +impl TableBuilder { pub(crate) fn new() -> Self { Self { file_io: None, @@ -80,7 +78,7 @@ impl<'a> TableBuilder<'a> { } /// required - passes in the reference to the Catalog to use for the Table - pub fn catalog(mut self, catalog: &'a dyn Catalog) -> Self { + pub fn catalog(mut self, catalog: Arc) -> Self { self.catalog = Some(catalog); self } @@ -171,19 +169,19 @@ impl<'a> TableBuilder<'a> { /// Table represents a table in the catalog. #[derive(Debug, Clone)] -pub struct Table<'a> { +pub struct Table { file_io: FileIO, metadata_location: Option, metadata: TableMetadataRef, identifier: TableIdent, - catalog: &'a dyn Catalog, + catalog: Arc, readonly: bool, object_cache: Arc, } -impl <'a> Table<'a> { +impl Table { /// Returns a TableBuilder to build a table - pub fn builder<'b>() -> TableBuilder<'a> { + pub fn builder<'b>() -> TableBuilder { TableBuilder::new() } diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index bee438924..d3c7bc3f9 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -37,14 +37,14 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat /// Table transaction. pub struct Transaction<'a> { - table: &'a mut Table<'a>, + table: &'a Table, updates: Vec, requirements: Vec, } impl<'a> Transaction<'a> { /// Creates a new transaction. - pub fn new(table: &'a mut Table) -> Self { + pub fn new(table: &'a Table) -> Self { Self { table, updates: vec![], @@ -127,14 +127,12 @@ impl<'a> Transaction<'a> { } /// Creates a fast append action. - pub async fn fast_append( + pub fn fast_append( self, commit_uuid: Option, key_metadata: Vec, ) -> Result> { let snapshot_id = self.generate_unique_snapshot_id(); - let _ = self.table.refresh().await?; - FastAppendAction::new( self, snapshot_id,