dl/coordinator: add tag to redpanda commits
Adds a tag to snapshots created by Redpanda. This ensures that the
latest Redpanda snapshot won't be removed by snapshot expiry, since tags
don't expire by default. This lets us guarantee exactly-once delivery
even in the face of external table updates.
andrwng committed Feb 6, 2025
1 parent 592556c commit e2e7ff2
Showing 3 changed files with 58 additions and 16 deletions.
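
For context on why a tag is enough: Iceberg's snapshot expiration removes a snapshot only if it is old enough and no live branch or tag still references it, and a tag's max-ref-age is unlimited unless set. A minimal sketch of that retention rule (illustrative Python, not Redpanda or Iceberg code):

from dataclasses import dataclass

@dataclass
class Snapshot:
    snapshot_id: int
    timestamp_ms: int

def expire_snapshots(snapshots, current_id, tagged_ids, now_ms, max_age_ms):
    # A snapshot survives if it is current, pinned by a tag, or young enough.
    return [
        s for s in snapshots
        if s.snapshot_id == current_id
        or s.snapshot_id in tagged_ids
        or now_ms - s.timestamp_ms <= max_age_ms
    ]

# An old but tagged snapshot outlives an aggressive age limit.
snaps = [Snapshot(1, 0), Snapshot(2, 1_000), Snapshot(3, 2_000)]
kept = expire_snapshots(snaps, current_id=3, tagged_ids={2},
                        now_ms=10_000, max_age_ms=1_000)
assert [s.snapshot_id for s in kept] == [2, 3]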
10 changes: 9 additions & 1 deletion src/v/datalake/coordinator/iceberg_file_committer.cc
@@ -62,6 +62,8 @@ log_and_convert_action_errc(iceberg::action::errc e, std::string_view msg) {
 }
 
 constexpr auto commit_meta_prop = "redpanda.commit-metadata";
+constexpr auto commit_tag_name = "redpanda.tag";
+
 // Look for the Redpanda commit property in the current snapshot, or the most
 // recent ancestor if none.
 checked<std::optional<model::offset>, parse_offset_error>
@@ -305,11 +307,17 @@ class table_commit_builder {
           "Adding {} files to Iceberg table {}",
           icb_files_.size(),
           table_id_);
+        // NOTE: a non-expiring tag is added to the new snapshot to ensure that
+        // snapshot expiration doesn't clear this snapshot and its commit
+        // metadata properties. The tag ensures that we retain them, e.g. in
+        // case of external table updates and low snapshot retention.
         iceberg::transaction txn(std::move(table_));
         auto icb_append_res = co_await txn.merge_append(
           io,
           std::move(icb_files_),
-          {{commit_meta_prop, to_json_str(commit_meta)}});
+          {{commit_meta_prop, to_json_str(commit_meta)}},
+          commit_tag_name,
+          /*tag_expiration_ms=*/std::numeric_limits<long>::max());
         if (icb_append_res.has_error()) {
             co_return log_and_convert_action_errc(
               icb_append_res.error(),
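
The new merge_append arguments suggest the transaction installs a tag ref named redpanda.tag on the committed snapshot, with a maximal expiration so the ref never ages out. A hedged sketch of the equivalent ref entry in Iceberg table metadata ("snapshot-id", "type", and "max-ref-age-ms" are Iceberg spec fields; the helper name is hypothetical, not Redpanda's actual code path):

LONG_MAX = 2**63 - 1  # mirrors std::numeric_limits<long>::max()

def pin_commit_snapshot(table_metadata: dict, new_snapshot_id: int) -> None:
    # Re-point the tag at the latest Redpanda snapshot; only the newest
    # commit metadata needs to stay pinned across expiration runs.
    table_metadata.setdefault("refs", {})["redpanda.tag"] = {
        "snapshot-id": new_snapshot_id,
        "type": "tag",
        "max-ref-age-ms": LONG_MAX,  # effectively never expires
    }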
3 changes: 3 additions & 0 deletions src/v/datalake/translation/partition_translator.cc
@@ -251,6 +251,9 @@ ss::future<> partition_translator::stop() {
 }
 
 kafka::offset partition_translator::min_offset_for_translation() const {
+    if (_partition->is_read_replica_mode_enabled()) {
+        return model::offset_cast(_partition_proxy->start_offset());
+    }
     return model::offset_cast(_partition_proxy->local_start_offset());
 }
 
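
For context on the translator change: a read replica has no locally produced log, so the translation floor presumably falls back to the partition's overall start offset rather than the local one. Restated in Python with hypothetical attribute names:

def min_offset_for_translation(partition) -> int:
    if partition.is_read_replica:
        # Read replicas serve data from object storage, not a local log.
        return partition.start_offset
    return partition.local_start_offset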
61 changes: 46 additions & 15 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
@@ -6,7 +6,7 @@
 # As of the Change Date specified in that file, in accordance with
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0
-from time import time
+import time
 from rptest.clients.rpk import RpkTool
 from rptest.services.cluster import cluster
 from random import randint
@@ -100,7 +100,7 @@ def test_avro_schema(self, cloud_storage_type, query_engine, catalog_type):
             },
             default_value_schema=schema)
         for _ in range(count):
-            t = time()
+            t = time.time()
             record = {"number": int(t), "timestamp_us": int(t * 1000000)}
             producer.produce(topic=self.topic_name, value=record)
         producer.flush()
@@ -180,25 +180,56 @@ def test_remove_expired_snapshots(self, cloud_storage_type):
         dl.wait_for_translation(self.topic_name, count)
 
         spark = dl.spark()
-        snapshots_out = spark.run_query_fetch_all(
-            f"select * from {table_name}.snapshots")
-        assert len(
-            snapshots_out
-        ) >= num_rounds, f"Expected >={num_rounds} snapshots, got {len(snapshots_out)}: {snapshots_out}"
-
-        spark.make_client().cursor().execute(
-            f"alter table {table_name} "
-            "set tblproperties ('history.expire.max-snapshot-age-ms'='1000')"
-        )
 
-        def has_one_snapshot():
+        def num_snapshots() -> int:
             snapshots_out = spark.run_query_fetch_all(
                 f"select * from {table_name}.snapshots")
-            return len(snapshots_out) == 1
+            return len(snapshots_out)
 
-        wait_until(has_one_snapshot, timeout_sec=30, backoff_sec=1)
+        num_snaps = num_snapshots()
+        assert num_snaps >= num_rounds, f"Expected >={num_rounds} snapshots, got {num_snaps}"
+
+        # Encourage aggressive snapshot cleanup for the table. This
+        # shouldn't affect Redpanda's snapshots, since Redpanda tags
+        # its metadata with a separate retention policy.
+        spark.make_client().cursor().execute(
+            f"alter table {table_name} set tblproperties("
+            "'history.expire.max-snapshot-age-ms'='1000', "
+            "'history.expire.max-ref-age-ms'='1000')")
+
+        wait_until(lambda: num_snapshots() == 1,
+                   timeout_sec=30,
+                   backoff_sec=1)
+        dl.wait_for_translation(self.topic_name, count)
 
+        # Externally create another snapshot.
+        spark.make_client().cursor().execute(
+            f"insert into {table_name} (select * from {table_name} limit 1)"
+        )
+        num_snaps = num_snapshots()
+        assert num_snaps == 2, f"Expected 2 snapshots after writing: {num_snaps}"
+
+        # Wait for some commit intervals to let snapshot expiration run.
+        # Redpanda should retain the new snapshot _and_ the most recent one
+        # generated by Redpanda.
+        time.sleep(10)
+        num_snaps = num_snapshots()
+        assert num_snaps == 2, f"Expected Redpanda to retain 2 snapshots: {num_snaps}"
+
+        # Validate that Spark does the same.
+        spark.make_client().cursor().execute(
+            f"call system.expire_snapshots('{table_name}')")
+        assert (num_snaps := num_snapshots()) == 2, f"Expected Spark to retain 2 snapshots: {num_snaps}"
+
+        # Produce more to Redpanda. This will make the externally-created
+        # snapshot no longer the current snapshot and thus eligible for
+        # expiry. We'll still retain any Redpanda-created snapshot.
+        dl.produce_to_topic(self.topic_name, 1, 1)
+        dl.wait_for_translation(self.topic_name, count + 2)
+        wait_until(lambda: num_snapshots() == 1,
+                   timeout_sec=30,
+                   backoff_sec=1)
 
     @cluster(num_nodes=4)
     @matrix(cloud_storage_type=supported_storage_types(),
             catalog_type=supported_catalog_types())
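
To check by hand which reference is pinning a snapshot, Iceberg also exposes a refs metadata table alongside snapshots. A hedged one-liner in the style of the test above (assumes the same spark helper and table_name; column names follow Iceberg's metadata tables):

refs = spark.run_query_fetch_all(
    f"select name, type, snapshot_id from {table_name}.refs")
assert any(name == "redpanda.tag" and ref_type == "tag"
           for name, ref_type, _ in refs), f"Missing redpanda.tag ref: {refs}"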
