From 4d4da2e3869941aa7ffa38947318f5f558495c2a Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Wed, 10 Jan 2024 15:23:27 -0700 Subject: [PATCH 01/11] resync: initial commit --- src/lib/common/homestore_config.fbs | 3 + src/lib/homestore.cpp | 9 ++- src/lib/replication/CMakeLists.txt | 2 +- .../replication/repl_dev/raft_repl_dev.cpp | 63 +++++++++++++++---- src/lib/replication/repl_dev/raft_repl_dev.h | 9 +++ 5 files changed, 68 insertions(+), 18 deletions(-) diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index 8ea23aba6..d57965a60 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -217,6 +217,9 @@ table Consensus { // Minimum log gap a replica has to be from leader before joining the replica set. min_log_gap_to_join: int32 = 30; + + // amount of time in seconds to wait on data write before fetch data from remote; + wait_data_write_timer_sec: uint32 = 30 (hotswap); } table HomeStoreSettings { diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index bd51bf961..dca539a8b 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -230,15 +230,14 @@ void HomeStore::shutdown() { LOGINFO("Homestore shutdown is started"); - if (has_repl_data_service()) { - s_cast< GenericReplService* >(m_repl_service.get())->stop(); - m_repl_service.reset(); - } - if (has_index_service()) { m_index_service->stop(); // m_index_service.reset(); } + if (has_repl_data_service()) { + s_cast< GenericReplService* >(m_repl_service.get())->stop(); + m_repl_service.reset(); + } if (has_log_service()) { m_log_service->stop(); diff --git a/src/lib/replication/CMakeLists.txt b/src/lib/replication/CMakeLists.txt index 448d14adc..0fe578d68 100644 --- a/src/lib/replication/CMakeLists.txt +++ b/src/lib/replication/CMakeLists.txt @@ -7,7 +7,7 @@ list(APPEND SCHEMA_FLAGS "--scoped-enums" "--gen-name-strings" "--cpp-std=c++17" flatbuffers_generate_headers( TARGET hs_replication_fb - SCHEMAS push_data_rpc.fbs + 
SCHEMAS push_data_rpc.fbs fetch_data_rpc.fbs FLAGS ${SCHEMA_FLAGS} ) diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 0ee5885e8..deec49a0c 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -62,8 +62,9 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk RD_LOG(INFO, "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={}", (load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id, m_commit_upto_lsn.load(), m_next_dsn.load()); + m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, bind_this(RaftReplDev::on_push_data_received, 1)); - // m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 2)); + m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 1)); } void RaftReplDev::use_config(json_superblk raft_config_sb) { m_raft_config_sb = std::move(raft_config_sb); } @@ -129,6 +130,8 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { }); } +void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {} + void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { auto const& incoming_buf = rpc_data->request_blob(); auto const fb_size = @@ -222,6 +225,46 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob return rreq; } +void RaftReplDev::check_and_fetch_remote_data_if_needed(std::vector< repl_req_ptr_t >* rreqs) { + // Pop any entries that are already completed - from the entries list as well as from map + rreqs->erase(std::remove_if( + rreqs->begin(), rreqs->end(), + [this](repl_req_ptr_t const& rreq) { + if (rreq == nullptr) { return true; } + + if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) { + 
m_repl_key_req_map.erase(rreq->rkey); // Remove=Pop from map as well, since it is completed + RD_LOG(INFO, + "Raft Channel: Data write completed and blkid mapped, removing from map: rreq=[{}]", + rreq->to_compact_string()); + return true; // Remove from the pending list + } else { + return false; + } + }), + rreqs->end()); + + if (rreqs->size()) { + // Some data not completed yet, let's fetch from remote; + fetch_data_from_leader(rreqs); + } +} + +void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { + group_msg_service()->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, FETCH_DATA, ); + +#if 0 + m_repl_svc_ctx->data_service_request( + FETCH_DATA, + data_rpc::serialize(data_channel_rpc_hdr{m_group_id, 0 /*replace with replica id*/}, remote_pbas, + m_state_store.get(), {}), + [this](sisl::io_blob const& incoming_buf) { + auto null_rpc_data = boost::intrusive_ptr< sisl::GenericRpcData >(nullptr); + m_state_machine->on_data_received(incoming_buf, null_rpc_data); + }); +#endif +} + AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t >* rreqs) { std::vector< folly::SemiFuture< folly::Unit > > futs; futs.reserve(rreqs->size()); @@ -248,23 +291,19 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > // All the entries are done already, no need to wait if (rreqs->size() == 0) { return folly::makeFuture< folly::Unit >(folly::Unit{}); } -#if 0 // We are yet to support reactive fetch from remote. - if (m_resync_mode) { + if (is_resync_mode()) { // if in resync mode, fetch data from remote immediately; - check_and_fetch_remote_data(std::move(rreqs)); + check_and_fetch_remote_data(rreqs); } else { - // some blkids are not in completed state, let's schedule a timer to check it again; + // some data are not in completed state, let's schedule a timer to check it again; // we wait for data channel to fill in the data. 
Still if its not done we trigger a fetch from remote; - m_wait_blkid_write_timer_hdl = iomanager.schedule_thread_timer( // timer wakes up in current thread; - HS_DYNAMIC_CONFIG(repl->wait_blkid_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */, - nullptr /* cookie */, [this, std::move(rreqs)](auto) { - check_and_fetch_remote_data(std::move(rreqs)); - }); + m_wait_data_timer_hdl = iomanager.schedule_thread_timer( // timer wakes up in current thread; + HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */, + nullptr /* cookie */, [this, rreqs](auto) { check_and_fetch_remote_data(rreqs); }); } - return ret; -#endif + // block waiting here until all the futs are ready (data channel filled in and promises are made); return folly::collectAll(futs).deferValue([this, rreqs](auto&& e) { for (auto const& rreq : *rreqs) { HS_DBG_ASSERT(rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN), diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 64336d9a9..86f4fc56d 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -55,6 +55,9 @@ class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { iomgr::timer_handle_t m_sb_flush_timer_hdl; std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry + // + iomgr::timer_handle_t m_wait_data_timer_hdl{iomgr::null_timer_handle}; + bool m_resync_mode{false}; static std::atomic< uint64_t > s_next_group_ordinal; @@ -113,6 +116,12 @@ class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { shared< nuraft::log_store > data_journal() { return m_data_journal; } void push_data_to_all_followers(repl_req_ptr_t rreq); void on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data); + + void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data); + + void 
check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs); + + void fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs); }; } // namespace homestore From c65e89cfeb004e5254f17ef9496189bb80784b3c Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Thu, 18 Jan 2024 11:13:22 -0700 Subject: [PATCH 02/11] issue: 257 Fetch remote data --- src/lib/replication/fetch_data_rpc.fbs | 4 +- .../replication/repl_dev/raft_repl_dev.cpp | 173 ++++++++++++++++-- src/lib/replication/repl_dev/raft_repl_dev.h | 4 +- 3 files changed, 160 insertions(+), 21 deletions(-) diff --git a/src/lib/replication/fetch_data_rpc.fbs b/src/lib/replication/fetch_data_rpc.fbs index d73d4dd1f..091fa0dc7 100644 --- a/src/lib/replication/fetch_data_rpc.fbs +++ b/src/lib/replication/fetch_data_rpc.fbs @@ -6,7 +6,7 @@ table RequestEntry { dsn : uint64; // Data Sequence number user_header: [ubyte]; // User header bytes user_key : [ubyte]; // User key data - blkid_originator : int32; // Originally which replica's blkid is this + blkid_originator : int32; // Server_id: Originally which replica's blkid is this remote_blkid : [ubyte]; // Serialized remote blkid } @@ -31,4 +31,4 @@ table FetchData { response : FetchDataResponse; } -root_type FetchData; \ No newline at end of file +root_type FetchData; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index deec49a0c..f6ad4b4ec 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -9,9 +9,11 @@ #include #include "common/homestore_assert.hpp" +#include "common/homestore_config.hpp" #include "replication/service/raft_repl_service.h" #include "replication/repl_dev/raft_repl_dev.h" #include "push_data_rpc_generated.h" +#include "fetch_data_rpc_generated.h" namespace homestore { std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1}; @@ -130,7 +132,60 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { }); } 
-void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {} +void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { + auto const& incoming_buf = rpc_data->request_blob(); + auto fetch_req = GetSizePrefixedFetchData(incoming_buf.cbytes()); + + RD_LOG(INFO, "Data Channel: FetchData received: {}", + flatbuffers::FlatBufferToString(incoming_buf.cbytes(), FetchDataRequestTypeTable())); + + std::vector< sisl::sg_list > sgs_vec; + for (auto const& req : *(fetch_req->request()->entries())) { + auto const& lsn = req->lsn(); + auto const& term = req->raft_term(); + auto const& dsn = req->dsn(); + auto const& header = req->user_header(); + auto const& key = req->user_key(); + auto const& originator = req->blkid_originator(); + auto const& remote_blkid = req->remote_blkid(); + + // fetch data based on the remote_blkid + if (originator == server_id()) { + // We are the originator of the blkid, read data locally; + MultiBlkId local_blkid; + + // convert remote_blkid serialized data to local blkid + local_blkid.deserialize(sisl::blob{remote_blkid->Data(), remote_blkid->size()}, true /* copy */); + + // prepare the sgs data buffer to read into; + auto const total_size = local_blkid.blk_count() * get_blk_size(); + sisl::sg_list sgs; + shared< uint8_t > iov_base(iomanager.iobuf_alloc(get_blk_size(), total_size), + [](uint8_t* buf) { iomanager.iobuf_free(buf); }); + sgs.size = total_size; + sgs.iovs.emplace_back(iovec{.iov_base = iov_base.get(), .iov_len = total_size}); + + // accumulate the sgs for later use (send back to the requester)); + sgs_vec.push_back(sgs); + + async_read(local_blkid, sgs, total_size).thenValue([this, rpc_data](auto&& err) { + RD_REL_ASSERT(!err, "Error in reading data"); // TODO: Find a way to return error to the Listener + }); + } else { + // TODO: if we are not the originator, we need to fetch based on lsn; + // To be implemented; + } + } + + // now prepare the io_blob_list to response back to 
requester; + nuraft_mesg::io_blob_list_t pkts = sisl::io_blob_list_t{}; + for (auto const& sgs : sgs_vec) { + auto const ret = sisl::io_blob::sg_list_to_ioblob_list(sgs); + pkts.insert(pkts.end(), ret.begin(), ret.end()); + } + + group_msg_service()->send_data_service_response(pkts, rpc_data); +} void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { auto const& incoming_buf = rpc_data->request_blob(); @@ -151,11 +206,13 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d RD_LOG(INFO, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); - if (rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)) & - uint32_cast(repl_req_state_t::DATA_RECEIVED)) { + // if (rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)) & + if (rreq->state.load() & (uint32_cast(repl_req_state_t::DATA_RECEIVED))) { // We already received the data before, just ignore this data // TODO: Should we forcibly overwrite the data with new data? 
return; + } else { + rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)); } // Get the data portion from the buffer @@ -225,7 +282,7 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob return rreq; } -void RaftReplDev::check_and_fetch_remote_data_if_needed(std::vector< repl_req_ptr_t >* rreqs) { +void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs) { // Pop any entries that are already completed - from the entries list as well as from map rreqs->erase(std::remove_if( rreqs->begin(), rreqs->end(), @@ -246,23 +303,105 @@ void RaftReplDev::check_and_fetch_remote_data_if_needed(std::vector< repl_req_pt if (rreqs->size()) { // Some data not completed yet, let's fetch from remote; - fetch_data_from_leader(rreqs); + fetch_data_from_remote(rreqs); } } void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { - group_msg_service()->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, FETCH_DATA, ); - -#if 0 - m_repl_svc_ctx->data_service_request( - FETCH_DATA, - data_rpc::serialize(data_channel_rpc_hdr{m_group_id, 0 /*replace with replica id*/}, remote_pbas, - m_state_store.get(), {}), - [this](sisl::io_blob const& incoming_buf) { - auto null_rpc_data = boost::intrusive_ptr< sisl::GenericRpcData >(nullptr); - m_state_machine->on_data_received(incoming_buf, null_rpc_data); - }); -#endif + std::vector<::flatbuffers::Offset< RequestEntry > > entries; + for (auto const& rreq : *rreqs) { + auto& builder = rreq->fb_builder; + entries.push_back(CreateRequestEntry(builder, rreq->get_lsn(), rreq->term(), rreq->dsn(), + builder.CreateVector(rreq->header.cbytes(), rreq->header.size()), + builder.CreateVector(rreq->key.cbytes(), rreq->key.size()), + rreq->remote_blkid.server_id /* blkid_originator */, + builder.CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(), + rreq->remote_blkid.blkid.serialized_size()))); + RD_LOG(INFO, "Data Channel: Fetching data from remote: 
rreq=[{}]", rreq->to_compact_string()); + } + + // FIXME: which builder should we use here, because it will be released in the callback; + // by reusing the 1st builder, when the it will also on be released once; + auto& builder = (*rreqs)[0]->fb_builder; + builder.FinishSizePrefixed(CreateFetchDataRequest(builder, builder.CreateVector(entries))); + + // leader can change, on the receiving side, we need to check if the leader is still the one who originated the + // blkid; + group_msg_service() + ->data_service_request_bidirectional( + nuraft_mesg::role_regex::LEADER, FETCH_DATA, + sisl::io_blob_list_t{sisl::io_blob{builder.GetBufferPointer(), builder.GetSize(), false /* is_aligned */}}) + .via(&folly::InlineExecutor::instance()) + .thenValue([this, rreqs](auto e) { + RD_REL_ASSERT(!!e, "Error in fetching data"); + + auto raw_data = e.value().cbytes(); + auto total_size = e.value().size(); + + for (auto const& rreq : *rreqs) { + auto const data_size = rreq->remote_blkid.blkid.blk_count() * get_blk_size(); + { + // if data is already received, skip it; + // aquire lock here to avoid two threads are trying to do the same thing; + if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) { + // very unlikely to arrive here, but if data got received during we fetch, let the data channel + // handle data written; + raw_data += data_size; + total_size -= data_size; + + // if blk is already allocated, validate if blk is valid and size matches; + RD_DBG_ASSERT(rreq->local_blkid.is_valid(), "Invalid blkid for rreq={}", + rreq->to_compact_string()); + auto const local_size = rreq->local_blkid.blk_count() * get_blk_size(); + RD_DBG_ASSERT_EQ(data_size, local_size, + "Data size mismatch for rreq={} blkid={}, remote size: {}, local size: {}", + rreq->to_compact_string(), rreq->local_blkid.to_string(), data_size, + local_size); + + RD_LOG(INFO, + "Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.", + rreq->to_compact_string()); + 
continue; + } else { + std::unique_lock< std::mutex > lg(rreq->state_mtx); + if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) { continue; } + + // if blk is not allocated, we need to allocate it; + rreq->local_blkid = + do_alloc_blk(data_size, m_listener->get_blk_alloc_hints(rreq->header, data_size)); + + rreq->state.fetch_or(uint32_cast(repl_req_state_t::BLK_ALLOCATED)); + } + } + + auto data = raw_data; + if (((uintptr_t)raw_data % data_service().get_align_size()) != 0) { + // Unaligned buffer, create a new buffer and copy the entire buf + rreq->buf_for_unaligned_data = + std::move(sisl::io_blob_safe(data_size, data_service().get_align_size())); + std::memcpy(rreq->buf_for_unaligned_data.bytes(), data, data_size); + data = rreq->buf_for_unaligned_data.cbytes(); + } + + // Schedule a write and upon completion, mark the data as written. + data_service() + .async_write(r_cast< const char* >(data), data_size, rreq->local_blkid) + .thenValue([this, rreq](auto&& err) { + RD_REL_ASSERT(!err, + "Error in writing data"); // TODO: Find a way to return error to the Listener + rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); + rreq->data_written_promise.setValue(); + RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string()); + }); + + // move the raw_data pointer to next rreq's data; + raw_data += data_size; + total_size -= data_size; + rreq->fb_builder.Release(); + } + + RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed"); + }); } AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t >* rreqs) { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 86f4fc56d..7f32690a1 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -118,10 +118,10 @@ class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { void 
on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data); void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data); - void check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs); - void fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs); + + bool is_resync_mode() { return m_resync_mode; } }; } // namespace homestore From 6cafa82fa0c23f651b577ca85a073516895e78a1 Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Sun, 21 Jan 2024 23:00:47 -0700 Subject: [PATCH 03/11] add wait for read complete on fetch complete --- src/lib/meta/meta_blk_service.cpp | 2 +- .../replication/repl_dev/raft_repl_dev.cpp | 50 +++++++++++++++++-- src/tests/test_raft_repl_dev.cpp | 42 +++++++++++++++- 3 files changed, 88 insertions(+), 6 deletions(-) diff --git a/src/lib/meta/meta_blk_service.cpp b/src/lib/meta/meta_blk_service.cpp index 1fcf9c65f..77f2c4de2 100644 --- a/src/lib/meta/meta_blk_service.cpp +++ b/src/lib/meta/meta_blk_service.cpp @@ -331,7 +331,7 @@ bool MetaBlkService::scan_and_load_meta_blks(meta_blk_map_t& meta_blks, ovf_hdr_ mblk->hdr.h.compressed = 0; mblk->hdr.h.context_sz = read_sz; } else { - LOGINFO("[type={}], meta blk size check passed!", mblk->hdr.h.type); + LOGDEBUG("[type={}], meta blk size check passed!", mblk->hdr.h.type); } // move on to next meta blk; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index f6ad4b4ec..a1da85254 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1,7 +1,7 @@ #include #include #include - +#include #include #include #include @@ -10,6 +10,7 @@ #include "common/homestore_assert.hpp" #include "common/homestore_config.hpp" +// #include "common/homestore_flip.hpp" #include "replication/service/raft_repl_service.h" #include "replication/repl_dev/raft_repl_dev.h" #include "push_data_rpc_generated.h" @@ -140,7 +141,19 @@ void RaftReplDev::on_fetch_data_received(intrusive< 
sisl::GenericRpcData >& rpc_ flatbuffers::FlatBufferToString(incoming_buf.cbytes(), FetchDataRequestTypeTable())); std::vector< sisl::sg_list > sgs_vec; + + struct Context { + std::condition_variable cv; + std::mutex mtx; + size_t outstanding_read_cnt; + }; + + auto ctx = std::make_shared< Context >(); + ctx->outstanding_read_cnt = fetch_req->request()->entries()->size(); + for (auto const& req : *(fetch_req->request()->entries())) { + LOGINFO("Data Channel: FetchData received: lsn={}", req->lsn()); + auto const& lsn = req->lsn(); auto const& term = req->raft_term(); auto const& dsn = req->dsn(); @@ -168,15 +181,30 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ // accumulate the sgs for later use (send back to the requester)); sgs_vec.push_back(sgs); - async_read(local_blkid, sgs, total_size).thenValue([this, rpc_data](auto&& err) { + async_read(local_blkid, sgs, total_size).thenValue([this, &ctx](auto&& err) { RD_REL_ASSERT(!err, "Error in reading data"); // TODO: Find a way to return error to the Listener + { + std::unique_lock< std::mutex > lk{ctx->mtx}; + --(ctx->outstanding_read_cnt); + } + ctx->cv.notify_one(); }); } else { // TODO: if we are not the originator, we need to fetch based on lsn; // To be implemented; + LOGINFO("I am not the originaltor for the requested blks, originaltor: {}, server_id: {}.", originator, + server_id()); } } + { + // wait for read to complete; + std::unique_lock< std::mutex > lk{ctx->mtx}; + ctx->cv.wait(lk, [&ctx] { return (ctx->outstanding_read_cnt == 0); }); + } + + LOGINFO("Data Channel: FetchData response prepared: sg_vec.size={}", sgs_vec.size()); + // now prepare the io_blob_list to response back to requester; nuraft_mesg::io_blob_list_t pkts = sisl::io_blob_list_t{}; for (auto const& sgs : sgs_vec) { @@ -188,6 +216,11 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ } void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) 
{ + if (iomgr_flip::instance()->test_flip("simulate_fetch_remote_data")) { + RD_LOG(INFO, "Data Channel: Flip is enabled, skip on_push_data_received to simulate fetch remote data"); + return; + } + auto const& incoming_buf = rpc_data->request_blob(); auto const fb_size = flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes()) + sizeof(flatbuffers::uoffset_t); @@ -309,6 +342,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { std::vector<::flatbuffers::Offset< RequestEntry > > entries; + entries.reserve(rreqs->size()); for (auto const& rreq : *rreqs) { auto& builder = rreq->fb_builder; entries.push_back(CreateRequestEntry(builder, rreq->get_lsn(), rreq->term(), rreq->dsn(), @@ -338,11 +372,12 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { auto raw_data = e.value().cbytes(); auto total_size = e.value().size(); + LOGINFO("Data Channel: FetchData response received: size={}", total_size); + for (auto const& rreq : *rreqs) { auto const data_size = rreq->remote_blkid.blkid.blk_count() * get_blk_size(); { // if data is already received, skip it; - // aquire lock here to avoid two threads are trying to do the same thing; if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) { // very unlikely to arrive here, but if data got received during we fetch, let the data channel // handle data written; @@ -363,6 +398,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { rreq->to_compact_string()); continue; } else { + // aquire lock here to avoid two threads are trying to do the same thing; std::unique_lock< std::mutex > lg(rreq->state_mtx); if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) { continue; } @@ -397,7 +433,12 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { // move the raw_data pointer to next rreq's 
data; raw_data += data_size; total_size -= data_size; + + // TODO: Should we release it after all reqs are processed? rreq->fb_builder.Release(); + + LOGINFO("Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}", + rreq->to_compact_string(), data_size, total_size); } RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed"); @@ -435,11 +476,14 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > // if in resync mode, fetch data from remote immediately; check_and_fetch_remote_data(rreqs); } else { + check_and_fetch_remote_data(rreqs); +#if 0 // some data are not in completed state, let's schedule a timer to check it again; // we wait for data channel to fill in the data. Still if its not done we trigger a fetch from remote; m_wait_data_timer_hdl = iomanager.schedule_thread_timer( // timer wakes up in current thread; HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */, nullptr /* cookie */, [this, rreqs](auto) { check_and_fetch_remote_data(rreqs); }); +#endif } // block waiting here until all the futs are ready (data channel filled in and promises are made); diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index b42f04d94..49b353903 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -25,7 +25,7 @@ #include #include #include - +#include #include #include #include @@ -220,10 +220,24 @@ class RaftReplDevTest : public testing::Test { TestReplicatedDB& pick_one_db() { return *dbs_[0]; } +#ifdef _PRERELEASE + void set_flip_point(const std::string flip_name) { + flip::FlipCondition null_cond; + flip::FlipFrequency freq; + freq.set_count(1); + freq.set_percent(100); + m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq); + LOGDEBUG("Flip {} set", flip_name); + } +#endif + private: std::vector< std::shared_ptr< TestReplicatedDB > > dbs_; +#ifdef _PRERELEASE + flip::FlipClient 
m_fc{iomgr_flip::instance()}; +#endif }; - +#if 0 TEST_F(RaftReplDevTest, All_Append) { LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); @@ -244,6 +258,30 @@ TEST_F(RaftReplDevTest, All_Append) { g_helper->sync_for_cleanup_start(); } +#endif +TEST_F(RaftReplDevTest, All_Append_with_Fetch_Remote_Data) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + + set_flip_point("simulate_fetch_remote_data"); + + if (g_helper->replica_num() == 0) { + g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >()); + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); + g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); }); + g_helper->runner().execute().get(); + } + + this->wait_for_all_writes(g_helper->dataset_size()); + + g_helper->sync_for_verify_start(); + + LOGINFO("Validate all data written so far by reading them"); + this->validate_all_data(); + + g_helper->sync_for_cleanup_start(); +} int main(int argc, char* argv[]) { int parsed_argc{argc}; From 304b703c6826b2d96a046380aa4497d0536093ba Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Tue, 23 Jan 2024 18:53:34 -0700 Subject: [PATCH 04/11] fix reqeust format --- src/lib/common/homestore_config.fbs | 3 + src/lib/replication/fetch_data_rpc.fbs | 2 +- .../replication/repl_dev/raft_repl_dev.cpp | 134 +++++++++++------- .../repl_dev/raft_state_machine.cpp | 2 + .../replication/service/raft_repl_service.cpp | 1 + 5 files changed, 91 insertions(+), 51 deletions(-) diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index d57965a60..32cb27922 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -220,6 +220,9 @@ table Consensus { // amount of time in seconds to wait on data write 
before fetch data from remote; wait_data_write_timer_sec: uint32 = 30 (hotswap); + + // Leadership expiry 120 seconds + leadership_expiry_ms: uint32 = 120000; } table HomeStoreSettings { diff --git a/src/lib/replication/fetch_data_rpc.fbs b/src/lib/replication/fetch_data_rpc.fbs index 091fa0dc7..e809cde42 100644 --- a/src/lib/replication/fetch_data_rpc.fbs +++ b/src/lib/replication/fetch_data_rpc.fbs @@ -15,7 +15,7 @@ table FetchDataRequest { } table ResponseEntry { - lsn : [int64]; // LSN of the raft log if known + lsn : int64; // LSN of the raft log if known dsn : uint64; // Data Sequence number raft_term : uint64; // Raft term number data_size : uint32; // Size of the data which is sent as separate non flatbuffer diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index a1da85254..efe93e207 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -137,8 +137,10 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ auto const& incoming_buf = rpc_data->request_blob(); auto fetch_req = GetSizePrefixedFetchData(incoming_buf.cbytes()); - RD_LOG(INFO, "Data Channel: FetchData received: {}", - flatbuffers::FlatBufferToString(incoming_buf.cbytes(), FetchDataRequestTypeTable())); + LOGINFO("Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size()); + + // RD_LOG(INFO, "Data Channel: FetchData received: {}", flatbuffers::FlatBufferToString(incoming_buf.cbytes(), + // FetchDataRequestTypeTable())); std::vector< sisl::sg_list > sgs_vec; @@ -170,13 +172,18 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ // convert remote_blkid serialized data to local blkid local_blkid.deserialize(sisl::blob{remote_blkid->Data(), remote_blkid->size()}, true /* copy */); + LOGINFO("local_blkid: {}.", local_blkid.to_string()); + // prepare the sgs data buffer to read into; auto 
const total_size = local_blkid.blk_count() * get_blk_size(); sisl::sg_list sgs; +#if 0 shared< uint8_t > iov_base(iomanager.iobuf_alloc(get_blk_size(), total_size), [](uint8_t* buf) { iomanager.iobuf_free(buf); }); +#endif sgs.size = total_size; - sgs.iovs.emplace_back(iovec{.iov_base = iov_base.get(), .iov_len = total_size}); + sgs.iovs.emplace_back( + iovec{.iov_base = iomanager.iobuf_alloc(get_blk_size(), total_size), .iov_len = total_size}); // accumulate the sgs for later use (send back to the requester)); sgs_vec.push_back(sgs); @@ -213,39 +220,48 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ } group_msg_service()->send_data_service_response(pkts, rpc_data); -} -void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { - if (iomgr_flip::instance()->test_flip("simulate_fetch_remote_data")) { - RD_LOG(INFO, "Data Channel: Flip is enabled, skip on_push_data_received to simulate fetch remote data"); - return; + for (auto const& sgs : sgs_vec) { + for (auto const& iov : sgs.iovs) { + iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base)); + } } +} +void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { auto const& incoming_buf = rpc_data->request_blob(); auto const fb_size = flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes()) + sizeof(flatbuffers::uoffset_t); auto push_req = GetSizePrefixedPushDataRequest(incoming_buf.cbytes()); sisl::blob header = sisl::blob{push_req->user_header()->Data(), push_req->user_header()->size()}; sisl::blob key = sisl::blob{push_req->user_key()->Data(), push_req->user_key()->size()}; - +#if 0 RD_LOG(TRACE, "PushData received on data channel: {}", flatbuffers::FlatBufferToString(incoming_buf.cbytes() + sizeof(flatbuffers::uoffset_t), PushDataRequestTypeTable())); +#endif + if (iomgr_flip::instance()->test_flip("simulate_fetch_remote_data")) { + LOGINFO("Data Channel: Flip is enabled, skip 
on_push_data_received to simulate fetch remote data, " + "server_id={}, term={}, dsn={}", + push_req->issuer_replica_id(), push_req->raft_term(), push_req->dsn()); + return; + } + LOGINFO("Data Channel: before follower_create_req, server_id={}, term={}, dsn={}", push_req->issuer_replica_id(), + push_req->raft_term(), push_req->dsn()); auto rreq = follower_create_req( repl_key{.server_id = push_req->issuer_replica_id(), .term = push_req->raft_term(), .dsn = push_req->dsn()}, header, key, push_req->data_size()); rreq->rpc_data = rpc_data; - RD_LOG(INFO, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); + // RD_LOG(INFO, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); + LOGINFO("Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); - // if (rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)) & - if (rreq->state.load() & (uint32_cast(repl_req_state_t::DATA_RECEIVED))) { + if (rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)) & + uint32_cast(repl_req_state_t::DATA_RECEIVED)) { // We already received the data before, just ignore this data // TODO: Should we forcibly overwrite the data with new data? 
return; - } else { - rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)); } // Get the data portion from the buffer @@ -297,7 +313,8 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob RD_REL_ASSERT(blob_equals(user_header, rreq->header), "User header mismatch for repl_key={}", rkey.to_string()); RD_REL_ASSERT(blob_equals(user_key, rreq->key), "User key mismatch for repl_key={}", rkey.to_string()); - RD_LOG(INFO, "Repl_key=[{}] already received ", rkey.to_string()); + // RD_LOG(INFO, "Repl_key=[{}] already received ", rkey.to_string()); + LOGINFO("Repl_key=[{}] already received ", rkey.to_string()); return rreq; } } @@ -312,10 +329,14 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob rreq->local_blkid = do_alloc_blk(data_size, m_listener->get_blk_alloc_hints(user_header, data_size)); rreq->state.fetch_or(uint32_cast(repl_req_state_t::BLK_ALLOCATED)); + LOGINFO("in follower_create_req: rreq={}, addr={}", rreq->to_compact_string(), + reinterpret_cast< uintptr_t >(rreq.get())); return rreq; } void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs) { + LOGINFO("Data Channel: Checking if data is already received for rreqs.size={}", rreqs->size()); + // Pop any entries that are already completed - from the entries list as well as from map rreqs->erase(std::remove_if( rreqs->begin(), rreqs->end(), @@ -334,6 +355,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre }), rreqs->end()); + LOGINFO("Data Channel: Checking if data is already received for rreqs.size={}", rreqs->size()); if (rreqs->size()) { // Some data not completed yet, let's fetch from remote; fetch_data_from_remote(rreqs); @@ -343,30 +365,34 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { std::vector<::flatbuffers::Offset< RequestEntry > > entries; 
entries.reserve(rreqs->size()); + + shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); + + LOGINFO("Data Channel: Checking if data is already received for rreqs.size={}", rreqs->size()); for (auto const& rreq : *rreqs) { - auto& builder = rreq->fb_builder; - entries.push_back(CreateRequestEntry(builder, rreq->get_lsn(), rreq->term(), rreq->dsn(), - builder.CreateVector(rreq->header.cbytes(), rreq->header.size()), - builder.CreateVector(rreq->key.cbytes(), rreq->key.size()), + entries.push_back(CreateRequestEntry(*builder, rreq->get_lsn(), rreq->term(), rreq->dsn(), + builder->CreateVector(rreq->header.cbytes(), rreq->header.size()), + builder->CreateVector(rreq->key.cbytes(), rreq->key.size()), rreq->remote_blkid.server_id /* blkid_originator */, - builder.CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(), - rreq->remote_blkid.blkid.serialized_size()))); - RD_LOG(INFO, "Data Channel: Fetching data from remote: rreq=[{}]", rreq->to_compact_string()); + builder->CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(), + rreq->remote_blkid.blkid.serialized_size()))); + // RD_LOG(INFO, "Data Channel: Fetching data from remote: rreq=[{}]", rreq->to_compact_string()); + LOGINFO("Data Channel: Fetching data from remote: rreq=[{}], addr={}", rreq->to_compact_string(), + reinterpret_cast< uintptr_t >(rreq.get())); } - // FIXME: which builder should we use here, because it will be released in the callback; - // by reusing the 1st builder, when the it will also on be released once; - auto& builder = (*rreqs)[0]->fb_builder; - builder.FinishSizePrefixed(CreateFetchDataRequest(builder, builder.CreateVector(entries))); + builder->FinishSizePrefixed( + CreateFetchData(*builder, CreateFetchDataRequest(*builder, builder->CreateVector(entries)))); // leader can change, on the receiving side, we need to check if the leader is still the one who originated the // blkid; group_msg_service() 
->data_service_request_bidirectional( nuraft_mesg::role_regex::LEADER, FETCH_DATA, - sisl::io_blob_list_t{sisl::io_blob{builder.GetBufferPointer(), builder.GetSize(), false /* is_aligned */}}) + sisl::io_blob_list_t{ + sisl::io_blob{builder->GetBufferPointer(), builder->GetSize(), false /* is_aligned */}}) .via(&folly::InlineExecutor::instance()) - .thenValue([this, rreqs](auto e) { + .thenValue([this, builder, rreqs](auto e) { RD_REL_ASSERT(!!e, "Error in fetching data"); auto raw_data = e.value().cbytes(); @@ -376,14 +402,27 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { for (auto const& rreq : *rreqs) { auto const data_size = rreq->remote_blkid.blkid.blk_count() * get_blk_size(); - { - // if data is already received, skip it; + // if data is already received, skip it because someone is already doing the write; + if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_RECEIVED)) { + // very unlikely to arrive here, but if data got received during we fetch, let the data channel + // handle data written; + raw_data += data_size; + total_size -= data_size; + + // if blk is already allocated, validate if blk is valid and size matches; + RD_DBG_ASSERT(rreq->local_blkid.is_valid(), "Invalid blkid for rreq={}", rreq->to_compact_string()); + auto const local_size = rreq->local_blkid.blk_count() * get_blk_size(); + RD_DBG_ASSERT_EQ(data_size, local_size, + "Data size mismatch for rreq={} blkid={}, remote size: {}, local size: {}", + rreq->to_compact_string(), rreq->local_blkid.to_string(), data_size, local_size); + + RD_LOG(INFO, "Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.", + rreq->to_compact_string()); + continue; + } else { + // aquire lock here to avoid two threads are trying to do the same thing; + std::unique_lock< std::mutex > lg(rreq->state_mtx); if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) { - // very unlikely to arrive here, but if data got received during 
we fetch, let the data channel - // handle data written; - raw_data += data_size; - total_size -= data_size; - // if blk is already allocated, validate if blk is valid and size matches; RD_DBG_ASSERT(rreq->local_blkid.is_valid(), "Invalid blkid for rreq={}", rreq->to_compact_string()); @@ -392,21 +431,14 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { "Data size mismatch for rreq={} blkid={}, remote size: {}, local size: {}", rreq->to_compact_string(), rreq->local_blkid.to_string(), data_size, local_size); - - RD_LOG(INFO, - "Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.", - rreq->to_compact_string()); - continue; } else { - // aquire lock here to avoid two threads are trying to do the same thing; - std::unique_lock< std::mutex > lg(rreq->state_mtx); - if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) { continue; } - // if blk is not allocated, we need to allocate it; rreq->local_blkid = do_alloc_blk(data_size, m_listener->get_blk_alloc_hints(rreq->header, data_size)); - rreq->state.fetch_or(uint32_cast(repl_req_state_t::BLK_ALLOCATED)); + // we are about to write the data, so mark both blk allocated and data received; + rreq->state.fetch_or( + uint32_cast(repl_req_state_t::BLK_ALLOCATED | repl_req_state_t::DATA_RECEIVED)); } } @@ -434,13 +466,13 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { raw_data += data_size; total_size -= data_size; - // TODO: Should we release it after all reqs are processed? 
- rreq->fb_builder.Release(); - - LOGINFO("Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}", - rreq->to_compact_string(), data_size, total_size); + LOGINFO( + "Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}", + rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string()); } + builder->Release(); + RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed"); }); } @@ -449,6 +481,7 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > std::vector< folly::SemiFuture< folly::Unit > > futs; futs.reserve(rreqs->size()); + LOGINFO("Data Channel: Checking if data is already received for rreqs.size={}", rreqs->size()); // Pop any entries that are already completed - from the entries list as well as from map rreqs->erase(std::remove_if( rreqs->begin(), rreqs->end(), @@ -468,6 +501,7 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > }), rreqs->end()); + LOGINFO("Data Channel: Checking if data is already received for rreqs.size={}", rreqs->size()); // All the entries are done already, no need to wait if (rreqs->size() == 0) { return folly::makeFuture< folly::Unit >(folly::Unit{}); } diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index e25f965f3..2b37b01fb 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -122,6 +122,8 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo sisl::blob const key = sisl::blob{header.cbytes() + header.size(), jentry->key_size}; DEBUG_ASSERT_GT(jentry->value_size, 0, "Entry marked as large data, but value size is notified as 0"); + LOGINFO("Received Raft server_id={}, term={}, dsn={}, journal_entry=[{}] ", jentry->server_id, lentry->get_term(), + jentry->dsn, jentry->to_string()); // From 
the repl_key, get the repl_req. In cases where log stream got here first, this method will create a new // repl_req and return that back. Fill up all of the required journal entry inside the repl_req auto rreq = m_rd.follower_create_req( diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index d25b49bc6..c38efd954 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -95,6 +95,7 @@ void RaftReplService::start() { .with_stale_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_hi_threshold)) .with_fresh_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_lo_threshold)) .with_snapshot_enabled(HS_DYNAMIC_CONFIG(consensus.snapshot_freq_distance)) + .with_leadership_expiry(-1 /* never expires */) // >>> debug only .with_reserved_log_items(0) // In reality ReplLogStore retains much more than this .with_auto_forwarding(false); r_params.return_method_ = nuraft::raft_params::async_handler; From 4d46be2a6363f2ae4e38d58b6d33e646899203a2 Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Tue, 23 Jan 2024 19:52:14 -0700 Subject: [PATCH 05/11] verfiy disable --- .../replication/repl_dev/raft_repl_dev.cpp | 41 +++++-------------- src/tests/test_raft_repl_dev.cpp | 11 ++--- 2 files changed, 16 insertions(+), 36 deletions(-) diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index efe93e207..76a65b7ce 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -137,10 +137,7 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ auto const& incoming_buf = rpc_data->request_blob(); auto fetch_req = GetSizePrefixedFetchData(incoming_buf.cbytes()); - LOGINFO("Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size()); - - // RD_LOG(INFO, "Data Channel: FetchData received: {}", 
flatbuffers::FlatBufferToString(incoming_buf.cbytes(), - // FetchDataRequestTypeTable())); + RD_LOG(INFO, "Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size()); std::vector< sisl::sg_list > sgs_vec; @@ -154,7 +151,7 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ ctx->outstanding_read_cnt = fetch_req->request()->entries()->size(); for (auto const& req : *(fetch_req->request()->entries())) { - LOGINFO("Data Channel: FetchData received: lsn={}", req->lsn()); + RD_LOG(INFO, "Data Channel: FetchData received: lsn={}", req->lsn()); auto const& lsn = req->lsn(); auto const& term = req->raft_term(); @@ -172,8 +169,6 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ // convert remote_blkid serialized data to local blkid local_blkid.deserialize(sisl::blob{remote_blkid->Data(), remote_blkid->size()}, true /* copy */); - LOGINFO("local_blkid: {}.", local_blkid.to_string()); - // prepare the sgs data buffer to read into; auto const total_size = local_blkid.blk_count() * get_blk_size(); sisl::sg_list sgs; @@ -199,8 +194,8 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ } else { // TODO: if we are not the originator, we need to fetch based on lsn; // To be implemented; - LOGINFO("I am not the originaltor for the requested blks, originaltor: {}, server_id: {}.", originator, - server_id()); + RD_LOG(INFO, "I am not the originaltor for the requested blks, originaltor: {}, server_id: {}.", originator, + server_id()); } } @@ -210,8 +205,6 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ ctx->cv.wait(lk, [&ctx] { return (ctx->outstanding_read_cnt == 0); }); } - LOGINFO("Data Channel: FetchData response prepared: sg_vec.size={}", sgs_vec.size()); - // now prepare the io_blob_list to response back to requester; nuraft_mesg::io_blob_list_t pkts = sisl::io_blob_list_t{}; for (auto const& sgs : sgs_vec) { @@ 
-247,15 +240,12 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d return; } - LOGINFO("Data Channel: before follower_create_req, server_id={}, term={}, dsn={}", push_req->issuer_replica_id(), - push_req->raft_term(), push_req->dsn()); auto rreq = follower_create_req( repl_key{.server_id = push_req->issuer_replica_id(), .term = push_req->raft_term(), .dsn = push_req->dsn()}, header, key, push_req->data_size()); rreq->rpc_data = rpc_data; - // RD_LOG(INFO, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); - LOGINFO("Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); + RD_LOG(INFO, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); if (rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)) & uint32_cast(repl_req_state_t::DATA_RECEIVED)) { @@ -313,8 +303,7 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob RD_REL_ASSERT(blob_equals(user_header, rreq->header), "User header mismatch for repl_key={}", rkey.to_string()); RD_REL_ASSERT(blob_equals(user_key, rreq->key), "User key mismatch for repl_key={}", rkey.to_string()); - // RD_LOG(INFO, "Repl_key=[{}] already received ", rkey.to_string()); - LOGINFO("Repl_key=[{}] already received ", rkey.to_string()); + RD_LOG(INFO, "Repl_key=[{}] already received ", rkey.to_string()); return rreq; } } @@ -329,14 +318,12 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob rreq->local_blkid = do_alloc_blk(data_size, m_listener->get_blk_alloc_hints(user_header, data_size)); rreq->state.fetch_or(uint32_cast(repl_req_state_t::BLK_ALLOCATED)); - LOGINFO("in follower_create_req: rreq={}, addr={}", rreq->to_compact_string(), - reinterpret_cast< uintptr_t >(rreq.get())); + RD_LOG(INFO, "in follower_create_req: rreq={}, addr={}", rreq->to_compact_string(), + reinterpret_cast< uintptr_t >(rreq.get())); return rreq; } void RaftReplDev::check_and_fetch_remote_data(std::vector< 
repl_req_ptr_t >* rreqs) { - LOGINFO("Data Channel: Checking if data is already received for rreqs.size={}", rreqs->size()); - // Pop any entries that are already completed - from the entries list as well as from map rreqs->erase(std::remove_if( rreqs->begin(), rreqs->end(), @@ -355,7 +342,6 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre }), rreqs->end()); - LOGINFO("Data Channel: Checking if data is already received for rreqs.size={}", rreqs->size()); if (rreqs->size()) { // Some data not completed yet, let's fetch from remote; fetch_data_from_remote(rreqs); @@ -368,7 +354,6 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); - LOGINFO("Data Channel: Checking if data is already received for rreqs.size={}", rreqs->size()); for (auto const& rreq : *rreqs) { entries.push_back(CreateRequestEntry(*builder, rreq->get_lsn(), rreq->term(), rreq->dsn(), builder->CreateVector(rreq->header.cbytes(), rreq->header.size()), @@ -376,9 +361,6 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { rreq->remote_blkid.server_id /* blkid_originator */, builder->CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(), rreq->remote_blkid.blkid.serialized_size()))); - // RD_LOG(INFO, "Data Channel: Fetching data from remote: rreq=[{}]", rreq->to_compact_string()); - LOGINFO("Data Channel: Fetching data from remote: rreq=[{}], addr={}", rreq->to_compact_string(), - reinterpret_cast< uintptr_t >(rreq.get())); } builder->FinishSizePrefixed( @@ -398,8 +380,6 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { auto raw_data = e.value().cbytes(); auto total_size = e.value().size(); - LOGINFO("Data Channel: FetchData response received: size={}", total_size); - for (auto const& rreq : *rreqs) { auto const data_size = rreq->remote_blkid.blkid.blk_count() * 
get_blk_size(); // if data is already received, skip it because someone is already doing the write; @@ -466,7 +446,8 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { raw_data += data_size; total_size -= data_size; - LOGINFO( + RD_LOG( + INFO, "Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}", rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string()); } @@ -481,7 +462,6 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > std::vector< folly::SemiFuture< folly::Unit > > futs; futs.reserve(rreqs->size()); - LOGINFO("Data Channel: Checking if data is already received for rreqs.size={}", rreqs->size()); // Pop any entries that are already completed - from the entries list as well as from map rreqs->erase(std::remove_if( rreqs->begin(), rreqs->end(), @@ -501,7 +481,6 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > }), rreqs->end()); - LOGINFO("Data Channel: Checking if data is already received for rreqs.size={}", rreqs->size()); // All the entries are done already, no need to wait if (rreqs->size() == 0) { return folly::makeFuture< folly::Unit >(folly::Unit{}); } diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index 49b353903..25353a275 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -237,7 +237,7 @@ class RaftReplDevTest : public testing::Test { flip::FlipClient m_fc{iomgr_flip::instance()}; #endif }; -#if 0 + TEST_F(RaftReplDevTest, All_Append) { LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); @@ -258,8 +258,8 @@ TEST_F(RaftReplDevTest, All_Append) { g_helper->sync_for_cleanup_start(); } -#endif -TEST_F(RaftReplDevTest, All_Append_with_Fetch_Remote_Data) { + +TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) { LOGINFO("Homestore replica={} setup completed", 
g_helper->replica_num()); g_helper->sync_for_test_start(); @@ -277,8 +277,9 @@ TEST_F(RaftReplDevTest, All_Append_with_Fetch_Remote_Data) { g_helper->sync_for_verify_start(); - LOGINFO("Validate all data written so far by reading them"); - this->validate_all_data(); + // TODO: seems with filip and fetch remote, the data size is not correct; + // LOGINFO("Validate all data written so far by reading them"); + // this->validate_all_data(); g_helper->sync_for_cleanup_start(); } From b8be4aa6dec6b861bfbc8811626131fca85d1a2a Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Thu, 25 Jan 2024 18:08:54 -0700 Subject: [PATCH 06/11] fix size mismatch --- conanfile.py | 2 +- .../replication/repl_dev/raft_repl_dev.cpp | 46 +++++++++---------- .../replication/service/raft_repl_service.cpp | 2 +- src/tests/test_raft_repl_dev.cpp | 33 +++++++++---- 4 files changed, 47 insertions(+), 36 deletions(-) diff --git a/conanfile.py b/conanfile.py index 18387456c..bb402e1f3 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "5.0.1" + version = "5.0.3" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" topics = ("ebay", "nublox") diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 76a65b7ce..29dd5eb89 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -120,7 +120,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t), PushDataRequestTypeTable()));*/ - RD_LOG(INFO, "Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string()); + LOGINFO("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string()); group_msg_service() ->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, 
rreq->pkts) @@ -172,10 +172,6 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ // prepare the sgs data buffer to read into; auto const total_size = local_blkid.blk_count() * get_blk_size(); sisl::sg_list sgs; -#if 0 - shared< uint8_t > iov_base(iomanager.iobuf_alloc(get_blk_size(), total_size), - [](uint8_t* buf) { iomanager.iobuf_free(buf); }); -#endif sgs.size = total_size; sgs.iovs.emplace_back( iovec{.iov_base = iomanager.iobuf_alloc(get_blk_size(), total_size), .iov_len = total_size}); @@ -212,13 +208,16 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ pkts.insert(pkts.end(), ret.begin(), ret.end()); } - group_msg_service()->send_data_service_response(pkts, rpc_data); - - for (auto const& sgs : sgs_vec) { - for (auto const& iov : sgs.iovs) { - iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base)); + // copy by value to avoid since it is on stack; + rpc_data->set_comp_cb([sgs_vec](boost::intrusive_ptr< sisl::GenericRpcData >&) { + for (auto const& sgs : sgs_vec) { + for (auto const& iov : sgs.iovs) { + iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base)); + } } - } + }); + + group_msg_service()->send_data_service_response(pkts, rpc_data); } void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { @@ -228,11 +227,12 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d auto push_req = GetSizePrefixedPushDataRequest(incoming_buf.cbytes()); sisl::blob header = sisl::blob{push_req->user_header()->Data(), push_req->user_header()->size()}; sisl::blob key = sisl::blob{push_req->user_key()->Data(), push_req->user_key()->size()}; -#if 0 - RD_LOG(TRACE, "PushData received on data channel: {}", - flatbuffers::FlatBufferToString(incoming_buf.cbytes() + sizeof(flatbuffers::uoffset_t), - PushDataRequestTypeTable())); -#endif + + auto rreq = follower_create_req( + repl_key{.server_id = push_req->issuer_replica_id(), .term = 
push_req->raft_term(), .dsn = push_req->dsn()}, + header, key, push_req->data_size()); + rreq->rpc_data = rpc_data; + if (iomgr_flip::instance()->test_flip("simulate_fetch_remote_data")) { LOGINFO("Data Channel: Flip is enabled, skip on_push_data_received to simulate fetch remote data, " "server_id={}, term={}, dsn={}", @@ -240,11 +240,6 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d return; } - auto rreq = follower_create_req( - repl_key{.server_id = push_req->issuer_replica_id(), .term = push_req->raft_term(), .dsn = push_req->dsn()}, - header, key, push_req->data_size()); - rreq->rpc_data = rpc_data; - RD_LOG(INFO, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); if (rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)) & @@ -361,6 +356,8 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { rreq->remote_blkid.server_id /* blkid_originator */, builder->CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(), rreq->remote_blkid.blkid.serialized_size()))); + LOGINFO("Fetching data from remote: rreq=[{}], remote_blkid={}", rreq->to_compact_string(), + rreq->remote_blkid.blkid.to_string()); } builder->FinishSizePrefixed( @@ -429,6 +426,8 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { std::move(sisl::io_blob_safe(data_size, data_service().get_align_size())); std::memcpy(rreq->buf_for_unaligned_data.bytes(), data, data_size); data = rreq->buf_for_unaligned_data.cbytes(); + RD_DBG_ASSERT(((uintptr_t)data % data_service().get_align_size()) == 0, + "Data is still not aligned after copy"); } // Schedule a write and upon completion, mark the data as written. 
@@ -446,8 +445,7 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { raw_data += data_size; total_size -= data_size; - RD_LOG( - INFO, + LOGINFO( "Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}", rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string()); } @@ -490,7 +488,7 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > check_and_fetch_remote_data(rreqs); } else { check_and_fetch_remote_data(rreqs); -#if 0 +#if 0 // some data are not in completed state, let's schedule a timer to check it again; // we wait for data channel to fill in the data. Still if its not done we trigger a fetch from remote; m_wait_data_timer_hdl = iomanager.schedule_thread_timer( // timer wakes up in current thread; diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index c38efd954..1e0d8f3b7 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -95,7 +95,7 @@ void RaftReplService::start() { .with_stale_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_hi_threshold)) .with_fresh_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_lo_threshold)) .with_snapshot_enabled(HS_DYNAMIC_CONFIG(consensus.snapshot_freq_distance)) - .with_leadership_expiry(-1 /* never expires */) // >>> debug only + //.with_leadership_expiry(-1 /* never expires */) // >>> debug only .with_reserved_log_items(0) // In reality ReplLogStore retains much more than this .with_auto_forwarding(false); r_params.return_method_ = nuraft::raft_params::async_handler; diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index 25353a275..bb00fa761 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -46,7 +46,7 @@ SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) SISL_OPTION_GROUP(test_raft_repl_dev, (block_size, "", 
"block_size", "block size to io", ::cxxopts::value< uint32_t >()->default_value("4096"), "number")); -SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, test_common_setup, test_repl_common_setup) +SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup) static std::unique_ptr< test_common::HSReplTestHelper > g_helper; @@ -100,7 +100,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, cintrusive< repl_req_ctx >& ctx) override { - LOGINFO("[Replica={}] Received commit on lsn={}", g_helper->replica_num(), lsn); + // LOGINFO("[Replica={}] Received commit on lsn={}", g_helper->replica_num(), lsn); ASSERT_EQ(header.size(), sizeof(test_req::journal_header)); auto jheader = r_cast< test_req::journal_header const* >(header.cbytes()); @@ -108,6 +108,8 @@ class TestReplicatedDB : public homestore::ReplDevListener { Value v{ .lsn_ = lsn, .data_size_ = jheader->data_size, .data_pattern_ = jheader->data_pattern, .blkid_ = blkids}; + LOGINFO("[Replica={}] Received commit on lsn={}, blkid={}, data_size={}", g_helper->replica_num(), lsn, + blkids.to_string(), jheader->data_size); { std::unique_lock lk(db_mtx_); inmem_db_.insert_or_assign(k, v); @@ -161,6 +163,10 @@ class TestReplicatedDB : public homestore::ReplDevListener { std::tie(k, v) = *it; ++it; } + + LOGINFO("[Replica={}]: Validating blkid={}, data_size={}", g_helper->replica_num(), v.blkid_.to_string(), + v.data_size_); + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size); @@ -237,13 +243,14 @@ class RaftReplDevTest : public testing::Test { flip::FlipClient m_fc{iomgr_flip::instance()}; #endif }; - +#if 0 TEST_F(RaftReplDevTest, All_Append) { LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); if 
(g_helper->replica_num() == 0) { - g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >()); + // g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >()); + g_helper->sync_dataset_size(1); auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); }); @@ -258,7 +265,9 @@ TEST_F(RaftReplDevTest, All_Append) { g_helper->sync_for_cleanup_start(); } +#endif +#if 1 TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) { LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); @@ -266,10 +275,13 @@ TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) { set_flip_point("simulate_fetch_remote_data"); if (g_helper->replica_num() == 0) { - g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >()); + // g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >()); + g_helper->sync_dataset_size(100); auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); - g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); }); + g_helper->runner().set_task([this, block_size]() { + this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */); + }); g_helper->runner().execute().get(); } @@ -278,18 +290,19 @@ TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) { g_helper->sync_for_verify_start(); // TODO: seems with filip and fetch remote, the data size is not correct; - // LOGINFO("Validate all data written so far by reading them"); - // this->validate_all_data(); + LOGINFO("Validate all data written so far by reading them"); + this->validate_all_data(); g_helper->sync_for_cleanup_start(); } - +#endif int main(int argc, char* argv[]) { int 
parsed_argc{argc}; char** orig_argv = argv; ::testing::InitGoogleTest(&parsed_argc, argv); - SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_raft_repl_dev, iomgr, test_common_setup, test_repl_common_setup); + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_raft_repl_dev, iomgr, config, test_common_setup, + test_repl_common_setup); FLAGS_folly_global_cpu_executor_threads = 4; g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev", orig_argv); From 161cbe44d1d58887e16307554af7cde489572088 Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Tue, 30 Jan 2024 11:01:19 -0700 Subject: [PATCH 07/11] fix merge conflict --- src/lib/homestore.cpp | 4 ---- src/lib/replication/repl_dev/raft_repl_dev.cpp | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index 20686c89a..b22164b5d 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -242,10 +242,6 @@ void HomeStore::shutdown() { m_index_service->stop(); // m_index_service.reset(); } - if (has_repl_data_service()) { - s_cast< GenericReplService* >(m_repl_service.get())->stop(); - m_repl_service.reset(); - } if (has_log_service()) { m_log_service->stop(); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 29dd5eb89..e7db30838 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -232,14 +232,14 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d repl_key{.server_id = push_req->issuer_replica_id(), .term = push_req->raft_term(), .dsn = push_req->dsn()}, header, key, push_req->data_size()); rreq->rpc_data = rpc_data; - +#ifdef _PRERELEASE if (iomgr_flip::instance()->test_flip("simulate_fetch_remote_data")) { LOGINFO("Data Channel: Flip is enabled, skip on_push_data_received to simulate fetch remote data, " "server_id={}, term={}, dsn={}", push_req->issuer_replica_id(), 
push_req->raft_term(), push_req->dsn()); return; } - +#endif RD_LOG(INFO, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string()); if (rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)) & From d8d1a1b80dd9fd6ab88ce5068ffee7a7b603072b Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Tue, 30 Jan 2024 11:31:46 -0700 Subject: [PATCH 08/11] fix build --- src/tests/CMakeLists.txt | 4 ++-- src/tests/test_raft_repl_dev.cpp | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index a9752a3ef..9f16520fd 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -110,7 +110,7 @@ if (${io_tests}) add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) - add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev) + #add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev) # add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore) # add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev) endif() @@ -120,7 +120,7 @@ if (${io_tests}) add_test(NAME LogStore-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store -- --spdk "true") add_test(NAME MetaBlkMgr-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr -- --spdk "true") add_test(NAME DataSerice-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service -- --spdk "true") - add_test(NAME SoloReplDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev -- --spdk "true") + #add_test(NAME SoloReplDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev -- --spdk "true") add_test(NAME HomeRaftLogStore-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore -- --spdk "true") add_test(NAME RaftReplDev-Spdk COMMAND 
${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev -- --spdk "true") if(${epoll_tests}) diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index ef6fbf6de..128138863 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -240,7 +240,6 @@ class RaftReplDevTest : public testing::Test { TestReplicatedDB& pick_one_db() { return *dbs_[0]; } - #ifdef _PRERELEASE void set_flip_point(const std::string flip_name) { flip::FlipCondition null_cond; @@ -265,7 +264,6 @@ class RaftReplDevTest : public testing::Test { } } - private: std::vector< std::shared_ptr< TestReplicatedDB > > dbs_; #ifdef _PRERELEASE @@ -312,9 +310,7 @@ TEST_F(RaftReplDevTest, All_Append_Restart_Append) { this->validate_all_data(); g_helper->sync_for_cleanup_start(); } -#endif -#if 1 TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) { LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); @@ -342,7 +338,7 @@ TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) { g_helper->sync_for_cleanup_start(); } -#endif + int main(int argc, char* argv[]) { int parsed_argc{argc}; char** orig_argv = argv; From b4252c54daef6677092003bf0a71d0e6b8076ed9 Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Tue, 30 Jan 2024 12:30:54 -0700 Subject: [PATCH 09/11] fix timer --- src/lib/replication/repl_dev/raft_repl_dev.cpp | 14 +++++++------- src/tests/CMakeLists.txt | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 0d1d1cb3d..492737ce7 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -162,7 +162,6 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { }); } - void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { auto const& incoming_buf = rpc_data->request_blob(); auto fetch_req = 
GetSizePrefixedFetchData(incoming_buf.cbytes()); @@ -249,7 +248,7 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ group_msg_service()->send_data_service_response(pkts, rpc_data); } - + void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) { if (err == ReplServiceError::OK) { return; } @@ -561,14 +560,15 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > // if in resync mode, fetch data from remote immediately; check_and_fetch_remote_data(rreqs); } else { - check_and_fetch_remote_data(rreqs); -#if 0 + // check_and_fetch_remote_data(rreqs); // some data are not in completed state, let's schedule a timer to check it again; // we wait for data channel to fill in the data. Still if its not done we trigger a fetch from remote; - m_wait_data_timer_hdl = iomanager.schedule_thread_timer( // timer wakes up in current thread; + m_wait_data_timer_hdl = iomanager.schedule_global_timer( // timer wakes up in current thread; HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */, - nullptr /* cookie */, [this, rreqs](auto) { check_and_fetch_remote_data(rreqs); }); -#endif + nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto) { + LOGINFO("Data Channel: Wait data write timer fired, checking if data is written"); + check_and_fetch_remote_data(rreqs); + }); } // block waiting here until all the futs are ready (data channel filled in and promises are made); diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 9f16520fd..46ab02afc 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -112,7 +112,7 @@ if (${io_tests}) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) #add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev) # add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore) - # 
add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev) + add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev) endif() can_build_spdk_io_tests(spdk_tests) From 9433b09df4bc24e26ab0c4195d9cf82bc2bef2cf Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Tue, 30 Jan 2024 16:35:07 -0700 Subject: [PATCH 10/11] disable non-recurring timer --- src/lib/replication/repl_dev/raft_repl_dev.cpp | 6 ++++-- src/lib/replication/repl_dev/raft_repl_dev.h | 1 - src/tests/test_raft_repl_dev.cpp | 5 ++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 492737ce7..4e2bb363f 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -560,15 +560,17 @@ AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t > // if in resync mode, fetch data from remote immediately; check_and_fetch_remote_data(rreqs); } else { - // check_and_fetch_remote_data(rreqs); + check_and_fetch_remote_data(rreqs); // some data are not in completed state, let's schedule a timer to check it again; // we wait for data channel to fill in the data.
Still if its not done we trigger a fetch from remote; +#if 0 m_wait_data_timer_hdl = iomanager.schedule_global_timer( // timer wakes up in current thread; HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */, - nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto) { + nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto /*cookie*/) { LOGINFO("Data Channel: Wait data write timer fired, checking if data is written"); check_and_fetch_remote_data(rreqs); }); +#endif } // block waiting here until all the futs are ready (data channel filled in and promises are made); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 8fbcebd2a..3184e2cf1 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -126,7 +126,6 @@ class RaftReplDev : public ReplDev, bool is_resync_mode() { return m_resync_mode; } void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err); - }; } // namespace homestore diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index 128138863..a7bdf9b11 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -174,9 +174,6 @@ class TestReplicatedDB : public homestore::ReplDevListener { ++it; } - LOGINFO("[Replica={}]: Validating blkid={}, data_size={}", g_helper->replica_num(), v.blkid_.to_string(), - v.data_size_); - auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size); @@ -315,7 +312,9 @@ TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) { LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); +#ifdef _PRERELEASE set_flip_point("simulate_fetch_remote_data"); +#endif if (g_helper->replica_num() == 0) { // 
g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >()); From 73b223aba8d877ad9aab0a8f2c291a30c6caa7b0 Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Tue, 30 Jan 2024 16:54:47 -0700 Subject: [PATCH 11/11] revert cmake --- src/tests/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 46ab02afc..2543795a7 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -112,7 +112,7 @@ if (${io_tests}) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) #add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev) # add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore) - add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev) + # add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev) endif() can_build_spdk_io_tests(spdk_tests) @@ -120,7 +120,7 @@ if (${io_tests}) add_test(NAME LogStore-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store -- --spdk "true") add_test(NAME MetaBlkMgr-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr -- --spdk "true") add_test(NAME DataSerice-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service -- --spdk "true") - #add_test(NAME SoloReplDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev -- --spdk "true") + add_test(NAME SoloReplDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev -- --spdk "true") add_test(NAME HomeRaftLogStore-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore -- --spdk "true") add_test(NAME RaftReplDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev -- --spdk "true") if(${epoll_tests})