diff --git a/conanfile.py b/conanfile.py
index f526e3b4e..2a30e53e6 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -6,6 +6,7 @@ class HomestoreConan(ConanFile):
     name = "homestore"
     version = "5.0.8"
+    homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
     topics = ("ebay", "nublox")
diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs
index 95f523d81..66181ed2b 100644
--- a/src/lib/common/homestore_config.fbs
+++ b/src/lib/common/homestore_config.fbs
@@ -220,6 +220,12 @@ table Consensus {
     // Minimum log gap a replica has to be from leader before joining the replica set.
     min_log_gap_to_join: int32 = 30;
+
+    // Amount of time in seconds to wait for the local data write before fetching the data from the remote node;
+    wait_data_write_timer_sec: uint32 = 30 (hotswap);
+
+    // Leadership expiry in milliseconds (default 120 seconds)
+    leadership_expiry_ms: uint32 = 120000;
 }
 
 table HomeStoreSettings {
diff --git a/src/lib/meta/meta_blk_service.cpp b/src/lib/meta/meta_blk_service.cpp
index 50259becf..630702af3 100644
--- a/src/lib/meta/meta_blk_service.cpp
+++ b/src/lib/meta/meta_blk_service.cpp
@@ -331,7 +331,7 @@ bool MetaBlkService::scan_and_load_meta_blks(meta_blk_map_t& meta_blks, ovf_hdr_
             mblk->hdr.h.compressed = 0;
             mblk->hdr.h.context_sz = read_sz;
         } else {
-            LOGINFO("[type={}], meta blk size check passed!", mblk->hdr.h.type);
+            LOGDEBUG("[type={}], meta blk size check passed!", mblk->hdr.h.type);
         }
 
         // move on to next meta blk;
diff --git a/src/lib/replication/CMakeLists.txt b/src/lib/replication/CMakeLists.txt
index 448d14adc..0fe578d68 100644
--- a/src/lib/replication/CMakeLists.txt
+++ b/src/lib/replication/CMakeLists.txt
@@ -7,7 +7,7 @@ list(APPEND SCHEMA_FLAGS "--scoped-enums" "--gen-name-strings" "--cpp-std=c++17"
 flatbuffers_generate_headers(
     TARGET hs_replication_fb
-    SCHEMAS push_data_rpc.fbs
+    SCHEMAS push_data_rpc.fbs fetch_data_rpc.fbs
     FLAGS ${SCHEMA_FLAGS}
 )
diff --git a/src/lib/replication/fetch_data_rpc.fbs b/src/lib/replication/fetch_data_rpc.fbs
index d73d4dd1f..e809cde42 100644
--- a/src/lib/replication/fetch_data_rpc.fbs
+++ b/src/lib/replication/fetch_data_rpc.fbs
@@ -6,7 +6,7 @@ table RequestEntry {
     dsn : uint64;              // Data Sequence number
     user_header: [ubyte];      // User header bytes
     user_key : [ubyte];        // User key data
-    blkid_originator : int32;  // Originally which replica's blkid is this
+    blkid_originator : int32;  // Server id of the replica that originally allocated this blkid
     remote_blkid : [ubyte];    // Serialized remote blkid
 }
 
@@ -15,7 +15,7 @@ table FetchDataRequest {
 }
 
 table ResponseEntry {
-    lsn : [int64];             // LSN of the raft log if known
+    lsn : int64;               // LSN of the raft log, if known
     dsn : uint64;              // Data Sequence number
     raft_term : uint64;        // Raft term number
     data_size : uint32;        // Size of the data which is sent as separate non flatbuffer
@@ -31,4 +31,4 @@ table FetchData {
     response : FetchDataResponse;
 }
 
-root_type FetchData;
\ No newline at end of file
+root_type FetchData;
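Note on the schema above: the generated FlatBuffers API for fetch_data_rpc.fbs is what the replication code later in this patch relies on (CreateRequestEntry, CreateFetchDataRequest, CreateFetchData, GetSizePrefixedFetchData). A minimal round-trip sketch follows, assuming the generated header name fetch_data_rpc_generated.h and the homestore namespace used elsewhere in the patch; the function name and the literal field values are illustrative only, not part of this change.

// Sketch only: build a size-prefixed FetchData request with one entry, then parse it back
// the way on_fetch_data_received() does further down in this patch. Values are placeholders.
#include <cstdint>
#include <vector>
#include <flatbuffers/flatbuffers.h>
#include "fetch_data_rpc_generated.h"

void fetch_data_round_trip_sketch() {
    flatbuffers::FlatBufferBuilder builder;

    // In the real code these buffers come from the repl_req: user header, user key and the serialized MultiBlkId.
    std::vector< uint8_t > hdr, key, remote_blkid_bytes;

    auto entry = homestore::CreateRequestEntry(builder, 100 /* lsn */, 1 /* raft_term */, 7 /* dsn */,
                                               builder.CreateVector(hdr), builder.CreateVector(key),
                                               0 /* blkid_originator (server id) */,
                                               builder.CreateVector(remote_blkid_bytes));
    builder.FinishSizePrefixed(homestore::CreateFetchData(
        builder, homestore::CreateFetchDataRequest(builder, builder.CreateVector(&entry, 1))));

    // Receiver side: parse the size-prefixed buffer and walk the requested entries.
    auto const* fetch = homestore::GetSizePrefixedFetchData(builder.GetBufferPointer());
    for (auto const& e : *(fetch->request()->entries())) {
        (void)e->lsn(); // plain int64 after this change (previously declared as [int64])
        (void)e->dsn();
    }
}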
"push_data_rpc_generated.h" +#include "fetch_data_rpc_generated.h" namespace homestore { std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1}; @@ -63,8 +66,9 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk RD_LOG(INFO, "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={}", (load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id, m_commit_upto_lsn.load(), m_next_dsn.load()); + m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, bind_this(RaftReplDev::on_push_data_received, 1)); - // m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 2)); + m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 1)); } bool RaftReplDev::join_group() { @@ -140,7 +144,7 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t), PushDataRequestTypeTable()));*/ - RD_LOG(INFO, "Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string()); + LOGINFO("Data Channel: Pushing data to all followers: rreq=[{}]", rreq->to_compact_string()); group_msg_service() ->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->pkts) @@ -158,6 +162,93 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { }); } +void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { + auto const& incoming_buf = rpc_data->request_blob(); + auto fetch_req = GetSizePrefixedFetchData(incoming_buf.cbytes()); + + RD_LOG(INFO, "Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size()); + + std::vector< sisl::sg_list > sgs_vec; + + struct Context { + std::condition_variable cv; + std::mutex mtx; + size_t outstanding_read_cnt; + }; + + auto ctx = std::make_shared< Context >(); + ctx->outstanding_read_cnt = fetch_req->request()->entries()->size(); + + for (auto const& req : *(fetch_req->request()->entries())) { + RD_LOG(INFO, "Data Channel: FetchData received: lsn={}", req->lsn()); + + auto const& lsn = req->lsn(); + auto const& term = req->raft_term(); + auto const& dsn = req->dsn(); + auto const& header = req->user_header(); + auto const& key = req->user_key(); + auto const& originator = req->blkid_originator(); + auto const& remote_blkid = req->remote_blkid(); + + // fetch data based on the remote_blkid + if (originator == server_id()) { + // We are the originator of the blkid, read data locally; + MultiBlkId local_blkid; + + // convert remote_blkid serialized data to local blkid + local_blkid.deserialize(sisl::blob{remote_blkid->Data(), remote_blkid->size()}, true /* copy */); + + // prepare the sgs data buffer to read into; + auto const total_size = local_blkid.blk_count() * get_blk_size(); + sisl::sg_list sgs; + sgs.size = total_size; + sgs.iovs.emplace_back( + iovec{.iov_base = iomanager.iobuf_alloc(get_blk_size(), total_size), .iov_len = total_size}); + + // accumulate the sgs for later use (send back to the requester)); + sgs_vec.push_back(sgs); + + async_read(local_blkid, sgs, total_size).thenValue([this, &ctx](auto&& err) { + RD_REL_ASSERT(!err, "Error in reading data"); // TODO: Find a way to return error to the Listener + { + std::unique_lock< std::mutex > lk{ctx->mtx}; + --(ctx->outstanding_read_cnt); + } + ctx->cv.notify_one(); + }); + } else { + // TODO: if we are not the 
+            // TODO: if we are not the originator, we need to fetch based on lsn;
+            // To be implemented;
+            RD_LOG(INFO, "I am not the originator for the requested blks, originator: {}, server_id: {}.",
+                   originator, server_id());
+        }
+    }
+
+    {
+        // wait for the reads to complete;
+        std::unique_lock< std::mutex > lk{ctx->mtx};
+        ctx->cv.wait(lk, [&ctx] { return (ctx->outstanding_read_cnt == 0); });
+    }
+
+    // now prepare the io_blob_list to respond back to the requester;
+    nuraft_mesg::io_blob_list_t pkts = sisl::io_blob_list_t{};
+    for (auto const& sgs : sgs_vec) {
+        auto const ret = sisl::io_blob::sg_list_to_ioblob_list(sgs);
+        pkts.insert(pkts.end(), ret.begin(), ret.end());
+    }
+
+    // capture sgs_vec by value since it lives on this function's stack;
+    rpc_data->set_comp_cb([sgs_vec](boost::intrusive_ptr< sisl::GenericRpcData >&) {
+        for (auto const& sgs : sgs_vec) {
+            for (auto const& iov : sgs.iovs) {
+                iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base));
+            }
+        }
+    });
+
+    group_msg_service()->send_data_service_response(pkts, rpc_data);
+}
+
 void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) {
     if (err == ReplServiceError::OK) { return; }
@@ -206,15 +297,18 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
     sisl::blob header = sisl::blob{push_req->user_header()->Data(), push_req->user_header()->size()};
     sisl::blob key = sisl::blob{push_req->user_key()->Data(), push_req->user_key()->size()};
 
-    RD_LOG(TRACE, "PushData received on data channel: {}",
-           flatbuffers::FlatBufferToString(incoming_buf.cbytes() + sizeof(flatbuffers::uoffset_t),
-                                           PushDataRequestTypeTable()));
-
     auto rreq = follower_create_req(
         repl_key{.server_id = push_req->issuer_replica_id(), .term = push_req->raft_term(), .dsn = push_req->dsn()},
         header, key, push_req->data_size());
     rreq->rpc_data = rpc_data;
-
+#ifdef _PRERELEASE
+    if (iomgr_flip::instance()->test_flip("simulate_fetch_remote_data")) {
+        LOGINFO("Data Channel: Flip is enabled, skip on_push_data_received to simulate fetch remote data, "
+                "server_id={}, term={}, dsn={}",
+                push_req->issuer_replica_id(), push_req->raft_term(), push_req->dsn());
+        return;
+    }
+#endif
     RD_LOG(INFO, "Data Channel: Received data rreq=[{}]", rreq->to_compact_string());
 
     if (rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_RECEIVED)) &
@@ -292,9 +386,149 @@ repl_req_ptr_t RaftReplDev::follower_create_req(repl_key const& rkey, sisl::blob
     rreq->local_blkid = do_alloc_blk(data_size, m_listener->get_blk_alloc_hints(user_header, data_size));
     rreq->state.fetch_or(uint32_cast(repl_req_state_t::BLK_ALLOCATED));
 
+    RD_LOG(INFO, "in follower_create_req: rreq={}, addr={}", rreq->to_compact_string(),
+           reinterpret_cast< uintptr_t >(rreq.get()));
     return rreq;
 }
 
+void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs) {
+    // Pop any entries that are already completed - from the entries list as well as from the map
+    rreqs->erase(std::remove_if(
+                     rreqs->begin(), rreqs->end(),
+                     [this](repl_req_ptr_t const& rreq) {
+                         if (rreq == nullptr) { return true; }
+
+                         if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN)) {
+                             m_repl_key_req_map.erase(rreq->rkey); // Pop it from the map as well, since it is completed
+                             RD_LOG(INFO,
+                                    "Raft Channel: Data write completed and blkid mapped, removing from map: rreq=[{}]",
+                                    rreq->to_compact_string());
+                             return true; // Remove from the pending list
+                         } else {
+                             return false;
+                         }
+                     }),
+                 rreqs->end());
+
+    if (rreqs->size()) {
+        // Some data is not completed yet, let's fetch it from the remote;
+        fetch_data_from_remote(rreqs);
+    }
+}
+
+void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) {
+    std::vector<::flatbuffers::Offset< RequestEntry > > entries;
+    entries.reserve(rreqs->size());
+
+    shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
+
+    for (auto const& rreq : *rreqs) {
+        entries.push_back(CreateRequestEntry(*builder, rreq->get_lsn(), rreq->term(), rreq->dsn(),
+                                             builder->CreateVector(rreq->header.cbytes(), rreq->header.size()),
+                                             builder->CreateVector(rreq->key.cbytes(), rreq->key.size()),
+                                             rreq->remote_blkid.server_id /* blkid_originator */,
+                                             builder->CreateVector(rreq->remote_blkid.blkid.serialize().cbytes(),
+                                                                   rreq->remote_blkid.blkid.serialized_size())));
+        LOGINFO("Fetching data from remote: rreq=[{}], remote_blkid={}", rreq->to_compact_string(),
+                rreq->remote_blkid.blkid.to_string());
+    }
+
+    builder->FinishSizePrefixed(
+        CreateFetchData(*builder, CreateFetchDataRequest(*builder, builder->CreateVector(entries))));
+
+    // the leader can change; on the receiving side, we need to check whether the leader is still the one who
+    // originated the blkid;
+    group_msg_service()
+        ->data_service_request_bidirectional(
+            nuraft_mesg::role_regex::LEADER, FETCH_DATA,
+            sisl::io_blob_list_t{
+                sisl::io_blob{builder->GetBufferPointer(), builder->GetSize(), false /* is_aligned */}})
+        .via(&folly::InlineExecutor::instance())
+        .thenValue([this, builder, rreqs](auto e) {
+            RD_REL_ASSERT(!!e, "Error in fetching data");
+
+            auto raw_data = e.value().cbytes();
+            auto total_size = e.value().size();
+
+            for (auto const& rreq : *rreqs) {
+                auto const data_size = rreq->remote_blkid.blkid.blk_count() * get_blk_size();
+                // if data is already received, skip it because someone else is already doing the write;
+                if (rreq->state.load() & uint32_cast(repl_req_state_t::DATA_RECEIVED)) {
+                    // very unlikely to arrive here, but if the data got received while we were fetching, let the
+                    // data channel handle the data write;
+                    raw_data += data_size;
+                    total_size -= data_size;
+
+                    // if the blk is already allocated, validate that the blk is valid and that the size matches;
+                    RD_DBG_ASSERT(rreq->local_blkid.is_valid(), "Invalid blkid for rreq={}", rreq->to_compact_string());
+                    auto const local_size = rreq->local_blkid.blk_count() * get_blk_size();
+                    RD_DBG_ASSERT_EQ(data_size, local_size,
+                                     "Data size mismatch for rreq={} blkid={}, remote size: {}, local size: {}",
+                                     rreq->to_compact_string(), rreq->local_blkid.to_string(), data_size, local_size);
+
+                    RD_LOG(INFO, "Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.",
+                           rreq->to_compact_string());
+                    continue;
+                } else {
+                    // acquire the lock here to avoid two threads trying to do the same thing;
+                    std::unique_lock< std::mutex > lg(rreq->state_mtx);
+                    if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) {
+                        // if the blk is already allocated, validate that the blk is valid and that the size matches;
+                        RD_DBG_ASSERT(rreq->local_blkid.is_valid(), "Invalid blkid for rreq={}",
+                                      rreq->to_compact_string());
+                        auto const local_size = rreq->local_blkid.blk_count() * get_blk_size();
+                        RD_DBG_ASSERT_EQ(data_size, local_size,
+                                         "Data size mismatch for rreq={} blkid={}, remote size: {}, local size: {}",
+                                         rreq->to_compact_string(), rreq->local_blkid.to_string(), data_size,
+                                         local_size);
+                    } else {
+                        // if the blk is not allocated, we need to allocate it;
+                        rreq->local_blkid =
+                            do_alloc_blk(data_size, m_listener->get_blk_alloc_hints(rreq->header, data_size));
+
+                        // we are about to write the data, so mark both blk allocated and data received;
+                        rreq->state.fetch_or(
+                            uint32_cast(repl_req_state_t::BLK_ALLOCATED | repl_req_state_t::DATA_RECEIVED));
+                    }
+                }
+
+                auto data = raw_data;
+                if (((uintptr_t)raw_data % data_service().get_align_size()) != 0) {
+                    // Unaligned buffer, create a new buffer and copy the entire buf
+                    rreq->buf_for_unaligned_data =
+                        std::move(sisl::io_blob_safe(data_size, data_service().get_align_size()));
+                    std::memcpy(rreq->buf_for_unaligned_data.bytes(), data, data_size);
+                    data = rreq->buf_for_unaligned_data.cbytes();
+                    RD_DBG_ASSERT(((uintptr_t)data % data_service().get_align_size()) == 0,
+                                  "Data is still not aligned after copy");
+                }
+
+                // Schedule a write and upon completion, mark the data as written.
+                data_service()
+                    .async_write(r_cast< const char* >(data), data_size, rreq->local_blkid)
+                    .thenValue([this, rreq](auto&& err) {
+                        RD_REL_ASSERT(!err,
+                                      "Error in writing data"); // TODO: Find a way to return error to the Listener
+                        rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN));
+                        rreq->data_written_promise.setValue();
+                        RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string());
+                    });
+
+                // move the raw_data pointer to the next rreq's data;
+                raw_data += data_size;
+                total_size -= data_size;
+
+                LOGINFO(
+                    "Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}",
+                    rreq->to_compact_string(), data_size, total_size, rreq->local_blkid.to_string());
+            }
+
+            builder->Release();
+
+            RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
+        });
+}
+
 AsyncNotify RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t >* rreqs) {
     std::vector< folly::SemiFuture< folly::Unit > > futs;
     futs.reserve(rreqs->size());
@@ -321,23 +555,25 @@
     // All the entries are done already, no need to wait
     if (rreqs->size() == 0) { return folly::makeFuture< folly::Unit >(folly::Unit{}); }
 
-#if 0 // We are yet to support reactive fetch from remote.
-    if (m_resync_mode) {
+    if (is_resync_mode()) {
         // if in resync mode, fetch data from remote immediately;
-        check_and_fetch_remote_data(std::move(rreqs));
+        check_and_fetch_remote_data(rreqs);
     } else {
-        // some blkids are not in completed state, let's schedule a timer to check it again;
+        check_and_fetch_remote_data(rreqs);
+        // some data is not yet in completed state, let's schedule a timer to check it again;
         // we wait for data channel to fill in the data. Still if its not done we trigger a fetch from remote;
-        m_wait_blkid_write_timer_hdl = iomanager.schedule_thread_timer( // timer wakes up in current thread;
-            HS_DYNAMIC_CONFIG(repl->wait_blkid_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */,
-            nullptr /* cookie */, [this, std::move(rreqs)](auto) {
-                check_and_fetch_remote_data(std::move(rreqs));
+#if 0
+        m_wait_data_timer_hdl = iomanager.schedule_global_timer( // timer wakes up in current thread;
+            HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000 * 1000 * 1000, false /* recurring */,
+            nullptr /* cookie */, iomgr::reactor_regex::all_worker, [this, rreqs](auto /*cookie*/) {
+                LOGINFO("Data Channel: Wait data write timer fired, checking if data is written");
+                check_and_fetch_remote_data(rreqs);
             });
-    }
-    return ret;
 #endif
+    }
+    // block waiting here until all the futs are ready (data channel filled in and promises are made);
     return folly::collectAll(futs).deferValue([this, rreqs](auto&& e) {
         for (auto const& rreq : *rreqs) {
             HS_DBG_ASSERT(rreq->state.load() & uint32_cast(repl_req_state_t::DATA_WRITTEN),
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h
index 066c34174..3184e2cf1 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.h
+++ b/src/lib/replication/repl_dev/raft_repl_dev.h
@@ -57,6 +57,9 @@ class RaftReplDev : public ReplDev,
     iomgr::timer_handle_t m_sb_flush_timer_hdl;
 
     std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry
+    //
+    iomgr::timer_handle_t m_wait_data_timer_hdl{iomgr::null_timer_handle};
+
     bool m_resync_mode{false};
 
     static std::atomic< uint64_t > s_next_group_ordinal;
@@ -117,6 +120,11 @@ class RaftReplDev : public ReplDev,
     shared< nuraft::log_store > data_journal() { return m_data_journal; }
     void push_data_to_all_followers(repl_req_ptr_t rreq);
     void on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
+    void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
+    void check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs);
+    void fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs);
+
+    bool is_resync_mode() { return m_resync_mode; }
 
     void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
 };
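Note: m_wait_data_timer_hdl is declared above, but the only code that would arm it is still under #if 0 in notify_after_data_written(). A minimal sketch of how the timer could be tied to the new consensus.wait_data_write_timer_sec setting, using the same iomgr calls that appear in the #if 0 block; the helper name and the arming/reset placement are assumptions, not APIs or behavior introduced by this patch.

// Sketch only: arm a one-shot global timer that re-checks the pending requests.
// wait_data_write_timer_sec is marked hotswap, so it is re-read every time the timer is armed.
void RaftReplDev::arm_wait_data_timer(std::vector< repl_req_ptr_t >* rreqs) { // hypothetical helper
    m_wait_data_timer_hdl = iomanager.schedule_global_timer(
        HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_sec) * 1000ul * 1000ul * 1000ul /* nanoseconds */,
        false /* recurring */, nullptr /* cookie */, iomgr::reactor_regex::all_worker,
        [this, rreqs](auto /*cookie*/) {
            LOGINFO("Data Channel: wait-data-write timer fired, checking if data is written");
            check_and_fetch_remote_data(rreqs);
            m_wait_data_timer_hdl = iomgr::null_timer_handle; // one-shot timer has fired; drop the handle
        });
}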
diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp
index f1811fd77..ea3cba12d 100644
--- a/src/lib/replication/repl_dev/raft_state_machine.cpp
+++ b/src/lib/replication/repl_dev/raft_state_machine.cpp
@@ -145,6 +145,8 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo
     auto const log_buf_data_offset = log_buf->size() - lentry->get_buf().size();
     auto const [jentry, header, key] = log_to_journal_entry(log_buf, log_buf_data_offset);
 
+    LOGINFO("Received Raft server_id={}, term={}, dsn={}, journal_entry=[{}] ", jentry->server_id, lentry->get_term(),
+            jentry->dsn, jentry->to_string());
     // From the repl_key, get the repl_req. In cases where log stream got here first, this method will create a new
     // repl_req and return that back. Fill up all of the required journal entry inside the repl_req
     auto rreq = m_rd.follower_create_req(
diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp
index dde2da93c..b0cfbca75 100644
--- a/src/lib/replication/service/raft_repl_service.cpp
+++ b/src/lib/replication/service/raft_repl_service.cpp
@@ -96,6 +96,7 @@ void RaftReplService::start() {
                         .with_stale_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_hi_threshold))
                         .with_fresh_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_lo_threshold))
                         .with_snapshot_enabled(HS_DYNAMIC_CONFIG(consensus.snapshot_freq_distance))
+                        //.with_leadership_expiry(-1 /* never expires */) // >>> debug only
                         .with_reserved_log_items(0) // In reality ReplLogStore retains much more than this
                         .with_auto_forwarding(false);
     r_params.return_method_ = nuraft::raft_params::async_handler;
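Note: the new consensus.leadership_expiry_ms setting added in homestore_config.fbs is not consumed anywhere in this patch; the builder line above is commented out and flagged as debug only. If the intent is to drive the raft parameter from config, a follow-up along these lines would do it (hypothetical sketch, not part of this diff; it reuses the with_leadership_expiry() call and the HS_DYNAMIC_CONFIG pattern already shown above):

                         .with_snapshot_enabled(HS_DYNAMIC_CONFIG(consensus.snapshot_freq_distance))
-                        //.with_leadership_expiry(-1 /* never expires */) // >>> debug only
+                        .with_leadership_expiry(
+                            static_cast< int32_t >(HS_DYNAMIC_CONFIG(consensus.leadership_expiry_ms)))
                         .with_reserved_log_items(0) // In reality ReplLogStore retains much more than this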
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index bf80e4dc5..023e25565 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -110,7 +110,8 @@ if (${io_tests})
     add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store)
     add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr)
     add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service)
-    # add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev)
+
+    #add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev)
     # add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore)
     # add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev)
   endif()
diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp
index 2ffaf2d70..a7bdf9b11 100644
--- a/src/tests/test_raft_repl_dev.cpp
+++ b/src/tests/test_raft_repl_dev.cpp
@@ -25,7 +25,7 @@
 #include
 #include
 #include
-
+#include
 #include
 #include
 #include
@@ -48,6 +48,7 @@ SISL_OPTION_GROUP(test_raft_repl_dev,
                       ::cxxopts::value< uint32_t >()->default_value("4096"), "number"),
                   (num_raft_groups, "", "num_raft_groups", "number of raft groups per test",
                       ::cxxopts::value< uint32_t >()->default_value("1"), "number"));
+
 SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup)
 
 static std::unique_ptr< test_common::HSReplTestHelper > g_helper;
@@ -172,6 +173,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
             std::tie(k, v) = *it;
             ++it;
         }
+
         auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
         auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size);
 
@@ -235,6 +237,17 @@ class RaftReplDevTest : public testing::Test {
     TestReplicatedDB& pick_one_db() { return *dbs_[0]; }
 
+#ifdef _PRERELEASE
+    void set_flip_point(const std::string flip_name) {
+        flip::FlipCondition null_cond;
+        flip::FlipFrequency freq;
+        freq.set_count(1);
+        freq.set_percent(100);
+        m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq);
+        LOGDEBUG("Flip {} set", flip_name);
+    }
+#endif
+
     void switch_all_db_leader() {
         for (auto const& db : dbs_) {
             do {
@@ -250,9 +263,13 @@
 private:
     std::vector< std::shared_ptr< TestReplicatedDB > > dbs_;
+#ifdef _PRERELEASE
+    flip::FlipClient m_fc{iomgr_flip::instance()};
+#endif
 };
 
 TEST_F(RaftReplDevTest, All_Append_Restart_Append) {
+    LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
     g_helper->sync_for_test_start();
 
@@ -291,11 +308,42 @@ TEST_F(RaftReplDevTest, All_Append_Restart_Append) {
     g_helper->sync_for_cleanup_start();
 }
 
+TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) {
+    LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
+    g_helper->sync_for_test_start();
+
+#ifdef _PRERELEASE
+    set_flip_point("simulate_fetch_remote_data");
+#endif
+
+    if (g_helper->replica_num() == 0) {
+        // g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >());
+        g_helper->sync_dataset_size(100);
+        auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
+        LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
+        g_helper->runner().set_task([this, block_size]() {
+            this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
+        });
+        g_helper->runner().execute().get();
+    }
+
+    this->wait_for_all_writes(g_helper->dataset_size());
+
+    g_helper->sync_for_verify_start();
+
+    // TODO: it seems that with the flip and fetch-remote path, the data size is not correct;
+    LOGINFO("Validate all data written so far by reading them");
+    this->validate_all_data();
+
+    g_helper->sync_for_cleanup_start();
+}
+
 int main(int argc, char* argv[]) {
     int parsed_argc{argc};
     char** orig_argv = argv;
     ::testing::InitGoogleTest(&parsed_argc, argv);
+
     SISL_OPTIONS_LOAD(parsed_argc, argv, logging, config, test_raft_repl_dev, iomgr, test_common_setup,
                       test_repl_common_setup);