diff --git a/conanfile.py b/conanfile.py index 6d817306..50286ff8 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "2.2.2" + version = "2.2.3" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" diff --git a/src/lib/homestore_backend/CMakeLists.txt b/src/lib/homestore_backend/CMakeLists.txt index 54b2c01c..27014911 100644 --- a/src/lib/homestore_backend/CMakeLists.txt +++ b/src/lib/homestore_backend/CMakeLists.txt @@ -75,6 +75,6 @@ add_test(NAME HomestoreTestDynamic # To test both baseline & incremental resync functionality, we use 13 to minimize the likelihood of it being a divisor of the total LSN (currently 30) add_test(NAME HomestoreTestDynamicWithResync - COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./ + COMMAND homestore_test_dynamic --gtest_filter="HomeObjectFixture.ReplaceMember" -csv error --executor immediate --config_path ./ --override_config homestore_config.consensus.snapshot_freq_distance:13 --override_config homestore_config.consensus.num_reserved_log_items=13) diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp index fa5d3889..bceae920 100644 --- a/src/lib/homestore_backend/pg_blob_iterator.cpp +++ b/src/lib/homestore_backend/pg_blob_iterator.cpp @@ -228,6 +228,12 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe LOGW("Simulating loading blob data error"); return false; } + auto delay = iomgr_flip::instance()->get_test_flip< long >("read_snapshot_load_blob_latency", static_cast(info.blob_id)); + if (delay) { + LOGI("Simulating pg blob iterator load data with delay, delay:{}, blob_id:{}", delay.get(), + info.blob_id); + std::this_thread::sleep_for(std::chrono::milliseconds(delay.get())); + } #endif sisl::io_blob_safe blob; uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry); diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp index d380a7a1..65de4017 100644 --- a/src/lib/homestore_backend/snapshot_receive_handler.cpp +++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp @@ -143,6 +143,16 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob continue; } +#ifdef _PRERELEASE + auto delay = iomgr_flip::instance()->get_test_flip< long >("write_snapshot_save_blob_latency", + static_cast< long >(blob->blob_id())); + if (delay) { + LOGI("Simulating pg snapshot receive data with delay, delay:{}, blob_id:{}", delay.get(), + blob->blob_id()); + std::this_thread::sleep_for(std::chrono::milliseconds(delay.get())); + } +#endif + // Check duplication to avoid reprocessing. This may happen on resent blob batches. if (!ctx_->index_table) { std::shared_lock lock_guard(home_obj_._pg_lock); diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp index b858a241..82fae3fe 100644 --- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp +++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp @@ -47,15 +47,35 @@ class HomeObjectFixture : public ::testing::Test { g_helper->delete_homeobject(); } - void restart() { + void restart(uint32_t shutdown_delay_secs = 0u, uint32_t restart_delay_secs = 0u) { g_helper->sync(); trigger_cp(true); _obj_inst.reset(); + _obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->restart(shutdown_delay_secs, restart_delay_secs)); + // wait for leader to be elected + std::this_thread::sleep_for(std::chrono::seconds(5)); + } + + void stop() { + LOGINFO("Stoping homeobject replica={}", g_helper->my_replica_id()); + _obj_inst.reset(); + g_helper->homeobj_.reset(); + sleep(120); + } + + void start() { + LOGINFO("Starting homeobject replica={}", g_helper->my_replica_id()); + _obj_inst.reset(); _obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->restart()); // wait for leader to be elected std::this_thread::sleep_for(std::chrono::seconds(5)); } + void kill() { + LOGINFO("SigKilling homeobject replica={}", g_helper->my_replica_id()); + std::raise(SIGKILL); + } + /** * \brief create pg with a given id. * @@ -281,14 +301,17 @@ class HomeObjectFixture : public ::testing::Test { // TODO:make this run in parallel void verify_get_blob(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec, - uint64_t const num_blobs_per_shard, bool const use_random_offset = false) { + uint64_t const num_blobs_per_shard, bool const use_random_offset = false, + std::map pg_start_blob_id = std::map()) { uint32_t off = 0, len = 0; for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) { if (!am_i_in_pg(pg_id)) continue; blob_id_t current_blob_id{0}; + if (pg_start_blob_id.find(pg_id) != pg_start_blob_id.end()) current_blob_id = pg_start_blob_id[pg_id]; for (const auto& shard_id : shard_vec) { for (uint64_t k = 0; k < num_blobs_per_shard; k++) { + LOGDEBUG("going to verify blob pg {} shard {} blob {}", pg_id, shard_id, current_blob_id); auto blob = build_blob(current_blob_id); len = blob.body.size(); if (use_random_offset) { @@ -491,6 +514,17 @@ class HomeObjectFixture : public ::testing::Test { LOGINFO("Flip {} set", flip_name); } + template +void set_retval_flip(const std::string flip_name, const T retval, + uint32_t count = 1, uint32_t percent = 100, flip::FlipCondition cond = flip::FlipCondition()) + { + flip::FlipFrequency freq; + freq.set_count(count); + freq.set_percent(percent); + ASSERT_TRUE(m_fc.inject_retval_flip(flip_name, {cond}, freq, retval)); + LOGINFO("Flip {} with returned value set, value={}", flip_name, retval); + } + void set_delay_flip(const std::string flip_name, uint64_t delay_usec, uint32_t count = 1, uint32_t percent = 100) { flip::FlipCondition null_cond; flip::FlipFrequency freq; @@ -505,7 +539,7 @@ class HomeObjectFixture : public ::testing::Test { LOGINFO("Flip {} removed", flip_name); } #endif - + void RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval); private: std::random_device rnd{}; std::default_random_engine rnd_engine{rnd()}; diff --git a/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp b/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp index f8784f7d..c4a68cd3 100644 --- a/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp +++ b/src/lib/homestore_backend/tests/hs_repl_test_helper.hpp @@ -198,20 +198,16 @@ class HSReplTestHelper { ipc_data_ = new (region_->get_address()) IPCData; for (uint32_t i{1}; i < num_replicas; ++i) { - LOGINFO("Spawning Homeobject replica={} instance", i); - - std::string cmd_line; - fmt::format_to(std::back_inserter(cmd_line), "{} --replica_num {}", args_[0], i); - for (int j{1}; j < (int)args_.size(); ++j) { - fmt::format_to(std::back_inserter(cmd_line), " {}", args_[j]); - } - boost::process::child c(boost::process::cmd = cmd_line, proc_grp_); - c.detach(); + spawn_homeobject_process(i, false); } } else { shm_ = std::make_unique< bip::shared_memory_object >(bip::open_only, "HO_repl_test_shmem", bip::read_write); region_ = std::make_unique< bip::mapped_region >(*shm_, bip::read_write); ipc_data_ = static_cast< IPCData* >(region_->get_address()); + if (SISL_OPTIONS["is_restart"].as()) { + // reset sync point to the next sync point before restart + sync_point_num = ipc_data_->sync_point_num_ + 1; + } } int tmp_argc = 1; @@ -221,8 +217,41 @@ class HSReplTestHelper { app = std::make_shared< TestReplApplication >(*this); } + void spawn_homeobject_process(uint8_t replica_num, bool is_restart) { + std::string cmd_line; + fmt::format_to(std::back_inserter(cmd_line), "{} --replica_num {} --is_restart={}", args_[0], replica_num, + is_restart? "true" : "false"); + if (is_restart) { + auto ut = testing::UnitTest::GetInstance(); + std::string pattern = ""; + bool is_following = false; + for (int i = 0; i < ut->total_test_suite_count(); i++) { + auto ts = ut->GetTestSuite(i); + for (int j = 0; j < ts->total_test_count(); j++) { + auto ti = ts->GetTestInfo(j); + if (!is_following && ti == ut->current_test_info()) { is_following = true; } + if (is_following && ti->should_run()) { + if (!pattern.empty()) { fmt::format_to(std::back_inserter(pattern), ":"); } + fmt::format_to(std::back_inserter(pattern), "{}", ti->test_case_name()); + fmt::format_to(std::back_inserter(pattern), ".{}", ti->name()); + break; + } + } + } + LOGINFO("Restart, gtest filter pattern: {}", pattern); + if ("" != pattern) { fmt::format_to(std::back_inserter(cmd_line), " --gtest_filter={}", pattern); } + } + for (int j{1}; j < (int)args_.size(); ++j) { + fmt::format_to(std::back_inserter(cmd_line), " {}", args_[j]); + } + LOGINFO("Spawning Homeobject cmd: {}", cmd_line); + boost::process::child c(boost::process::cmd = cmd_line, proc_grp_); + c.detach(); + } + std::shared_ptr< homeobject::HomeObject > build_new_homeobject() { - prepare_devices(); + auto is_restart = SISL_OPTIONS["is_restart"].as< bool >(); + prepare_devices(!is_restart); homeobj_ = init_homeobject(std::weak_ptr< TestReplApplication >(app)); return homeobj_; } @@ -233,9 +262,16 @@ class HSReplTestHelper { remove_test_files(); } - std::shared_ptr< homeobject::HomeObject > restart(uint32_t shutdown_delay_secs = 5u) { - LOGINFO("Restarting homeobject replica={}", replica_num_); + std::shared_ptr< homeobject::HomeObject > restart(uint32_t shutdown_delay_secs = 0u, uint32_t restart_delay_secs = 0u) { + if (shutdown_delay_secs > 0) { + std::this_thread::sleep_for(std::chrono::seconds(shutdown_delay_secs)); + } + LOGINFO("Stoping homeobject after {} secs, replica={}", shutdown_delay_secs, replica_num_); homeobj_.reset(); + if (restart_delay_secs > 0) { + std::this_thread::sleep_for(std::chrono::seconds(restart_delay_secs)); + } + LOGINFO("Starting homeobject after {} secs, replica={}", restart_delay_secs, replica_num_); homeobj_ = init_homeobject(std::weak_ptr< TestReplApplication >(app)); return homeobj_; } diff --git a/src/lib/homestore_backend/tests/test_homestore_backend.cpp b/src/lib/homestore_backend/tests/test_homestore_backend.cpp index 35dd6079..ace76ddb 100644 --- a/src/lib/homestore_backend/tests/test_homestore_backend.cpp +++ b/src/lib/homestore_backend/tests/test_homestore_backend.cpp @@ -38,7 +38,9 @@ SISL_OPTION_GROUP( (qdepth, "", "qdepth", "Max outstanding operations", ::cxxopts::value< uint32_t >()->default_value("8"), "number"), (num_pgs, "", "num_pgs", "number of pgs", ::cxxopts::value< uint64_t >()->default_value("2"), "number"), (num_shards, "", "num_shards", "number of shards", ::cxxopts::value< uint64_t >()->default_value("4"), "number"), - (num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number")); + (num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number"), + (is_restart, "", "is_restart", "the process is restart or the first start", ::cxxopts::value< bool >()-> + default_value("false"), "true or false")); SISL_LOGGING_INIT(homeobject) #define test_options logging, config, homeobject, test_homeobject_repl_common diff --git a/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp b/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp index e339d918..0216611f 100644 --- a/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp +++ b/src/lib/homestore_backend/tests/test_homestore_backend_dynamic.cpp @@ -111,7 +111,7 @@ TEST_F(HomeObjectFixture, ReplaceMember) { // spare replica, run_if_in_pg(pg_id, [&]() { verify_get_blob(pg_shard_id_vec, num_blobs_per_shard); - verify_obj_count(1, num_blobs_per_shard, num_shards_per_pg, false); + verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false); }); // step 5: Verify no pg related data in out_member @@ -134,7 +134,7 @@ TEST_F(HomeObjectFixture, ReplaceMember) { restart(); run_if_in_pg(pg_id, [&]() { verify_get_blob(pg_shard_id_vec, num_blobs_per_shard); - verify_obj_count(1, num_blobs_per_shard, num_shards_per_pg, false); + verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false); LOGINFO("After restart, check pg related data in pg members successfully"); }); @@ -149,6 +149,129 @@ TEST_F(HomeObjectFixture, ReplaceMember) { } } +//Restart during baseline resync and timeout +TEST_F(HomeObjectFixture, RestartFollowerDuringBaselineResyncAndTimeout) { + RestartFollowerDuringBaselineResyncUsingSigKill(10000, 10000); +} + +// Test case to restart new member during baseline resync, it will start 4 process to simulate the 4 replicas, let's say P0, P1, P2 and P3. +// P0, P1, P2 are the original members of the pg, P3 is the spare replica. +// After the replace_member happens, P3 will join the pg, and then kill itself(sigkill) to simulate the restart during baseline resync. +// As P0 is the original process who spawn the other 3 processes, so P0 will also help to spawn a new process to simulate the new member restart. +void HomeObjectFixture::RestartFollowerDuringBaselineResyncUsingSigKill(uint64_t flip_delay, uint64_t restart_interval) { + LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num()); + auto spare_num_replicas = SISL_OPTIONS["spare_replicas"].as< uint8_t >(); + ASSERT_TRUE(spare_num_replicas > 0) << "we need spare replicas for homestore backend dynamic tests"; + + auto is_restart = SISL_OPTIONS["is_restart"].as< bool >(); + auto num_replicas = SISL_OPTIONS["replicas"].as< uint8_t >(); + pg_id_t pg_id{1}; + if(!is_restart) { +#ifdef _PRERELEASE + //simulate delay in snapshot read data + flip::FlipCondition cond; + blob_id_t blob_id = 7; + m_fc.create_condition("blob id", flip::Operator::EQUAL, static_cast(blob_id), &cond); + //This latency simulation is used to workaround the shutdown concurrency issue + // set_retval_flip("read_snapshot_load_blob_latency", static_cast(flip_delay) /*ms*/, 10, 100, cond1); + //simulate delay in snapshot write data + set_retval_flip("write_snapshot_save_blob_latency", static_cast(flip_delay) /*ms*/, 1, 100, cond); + +#endif + } + // ====================Stage 1: Create a pg without spare replicas and put blobs.==================== + + std::unordered_set< uint8_t > excluding_replicas_in_pg; + for (size_t i = num_replicas; i < num_replicas + spare_num_replicas; i++) + excluding_replicas_in_pg.insert(i); + + create_pg(pg_id, 0 /* pg_leader */, excluding_replicas_in_pg); + + auto num_shards_per_pg = SISL_OPTIONS["num_shards"].as< uint64_t >(); + auto num_blobs_per_shard = SISL_OPTIONS["num_blobs"].as< uint64_t >() / num_shards_per_pg; + + // we can not share all the shard_id and blob_id among all the replicas including the spare ones, so we need to + // derive them by calculating. + // since shard_id = pg_id + shard_sequence_num, so we can derive shard_ids for all the shards in this pg, and these + // derived info is used by all replicas(including the newly added member) to verify the blobs. + std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec; + std::map< pg_id_t, blob_id_t > pg_blob_id; + pg_blob_id[pg_id] = 0; + for (shard_id_t shard_id = 1; shard_id <= num_shards_per_pg; shard_id++) { + auto derived_shard_id = make_new_shard_id(pg_id, shard_id); + pg_shard_id_vec[pg_id].emplace_back(derived_shard_id); + } + + if(!is_restart) { + for (uint64_t j = 0; j < num_shards_per_pg; j++) + create_shard(pg_id, 64 * Mi); + + // put and verify blobs in the pg, excluding the spare replicas + put_blobs(pg_shard_id_vec, num_blobs_per_shard, pg_blob_id); + + verify_get_blob(pg_shard_id_vec, num_blobs_per_shard); + verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false); + + // all the replicas , including the spare ones, sync at this point + g_helper->sync(); + + // ====================Stage 2: replace a member==================== + auto out_member_id = g_helper->replica_id(num_replicas - 1); + auto in_member_id = g_helper->replica_id(num_replicas); /*spare replica*/ + + run_on_pg_leader(pg_id, [&]() { + auto r = _obj_inst->pg_manager() + ->replace_member(pg_id, out_member_id, PGMember{in_member_id, "new_member", 0}) + .get(); + ASSERT_TRUE(r); + }); + + // ====================Stage 3: the new member will kill itself to simulate restart, then P0 will help start it ==================== + if (in_member_id == g_helper->my_replica_id()) { + while (!am_i_in_pg(pg_id)) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + LOGINFO("new member is waiting to become a member of pg {}", pg_id); + } + wait_for_all(pg_shard_id_vec[pg_id].front() /*the first shard id in this pg*/, + num_blobs_per_shard - 1 /*the last blob id in this shard*/); + LOGINFO("the data in the first shard has been replicated to the new member"); + LOGINFO("about to kill new member") + // SyncPoint 1(new member): kill itself. + g_helper->sync(); + kill(); + } + + // SyncPoint 1(others): wait for the new member stop, then P0 will help start it. + LOGINFO("waiting for new member stop") + g_helper->sync(); + if (g_helper->replica_num() == 0) { + //wait for kill + std::this_thread::sleep_for(std::chrono::milliseconds(restart_interval)); + LOGINFO("going to restart new member") + g_helper->spawn_homeobject_process(num_replicas, true); + } + // SyncPoint 2(others): wait for restart completed, new member will call g_helper->sync() at setup func to end up this stage implicitly. + LOGINFO("waiting for new member start up") + g_helper->sync(); + // SyncPoint 3: waiting for all the blobs are replicated to the new member + g_helper->sync(); + } else { + // new member restart + while (!am_i_in_pg(pg_id)) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + LOGINFO("new member is waiting to become a member of pg {}", pg_id); + } + wait_for_all(pg_shard_id_vec[pg_id].back() /*the first shard id in this pg*/, + num_shards_per_pg * num_blobs_per_shard - 1 /*the last blob id in this shard*/); + run_if_in_pg(pg_id, [&]() { + verify_get_blob(pg_shard_id_vec, num_blobs_per_shard); + verify_obj_count(1, num_shards_per_pg, num_blobs_per_shard, false); + }); + // SyncPoint 3(new member): replication done, notify others. + g_helper->sync(); + } +} + SISL_OPTION_GROUP( test_homeobject_repl_common, (spdk, "", "spdk", "spdk", ::cxxopts::value< bool >()->default_value("false"), "true or false"), @@ -175,7 +298,9 @@ SISL_OPTION_GROUP( (qdepth, "", "qdepth", "Max outstanding operations", ::cxxopts::value< uint32_t >()->default_value("8"), "number"), (num_pgs, "", "num_pgs", "number of pgs", ::cxxopts::value< uint64_t >()->default_value("2"), "number"), (num_shards, "", "num_shards", "number of shards", ::cxxopts::value< uint64_t >()->default_value("4"), "number"), - (num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number")); + (num_blobs, "", "num_blobs", "number of blobs", ::cxxopts::value< uint64_t >()->default_value("20"), "number"), + (is_restart, "", "is_restart", "the process is restart or the first start", ::cxxopts::value< bool >()-> + default_value("false"), "true or false")); SISL_LOGGING_INIT(homeobject) #define test_options logging, config, homeobject, test_homeobject_repl_common