Skip to content

Commit

Permalink
Issue: 257 Replication Fetch Remote Data (#290)
Browse files Browse the repository at this point in the history
* issue: 257 Fetch remote data

* add wait for read complete on fetch complete
  • Loading branch information
yamingk authored Feb 1, 2024
1 parent 9aa7baa commit 42cdddb
Show file tree
Hide file tree
Showing 11 changed files with 328 additions and 25 deletions.
1 change: 1 addition & 0 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
class HomestoreConan(ConanFile):
name = "homestore"
version = "5.0.8"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
6 changes: 6 additions & 0 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ table Consensus {

// Minimum log gap a replica has to be from leader before joining the replica set.
min_log_gap_to_join: int32 = 30;

// Amount of time in seconds to wait on a data write before fetching the data from remote.
wait_data_write_timer_sec: uint32 = 30 (hotswap);

// Leadership expiry 120 seconds
leadership_expiry_ms: uint32 = 120000;
}

table HomeStoreSettings {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/meta/meta_blk_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ bool MetaBlkService::scan_and_load_meta_blks(meta_blk_map_t& meta_blks, ovf_hdr_
mblk->hdr.h.compressed = 0;
mblk->hdr.h.context_sz = read_sz;
} else {
LOGINFO("[type={}], meta blk size check passed!", mblk->hdr.h.type);
LOGDEBUG("[type={}], meta blk size check passed!", mblk->hdr.h.type);
}

// move on to next meta blk;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ list(APPEND SCHEMA_FLAGS "--scoped-enums" "--gen-name-strings" "--cpp-std=c++17"

flatbuffers_generate_headers(
TARGET hs_replication_fb
SCHEMAS push_data_rpc.fbs
SCHEMAS push_data_rpc.fbs fetch_data_rpc.fbs
FLAGS ${SCHEMA_FLAGS}
)

Expand Down
6 changes: 3 additions & 3 deletions src/lib/replication/fetch_data_rpc.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ table RequestEntry {
dsn : uint64; // Data Sequence number
user_header: [ubyte]; // User header bytes
user_key : [ubyte]; // User key data
blkid_originator : int32; // Originally which replica's blkid is this
blkid_originator : int32; // Server_id: Originally which replica's blkid is this
remote_blkid : [ubyte]; // Serialized remote blkid
}

Expand All @@ -15,7 +15,7 @@ table FetchDataRequest {
}

table ResponseEntry {
lsn : [int64]; // LSN of the raft log if known
lsn : int64; // LSN of the raft log if known
dsn : uint64; // Data Sequence number
raft_term : uint64; // Raft term number
data_size : uint32; // Size of the data which is sent as separate non flatbuffer
Expand All @@ -31,4 +31,4 @@ table FetchData {
response : FetchDataResponse;
}

root_type FetchData;
root_type FetchData;
272 changes: 254 additions & 18 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class RaftReplDev : public ReplDev,
iomgr::timer_handle_t m_sb_flush_timer_hdl;

std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry
// Wait-data timer handle (initially the null handle) and resync-mode flag (initially false)
iomgr::timer_handle_t m_wait_data_timer_hdl{iomgr::null_timer_handle};
bool m_resync_mode{false};

static std::atomic< uint64_t > s_next_group_ordinal;

Expand Down Expand Up @@ -117,6 +120,11 @@ class RaftReplDev : public ReplDev,
shared< nuraft::log_store > data_journal() { return m_data_journal; }
void push_data_to_all_followers(repl_req_ptr_t rreq);
void on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
void check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs);
void fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs);

// Returns the current value of the m_resync_mode flag.
bool is_resync_mode() { return m_resync_mode; }
void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
};

Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo
auto const log_buf_data_offset = log_buf->size() - lentry->get_buf().size();
auto const [jentry, header, key] = log_to_journal_entry(log_buf, log_buf_data_offset);

LOGINFO("Received Raft server_id={}, term={}, dsn={}, journal_entry=[{}] ", jentry->server_id, lentry->get_term(),
jentry->dsn, jentry->to_string());
// From the repl_key, get the repl_req. In cases where log stream got here first, this method will create a new
// repl_req and return that back. Fill up all of the required journal entry inside the repl_req
auto rreq = m_rd.follower_create_req(
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void RaftReplService::start() {
.with_stale_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_hi_threshold))
.with_fresh_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_lo_threshold))
.with_snapshot_enabled(HS_DYNAMIC_CONFIG(consensus.snapshot_freq_distance))
//.with_leadership_expiry(-1 /* never expires */) // >>> debug only
.with_reserved_log_items(0) // In reality ReplLogStore retains much more than this
.with_auto_forwarding(false);
r_params.return_method_ = nuraft::raft_params::async_handler;
Expand Down
3 changes: 2 additions & 1 deletion src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ if (${io_tests})
add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store)
add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr)
add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service)
# add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev)

#add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev)
# add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore)
# add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev)
endif()
Expand Down
50 changes: 49 additions & 1 deletion src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <folly/init/Init.h>
#include <folly/executors/GlobalExecutor.h>
#include <gtest/gtest.h>

#include <iomgr/iomgr_flip.hpp>
#include <homestore/blk.h>
#include <homestore/homestore.hpp>
#include <homestore/homestore_decl.hpp>
Expand All @@ -48,6 +48,7 @@ SISL_OPTION_GROUP(test_raft_repl_dev,
::cxxopts::value< uint32_t >()->default_value("4096"), "number"),
(num_raft_groups, "", "num_raft_groups", "number of raft groups per test",
::cxxopts::value< uint32_t >()->default_value("1"), "number"));

SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup)

static std::unique_ptr< test_common::HSReplTestHelper > g_helper;
Expand Down Expand Up @@ -172,6 +173,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
std::tie(k, v) = *it;
++it;
}

auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size);

Expand Down Expand Up @@ -235,6 +237,17 @@ class RaftReplDevTest : public testing::Test {

TestReplicatedDB& pick_one_db() { return *dbs_[0]; }

#ifdef _PRERELEASE
/// Injects the named no-return flip through the fixture's flip client (m_fc).
///
/// The flip is registered with a single default-constructed (null) condition and a
/// frequency configured with count = 1 and percent = 100.
/// NOTE(review): presumably this makes the flip trigger once, on its first hit — confirm
/// against the flip library's frequency semantics.
///
/// @param flip_name Name of the flip point to inject. Taken by const reference so the
///                  string is not copied on every call.
void set_flip_point(const std::string& flip_name) {
    flip::FlipCondition null_cond;
    flip::FlipFrequency freq;
    freq.set_count(1);
    freq.set_percent(100);
    m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq);
    LOGDEBUG("Flip {} set", flip_name);
}
#endif

void switch_all_db_leader() {
for (auto const& db : dbs_) {
do {
Expand All @@ -250,9 +263,13 @@ class RaftReplDevTest : public testing::Test {

private:
std::vector< std::shared_ptr< TestReplicatedDB > > dbs_;
#ifdef _PRERELEASE
flip::FlipClient m_fc{iomgr_flip::instance()};
#endif
};

TEST_F(RaftReplDevTest, All_Append_Restart_Append) {

LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();

Expand Down Expand Up @@ -291,11 +308,42 @@ TEST_F(RaftReplDevTest, All_Append_Restart_Append) {
g_helper->sync_for_cleanup_start();
}

// Test: replica 0 generates a fixed-size dataset of appends; every replica then waits for
// the writes, and reads the data back for validation. In _PRERELEASE builds the
// "simulate_fetch_remote_data" flip is set before any write is issued.
TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) {
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
// Barrier with the other test processes before starting.
g_helper->sync_for_test_start();

#ifdef _PRERELEASE
// Flip support is only compiled in for prerelease builds; without it the test runs with no flip set.
set_flip_point("simulate_fetch_remote_data");
#endif

// Only replica 0 issues the writes.
if (g_helper->replica_num() == 0) {
// g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >());
// Dataset size is hard-coded to 100 here instead of being taken from the num_io option.
g_helper->sync_dataset_size(100);
auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
// Each write uses block_size as both the data size and the maximum size per iov.
g_helper->runner().set_task([this, block_size]() {
this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
});
// Block until the runner reports that execution has completed.
g_helper->runner().execute().get();
}

// All replicas (including replica 0) wait on the synced dataset size.
this->wait_for_all_writes(g_helper->dataset_size());

// Barrier before the verification phase.
g_helper->sync_for_verify_start();

// TODO: seems with flip and fetch remote, the data size is not correct;
LOGINFO("Validate all data written so far by reading them");
this->validate_all_data();

// Barrier before cleanup.
g_helper->sync_for_cleanup_start();
}

int main(int argc, char* argv[]) {
int parsed_argc{argc};
char** orig_argv = argv;

::testing::InitGoogleTest(&parsed_argc, argv);

SISL_OPTIONS_LOAD(parsed_argc, argv, logging, config, test_raft_repl_dev, iomgr, test_common_setup,
test_repl_common_setup);

Expand Down

0 comments on commit 42cdddb

Please sign in to comment.