Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue: 257 Replication Fetch Remote Data #290

Merged
merged 16 commits into from
Feb 1, 2024
3 changes: 2 additions & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.0.7"
version = "5.0.8"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
6 changes: 6 additions & 0 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ table Consensus {

// Minimum log gap a replica has to be from leader before joining the replica set.
min_log_gap_to_join: int32 = 30;

// Amount of time, in seconds, to wait for a data write before fetching the data from remote.
wait_data_write_timer_sec: uint32 = 30 (hotswap);

// Leadership expiry in milliseconds (default 120000 ms = 120 seconds)
leadership_expiry_ms: uint32 = 120000;
}

table HomeStoreSettings {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/meta/meta_blk_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ bool MetaBlkService::scan_and_load_meta_blks(meta_blk_map_t& meta_blks, ovf_hdr_
mblk->hdr.h.compressed = 0;
mblk->hdr.h.context_sz = read_sz;
} else {
LOGINFO("[type={}], meta blk size check passed!", mblk->hdr.h.type);
LOGDEBUG("[type={}], meta blk size check passed!", mblk->hdr.h.type);
}

// move on to next meta blk;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ list(APPEND SCHEMA_FLAGS "--scoped-enums" "--gen-name-strings" "--cpp-std=c++17"

flatbuffers_generate_headers(
TARGET hs_replication_fb
SCHEMAS push_data_rpc.fbs
SCHEMAS push_data_rpc.fbs fetch_data_rpc.fbs
FLAGS ${SCHEMA_FLAGS}
)

Expand Down
6 changes: 3 additions & 3 deletions src/lib/replication/fetch_data_rpc.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ table RequestEntry {
dsn : uint64; // Data Sequence number
user_header: [ubyte]; // User header bytes
user_key : [ubyte]; // User key data
blkid_originator : int32; // Originally which replica's blkid is this
blkid_originator : int32; // Server_id: Originally which replica's blkid is this
remote_blkid : [ubyte]; // Serialized remote blkid
}

Expand All @@ -15,7 +15,7 @@ table FetchDataRequest {
}

table ResponseEntry {
lsn : [int64]; // LSN of the raft log if known
lsn : int64; // LSN of the raft log if known
dsn : uint64; // Data Sequence number
raft_term : uint64; // Raft term number
data_size : uint32; // Size of the data which is sent as separate non flatbuffer
Expand All @@ -31,4 +31,4 @@ table FetchData {
response : FetchDataResponse;
}

root_type FetchData;
root_type FetchData;
272 changes: 254 additions & 18 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class RaftReplDev : public ReplDev,
iomgr::timer_handle_t m_sb_flush_timer_hdl;

std::atomic< uint64_t > m_next_dsn{0}; // Data Sequence Number that will keep incrementing for each data entry
//
iomgr::timer_handle_t m_wait_data_timer_hdl{iomgr::null_timer_handle};
bool m_resync_mode{false};

static std::atomic< uint64_t > s_next_group_ordinal;

Expand Down Expand Up @@ -117,6 +120,11 @@ class RaftReplDev : public ReplDev,
shared< nuraft::log_store > data_journal() { return m_data_journal; }
void push_data_to_all_followers(repl_req_ptr_t rreq);
void on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
void on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data);
void check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rreqs);
void fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs);

// Returns the current value of the m_resync_mode flag.
bool is_resync_mode() { return m_resync_mode; }
void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
};

Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo
auto const log_buf_data_offset = log_buf->size() - lentry->get_buf().size();
auto const [jentry, header, key] = log_to_journal_entry(log_buf, log_buf_data_offset);

LOGINFO("Received Raft server_id={}, term={}, dsn={}, journal_entry=[{}] ", jentry->server_id, lentry->get_term(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we need this trace, given that we already have a trace at line 116?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some additional info in this log; I will merge the two in my upcoming PR.

jentry->dsn, jentry->to_string());
// From the repl_key, get the repl_req. In cases where log stream got here first, this method will create a new
// repl_req and return that back. Fill up all of the required journal entry inside the repl_req
auto rreq = m_rd.follower_create_req(
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void RaftReplService::start() {
.with_stale_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_hi_threshold))
.with_fresh_log_gap(HS_DYNAMIC_CONFIG(consensus.stale_log_gap_lo_threshold))
.with_snapshot_enabled(HS_DYNAMIC_CONFIG(consensus.snapshot_freq_distance))
//.with_leadership_expiry(-1 /* never expires */) // >>> debug only
.with_reserved_log_items(0) // In reality ReplLogStore retains much more than this
.with_auto_forwarding(false);
r_params.return_method_ = nuraft::raft_params::async_handler;
Expand Down
2 changes: 1 addition & 1 deletion src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ if (${io_tests})
add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store)
add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr)
add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service)
add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev)
#add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev)
# add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore)
# add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev)
endif()
Expand Down
50 changes: 49 additions & 1 deletion src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <folly/init/Init.h>
#include <folly/executors/GlobalExecutor.h>
#include <gtest/gtest.h>

#include <iomgr/iomgr_flip.hpp>
#include <homestore/blk.h>
#include <homestore/homestore.hpp>
#include <homestore/homestore_decl.hpp>
Expand All @@ -48,6 +48,7 @@ SISL_OPTION_GROUP(test_raft_repl_dev,
::cxxopts::value< uint32_t >()->default_value("4096"), "number"),
(num_raft_groups, "", "num_raft_groups", "number of raft groups per test",
::cxxopts::value< uint32_t >()->default_value("1"), "number"));

SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup)

static std::unique_ptr< test_common::HSReplTestHelper > g_helper;
Expand Down Expand Up @@ -172,6 +173,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
std::tie(k, v) = *it;
++it;
}

auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
auto read_sgs = test_common::HSTestHelper::create_sgs(v.data_size_, block_size);

Expand Down Expand Up @@ -235,6 +237,17 @@ class RaftReplDevTest : public testing::Test {

TestReplicatedDB& pick_one_db() { return *dbs_[0]; }

#ifdef _PRERELEASE
/// Injects the no-return flip named `flip_name` through this fixture's flip client.
///
/// The flip is registered with a single default-constructed condition and a
/// frequency of count = 1, percent = 100.
///
/// @param flip_name Name of the flip point to inject. Taken by const reference so
///                  that callers do not pay for a string copy on every call.
void set_flip_point(const std::string& flip_name) {
    flip::FlipCondition null_cond;
    flip::FlipFrequency freq;
    freq.set_count(1);
    freq.set_percent(100);
    // NOTE(review): the result of inject_noreturn_flip is not checked, so the
    // "set" log line below is emitted unconditionally.
    m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq);
    LOGDEBUG("Flip {} set", flip_name);
}
#endif

void switch_all_db_leader() {
for (auto const& db : dbs_) {
do {
Expand All @@ -250,9 +263,13 @@ class RaftReplDevTest : public testing::Test {

private:
std::vector< std::shared_ptr< TestReplicatedDB > > dbs_;
#ifdef _PRERELEASE
flip::FlipClient m_fc{iomgr_flip::instance()};
#endif
};

TEST_F(RaftReplDevTest, All_Append_Restart_Append) {

LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();

Expand Down Expand Up @@ -291,11 +308,42 @@ TEST_F(RaftReplDevTest, All_Append_Restart_Append) {
g_helper->sync_for_cleanup_start();
}

// Test flow: arm the "simulate_fetch_remote_data" flip (prerelease builds only), have
// replica 0 generate writes, wait on every replica for all writes, then validate the
// data by reading it back.
// NOTE(review): presumably the flip forces the fetch-remote-data path to be taken;
// confirm against the flip's use in raft_repl_dev.cpp.
TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) {
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();

// The flip is only injected when built with _PRERELEASE; otherwise the test runs
// without any flip armed.
#ifdef _PRERELEASE
set_flip_point("simulate_fetch_remote_data");
#endif

// Only replica 0 sets the dataset size and schedules the writes.
if (g_helper->replica_num() == 0) {
// Dataset size is fixed at 100 here instead of being taken from the "num_io" option:
// g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >());
g_helper->sync_dataset_size(100);
auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();
LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
g_helper->runner().set_task([this, block_size]() {
this->generate_writes(block_size /* data_size */, block_size /* max_size_per_iov */);
});
// Block until the runner's execution future completes.
g_helper->runner().execute().get();
}

// Every replica waits until the full dataset has been written.
this->wait_for_all_writes(g_helper->dataset_size());

g_helper->sync_for_verify_start();

// TODO: it seems that with the flip and fetch remote, the data size is not correct.
LOGINFO("Validate all data written so far by reading them");
this->validate_all_data();

g_helper->sync_for_cleanup_start();
}

int main(int argc, char* argv[]) {
int parsed_argc{argc};
char** orig_argv = argv;

::testing::InitGoogleTest(&parsed_argc, argv);

SISL_OPTIONS_LOAD(parsed_argc, argv, logging, config, test_raft_repl_dev, iomgr, test_common_setup,
test_repl_common_setup);

Expand Down
Loading