From 2301a6debf23dcb25a598604b763764ab8313d1e Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Fri, 23 Feb 2024 13:48:02 +0800 Subject: [PATCH 1/4] fix restartability bug for upper layer (#333) --- conanfile.py | 2 +- src/lib/homestore.cpp | 6 +----- src/lib/replication/service/generic_repl_svc.cpp | 3 +-- src/lib/replication/service/raft_repl_service.cpp | 3 +-- 4 files changed, 4 insertions(+), 10 deletions(-) diff --git a/conanfile.py b/conanfile.py index 369fcbbdb..61bdfa34f 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "5.1.6" + version = "5.1.7" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index ba6d1eaa6..6438986d3 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -126,6 +126,7 @@ bool HomeStore::start(const hs_input_params& input, hs_before_services_starting_ if (has_repl_data_service()) { m_log_service = std::make_unique< LogStoreService >(); m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); + m_repl_service = GenericReplService::create(std::move(s_repl_app)); } else { if (has_log_service()) { m_log_service = std::make_unique< LogStoreService >(); } if (has_data_service()) { @@ -191,11 +192,6 @@ void HomeStore::format_and_start(std::map< uint32_t, hs_format_params >&& format } void HomeStore::do_start() { - // when coming here: - // 1 if this is the first_time_boot, , the repl app already gets its uuid from upper layer - // 2 if this is not the first_time_boot, the repl app already gets its uuid from the metaservice - // now , we can safely initialize GenericReplService , which will get a correct uuid through get_my_repl_uuid() - if (has_repl_data_service()) m_repl_service = GenericReplService::create(std::move(s_repl_app)); const auto& inp_params = HomeStoreStaticConfig::instance().input; uint64_t cache_size = 
resource_mgr().get_cache_size(); diff --git a/src/lib/replication/service/generic_repl_svc.cpp b/src/lib/replication/service/generic_repl_svc.cpp index 240c82623..58dc6c5a0 100644 --- a/src/lib/replication/service/generic_repl_svc.cpp +++ b/src/lib/replication/service/generic_repl_svc.cpp @@ -35,8 +35,7 @@ std::shared_ptr< GenericReplService > GenericReplService::create(cshared< ReplAp } } -GenericReplService::GenericReplService(cshared< ReplApplication >& repl_app) : - m_repl_app{repl_app}, m_my_uuid{repl_app->get_my_repl_id()} { +GenericReplService::GenericReplService(cshared< ReplApplication >& repl_app) : m_repl_app{repl_app} { m_sb_bufs.reserve(100); meta_service().register_handler( get_meta_blk_name(), diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index dc4013034..c9fd1f5b3 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -71,6 +71,7 @@ RaftReplService::RaftReplService(cshared< ReplApplication >& repl_app) : Generic void RaftReplService::start() { // Step 1: Initialize the Nuraft messaging service, which starts the nuraft service + m_my_uuid = m_repl_app->get_my_repl_id(); auto params = nuraft_mesg::Manager::Params{ .server_uuid_ = m_my_uuid, .mesg_port_ = m_repl_app->lookup_peer(m_my_uuid).second, @@ -271,8 +272,6 @@ AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_i return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED); } - - ///////////////////// RaftReplService CP Callbacks ///////////////////////////// std::unique_ptr< CPContext > RaftReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { return nullptr; } From 13e235dbcd6bbce5f6d647f697809edc64f989af Mon Sep 17 00:00:00 2001 From: Harihara Kadayam Date: Tue, 27 Feb 2024 12:58:57 -0800 Subject: [PATCH 2/4] Create repl req details for all IO types. 
(#336) The following significant changes are made in this commit: * Until now, a repl req was not created for header_only (inline) requests, so a nullptr request had to be handled differently everywhere, which resulted in some crashes. This is now made uniform for all request types. * At present, we use the replica set's is_leader() to make decisions in some commit and append-entry code paths. This is potentially dangerous: if the leader is switched at some inopportune time and the request somehow survives, we might run into corruption cases. With this change, there is no special handling of the leader. There are only 2 roles, proposer and applier. In most use cases the proposer is the leader and the applier is the follower, but that requirement is removed. * Because of the above assumption, there was a bug wherein non-app entries (conf entries) on the leader were not guaranteed to be flushed before calling commit. That behavior is fixed as well. * The HomeLogStore has a limitation (exposed when the above issue is fixed): in the proposer, once the data write is completed, the propose to raft happens, and RAFT calls end_of_append_batch in the same thread, which does flush_sync. However, the data_write completion happens in an IO thread, and thus there is a potential for deadlock (since flush_sync takes a giant lock). We could overcome this problem with fibers, but instead of forcing worker threads to have fibers, we always isolate all logstore flushes to a separate flush thread.
--- src/include/homestore/replication/repl_dev.h | 11 +- src/lib/common/homestore_config.fbs | 2 +- src/lib/logstore/log_dev.cpp | 15 +- .../replication/log_store/repl_log_store.cpp | 61 ++++-- src/lib/replication/repl_dev/common.cpp | 4 +- .../replication/repl_dev/raft_repl_dev.cpp | 136 ++++++------ src/lib/replication/repl_dev/raft_repl_dev.h | 6 +- .../repl_dev/raft_state_machine.cpp | 194 +++++++++--------- .../replication/repl_dev/solo_repl_dev.cpp | 3 + src/tests/test_common/hs_repl_test_common.hpp | 12 +- src/tests/test_raft_repl_dev.cpp | 46 +++-- 11 files changed, 276 insertions(+), 214 deletions(-) diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 34bc41e49..d42ccc7ee 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -71,13 +71,14 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: repl_key rkey; // Unique key for the request sisl::blob header; // User header sisl::blob key; // User supplied key for this req - int64_t lsn{0}; // Lsn for this replication req + int64_t lsn{-1}; // Lsn for this replication req bool is_proposer{false}; // Is the repl_req proposed by this node //////////////// Value related section ///////////////// - sisl::sg_list value; // Raw value - applicable only to leader req - MultiBlkId local_blkid; // Local BlkId for the value - RemoteBlkId remote_blkid; // Corresponding remote blkid for the value + sisl::sg_list value; // Raw value - applicable only to leader req + MultiBlkId local_blkid; // Local BlkId for the value + RemoteBlkId remote_blkid; // Corresponding remote blkid for the value + bool value_inlined{false}; // Is the value inlined in the header itself //////////////// Journal/Buf related section ///////////////// std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > journal_buf; // Buf for the journal entry @@ -244,7 +245,7 @@ class ReplDev { /// @brief get 
replication status. If called on follower member /// this API can return empty result. - virtual std::vector get_replication_status() const = 0; + virtual std::vector< peer_info > get_replication_status() const = 0; /// @brief Gets the group_id this repldev is working for /// @return group_id diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index faa2a1358..7be5f659b 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -111,7 +111,7 @@ table LogStore { // Logdev will flush the logs only in a dedicated thread. Turn this on, if flush IO doesn't want to // intervene with data IO path. - flush_only_in_dedicated_thread: bool = false; + flush_only_in_dedicated_thread: bool = true; } table Generic { diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 83c284c3e..90cc8f2e7 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -87,13 +87,11 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { m_last_flush_idx = m_log_idx - 1; } - m_flush_timer_hdl = iomanager.schedule_global_timer( - HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, true, nullptr /* cookie */, - iomgr::reactor_regex::all_worker, - [this](void*) { - if (m_pending_flush_size.load() && !m_is_flushing.load(std::memory_order_relaxed)) { flush_if_needed(); } - }, - true /* wait_to_schedule */); + iomanager.run_on_wait(logstore_service().flush_thread(), [this]() { + m_flush_timer_hdl = iomanager.schedule_thread_timer(HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, + true /* recurring */, nullptr /* cookie */, + [this](void*) { flush_if_needed(); }); + }); handle_unopened_log_stores(format); @@ -133,7 +131,8 @@ void LogDev::stop() { } // cancel the timer - iomanager.cancel_timer(m_flush_timer_hdl, true); + iomanager.run_on_wait(logstore_service().flush_thread(), + [this]() { iomanager.cancel_timer(m_flush_timer_hdl, true); }); { 
folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp index 84a12925d..1020258ba 100644 --- a/src/lib/replication/log_store/repl_log_store.cpp +++ b/src/lib/replication/log_store/repl_log_store.cpp @@ -7,42 +7,62 @@ namespace homestore { uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) { + // We don't want to transform anything that is not an app log + if (entry->get_val_type() != nuraft::log_val_type::app_log) { return HomeRaftLogStore::append(entry); } + repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry); ulong lsn; - if (rreq) { - lsn = HomeRaftLogStore::append(rreq->raft_journal_buf()); - m_sm.link_lsn_to_req(rreq, int64_cast(lsn)); - RD_LOG(INFO, "Raft Channel: Received log entry rreq=[{}]", rreq->to_compact_string()); - } else { + if (rreq->is_proposer || rreq->value_inlined) { + // No need of any transformation for proposer or inline data, since the entry is already meaningful lsn = HomeRaftLogStore::append(entry); + } else { + lsn = HomeRaftLogStore::append(rreq->raft_journal_buf()); } + m_sm.link_lsn_to_req(rreq, int64_cast(lsn)); + RD_LOG(DEBUG, "Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string()); return lsn; } void ReplLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry) { + // We don't want to transform anything that is not an app log + if (entry->get_val_type() != nuraft::log_val_type::app_log) { + HomeRaftLogStore::write_at(index, entry); + return; + } + repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry); - if (rreq) { - HomeRaftLogStore::write_at(index, rreq->raft_journal_buf()); - m_sm.link_lsn_to_req(rreq, int64_cast(index)); - RD_LOG(INFO, "Raft Channel: Received log entry rreq=[{}]", rreq->to_compact_string()); - } else { + if (rreq->is_proposer || rreq->value_inlined) { + // No need of any transformation for proposer or inline data, 
since the entry is already meaningful HomeRaftLogStore::write_at(index, entry); + } else { + HomeRaftLogStore::write_at(index, rreq->raft_journal_buf()); } + m_sm.link_lsn_to_req(rreq, int64_cast(index)); + RD_LOG(DEBUG, "Raft Channel: Received write_at log entry rreq=[{}]", rreq->to_compact_string()); } void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { - // Skip this call in leader, since this method will synchronously flush the data, which is not required for - // leader. Leader will call the flush as part of commit after receiving quorum, upon which time, there is a high - // possibility the log entry is already flushed. - if (!m_rd.is_leader()) { - int64_t end_lsn = int64_cast(start_lsn + count - 1); + int64_t end_lsn = int64_cast(start_lsn + count - 1); - // Start fetch the batch of data for this lsn range from remote if its not available yet. - auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc(); - for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) { - reqs->emplace_back(m_sm.lsn_to_req(lsn)); + // Start fetch the batch of data for this lsn range from remote if its not available yet. + auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc(); + for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) { + auto rreq = m_sm.lsn_to_req(lsn); + // Skip this call in proposer, since this method will synchronously flush the data, which is not required for + // leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a + // high possibility the log entry is already flushed. 
+ if (rreq && rreq->is_proposer) { + RD_LOG(TRACE, "Raft Channel: Ignoring to flush proposer request rreq=[{}]", rreq->to_compact_string()); + continue; } + reqs->emplace_back(std::move(rreq)); + } + + RD_LOG(TRACE, "Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count, + reqs->size()); + // All requests are from proposer for data write, so as mentioned above we can skip the flush for now + if (!reqs->empty()) { // Check the map if data corresponding to all of these requsts have been received and written. If not, schedule // a fetch and write. Once all requests are completed and written, these requests are poped out of the map and // the future will be ready. @@ -60,9 +80,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { for (auto const& rreq : *reqs) { if (rreq) { rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_FLUSHED)); } } - - sisl::VectorPool< repl_req_ptr_t >::free(reqs); } + sisl::VectorPool< repl_req_ptr_t >::free(reqs); } std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); } diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp index db5540d61..2d2e8a122 100644 --- a/src/lib/replication/repl_dev/common.cpp +++ b/src/lib/replication/repl_dev/common.cpp @@ -44,8 +44,8 @@ std::string repl_req_ctx::to_string() const { } std::string repl_req_ctx::to_compact_string() const { - return fmt::format("dsn={} term={} lsn={} state={} ref={}", rkey.dsn, rkey.term, lsn, req_state_name(state.load()), - this->use_count()); + return fmt::format("dsn={} term={} lsn={} Blkid={} state=[{}]", rkey.dsn, rkey.term, lsn, local_blkid.to_string(), + req_state_name(state.load())); } } // namespace homestore \ No newline at end of file diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index d03e0563d..30f98570c 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ 
b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -95,11 +95,15 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& rreq->header = header; rreq->key = key; rreq->value = value; + rreq->rkey = repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)}; + + // Add the request to the repl_dev_rreq map, it will be accessed throughout the life cycle of this request + auto const [it, happened] = m_repl_key_req_map.emplace(rreq->rkey, rreq); + RD_DBG_ASSERT(happened, "Duplicate repl_key={} found in the map", rreq->rkey.to_string()); // If it is header only entry, directly propose to the raft if (rreq->value.size) { - rreq->rkey = - repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)}; + rreq->value_inlined = false; push_data_to_all_followers(rreq); // Step 1: Alloc Blkid @@ -116,7 +120,6 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& // Write the data data_service().async_write(rreq->value, rreq->local_blkid).thenValue([this, rreq](auto&& err) { if (!err) { - rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); auto raft_status = m_state_machine->propose_to_raft(std::move(rreq)); if (raft_status != ReplServiceError::OK) { handle_error(rreq, raft_status); } } else { @@ -125,8 +128,8 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& } }); } else { - RD_LOG(INFO, "Skipping data channel send since value size is 0"); - rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); + rreq->value_inlined = true; + RD_LOG(DEBUG, "Skipping data channel send since value size is 0"); auto raft_status = m_state_machine->propose_to_raft(std::move(rreq)); if (raft_status != ReplServiceError::OK) { handle_error(rreq, raft_status); } } @@ -160,7 +163,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { return; } // Release the buffer which holds the packets - 
RD_LOG(INFO, "Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string()); + RD_LOG(DEBUG, "Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string()); rreq->fb_builder.Release(); rreq->pkts.clear(); }); @@ -170,7 +173,7 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ auto const& incoming_buf = rpc_data->request_blob(); auto fetch_req = GetSizePrefixedFetchData(incoming_buf.cbytes()); - RD_LOG(INFO, "Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size()); + RD_LOG(DEBUG, "Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size()); std::vector< sisl::sg_list > sgs_vec; @@ -254,13 +257,13 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) auto s = rreq->state.load(); if ((s & uint32_cast(repl_req_state_t::ERRORED)) || !(rreq->state.compare_exchange_strong(s, s | uint32_cast(repl_req_state_t::ERRORED)))) { - RD_LOG(INFO, "Raft Channel: Error in processing rreq=[{}] error={} already errored", rreq->to_compact_string(), + RD_LOG(ERROR, "Raft Channel: Error in processing rreq=[{}] error={} already errored", rreq->to_compact_string(), err); return; } // Free the blks which is allocated already - RD_LOG(INFO, "Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_compact_string(), err); + RD_LOG(ERROR, "Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_compact_string(), err); if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) { auto blkid = rreq->local_blkid; data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) { @@ -296,7 +299,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d sisl::blob header = sisl::blob{push_req->user_header()->Data(), push_req->user_header()->size()}; sisl::blob key = sisl::blob{push_req->user_key()->Data(), push_req->user_key()->size()}; - auto rreq = follower_create_req( + auto 
rreq = applier_create_req( repl_key{.server_id = push_req->issuer_replica_id(), .term = push_req->raft_term(), .dsn = push_req->dsn()}, header, key, push_req->data_size()); rreq->rpc_data = rpc_data; @@ -308,7 +311,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d return; } #endif - RD_LOG(INFO, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); + RD_LOG(DEBUG, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); if (rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)) & uint32_cast(repl_req_state_t::DATA_RECEIVED)) { @@ -339,7 +342,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d } else { rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); rreq->data_written_promise.setValue(); - RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string()); + RD_LOG(DEBUG, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string()); } }); } @@ -356,12 +359,27 @@ static MultiBlkId do_alloc_blk(uint32_t size, blk_alloc_hints const& hints) { return blkid; } -repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob const& user_header, - sisl::blob const& user_key, uint32_t data_size) { +repl_req_ptr_t RaftReplDev::repl_key_to_req(repl_key const& rkey) const { + auto const it = m_repl_key_req_map.find(rkey); + if (it == m_repl_key_req_map.cend()) { return nullptr; } + return it->second; +} + +repl_req_ptr_t RaftReplDev::applier_create_req(repl_key const& rkey, sisl::blob const& user_header, + sisl::blob const& user_key, uint32_t data_size) { auto const [it, happened] = m_repl_key_req_map.try_emplace(rkey, repl_req_ptr_t(new repl_req_ctx())); RD_DBG_ASSERT((it != m_repl_key_req_map.end()), "Unexpected error in map_repl_key_to_req"); auto rreq = it->second; + // There is no data portion, so there is not requied to allocate + if (data_size == 0) { + rreq->rkey = rkey; + rreq->header = 
user_header; + rreq->key = user_key; + rreq->value_inlined = true; + return rreq; + } + if (!happened) { // We already have the entry in the map, check if we are already allocated the blk by previous caller, in that // case we need to return the req. @@ -370,7 +388,7 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob RD_REL_ASSERT(blob_equals(user_header, rreq->header), "User header mismatch for repl_key={}", rkey.to_string()); RD_REL_ASSERT(blob_equals(user_key, rreq->key), "User key mismatch for repl_key={}", rkey.to_string()); - RD_LOG(INFO, "Repl_key=[{}] already received ", rkey.to_string()); + RD_LOG(DEBUG, "Repl_key=[{}] already received ", rkey.to_string()); return rreq; } } @@ -382,15 +400,16 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob rreq->rkey = rkey; rreq->header = user_header; rreq->key = user_key; + rreq->value_inlined = false; rreq->local_blkid = do_alloc_blk(data_size, m_listener->get_blk_alloc_hints(user_header, data_size)); rreq->state.fetch_or(uint32_cast(repl_req_state_t::BLK_ALLOCATED)); - RD_LOG(INFO, "in follower_create_req: rreq={}, addr={}", rreq->to_compact_string(), - reinterpret_cast< uintptr_t >(rreq.get())); + RD_LOG(DEBUG, "in applier_create_req: rreq={}", rreq->to_compact_string()); + return rreq; } -auto RaftReplDev::get_max_data_fetch_size() const { +static auto get_max_data_fetch_size() { #ifdef _PRERELEASE if (iomgr_flip::instance()->test_flip("simulate_staging_fetch_data")) { LOGINFO("Flip simulate_staging_fetch_data is enabled, return max_data_fetch_size: 16K"); @@ -402,21 +421,18 @@ auto RaftReplDev::get_max_data_fetch_size() const { void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs) { // Pop any entries that are already completed - from the entries list as well as from map - rreqs->erase(std::remove_if( - rreqs->begin(), rreqs->end(), - [this](repl_req_ptr_t const& rreq) { - if (rreq == nullptr) { return true; } - - if 
(rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { - m_repl_key_req_map.erase(rreq->rkey); // Remove=Pop from map as well, since it is completed - RD_LOG(INFO, - "Raft Channel: Data write completed and blkid mapped, removing from map: rreq=[{}]", - rreq->to_compact_string()); - return true; // Remove from the pending list - } else { - return false; - } - }), + rreqs->erase(std::remove_if(rreqs->begin(), rreqs->end(), + [this](repl_req_ptr_t const& rreq) { + if (rreq == nullptr) { return true; } + + if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { + RD_LOG(DEBUG, "Raft Channel: Data write completed and blkid mapped: rreq=[{}]", + rreq->to_compact_string()); + return true; // Remove from the pending list + } else { + return false; + } + }), rreqs->end()); if (rreqs->size()) { @@ -444,7 +460,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); @@ -483,7 +499,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (!e) { // if we are here, it means the original who sent the log entries are down. // we need to handle error and when the other member becomes leader, it will resend the log entries; - RD_LOG(INFO, + RD_LOG(ERROR, "Not able to fetching data from originator={}, error={}, probably originator is down. 
Will " "retry when new leader start appending log entries", rreqs.front()->remote_blkid.server_id, e.error()); @@ -499,7 +515,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { RD_DBG_ASSERT_GT(total_size, 0, "Empty response from remote"); RD_DBG_ASSERT(raw_data, "Empty response from remote"); - RD_LOG(INFO, "Data Channel: FetchData completed for reques.size()={} ", rreqs.size()); + RD_LOG(DEBUG, "Data Channel: FetchData completed for reques.size()={} ", rreqs.size()); thread_local std::vector< folly::Future< std::error_code > > futs; // static is impplied futs.clear(); @@ -520,7 +536,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { "Data size mismatch for rreq={} blkid={}, remote size: {}, local size: {}", rreq->to_compact_string(), rreq->local_blkid.to_string(), data_size, local_size); - RD_LOG(INFO, "Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.", + RD_LOG(DEBUG, "Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.", rreq->to_compact_string()); continue; } else { @@ -565,7 +581,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { raw_data += data_size; total_size -= data_size; - RD_LOG(INFO, + RD_LOG(DEBUG, "Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, " "local_blkid: {}", rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string()); @@ -599,22 +615,21 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > futs.reserve(rreqs->size()); // Pop any entries that are already completed - from the entries list as well as from map - rreqs->erase(std::remove_if( - rreqs->begin(), rreqs->end(), - [this, &futs](repl_req_ptr_t const& rreq) { - if (rreq == nullptr) { return true; } - - if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { - m_repl_key_req_map.erase(rreq->rkey); // Remove=Pop from map as 
well, since it is completed - RD_LOG(INFO, - "Raft Channel: Data write completed and blkid mapped, removing from map: rreq=[{}]", - rreq->to_compact_string()); - return true; // Remove from the pending list - } else { - futs.emplace_back(rreq->data_written_promise.getSemiFuture()); - return false; - } - }), + rreqs->erase(std::remove_if(rreqs->begin(), rreqs->end(), + [this, &futs](repl_req_ptr_t const& rreq) { + if ((rreq == nullptr) || (rreq->value_inlined)) { return true; } + + if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { + RD_LOG(DEBUG, "Raft Channel: Data write completed and blkid mapped: rreq=[{}]", + rreq->to_compact_string()); + return true; // Remove from the pending list + } else { + RD_LOG(TRACE, "Data Channel: Data write pending rreq=[{}]", + rreq->to_compact_string()); + futs.emplace_back(rreq->data_written_promise.getSemiFuture()); + return false; + } + }), rreqs->end()); // All the entries are done already, no need to wait @@ -630,7 +645,7 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > m_wait_data_timer_hdl = iomanager.schedule_global_timer( // timer wakes up in current thread; HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */, nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto /*cookie*/) { - RD_LOG(INFO, "Data Channel: Wait data write timer fired, checking if data is written"); + RD_LOG(DEBUG, "Data Channel: Wait data write timer fired, checking if data is written"); check_and_fetch_remote_data(rreqs); }); } @@ -641,10 +656,9 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > HS_DBG_ASSERT(rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN), "Data written promise raised without updating DATA_WRITTEN state for rkey={}", rreq->rkey.to_string()); - RD_LOG(INFO, "Raft Channel: Data write completed and blkid mapped, removing from map: rreq=[{}]", - 
rreq->to_compact_string()); - m_repl_key_req_map.erase(rreq->rkey); // Remove from map as well, since it is completed + RD_LOG(DEBUG, "Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_compact_string()); } + RD_LOG(TRACE, "Data Channel: {} pending reqs's data are written", rreqs->size()); return folly::makeSemiFuture< folly::Unit >(folly::Unit{}); }); } @@ -817,10 +831,13 @@ void RaftReplDev::leave() { void RaftReplDev::report_committed(repl_req_ptr_t rreq) { if (rreq->local_blkid.is_valid()) { data_service().commit_blk(rreq->local_blkid); } + // Remove the request from repl_key map. + m_repl_key_req_map.erase(rreq->rkey); + auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn); RD_DBG_ASSERT_GT(rreq->lsn, prev_lsn, "Out of order commit of lsns, it is not expected in RaftReplDev"); - RD_LOG(INFO, "Raft channel: Commit rreq=[{}]", rreq->to_compact_string()); + RD_LOG(DEBUG, "Raft channel: Commit rreq=[{}]", rreq->to_compact_string()); m_listener->on_commit(rreq->lsn, rreq->header, rreq->key, rreq->local_blkid, rreq); if (!rreq->is_proposer) { @@ -843,6 +860,7 @@ void RaftReplDev::cp_flush(CP*) { m_rd_sb->commit_lsn = lsn; m_rd_sb->checkpoint_lsn = lsn; + m_rd_sb->last_applied_dsn = m_next_dsn.load(); m_rd_sb.write(); m_last_flushed_commit_lsn = lsn; } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 257598a32..5c5f80627 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -97,8 +97,9 @@ class RaftReplDev : public ReplDev, //////////////// Methods needed for other Raft classes to access ///////////////// void use_config(json_superblk raft_config_sb); void report_committed(repl_req_ptr_t rreq); - repl_req_ptr_t follower_create_req(repl_key const& rkey, sisl::blob const& user_header, sisl::blob const& user_key, - uint32_t data_size); + repl_req_ptr_t repl_key_to_req(repl_key const& rkey) const; + repl_req_ptr_t 
applier_create_req(repl_key const& rkey, sisl::blob const& user_header, sisl::blob const& user_key, + uint32_t data_size); AsyncNotify notify_after_data_written(std::vector< repl_req_ptr_t >* rreqs); void cp_flush(CP* cp); void cp_cleanup(CP* cp); @@ -126,7 +127,6 @@ class RaftReplDev : public ReplDev, void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data); void check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs); void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs); - auto get_max_data_fetch_size() const; bool is_resync_mode() { return m_resync_mode; } void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err); }; diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index ea3cba12d..98f63af71 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -16,55 +16,22 @@ RaftStateMachine::RaftStateMachine(RaftReplDev& rd) : m_rd{rd} { m_success_ptr->put(0); } -raft_buf_ptr_t RaftStateMachine::pre_commit_ext(nuraft::state_machine::ext_op_params const& params) { - // Leader precommit is processed in next callback, because this callback doesn't provide a way to stick a context - // which could contain the req structure in it. 
- if (!m_rd.is_leader()) { - int64_t lsn = s_cast< int64_t >(params.log_idx); - raft_buf_ptr_t data = params.data; - - repl_req_ptr_t rreq = lsn_to_req(lsn); - RD_LOG(INFO, "Raft channel: Precommit rreq=[{}]", rreq->to_compact_string()); - m_rd.m_listener->on_pre_commit(rreq->lsn, rreq->header, rreq->key, rreq); - } - return m_success_ptr; -} - -void RaftStateMachine::after_precommit_in_leader(nuraft::raft_server::req_ext_cb_params const& params) { - repl_req_ptr_t rreq = repl_req_ptr_t(r_cast< repl_req_ctx* >(params.context)); - link_lsn_to_req(rreq, int64_cast(params.log_idx)); - - RD_LOG(INFO, "Raft Channel: Proposed rreq=[{}]", rreq->to_compact_string()); - m_rd.m_listener->on_pre_commit(rreq->lsn, rreq->header, rreq->key, rreq); +static std::pair< sisl::blob, sisl::blob > header_only_extract(nuraft::buffer& buf) { + repl_journal_entry* jentry = r_cast< repl_journal_entry* >(buf.data_begin()); + RELEASE_ASSERT_EQ(jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, + "Mismatched version of journal entry received from RAFT peer"); + RELEASE_ASSERT_EQ(jentry->code, journal_type_t::HS_HEADER_ONLY, + "Trying to extract header on non-header only entry"); + sisl::blob const header = sisl::blob{uintptr_cast(jentry) + sizeof(repl_journal_entry), jentry->user_header_size}; + sisl::blob const key = sisl::blob{header.cbytes() + header.size(), jentry->key_size}; + return {header, key}; } -raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params const& params) { - int64_t lsn = s_cast< int64_t >(params.log_idx); - raft_buf_ptr_t data = params.data; - - repl_req_ptr_t rreq = lsn_to_req(lsn); - if (rreq == nullptr) { return m_success_ptr; } - - RD_LOG(INFO, "Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string()); - if (m_rd.is_leader()) { - // This is the time to ensure flushing of journal happens in leader - if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); } - 
rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_FLUSHED)); - } - if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { - m_lsn_req_map.erase(rreq->lsn); - m_rd.report_committed(rreq); - } - return m_success_ptr; -} - -uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); } - ReplServiceError RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { - uint32_t val_size = rreq->value.size ? rreq->local_blkid.serialized_size() : 0; + uint32_t val_size = rreq->value_inlined ? 0 : rreq->local_blkid.serialized_size(); uint32_t entry_size = sizeof(repl_journal_entry) + rreq->header.size() + rreq->key.size() + val_size; rreq->alloc_journal_entry(entry_size, true /* raft_buf */); - rreq->journal_entry->code = (rreq->value.size) ? journal_type_t::HS_LARGE_DATA : journal_type_t::HS_HEADER_ONLY; + rreq->journal_entry->code = (rreq->value_inlined) ? journal_type_t::HS_HEADER_ONLY : journal_type_t::HS_LARGE_DATA; rreq->journal_entry->server_id = m_rd.server_id(); rreq->journal_entry->dsn = rreq->dsn(); rreq->journal_entry->user_header_size = rreq->header.size(); @@ -92,14 +59,9 @@ ReplServiceError RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { auto* vec = sisl::VectorPool< raft_buf_ptr_t >::alloc(); vec->push_back(rreq->raft_journal_buf()); - nuraft::raft_server::req_ext_params param; - param.after_precommit_ = bind_this(RaftStateMachine::after_precommit_in_leader, 1); - param.expected_term_ = 0; - param.context_ = voidptr_cast(rreq.get()); - RD_LOG(TRACE, "Raft Channel: journal_entry=[{}] ", rreq->journal_entry->to_string()); - auto append_status = m_rd.raft_server()->append_entries_ext(*vec, param); + auto append_status = m_rd.raft_server()->append_entries(*vec); sisl::VectorPool< raft_buf_ptr_t >::free(vec); if (append_status && !append_status->get_accepted()) { @@ -111,28 +73,29 @@ ReplServiceError RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { } repl_req_ptr_t 
RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::log_entry >& lentry) { - // Leader has nothing to transform or process - if (m_rd.is_leader()) { return nullptr; } - - // We don't want to transform anything that is not an app log - if (lentry->get_val_type() != nuraft::log_val_type::app_log) { return nullptr; } - - // Validate the journal entry and see if it needs to be processed - { - repl_journal_entry* tmp_jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin()); - RELEASE_ASSERT_EQ(tmp_jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, - "Mismatched version of journal entry received from RAFT peer"); - - RD_LOG(TRACE, "Received Raft log_entry=[term={}], journal_entry=[{}] ", lentry->get_term(), - tmp_jentry->to_string()); - - // For inline data we don't need to transform anything - if (tmp_jentry->code != journal_type_t::HS_LARGE_DATA) { return nullptr; } - - DEBUG_ASSERT_GT(tmp_jentry->value_size, 0, "Entry marked as large data, but value size is notified as 0"); + // Validate the journal entry and see if it needs to be transformed + + repl_journal_entry* tmp_jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin()); + RELEASE_ASSERT_EQ(tmp_jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, + "Mismatched version of journal entry received from RAFT peer"); + + RD_LOG(TRACE, "Received Raft log_entry=[term={}], journal_entry=[{}] ", lentry->get_term(), + tmp_jentry->to_string()); + + if (tmp_jentry->server_id == m_rd.server_id()) { + // We are the proposer for this entry, lets pull the request from the map. 
We don't need any actual + // transformation here, because the entry is already is local + repl_key rkey{.server_id = tmp_jentry->server_id, .term = lentry->get_term(), .dsn = tmp_jentry->dsn}; + auto rreq = m_rd.repl_key_to_req(rkey); + RELEASE_ASSERT(rreq != nullptr, + "Log entry write with local server_id rkey={} but its corresponding req is missting in map", + rkey.to_string()); + DEBUG_ASSERT(rreq->is_proposer, "Log entry has same server_id={}, but rreq says its not a proposer", + m_rd.server_id()) + return rreq; } - auto log_to_journal_entry = [](raft_buf_ptr_t const& log_buf, auto const log_buf_data_offset) { + auto log_to_journal_entry = [](raft_buf_ptr_t const& log_buf, auto log_buf_data_offset) { repl_journal_entry* jentry = r_cast< repl_journal_entry* >(log_buf->data_begin() + log_buf_data_offset); sisl::blob const header = sisl::blob{uintptr_cast(jentry) + sizeof(repl_journal_entry), jentry->user_header_size}; @@ -141,44 +104,85 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo }; // Serialize the log_entry buffer which returns the actual raft log_entry buffer. 
- auto log_buf = lentry->serialize(); - auto const log_buf_data_offset = log_buf->size() - lentry->get_buf().size(); + raft_buf_ptr_t log_buf; + size_t log_buf_data_offset; + if (tmp_jentry->code == journal_type_t::HS_LARGE_DATA) { + DEBUG_ASSERT_GT(tmp_jentry->value_size, 0, "Entry marked as large data, but value size is notified as 0"); + log_buf = lentry->serialize(); + log_buf_data_offset = log_buf->size() - lentry->get_buf().size(); + } else { + DEBUG_ASSERT_EQ(tmp_jentry->value_size, 0, "Entry marked as inline data, but value size is not 0"); + log_buf = lentry->get_buf_ptr(); + log_buf_data_offset = 0; + } + auto const [jentry, header, key] = log_to_journal_entry(log_buf, log_buf_data_offset); + RD_LOG(DEBUG, "Received Raft server_id={}, term={}, dsn={}, journal_entry=[{}] ", jentry->server_id, + lentry->get_term(), jentry->dsn, jentry->to_string()); - LOGINFO("Received Raft server_id={}, term={}, dsn={}, journal_entry=[{}] ", jentry->server_id, lentry->get_term(), - jentry->dsn, jentry->to_string()); // From the repl_key, get the repl_req. In cases where log stream got here first, this method will create a new // repl_req and return that back. 
Fill up all of the required journal entry inside the repl_req - auto rreq = m_rd.follower_create_req( + auto rreq = m_rd.applier_create_req( repl_key{.server_id = jentry->server_id, .term = lentry->get_term(), .dsn = jentry->dsn}, header, key, jentry->value_size); rreq->journal_buf = std::move(log_buf); rreq->journal_entry = jentry; - MultiBlkId entry_blkid; - entry_blkid.deserialize(sisl::blob{key.cbytes() + key.size(), jentry->value_size}, true /* copy */); - rreq->remote_blkid = RemoteBlkId{jentry->server_id, entry_blkid}; - - auto const local_size = rreq->local_blkid.serialized_size(); - auto const remote_size = entry_blkid.serialized_size(); - uint8_t* blkid_location; - if (local_size > remote_size) { - // We need to copy the entire log_entry to accomodate local blkid - auto new_buf = nuraft::buffer::expand(*rreq->raft_journal_buf(), - rreq->raft_journal_buf()->size() + local_size - remote_size); - blkid_location = uintptr_cast(new_buf->data_begin()) + rreq->raft_journal_buf()->size() - jentry->value_size; - std::tie(rreq->journal_entry, rreq->header, rreq->key) = log_to_journal_entry(new_buf, log_buf_data_offset); - rreq->journal_buf = std::move(new_buf); - } else { - // Can do in-place replace of remote blkid with local blkid. 
- blkid_location = uintptr_cast(rreq->raft_journal_buf()->data_begin()) + rreq->raft_journal_buf()->size() - - jentry->value_size; + if (jentry->value_size > 0) { + MultiBlkId entry_blkid; + entry_blkid.deserialize(sisl::blob{key.cbytes() + key.size(), jentry->value_size}, true /* copy */); + rreq->remote_blkid = RemoteBlkId{jentry->server_id, entry_blkid}; + + auto const local_size = rreq->local_blkid.serialized_size(); + auto const remote_size = entry_blkid.serialized_size(); + uint8_t* blkid_location; + if (local_size > remote_size) { + // We need to copy the entire log_entry to accomodate local blkid + auto new_buf = nuraft::buffer::expand(*rreq->raft_journal_buf(), + rreq->raft_journal_buf()->size() + local_size - remote_size); + blkid_location = + uintptr_cast(new_buf->data_begin()) + rreq->raft_journal_buf()->size() - jentry->value_size; + std::tie(rreq->journal_entry, rreq->header, rreq->key) = log_to_journal_entry(new_buf, log_buf_data_offset); + rreq->journal_buf = std::move(new_buf); + } else { + // Can do in-place replace of remote blkid with local blkid. 
+ blkid_location = uintptr_cast(rreq->raft_journal_buf()->data_begin()) + rreq->raft_journal_buf()->size() - + jentry->value_size; + } + std::memcpy(blkid_location, rreq->local_blkid.serialize().cbytes(), local_size); } - std::memcpy(blkid_location, rreq->local_blkid.serialize().cbytes(), local_size); - return rreq; } +raft_buf_ptr_t RaftStateMachine::pre_commit_ext(nuraft::state_machine::ext_op_params const& params) { + int64_t lsn = s_cast< int64_t >(params.log_idx); + + repl_req_ptr_t rreq = lsn_to_req(lsn); + RD_LOG(DEBUG, "Raft channel: Precommit rreq=[{}]", rreq->to_compact_string()); + m_rd.m_listener->on_pre_commit(rreq->lsn, rreq->header, rreq->key, rreq); + + return m_success_ptr; +} + +raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params const& params) { + int64_t lsn = s_cast< int64_t >(params.log_idx); + + repl_req_ptr_t rreq = lsn_to_req(lsn); + RD_LOG(DEBUG, "Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string()); + if (rreq->is_proposer) { + // This is the time to ensure flushing of journal happens in the proposer + if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); } + rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_FLUSHED)); + } + + m_lsn_req_map.erase(rreq->lsn); + m_rd.report_committed(rreq); + + return m_success_ptr; +} + +uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); } + void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) { rreq->lsn = lsn; rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_RECEIVED)); diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index 73348f53e..5f07487d3 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -36,6 +36,8 @@ void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& // If it is header only 
entry, directly write to the journal if (rreq->value.size) { + rreq->value_inlined = false; + // Step 1: Alloc Blkid auto status = data_service().alloc_blks(uint32_cast(rreq->value.size), m_listener->get_blk_alloc_hints(rreq->header, rreq->value.size), @@ -50,6 +52,7 @@ void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& write_journal(std::move(rreq)); }); } else { + rreq->value_inlined = true; write_journal(std::move(rreq)); } } diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 6367be928..fbdcd6861 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -121,7 +121,7 @@ class HSReplTestHelper { public: friend class TestReplApplication; - HSReplTestHelper(std::string const& name, char** argv) : name_{name}, argv_{argv} {} + HSReplTestHelper(std::string const& name, int argc, char** argv) : name_{name}, argc_{argc}, argv_{argv} {} void setup() { replica_num_ = SISL_OPTIONS["replica_num"].as< uint16_t >(); @@ -153,8 +153,13 @@ class HSReplTestHelper { for (uint32_t i{1}; i < num_replicas; ++i) { LOGINFO("Spawning Homestore replica={} instance", i); - boost::process::child c(argv_[0], "--log_mods", "replication:trace", "--replica_num", std::to_string(i), - proc_grp_); + std::string cmd_line; + fmt::format_to(std::back_inserter(cmd_line), "{} --replica_num {}", argv_[0], i); + for (int j{1}; j < argc_; ++j) { + fmt::format_to(std::back_inserter(cmd_line), " {}", argv_[j]); + } + boost::process::child c(boost::process::cmd = cmd_line, proc_grp_); + // boost::process::child c(argv_[0], "--replica_num", std::to_string(i), proc_grp_); c.detach(); } } else { @@ -282,6 +287,7 @@ class HSReplTestHelper { private: uint16_t replica_num_; std::string name_; + int argc_; char** argv_; boost::process::group proc_grp_; diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index e31b61a1a..3e8e1ae7e 100644 --- 
a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -52,6 +52,8 @@ SISL_OPTION_GROUP(test_raft_repl_dev, SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup) static std::unique_ptr< test_common::HSReplTestHelper > g_helper; +static std::random_device g_rd{}; +static std::default_random_engine g_re{g_rd()}; class TestReplicatedDB : public homestore::ReplDevListener { public: @@ -174,20 +176,25 @@ class TestReplicatedDB : public homestore::ReplDevListener { ++it; } - auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size); - - repl_dev()->async_read(v.blkid_, read_sgs, v.data_size_).thenValue([read_sgs, k, v](auto const ec) { - RELEASE_ASSERT(!ec, "Read of blkid={} for key={} error={}", v.blkid_.to_string(), k.id_, ec.message()); - for (auto const& iov : read_sgs.iovs) { - test_common::HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, - v.data_pattern_); - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - LOGINFO("Validated successfully key={} value[blkid={} pattern={}]", k.id_, v.blkid_.to_string(), - v.data_pattern_); + if (v.data_size_ != 0) { + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size); + + repl_dev()->async_read(v.blkid_, read_sgs, v.data_size_).thenValue([read_sgs, k, v](auto const ec) { + RELEASE_ASSERT(!ec, "Read of blkid={} for key={} error={}", v.blkid_.to_string(), k.id_, + ec.message()); + for (auto const& iov : read_sgs.iovs) { + test_common::HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, + v.data_pattern_); + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + LOGINFO("Validated successfully key={} value[blkid={} pattern={}]", k.id_, v.blkid_.to_string(), + v.data_pattern_); + g_helper->runner().next_task(); + }); + } else { 
g_helper->runner().next_task(); - }); + } }); g_helper->runner().execute().get(); } @@ -269,7 +276,6 @@ class RaftReplDevTest : public testing::Test { }; TEST_F(RaftReplDevTest, All_Append_Restart_Append) { - LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); @@ -277,7 +283,10 @@ TEST_F(RaftReplDevTest, All_Append_Restart_Append) { if (g_helper->replica_num() == 0) { auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); - g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); }); + g_helper->runner().set_task([this, block_size]() { + static std::normal_distribution<> num_blks_gen{3.0, 2.0}; + this->generate_writes(std::abs(std::round(num_blks_gen(g_re))) * block_size, block_size); + }); g_helper->runner().execute().get(); } this->wait_for_all_writes(exp_entries); @@ -298,7 +307,10 @@ TEST_F(RaftReplDevTest, All_Append_Restart_Append) { LOGINFO("Post restart write the data again"); auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); - g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); }); + g_helper->runner().set_task([this, block_size]() { + static std::normal_distribution<> num_blks_gen{3.0, 2.0}; + this->generate_writes(std::abs(std::round(num_blks_gen(g_re))) * block_size, block_size); + }); g_helper->runner().execute().get(); } this->wait_for_all_writes(exp_entries); @@ -467,7 +479,7 @@ int main(int argc, char* argv[]) { test_repl_common_setup); FLAGS_folly_global_cpu_executor_threads = 4; - g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev", orig_argv); + g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev", argc, orig_argv); g_helper->setup(); (g_helper->replica_num() == 0) ? 
::testing::GTEST_FLAG(filter) = "*Primary_*:*All_*" From e0cd9a81ca75298bd8bb21b0214c8b75f8635e59 Mon Sep 17 00:00:00 2001 From: Mehdi Hosseini <116847813+shosseinimotlagh@users.noreply.github.com> Date: Wed, 28 Feb 2024 10:58:29 -0800 Subject: [PATCH 3/4] Possible fix for refresh node lock problem (#330) --- .jenkins/jenkinsfile_nightly | 2 +- conanfile.py | 2 +- src/include/homestore/index/index_table.hpp | 26 +++++++--- src/tests/btree_helpers/btree_decls.h | 8 +-- src/tests/btree_helpers/btree_test_helper.hpp | 52 +++++++++++++++---- 5 files changed, 67 insertions(+), 23 deletions(-) diff --git a/.jenkins/jenkinsfile_nightly b/.jenkins/jenkinsfile_nightly index cddc26077..d00b7b722 100644 --- a/.jenkins/jenkinsfile_nightly +++ b/.jenkins/jenkinsfile_nightly @@ -40,7 +40,7 @@ pipeline { } stage("Build") { steps { - sh "conan create --build missing -o sisl:prerelease=True -o homestore:sanitize=True -o homestore:skip_testing=True -pr debug . ${PROJECT}/${VER}@" + sh "conan create --build missing -o homestore:sanitize=True -o homestore:skip_testing=True -pr debug . 
${PROJECT}/${VER}@" sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_index_btree' -exec cp {} .jenkins/test_index_btree \\;" sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_meta_blk_mgr' -exec cp {} .jenkins/test_meta_blk_mgr \\;" sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_log_store' -exec cp {} .jenkins/test_log_store \\;" diff --git a/conanfile.py b/conanfile.py index 61bdfa34f..91946e9eb 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "5.1.7" + version = "5.1.9" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/index/index_table.hpp b/src/include/homestore/index/index_table.hpp index ebe189e7e..4aecc3ddb 100644 --- a/src/include/homestore/index/index_table.hpp +++ b/src/include/homestore/index/index_table.hpp @@ -75,16 +75,30 @@ class IndexTable : public IndexTableBase, public Btree< K, V > { template < typename ReqT > btree_status_t put(ReqT& put_req) { - auto cpg = hs()->cp_mgr().cp_guard(); - put_req.m_op_context = (void*)cpg.context(cp_consumer_t::INDEX_SVC); - return Btree< K, V >::put(put_req); + auto ret = btree_status_t::success; + do { + auto cpg = hs()->cp_mgr().cp_guard(); + put_req.m_op_context = (void*)cpg.context(cp_consumer_t::INDEX_SVC); + ret = Btree< K, V >::put(put_req); + if (ret == btree_status_t::cp_mismatch) { + LOGTRACEMOD(wbcache, "CP Mismatch, retrying put"); + } + } while (ret == btree_status_t::cp_mismatch); + return ret; } template < typename ReqT > btree_status_t remove(ReqT& remove_req) { - auto cpg = hs()->cp_mgr().cp_guard(); - remove_req.m_op_context = (void*)cpg.context(cp_consumer_t::INDEX_SVC); - return Btree< K, V >::remove(remove_req); + auto ret = btree_status_t::success; + do { + auto cpg = hs()->cp_mgr().cp_guard(); + remove_req.m_op_context = (void*)cpg.context(cp_consumer_t::INDEX_SVC); + ret = Btree< K, V >::remove(remove_req); + if (ret 
== btree_status_t::cp_mismatch) { + LOGTRACEMOD(wbcache, "CP Mismatch, retrying remove"); + } + } while (ret == btree_status_t::cp_mismatch); + return ret; } protected: diff --git a/src/tests/btree_helpers/btree_decls.h b/src/tests/btree_helpers/btree_decls.h index 2c99e90a3..132e1553e 100644 --- a/src/tests/btree_helpers/btree_decls.h +++ b/src/tests/btree_helpers/btree_decls.h @@ -35,11 +35,11 @@ struct VarKeySizeBtree { }; struct VarValueSizeBtree { - using BtreeType = IndexTable< TestVarLenKey, TestVarLenValue >; - using KeyType = TestVarLenKey; + using BtreeType = IndexTable< TestFixedKey, TestVarLenValue >; + using KeyType = TestFixedKey; using ValueType = TestVarLenValue; - static constexpr btree_node_type leaf_node_type = btree_node_type::VAR_OBJECT; - static constexpr btree_node_type interior_node_type = btree_node_type::VAR_OBJECT; + static constexpr btree_node_type leaf_node_type = btree_node_type::VAR_VALUE; + static constexpr btree_node_type interior_node_type = btree_node_type::FIXED; }; struct VarObjSizeBtree { diff --git a/src/tests/btree_helpers/btree_test_helper.hpp b/src/tests/btree_helpers/btree_test_helper.hpp index 9c142ffe5..e59c331cb 100644 --- a/src/tests/btree_helpers/btree_test_helper.hpp +++ b/src/tests/btree_helpers/btree_test_helper.hpp @@ -92,15 +92,44 @@ struct BtreeTestHelper { for (std::size_t i = 0; i < n_fibers; ++i) { const auto start_range = i * chunk_size; const auto end_range = start_range + ((i == n_fibers - 1) ? 
last_chunk_size : chunk_size); - iomanager.run_on_forget(m_fibers[i], [this, start_range, end_range, &test_count]() { - for (uint32_t i = start_range; i < end_range; i++) { - put(i, btree_put_type::INSERT); - } - { - std::unique_lock lg(m_test_done_mtx); - if (--test_count == 0) { m_test_done_cv.notify_one(); } - } - }); + auto fiber_id = i; + iomanager.run_on_forget( + m_fibers[i], [this, start_range, end_range, &test_count, fiber_id, preload_size]() { + double progress_interval = + (double)(end_range - start_range) / 20; // 5% of the total number of iterations + double progress_thresh = progress_interval; // threshold for progress interval + double elapsed_time, progress_percent, last_progress_time = 0; + auto m_start_time = Clock::now(); + + for (uint32_t i = start_range; i < end_range; i++) { + put(i, btree_put_type::INSERT); + if (fiber_id == 0) { + elapsed_time = get_elapsed_time_sec(m_start_time); + progress_percent = (double)(i - start_range) / (end_range - start_range) * 100; + + // check progress every 5% of the total number of iterations or every 30 seconds + bool print_time = false; + if (i >= progress_thresh) { + progress_thresh += progress_interval; + print_time = true; + } + if (elapsed_time - last_progress_time > 30) { + last_progress_time = elapsed_time; + print_time = true; + } + if (print_time) { + LOGINFO("Progress: iterations completed ({:.2f}%)- Elapsed time: {:.0f} seconds- " + "populated entries: {} ({:.2f}%)", + progress_percent, elapsed_time, m_shadow_map.size(), + m_shadow_map.size() * 100.0 / preload_size); + } + } + } + { + std::unique_lock lg(m_test_done_mtx); + if (--test_count == 0) { m_test_done_cv.notify_one(); } + } + }); } { @@ -419,8 +448,9 @@ struct BtreeTestHelper { } if (print_time) { LOGINFO("Progress: iterations completed ({:.2f}%)- Elapsed time: {:.0f} seconds of total " - "{} - total entries: {}", - progress_percent, elapsed_time, m_run_time, m_shadow_map.size()); + "{} ({:.2f}%) - total entries: {} ({:.2f}%)", + 
progress_percent, elapsed_time, m_run_time, elapsed_time * 100.0 / m_run_time, + m_shadow_map.size(), m_shadow_map.size() * 100.0 / m_max_range_input); } } } From dcaad8f87ac84d37419dde4f9645783e87903561 Mon Sep 17 00:00:00 2001 From: Yaming Kuang <1477567+yamingk@users.noreply.github.com> Date: Thu, 29 Feb 2024 11:26:23 -0800 Subject: [PATCH 4/4] issue 329: RafReplDev Metrics and Restart Leader Test (#337) * issue 329: RafReplDev Metrics and Restart Leader Test --- conanfile.py | 2 +- .../replication/repl_dev/raft_repl_dev.cpp | 19 ++++- src/lib/replication/repl_dev/raft_repl_dev.h | 24 +++++++ src/tests/test_common/hs_repl_test_common.hpp | 3 +- src/tests/test_raft_repl_dev.cpp | 72 +++++++++++++++++-- 5 files changed, 111 insertions(+), 9 deletions(-) diff --git a/conanfile.py b/conanfile.py index 91946e9eb..de3a12195 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "5.1.9" + version = "5.1.10" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 30f98570c..abddc8bdf 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -28,7 +28,8 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk m_group_id{rd_sb->group_id}, m_my_repl_id{svc.get_my_repl_uuid()}, m_raft_server_id{nuraft_mesg::to_server_id(m_my_repl_id)}, - m_rd_sb{std::move(rd_sb)} { + m_rd_sb{std::move(rd_sb)}, + m_metrics{fmt::format("{}_{}", group_id_str(), m_raft_server_id).c_str()} { m_state_machine = std::make_shared< RaftStateMachine >(*this); if (load_existing) { @@ -216,7 +217,11 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ sgs_vec.push_back(sgs); async_read(local_blkid, sgs, total_size).thenValue([this, &ctx](auto&& err) { - RD_REL_ASSERT(!err, 
"Error in reading data"); // TODO: Find a way to return error to the Listener + if (err) { + COUNTER_INCREMENT(m_metrics, read_err_cnt, 1); + RD_REL_ASSERT(false, "Error in reading data"); // TODO: Find a way to return error to the Listener + } + { std::unique_lock< std::mutex > lk{ctx->mtx}; --(ctx->outstanding_read_cnt); @@ -337,6 +342,7 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d .async_write(r_cast< const char* >(data), push_req->data_size(), rreq->local_blkid) .thenValue([this, rreq](auto&& err) { if (err) { + COUNTER_INCREMENT(m_metrics, write_err_cnt, 1); RD_DBG_ASSERT(false, "Error in writing data"); handle_error(rreq, ReplServiceError::DRIVE_WRITE_ERROR); } else { @@ -460,7 +466,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { if (rreqs.size() == 0) { return; } - std::vector< ::flatbuffers::Offset< RequestEntry > > entries; + std::vector<::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs.size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); @@ -487,6 +493,9 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { builder->FinishSizePrefixed( CreateFetchData(*builder, CreateFetchDataRequest(*builder, builder->CreateVector(entries)))); + COUNTER_INCREMENT(m_metrics, fetch_rreq_cnt, 1); + COUNTER_INCREMENT(m_metrics, fetch_total_entries_cnt, rreqs.size()); + // leader can change, on the receiving side, we need to check if the leader is still the one who originated the // blkid; group_msg_service() @@ -503,6 +512,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { "Not able to fetching data from originator={}, error={}, probably originator is down. 
Will " "retry when new leader start appending log entries", rreqs.front()->remote_blkid.server_id, e.error()); + COUNTER_INCREMENT(m_metrics, fetch_err_cnt, 1); for (auto const& rreq : rreqs) { handle_error(rreq, RaftReplService::to_repl_error(e.error())); } @@ -512,6 +522,8 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { auto raw_data = e.value().response_blob().cbytes(); auto total_size = e.value().response_blob().size(); + COUNTER_INCREMENT(m_metrics, fetch_total_blk_size, total_size); + RD_DBG_ASSERT_GT(total_size, 0, "Empty response from remote"); RD_DBG_ASSERT(raw_data, "Empty response from remote"); @@ -591,6 +603,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) { for (auto const& err_c : vf) { if (sisl_unlikely(err_c.value())) { auto ec = err_c.value(); + COUNTER_INCREMENT(m_metrics, write_err_cnt, 1); RD_LOG(ERROR, "Error in writing data: {}", ec.value()); // TODO: actually will never arrive here as iomgr will assert (should not assert but // to raise alert and leave the raft group); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 5c5f80627..e7e56c1ef 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -29,6 +29,28 @@ struct raft_repl_dev_superblk : public repl_dev_superblk { using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; +class RaftReplDevMetrics : public sisl::MetricsGroup { +public: + explicit RaftReplDevMetrics(const char* inst_name) : sisl::MetricsGroup("RaftReplDev", inst_name) { + REGISTER_COUNTER(read_err_cnt, "total read error count", "read_err_cnt", {"op", "read"}); + REGISTER_COUNTER(write_err_cnt, "total write error count", "write_err_cnt", {"op", "write"}); + REGISTER_COUNTER(fetch_err_cnt, "total fetch data error count", "fetch_err_cnt", {"op", "fetch"}); + + REGISTER_COUNTER(fetch_rreq_cnt, "total fetch data count", "fetch_data_req_cnt", {"op", 
"fetch"}); + REGISTER_COUNTER(fetch_total_blk_size, "total fetch data blocks size", "fetch_total_blk_size", {"op", "fetch"}); + REGISTER_COUNTER(fetch_total_entries_cnt, "total fetch total entries count", "fetch_total_entries_cnt", + {"op", "fetch"}); + + register_me_to_farm(); + } + + RaftReplDevMetrics(const RaftReplDevMetrics&) = delete; + RaftReplDevMetrics(RaftReplDevMetrics&&) noexcept = delete; + RaftReplDevMetrics& operator=(const RaftReplDevMetrics&) = delete; + RaftReplDevMetrics& operator=(RaftReplDevMetrics&&) noexcept = delete; + ~RaftReplDevMetrics() { deregister_me_from_farm(); } +}; + class RaftReplService; class CP; class RaftReplDev : public ReplDev, @@ -62,6 +84,8 @@ class RaftReplDev : public ReplDev, iomgr::null_timer_handle}; // non-recurring timer doesn't need to be cancelled on shutdown; bool m_resync_mode{false}; + RaftReplDevMetrics m_metrics; + static std::atomic< uint64_t > s_next_group_ordinal; public: diff --git a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index fbdcd6861..3084d0a16 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -83,6 +83,7 @@ class HSReplTestHelper { } else { cv_.wait(lg, [this, new_phase]() { return (phase_ == new_phase); }); } + count = 0; } }; @@ -182,7 +183,7 @@ class HSReplTestHelper { void teardown() { LOGINFO("Stopping Homestore replica={}", replica_num_); - sisl::GrpcAsyncClientWorker::shutdown_all(); + // sisl::GrpcAsyncClientWorker::shutdown_all(); test_common::HSTestHelper::shutdown_homestore(); } diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index 3e8e1ae7e..22384917f 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -105,6 +105,9 @@ class TestReplicatedDB : public homestore::ReplDevListener { void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, cintrusive< repl_req_ctx 
>& ctx) override { + + m_num_commits.fetch_add(1, std::memory_order_relaxed); + ASSERT_EQ(header.size(), sizeof(test_req::journal_header)); auto jheader = r_cast< test_req::journal_header const* >(header.cbytes()); @@ -199,6 +202,8 @@ class TestReplicatedDB : public homestore::ReplDevListener { g_helper->runner().execute().get(); } + uint64_t db_num_writes() const { return m_num_commits.load(std::memory_order_relaxed); } + uint64_t db_size() const { std::shared_lock lk(db_mtx_); return inmem_db_.size(); @@ -207,6 +212,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { private: std::map< Key, Value > inmem_db_; std::shared_mutex db_mtx_; + std::atomic< uint64_t > m_num_commits; }; class RaftReplDevTest : public testing::Test { @@ -228,7 +234,7 @@ class RaftReplDevTest : public testing::Test { while (true) { uint64_t total_writes{0}; for (auto const& db : dbs_) { - total_writes += db->db_size(); + total_writes += db->db_num_writes(); } if (total_writes >= exp_writes) { break; } @@ -373,7 +379,7 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) { if (g_helper->replica_num() == 1) { LOGINFO("Restart homestore: replica_num = 1"); g_helper->restart(10 /* shutdown_delay_sec */); - g_helper->sync_for_test_start(); + // g_helper->sync_for_test_start(); } exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >(); @@ -401,7 +407,6 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) { this->validate_all_data(); g_helper->sync_for_cleanup_start(); } - // // staging the fetch remote data with flip point; // @@ -432,7 +437,7 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync_with_staging) { if (g_helper->replica_num() == 1) { LOGINFO("Restart homestore: replica_num = 1"); g_helper->restart(10 /* shutdown_delay_sec */); - g_helper->sync_for_test_start(); + // g_helper->sync_for_test_start(); } exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >(); @@ -461,6 +466,65 @@ TEST_F(RaftReplDevTest, 
All_restart_one_follower_inc_resync_with_staging) { g_helper->sync_for_cleanup_start(); } +// do some io, restart the leader (replica-0), then switch the leader to replica-1 and do more io; +TEST_F(RaftReplDevTest, All_restart_leader) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + + // step-0: do some IO from replica-0 before restarting it; + uint64_t exp_entries = 20; + if (g_helper->replica_num() == 0) { + g_helper->runner().set_num_tasks(exp_entries); + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); + g_helper->runner().set_task([this, block_size]() { + this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */); + }); + g_helper->runner().execute().get(); + } + + // step-1: wait for all writes to be completed + this->wait_for_all_writes(exp_entries); + + // step-2: restart leader replica (replica-0) + if (g_helper->replica_num() == 0) { + LOGINFO("Restart homestore: replica_num = 0"); + g_helper->restart(10 /* shutdown_delay_sec */); + // g_helper->sync_for_test_start(); + } + + exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >(); + // step-3: on replica-1, wait for a while for the restarting replica-0 to be removed from raft-groups, + // then switch the leader to replica-1 so that the following I/O is issued from replica-1; + if (g_helper->replica_num() == 1) { + LOGINFO("Wait for grpc connection to replica-0 to be removed from raft-groups, and wait for awhile before " + "sending new I/O."); + std::this_thread::sleep_for(std::chrono::seconds{5}); + + LOGINFO("Switch the leader to replica_num = 1"); + switch_all_db_leader(); + + g_helper->runner().set_num_tasks(SISL_OPTIONS["num_io"].as< uint64_t >()); + + // issue new I/O from replica-1 after the leader switch (replica-0 was restarted in step-2); + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); +
g_helper->runner().set_task([this, block_size]() { + this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */); + }); + g_helper->runner().execute().get(); + } + + this->wait_for_all_writes(exp_entries); + + if (g_helper->replica_num() != 0) { std::this_thread::sleep_for(std::chrono::seconds{10}); } + + g_helper->sync_for_verify_start(); + LOGINFO("Validate all data written so far by reading them"); + this->validate_all_data(); + g_helper->sync_for_cleanup_start(); +} + // TODO // double restart: // 1. restart one follower(F1) while I/O keep running.