diff --git a/src/v/datalake/coordinator/iceberg_file_committer.cc b/src/v/datalake/coordinator/iceberg_file_committer.cc index 31d835db12ac..5b6e819841da 100644 --- a/src/v/datalake/coordinator/iceberg_file_committer.cc +++ b/src/v/datalake/coordinator/iceberg_file_committer.cc @@ -62,6 +62,8 @@ log_and_convert_action_errc(iceberg::action::errc e, std::string_view msg) { } constexpr auto commit_meta_prop = "redpanda.commit-metadata"; +constexpr auto commit_tag_name = "redpanda.tag"; + // Look for the Redpanda commit property in the current snapshot, or the most // recent ancestor if none. checked, parse_offset_error> @@ -305,11 +307,17 @@ class table_commit_builder { "Adding {} files to Iceberg table {}", icb_files_.size(), table_id_); + // NOTE: a non-expiring tag is added to the new snapshot to ensure that + // snapshot expiration doesn't clear this snapshot and its commit + // metadata properties. The tag ensures that we retain them e.g. in + // case of external table updates and low snapshot retention. iceberg::transaction txn(std::move(table_)); auto icb_append_res = co_await txn.merge_append( io, std::move(icb_files_), - {{commit_meta_prop, to_json_str(commit_meta)}}); + {{commit_meta_prop, to_json_str(commit_meta)}}, + commit_tag_name, + /*tag_expiration_ms=*/std::numeric_limits::max()); if (icb_append_res.has_error()) { co_return log_and_convert_action_errc( icb_append_res.error(), diff --git a/src/v/datalake/translation/partition_translator.cc b/src/v/datalake/translation/partition_translator.cc index e11df16ab767..5fce43f46e41 100644 --- a/src/v/datalake/translation/partition_translator.cc +++ b/src/v/datalake/translation/partition_translator.cc @@ -251,6 +251,9 @@ ss::future<> partition_translator::stop() { } kafka::offset partition_translator::min_offset_for_translation() const { + if (_partition->is_read_replica_mode_enabled()) { + return model::offset_cast(_partition_proxy->start_offset()); + } return model::offset_cast(_partition_proxy->local_start_offset()); } diff --git a/src/v/iceberg/merge_append_action.cc b/src/v/iceberg/merge_append_action.cc index 5589bd6dbec1..3799f3668464 100644 --- a/src/v/iceberg/merge_append_action.cc +++ b/src/v/iceberg/merge_append_action.cc @@ -314,6 +314,16 @@ ss::future merge_append_action::build_updates() && { .type = snapshot_ref_type::branch, }, }); + if (tag_name_.has_value()) { + ret.updates.emplace_back(table_update::set_snapshot_ref{ + .ref_name = tag_name_.value(), + .ref = snapshot_reference{ + .snapshot_id = new_snap_id, + .type = snapshot_ref_type::tag, + .max_ref_age_ms = tag_expiration_ms_, + }, + }); + } ret.requirements.emplace_back(table_requirement::assert_ref_snapshot_id{ .ref = "main", .snapshot_id = old_snap_id, diff --git a/src/v/iceberg/merge_append_action.h b/src/v/iceberg/merge_append_action.h index e8cf83ba897f..aa40540c5de7 100644 --- a/src/v/iceberg/merge_append_action.h +++ b/src/v/iceberg/merge_append_action.h @@ -65,6 +65,8 @@ class merge_append_action : public action { const table_metadata& table, chunked_vector files, chunked_vector> snapshot_props = {}, + std::optional tag_name = std::nullopt, + std::optional tag_expiration_ms = std::nullopt, size_t min_to_merge_new_files = default_min_to_merge_new_files, size_t mfile_target_size_bytes = default_target_size_bytes) : io_(io) @@ -73,7 +75,9 @@ class merge_append_action : public action { , min_to_merge_new_files_(min_to_merge_new_files) , mfile_target_size_bytes_(mfile_target_size_bytes) , new_data_files_(std::move(files)) - , snapshot_props_(std::move(snapshot_props)) {} + , snapshot_props_(std::move(snapshot_props)) + , tag_name_(std::move(tag_name)) + , tag_expiration_ms_(tag_expiration_ms) {} protected: ss::future build_updates() && final; @@ -151,6 +155,8 @@ class merge_append_action : public action { size_t next_manifest_num_{0}; chunked_vector new_data_files_; chunked_vector> snapshot_props_; + std::optional tag_name_; + std::optional tag_expiration_ms_; }; } // namespace iceberg diff --git a/src/v/iceberg/tests/merge_append_action_test.cc b/src/v/iceberg/tests/merge_append_action_test.cc index 40468b93dbe5..51fdcdf28194 100644 --- a/src/v/iceberg/tests/merge_append_action_test.cc +++ b/src/v/iceberg/tests/merge_append_action_test.cc @@ -397,3 +397,115 @@ TEST_F(MergeAppendActionTest, TestBadFile) { ASSERT_TRUE(res.has_error()); ASSERT_EQ(res.error(), action::errc::unexpected_state); } + +TEST_F(MergeAppendActionTest, TestTagSnapshot) { + transaction tx(create_table()); + const auto& table = tx.table(); + auto res + = tx.merge_append( + io, create_data_files("foo", 1, 1), /*snapshot_props=*/{}, "tag") + .get(); + ASSERT_FALSE(res.has_error()) << res.error(); + ASSERT_TRUE(table.current_snapshot_id.has_value()); + + auto snap_id = table.current_snapshot_id.value(); + ASSERT_TRUE(table.refs.has_value()); + ASSERT_TRUE(table.refs->contains("main")); + ASSERT_TRUE(table.refs->contains("tag")); + + // Sanity check, main is always updated. + auto main_snap = table.refs->at("main"); + ASSERT_EQ(snap_id, main_snap.snapshot_id); + ASSERT_EQ(main_snap.type, snapshot_ref_type::branch); + + // Since we passed a tag, it should exist. + auto tag_snap = table.refs->at("tag"); + ASSERT_EQ(snap_id, tag_snap.snapshot_id); + ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag); + + // Merge again with a tag. + res = tx.merge_append( + io, create_data_files("foo", 1, 1), /*snapshot_props=*/{}, "tag") + .get(); + ASSERT_FALSE(res.has_error()) << res.error(); + ASSERT_TRUE(table.current_snapshot_id.has_value()); + snap_id = table.current_snapshot_id.value(); + + // The snapshot references should follow. + tag_snap = table.refs->at("tag"); + ASSERT_EQ(snap_id, tag_snap.snapshot_id); + ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag); + + // Now merge with a different tag. The old tag shouldn't be affected. + res = tx.merge_append( + io, + create_data_files("foo", 1, 1), + /*snapshot_props=*/{}, + /*tag_name=*/"other") + .get(); + ASSERT_FALSE(res.has_error()) << res.error(); + ASSERT_TRUE(table.current_snapshot_id.has_value()); + auto old_snap_id = snap_id; + snap_id = table.current_snapshot_id.value(); + ASSERT_NE(old_snap_id, snap_id); + + // The new tag should have a new snapshot id. + auto other_snap = table.refs->at("other"); + ASSERT_EQ(snap_id, other_snap.snapshot_id); + ASSERT_EQ(other_snap.type, snapshot_ref_type::tag); + + // The old tag should refer to the last snapshot that was appended with it. + tag_snap = table.refs->at("tag"); + ASSERT_EQ(old_snap_id, tag_snap.snapshot_id); + ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag); +} + +TEST_F(MergeAppendActionTest, TestTagWithExpiration) { + transaction tx(create_table()); + const auto& table = tx.table(); + chunked_hash_set snap_ids; + // Add a snapshot without an explicit tag expiration. + auto res + = tx.merge_append( + io, create_data_files("foo", 1, 1), /*snapshot_props=*/{}, "tag") + .get(); + ASSERT_FALSE(res.has_error()) << res.error(); + ASSERT_TRUE(table.current_snapshot_id.has_value()); + + auto snap_id = table.current_snapshot_id.value(); + ASSERT_TRUE(table.refs.has_value()); + ASSERT_TRUE(table.refs->contains("tag")); + + // Sanity check, no snapshot reference properties are set. + auto tag_snap = table.refs->at("tag"); + ASSERT_EQ(snap_id, tag_snap.snapshot_id); + ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag); + ASSERT_FALSE(tag_snap.max_snapshot_age_ms.has_value()); + ASSERT_FALSE(tag_snap.min_snapshots_to_keep.has_value()); + ASSERT_FALSE(tag_snap.max_ref_age_ms.has_value()); + + // Now try again with a tag expiration. + auto long_max = std::numeric_limits::max(); + res = tx.merge_append( + io, + create_data_files("foo", 1, 1), + /*snapshot_props=*/{}, + "tag", + /*tag_expiration_ms=*/long_max) + .get(); + ASSERT_FALSE(res.has_error()) << res.error(); + ASSERT_TRUE(table.current_snapshot_id.has_value()); + + snap_id = table.current_snapshot_id.value(); + ASSERT_TRUE(table.refs.has_value()); + ASSERT_TRUE(table.refs->contains("tag")); + + // Sanity check, just the reference expiration is set. + tag_snap = table.refs->at("tag"); + ASSERT_EQ(snap_id, tag_snap.snapshot_id); + ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag); + ASSERT_FALSE(tag_snap.max_snapshot_age_ms.has_value()); + ASSERT_FALSE(tag_snap.min_snapshots_to_keep.has_value()); + ASSERT_TRUE(tag_snap.max_ref_age_ms.has_value()); + ASSERT_EQ(long_max, tag_snap.max_ref_age_ms.value()); +} diff --git a/src/v/iceberg/transaction.cc b/src/v/iceberg/transaction.cc index c7bb9e2df193..8ab97810a16f 100644 --- a/src/v/iceberg/transaction.cc +++ b/src/v/iceberg/transaction.cc @@ -72,9 +72,16 @@ ss::future transaction::set_schema(schema s) { ss::future transaction::merge_append( manifest_io& io, chunked_vector files, - chunked_vector> snapshot_props) { + chunked_vector> snapshot_props, + std::optional tag_name, + std::optional tag_expiration_ms) { auto a = std::make_unique( - io, table_, std::move(files), std::move(snapshot_props)); + io, + table_, + std::move(files), + std::move(snapshot_props), + std::move(tag_name), + tag_expiration_ms); co_return co_await apply(std::move(a)); } diff --git a/src/v/iceberg/transaction.h b/src/v/iceberg/transaction.h index 7ff123a2d4f8..173ad20d5cd2 100644 --- a/src/v/iceberg/transaction.h +++ b/src/v/iceberg/transaction.h @@ -36,10 +36,25 @@ class transaction { // schema id if it doesn't exist. Note, the schema id is ignored, and one // is assigned based on the state of the table. ss::future set_schema(schema); + + // Adds the given data files to a new snapshot for the table, potentially + // merging manifests to reduce manifest list size. + // + // If supplied, the given snapshot properties are added to the resulting + // snapshot. + // + // If a tag name is supplied, a snapshot reference "tag" with the given + // reference expiration will be added to the table with the new snapshot. + // This will create the tag or replace an existing tag. As long as this tag + // is not expired, the snapshot is expected to not be removed by snapshot + // expiration. If no tag expiration is set, snapshot expiration will fall + // back on table-wide properties. ss::future merge_append( manifest_io&, chunked_vector, - chunked_vector> snapshot_props = {}); + chunked_vector> snapshot_props = {}, + std::optional tag_name = std::nullopt, + std::optional tag_expiration_ms = std::nullopt); // Removes expired snapshots from the table, computing expiration based on // the given timestamp. Note, this does not perform IO to delete any diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py index c57afe40d055..c2088e216068 100644 --- a/tests/rptest/tests/datalake/datalake_e2e_test.py +++ b/tests/rptest/tests/datalake/datalake_e2e_test.py @@ -6,7 +6,7 @@ # As of the Change Date specified in that file, in accordance with # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -from time import time +import time from rptest.clients.rpk import RpkTool from rptest.services.cluster import cluster from random import randint @@ -100,7 +100,7 @@ def test_avro_schema(self, cloud_storage_type, query_engine, catalog_type): }, default_value_schema=schema) for _ in range(count): - t = time() + t = time.time() record = {"number": int(t), "timestamp_us": int(t * 1000000)} producer.produce(topic=self.topic_name, value=record) producer.flush() @@ -180,25 +180,56 @@ def test_remove_expired_snapshots(self, cloud_storage_type): dl.wait_for_translation(self.topic_name, count) spark = dl.spark() - snapshots_out = spark.run_query_fetch_all( - f"select * from {table_name}.snapshots") - assert len( - snapshots_out - ) >= num_rounds, f"Expected >={num_rounds} snapshots, got {len(snapshots_out)}: {snapshots_out}" - spark.make_client().cursor().execute( - f"alter table {table_name} " - "set tblproperties ('history.expire.max-snapshot-age-ms'='1000')" - ) - - def has_one_snapshot(): + def num_snapshots() -> int: snapshots_out = spark.run_query_fetch_all( f"select * from {table_name}.snapshots") - return len(snapshots_out) == 1 + return len(snapshots_out) + + num_snaps = num_snapshots() + assert num_snaps >= num_rounds, f"Expected >={num_rounds} snapshots, got {num_snaps}" - wait_until(has_one_snapshot, timeout_sec=30, backoff_sec=1) + # Encourage aggressive snapshot cleanup for the table. This + # shouldn't affect Redpanda's snapshots, since Redpanda will tag + # its metadata with separate retention policy. + spark.make_client().cursor().execute( + f"alter table {table_name} set tblproperties(" + "'history.expire.max-snapshot-age-ms'='1000', " + "'history.expire.max-ref-age-ms'='1000')") + + wait_until(lambda: num_snapshots() == 1, + timeout_sec=30, + backoff_sec=1) dl.wait_for_translation(self.topic_name, count) + # Externally create another snapshot. + spark.make_client().cursor().execute( + f"insert into {table_name} (select * from {table_name} limit 1)" + ) + num_snaps = num_snapshots() + assert num_snaps == 2, f"Expected 2 snapshots after writing: {num_snaps}" + + # Wait for some commit intervals to let snapshot expiration run. + # Redpanda should retain the new snapshot _and_ the most recent one + # generated by Redpanda. + time.sleep(10) + num_snaps = num_snapshots() + assert num_snaps == 2, f"Expected Redpanda to retain 2 snapshots: {num_snaps}" + + # Validate that Spark does the same. + spark.make_client().cursor().execute( + f"call system.expire_snapshots('{table_name}')") + assert num_snaps == 2, f"Expected Spark to retain 2 snapshots: {num_snaps}" + + # Produce more to Redpanda. This will make the externally-created + # snapshot no longer the current snapshot and thus eligible for + # expiry. We'll still retain any Redpanda-created snapshot. + dl.produce_to_topic(self.topic_name, 1, 1) + dl.wait_for_translation(self.topic_name, count + 2) + wait_until(lambda: num_snapshots() == 1, + timeout_sec=30, + backoff_sec=1) + @cluster(num_nodes=4) @matrix(cloud_storage_type=supported_storage_types(), catalog_type=supported_catalog_types())