Skip to content

Commit

Permalink
Merge branch 'master' into fix-some-data-races
Browse files Browse the repository at this point in the history
  • Loading branch information
greensky00 authored Dec 3, 2024
2 parents ccc6c30 + 149cd03 commit 0e58d5c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
run: ./github_action_build.sh

- name: Code Coverage Metrics
uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # pin@v3
uses: codecov/codecov-action@015f24e6818733317a2da2edd6290ab26238649a # pin@v3
with:
verbose: true
files: ./build/raft_cov.info.cleaned
2 changes: 1 addition & 1 deletion include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -1460,7 +1460,7 @@ protected:
/**
* Lock of handling client request and role change.
*/
std::mutex cli_lock_;
std::recursive_mutex cli_lock_;

/**
* Condition variable to invoke BG commit thread.
Expand Down
10 changes: 5 additions & 5 deletions src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1813,12 +1813,12 @@ class asio_rpc_client

// 3) Context.
assert(remaining_len >= 0);
if (remaining_len) {
size_t ctx_len = remaining_len;
if (flags & INCLUDE_RESULT_CODE) {
ctx_len -= sizeof(int32_t);
}
if (ctx_len) {
// It has context, read it.
size_t ctx_len = remaining_len;
if (flags & INCLUDE_RESULT_CODE) {
ctx_len -= sizeof(int32_t);
}
ptr<buffer> actual_ctx = buffer::alloc(ctx_len);
bs.get_buffer(actual_ctx);
rsp->set_ctx(actual_ctx);
Expand Down
2 changes: 1 addition & 1 deletion src/handle_client_request.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ ptr<resp_msg> raft_server::handle_cli_req_prelock(req_msg& req,
case raft_params::dual_mutex:
default: {
// TODO: Use RW lock here.
auto_lock(cli_lock_);
recur_lock(cli_lock_);
resp = handle_cli_req(req, ext_params, timestamp_us);
break;
}
Expand Down
36 changes: 33 additions & 3 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ void raft_server::become_leader() {
}

ptr<raft_params> params = ctx_->get_params();
{ auto_lock(cli_lock_);
{ recur_lock(cli_lock_);
role_ = srv_role::leader;
leader_ = id_;
srv_to_join_.reset();
Expand Down Expand Up @@ -1397,7 +1397,7 @@ bool raft_server::request_leadership() {
void raft_server::become_follower() {
// stop hb for all peers
p_in("[BECOME FOLLOWER] term %" PRIu64 "", state_->get_term());
{ std::lock_guard<std::mutex> ll(cli_lock_);
{ std::lock_guard<std::recursive_mutex> ll(cli_lock_);
for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
it->second->enable_hb(false);
it->second->reset_stream();
Expand Down Expand Up @@ -1455,7 +1455,7 @@ bool raft_server::update_term(ulong term) {
//
// To avoid this issue, we acquire `cli_lock_`,
// and change `role_` first before setting the term.
std::lock_guard<std::mutex> ll(cli_lock_);
std::lock_guard<std::recursive_mutex> ll(cli_lock_);
role_ = srv_role::follower;
state_->set_term(term);
}
Expand Down Expand Up @@ -1773,6 +1773,36 @@ ulong raft_server::store_log_entry(ptr<log_entry>& entry, ulong index) {
}

if ( role_ == srv_role::leader ) {
// WARNING:
// Configuration changes, such as adding or removing a member,
// can run concurrently with normal log appending operations. This
// concurrency can lead to an inversion issue between precommit and
// commit orders, particularly when there is only one member in the
// cluster.
//
// Let's consider the following scenario: T1 is the thread handling
// log appending, T2 is the thread processing configuration changes,
// and T3 is the commit thread.
// The initial precommit and commit index is 10.
//
// [T1] Acquires `cli_lock_` and enters `handle_cli_req()`.
// [T1] Appends a log at index 11 by calling `store_log_entry()`.
// [T2] Appends a log at index 12 by calling `store_log_entry()`.
// [T2] Calls `try_update_precommit_index()`,
// updating the precommit index to 12.
// [T2] Calls `request_append_entries()` and `commit()`,
// updating the commit index to 12.
// [T3] Calls `state_machine::commit()` for logs 11 and 12.
// [T1] Calls `state_machine::pre_commit()` for log 11.
// => order inversion happens here.
//
// To prevent this inversion, T2 should acquire the same `cli_lock_`
// before calling `try_update_precommit_index()`. This ensures that T2
// cannot update the precommit index between T1's `store_log_entry()`
// and `state_machine::pre_commit()` calls, maintaining the correct
// order of operations.
recur_lock(cli_lock_);

// Need to progress precommit index for config.
try_update_precommit_index(log_index);
}
Expand Down

0 comments on commit 0e58d5c

Please sign in to comment.