diff --git a/src/v/iceberg/manifest_io.cc b/src/v/iceberg/manifest_io.cc index f1c8dde8acb74..69e630fb506d7 100644 --- a/src/v/iceberg/manifest_io.cc +++ b/src/v/iceberg/manifest_io.cc @@ -23,6 +23,18 @@ using namespace std::chrono_literals; namespace iceberg { +ss::future> +manifest_io::download_manifest_uri( + const ss::sstring& uri, const partition_key_type& pk_type) { + auto path = manifest_path(from_uri(uri)); + co_return co_await download_manifest(path, pk_type); +} + +ss::future> +manifest_io::download_manifest_list_uri(const ss::sstring& uri) { + auto path = manifest_list_path(from_uri(uri)); + co_return co_await download_manifest_list(path); +} ss::future> manifest_io::download_manifest( const manifest_path& path, const partition_key_type& pk_type) { @@ -55,4 +67,20 @@ manifest_io::upload_manifest_list( }); } +ss::sstring manifest_io::to_uri(const std::filesystem::path& p) const { + return fmt::format("{}{}", uri_base(), p.native()); +} + +std::filesystem::path manifest_io::from_uri(const ss::sstring& s) const { + const auto base = uri_base(); + if (!s.starts_with(base)) { + return std::filesystem::path{s}; + } + return std::filesystem::path{s.substr(base.size())}; +} + +ss::sstring manifest_io::uri_base() const { + return fmt::format("s3://{}/", bucket_); +} + } // namespace iceberg diff --git a/src/v/iceberg/manifest_io.h b/src/v/iceberg/manifest_io.h index 9fe2f2794f598..4386ca44ad1c5 100644 --- a/src/v/iceberg/manifest_io.h +++ b/src/v/iceberg/manifest_io.h @@ -36,13 +36,29 @@ class manifest_io : public metadata_io { ss::future> download_manifest( const manifest_path& path, const partition_key_type& pk_type); + ss::future> download_manifest_uri( + const ss::sstring& uri, const partition_key_type& pk_type); + ss::future> download_manifest_list(const manifest_list_path& path); + ss::future> + download_manifest_list_uri(const ss::sstring& uri); ss::future> upload_manifest(const manifest_path& path, const manifest&); ss::future> upload_manifest_list(const manifest_list_path& path, const manifest_list&); + + ss::sstring to_uri(const std::filesystem::path& p) const; + +private: + // TODO: make URIs less fragile with an explicit type? + // E.g. s3://bucket/ + ss::sstring uri_base() const; + + // E.g. s3://bucket/path/to/file => path/to/file + // Leaves the path as is if it doesn't match the expected URI base. + std::filesystem::path from_uri(const ss::sstring& s) const; }; } // namespace iceberg diff --git a/src/v/iceberg/merge_append_action.cc b/src/v/iceberg/merge_append_action.cc index 8e96c9b9d1c65..50a711c44122e 100644 --- a/src/v/iceberg/merge_append_action.cc +++ b/src/v/iceberg/merge_append_action.cc @@ -229,8 +229,8 @@ ss::future merge_append_action::build_updates() && { table_cur_snap_id); co_return action::errc::unexpected_state; } - auto mlist_res = co_await io_.download_manifest_list( - manifest_list_path{snap_it->manifest_list_path}); + auto mlist_res = co_await io_.download_manifest_list_uri( + snap_it->manifest_list_path); if (mlist_res.has_error()) { co_return to_action_errc(mlist_res.error()); } @@ -294,7 +294,7 @@ ss::future merge_append_action::build_updates() && { .operation = snapshot_operation::append, .other = {}, }, - .manifest_list_path = new_mlist_path().native(), + .manifest_list_path = io_.to_uri(new_mlist_path()), .schema_id = schema.schema_id, }; updates_and_reqs ret; @@ -382,7 +382,7 @@ merge_append_action::maybe_merge_mfiles_and_new_data( // Since this bin was too small to merge, we won't do anything else to // its manifests, just add them back to the returned container. ret.emplace_back(manifest_file{ - .manifest_path = new_manifest_path().native(), + .manifest_path = io_.to_uri(new_manifest_path()), .manifest_length = mfile_up_res.value(), .partition_spec_id = ctx.pspec.spec_id, .content = manifest_file_content::data, @@ -437,8 +437,8 @@ merge_append_action::merge_mfiles( for (const auto& mfile : to_merge) { // Download the manifest file and collect the entries into the merged // container. - auto mfile_res = co_await io_.download_manifest( - manifest_path{mfile.manifest_path}, ctx.pk_type); + auto mfile_res = co_await io_.download_manifest_uri( + mfile.manifest_path, ctx.pk_type); if (mfile_res.has_error()) { co_return mfile_res.error(); } @@ -473,7 +473,7 @@ merge_append_action::merge_mfiles( co_return mfile_up_res.error(); } manifest_file merged_file{ - .manifest_path = merged_manifest_path().native(), + .manifest_path = io_.to_uri(merged_manifest_path()), .manifest_length = mfile_up_res.value(), .partition_spec_id = ctx.pspec.spec_id, .content = manifest_file_content::data, diff --git a/src/v/iceberg/metadata_io.h b/src/v/iceberg/metadata_io.h index 6a5fa01f0524b..a74d3e410aff5 100644 --- a/src/v/iceberg/metadata_io.h +++ b/src/v/iceberg/metadata_io.h @@ -179,7 +179,7 @@ class metadata_io { } } -private: +protected: cloud_io::remote& io_; const cloud_storage_clients::bucket_name bucket_; }; diff --git a/src/v/iceberg/metadata_query.cc b/src/v/iceberg/metadata_query.cc index 25d9f5eb47b0b..917d9a49b6851 100644 --- a/src/v/iceberg/metadata_query.cc +++ b/src/v/iceberg/metadata_query.cc @@ -186,8 +186,8 @@ do_execute_query( collector.collect(s); continue; } - auto m_list_result = co_await io.download_manifest_list( - manifest_list_path{s.manifest_list_path}); + auto m_list_result = co_await io.download_manifest_list_uri( + s.manifest_list_path); if (m_list_result.has_error()) { vlog( @@ -217,9 +217,8 @@ do_execute_query( if (pk_result.has_error()) { co_return pk_result.error(); } - auto m_result = co_await io.download_manifest( - manifest_path(manifest_file.manifest_path), - std::move(pk_result.value())); + auto m_result = co_await io.download_manifest_uri( + manifest_file.manifest_path, std::move(pk_result.value())); if (m_result.has_error()) { vlog( diff --git a/src/v/iceberg/tests/manifest_io_test.cc b/src/v/iceberg/tests/manifest_io_test.cc index e31178eb2f944..53abf02d699ba 100644 --- a/src/v/iceberg/tests/manifest_io_test.cc +++ b/src/v/iceberg/tests/manifest_io_test.cc @@ -32,17 +32,13 @@ class ManifestIOTest set_expectations_and_listen({}); } auto& remote() { return sr->remote.local(); } - - std::unique_ptr sr; -}; - -TEST_F(ManifestIOTest, TestManifestRoundtrip) { - schema s{ - .schema_struct = std::get(test_nested_schema_type()), - .schema_id = schema::id_t{12}, - .identifier_field_ids = {nested_field::id_t{1}}, - }; - partition_spec p{ + manifest make_manifest() const { + schema s{ + .schema_struct = std::get(test_nested_schema_type()), + .schema_id = schema::id_t{12}, + .identifier_field_ids = {nested_field::id_t{1}}, + }; + partition_spec p{ .spec_id = partition_spec::id_t{8}, .fields = { partition_field{ @@ -53,17 +49,44 @@ TEST_F(ManifestIOTest, TestManifestRoundtrip) { }, }, }; - manifest_metadata meta{ - .schema = std::move(s), - .partition_spec = std::move(p), - .format_version = format_version::v1, - .manifest_content_type = manifest_content_type::data, - }; - manifest m{ - .metadata = std::move(meta), - .entries = {}, - }; + manifest_metadata meta{ + .schema = std::move(s), + .partition_spec = std::move(p), + .format_version = format_version::v1, + .manifest_content_type = manifest_content_type::data, + }; + return manifest{ + .metadata = std::move(meta), + .entries = {}, + }; + } + + manifest_list make_manifest_list() const { + manifest_list m; + for (int i = 0; i < 1024; i++) { + manifest_file file; + file.manifest_path = "path/to/file"; + file.partition_spec_id = partition_spec::id_t{1}; + file.content = manifest_file_content::data; + file.seq_number = sequence_number{3}; + file.min_seq_number = sequence_number{4}; + file.added_snapshot_id = snapshot_id{5}; + file.added_files_count = 6; + file.existing_files_count = 7; + file.deleted_files_count = 8; + file.added_rows_count = 9; + file.existing_rows_count = 10; + file.deleted_rows_count = 11; + m.files.emplace_back(std::move(file)); + } + return m; + } + + std::unique_ptr sr; +}; +TEST_F(ManifestIOTest, TestManifestRoundtrip) { + auto m = make_manifest(); // Missing manifest. auto io = manifest_io(remote(), bucket_name); auto test_path = manifest_path{"foo/bar/baz"}; @@ -83,23 +106,7 @@ TEST_F(ManifestIOTest, TestManifestRoundtrip) { } TEST_F(ManifestIOTest, TestManifestListRoundtrip) { - manifest_list m; - for (int i = 0; i < 1024; i++) { - manifest_file file; - file.manifest_path = "path/to/file"; - file.partition_spec_id = partition_spec::id_t{1}; - file.content = manifest_file_content::data; - file.seq_number = sequence_number{3}; - file.min_seq_number = sequence_number{4}; - file.added_snapshot_id = snapshot_id{5}; - file.added_files_count = 6; - file.existing_files_count = 7; - file.deleted_files_count = 8; - file.added_rows_count = 9; - file.existing_rows_count = 10; - file.deleted_rows_count = 11; - m.files.emplace_back(std::move(file)); - } + manifest_list m = make_manifest_list(); // Missing manifest list. auto io = manifest_io(remote(), bucket_name); @@ -118,6 +125,48 @@ TEST_F(ManifestIOTest, TestManifestListRoundtrip) { ASSERT_EQ(m, m_roundtrip); } +TEST_F(ManifestIOTest, TestManifestRoundtripURIs) { + auto m = make_manifest(); + auto io = manifest_io(remote(), bucket_name); + auto path = manifest_path{"foo/bar/baz"}; + auto test_uri = io.to_uri(path); + ASSERT_TRUE(test_uri.starts_with("s3://")); + + auto up_res = io.upload_manifest(path, m).get(); + ASSERT_FALSE(up_res.has_error()); + + // Use the URI string. + auto dl_res = io.download_manifest_uri(test_uri, empty_pk_type()).get(); + ASSERT_FALSE(dl_res.has_error()); + ASSERT_EQ(m, dl_res.value()); + + // As a safety measure, we'll still parse the raw path if given. + dl_res = io.download_manifest_uri(path().native(), empty_pk_type()).get(); + ASSERT_FALSE(dl_res.has_error()); + ASSERT_EQ(m, dl_res.value()); +} + +TEST_F(ManifestIOTest, TestManifestListRoundtripURIs) { + auto m = make_manifest_list(); + auto io = manifest_io(remote(), bucket_name); + auto path = manifest_list_path{"foo/bar/baz"}; + auto test_uri = io.to_uri(path); + ASSERT_TRUE(test_uri.starts_with("s3://")); + + auto up_res = io.upload_manifest_list(path, m).get(); + ASSERT_FALSE(up_res.has_error()); + + // Use the URI string. + auto dl_res = io.download_manifest_list_uri(test_uri).get(); + ASSERT_FALSE(dl_res.has_error()); + ASSERT_EQ(m, dl_res.value()); + + // As a safety measure, we'll still parse the raw path if given. + dl_res = io.download_manifest_list_uri(path().native()).get(); + ASSERT_FALSE(dl_res.has_error()); + ASSERT_EQ(m, dl_res.value()); +} + TEST_F(ManifestIOTest, TestShutdown) { auto test_path = manifest_path{"foo/bar/baz"}; sr->request_stop(); diff --git a/src/v/iceberg/tests/merge_append_action_test.cc b/src/v/iceberg/tests/merge_append_action_test.cc index 39efd3d08c52b..a10aed12b3843 100644 --- a/src/v/iceberg/tests/merge_append_action_test.cc +++ b/src/v/iceberg/tests/merge_append_action_test.cc @@ -125,9 +125,8 @@ TEST_F(MergeAppendActionTest, TestMergeByCount) { auto latest_mlist_path = table.snapshots.value().back().manifest_list_path; - auto latest_mlist = io.download_manifest_list( - manifest_list_path{latest_mlist_path}) - .get(); + auto latest_mlist + = io.download_manifest_list_uri(latest_mlist_path).get(); ASSERT_TRUE(latest_mlist.has_value()); ASSERT_EQ(latest_mlist.value().files.size(), expected_manifests); } @@ -144,8 +143,7 @@ TEST_F(MergeAppendActionTest, TestMergeByCount) { // Validate that the latest snapshot indeed contains a single manifest. auto latest_mlist_path = table.snapshots.value().back().manifest_list_path; - auto latest_mlist - = io.download_manifest_list(manifest_list_path{latest_mlist_path}).get(); + auto latest_mlist = io.download_manifest_list_uri(latest_mlist_path).get(); ASSERT_TRUE(latest_mlist.has_value()); ASSERT_EQ(latest_mlist.value().files.size(), 1); const auto& merged_mfile = latest_mlist.value().files[0]; @@ -186,9 +184,8 @@ TEST_F(MergeAppendActionTest, TestMergeByBytes) { auto latest_mlist_path = table.snapshots.value().back().manifest_list_path; - auto latest_mlist = io.download_manifest_list( - manifest_list_path{latest_mlist_path}) - .get(); + auto latest_mlist + = io.download_manifest_list_uri(latest_mlist_path).get(); ASSERT_TRUE(latest_mlist.has_value()); ASSERT_EQ(latest_mlist.value().files.size(), expected_manifests); } @@ -207,8 +204,7 @@ TEST_F(MergeAppendActionTest, TestMergeByBytes) { // - the one containing 8 merged 1MB paths // - the two we added that weren't merged auto latest_mlist_path = table.snapshots.value().back().manifest_list_path; - auto latest_mlist - = io.download_manifest_list(manifest_list_path{latest_mlist_path}).get(); + auto latest_mlist = io.download_manifest_list_uri(latest_mlist_path).get(); ASSERT_TRUE(latest_mlist.has_value()); ASSERT_EQ(latest_mlist.value().files.size(), 3); @@ -296,9 +292,8 @@ TEST_F(MergeAppendActionTest, TestPartitionSummaries) { // manifest (no merge yet). auto latest_mlist_path = table.snapshots.value().back().manifest_list_path; - auto latest_mlist_res = io.download_manifest_list( - manifest_list_path{latest_mlist_path}) - .get(); + auto latest_mlist_res + = io.download_manifest_list_uri(latest_mlist_path).get(); ASSERT_TRUE(latest_mlist_res.has_value()); ASSERT_EQ(expected_manifests, latest_mlist_res.value().files.size()); @@ -325,7 +320,7 @@ TEST_F(MergeAppendActionTest, TestPartitionSummaries) { ASSERT_FALSE(res.has_error()) << res.error(); auto latest_mlist_path = table.snapshots.value().back().manifest_list_path; auto latest_mlist_res - = io.download_manifest_list(manifest_list_path{latest_mlist_path}).get(); + = io.download_manifest_list_uri(latest_mlist_path).get(); ASSERT_TRUE(latest_mlist_res.has_value()); ASSERT_EQ(1, latest_mlist_res.value().files.size()); diff --git a/src/v/iceberg/tests/metadata_query_test.cc b/src/v/iceberg/tests/metadata_query_test.cc index 273f6c9d41688..8fdd2190e2c7f 100644 --- a/src/v/iceberg/tests/metadata_query_test.cc +++ b/src/v/iceberg/tests/metadata_query_test.cc @@ -114,8 +114,8 @@ class MetadataQueryTest chunked_vector files; for (auto& s : *table.snapshots) { - auto m_list = co_await io.download_manifest_list( - manifest_list_path(s.manifest_list_path)); + auto m_list = co_await io.download_manifest_list_uri( + s.manifest_list_path); for (auto& f : m_list.assume_value().files) { if (!paths.contains(f.manifest_path)) { paths.emplace(f.manifest_path); @@ -136,8 +136,8 @@ class MetadataQueryTest chunked_vector manifests; auto files = co_await collect_all_manifest_files(table); for (auto& f : files) { - auto m = co_await io.download_manifest( - manifest_path(f.manifest_path), make_partition_key_type(table)); + auto m = co_await io.download_manifest_uri( + f.manifest_path, make_partition_key_type(table)); manifests.push_back(std::move(m.assume_value())); } co_return manifests;