Skip to content

Commit

Permalink
Merge pull request #15677 from nvartolomei/nv/last-replicate-bug-v2
Browse files Browse the repository at this point in the history
c/archival_stm: do not reset _last_replicate on timeout
  • Loading branch information
nvartolomei authored Dec 19, 2023
2 parents 50d0446 + 299e12d commit 5f34eed
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 18 deletions.
11 changes: 11 additions & 0 deletions src/v/archival/tests/archival_metadata_stm_gtest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,17 @@ TEST_F_CORO(
});
ASSERT_FALSE_CORO(sync_result_before_replication);

// Subsequent calls to sync should fail too.
auto second_sync_result_before_replication = co_await with_leader(
10s, [this, &plagued_node](raft::raft_node_instance& node) mutable {
if (node.get_vnode() != plagued_node) {
throw std::runtime_error{"Leadership moved"};
}

return get_leader_stm().sync(10ms);
});
ASSERT_FALSE_CORO(second_sync_result_before_replication);

// Allow replication to progress.
may_resume_append.set_value();

Expand Down
36 changes: 20 additions & 16 deletions src/v/cluster/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
#include "vlog.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/util/bool_class.hh>
#include <seastar/util/defer.hh>

#include <algorithm>
#include <optional>

namespace cluster {

Expand Down Expand Up @@ -599,15 +601,6 @@ archival_metadata_stm::archival_metadata_stm(
, _cloud_storage_api(remote)
, _feature_table(ft) {}

archival_metadata_stm::~archival_metadata_stm() {
// Last replicate future is an internal barrier for sync operations. Ignore
// its value if we're shutting down to prevent seastar logging warnings for
// unhandled exception.
if (_last_replicate.has_value()) {
_last_replicate->ignore_ready_future();
}
}

ss::future<std::error_code> archival_metadata_stm::truncate(
model::offset start_rp_offset,
ss::lowres_clock::time_point deadline,
Expand Down Expand Up @@ -696,18 +689,16 @@ archival_metadata_stm::sync(model::timeout_clock::duration timeout) {
// below. If replication failed, exit unsuccessfully.

if (_last_replicate) {
auto fut = std::exchange(_last_replicate, std::nullopt).value();

if (!fut.available()) {
if (!_last_replicate->result.available()) {
vlog(
_logger.debug, "Waiting for ongoing replication before syncing");
}

try {
const auto before = model::timeout_clock::now();

const auto res = co_await ss::with_timeout(
before + timeout, std::move(fut));
const auto res = co_await _last_replicate->result.get_future(
before + timeout);

const auto after = model::timeout_clock::now();
// Update the timeout whille accounting for under/overflow.
Expand All @@ -721,11 +712,23 @@ archival_metadata_stm::sync(model::timeout_clock::duration timeout) {

timeout -= duration;

// If we've got this far it means that the _last_replicate future
// was resolved. If it resolved successfully, then we can continue.
// If it failed for any reason, it is safe to "forget" about it in
// only if the term changed. Otherwise we can't make any assumptions
// about its state.
// Stepping down (liveness) is guaranteed by the logic in the
// `do_replicate_commands` method.
if (res || _last_replicate->term < _raft->term()) {
_last_replicate = std::nullopt;
}

if (!res) {
vlog(
_logger.warn,
"Replication failed for archival STM command: {}",
res.error());

co_return false;
}
} catch (const ss::timed_out_error&) {
Expand Down Expand Up @@ -806,8 +809,9 @@ ss::future<std::error_code> archival_metadata_stm::do_replicate_commands(

const auto current_term = _insync_term;

ss::promise<result<raft::replicate_result>> replication_promise;
_last_replicate = replication_promise.get_future();
ss::shared_promise<result<raft::replicate_result>> replication_promise;
_last_replicate = last_replicate{
.term = current_term, .result = replication_promise.get_shared_future()};

auto fut
= _raft
Expand Down
8 changes: 6 additions & 2 deletions src/v/cluster/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/types.h"
#include "features/fwd.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/record.h"
#include "raft/persisted_stm.h"
Expand Down Expand Up @@ -111,7 +112,6 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
features::feature_table&,
ss::logger& logger,
ss::shared_ptr<util::mem_tracker> partition_mem_tracker = nullptr);
~archival_metadata_stm() override;

/// Add segments to the raft log, replicate them and
/// wait until it is applied to the STM.
Expand Down Expand Up @@ -332,7 +332,11 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
model::offset _last_dirty_at;

// The last replication future
std::optional<ss::future<result<raft::replicate_result>>> _last_replicate;
struct last_replicate {
model::term_id term;
ss::shared_future<result<raft::replicate_result>> result;
};
std::optional<last_replicate> _last_replicate;

cloud_storage::remote& _cloud_storage_api;
features::feature_table& _feature_table;
Expand Down

0 comments on commit 5f34eed

Please sign in to comment.