diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index b94592919..14f64416d 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -31,8 +31,9 @@ use crate::error::Result;
 use crate::io::OutputFile;
 use crate::spec::{
     DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
-    ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
-    SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH,
+    ManifestWriterBuilder, NestedFieldRef, NullOrder, Operation, Snapshot, SnapshotReference,
+    SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform,
+    MAIN_BRANCH,
 };
 use crate::table::Table;
 use crate::writer::file_writer::ParquetWriter;
@@ -162,6 +163,17 @@ impl<'a> Transaction<'a> {
         Ok(self)
     }
 
+    /// Creates an add-fields action.
+    pub fn add_fields(
+        self,
+        fields: impl IntoIterator<Item = NestedFieldRef>,
+    ) -> AddFieldsAction<'a> {
+        AddFieldsAction {
+            tx: self,
+            fields: fields.into_iter().collect(),
+        }
+    }
+
     /// Commit transaction.
     pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
         let table_commit = TableCommit::builder()
@@ -608,6 +620,40 @@ impl<'a> SnapshotProduceAction<'a> {
     }
 }
 
+/// Transaction action for easily adding fields to the table.
+pub struct AddFieldsAction<'a> {
+    tx: Transaction<'a>,
+    fields: Vec<NestedFieldRef>,
+}
+
+impl<'a> AddFieldsAction<'a> {
+    /// Creates a new `AddFieldsAction` for the given transaction.
+    pub fn new(tx: Transaction<'a>) -> Self {
+        Self { tx, fields: vec![] }
+    }
+
+    /// Finish building the action and apply it to the transaction.
+    pub fn apply(mut self) -> Result<Transaction<'a>> {
+        let base_schema = self.tx.table.metadata().current_schema();
+        let schema_builder = <Schema as Clone>::clone(base_schema).into_builder();
+        let schema = schema_builder.with_fields(self.fields).build()?;
+        let schema_id = schema.schema_id();
+
+        let updates = vec![
+            TableUpdate::AddSchema { schema },
+            TableUpdate::SetCurrentSchema {
+                schema_id: schema_id + 1,
+            },
+        ];
+        let requirements = vec![TableRequirement::CurrentSchemaIdMatch {
+            current_schema_id: self.tx.table.metadata().current_schema().schema_id(),
+        }];
+        self.tx.append_updates(updates)?;
+        self.tx.append_requirements(requirements)?;
+        Ok(self.tx)
+    }
+}
+
 /// Transaction action for replacing sort order.
 pub struct ReplaceSortOrderAction<'a> {
     tx: Transaction<'a>,
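A minimal usage sketch of the new action (not part of the diff; the helper name `add_comment_column`, the field id `100`, and the column name `"comment"` are illustrative only): it appends one optional column to a table's schema and commits through a catalog.

    use iceberg::spec::{NestedField, NestedFieldRef, PrimitiveType, Type};
    use iceberg::table::Table;
    use iceberg::transaction::Transaction;
    use iceberg::{Catalog, Result};

    /// Illustrative helper: evolve the schema by one optional string column.
    async fn add_comment_column(table: &Table, catalog: &dyn Catalog) -> Result<Table> {
        // Field ids must be unique within the schema; 100 is a placeholder that a
        // real caller would derive from the current schema instead of hard-coding.
        let field = NestedFieldRef::new(NestedField::optional(
            100,
            "comment",
            Type::Primitive(PrimitiveType::String),
        ));
        // `add_fields` stages an AddSchema and a SetCurrentSchema update plus a
        // CurrentSchemaIdMatch requirement; `commit` sends them to the catalog.
        let tx = Transaction::new(table).add_fields([field]).apply()?;
        tx.commit(catalog).await
    }

Note that `apply` assumes the catalog assigns `schema_id + 1` to the added schema, while the `CurrentSchemaIdMatch` requirement makes the commit fail cleanly if another writer changed the schema concurrently.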
diff --git a/crates/integration_tests/tests/shared_tests/add_field_test.rs b/crates/integration_tests/tests/shared_tests/add_field_test.rs
new file mode 100644
index 000000000..a5be843c0
--- /dev/null
+++ b/crates/integration_tests/tests/shared_tests/add_field_test.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration test for adding a field to a table via the REST catalog.
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, TableCreation};
+use iceberg_catalog_rest::RestCatalog;
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::file::properties::WriterProperties;
+
+use crate::get_shared_containers;
+use crate::shared_tests::{random_ns, test_schema};
+
+/// Mimics the behavior of `append_data_file_test` to set up a table, but then adds a new field
+/// to the table before appending more data.
+#[tokio::test]
+async fn test_add_field() {
+    let fixture = get_shared_containers();
+    let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
+    let ns = random_ns().await;
+    let schema = test_schema();
+
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let table = rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    // Create the writer and write the data
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_generator = DefaultFileNameGenerator::new(
+        "test".to_string(),
+        None,
+        iceberg::spec::DataFileFormat::Parquet,
+    );
+    let parquet_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        table.metadata().current_schema().clone(),
+        table.file_io().clone(),
+        location_generator.clone(),
+        file_name_generator.clone(),
+    );
+    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
+    let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
+    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
+    let batch = RecordBatch::try_new(schema.clone(), vec![
+        Arc::new(col1) as ArrayRef,
+        Arc::new(col2) as ArrayRef,
+        Arc::new(col3) as ArrayRef,
+    ])
+    .unwrap();
+    data_file_writer.write(batch.clone()).await.unwrap();
+    let data_file = data_file_writer.close().await.unwrap();
+
+    // check parquet file schema
+    let content = table
+        .file_io()
+        .new_input(data_file[0].file_path())
+        .unwrap()
+        .read()
+        .await
+        .unwrap();
+    let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load(
+        &content,
+        ArrowReaderOptions::default(),
+    )
+    .unwrap();
+    let field_ids: Vec<i32> = parquet_reader
+        .parquet_schema()
+        .columns()
+        .iter()
+        .map(|col| col.self_type().get_basic_info().id())
+        .collect();
+    assert_eq!(field_ids, vec![1, 2, 3]);
+
+    // commit result
+    let tx = Transaction::new(&table);
+    let mut append_action = tx.fast_append(None, vec![]).unwrap();
+    append_action.add_data_files(data_file.clone()).unwrap();
+    let tx = append_action.apply().await.unwrap();
+    let table = tx.commit(&rest_catalog).await.unwrap();
+
+    // check result
+    let batch_stream = table
+        .scan()
+        .select_all()
+        .build()
+        .unwrap()
+        .to_arrow()
+        .await
+        .unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    assert_eq!(batches.len(), 1);
+    assert_eq!(batches[0], batch);
+
+    // add a new field to the table
+    let tx = Transaction::new(&table);
+    let add_action = tx.add_fields(vec![iceberg::spec::NestedFieldRef::new(
+        iceberg::spec::NestedField::new(
+            4,
+            "a".to_string(),
+            iceberg::spec::Type::Primitive(iceberg::spec::PrimitiveType::Int),
+            true,
+        ),
+    )]);
+    let tx = add_action.apply().unwrap();
+    let table = tx.commit(&rest_catalog).await.unwrap();
+
+    // check result
+    let batch_stream = table
+        .scan()
+        .select_all()
+        .build()
+        .unwrap()
+        .to_arrow()
+        .await
+        .unwrap();
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    assert_eq!(batches.len(), 1);
+    assert_eq!(batches[0], batch);
+
+    // generate a batch including the new field
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
+    let parquet_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        table.metadata().current_schema().clone(),
+        table.file_io().clone(),
+        location_generator.clone(),
+        file_name_generator.clone(),
+    );
+    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
+    let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
+    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
+    let col4 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
+    let batch = RecordBatch::try_new(schema.clone(), vec![
+        Arc::new(col1) as ArrayRef,
+        Arc::new(col2) as ArrayRef,
+        Arc::new(col3) as ArrayRef,
+        Arc::new(col4) as ArrayRef,
+    ])
+    .unwrap();
+    data_file_writer.write(batch.clone()).await.unwrap();
+    let data_file = data_file_writer.close().await.unwrap();
+
+    // commit result
+    let tx = Transaction::new(&table);
+    let mut append_action = tx.fast_append(None, vec![]).unwrap();
+    append_action.add_data_files(data_file.clone()).unwrap();
+    let tx = append_action.apply().await.unwrap();
+    let _table = tx.commit(&rest_catalog).await.unwrap();
+
+    // The preference would be to read the data back out and verify that the number of
+    // batches matches, but the current implementation of `to_arrow` does not support
+    // optional fields that don't exist in some parquet files.
+    // let batch_stream = table
+    //     .scan()
+    //     .select_all()
+    //     .build()
+    //     .unwrap()
+    //     .to_arrow()
+    //     .await
+    //     .unwrap();
+    // let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    // assert_eq!(batches.len(), 2);
+    // assert_eq!(batches[0], batch);
+}
diff --git a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs
index feb1c4e58..982f91c39 100644
--- a/crates/integration_tests/tests/shared_tests/mod.rs
+++ b/crates/integration_tests/tests/shared_tests/mod.rs
@@ -23,6 +23,7 @@ use iceberg_catalog_rest::RestCatalog;
 
 use crate::get_shared_containers;
 
+mod add_field_test;
 mod append_data_file_test;
 mod append_partition_data_file_test;
 mod conflict_commit_test;
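Because the final read-back stays commented out, the test exercises the write path of the evolved schema but not the read path. A metadata assertion could still close part of that gap; a sketch (hypothetical, not in this diff, and assuming `Schema::field_by_id` is available) of what could be appended before `_table` is dropped:

    // Verify the schema evolution took effect even though the data cannot yet be
    // read back through `to_arrow`: the current schema should contain field id 4.
    let current_schema = _table.metadata().current_schema();
    assert_eq!(current_schema.field_by_id(4).unwrap().name, "a");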