Skip to content

Commit

Permalink
Triage raft repl issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
sanebay committed Jul 11, 2024
1 parent 4e73b8b commit 7577ab8
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 14 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def build_requirements(self):
def requirements(self):
self.requires("iomgr/[~11.3, include_prerelease=True]@oss/master")
self.requires("sisl/[~12.2, include_prerelease=True]@oss/master")
self.requires("nuraft_mesg/[^3.4, include_prerelease=True]@oss/main")
self.requires("nuraft_mesg/[3.5.2]@oss/main")

self.requires("farmhash/cci.20190513@")
if self.settings.arch in ['x86', 'x86_64']:
Expand Down
2 changes: 1 addition & 1 deletion src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void LogDev::start(bool format) {
m_last_flush_idx = m_log_idx - 1;
}

start_timer();
// start_timer();
handle_unopened_log_stores(format);

{
Expand Down
13 changes: 11 additions & 2 deletions src/lib/replication/log_store/repl_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,18 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
// leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a
// high possibility the log entry is already flushed. Skip it for rreq == nullptr which is the case for raft
// config entries.
if ((rreq == nullptr) || rreq->is_proposer()) { continue; }
if ((rreq == nullptr) /*|| rreq->is_proposer()*/) { continue; }
reqs->emplace_back(std::move(rreq));
}

RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
LOGINFO("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
reqs->size());

for (auto const& rreq : *reqs) {
if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; }
LOGINFO("Raft Channel: Data before future wait: rreq=[{}]", rreq->to_compact_string());
}

// All requests are from proposer for data write, so as mentioned above we can skip the flush for now
if (!reqs->empty()) {
// Check the map if data corresponding to all of these requsts have been received and written. If not, schedule
Expand All @@ -73,6 +78,10 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
for (auto const& rreq : *reqs) {
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
}
for (auto const& rreq : *reqs) {
if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; }
LOGINFO("Raft Channel: Data after future wait: rreq=[{}]", rreq->to_compact_string());
}
}
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
}
Expand Down
27 changes: 20 additions & 7 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id();
}
m_rd_sb.write();
LOGINFO("{}", __FUNCTION__);
}

RD_LOG(INFO,
Expand Down Expand Up @@ -211,7 +212,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
PushDataRequestTypeTable()));*/

RD_LOGD("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
LOGINFO("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());

group_msg_service()
->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->m_pkts)
Expand All @@ -224,7 +225,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
return;
}
// Release the buffer which holds the packets
RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string());
LOGINFO("Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string());
rreq->release_fb_builder();
rreq->m_pkts.clear();
});
Expand Down Expand Up @@ -260,8 +261,11 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
return;
}

LOGINFO("Data Channel: Data Write started rreq=[{}]", rreq->to_compact_string());

if (!rreq->save_pushed_data(rpc_data, incoming_buf.cbytes() + fb_size, push_req->data_size())) {
RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_compact_string());
RD_LOG(ERROR, "Data Channel: Data already received for rreq=[{}], ignoring this data",
rreq->to_compact_string());
return;
}

Expand All @@ -276,7 +280,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
} else {
rreq->add_state(repl_req_state_t::DATA_WRITTEN);
rreq->m_data_written_promise.setValue();
RD_LOGD("Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
LOGINFO("Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
}
});
}
Expand Down Expand Up @@ -382,6 +386,10 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<

// All the entries are done already, no need to wait
if (futs.size() == 0) { return folly::makeFuture< folly::Unit >(folly::Unit{}); }
for (auto const& rreq : *rreqs) {
if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; }
LOGINFO("Raft Channel: Data future wait: rreq=[{}]", rreq->to_compact_string());
}

return folly::collectAllUnsafe(futs).thenValue([this, rreqs](auto&& e) {
#ifndef NDEBUG
Expand All @@ -390,7 +398,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<
HS_DBG_ASSERT(rreq->has_state(repl_req_state_t::DATA_WRITTEN),
"Data written promise raised without updating DATA_WRITTEN state for rkey={}",
rreq->rkey().to_string());
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string());
LOGINFO("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string());
}
#endif
RD_LOGT("Data Channel: {} pending reqs's data are written", rreqs->size());
Expand Down Expand Up @@ -473,7 +481,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }

std::vector<::flatbuffers::Offset< RequestEntry > > entries;
std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
Expand Down Expand Up @@ -867,12 +875,14 @@ void RaftReplDev::save_config(const nuraft::cluster_config& config) {
std::unique_lock lg{m_config_mtx};
(*m_raft_config_sb)["config"] = serialize_cluster_config(config);
m_raft_config_sb.write();
LOGINFO("{}", __FUNCTION__);
}

void RaftReplDev::save_state(const nuraft::srv_state& state) {
std::unique_lock lg{m_config_mtx};
(*m_raft_config_sb)["state"] = nlohmann::json{{"term", state.get_term()}, {"voted_for", state.get_voted_for()}};
m_raft_config_sb.write();
LOGINFO("{}", __FUNCTION__);
}

nuraft::ptr< nuraft::srv_state > RaftReplDev::read_state() {
Expand Down Expand Up @@ -930,6 +940,7 @@ void RaftReplDev::leave() {
// post restart.
m_rd_sb->destroy_pending = 0x1;
m_rd_sb.write();
LOGINFO("{}", __FUNCTION__);

RD_LOGI("RaftReplDev leave group");
m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete
Expand Down Expand Up @@ -978,6 +989,7 @@ void RaftReplDev::flush_durable_commit_lsn() {
std::unique_lock lg{m_sb_mtx};
m_rd_sb->durable_commit_lsn = lsn;
m_rd_sb.write();
LOGINFO("{}", __FUNCTION__);
}

/////////////////////////////////// Private metohds ////////////////////////////////////
Expand All @@ -996,6 +1008,7 @@ void RaftReplDev::cp_flush(CP*) {
m_rd_sb->checkpoint_lsn = lsn;
m_rd_sb->last_applied_dsn = m_next_dsn.load();
m_rd_sb.write();
LOGINFO("{}", __FUNCTION__);
m_last_flushed_commit_lsn = lsn;
}

Expand Down Expand Up @@ -1045,7 +1058,7 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
nuraft::log_entry const* lentry = r_cast< nuraft::log_entry const* >(buf.bytes());

// TODO: Handle the case where the log entry is not app_log, example config logs
if(lentry->get_val_type() != nuraft::log_val_type::app_log) { return; }
if (lentry->get_val_type() != nuraft::log_val_type::app_log) { return; }

repl_journal_entry* jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin());
RELEASE_ASSERT_EQ(jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR,
Expand Down
7 changes: 5 additions & 2 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include "service/raft_repl_service.h"
#include "repl_dev/raft_state_machine.h"
#include "repl_dev/raft_repl_dev.h"
#include <homestore/homestore.hpp>
#include "common/homestore_config.hpp"

SISL_LOGGING_DECL(replication)

Expand Down Expand Up @@ -188,10 +190,11 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id(), params.data->size());
repl_req_ptr_t rreq = lsn_to_req(lsn);
if (!rreq) { RD_LOGD("Raft channel got null rreq"); }
RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());

if (rreq->is_proposer()) {
LOGINFO("Leader Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
// This is the time to ensure flushing of journal happens in the proposer
if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); }
// if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); }
rreq->add_state(repl_req_state_t::LOG_FLUSHED);
}

Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_common/homestore_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ class HSTestHelper {
/* create files */
LOGINFO("creating {} device files with each of size {} ", ndevices, homestore::in_bytes(dev_size));
for (uint32_t i{0}; i < ndevices; ++i) {
s_dev_names.emplace_back(std::string{"/tmp/" + token.name_ + "_" + std::to_string(i + 1)});
s_dev_names.emplace_back(std::string{"/tmp/source/tests/" + token.name_ + "_" + std::to_string(i + 1)});
}

if (!fake_restart && init_device) { init_files(s_dev_names, dev_size); }
Expand Down

0 comments on commit 7577ab8

Please sign in to comment.