From 9b0a7489e69f45c115919640095a10acccb30866 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Sun, 22 Dec 2024 07:52:54 -0500 Subject: [PATCH] archival: Add rw-fence feature flag The previous implementation of the archival STM had a bug: the last applied offset wasn't added to the snapshot. This could potentially make replicas diverge. The solution is to add a feature flag and use the rw-fence mechanism only once all replicas are upgraded. Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- .../cluster/archival/ntp_archiver_service.cc | 34 +++++++++++++++---- src/v/features/feature_table.cc | 2 ++ src/v/features/feature_table.h | 1 + 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index 1cdb1cfe4fe5..ad7b5020b055 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -771,8 +771,13 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() { _parent.archival_meta_stm()->get_insync_offset()); auto [non_compacted_upload_result, compacted_upload_result] - = co_await upload_next_candidates( - archival_stm_fence{.read_write_fence = fence}); + = co_await upload_next_candidates(archival_stm_fence{ + .read_write_fence = fence, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. 
+ .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), + }); if (non_compacted_upload_result.num_failed != 0) { // The logic in class `remote` already does retries: if we get here, // it means the upload failed after several retries, justifying @@ -2630,7 +2635,10 @@ ss::future<> ntp_archiver::apply_archive_retention() { archival_stm_fence fence = { .read_write_fence = _parent.archival_meta_stm()->manifest().get_applied_offset(), - .unsafe_add = false, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. + .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), }; std::optional retention_bytes = ntp_conf.retention_bytes(); @@ -2700,7 +2708,10 @@ ss::future<> ntp_archiver::garbage_collect_archive() { archival_stm_fence fence = { .read_write_fence = _parent.archival_meta_stm()->manifest().get_applied_offset(), - .unsafe_add = false, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. + .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), }; auto backlog = co_await _manifest_view->get_retention_backlog(); if (backlog.has_failure()) { @@ -3149,7 +3160,10 @@ ss::future<> ntp_archiver::apply_retention() { archival_stm_fence fence = { .read_write_fence = _parent.archival_meta_stm()->manifest().get_applied_offset(), - .unsafe_add = false, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. 
+ .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), }; auto arch_so = manifest().get_archive_start_offset(); auto stm_so = manifest().get_start_offset(); @@ -3244,7 +3258,10 @@ ss::future<> ntp_archiver::garbage_collect() { archival_stm_fence fence = { .read_write_fence = _parent.archival_meta_stm()->manifest().get_applied_offset(), - .unsafe_add = false, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. + .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), }; // If we are about to delete segments, we must ensure that the remote @@ -3369,7 +3386,10 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) { archival_stm_fence rw_fence{ .read_write_fence = _parent.archival_meta_stm()->manifest().get_applied_offset(), - .unsafe_add = false, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. 
+ .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), }; if (!may_begin_uploads()) { co_return find_reupload_candidate_result{ diff --git a/src/v/features/feature_table.cc b/src/v/features/feature_table.cc index eae3596b6c59..cc16faf59a12 100644 --- a/src/v/features/feature_table.cc +++ b/src/v/features/feature_table.cc @@ -98,6 +98,8 @@ std::string_view to_string_view(feature f) { return "datalake_iceberg"; case feature::raft_symmetric_reconfiguration_cancel: return "raft_symmetric_reconfiguration_cancel"; + case feature::cloud_storage_metadata_rw_fence: + return "cloud_storage_metadata_rw_fence"; /* * testing features diff --git a/src/v/features/feature_table.h b/src/v/features/feature_table.h index c6fc643a6aa0..98116cb69477 100644 --- a/src/v/features/feature_table.h +++ b/src/v/features/feature_table.h @@ -69,6 +69,7 @@ enum class feature : std::uint64_t { shadow_indexing_split_topic_property_update = 1ULL << 53U, datalake_iceberg = 1ULL << 54U, raft_symmetric_reconfiguration_cancel = 1ULL << 55U, + cloud_storage_metadata_rw_fence = 1ULL << 56U, // Dummy features for testing only test_alpha = 1ULL << 61U, test_bravo = 1ULL << 62U,