From 5d8d052cd94d5cb969cb187da1de715766e6da87 Mon Sep 17 00:00:00 2001
From: Sanal P
Date: Sun, 30 Jun 2024 16:50:27 -0700
Subject: [PATCH] Triage raft repl issue.

Debug instrumentation to triage a raft replication issue:

* Pin nuraft_mesg to 3.5.3 and raise the Raft heartbeat period to 500ms.
* Disable the logdev flush timer, the threshold-triggered flush in
  append_async and the journal high watermark check, so flushes happen
  only on demand.
* Add HomeLogStore::append_sync() and serialize flush_sync() callers
  behind a new m_single_sync_flush_mtx.
* In ReplLogStore::end_of_append_batch(), flush logs for proposer
  requests as well instead of skipping them.
* Promote several debug/trace messages to LOGINFO along the write,
  flush and commit paths.
* Create test device files under /tmp/source/tests/.
---
 conanfile.py                                  |  2 +-
 src/include/homestore/logstore/log_store.hpp  |  1 +
 src/lib/common/homestore_config.fbs           | 10 +++---
 src/lib/device/journal_vdev.cpp               |  2 +-
 src/lib/logstore/log_dev.cpp                  |  6 ++--
 src/lib/logstore/log_store.cpp                | 32 ++++++++++++++++---
 .../log_store/home_raft_log_store.cpp         | 14 ++++++--
 .../replication/log_store/repl_log_store.cpp  | 29 ++++++++++++++---
 .../replication/repl_dev/raft_repl_dev.cpp    | 26 +++++++++++----
 .../repl_dev/raft_state_machine.cpp           |  7 ++--
 .../test_common/homestore_test_common.hpp     |  2 +-
 11 files changed, 101 insertions(+), 30 deletions(-)

diff --git a/conanfile.py b/conanfile.py
index 6787c8b9c..775c2d0a8 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -56,7 +56,7 @@ def build_requirements(self):
     def requirements(self):
         self.requires("iomgr/[~11.3, include_prerelease=True]@oss/master")
         self.requires("sisl/[~12.2, include_prerelease=True]@oss/master")
-        self.requires("nuraft_mesg/[^3.4, include_prerelease=True]@oss/main")
+        self.requires("nuraft_mesg/[3.5.3]@oss/main")
         self.requires("farmhash/cci.20190513@")
 
         if self.settings.arch in ['x86', 'x86_64']:
diff --git a/src/include/homestore/logstore/log_store.hpp b/src/include/homestore/logstore/log_store.hpp
index 6c0b493ec..eec7eac50 100644
--- a/src/include/homestore/logstore/log_store.hpp
+++ b/src/include/homestore/logstore/log_store.hpp
@@ -368,6 +368,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
     // Sync flush sections
     std::atomic< logstore_seq_num_t > m_sync_flush_waiter_lsn{invalid_lsn()};
     std::mutex m_sync_flush_mtx;
+    std::mutex m_single_sync_flush_mtx;
     std::condition_variable m_sync_flush_cv;
 
     std::vector< seq_ld_key_pair > m_truncation_barriers; // List of truncation barriers
diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs
index c283bc9ba..34a95f78f 100644
--- a/src/lib/common/homestore_config.fbs
+++ b/src/lib/common/homestore_config.fbs
@@ -172,8 +172,8 @@ table ResourceLimits {
 
     /* num entries that raft logstore wants to reserve -- its truncate should not across this */
    /* 0 means HomeStore doesn't reserve anything and let nuraft controlls the truncation */
-    raft_logstore_reserve_threshold: uint32 = 0 (hotswap); 
-    
+    raft_logstore_reserve_threshold: uint32 = 0 (hotswap);
+
     /* resource audit timer in ms */
     resource_audit_timer_ms: uint32 = 120000;
 
@@ -212,7 +212,7 @@ table Consensus {
     rpc_backoff_ms: uint32 = 250;
 
     // Frequency of Raft heartbeat
-    heartbeat_period_ms: uint32 = 250;
+    heartbeat_period_ms: uint32 = 500;
 
     // Re-election timeout low and high mark
     elect_to_low_ms: uint32 = 800;
 
@@ -238,10 +238,10 @@ table Consensus {
     // Minimum log gap a replica has to be from leader before joining the replica set.
     min_log_gap_to_join: int32 = 30;
-    
+
     // amount of time in millis to wait on data write before fetch data from remote;
     wait_data_write_timer_ms: uint64 = 1500 (hotswap);
-    
+
     // Leadership expiry (=0 indicates 20 times heartbeat period), set -1 to never expire
     leadership_expiry_ms: int32 = 0;
diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp
index d82feb5bf..a018bba26 100644
--- a/src/lib/device/journal_vdev.cpp
+++ b/src/lib/device/journal_vdev.cpp
@@ -284,7 +284,7 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {
     // update reserved size;
     m_reserved_sz += sz;
 
-    high_watermark_check();
+    // high_watermark_check();
 
     // assert that returnning logical offset is in good range
     HS_DBG_ASSERT_LE(tail_off, m_end_offset);
diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp
index 4ae040ae9..cc72dcbed 100644
--- a/src/lib/logstore/log_dev.cpp
+++ b/src/lib/logstore/log_dev.cpp
@@ -86,7 +86,7 @@ void LogDev::start(bool format) {
         m_last_flush_idx = m_log_idx - 1;
     }
 
-    start_timer();
+    // start_timer();
     handle_unopened_log_stores(format);
 
     {
@@ -265,7 +265,7 @@ int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_nu
     if (flush_wait ||
         ((prev_size < threshold_size && ((prev_size + data.size()) >= threshold_size) &&
           !m_is_flushing.load(std::memory_order_relaxed)))) {
-        flush_if_needed(flush_wait ? 1 : -1);
+        // flush_if_needed(flush_wait ? 1 : -1);
     }
     return idx;
 }
@@ -476,6 +476,8 @@ void LogDev::on_flush_completion(LogGroup* lg) {
     lg->m_post_flush_msg_rcvd_time = Clock::now();
     THIS_LOGDEV_LOG(TRACE, "Flush completed for logid[{} - {}]", lg->m_flush_log_idx_from, lg->m_flush_log_idx_upto);
 
+    LOGINFO("Flush completed for logid[{} - {}]", lg->m_flush_log_idx_from, lg->m_flush_log_idx_upto);
+
     m_log_records->complete(lg->m_flush_log_idx_from, lg->m_flush_log_idx_upto);
     m_last_flush_idx = lg->m_flush_log_idx_upto;
     const auto flush_ld_key = logdev_key{m_last_flush_idx, lg->m_log_dev_offset + lg->header()->total_size()};
diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp
index bd60291c6..2fbc0973a 100644
--- a/src/lib/logstore/log_store.cpp
+++ b/src/lib/logstore/log_store.cpp
@@ -48,7 +48,7 @@ HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, b
 }
 
 bool HomeLogStore::write_sync(logstore_seq_num_t seq_num, const sisl::io_blob& b) {
-    HS_LOG_ASSERT((!iomanager.am_i_worker_reactor()), "Sync can not be done in worker reactor thread");
+    // HS_LOG_ASSERT((!iomanager.am_i_worker_reactor()), "Sync can not be done in worker reactor thread");
 
     // these should be static so that they stay in scope in the lambda in case function ends before lambda completes
     struct Context {
@@ -123,6 +123,13 @@ logstore_seq_num_t HomeLogStore::append_async(const sisl::io_blob& b, void* cook
     return seq_num;
 }
 
+logstore_seq_num_t HomeLogStore::append_sync(const sisl::io_blob& b) {
+    HS_DBG_ASSERT_EQ(m_append_mode, true, "append_sync can be called only in append-only mode");
+    const auto seq_num = m_seq_num.fetch_add(1, std::memory_order_acq_rel);
+    write_sync(seq_num, b);
+    return seq_num;
+}
+
 log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) {
     // If seq_num has not been flushed yet, but issued, then we flush them before reading
     auto const s = m_records.status(seq_num);
@@ -173,10 +180,11 @@ void HomeLogStore::read_async(logstore_seq_num_t seq_num, void* cookie, const lo
 #endif
 
 void HomeLogStore::on_write_completion(logstore_req* req, const logdev_key& ld_key) {
+    std::unique_lock lk(m_sync_flush_mtx);
     // Upon completion, create the mapping between seq_num and log dev key
     m_records.update(req->seq_num, [&](logstore_record& rec) -> bool {
         rec.m_dev_key = ld_key;
-        // THIS_LOGSTORE_LOG(DEBUG, "Completed write of lsn {} logdev_key={}", req->seq_num, ld_key);
+        LOGINFO("Completed write of store lsn {} logdev_key={}", req->seq_num, ld_key);
         return true;
     });
     // assert(flush_ld_key.idx >= m_last_flush_ldkey.idx);
@@ -402,10 +410,18 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {
     HS_DBG_ASSERT_EQ(LogDev::can_flush_in_this_thread(), false,
                      "Logstore flush sync cannot be called on same thread which could do logdev flush");
 
+    std::unique_lock lk(m_single_sync_flush_mtx);
+    // HS_DBG_ASSERT_EQ(m_flush_twice.load(), false, "m_flush_twice is not false");
+    // m_flush_twice = true;
     if (upto_seq_num == invalid_lsn()) { upto_seq_num = m_records.active_upto(); }
 
     // if we have flushed already, we are done
-    if (!m_records.status(upto_seq_num).is_active) { return; }
+    if (!m_records.status(upto_seq_num).is_active) {
+        auto s = m_records.status(upto_seq_num);
+        LOGINFO("lsn {} is no longer active, skipping flush: is_active {}, is_completed {}, is_hole {}, is_out_of_range {}",
+                upto_seq_num, s.is_active, s.is_completed, s.is_hole, s.is_out_of_range);
+        return;
+    }
 
     {
         std::unique_lock lk(m_sync_flush_mtx);
@@ -416,17 +432,25 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {
 
         // Step 2: After marking this lsn, we again do a check, to avoid a race where completion checked for no lsn
         // and the lsn is stored in step 1 above.
-        if (!m_records.status(upto_seq_num).is_active) { return; }
+        if (!m_records.status(upto_seq_num).is_active) {
+            auto s = m_records.status(upto_seq_num);
+            LOGINFO("lsn {} is no longer active, skipping flush: is_active {}, is_completed {}, is_hole {}, is_out_of_range {}",
+                    upto_seq_num, s.is_active, s.is_completed, s.is_hole, s.is_out_of_range);
+            return;
+        }
 
         // Step 3: Force a flush (with least threshold)
         m_logdev->flush_if_needed(1);
 
         // Step 4: Wait for completion
+        LOGINFO("flush_sync waiting for lsn {} to become inactive", upto_seq_num);
         m_sync_flush_cv.wait(lk, [this, upto_seq_num] { return !m_records.status(upto_seq_num).is_active; });
 
         // NOTE: We are not resetting the lsn because same seq number should never have 2 completions and thus not
         // doing it saves an atomic instruction
+        LOGINFO("flush_sync over {}", upto_seq_num);
     }
+    // m_flush_twice = false;
 }
 
 uint64_t HomeLogStore::rollback_async(logstore_seq_num_t to_lsn, on_rollback_cb_t cb) {
diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp
index 80e08d809..1536128b4 100644
--- a/src/lib/replication/log_store/home_raft_log_store.cpp
+++ b/src/lib/replication/log_store/home_raft_log_store.cpp
@@ -130,7 +130,7 @@ ulong HomeRaftLogStore::next_slot() const {
 }
 
 ulong HomeRaftLogStore::last_index() const {
-    uint64_t last_index = m_log_store->get_contiguous_completed_seq_num(m_last_durable_lsn);
+    uint64_t last_index = to_repl_lsn(m_log_store->get_contiguous_completed_seq_num(m_last_durable_lsn));
     return last_index;
 }
 
@@ -160,9 +160,14 @@ ulong HomeRaftLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
     REPL_STORE_LOG(TRACE, "append entry term={}, log_val_type={} size={}", entry->get_term(),
                    static_cast< uint32_t >(entry->get_val_type()), entry->get_buf().size());
     auto buf = entry->serialize();
+#if 1
     auto const next_seq =
         m_log_store->append_async(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */},
                                   nullptr /* cookie */, [buf](int64_t, sisl::io_blob&, logdev_key, void*) {});
+#else
+    auto const next_seq =
+        m_log_store->append_sync(sisl::io_blob{buf->data_begin(), uint32_cast(buf->size()), false /* is_aligned */});
+#endif
     return to_repl_lsn(next_seq);
 }
 
@@ -189,9 +194,14 @@ nuraft::ptr< std::vector< nuraft::ptr< nuraft::log_entry > > > HomeRaftLogStore:
     auto out_vec = std::make_shared< std::vector< nuraft::ptr< nuraft::log_entry > > >();
     m_log_store->foreach (to_store_lsn(start), [end, &out_vec](store_lsn_t cur, const log_buffer& entry) -> bool {
         bool ret = (cur < to_store_lsn(end) - 1);
-        if (cur < to_store_lsn(end)) { out_vec->emplace_back(to_nuraft_log_entry(entry)); }
+        if (cur < to_store_lsn(end)) {
+            out_vec->emplace_back(to_nuraft_log_entry(entry));
+            LOGINFO("HomeRaftLogStore::log_entries {}", to_repl_lsn(cur));
+        }
         return ret;
     });
+
+    LOGINFO("HomeRaftLogStore::log_entries start {} end {} size {}", start, end, out_vec->size());
     return out_vec;
 }
diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp
index c678a90d4..059041617 100644
--- a/src/lib/replication/log_store/repl_log_store.cpp
+++ b/src/lib/replication/log_store/repl_log_store.cpp
@@ -10,7 +10,7 @@ uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
     // We don't want to transform anything that is not an app log
     if (entry->get_val_type() != nuraft::log_val_type::app_log) {
         ulong lsn = HomeRaftLogStore::append(entry);
-        RD_LOGD("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
+        LOGINFO("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
                 static_cast< uint32_t >(entry->get_val_type()), lsn, entry->get_buf().size());
         return lsn;
     }
@@ -19,7 +19,7 @@ uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
 
     ulong lsn = HomeRaftLogStore::append(entry);
     m_sm.link_lsn_to_req(rreq, int64_cast(lsn));
-    RD_LOGD("Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string());
+    LOGINFO("Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string());
     return lsn;
 }
 
@@ -41,17 +41,23 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
 
     // Start fetch the batch of data for this lsn range from remote if its not available yet.
     auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
+    auto proposer_reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
     for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
         auto rreq = m_sm.lsn_to_req(lsn);
         // Skip this call in proposer, since this method will synchronously flush the data, which is not required for
         // leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a
         // high possibility the log entry is already flushed. Skip it for rreq == nullptr which is the case for raft
         // config entries.
-        if ((rreq == nullptr) || rreq->is_proposer()) { continue; }
-        reqs->emplace_back(std::move(rreq));
+        if ((rreq == nullptr) /*|| rreq->is_proposer()*/) {
+            continue;
+        } else if (rreq->is_proposer()) {
+            proposer_reqs->emplace_back(std::move(rreq));
+        } else {
+            reqs->emplace_back(std::move(rreq));
+        }
     }
 
-    RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
+    LOGINFO("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
             reqs->size());
 
     // All requests are from proposer for data write, so as mentioned above we can skip the flush for now
@@ -73,8 +79,21 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
         for (auto const& rreq : *reqs) {
             if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
         }
+        for (auto const& rreq : *reqs) {
+            if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; }
+            LOGINFO("Raft Channel: Data after future wait: rreq=[{}]", rreq->to_compact_string());
+        }
+    } else if (!proposer_reqs->empty()) {
+        LOGINFO("Raft Channel: end_of_append_batch, I am proposer, only flush logs from {}, count {}", start_lsn,
+                count);
+        // Flush the logs and also mark all the proposer reqs as completely written
+        HomeRaftLogStore::end_of_append_batch(start_lsn, count);
+        for (auto const& rreq : *proposer_reqs) {
+            if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
+        }
     }
     sisl::VectorPool< repl_req_ptr_t >::free(reqs);
+    sisl::VectorPool< repl_req_ptr_t >::free(proposer_reqs);
 }
 
 std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); }
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp
index d902bce52..f8750f44b 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -211,7 +211,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
        flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
                                        PushDataRequestTypeTable()));*/
 
-    RD_LOGD("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
+    LOGINFO("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string());
 
     group_msg_service()
         ->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->m_pkts)
@@ -224,7 +224,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
                 return;
             }
             // Release the buffer which holds the packets
-            RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string());
+            LOGINFO("Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string());
             rreq->release_fb_builder();
             rreq->m_pkts.clear();
         });
@@ -260,8 +260,11 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
         return;
     }
 
+    LOGINFO("Data Channel: Data Write started rreq=[{}]", rreq->to_compact_string());
+
     if (!rreq->save_pushed_data(rpc_data, incoming_buf.cbytes() + fb_size, push_req->data_size())) {
-        RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_compact_string());
+        RD_LOG(ERROR, "Data Channel: Data already received for rreq=[{}], ignoring this data",
+               rreq->to_compact_string());
         return;
     }
 
@@ -276,7 +279,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
         } else {
             rreq->add_state(repl_req_state_t::DATA_WRITTEN);
             rreq->m_data_written_promise.setValue();
-            RD_LOGD("Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
rreq=[{}]", rreq->to_compact_string()); + LOGINFO("Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string()); } }); } @@ -382,6 +385,10 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector< // All the entries are done already, no need to wait if (futs.size() == 0) { return folly::makeFuture< folly::Unit >(folly::Unit{}); } + for (auto const& rreq : *rreqs) { + if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; } + LOGINFO("Raft Channel: Data future wait: rreq=[{}]", rreq->to_compact_string()); + } return folly::collectAllUnsafe(futs).thenValue([this, rreqs](auto&& e) { #ifndef NDEBUG @@ -390,7 +397,7 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector< HS_DBG_ASSERT(rreq->has_state(repl_req_state_t::DATA_WRITTEN), "Data written promise raised without updating DATA_WRITTEN state for rkey={}", rreq->rkey().to_string()); - RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string()); + LOGINFO("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string()); } #endif RD_LOGT("Data Channel: {} pending reqs's data are written", rreqs->size()); @@ -473,7 +480,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); @@ -867,12 +874,14 @@ void RaftReplDev::save_config(const nuraft::cluster_config& config) { std::unique_lock lg{m_config_mtx}; (*m_raft_config_sb)["config"] = serialize_cluster_config(config); m_raft_config_sb.write(); + LOGINFO("{}", __FUNCTION__); } void RaftReplDev::save_state(const nuraft::srv_state& state) { std::unique_lock lg{m_config_mtx}; (*m_raft_config_sb)["state"] = nlohmann::json{{"term", state.get_term()}, {"voted_for", state.get_voted_for()}}; m_raft_config_sb.write(); + LOGINFO("{}", __FUNCTION__); } nuraft::ptr< nuraft::srv_state > RaftReplDev::read_state() { @@ -930,6 +939,7 @@ void RaftReplDev::leave() { // post restart. 
     m_rd_sb->destroy_pending = 0x1;
     m_rd_sb.write();
+    LOGINFO("{}", __FUNCTION__);
 
     RD_LOGI("RaftReplDev leave group");
     m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete
@@ -978,6 +988,7 @@ void RaftReplDev::flush_durable_commit_lsn() {
     std::unique_lock lg{m_sb_mtx};
     m_rd_sb->durable_commit_lsn = lsn;
     m_rd_sb.write();
+    // LOGINFO("{}", __FUNCTION__);
 }
 
 /////////////////////////////////// Private metohds ////////////////////////////////////
@@ -996,6 +1007,7 @@ void RaftReplDev::cp_flush(CP*) {
     m_rd_sb->checkpoint_lsn = lsn;
     m_rd_sb->last_applied_dsn = m_next_dsn.load();
     m_rd_sb.write();
+    LOGINFO("{}", __FUNCTION__);
 
     m_last_flushed_commit_lsn = lsn;
 }
@@ -1045,7 +1057,7 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
     nuraft::log_entry const* lentry = r_cast< nuraft::log_entry const* >(buf.bytes());
 
     // TODO: Handle the case where the log entry is not app_log, example config logs
-    if(lentry->get_val_type() != nuraft::log_val_type::app_log) { return; }
+    if (lentry->get_val_type() != nuraft::log_val_type::app_log) { return; }
 
     repl_journal_entry* jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin());
     RELEASE_ASSERT_EQ(jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR,
diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp
index a805b229b..e867f77ce 100644
--- a/src/lib/replication/repl_dev/raft_state_machine.cpp
+++ b/src/lib/replication/repl_dev/raft_state_machine.cpp
@@ -7,6 +7,8 @@
 #include "service/raft_repl_service.h"
 #include "repl_dev/raft_state_machine.h"
 #include "repl_dev/raft_repl_dev.h"
+#include
+#include "common/homestore_config.hpp"
 
 SISL_LOGGING_DECL(replication)
@@ -188,10 +190,11 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
               m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id(), params.data->size());
     repl_req_ptr_t rreq = lsn_to_req(lsn);
     if (!rreq) { RD_LOGD("Raft channel got null rreq"); }
-    RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
+
     if (rreq->is_proposer()) {
+        LOGINFO("Leader Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
         // This is the time to ensure flushing of journal happens in the proposer
-        if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); }
+        // if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); }
         rreq->add_state(repl_req_state_t::LOG_FLUSHED);
     }
diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp
index acf447e76..16150c866 100644
--- a/src/tests/test_common/homestore_test_common.hpp
+++ b/src/tests/test_common/homestore_test_common.hpp
@@ -255,7 +255,7 @@ class HSTestHelper {
         /* create files */
         LOGINFO("creating {} device files with each of size {} ", ndevices, homestore::in_bytes(dev_size));
         for (uint32_t i{0}; i < ndevices; ++i) {
-            s_dev_names.emplace_back(std::string{"/tmp/" + token.name_ + "_" + std::to_string(i + 1)});
+            s_dev_names.emplace_back(std::string{"/tmp/source/tests/" + token.name_ + "_" + std::to_string(i + 1)});
         }
 
         if (!fake_restart && init_device) { init_files(s_dev_names, dev_size); }
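--

A note on the flush_sync() change above: concurrent sync flushers are now serialized behind the new m_single_sync_flush_mtx, while m_sync_flush_cv still signals completion from the flush path. Below is a minimal standalone sketch of that check-then-wait shape, not HomeStore code; FlushWaiter and on_flush_completed are hypothetical names.

#include <algorithm>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

class FlushWaiter {
public:
    // Flush-completion path: records up to `lsn` are now durable; wake waiters.
    void on_flush_completed(int64_t lsn) {
        {
            std::lock_guard lk(m_mtx);
            m_completed_upto = std::max(m_completed_upto, lsn);
        }
        m_cv.notify_all();
    }

    // Client path: block until everything up to `lsn` is durable. The outer
    // mutex plays the role of m_single_sync_flush_mtx: at most one sync
    // flusher runs the check-then-wait sequence at a time.
    void flush_sync(int64_t lsn) {
        std::lock_guard single(m_single_mtx);
        std::unique_lock lk(m_mtx);
        if (m_completed_upto >= lsn) { return; } // already flushed, nothing to do
        m_cv.wait(lk, [this, lsn] { return m_completed_upto >= lsn; });
    }

private:
    std::mutex m_single_mtx; // serializes flush_sync() callers
    std::mutex m_mtx;        // guards m_completed_upto
    std::condition_variable m_cv;
    int64_t m_completed_upto{-1};
};

int main() {
    FlushWaiter w;
    std::thread flusher([&w] {
        for (int64_t lsn = 0; lsn <= 4; ++lsn) { w.on_flush_completed(lsn); }
    });
    w.flush_sync(4); // returns once lsn 4 is reported durable
    std::cout << "flushed up to lsn 4\n";
    flusher.join();
}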
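Similarly, the new HomeLogStore::append_sync() is a thin sync-over-async wrapper: it reserves the next sequence number, then rides write_sync(), which parks the caller on a context shared with the completion callback (the Context struct mentioned in the write_sync hunk). A sketch of that shape under the same caveats; write_async here is a hypothetical stand-in that merely simulates a completion arriving from another thread.

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical async append: invokes `cb` from another thread once the write lands.
void write_async(int64_t lsn, const std::string& data, std::function< void(int64_t) > cb) {
    std::thread([lsn, cb] { cb(lsn); }).detach();
}

// Sync wrapper: issue the async write, then block until the completion fires.
bool write_sync(int64_t lsn, const std::string& data) {
    struct Context {
        std::mutex mtx;
        std::condition_variable cv;
        bool done{false};
    };
    // shared_ptr keeps the context alive even if the callback outlives this frame
    auto ctx = std::make_shared< Context >();
    write_async(lsn, data, [ctx](int64_t) {
        {
            std::lock_guard lk(ctx->mtx);
            ctx->done = true;
        }
        ctx->cv.notify_one();
    });
    std::unique_lock lk(ctx->mtx);
    ctx->cv.wait(lk, [&ctx] { return ctx->done; });
    return true;
}

int main() {
    std::atomic< int64_t > next_lsn{0};
    // append_sync shape: atomically reserve the next lsn, then write synchronously.
    const auto lsn = next_lsn.fetch_add(1);
    write_sync(lsn, "entry");
    std::cout << "appended lsn " << lsn << "\n";
}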