Skip to content

Commit

Permalink
archival: Add rw-fence feature flag
Browse files Browse the repository at this point in the history
Previous implementation of the archival STM had a bug because of which
the last applied offset wasn't added to the snapshot. This could
potentially make replicas diverge. The solution is to add a feature flag
and use the rw-fence mechanism only if all replicas are upgraded.

Signed-off-by: Evgeny Lazin <[email protected]>
  • Loading branch information
Lazin committed Dec 22, 2024
1 parent 8d5bdfd commit 9b0a748
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
34 changes: 27 additions & 7 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,13 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() {
_parent.archival_meta_stm()->get_insync_offset());

auto [non_compacted_upload_result, compacted_upload_result]
= co_await upload_next_candidates(
archival_stm_fence{.read_write_fence = fence});
= co_await upload_next_candidates(archival_stm_fence{
.read_write_fence = fence,
// Only use the rw-fence if the feature is enabled which requires
// major version upgrade.
.unsafe_add = !_feature_table.local().is_active(
features::feature::cloud_storage_metadata_rw_fence),
});
if (non_compacted_upload_result.num_failed != 0) {
// The logic in class `remote` already does retries: if we get here,
// it means the upload failed after several retries, justifying
Expand Down Expand Up @@ -2630,7 +2635,10 @@ ss::future<> ntp_archiver::apply_archive_retention() {
archival_stm_fence fence = {
.read_write_fence
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
.unsafe_add = false,
// Only use the rw-fence if the feature is enabled which requires
// major version upgrade.
.unsafe_add = !_feature_table.local().is_active(
features::feature::cloud_storage_metadata_rw_fence),
};

std::optional<size_t> retention_bytes = ntp_conf.retention_bytes();
Expand Down Expand Up @@ -2700,7 +2708,10 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
archival_stm_fence fence = {
.read_write_fence
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
.unsafe_add = false,
// Only use the rw-fence if the feature is enabled which requires
// major version upgrade.
.unsafe_add = !_feature_table.local().is_active(
features::feature::cloud_storage_metadata_rw_fence),
};
auto backlog = co_await _manifest_view->get_retention_backlog();
if (backlog.has_failure()) {
Expand Down Expand Up @@ -3149,7 +3160,10 @@ ss::future<> ntp_archiver::apply_retention() {
archival_stm_fence fence = {
.read_write_fence
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
.unsafe_add = false,
// Only use the rw-fence if the feature is enabled which requires
// major version upgrade.
.unsafe_add = !_feature_table.local().is_active(
features::feature::cloud_storage_metadata_rw_fence),
};
auto arch_so = manifest().get_archive_start_offset();
auto stm_so = manifest().get_start_offset();
Expand Down Expand Up @@ -3244,7 +3258,10 @@ ss::future<> ntp_archiver::garbage_collect() {
archival_stm_fence fence = {
.read_write_fence
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
.unsafe_add = false,
// Only use the rw-fence if the feature is enabled which requires
// major version upgrade.
.unsafe_add = !_feature_table.local().is_active(
features::feature::cloud_storage_metadata_rw_fence),
};

// If we are about to delete segments, we must ensure that the remote
Expand Down Expand Up @@ -3369,7 +3386,10 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) {
archival_stm_fence rw_fence{
.read_write_fence
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
.unsafe_add = false,
// Only use the rw-fence if the feature is enabled which requires
// major version upgrade.
.unsafe_add = !_feature_table.local().is_active(
features::feature::cloud_storage_metadata_rw_fence),
};
if (!may_begin_uploads()) {
co_return find_reupload_candidate_result{
Expand Down
2 changes: 2 additions & 0 deletions src/v/features/feature_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ std::string_view to_string_view(feature f) {
return "datalake_iceberg";
case feature::raft_symmetric_reconfiguration_cancel:
return "raft_symmetric_reconfiguration_cancel";
case feature::cloud_storage_metadata_rw_fence:
return "cloud_storage_metadata_rw_fence";

/*
* testing features
Expand Down
1 change: 1 addition & 0 deletions src/v/features/feature_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ enum class feature : std::uint64_t {
shadow_indexing_split_topic_property_update = 1ULL << 53U,
datalake_iceberg = 1ULL << 54U,
raft_symmetric_reconfiguration_cancel = 1ULL << 55U,
cloud_storage_metadata_rw_fence = 1ULL << 56U,
// Dummy features for testing only
test_alpha = 1ULL << 61U,
test_bravo = 1ULL << 62U,
Expand Down

0 comments on commit 9b0a748

Please sign in to comment.