From d3f1710ac44b77f296fcca88d757823b1625ce44 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Wed, 5 Feb 2025 00:30:29 -0800 Subject: [PATCH] dl/coordinator: add tag to redpanda commits Adds a tag to snapshots created by Redpanda. This ensures that the latest Redpanda snapshot won't be removed by snapshot expiry, since tags don't expire by default. This assurance helps us guarantee exactly once delivery even in the face of external table updates. --- .../coordinator/iceberg_file_committer.cc | 11 +++- .../tests/datalake/datalake_e2e_test.py | 61 ++++++++++++++----- 2 files changed, 56 insertions(+), 16 deletions(-) diff --git a/src/v/datalake/coordinator/iceberg_file_committer.cc b/src/v/datalake/coordinator/iceberg_file_committer.cc index 31d835db12ac0..b0ab7af58039d 100644 --- a/src/v/datalake/coordinator/iceberg_file_committer.cc +++ b/src/v/datalake/coordinator/iceberg_file_committer.cc @@ -62,6 +62,8 @@ log_and_convert_action_errc(iceberg::action::errc e, std::string_view msg) { } constexpr auto commit_meta_prop = "redpanda.commit-metadata"; +constexpr auto commit_tag_name = "redpanda.tag"; + // Look for the Redpanda commit property in the current snapshot, or the most // recent ancestor if none. checked, parse_offset_error> @@ -305,11 +307,18 @@ class table_commit_builder { "Adding {} files to Iceberg table {}", icb_files_.size(), table_id_); + // NOTE: a tag is added to the new snapshot to ensure that snapshot + // expiry doesn't clear the snapshot with these commit metadata + // properties. Tagged snapshots are by default never expired, ensuring + // that we retain them e.g. in case of external table updates and low + // snapshot retention. iceberg::transaction txn(std::move(table_)); auto icb_append_res = co_await txn.merge_append( io, std::move(icb_files_), - {{commit_meta_prop, to_json_str(commit_meta)}}); + {{commit_meta_prop, to_json_str(commit_meta)}}, + commit_tag_name, + /*tag_expiration_ms=*/std::numeric_limits::max()); if (icb_append_res.has_error()) { co_return log_and_convert_action_errc( icb_append_res.error(), diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py index c57afe40d0550..c2088e216068f 100644 --- a/tests/rptest/tests/datalake/datalake_e2e_test.py +++ b/tests/rptest/tests/datalake/datalake_e2e_test.py @@ -6,7 +6,7 @@ # As of the Change Date specified in that file, in accordance with # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -from time import time +import time from rptest.clients.rpk import RpkTool from rptest.services.cluster import cluster from random import randint @@ -100,7 +100,7 @@ def test_avro_schema(self, cloud_storage_type, query_engine, catalog_type): }, default_value_schema=schema) for _ in range(count): - t = time() + t = time.time() record = {"number": int(t), "timestamp_us": int(t * 1000000)} producer.produce(topic=self.topic_name, value=record) producer.flush() @@ -180,25 +180,56 @@ def test_remove_expired_snapshots(self, cloud_storage_type): dl.wait_for_translation(self.topic_name, count) spark = dl.spark() - snapshots_out = spark.run_query_fetch_all( - f"select * from {table_name}.snapshots") - assert len( - snapshots_out - ) >= num_rounds, f"Expected >={num_rounds} snapshots, got {len(snapshots_out)}: {snapshots_out}" - spark.make_client().cursor().execute( - f"alter table {table_name} " - "set tblproperties ('history.expire.max-snapshot-age-ms'='1000')" - ) - - def has_one_snapshot(): + def num_snapshots() -> int: snapshots_out = spark.run_query_fetch_all( f"select * from {table_name}.snapshots") - return len(snapshots_out) == 1 + return len(snapshots_out) + + num_snaps = num_snapshots() + assert num_snaps >= num_rounds, f"Expected >={num_rounds} snapshots, got {num_snaps}" - wait_until(has_one_snapshot, timeout_sec=30, backoff_sec=1) + # Encourage aggressive snapshot cleanup for the table. This + # shouldn't affect Redpanda's snapshots, since Redpanda will tag + # its metadata with separate retention policy. + spark.make_client().cursor().execute( + f"alter table {table_name} set tblproperties(" + "'history.expire.max-snapshot-age-ms'='1000', " + "'history.expire.max-ref-age-ms'='1000')") + + wait_until(lambda: num_snapshots() == 1, + timeout_sec=30, + backoff_sec=1) dl.wait_for_translation(self.topic_name, count) + # Externally create another snapshot. + spark.make_client().cursor().execute( + f"insert into {table_name} (select * from {table_name} limit 1)" + ) + num_snaps = num_snapshots() + assert num_snaps == 2, f"Expected 2 snapshots after writing: {num_snaps}" + + # Wait for some commit intervals to let snapshot expiration run. + # Redpanda should retain the new snapshot _and_ the most recent one + # generated by Redpanda. + time.sleep(10) + num_snaps = num_snapshots() + assert num_snaps == 2, f"Expected Redpanda to retain 2 snapshots: {num_snaps}" + + # Validate that Spark does the same. + spark.make_client().cursor().execute( + f"call system.expire_snapshots('{table_name}')") + assert num_snaps == 2, f"Expected Spark to retain 2 snapshots: {num_snaps}" + + # Produce more to Redpanda. This will make the externally-created + # snapshot no longer the current snapshot and thus eligible for + # expiry. We'll still retain any Redpanda-created snapshot. + dl.produce_to_topic(self.topic_name, 1, 1) + dl.wait_for_translation(self.topic_name, count + 2) + wait_until(lambda: num_snapshots() == 1, + timeout_sec=30, + backoff_sec=1) + @cluster(num_nodes=4) @matrix(cloud_storage_type=supported_storage_types(), catalog_type=supported_catalog_types())