Merge pull request #25038 from andrwng/dl-coordinator-tag-appends
dl/coordinator: add tag to redpanda commits
andrwng authored Feb 7, 2025
2 parents 6d657f9 + e2e7ff2 commit 094551f
Showing 8 changed files with 212 additions and 20 deletions.
10 changes: 9 additions & 1 deletion src/v/datalake/coordinator/iceberg_file_committer.cc
@@ -62,6 +62,8 @@ log_and_convert_action_errc(iceberg::action::errc e, std::string_view msg) {
}

constexpr auto commit_meta_prop = "redpanda.commit-metadata";
constexpr auto commit_tag_name = "redpanda.tag";

// Look for the Redpanda commit property in the current snapshot, or the most
// recent ancestor if none.
checked<std::optional<model::offset>, parse_offset_error>
@@ -305,11 +307,17 @@ class table_commit_builder {
"Adding {} files to Iceberg table {}",
icb_files_.size(),
table_id_);
// NOTE: a non-expiring tag is added to the new snapshot to ensure that
// snapshot expiration doesn't clear this snapshot and its commit
// metadata properties. The tag ensures they are retained even in the
// face of external table updates or a low snapshot retention setting.
iceberg::transaction txn(std::move(table_));
auto icb_append_res = co_await txn.merge_append(
io,
std::move(icb_files_),
{{commit_meta_prop, to_json_str(commit_meta)}});
{{commit_meta_prop, to_json_str(commit_meta)}},
commit_tag_name,
/*tag_expiration_ms=*/std::numeric_limits<long>::max());
if (icb_append_res.has_error()) {
co_return log_and_convert_action_errc(
icb_append_res.error(),
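
The committer above now hands merge_append a tag name together with an effectively infinite expiration. A minimal sketch of the state this is expected to leave behind, based on the accessors exercised by the tests later in this commit (an illustration only, not code from the change):

// After a successful commit, the table metadata should carry a "redpanda.tag"
// reference next to the "main" branch, both pointing at the new snapshot.
const auto& refs = table.refs.value();
const auto& main_ref = refs.at("main");
const auto& tag_ref = refs.at("redpanda.tag");
// main_ref.type == snapshot_ref_type::branch
// tag_ref.type == snapshot_ref_type::tag
// tag_ref.snapshot_id == table.current_snapshot_id.value()
// tag_ref.max_ref_age_ms == std::numeric_limits<long>::max(), i.e. never expires
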
3 changes: 3 additions & 0 deletions src/v/datalake/translation/partition_translator.cc
@@ -251,6 +251,9 @@ ss::future<> partition_translator::stop() {
}

kafka::offset partition_translator::min_offset_for_translation() const {
if (_partition->is_read_replica_mode_enabled()) {
return model::offset_cast(_partition_proxy->start_offset());
}
return model::offset_cast(_partition_proxy->local_start_offset());
}

10 changes: 10 additions & 0 deletions src/v/iceberg/merge_append_action.cc
@@ -314,6 +314,16 @@ ss::future<action::action_outcome> merge_append_action::build_updates() && {
.type = snapshot_ref_type::branch,
},
});
if (tag_name_.has_value()) {
ret.updates.emplace_back(table_update::set_snapshot_ref{
.ref_name = tag_name_.value(),
.ref = snapshot_reference{
.snapshot_id = new_snap_id,
.type = snapshot_ref_type::tag,
.max_ref_age_ms = tag_expiration_ms_,
},
});
}
ret.requirements.emplace_back(table_requirement::assert_ref_snapshot_id{
.ref = "main",
.snapshot_id = old_snap_id,
8 changes: 7 additions & 1 deletion src/v/iceberg/merge_append_action.h
@@ -65,6 +65,8 @@ class merge_append_action : public action {
const table_metadata& table,
chunked_vector<data_file> files,
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props = {},
std::optional<ss::sstring> tag_name = std::nullopt,
std::optional<long> tag_expiration_ms = std::nullopt,
size_t min_to_merge_new_files = default_min_to_merge_new_files,
size_t mfile_target_size_bytes = default_target_size_bytes)
: io_(io)
@@ -73,7 +75,9 @@
, min_to_merge_new_files_(min_to_merge_new_files)
, mfile_target_size_bytes_(mfile_target_size_bytes)
, new_data_files_(std::move(files))
, snapshot_props_(std::move(snapshot_props)) {}
, snapshot_props_(std::move(snapshot_props))
, tag_name_(std::move(tag_name))
, tag_expiration_ms_(tag_expiration_ms) {}

protected:
ss::future<action_outcome> build_updates() && final;
@@ -151,6 +155,8 @@ class merge_append_action : public action {
size_t next_manifest_num_{0};
chunked_vector<data_file> new_data_files_;
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props_;
std::optional<ss::sstring> tag_name_;
std::optional<long> tag_expiration_ms_;
};

} // namespace iceberg
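
Because the new constructor parameters default to std::nullopt, existing call sites that omit them keep the old behavior; the extra set_snapshot_ref update is only emitted when a tag is requested. A hedged construction sketch (io, table, and the file vectors are assumed to be in scope; argument names follow the declaration above):

// Without a tag: only the "main" branch ref moves, as before.
merge_append_action plain(io, table, std::move(files));
// With a tag: the new snapshot is additionally pinned under "redpanda.tag".
merge_append_action tagged(
io,
table,
std::move(more_files),
/*snapshot_props=*/{},
/*tag_name=*/"redpanda.tag",
/*tag_expiration_ms=*/std::numeric_limits<long>::max());
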
112 changes: 112 additions & 0 deletions src/v/iceberg/tests/merge_append_action_test.cc
@@ -397,3 +397,115 @@ TEST_F(MergeAppendActionTest, TestBadFile) {
ASSERT_TRUE(res.has_error());
ASSERT_EQ(res.error(), action::errc::unexpected_state);
}

TEST_F(MergeAppendActionTest, TestTagSnapshot) {
transaction tx(create_table());
const auto& table = tx.table();
auto res
= tx.merge_append(
io, create_data_files("foo", 1, 1), /*snapshot_props=*/{}, "tag")
.get();
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_TRUE(table.current_snapshot_id.has_value());

auto snap_id = table.current_snapshot_id.value();
ASSERT_TRUE(table.refs.has_value());
ASSERT_TRUE(table.refs->contains("main"));
ASSERT_TRUE(table.refs->contains("tag"));

// Sanity check, main is always updated.
auto main_snap = table.refs->at("main");
ASSERT_EQ(snap_id, main_snap.snapshot_id);
ASSERT_EQ(main_snap.type, snapshot_ref_type::branch);

// Since we passed a tag, it should exist.
auto tag_snap = table.refs->at("tag");
ASSERT_EQ(snap_id, tag_snap.snapshot_id);
ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag);

// Merge again with a tag.
res = tx.merge_append(
io, create_data_files("foo", 1, 1), /*snapshot_props=*/{}, "tag")
.get();
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_TRUE(table.current_snapshot_id.has_value());
snap_id = table.current_snapshot_id.value();

// The snapshot references should follow.
tag_snap = table.refs->at("tag");
ASSERT_EQ(snap_id, tag_snap.snapshot_id);
ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag);

// Now merge with a different tag. The old tag shouldn't be affected.
res = tx.merge_append(
io,
create_data_files("foo", 1, 1),
/*snapshot_props=*/{},
/*tag_name=*/"other")
.get();
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_TRUE(table.current_snapshot_id.has_value());
auto old_snap_id = snap_id;
snap_id = table.current_snapshot_id.value();
ASSERT_NE(old_snap_id, snap_id);

// The new tag should have a new snapshot id.
auto other_snap = table.refs->at("other");
ASSERT_EQ(snap_id, other_snap.snapshot_id);
ASSERT_EQ(other_snap.type, snapshot_ref_type::tag);

// The old tag should refer to the last snapshot that was appended with it.
tag_snap = table.refs->at("tag");
ASSERT_EQ(old_snap_id, tag_snap.snapshot_id);
ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag);
}

TEST_F(MergeAppendActionTest, TestTagWithExpiration) {
transaction tx(create_table());
const auto& table = tx.table();
chunked_hash_set<snapshot_id> snap_ids;
// Add a snapshot without an explicit tag expiration.
auto res
= tx.merge_append(
io, create_data_files("foo", 1, 1), /*snapshot_props=*/{}, "tag")
.get();
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_TRUE(table.current_snapshot_id.has_value());

auto snap_id = table.current_snapshot_id.value();
ASSERT_TRUE(table.refs.has_value());
ASSERT_TRUE(table.refs->contains("tag"));

// Sanity check, no snapshot reference properties are set.
auto tag_snap = table.refs->at("tag");
ASSERT_EQ(snap_id, tag_snap.snapshot_id);
ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag);
ASSERT_FALSE(tag_snap.max_snapshot_age_ms.has_value());
ASSERT_FALSE(tag_snap.min_snapshots_to_keep.has_value());
ASSERT_FALSE(tag_snap.max_ref_age_ms.has_value());

// Now try again with a tag expiration.
auto long_max = std::numeric_limits<long>::max();
res = tx.merge_append(
io,
create_data_files("foo", 1, 1),
/*snapshot_props=*/{},
"tag",
/*tag_expiration_ms=*/long_max)
.get();
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_TRUE(table.current_snapshot_id.has_value());

snap_id = table.current_snapshot_id.value();
ASSERT_TRUE(table.refs.has_value());
ASSERT_TRUE(table.refs->contains("tag"));

// Sanity check, just the reference expiration is set.
tag_snap = table.refs->at("tag");
ASSERT_EQ(snap_id, tag_snap.snapshot_id);
ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag);
ASSERT_FALSE(tag_snap.max_snapshot_age_ms.has_value());
ASSERT_FALSE(tag_snap.min_snapshots_to_keep.has_value());
ASSERT_TRUE(tag_snap.max_ref_age_ms.has_value());
ASSERT_EQ(long_max, tag_snap.max_ref_age_ms.value());
}
11 changes: 9 additions & 2 deletions src/v/iceberg/transaction.cc
@@ -72,9 +72,16 @@ ss::future<transaction::txn_outcome> transaction::set_schema(schema s) {
ss::future<transaction::txn_outcome> transaction::merge_append(
manifest_io& io,
chunked_vector<data_file> files,
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props) {
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props,
std::optional<ss::sstring> tag_name,
std::optional<long> tag_expiration_ms) {
auto a = std::make_unique<merge_append_action>(
io, table_, std::move(files), std::move(snapshot_props));
io,
table_,
std::move(files),
std::move(snapshot_props),
std::move(tag_name),
tag_expiration_ms);
co_return co_await apply(std::move(a));
}

17 changes: 16 additions & 1 deletion src/v/iceberg/transaction.h
@@ -36,10 +36,25 @@ class transaction {
// schema id if it doesn't exist. Note, the schema id is ignored, and one
// is assigned based on the state of the table.
ss::future<txn_outcome> set_schema(schema);

// Adds the given data files to a new snapshot for the table, potentially
// merging manifests to reduce manifest list size.
//
// If supplied, the given snapshot properties are added to the resulting
// snapshot.
//
// If a tag name is supplied, a snapshot reference of type "tag" with the
// given reference expiration will be added to the table, pointing at the
// new snapshot. This creates the tag or replaces an existing tag of the
// same name. As long as the tag has not expired, the snapshot is expected
// not to be removed by snapshot expiration. If no tag expiration is set,
// snapshot expiration falls back on the table-wide properties.
ss::future<txn_outcome> merge_append(
manifest_io&,
chunked_vector<data_file>,
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props = {});
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props = {},
std::optional<ss::sstring> tag_name = std::nullopt,
std::optional<long> tag_expiration_ms = std::nullopt);

// Removes expired snapshots from the table, computing expiration based on
// the given timestamp. Note, this does not perform IO to delete any
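
A hedged usage sketch of the extended merge_append declared above, mirroring how the coordinator's file committer calls it earlier in this commit (io, table, files, and commit_meta are assumed to exist in the caller):

iceberg::transaction txn(std::move(table));
auto res = co_await txn.merge_append(
io,
std::move(files),
/*snapshot_props=*/{{"redpanda.commit-metadata", to_json_str(commit_meta)}},
/*tag_name=*/"redpanda.tag",
/*tag_expiration_ms=*/std::numeric_limits<long>::max());
if (res.has_error()) {
// The committer maps the resulting iceberg::action::errc via
// log_and_convert_action_errc; error handling here is caller-specific.
}
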
61 changes: 46 additions & 15 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
@@ -6,7 +6,7 @@
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
from time import time
import time
from rptest.clients.rpk import RpkTool
from rptest.services.cluster import cluster
from random import randint
@@ -100,7 +100,7 @@ def test_avro_schema(self, cloud_storage_type, query_engine, catalog_type):
},
default_value_schema=schema)
for _ in range(count):
t = time()
t = time.time()
record = {"number": int(t), "timestamp_us": int(t * 1000000)}
producer.produce(topic=self.topic_name, value=record)
producer.flush()
@@ -180,25 +180,56 @@ def test_remove_expired_snapshots(self, cloud_storage_type):
dl.wait_for_translation(self.topic_name, count)

spark = dl.spark()
snapshots_out = spark.run_query_fetch_all(
f"select * from {table_name}.snapshots")
assert len(
snapshots_out
) >= num_rounds, f"Expected >={num_rounds} snapshots, got {len(snapshots_out)}: {snapshots_out}"

spark.make_client().cursor().execute(
f"alter table {table_name} "
"set tblproperties ('history.expire.max-snapshot-age-ms'='1000')"
)

def has_one_snapshot():
def num_snapshots() -> int:
snapshots_out = spark.run_query_fetch_all(
f"select * from {table_name}.snapshots")
return len(snapshots_out) == 1
return len(snapshots_out)

num_snaps = num_snapshots()
assert num_snaps >= num_rounds, f"Expected >={num_rounds} snapshots, got {num_snaps}"

wait_until(has_one_snapshot, timeout_sec=30, backoff_sec=1)
# Encourage aggressive snapshot cleanup for the table. This
# shouldn't affect Redpanda's snapshots, since Redpanda tags
# its metadata with a separate retention policy.
spark.make_client().cursor().execute(
f"alter table {table_name} set tblproperties("
"'history.expire.max-snapshot-age-ms'='1000', "
"'history.expire.max-ref-age-ms'='1000')")

wait_until(lambda: num_snapshots() == 1,
timeout_sec=30,
backoff_sec=1)
dl.wait_for_translation(self.topic_name, count)

# Externally create another snapshot.
spark.make_client().cursor().execute(
f"insert into {table_name} (select * from {table_name} limit 1)"
)
num_snaps = num_snapshots()
assert num_snaps == 2, f"Expected 2 snapshots after writing: {num_snaps}"

# Wait for some commit intervals to let snapshot expiration run.
# Redpanda should retain the new snapshot _and_ the most recent one
# generated by Redpanda.
time.sleep(10)
num_snaps = num_snapshots()
assert num_snaps == 2, f"Expected Redpanda to retain 2 snapshots: {num_snaps}"

# Validate that Spark does the same.
spark.make_client().cursor().execute(
f"call system.expire_snapshots('{table_name}')")
num_snaps = num_snapshots()
assert num_snaps == 2, f"Expected Spark to retain 2 snapshots: {num_snaps}"

# Produce more to Redpanda. This will make the externally-created
# snapshot no longer the current snapshot and thus eligible for
# expiry. We'll still retain any Redpanda-created snapshot.
dl.produce_to_topic(self.topic_name, 1, 1)
dl.wait_for_translation(self.topic_name, count + 2)
wait_until(lambda: num_snapshots() == 1,
timeout_sec=30,
backoff_sec=1)

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
catalog_type=supported_catalog_types())
