Triage raft repl issue.
sanebay committed Jul 15, 2024
1 parent 4e73b8b commit 5d8d052
Showing 11 changed files with 101 additions and 30 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -56,7 +56,7 @@ def build_requirements(self):
def requirements(self):
self.requires("iomgr/[~11.3, include_prerelease=True]@oss/master")
self.requires("sisl/[~12.2, include_prerelease=True]@oss/master")
self.requires("nuraft_mesg/[^3.4, include_prerelease=True]@oss/main")
self.requires("nuraft_mesg/[3.5.3]@oss/main")

self.requires("farmhash/cci.20190513@")
if self.settings.arch in ['x86', 'x86_64']:
1 change: 1 addition & 0 deletions src/include/homestore/logstore/log_store.hpp
@@ -368,6 +368,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
// Sync flush sections
std::atomic< logstore_seq_num_t > m_sync_flush_waiter_lsn{invalid_lsn()};
std::mutex m_sync_flush_mtx;
std::mutex m_single_sync_flush_mtx;
std::condition_variable m_sync_flush_cv;

std::vector< seq_ld_key_pair > m_truncation_barriers; // List of truncation barriers
10 changes: 5 additions & 5 deletions src/lib/common/homestore_config.fbs
@@ -172,8 +172,8 @@ table ResourceLimits {

/* num entries that the raft logstore wants to reserve -- truncation should not cross this */
/* 0 means HomeStore doesn't reserve anything and lets NuRaft control the truncation */
raft_logstore_reserve_threshold: uint32 = 0 (hotswap);

/* resource audit timer in ms */
resource_audit_timer_ms: uint32 = 120000;

@@ -212,7 +212,7 @@ table Consensus {
rpc_backoff_ms: uint32 = 250;

// Frequency of Raft heartbeat
heartbeat_period_ms: uint32 = 250;
heartbeat_period_ms: uint32 = 500;

// Re-election timeout low and high mark
elect_to_low_ms: uint32 = 800;
@@ -238,10 +238,10 @@ table Consensus {

// Minimum log gap a replica has to be from leader before joining the replica set.
min_log_gap_to_join: int32 = 30;

// amount of time in millis to wait on a data write before fetching data from the remote
wait_data_write_timer_ms: uint64 = 1500 (hotswap);

// Leadership expiry (=0 indicates 20 times heartbeat period), set -1 to never expire
leadership_expiry_ms: int32 = 0;

2 changes: 1 addition & 1 deletion src/lib/device/journal_vdev.cpp
@@ -284,7 +284,7 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {
// update reserved size;
m_reserved_sz += sz;

high_watermark_check();
// high_watermark_check();

// assert that returning logical offset is in good range
HS_DBG_ASSERT_LE(tail_off, m_end_offset);
6 changes: 4 additions & 2 deletions src/lib/logstore/log_dev.cpp
@@ -86,7 +86,7 @@ void LogDev::start(bool format) {
m_last_flush_idx = m_log_idx - 1;
}

start_timer();
// start_timer();
handle_unopened_log_stores(format);

{
@@ -265,7 +265,7 @@ int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_nu
if (flush_wait ||
((prev_size < threshold_size && ((prev_size + data.size()) >= threshold_size) &&
!m_is_flushing.load(std::memory_order_relaxed)))) {
flush_if_needed(flush_wait ? 1 : -1);
// flush_if_needed(flush_wait ? 1 : -1);
}
return idx;
}
@@ -476,6 +476,8 @@ void LogDev::on_flush_completion(LogGroup* lg) {
lg->m_post_flush_msg_rcvd_time = Clock::now();
THIS_LOGDEV_LOG(TRACE, "Flush completed for logid[{} - {}]", lg->m_flush_log_idx_from, lg->m_flush_log_idx_upto);

LOGINFO("Flush completed for logid[{} - {}]", lg->m_flush_log_idx_from, lg->m_flush_log_idx_upto);

m_log_records->complete(lg->m_flush_log_idx_from, lg->m_flush_log_idx_upto);
m_last_flush_idx = lg->m_flush_log_idx_upto;
const auto flush_ld_key = logdev_key{m_last_flush_idx, lg->m_log_dev_offset + lg->header()->total_size()};
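For context on the flush path this commit disables: append_async only kicks a flush when the pending bytes cross a threshold and no flush is already in flight. Below is a minimal sketch of that gate, assuming a pending-byte counter and a CAS-guarded single-flusher flag; the names are illustrative, not the actual LogDev members.

#include <atomic>
#include <cstddef>

class FlushGate {
public:
    explicit FlushGate(size_t threshold) : m_threshold{threshold} {}

    // Returns true iff this append crossed the threshold and won the right to flush.
    bool on_append(size_t bytes) {
        size_t const prev = m_pending.fetch_add(bytes, std::memory_order_acq_rel);
        if ((prev < m_threshold) && (prev + bytes >= m_threshold)) {
            bool expected = false;
            // Only one thread may become the flusher at a time.
            return m_is_flushing.compare_exchange_strong(expected, true);
        }
        return false;
    }

    void on_flush_done(size_t flushed) {
        m_pending.fetch_sub(flushed, std::memory_order_acq_rel);
        m_is_flushing.store(false, std::memory_order_release);
    }

private:
    size_t const m_threshold;
    std::atomic<size_t> m_pending{0};
    std::atomic<bool> m_is_flushing{false};
};

With both the periodic timer (start_timer) and this threshold trigger commented out, flushes appear to happen only via the explicit flush_if_needed(1) call in HomeLogStore::flush_sync below.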
32 changes: 28 additions & 4 deletions src/lib/logstore/log_store.cpp
@@ -48,7 +48,7 @@ HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, b
}

bool HomeLogStore::write_sync(logstore_seq_num_t seq_num, const sisl::io_blob& b) {
HS_LOG_ASSERT((!iomanager.am_i_worker_reactor()), "Sync can not be done in worker reactor thread");
// HS_LOG_ASSERT((!iomanager.am_i_worker_reactor()), "Sync can not be done in worker reactor thread");

// these should be static so that they stay in scope for the lambda, in case the function ends before the lambda completes
struct Context {
@@ -123,6 +123,13 @@ logstore_seq_num_t HomeLogStore::append_async(const sisl::io_blob& b, void* cook
return seq_num;
}

logstore_seq_num_t HomeLogStore::append_sync(const sisl::io_blob& b) {
HS_DBG_ASSERT_EQ(m_append_mode, true, "append_sync can be called only in append-only mode");
const auto seq_num = m_seq_num.fetch_add(1, std::memory_order_acq_rel);
write_sync(seq_num, b);
return seq_num;
}

log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) {
// If seq_num has been issued but not yet flushed, flush it before reading
auto const s = m_records.status(seq_num);
@@ -173,10 +180,11 @@ void HomeLogStore::read_async(logstore_seq_num_t seq_num, void* cookie, const lo
#endif

void HomeLogStore::on_write_completion(logstore_req* req, const logdev_key& ld_key) {
std::unique_lock lk(m_sync_flush_mtx);
// Upon completion, create the mapping between seq_num and log dev key
m_records.update(req->seq_num, [&](logstore_record& rec) -> bool {
rec.m_dev_key = ld_key;
// THIS_LOGSTORE_LOG(DEBUG, "Completed write of lsn {} logdev_key={}", req->seq_num, ld_key);
LOGINFO("Completed write of store lsn {} logdev_key={}", req->seq_num, ld_key);
return true;
});
// assert(flush_ld_key.idx >= m_last_flush_ldkey.idx);
@@ -402,10 +410,18 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {
HS_DBG_ASSERT_EQ(LogDev::can_flush_in_this_thread(), false,
"Logstore flush sync cannot be called on same thread which could do logdev flush");

std::unique_lock lk(m_single_sync_flush_mtx);
// HS_DBG_ASSERT_EQ(m_flush_twice.load(), false, "m_flush_twice is not false");
// m_flush_twice = true;
if (upto_seq_num == invalid_lsn()) { upto_seq_num = m_records.active_upto(); }

// if we have flushed already, we are done
if (!m_records.status(upto_seq_num).is_active) { return; }
if (auto const s = m_records.status(upto_seq_num); !s.is_active) {
LOGINFO("lsn {} is not active, skipping flush: is_active={}, is_completed={}, is_hole={}, is_out_of_range={}",
upto_seq_num, s.is_active, s.is_completed, s.is_hole, s.is_out_of_range);
return;
}

{
std::unique_lock lk(m_sync_flush_mtx);
@@ -416,17 +432,25 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {

// Step 2: After recording this lsn, check again, to avoid a race where the completion path checked for a waiter
// before the lsn was stored in step 1 above.
if (!m_records.status(upto_seq_num).is_active) { return; }
if (auto const s = m_records.status(upto_seq_num); !s.is_active) {
LOGINFO("lsn {} is not active, skipping flush: is_active={}, is_completed={}, is_hole={}, is_out_of_range={}",
upto_seq_num, s.is_active, s.is_completed, s.is_hole, s.is_out_of_range);
return;
}

// Step 3: Force a flush (with least threshold)
m_logdev->flush_if_needed(1);

// Step 4: Wait for completion
LOGINFO("flush_sync wait for not active {}", upto_seq_num);
m_sync_flush_cv.wait(lk, [this, upto_seq_num] { return !m_records.status(upto_seq_num).is_active; });

// NOTE: We are not resetting the lsn because the same seq number should never get two completions, and thus
// skipping the reset saves an atomic instruction
LOGINFO("flush_sync over {}", upto_seq_num);
}
// m_flush_twice = false;
}

uint64_t HomeLogStore::rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_t cb) {
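The flush_sync change above combines two guards: the new m_single_sync_flush_mtx serializes whole flush_sync calls end to end, while the existing m_sync_flush_mtx pairs with m_sync_flush_cv for the wait. A minimal sketch of the pattern, with illustrative names rather than the real members:

#include <condition_variable>
#include <mutex>

struct SyncFlusher {
    std::mutex single_call_mtx;   // one flush_sync at a time, end to end
    std::mutex cv_mtx;            // protects the state the CV waits on
    std::condition_variable cv;
    bool flushed{false};

    void flush_sync() {
        std::unique_lock outer{single_call_mtx};
        if (flushed) { return; }                    // check 1: nothing left to flush
        std::unique_lock lk{cv_mtx};
        if (flushed) { return; }                    // check 2: raced with a completion
        trigger_flush();                            // analogous to flush_if_needed(1)
        cv.wait(lk, [this] { return flushed; });    // wait for on_flush_completion
    }

    void on_flush_completion() {
        { std::lock_guard lk{cv_mtx}; flushed = true; }
        cv.notify_all();
    }

    void trigger_flush() { /* kick the flusher thread */ }
};

The double check mirrors steps 1-2 in the real code: the status is re-read under the CV mutex, so a completion that lands between the first check and the wait registration is not missed.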
14 changes: 12 additions & 2 deletions src/lib/replication/log_store/home_raft_log_store.cpp
@@ -130,7 +130,7 @@ ulong HomeRaftLogStore::next_slot() const {
}

ulong HomeRaftLogStore::last_index() const {
uint64_t last_index = m_log_store->get_contiguous_completed_seq_num(m_last_durable_lsn);
uint64_t last_index = to_repl_lsn(m_log_store->get_contiguous_completed_seq_num(m_last_durable_lsn));
return last_index;
}

@@ -160,9 +160,14 @@ ulong HomeRaftLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
REPL_STORE_LOG(TRACE, "append entry term={}, log_val_type={} size={}", entry->get_term(),
static_cast< uint32_t >(entry->get_val_type()), entry->get_buf().size());
auto buf = entry->serialize();
#if 1
auto const next_seq =
m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */},
nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {});
#else
auto const next_seq =
m_log_store->append_sync(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */});
#endif
return to_repl_lsn(next_seq);
}

@@ -189,9 +194,14 @@ nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore:
auto out_vec = std::make_shared< std::vector< nuraft::ptr< nuraft::log_entry > > >();
m_log_store->foreach (to_store_lsn(start), [end, &out_vec](store_lsn_t cur, const log_buffer& entry) -> bool {
bool ret = (cur < to_store_lsn(end) - 1);
if (cur < to_store_lsn(end)) { out_vec->emplace_back(to_nuraft_log_entry(entry)); }
if (cur < to_store_lsn(end)) {
out_vec->emplace_back(to_nuraft_log_entry(entry));
LOGINFO("HomeRaftLogStore::log_entries {}", to_repl_lsn(cur));
}
return ret;
});

LOGINFO("HomeRaftLogStore::log_entries start {} end {} size {}", start, end, out_vec->size());
return out_vec;
}

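The last_index() fix above hinges on the store-vs-raft index mapping: get_contiguous_completed_seq_num returns a logstore LSN, while nuraft expects a raft log index. A sketch under the assumption that raft indices are 1-based and logstore LSNs 0-based; verify against the actual to_store_lsn/to_repl_lsn helpers.

#include <cstdint>

using store_lsn_t = int64_t;  // logstore sequence number (assumed 0-based)
using repl_lsn_t = int64_t;   // raft log index (assumed 1-based)

// Assumed shape of the conversion helpers; not the actual definitions.
inline store_lsn_t to_store_lsn(repl_lsn_t repl) { return repl - 1; }
inline repl_lsn_t to_repl_lsn(store_lsn_t store) { return store + 1; }

// Before the fix, last_index() returned a raw store LSN, off by one from the
// index nuraft expects; wrapping the result in to_repl_lsn() restores the mapping.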
29 changes: 24 additions & 5 deletions src/lib/replication/log_store/repl_log_store.cpp
@@ -10,7 +10,7 @@ uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
// We don't want to transform anything that is not an app log
if (entry->get_val_type() != nuraft::log_val_type::app_log) {
ulong lsn = HomeRaftLogStore::append(entry);
RD_LOGD("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
LOGINFO("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
static_cast< uint32_t >(entry->get_val_type()), lsn, entry->get_buf().size());
return lsn;
}
@@ -19,7 +19,7 @@ uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
ulong lsn = HomeRaftLogStore::append(entry);
m_sm.link_lsn_to_req(rreq, int64_cast(lsn));

RD_LOGD("Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string());
LOGINFO("Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string());
return lsn;
}

@@ -41,17 +41,23 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {

// Start fetching the batch of data for this lsn range from the remote if it's not available yet.
auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
auto proposer_reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
auto rreq = m_sm.lsn_to_req(lsn);
// Skip this call on the proposer, since this method synchronously flushes the data, which is not required for
// the leader. The proposer calls the flush as part of commit after receiving quorum, by which time the log
// entry is most likely already flushed. Also skip it when rreq == nullptr, which is the case for raft
// config entries.
if ((rreq == nullptr) || rreq->is_proposer()) { continue; }
reqs->emplace_back(std::move(rreq));
if ((rreq == nullptr) /*|| rreq->is_proposer()*/) {
continue;
} else if (rreq->is_proposer()) {
proposer_reqs->emplace_back(std::move(rreq));
} else {
reqs->emplace_back(std::move(rreq));
}
}

RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
LOGINFO("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
reqs->size());

// All requests are from proposer for data write, so as mentioned above we can skip the flush for now
@@ -73,8 +79,21 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
for (auto const& rreq : *reqs) {
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
}
for (auto const& rreq : *reqs) {
if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; }
LOGINFO("Raft Channel: Data after future wait: rreq=[{}]", rreq->to_compact_string());
}
} else if (!proposer_reqs->empty()) {
LOGINFO("Raft Channel: end_of_append_batch, I am proposer, only flush log s from {} , count {}", start_lsn,
count);
// Mark all the reqs also completely written
HomeRaftLogStore::end_of_append_batch(start_lsn, count);
for (auto const& rreq : *proposer_reqs) {
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
}
}
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
sisl::VectorPool< repl_req_ptr_t >::free(proposer_reqs);
}

std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); }
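The end_of_append_batch change above gives proposer requests their own bucket: previously they were skipped entirely, so a proposer-only batch got no log flush and no LOG_FLUSHED marking; now the logs are still flushed, just without waiting on data writes. A stand-alone sketch of that split, with stand-in types rather than the real repl_req_ptr_t API:

#include <memory>
#include <vector>

struct Req {
    bool proposer{false};
    // ... state flags such as LOG_FLUSHED would live here
};
using ReqPtr = std::shared_ptr<Req>;

void split_batch(std::vector<ReqPtr> const& batch,
                 std::vector<ReqPtr>& followers, std::vector<ReqPtr>& proposers) {
    for (auto const& r : batch) {
        if (!r) { continue; }  // raft config entries have no request attached
        (r->proposer ? proposers : followers).push_back(r);
    }
    // Followers: wait for data writes, then flush logs and mark LOG_FLUSHED.
    // Proposers: flush logs and mark LOG_FLUSHED immediately; their data is
    // flushed later, as part of commit after quorum.
}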
26 changes: 19 additions & 7 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -211,7 +211,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
PushDataRequestTypeTable()));*/

RD_LOGD("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
LOGINFO("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());

group_msg_service()
->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->m_pkts)
@@ -224,7 +224,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
return;
}
// Release the buffer which holds the packets
RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string());
LOGINFO("Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string());
rreq->release_fb_builder();
rreq->m_pkts.clear();
});
@@ -260,8 +260,11 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
return;
}

LOGINFO("Data Channel: Data Write started rreq=[{}]", rreq->to_compact_string());

if (!rreq->save_pushed_data(rpc_data, incoming_buf.cbytes() + fb_size, push_req->data_size())) {
RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_compact_string());
RD_LOG(ERROR, "Data Channel: Data already received for rreq=[{}], ignoring this data",
rreq->to_compact_string());
return;
}

@@ -276,7 +279,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
} else {
rreq->add_state(repl_req_state_t::DATA_WRITTEN);
rreq->m_data_written_promise.setValue();
RD_LOGD("Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
LOGINFO("Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
}
});
}
@@ -382,6 +385,10 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<

// All the entries are done already, no need to wait
if (futs.size() == 0) { return folly::makeFuture< folly::Unit >(folly::Unit{}); }
for (auto const& rreq : *rreqs) {
if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; }
LOGINFO("Raft Channel: Data future wait: rreq=[{}]", rreq->to_compact_string());
}

return folly::collectAllUnsafe(futs).thenValue([this, rreqs](auto&& e) {
#ifndef NDEBUG
Expand All @@ -390,7 +397,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<
HS_DBG_ASSERT(rreq->has_state(repl_req_state_t::DATA_WRITTEN),
"Data written promise raised without updating DATA_WRITTEN state for rkey={}",
rreq->rkey().to_string());
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string());
LOGINFO("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string());
}
#endif
RD_LOGT("Data Channel: {} pending reqs's data are written", rreqs->size());
@@ -473,7 +480,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }

std::vector<::flatbuffers::Offset< RequestEntry > > entries;
std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
@@ -867,12 +874,14 @@ void RaftReplDev::save_config(const nuraft::cluster_config& config) {
std::unique_lock lg{m_config_mtx};
(*m_raft_config_sb)["config"] = serialize_cluster_config(config);
m_raft_config_sb.write();
LOGINFO("{}", __FUNCTION__);
}

void RaftReplDev::save_state(const nuraft::srv_state& state) {
std::unique_lock lg{m_config_mtx};
(*m_raft_config_sb)["state"] = nlohmann::json{{"term", state.get_term()}, {"voted_for", state.get_voted_for()}};
m_raft_config_sb.write();
LOGINFO("{}", __FUNCTION__);
}

nuraft::ptr< nuraft::srv_state > RaftReplDev::read_state() {
@@ -930,6 +939,7 @@ void RaftReplDev::leave() {
// post restart.
m_rd_sb->destroy_pending = 0x1;
m_rd_sb.write();
LOGINFO("{}", __FUNCTION__);

RD_LOGI("RaftReplDev leave group");
m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete
@@ -978,6 +988,7 @@ void RaftReplDev::flush_durable_commit_lsn() {
std::unique_lock lg{m_sb_mtx};
m_rd_sb->durable_commit_lsn = lsn;
m_rd_sb.write();
// LOGINFO("{}", __FUNCTION__);
}

/////////////////////////////////// Private methods ////////////////////////////////////
@@ -996,6 +1007,7 @@ void RaftReplDev::cp_flush(CP*) {
m_rd_sb->checkpoint_lsn = lsn;
m_rd_sb->last_applied_dsn = m_next_dsn.load();
m_rd_sb.write();
LOGINFO("{}", __FUNCTION__);
m_last_flushed_commit_lsn = lsn;
}

@@ -1045,7 +1057,7 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
nuraft::log_entry const* lentry = r_cast< nuraft::log_entry const* >(buf.bytes());

// TODO: Handle the case where the log entry is not app_log, e.g. config logs
if(lentry->get_val_type() != nuraft::log_val_type::app_log) { return; }
if (lentry->get_val_type() != nuraft::log_val_type::app_log) { return; }

repl_journal_entry* jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin());
RELEASE_ASSERT_EQ(jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR,
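notify_after_data_written, extended above with the extra trace logs, gathers one future per outstanding data write and continues once all of them fire. A minimal stand-alone sketch of that folly pattern, not the actual signatures:

#include <folly/futures/Future.h>
#include <vector>

folly::Future<folly::Unit> wait_for_all(std::vector<folly::Future<folly::Unit>> futs) {
    // Nothing outstanding: complete immediately, as the real code does.
    if (futs.empty()) { return folly::makeFuture<folly::Unit>(folly::Unit{}); }
    // collectAllUnsafe completes when every future has a result (value or error).
    return folly::collectAllUnsafe(std::move(futs))
        .thenValue([](std::vector<folly::Try<folly::Unit>>&&) { return folly::Unit{}; });
}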