Skip to content

Commit 3e1d59d

Browse files
ZENOTMEZENOTMEliurenjie1024
authored
feat: add apply in transaction to support stack action (#949)
This PR resolve: #596. I refer the implementation from pyiceberg: https://github.com/apache/iceberg-python/blob/4d648bb834963efba886d62e79fa3e1e26288dd0/pyiceberg/table/__init__.py#L258 --------- Co-authored-by: ZENOTME <[email protected]> Co-authored-by: Renjie Liu <[email protected]>
1 parent 53ceb2b commit 3e1d59d

File tree

5 files changed

+146
-89
lines changed

5 files changed

+146
-89
lines changed

crates/iceberg/src/table.rs

+4
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ pub struct Table {
162162
}
163163

164164
impl Table {
165+
pub(crate) fn with_metadata(&mut self, metadata: TableMetadataRef) {
166+
self.metadata = metadata;
167+
}
168+
165169
/// Returns a TableBuilder to build a table
166170
pub fn builder() -> TableBuilder {
167171
TableBuilder::new()

crates/iceberg/src/transaction/append.rs

+14-9
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl<'a> FastAppendAction<'a> {
8383
if !self
8484
.snapshot_produce_action
8585
.tx
86-
.table
86+
.current_table
8787
.metadata()
8888
.default_spec
8989
.is_unpartitioned()
@@ -94,10 +94,10 @@ impl<'a> FastAppendAction<'a> {
9494
));
9595
}
9696

97-
let table_metadata = self.snapshot_produce_action.tx.table.metadata();
97+
let table_metadata = self.snapshot_produce_action.tx.current_table.metadata();
9898

9999
let data_files = ParquetWriter::parquet_files_to_data_files(
100-
self.snapshot_produce_action.tx.table.file_io(),
100+
self.snapshot_produce_action.tx.current_table.file_io(),
101101
file_path,
102102
table_metadata,
103103
)
@@ -122,7 +122,7 @@ impl<'a> FastAppendAction<'a> {
122122
let mut manifest_stream = self
123123
.snapshot_produce_action
124124
.tx
125-
.table
125+
.current_table
126126
.inspect()
127127
.manifests()
128128
.scan()
@@ -184,14 +184,19 @@ impl SnapshotProduceOperation for FastAppendOperation {
184184
&self,
185185
snapshot_produce: &SnapshotProduceAction<'_>,
186186
) -> Result<Vec<ManifestFile>> {
187-
let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
187+
let Some(snapshot) = snapshot_produce
188+
.tx
189+
.current_table
190+
.metadata()
191+
.current_snapshot()
192+
else {
188193
return Ok(vec![]);
189194
};
190195

191196
let manifest_list = snapshot
192197
.load_manifest_list(
193-
snapshot_produce.tx.table.file_io(),
194-
&snapshot_produce.tx.table.metadata_ref(),
198+
snapshot_produce.tx.current_table.file_io(),
199+
&snapshot_produce.tx.current_table.metadata_ref(),
195200
)
196201
.await?;
197202

@@ -253,11 +258,11 @@ mod tests {
253258
assert_eq!(
254259
vec![
255260
TableRequirement::UuidMatch {
256-
uuid: tx.table.metadata().uuid()
261+
uuid: table.metadata().uuid()
257262
},
258263
TableRequirement::RefSnapshotIdMatch {
259264
r#ref: MAIN_BRANCH.to_string(),
260-
snapshot_id: tx.table.metadata().current_snapshot_id
265+
snapshot_id: table.metadata().current_snapshot_id
261266
}
262267
],
263268
tx.requirements

crates/iceberg/src/transaction/mod.rs

+65-36
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod sort_order;
2424
use std::cmp::Ordering;
2525
use std::collections::HashMap;
2626
use std::mem::discriminant;
27+
use std::sync::Arc;
2728

2829
use uuid::Uuid;
2930

@@ -37,7 +38,8 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat
3738

3839
/// Table transaction.
3940
pub struct Transaction<'a> {
40-
table: &'a Table,
41+
base_table: &'a Table,
42+
current_table: Table,
4143
updates: Vec<TableUpdate>,
4244
requirements: Vec<TableRequirement>,
4345
}
@@ -46,38 +48,60 @@ impl<'a> Transaction<'a> {
4648
/// Creates a new transaction.
4749
pub fn new(table: &'a Table) -> Self {
4850
Self {
49-
table,
51+
base_table: table,
52+
current_table: table.clone(),
5053
updates: vec![],
5154
requirements: vec![],
5255
}
5356
}
5457

55-
fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
56-
for update in &updates {
57-
for up in &self.updates {
58-
if discriminant(up) == discriminant(update) {
59-
return Err(Error::new(
60-
ErrorKind::DataInvalid,
61-
format!(
62-
"Cannot apply update with same type at same time: {:?}",
63-
update
64-
),
65-
));
66-
}
67-
}
58+
fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> {
59+
let mut metadata_builder = self.current_table.metadata().clone().into_builder(None);
60+
for update in updates {
61+
metadata_builder = update.clone().apply(metadata_builder)?;
6862
}
69-
self.updates.extend(updates);
63+
64+
self.current_table
65+
.with_metadata(Arc::new(metadata_builder.build()?.metadata));
66+
7067
Ok(())
7168
}
7269

73-
fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> Result<()> {
74-
self.requirements.extend(requirements);
70+
fn apply(
71+
&mut self,
72+
updates: Vec<TableUpdate>,
73+
requirements: Vec<TableRequirement>,
74+
) -> Result<()> {
75+
for requirement in &requirements {
76+
requirement.check(Some(self.current_table.metadata()))?;
77+
}
78+
79+
self.update_table_metadata(&updates)?;
80+
81+
self.updates.extend(updates);
82+
83+
// For the requirements, it does not make sense to add a requirement more than once
84+
// For example, you cannot assert that the current schema has two different IDs
85+
for new_requirement in requirements {
86+
if self
87+
.requirements
88+
.iter()
89+
.map(discriminant)
90+
.all(|d| d != discriminant(&new_requirement))
91+
{
92+
self.requirements.push(new_requirement);
93+
}
94+
}
95+
96+
// # TODO
97+
// Support auto commit later.
98+
7599
Ok(())
76100
}
77101

78102
/// Sets table to a new version.
79103
pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> {
80-
let current_version = self.table.metadata().format_version();
104+
let current_version = self.current_table.metadata().format_version();
81105
match current_version.cmp(&format_version) {
82106
Ordering::Greater => {
83107
return Err(Error::new(
@@ -89,7 +113,7 @@ impl<'a> Transaction<'a> {
89113
));
90114
}
91115
Ordering::Less => {
92-
self.append_updates(vec![UpgradeFormatVersion { format_version }])?;
116+
self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?;
93117
}
94118
Ordering::Equal => {
95119
// Do nothing.
@@ -100,7 +124,7 @@ impl<'a> Transaction<'a> {
100124

101125
/// Update table's property.
102126
pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> {
103-
self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?;
127+
self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?;
104128
Ok(self)
105129
}
106130

@@ -116,7 +140,7 @@ impl<'a> Transaction<'a> {
116140
};
117141
let mut snapshot_id = generate_random_id();
118142
while self
119-
.table
143+
.current_table
120144
.metadata()
121145
.snapshots()
122146
.any(|s| s.snapshot_id() == snapshot_id)
@@ -152,14 +176,17 @@ impl<'a> Transaction<'a> {
152176

153177
/// Remove properties in table.
154178
pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
155-
self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?;
179+
self.apply(
180+
vec![TableUpdate::RemoveProperties { removals: keys }],
181+
vec![],
182+
)?;
156183
Ok(self)
157184
}
158185

159186
/// Commit transaction.
160187
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
161188
let table_commit = TableCommit::builder()
162-
.ident(self.table.identifier().clone())
189+
.ident(self.base_table.identifier().clone())
163190
.updates(self.updates)
164191
.requirements(self.requirements)
165192
.build();
@@ -308,19 +335,21 @@ mod tests {
308335
);
309336
}
310337

311-
#[test]
312-
fn test_do_same_update_in_same_transaction() {
313-
let table = make_v2_table();
338+
#[tokio::test]
339+
async fn test_transaction_apply_upgrade() {
340+
let table = make_v1_table();
314341
let tx = Transaction::new(&table);
315-
let tx = tx
316-
.remove_properties(vec!["a".to_string(), "b".to_string()])
317-
.unwrap();
318-
319-
let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]);
320-
321-
assert!(
322-
tx.is_err(),
323-
"Should not allow to do same kinds update in same transaction"
342+
// Upgrade v1 to v1, do nothing.
343+
let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap();
344+
// Upgrade v1 to v2, success.
345+
let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
346+
assert_eq!(
347+
vec![TableUpdate::UpgradeFormatVersion {
348+
format_version: FormatVersion::V2
349+
}],
350+
tx.updates
324351
);
352+
// Upgrade v2 to v1, return error.
353+
assert!(tx.upgrade_table_version(FormatVersion::V1).is_err());
325354
}
326355
}

0 commit comments

Comments
 (0)