From 0132484ede25ef973fa1312abb5764bc0c443886 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 5 Mar 2025 15:46:08 +0800 Subject: [PATCH 1/3] add apply in transaction to support stack action --- crates/iceberg/src/transaction.rs | 202 ++++++++++++++++++------------ 1 file changed, 121 insertions(+), 81 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 007a3745f..3dba221a8 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -44,7 +44,8 @@ const META_ROOT_PATH: &str = "metadata"; /// Table transaction. pub struct Transaction<'a> { - table: &'a Table, + base_table: &'a Table, + current_metadata: TableMetadata, updates: Vec, requirements: Vec, } @@ -53,38 +54,59 @@ impl<'a> Transaction<'a> { /// Creates a new transaction. pub fn new(table: &'a Table) -> Self { Self { - table, + base_table: table, + current_metadata: table.metadata().clone(), updates: vec![], requirements: vec![], } } - fn append_updates(&mut self, updates: Vec) -> Result<()> { - for update in &updates { - for up in &self.updates { - if discriminant(up) == discriminant(update) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot apply update with same type at same time: {:?}", - update - ), - )); - } - } + fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> { + let mut metadata_builder = self.current_metadata.clone().into_builder(None); + for update in updates { + metadata_builder = update.clone().apply(metadata_builder)?; } - self.updates.extend(updates); + + self.current_metadata = metadata_builder.build()?.metadata; + Ok(()) } - fn append_requirements(&mut self, requirements: Vec) -> Result<()> { - self.requirements.extend(requirements); + fn apply( + &mut self, + updates: Vec, + requirements: Vec, + ) -> Result<()> { + for requirement in &requirements { + requirement.check(Some(&self.current_metadata))?; + } + + self.update_table_metadata(&updates)?; + + self.updates.extend(updates); + + // For the requirements, it does not make sense to add a requirement more than once + // For example, you cannot assert that the current schema has two different IDs + for new_requirement in requirements { + if self + .requirements + .iter() + .map(discriminant) + .all(|d| d != discriminant(&new_requirement)) + { + self.requirements.push(new_requirement); + } + } + + // # TODO + // Support auto commit later. + Ok(()) } /// Sets table to a new version. pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { - let current_version = self.table.metadata().format_version(); + let current_version = self.current_metadata.format_version(); match current_version.cmp(&format_version) { Ordering::Greater => { return Err(Error::new( @@ -96,7 +118,7 @@ impl<'a> Transaction<'a> { )); } Ordering::Less => { - self.append_updates(vec![UpgradeFormatVersion { format_version }])?; + self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?; } Ordering::Equal => { // Do nothing. @@ -107,7 +129,7 @@ impl<'a> Transaction<'a> { /// Update table's property. pub fn set_properties(mut self, props: HashMap) -> Result { - self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?; + self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?; Ok(self) } @@ -123,8 +145,7 @@ impl<'a> Transaction<'a> { }; let mut snapshot_id = generate_random_id(); while self - .table - .metadata() + .current_metadata .snapshots() .any(|s| s.snapshot_id() == snapshot_id) { @@ -159,14 +180,17 @@ impl<'a> Transaction<'a> { /// Remove properties in table. pub fn remove_properties(mut self, keys: Vec) -> Result { - self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?; + self.apply( + vec![TableUpdate::RemoveProperties { removals: keys }], + vec![], + )?; Ok(self) } /// Commit transaction. pub async fn commit(self, catalog: &dyn Catalog) -> Result { let table_commit = TableCommit::builder() - .ident(self.table.identifier().clone()) + .ident(self.base_table.identifier().clone()) .updates(self.updates) .requirements(self.requirements) .build(); @@ -215,8 +239,7 @@ impl<'a> FastAppendAction<'a> { if !self .snapshot_produce_action .tx - .table - .metadata() + .current_metadata .default_spec .is_unpartitioned() { @@ -226,12 +249,10 @@ impl<'a> FastAppendAction<'a> { )); } - let table_metadata = self.snapshot_produce_action.tx.table.metadata(); - let data_files = ParquetWriter::parquet_files_to_data_files( - self.snapshot_produce_action.tx.table.file_io(), + self.snapshot_produce_action.tx.base_table.file_io(), file_path, - table_metadata, + &self.snapshot_produce_action.tx.current_metadata, ) .await?; @@ -253,7 +274,7 @@ impl<'a> FastAppendAction<'a> { let mut manifest_stream = self .snapshot_produce_action .tx - .table + .base_table .inspect() .manifests() .scan() @@ -314,14 +335,14 @@ impl SnapshotProduceOperation for FastAppendOperation { &self, snapshot_produce: &SnapshotProduceAction<'_>, ) -> Result> { - let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else { + let Some(snapshot) = snapshot_produce.tx.current_metadata.current_snapshot() else { return Ok(vec![]); }; let manifest_list = snapshot .load_manifest_list( - snapshot_produce.tx.table.file_io(), - &snapshot_produce.tx.table.metadata_ref(), + snapshot_produce.tx.base_table.file_io(), + &snapshot_produce.tx.current_metadata, ) .await?; @@ -435,7 +456,7 @@ impl<'a> SnapshotProduceAction<'a> { for data_file in data_files { Self::validate_partition_value( data_file.partition(), - self.tx.table.metadata().default_partition_type(), + self.tx.current_metadata.default_partition_type(), )?; if data_file.content_type() == DataContentType::Data { self.added_data_files.push(data_file); @@ -449,13 +470,13 @@ impl<'a> SnapshotProduceAction<'a> { fn new_manifest_output(&mut self) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", - self.tx.table.metadata().location(), + self.tx.current_metadata.location(), META_ROOT_PATH, self.commit_uuid, self.manifest_counter.next().unwrap(), DataFileFormat::Avro ); - self.tx.table.file_io().new_output(new_manifest_path) + self.tx.base_table.file_io().new_output(new_manifest_path) } // Write manifest file for added data files and return the ManifestFile for ManifestList. @@ -464,6 +485,7 @@ impl<'a> SnapshotProduceAction<'a> { added_data_files: Vec, ) -> Result { let snapshot_id = self.snapshot_id; + let format_version = self.tx.current_metadata.format_version(); let content_type = { let mut data_num = 0; let mut delete_num = 0; @@ -489,7 +511,7 @@ impl<'a> SnapshotProduceAction<'a> { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) .data_file(data_file); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { + if format_version == FormatVersion::V1 { builder.snapshot_id(snapshot_id).build() } else { // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when @@ -502,15 +524,14 @@ impl<'a> SnapshotProduceAction<'a> { self.new_manifest_output()?, Some(self.snapshot_id), self.key_metadata.clone(), - self.tx.table.metadata().current_schema().clone(), + self.tx.current_metadata.current_schema().clone(), self.tx - .table - .metadata() + .current_metadata .default_partition_spec() .as_ref() .clone(), ); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { + if self.tx.current_metadata.format_version() == FormatVersion::V1 { builder.build_v1() } else { match content_type { @@ -560,7 +581,7 @@ impl<'a> SnapshotProduceAction<'a> { fn generate_manifest_list_file_path(&self, attempt: i64) -> String { format!( "{}/{}/snap-{}-{}-{}.{}", - self.tx.table.metadata().location(), + self.tx.current_metadata.location(), META_ROOT_PATH, self.snapshot_id, attempt, @@ -578,28 +599,28 @@ impl<'a> SnapshotProduceAction<'a> { let new_manifests = self .manifest_file(&snapshot_produce_operation, &process) .await?; - let next_seq_num = self.tx.table.metadata().next_sequence_number(); + let next_seq_num = self.tx.current_metadata.next_sequence_number(); let summary = self.summary(&snapshot_produce_operation); let manifest_list_path = self.generate_manifest_list_file_path(0); - let mut manifest_list_writer = match self.tx.table.metadata().format_version() { + let mut manifest_list_writer = match self.tx.current_metadata.format_version() { FormatVersion::V1 => ManifestListWriter::v1( self.tx - .table + .base_table .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.table.metadata().current_snapshot_id(), + self.tx.current_metadata.current_snapshot_id(), ), FormatVersion::V2 => ManifestListWriter::v2( self.tx - .table + .base_table .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.table.metadata().current_snapshot_id(), + self.tx.current_metadata.current_snapshot_id(), next_seq_num, ), }; @@ -610,34 +631,36 @@ impl<'a> SnapshotProduceAction<'a> { let new_snapshot = Snapshot::builder() .with_manifest_list(manifest_list_path) .with_snapshot_id(self.snapshot_id) - .with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id()) + .with_parent_snapshot_id(self.tx.current_metadata.current_snapshot_id()) .with_sequence_number(next_seq_num) .with_summary(summary) - .with_schema_id(self.tx.table.metadata().current_schema_id()) + .with_schema_id(self.tx.current_metadata.current_schema_id()) .with_timestamp_ms(commit_ts) .build(); - self.tx.append_updates(vec![ - TableUpdate::AddSnapshot { - snapshot: new_snapshot, - }, - TableUpdate::SetSnapshotRef { - ref_name: MAIN_BRANCH.to_string(), - reference: SnapshotReference::new( - self.snapshot_id, - SnapshotRetention::branch(None, None, None), - ), - }, - ])?; - self.tx.append_requirements(vec![ - TableRequirement::UuidMatch { - uuid: self.tx.table.metadata().uuid(), - }, - TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRANCH.to_string(), - snapshot_id: self.tx.table.metadata().current_snapshot_id(), - }, - ])?; + self.tx.apply( + vec![ + TableUpdate::AddSnapshot { + snapshot: new_snapshot.clone(), + }, + TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: SnapshotReference::new( + self.snapshot_id, + SnapshotRetention::branch(None, None, None), + ), + }, + ], + vec![ + TableRequirement::UuidMatch { + uuid: self.tx.current_metadata.uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: self.tx.current_metadata.current_snapshot_id(), + }, + ], + )?; Ok(self.tx) } } @@ -674,15 +697,14 @@ impl<'a> ReplaceSortOrderAction<'a> { let requirements = vec![ TableRequirement::CurrentSchemaIdMatch { - current_schema_id: self.tx.table.metadata().current_schema().schema_id(), + current_schema_id: self.tx.current_metadata.current_schema().schema_id(), }, TableRequirement::DefaultSortOrderIdMatch { - default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id, + default_sort_order_id: self.tx.current_metadata.default_sort_order().order_id, }, ]; - self.tx.append_requirements(requirements)?; - self.tx.append_updates(updates)?; + self.tx.apply(updates, requirements)?; Ok(self.tx) } @@ -694,8 +716,7 @@ impl<'a> ReplaceSortOrderAction<'a> { ) -> Result { let field_id = self .tx - .table - .metadata() + .current_metadata .current_schema() .field_id_by_name(name) .ok_or_else(|| { @@ -924,14 +945,15 @@ mod tests { assert!( matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) ); + // requriments is based on original table metadata assert_eq!( vec![ TableRequirement::UuidMatch { - uuid: tx.table.metadata().uuid() + uuid: table.metadata().uuid() }, TableRequirement::RefSnapshotIdMatch { r#ref: MAIN_BRANCH.to_string(), - snapshot_id: tx.table.metadata().current_snapshot_id + snapshot_id: table.metadata().current_snapshot_id() } ], tx.requirements @@ -1067,4 +1089,22 @@ mod tests { assert!(manifest_paths.contains(&path)); } } + + #[tokio::test] + async fn test_transaction_apply_upgrade() { + let table = make_v1_table(); + let tx = Transaction::new(&table); + // Upgrade v1 to v1, do nothing. + let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap(); + // Upgrade v1 to v2, success. + let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); + assert_eq!( + vec![TableUpdate::UpgradeFormatVersion { + format_version: FormatVersion::V2 + }], + tx.updates + ); + // Upgrade v2 to v1, return error. + assert!(tx.upgrade_table_version(FormatVersion::V1).is_err()); + } } From 21b3dae53dcbe61c709fd2bf1baa6e7d368b01b5 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 10 Mar 2025 03:21:04 +0800 Subject: [PATCH 2/3] fix test --- crates/iceberg/src/transaction.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 3dba221a8..09afba5b2 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -995,22 +995,6 @@ mod tests { assert_eq!(data_file, *manifest.entries()[0].data_file()); } - #[test] - fn test_do_same_update_in_same_transaction() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx - .remove_properties(vec!["a".to_string(), "b".to_string()]) - .unwrap(); - - let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]); - - assert!( - tx.is_err(), - "Should not allow to do same kinds update in same transaction" - ); - } - #[tokio::test] async fn test_add_existing_parquet_files_to_unpartitioned_table() { let mut fixture = TableTestFixture::new_unpartitioned(); From 645afbaa97478d273563ddb4f3d993b3cf1eff7c Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 12 Mar 2025 15:22:31 +0800 Subject: [PATCH 3/3] store current table instead of current metadata --- crates/iceberg/src/table.rs | 4 ++ crates/iceberg/src/transaction.rs | 94 +++++++++++++++++++------------ 2 files changed, 63 insertions(+), 35 deletions(-) diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index ebee670f4..d910b5c8f 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -162,6 +162,10 @@ pub struct Table { } impl Table { + pub(crate) fn with_metadata(&mut self, metadata: TableMetadataRef) { + self.metadata = metadata; + } + /// Returns a TableBuilder to build a table pub fn builder() -> TableBuilder { TableBuilder::new() diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 09afba5b2..46e86d6a9 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -22,6 +22,7 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::mem::discriminant; use std::ops::RangeFrom; +use std::sync::Arc; use arrow_array::StringArray; use futures::TryStreamExt; @@ -45,7 +46,7 @@ const META_ROOT_PATH: &str = "metadata"; /// Table transaction. pub struct Transaction<'a> { base_table: &'a Table, - current_metadata: TableMetadata, + current_table: Table, updates: Vec, requirements: Vec, } @@ -55,19 +56,20 @@ impl<'a> Transaction<'a> { pub fn new(table: &'a Table) -> Self { Self { base_table: table, - current_metadata: table.metadata().clone(), + current_table: table.clone(), updates: vec![], requirements: vec![], } } fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> { - let mut metadata_builder = self.current_metadata.clone().into_builder(None); + let mut metadata_builder = self.current_table.metadata().clone().into_builder(None); for update in updates { metadata_builder = update.clone().apply(metadata_builder)?; } - self.current_metadata = metadata_builder.build()?.metadata; + self.current_table + .with_metadata(Arc::new(metadata_builder.build()?.metadata)); Ok(()) } @@ -78,7 +80,7 @@ impl<'a> Transaction<'a> { requirements: Vec, ) -> Result<()> { for requirement in &requirements { - requirement.check(Some(&self.current_metadata))?; + requirement.check(Some(self.current_table.metadata()))?; } self.update_table_metadata(&updates)?; @@ -106,7 +108,7 @@ impl<'a> Transaction<'a> { /// Sets table to a new version. pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { - let current_version = self.current_metadata.format_version(); + let current_version = self.current_table.metadata().format_version(); match current_version.cmp(&format_version) { Ordering::Greater => { return Err(Error::new( @@ -145,7 +147,8 @@ impl<'a> Transaction<'a> { }; let mut snapshot_id = generate_random_id(); while self - .current_metadata + .current_table + .metadata() .snapshots() .any(|s| s.snapshot_id() == snapshot_id) { @@ -239,7 +242,8 @@ impl<'a> FastAppendAction<'a> { if !self .snapshot_produce_action .tx - .current_metadata + .current_table + .metadata() .default_spec .is_unpartitioned() { @@ -250,9 +254,9 @@ impl<'a> FastAppendAction<'a> { } let data_files = ParquetWriter::parquet_files_to_data_files( - self.snapshot_produce_action.tx.base_table.file_io(), + self.snapshot_produce_action.tx.current_table.file_io(), file_path, - &self.snapshot_produce_action.tx.current_metadata, + self.snapshot_produce_action.tx.current_table.metadata(), ) .await?; @@ -274,7 +278,7 @@ impl<'a> FastAppendAction<'a> { let mut manifest_stream = self .snapshot_produce_action .tx - .base_table + .current_table .inspect() .manifests() .scan() @@ -335,14 +339,19 @@ impl SnapshotProduceOperation for FastAppendOperation { &self, snapshot_produce: &SnapshotProduceAction<'_>, ) -> Result> { - let Some(snapshot) = snapshot_produce.tx.current_metadata.current_snapshot() else { + let Some(snapshot) = snapshot_produce + .tx + .current_table + .metadata() + .current_snapshot() + else { return Ok(vec![]); }; let manifest_list = snapshot .load_manifest_list( - snapshot_produce.tx.base_table.file_io(), - &snapshot_produce.tx.current_metadata, + snapshot_produce.tx.current_table.file_io(), + snapshot_produce.tx.current_table.metadata(), ) .await?; @@ -456,7 +465,7 @@ impl<'a> SnapshotProduceAction<'a> { for data_file in data_files { Self::validate_partition_value( data_file.partition(), - self.tx.current_metadata.default_partition_type(), + self.tx.current_table.metadata().default_partition_type(), )?; if data_file.content_type() == DataContentType::Data { self.added_data_files.push(data_file); @@ -470,13 +479,16 @@ impl<'a> SnapshotProduceAction<'a> { fn new_manifest_output(&mut self) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", - self.tx.current_metadata.location(), + self.tx.current_table.metadata().location(), META_ROOT_PATH, self.commit_uuid, self.manifest_counter.next().unwrap(), DataFileFormat::Avro ); - self.tx.base_table.file_io().new_output(new_manifest_path) + self.tx + .current_table + .file_io() + .new_output(new_manifest_path) } // Write manifest file for added data files and return the ManifestFile for ManifestList. @@ -485,7 +497,7 @@ impl<'a> SnapshotProduceAction<'a> { added_data_files: Vec, ) -> Result { let snapshot_id = self.snapshot_id; - let format_version = self.tx.current_metadata.format_version(); + let format_version = self.tx.current_table.metadata().format_version(); let content_type = { let mut data_num = 0; let mut delete_num = 0; @@ -524,14 +536,15 @@ impl<'a> SnapshotProduceAction<'a> { self.new_manifest_output()?, Some(self.snapshot_id), self.key_metadata.clone(), - self.tx.current_metadata.current_schema().clone(), + self.tx.current_table.metadata().current_schema().clone(), self.tx - .current_metadata + .current_table + .metadata() .default_partition_spec() .as_ref() .clone(), ); - if self.tx.current_metadata.format_version() == FormatVersion::V1 { + if self.tx.current_table.metadata().format_version() == FormatVersion::V1 { builder.build_v1() } else { match content_type { @@ -581,7 +594,7 @@ impl<'a> SnapshotProduceAction<'a> { fn generate_manifest_list_file_path(&self, attempt: i64) -> String { format!( "{}/{}/snap-{}-{}-{}.{}", - self.tx.current_metadata.location(), + self.tx.current_table.metadata().location(), META_ROOT_PATH, self.snapshot_id, attempt, @@ -599,28 +612,28 @@ impl<'a> SnapshotProduceAction<'a> { let new_manifests = self .manifest_file(&snapshot_produce_operation, &process) .await?; - let next_seq_num = self.tx.current_metadata.next_sequence_number(); + let next_seq_num = self.tx.current_table.metadata().next_sequence_number(); let summary = self.summary(&snapshot_produce_operation); let manifest_list_path = self.generate_manifest_list_file_path(0); - let mut manifest_list_writer = match self.tx.current_metadata.format_version() { + let mut manifest_list_writer = match self.tx.current_table.metadata().format_version() { FormatVersion::V1 => ManifestListWriter::v1( self.tx - .base_table + .current_table .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.current_metadata.current_snapshot_id(), + self.tx.current_table.metadata().current_snapshot_id(), ), FormatVersion::V2 => ManifestListWriter::v2( self.tx - .base_table + .current_table .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.current_metadata.current_snapshot_id(), + self.tx.current_table.metadata().current_snapshot_id(), next_seq_num, ), }; @@ -631,10 +644,10 @@ impl<'a> SnapshotProduceAction<'a> { let new_snapshot = Snapshot::builder() .with_manifest_list(manifest_list_path) .with_snapshot_id(self.snapshot_id) - .with_parent_snapshot_id(self.tx.current_metadata.current_snapshot_id()) + .with_parent_snapshot_id(self.tx.current_table.metadata().current_snapshot_id()) .with_sequence_number(next_seq_num) .with_summary(summary) - .with_schema_id(self.tx.current_metadata.current_schema_id()) + .with_schema_id(self.tx.current_table.metadata().current_schema_id()) .with_timestamp_ms(commit_ts) .build(); @@ -653,11 +666,11 @@ impl<'a> SnapshotProduceAction<'a> { ], vec![ TableRequirement::UuidMatch { - uuid: self.tx.current_metadata.uuid(), + uuid: self.tx.current_table.metadata().uuid(), }, TableRequirement::RefSnapshotIdMatch { r#ref: MAIN_BRANCH.to_string(), - snapshot_id: self.tx.current_metadata.current_snapshot_id(), + snapshot_id: self.tx.current_table.metadata().current_snapshot_id(), }, ], )?; @@ -697,10 +710,20 @@ impl<'a> ReplaceSortOrderAction<'a> { let requirements = vec![ TableRequirement::CurrentSchemaIdMatch { - current_schema_id: self.tx.current_metadata.current_schema().schema_id(), + current_schema_id: self + .tx + .current_table + .metadata() + .current_schema() + .schema_id(), }, TableRequirement::DefaultSortOrderIdMatch { - default_sort_order_id: self.tx.current_metadata.default_sort_order().order_id, + default_sort_order_id: self + .tx + .current_table + .metadata() + .default_sort_order() + .order_id, }, ]; @@ -716,7 +739,8 @@ impl<'a> ReplaceSortOrderAction<'a> { ) -> Result { let field_id = self .tx - .current_metadata + .current_table + .metadata() .current_schema() .field_id_by_name(name) .ok_or_else(|| {