dl/coordinator: add tag to redpanda commits #25038

Merged: 2 commits, Feb 7, 2025
10 changes: 9 additions & 1 deletion src/v/datalake/coordinator/iceberg_file_committer.cc
@@ -62,6 +62,8 @@ log_and_convert_action_errc(iceberg::action::errc e, std::string_view msg) {
}

constexpr auto commit_meta_prop = "redpanda.commit-metadata";
constexpr auto commit_tag_name = "redpanda.tag";

// Look for the Redpanda commit property in the current snapshot, or the most
// recent ancestor if none.
checked<std::optional<model::offset>, parse_offset_error>
@@ -305,11 +307,17 @@ class table_commit_builder {
"Adding {} files to Iceberg table {}",
icb_files_.size(),
table_id_);
// NOTE: a non-expiring tag is added to the new snapshot to ensure that
// snapshot expiration doesn't clear this snapshot and its commit
// metadata properties. The tag ensures that we retain them e.g. in
// case of external table updates and low snapshot retention.
iceberg::transaction txn(std::move(table_));
auto icb_append_res = co_await txn.merge_append(
io,
std::move(icb_files_),
{{commit_meta_prop, to_json_str(commit_meta)}});
{{commit_meta_prop, to_json_str(commit_meta)}},
commit_tag_name,
/*tag_expiration_ms=*/std::numeric_limits<long>::max());
@nvartolomei (Contributor) commented on Feb 17, 2025:
nit: this should be std::numeric_limits<int64_t>::max() (together with the underlying typedef). We never compile for a platform where this would result in a 32-bit integer AFAIK, but being explicit doesn't hurt.

long is defined as "at least" 32 bits by the C++ standard; on 64-bit Unix x86/ARM it is actually 64 bits, so all good. On Windows it is 32 bits (who cares).

The 32-bit max interpreted as milliseconds is only ~24 days.

Contributor:
Actually, this should be int64_t in case the definition of long long ever changes (i.e. it becomes 128-bit); long in the Iceberg spec is defined as a 64-bit-wide integer.

Author (Contributor):
Good call, #25118
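
As a side note, here is a minimal standalone sketch (not part of this PR) checking the arithmetic behind the thread above: a 32-bit millisecond maximum covers only about 24.8 days, while a fixed-width int64_t matches the Iceberg spec's 64-bit long on every platform.

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <limits>
#include <ratio>

int main() {
    // If the expiration type were only 32 bits wide (e.g. `long` on LLP64/Windows),
    // the largest representable tag expiration would be about 24.8 days.
    using days_f = std::chrono::duration<double, std::ratio<86400>>;
    const auto max32 = std::chrono::milliseconds{
        std::numeric_limits<std::int32_t>::max()};
    std::cout << days_f(max32).count() << " days\n";  // ~24.86

    // A fixed-width 64-bit type is explicit regardless of platform and matches
    // the Iceberg spec, where `long` is a 64-bit signed integer.
    static_assert(sizeof(std::int64_t) == 8, "int64_t is always 64 bits");
    std::cout << std::numeric_limits<std::int64_t>::max() << " ms\n";
    return 0;
}
```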

if (icb_append_res.has_error()) {
co_return log_and_convert_action_errc(
icb_append_res.error(),
3 changes: 3 additions & 0 deletions src/v/datalake/translation/partition_translator.cc
@@ -251,6 +251,9 @@ ss::future<> partition_translator::stop() {
}

kafka::offset partition_translator::min_offset_for_translation() const {
if (_partition->is_read_replica_mode_enabled()) {
return model::offset_cast(_partition_proxy->start_offset());
}
return model::offset_cast(_partition_proxy->local_start_offset());
}

10 changes: 10 additions & 0 deletions src/v/iceberg/merge_append_action.cc
@@ -314,6 +314,16 @@ ss::future<action::action_outcome> merge_append_action::build_updates() && {
.type = snapshot_ref_type::branch,
},
});
if (tag_name_.has_value()) {
ret.updates.emplace_back(table_update::set_snapshot_ref{
.ref_name = tag_name_.value(),
.ref = snapshot_reference{
.snapshot_id = new_snap_id,
.type = snapshot_ref_type::tag,
.max_ref_age_ms = tag_expiration_ms_,
},
});
}
ret.requirements.emplace_back(table_requirement::assert_ref_snapshot_id{
.ref = "main",
.snapshot_id = old_snap_id,
8 changes: 7 additions & 1 deletion src/v/iceberg/merge_append_action.h
@@ -65,6 +65,8 @@ class merge_append_action : public action {
const table_metadata& table,
chunked_vector<data_file> files,
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props = {},
std::optional<ss::sstring> tag_name = std::nullopt,
std::optional<long> tag_expiration_ms = std::nullopt,
size_t min_to_merge_new_files = default_min_to_merge_new_files,
size_t mfile_target_size_bytes = default_target_size_bytes)
: io_(io)
@@ -73,7 +75,9 @@
, min_to_merge_new_files_(min_to_merge_new_files)
, mfile_target_size_bytes_(mfile_target_size_bytes)
, new_data_files_(std::move(files))
, snapshot_props_(std::move(snapshot_props)) {}
, snapshot_props_(std::move(snapshot_props))
, tag_name_(std::move(tag_name))
, tag_expiration_ms_(tag_expiration_ms) {}

protected:
ss::future<action_outcome> build_updates() && final;
@@ -151,6 +155,8 @@ class merge_append_action : public action {
size_t next_manifest_num_{0};
chunked_vector<data_file> new_data_files_;
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props_;
std::optional<ss::sstring> tag_name_;
std::optional<long> tag_expiration_ms_;
};

} // namespace iceberg
112 changes: 112 additions & 0 deletions src/v/iceberg/tests/merge_append_action_test.cc
@@ -397,3 +397,115 @@ TEST_F(MergeAppendActionTest, TestBadFile) {
ASSERT_TRUE(res.has_error());
ASSERT_EQ(res.error(), action::errc::unexpected_state);
}

TEST_F(MergeAppendActionTest, TestTagSnapshot) {
transaction tx(create_table());
const auto& table = tx.table();
auto res
= tx.merge_append(
io, create_data_files("foo", 1, 1), /*snapshot_props=*/{}, "tag")
.get();
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_TRUE(table.current_snapshot_id.has_value());

auto snap_id = table.current_snapshot_id.value();
ASSERT_TRUE(table.refs.has_value());
ASSERT_TRUE(table.refs->contains("main"));
ASSERT_TRUE(table.refs->contains("tag"));

// Sanity check, main is always updated.
auto main_snap = table.refs->at("main");
ASSERT_EQ(snap_id, main_snap.snapshot_id);
ASSERT_EQ(main_snap.type, snapshot_ref_type::branch);

// Since we passed a tag, it should exist.
auto tag_snap = table.refs->at("tag");
ASSERT_EQ(snap_id, tag_snap.snapshot_id);
ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag);

// Merge again with a tag.
res = tx.merge_append(
io, create_data_files("foo", 1, 1), /*snapshot_props=*/{}, "tag")
.get();
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_TRUE(table.current_snapshot_id.has_value());
snap_id = table.current_snapshot_id.value();

// The snapshot references should follow.
tag_snap = table.refs->at("tag");
ASSERT_EQ(snap_id, tag_snap.snapshot_id);
ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag);

// Now merge with a different tag. The old tag shouldn't be affected.
res = tx.merge_append(
io,
create_data_files("foo", 1, 1),
/*snapshot_props=*/{},
/*tag_name=*/"other")
.get();
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_TRUE(table.current_snapshot_id.has_value());
auto old_snap_id = snap_id;
snap_id = table.current_snapshot_id.value();
ASSERT_NE(old_snap_id, snap_id);

// The new tag should have a new snapshot id.
auto other_snap = table.refs->at("other");
ASSERT_EQ(snap_id, other_snap.snapshot_id);
ASSERT_EQ(other_snap.type, snapshot_ref_type::tag);

// The old tag should refer to the last snapshot that was appended with it.
tag_snap = table.refs->at("tag");
ASSERT_EQ(old_snap_id, tag_snap.snapshot_id);
ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag);
}

TEST_F(MergeAppendActionTest, TestTagWithExpiration) {
transaction tx(create_table());
const auto& table = tx.table();
chunked_hash_set<snapshot_id> snap_ids;
// Add a snapshot without an explicit tag expiration.
auto res
= tx.merge_append(
io, create_data_files("foo", 1, 1), /*snapshot_props=*/{}, "tag")
.get();
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_TRUE(table.current_snapshot_id.has_value());

auto snap_id = table.current_snapshot_id.value();
ASSERT_TRUE(table.refs.has_value());
ASSERT_TRUE(table.refs->contains("tag"));

// Sanity check, no snapshot reference properties are set.
auto tag_snap = table.refs->at("tag");
ASSERT_EQ(snap_id, tag_snap.snapshot_id);
ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag);
ASSERT_FALSE(tag_snap.max_snapshot_age_ms.has_value());
ASSERT_FALSE(tag_snap.min_snapshots_to_keep.has_value());
ASSERT_FALSE(tag_snap.max_ref_age_ms.has_value());

// Now try again with a tag expiration.
auto long_max = std::numeric_limits<long>::max();
res = tx.merge_append(
io,
create_data_files("foo", 1, 1),
/*snapshot_props=*/{},
"tag",
/*tag_expiration_ms=*/long_max)
.get();
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_TRUE(table.current_snapshot_id.has_value());

snap_id = table.current_snapshot_id.value();
ASSERT_TRUE(table.refs.has_value());
ASSERT_TRUE(table.refs->contains("tag"));

// Sanity check, just the reference expiration is set.
tag_snap = table.refs->at("tag");
ASSERT_EQ(snap_id, tag_snap.snapshot_id);
ASSERT_EQ(tag_snap.type, snapshot_ref_type::tag);
ASSERT_FALSE(tag_snap.max_snapshot_age_ms.has_value());
ASSERT_FALSE(tag_snap.min_snapshots_to_keep.has_value());
ASSERT_TRUE(tag_snap.max_ref_age_ms.has_value());
ASSERT_EQ(long_max, tag_snap.max_ref_age_ms.value());
}
11 changes: 9 additions & 2 deletions src/v/iceberg/transaction.cc
@@ -72,9 +72,16 @@ ss::future<transaction::txn_outcome> transaction::set_schema(schema s) {
ss::future<transaction::txn_outcome> transaction::merge_append(
manifest_io& io,
chunked_vector<data_file> files,
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props) {
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props,
std::optional<ss::sstring> tag_name,
std::optional<long> tag_expiration_ms) {
auto a = std::make_unique<merge_append_action>(
io, table_, std::move(files), std::move(snapshot_props));
io,
table_,
std::move(files),
std::move(snapshot_props),
std::move(tag_name),
tag_expiration_ms);
co_return co_await apply(std::move(a));
}

17 changes: 16 additions & 1 deletion src/v/iceberg/transaction.h
@@ -36,10 +36,25 @@ class transaction {
// schema id if it doesn't exist. Note, the schema id is ignored, and one
// is assigned based on the state of the table.
ss::future<txn_outcome> set_schema(schema);

// Adds the given data files to a new snapshot for the table, potentially
// merging manifests to reduce manifest list size.
//
// If supplied, the given snapshot properties are added to the resulting
// snapshot.
//
// If a tag name is supplied, a snapshot reference "tag" with the given
// reference expiration will be added to the table with the new snapshot.
// This will create the tag or replace an existing tag. As long as this tag
// is not expired, the snapshot is expected to not be removed by snapshot
// expiration. If no tag expiration is set, snapshot expiration will fall
// back on table-wide properties.
ss::future<txn_outcome> merge_append(
manifest_io&,
chunked_vector<data_file>,
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props = {});
chunked_vector<std::pair<ss::sstring, ss::sstring>> snapshot_props = {},
std::optional<ss::sstring> tag_name = std::nullopt,
std::optional<long> tag_expiration_ms = std::nullopt);

// Removes expired snapshots from the table, computing expiration based on
// the given timestamp. Note, this does not perform IO to delete any
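For reference, a hedged usage sketch of the extended merge_append API (not taken from the PR): it assumes `io`, `table`, `files`, `more_files`, and `meta_json` are already set up as in the tests above, and mirrors the defaults shown in the header.

```cpp
// Sketch only: `io`, `table`, `files`, `more_files`, and `meta_json` are
// assumed to exist, as in the unit tests above.
iceberg::transaction txn(std::move(table));

// Plain append: only the "main" branch reference advances to the new
// snapshot; tag_name and tag_expiration_ms default to std::nullopt, so no
// tag reference is created.
auto r1 = co_await txn.merge_append(io, std::move(files));

// Tagged append: additionally creates (or replaces) the "redpanda.tag"
// reference with an effectively non-expiring max_ref_age_ms, which is what
// iceberg_file_committer.cc does to protect its commit metadata.
auto r2 = co_await txn.merge_append(
  io,
  std::move(more_files),
  /*snapshot_props=*/{{"redpanda.commit-metadata", meta_json}},
  /*tag_name=*/"redpanda.tag",
  /*tag_expiration_ms=*/std::numeric_limits<long>::max());
```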
61 changes: 46 additions & 15 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
@@ -6,7 +6,7 @@
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
from time import time
import time
from rptest.clients.rpk import RpkTool
from rptest.services.cluster import cluster
from random import randint
@@ -100,7 +100,7 @@ def test_avro_schema(self, cloud_storage_type, query_engine, catalog_type):
},
default_value_schema=schema)
for _ in range(count):
t = time()
t = time.time()
record = {"number": int(t), "timestamp_us": int(t * 1000000)}
producer.produce(topic=self.topic_name, value=record)
producer.flush()
@@ -180,25 +180,56 @@ def test_remove_expired_snapshots(self, cloud_storage_type):
dl.wait_for_translation(self.topic_name, count)

spark = dl.spark()
snapshots_out = spark.run_query_fetch_all(
f"select * from {table_name}.snapshots")
assert len(
snapshots_out
) >= num_rounds, f"Expected >={num_rounds} snapshots, got {len(snapshots_out)}: {snapshots_out}"

spark.make_client().cursor().execute(
f"alter table {table_name} "
"set tblproperties ('history.expire.max-snapshot-age-ms'='1000')"
)

def has_one_snapshot():
def num_snapshots() -> int:
snapshots_out = spark.run_query_fetch_all(
f"select * from {table_name}.snapshots")
return len(snapshots_out) == 1
return len(snapshots_out)

num_snaps = num_snapshots()
assert num_snaps >= num_rounds, f"Expected >={num_rounds} snapshots, got {num_snaps}"

wait_until(has_one_snapshot, timeout_sec=30, backoff_sec=1)
# Encourage aggressive snapshot cleanup for the table. This
# shouldn't affect Redpanda's snapshots, since Redpanda tags
# its metadata with a separate retention policy.
spark.make_client().cursor().execute(
f"alter table {table_name} set tblproperties("
"'history.expire.max-snapshot-age-ms'='1000', "
"'history.expire.max-ref-age-ms'='1000')")

wait_until(lambda: num_snapshots() == 1,
timeout_sec=30,
backoff_sec=1)
dl.wait_for_translation(self.topic_name, count)

# Externally create another snapshot.
spark.make_client().cursor().execute(
f"insert into {table_name} (select * from {table_name} limit 1)"
)
num_snaps = num_snapshots()
assert num_snaps == 2, f"Expected 2 snapshots after writing: {num_snaps}"

# Wait for some commit intervals to let snapshot expiration run.
# Redpanda should retain the new snapshot _and_ the most recent one
# generated by Redpanda.
time.sleep(10)
num_snaps = num_snapshots()
assert num_snaps == 2, f"Expected Redpanda to retain 2 snapshots: {num_snaps}"

# Validate that Spark does the same.
spark.make_client().cursor().execute(
f"call system.expire_snapshots('{table_name}')")
num_snaps = num_snapshots()
assert num_snaps == 2, f"Expected Spark to retain 2 snapshots after expiration: {num_snaps}"

# Produce more to Redpanda. This will make the externally-created
# snapshot no longer the current snapshot and thus eligible for
# expiry. We'll still retain any Redpanda-created snapshot.
dl.produce_to_topic(self.topic_name, 1, 1)
dl.wait_for_translation(self.topic_name, count + 2)
wait_until(lambda: num_snapshots() == 1,
timeout_sec=30,
backoff_sec=1)

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types(),
catalog_type=supported_catalog_types())