From 198657e20e628e3b166c8b25603aece79db70f51 Mon Sep 17 00:00:00 2001 From: Harihara Kadayam Date: Tue, 30 Jan 2024 09:41:04 -0800 Subject: [PATCH] Restartability of ReplDev (#285) Following changes are done as part of this commit 1. Because of ordering of services, restart of homestore doesn't recover the ReplDev does not work. Fixed the ordering and also made logstore and data service start/stop inside ReplService start/stop. 2. Error handling was not present, as a result tests failure were hard to comprehend. Added error handling of propose, commit, write failures 3. Aesthetical change to have open_log_store to return future< logstore > instead of callback 4. Added become_leader() call to ReplDev and fixed test cases to utilize that to ensure write happens/issued on correct leader. 5. Added testing to restart, write after restart, validate of ReplDev --- src/include/homestore/checkpoint/cp_mgr.hpp | 6 + src/include/homestore/logstore_service.hpp | 7 +- .../homestore/replication/repl_decls.h | 35 ++++++ src/include/homestore/replication/repl_dev.h | 35 +++++- src/include/homestore/replication_service.hpp | 32 ----- src/include/homestore/superblk_handler.hpp | 86 ++++++------- src/lib/checkpoint/cp_mgr.cpp | 2 + src/lib/homestore.cpp | 45 ++++--- src/lib/logstore/log_store_family.cpp | 42 ++++--- src/lib/logstore/log_store_family.hpp | 17 +-- src/lib/logstore/log_store_service.cpp | 10 +- .../log_store/home_raft_log_store.cpp | 18 ++- .../replication/repl_dev/raft_repl_dev.cpp | 117 +++++++++++++++--- src/lib/replication/repl_dev/raft_repl_dev.h | 7 +- .../repl_dev/raft_state_machine.cpp | 50 ++++++-- .../replication/repl_dev/raft_state_machine.h | 2 +- .../replication/repl_dev/solo_repl_dev.cpp | 15 ++- src/lib/replication/repl_dev/solo_repl_dev.h | 2 +- .../replication/service/generic_repl_svc.cpp | 27 +++- .../replication/service/generic_repl_svc.h | 2 + .../replication/service/raft_repl_service.cpp | 88 ++++++++++--- 
.../replication/service/raft_repl_service.h | 3 + .../test_common/homestore_test_common.hpp | 7 +- src/tests/test_common/hs_repl_test_common.hpp | 42 ++++++- src/tests/test_log_store.cpp | 7 +- src/tests/test_raft_repl_dev.cpp | 71 +++++++++-- src/tests/test_solo_repl_dev.cpp | 4 + 27 files changed, 548 insertions(+), 231 deletions(-) diff --git a/src/include/homestore/checkpoint/cp_mgr.hpp b/src/include/homestore/checkpoint/cp_mgr.hpp index 9c783eb84..c3324711e 100644 --- a/src/include/homestore/checkpoint/cp_mgr.hpp +++ b/src/include/homestore/checkpoint/cp_mgr.hpp @@ -160,7 +160,13 @@ class CPManager { CPManager(); virtual ~CPManager(); + /// @brief Start the CPManager, which creates a first cp session. + /// @param first_time_boot void start(bool first_time_boot); + + /// @brief Start the cp timer so that periodic cps are started + void start_timer(); + /// @brief Shutdown the checkpoint manager services. It will not trigger a flush, but cancels any existing /// checkpoint session abruptly. If caller needs clean shutdown, then they explicitly needs to trigger cp flush /// before calling shutdown. diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index c10423e27..6d8fc09f9 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -92,8 +92,7 @@ class LogStoreService { * * @return std::shared_ptr< HomeLogStore > */ - std::shared_ptr< HomeLogStore > create_new_log_store(const logstore_family_id_t family_id, - const bool append_mode = false); + shared< HomeLogStore > create_new_log_store(logstore_family_id_t family_id, bool append_mode = false); /** * @brief Open an existing log store and does a recovery. 
It then creates an instance of this logstore and @@ -102,8 +101,8 @@ class LogStoreService { * @param store_id: Store ID of the log store to open * @return std::shared_ptr< HomeLogStore > */ - void open_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id, const bool append_mode, - const log_store_opened_cb_t& on_open_cb); + folly::Future< shared< HomeLogStore > > open_log_store(logstore_family_id_t family_id, logstore_id_t store_id, + bool append_mode); /** * @brief Close the log store instance and free-up the resources diff --git a/src/include/homestore/replication/repl_decls.h b/src/include/homestore/replication/repl_decls.h index 9f9fee69f..fd347044f 100644 --- a/src/include/homestore/replication/repl_decls.h +++ b/src/include/homestore/replication/repl_decls.h @@ -3,6 +3,8 @@ #include #include +#include + #include #include #include @@ -13,6 +15,39 @@ SISL_LOGGING_DECL(replication) #define REPL_LOG_MODS grpc_server, HOMESTORE_LOG_MODS, nuraft_mesg, nuraft, replication namespace homestore { +// clang-format off +VENUM(ReplServiceError, int32_t, + OK = 0, // Everything OK + CANCELLED = -1, // Request was cancelled + TIMEOUT = -2, + NOT_LEADER = -3, + BAD_REQUEST = -4, + SERVER_ALREADY_EXISTS = -5, + CONFIG_CHANGING = -6, + SERVER_IS_JOINING = -7, + SERVER_NOT_FOUND = -8, + CANNOT_REMOVE_LEADER = -9, + SERVER_IS_LEAVING = -10, + TERM_MISMATCH = -11, + RESULT_NOT_EXIST_YET = -10000, + NOT_IMPLEMENTED = -10001, + NO_SPACE_LEFT = -20000, + DRIVE_WRITE_ERROR = -20001, + FAILED = -32768); +// clang-format on + +template < typename V, typename E > +using Result = folly::Expected< V, E >; + +template < class V > +using ReplResult = Result< V, ReplServiceError >; + +template < class V, class E > +using AsyncResult = folly::SemiFuture< Result< V, E > >; + +template < class V = folly::Unit > +using AsyncReplResult = AsyncResult< V, ReplServiceError >; + using blkid_list_t = folly::small_vector< BlkId, 4 >; // Fully qualified domain pba, unique pba id 
across replica set diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 27727c943..28b494f0f 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -29,7 +29,8 @@ VENUM(repl_req_state_t, uint32_t, DATA_RECEIVED = 1 << 1, // Data has been received and being written to the storage DATA_WRITTEN = 1 << 2, // Data has been written to the storage LOG_RECEIVED = 1 << 3, // Log is received and waiting for data - LOG_FLUSHED = 1 << 4 // Log has been flushed + LOG_FLUSHED = 1 << 4, // Log has been flushed + ERRORED = 1 << 5 // Error has happened and cleaned up ) struct repl_key { @@ -149,12 +150,25 @@ class ReplDevListener { /// NOTE: Listener should do the free any resources created as part of pre-commit. /// /// @param lsn - The log sequence number getting rolled back - /// @param header - Header originally passed with repl_dev::write() api - /// @param key - Key originally passed with repl_dev::write() api - /// @param ctx - Context passed as part of the replica_set::write() api + /// @param header - Header originally passed with ReplDev::async_alloc_write() api + /// @param key - Key originally passed with ReplDev::async_alloc_write() api + /// @param ctx - Context passed as part of the ReplDev::async_alloc_write() api virtual void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key, cintrusive< repl_req_ctx >& ctx) = 0; + /// @brief Called when the async_alloc_write call failed to initiate replication + /// + /// Called only on the node which called async_alloc_write + /// + /// + /// NOTE: Listener should do the free any resources created as part of pre-commit. 
+ /// + /// @param header - Header originally passed with ReplDev::async_alloc_write() api + /// @param key - Key originally passed with ReplDev::async_alloc_write() api + /// @param ctx - Context passed as part of the ReplDev::async_alloc_write() api + virtual void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) = 0; + /// @brief Called when replication module is trying to allocate a block to write the value /// /// This function can be called both on leader and follower when it is trying to allocate a block to write the @@ -176,7 +190,7 @@ class ReplDevListener { class ReplDev { public: ReplDev() = default; - virtual ~ReplDev() = default; + virtual ~ReplDev() { detach_listener(); } /// @brief Replicate the data to the replica set. This method goes through the /// following steps: @@ -217,6 +231,10 @@ class ReplDev { /// @param blkids - blkids to be freed. virtual void async_free_blks(int64_t lsn, MultiBlkId const& blkid) = 0; + /// @brief Try to switch the current replica where this method called to become a leader. + /// @return True if it is successful, false otherwise. 
+ virtual AsyncReplResult<> become_leader() = 0; + /// @brief Checks if this replica is the leader in this ReplDev /// @return true or false virtual bool is_leader() const = 0; @@ -231,6 +249,13 @@ class ReplDev { virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); } + virtual void detach_listener() { + if (m_listener) { + m_listener->set_repl_dev(nullptr); + m_listener.reset(); + } + } + protected: shared< ReplDevListener > m_listener; }; diff --git a/src/include/homestore/replication_service.hpp b/src/include/homestore/replication_service.hpp index 72d24d626..d24722202 100644 --- a/src/include/homestore/replication_service.hpp +++ b/src/include/homestore/replication_service.hpp @@ -4,48 +4,16 @@ #include #include -#include #include #include #include namespace homestore { -// clang-format off -VENUM(ReplServiceError, int32_t, - OK = 0, // Everything OK - CANCELLED = -1, // Request was cancelled - TIMEOUT = -2, - NOT_LEADER = -3, - BAD_REQUEST = -4, - SERVER_ALREADY_EXISTS = -5, - CONFIG_CHANGING = -6, - SERVER_IS_JOINING = -7, - SERVER_NOT_FOUND = -8, - CANNOT_REMOVE_LEADER = -9, - SERVER_IS_LEAVING = -10, - TERM_MISMATCH = -11, - RESULT_NOT_EXIST_YET = -10000, - NOT_IMPLEMENTED = -10001, - FAILED = -32768); -// clang-format on - class ReplDev; class ReplDevListener; struct hs_stats; -template < typename V, typename E > -using Result = folly::Expected< V, E >; - -template < class V > -using ReplResult = Result< V, ReplServiceError >; - -template < class V, class E > -using AsyncResult = folly::SemiFuture< Result< V, E > >; - -template < class V = folly::Unit > -using AsyncReplResult = AsyncResult< V, ReplServiceError >; - VENUM(repl_impl_type, uint8_t, server_side, // Completely homestore controlled replication client_assisted, // Client assisting in replication diff --git a/src/include/homestore/superblk_handler.hpp b/src/include/homestore/superblk_handler.hpp index 699edc5b6..bf07efca2 100644 --- 
a/src/include/homestore/superblk_handler.hpp +++ b/src/include/homestore/superblk_handler.hpp @@ -31,42 +31,42 @@ class superblk { return ++s_count; } - superblk(const std::string& meta_name = "") { set_name(meta_name); } + superblk(const std::string& sub_name = "") { set_name(sub_name); } superblk(const superblk&) = delete; superblk& operator=(const superblk&) = delete; - superblk(superblk&& rhs) noexcept - : m_meta_mgr_cookie(rhs.m_meta_mgr_cookie) - , m_raw_buf(std::move(rhs.m_raw_buf)) - , m_sb(rhs.m_sb) - , m_metablk_name(std::move(rhs.m_metablk_name)) { - rhs.m_meta_mgr_cookie = nullptr; - rhs.m_sb = nullptr; + superblk(superblk&& rhs) noexcept : + m_meta_blk(rhs.m_meta_blk), + m_raw_buf(std::move(rhs.m_raw_buf)), + m_sb(rhs.m_sb), + m_meta_sub_name(std::move(rhs.m_meta_sub_name)) { + rhs.m_meta_blk = nullptr; + rhs.m_sb = nullptr; } superblk& operator=(superblk&& rhs) noexcept { if (this != &rhs) { - m_meta_mgr_cookie = rhs.m_meta_mgr_cookie; + m_meta_blk = rhs.m_meta_blk; m_raw_buf = std::move(rhs.m_raw_buf); m_sb = rhs.m_sb; - m_metablk_name = std::move(rhs.m_metablk_name); - rhs.m_meta_mgr_cookie = nullptr; + m_meta_sub_name = std::move(rhs.m_meta_sub_name); + rhs.m_meta_blk = nullptr; rhs.m_sb = nullptr; - } - return *this; + } + return *this; } - void set_name(const std::string& meta_name) { - if (meta_name.empty()) { - m_metablk_name = "meta_blk_" + std::to_string(next_count()); + void set_name(const std::string& sub_name) { + if (sub_name.empty()) { + m_meta_sub_name = "meta_blk_" + std::to_string(next_count()); } else { - m_metablk_name = meta_name; + m_meta_sub_name = sub_name; } } - T* load(const sisl::byte_view& buf, void* meta_cookie) { - m_meta_mgr_cookie = voidptr_cast(meta_cookie); + T* load(const sisl::byte_view& buf, void* meta_blk) { + m_meta_blk = voidptr_cast(meta_blk); m_raw_buf = meta_service().is_aligned_buf_needed(buf.size()) ? 
buf.extract(meta_service().align_size()) : buf.extract(0); m_sb = r_cast< T* >(m_raw_buf->bytes()); @@ -85,9 +85,9 @@ class superblk { } void destroy() { - if (m_meta_mgr_cookie) { - meta_service().remove_sub_sb(m_meta_mgr_cookie); - m_meta_mgr_cookie = nullptr; + if (m_meta_blk) { + meta_service().remove_sub_sb(m_meta_blk); + m_meta_blk = nullptr; } m_raw_buf.reset(); m_sb = nullptr; @@ -97,10 +97,10 @@ class superblk { sisl::byte_array raw_buf() { return m_raw_buf; } void write() { - if (m_meta_mgr_cookie) { - meta_service().update_sub_sb(m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_mgr_cookie); + if (m_meta_blk) { + meta_service().update_sub_sb(m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk); } else { - meta_service().add_sub_sb(m_metablk_name, m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_mgr_cookie); + meta_service().add_sub_sb(m_meta_sub_name, m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk); } } @@ -111,17 +111,17 @@ class superblk { T& operator*() { return *m_sb; } private: - void* m_meta_mgr_cookie{nullptr}; + void* m_meta_blk{nullptr}; sisl::byte_array m_raw_buf; T* m_sb{nullptr}; - std::string m_metablk_name; + std::string m_meta_sub_name; }; class json_superblk { private: - void* m_meta_mgr_cookie{nullptr}; + void* m_meta_blk{nullptr}; nlohmann::json m_json_sb; - std::string m_metablk_name; + std::string m_meta_sub_name; public: static uint64_t next_count() { @@ -129,24 +129,24 @@ class json_superblk { return ++s_count; } - json_superblk(const std::string& meta_name = "") { set_name(meta_name); } + json_superblk(const std::string& sub_name = "") { set_name(sub_name); } - void set_name(const std::string& meta_name) { - if (meta_name.empty()) { - m_metablk_name = "meta_blk_" + std::to_string(next_count()); + void set_name(const std::string& sub_name) { + if (sub_name.empty()) { + m_meta_sub_name = "meta_blk_" + std::to_string(next_count()); } else { - m_metablk_name = meta_name; + m_meta_sub_name = sub_name; } } - nlohmann::json& load(const 
sisl::byte_view& buf, void* meta_cookie) { - m_meta_mgr_cookie = voidptr_cast(meta_cookie); + nlohmann::json& load(const sisl::byte_view& buf, void* meta_blk) { + m_meta_blk = voidptr_cast(meta_blk); std::string_view const b{c_charptr_cast(buf.bytes()), buf.size()}; try { m_json_sb = nlohmann::json::from_msgpack(b); } catch (nlohmann::json::exception const& e) { - DEBUG_ASSERT(false, "Failed to load superblk for meta_blk={}", m_metablk_name); + DEBUG_ASSERT(false, "Failed to load superblk for meta_blk={}", m_meta_sub_name); return m_json_sb; } return m_json_sb; @@ -155,9 +155,9 @@ class json_superblk { nlohmann::json& create() { return m_json_sb; } void destroy() { - if (m_meta_mgr_cookie) { - meta_service().remove_sub_sb(m_meta_mgr_cookie); - m_meta_mgr_cookie = nullptr; + if (m_meta_blk) { + meta_service().remove_sub_sb(m_meta_blk); + m_meta_blk = nullptr; } m_json_sb = nlohmann::json{}; } @@ -166,10 +166,10 @@ class json_superblk { void write() { auto do_write = [this](sisl::blob const& b) { - if (m_meta_mgr_cookie) { - meta_service().update_sub_sb(b.cbytes(), b.size(), m_meta_mgr_cookie); + if (m_meta_blk) { + meta_service().update_sub_sb(b.cbytes(), b.size(), m_meta_blk); } else { - meta_service().add_sub_sb(m_metablk_name, b.cbytes(), b.size(), m_meta_mgr_cookie); + meta_service().add_sub_sb(m_meta_sub_name, b.cbytes(), b.size(), m_meta_blk); } }; diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index ad6e3efdb..fba5a6099 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -50,7 +50,9 @@ void CPManager::start(bool first_time_boot) { create_first_cp(); m_sb.write(); } +} +void CPManager::start_timer() { LOGINFO("cp timer is set to {} usec", HS_DYNAMIC_CONFIG(generic.cp_timer_us)); m_cp_timer_hdl = iomanager.schedule_global_timer( HS_DYNAMIC_CONFIG(generic.cp_timer_us) * 1000, true, nullptr /*cookie*/, iomgr::reactor_regex::all_worker, diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index 
b22164b5d..b05387ee5 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -116,19 +116,23 @@ bool HomeStore::start(const hs_input_params& input, hs_before_services_starting_ std::call_once(flag1, [this]() { m_periodic_logger = sisl::logging::CreateCustomLogger("homestore", "_periodic", false, true /* tee_to_stdout_stderr */); + sisl::logging::SetLogPattern("[%D %T.%f] [%^%L%$] [%t] %v", m_periodic_logger); }); - sisl::logging::SetLogPattern("[%D %T.%f] [%^%L%$] [%t] %v", m_periodic_logger); HomeStoreDynamicConfig::init_settings_default(); LOGINFO("Homestore is loading with following services: {}", m_services.list()); if (has_meta_service()) { m_meta_service = std::make_unique< MetaBlkService >(); } - if (has_log_service()) { m_log_service = std::make_unique< LogStoreService >(); } - if (has_data_service()) { m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); } if (has_index_service()) { m_index_service = std::make_unique< IndexService >(std::move(s_index_cbs)); } if (has_repl_data_service()) { m_repl_service = GenericReplService::create(std::move(s_repl_app)); + m_log_service = std::make_unique< LogStoreService >(); m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); + } else { + if (has_log_service()) { m_log_service = std::make_unique< LogStoreService >(); } + if (has_data_service()) { + m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector)); + } } m_cp_mgr = std::make_unique< CPManager >(); m_dev_mgr = std::make_unique< DeviceManager >(input.devices, bind_this(HomeStore::create_vdev_cb, 2)); @@ -208,17 +212,18 @@ void HomeStore::do_start() { if (has_index_service()) { m_index_service->start(); } - if (has_data_service()) { - m_data_service->start(); - } else if (has_repl_data_service()) { - m_data_service->start(); - s_cast< GenericReplService* >(m_repl_service.get())->start(); + if (has_repl_data_service()) { + s_cast< GenericReplService* 
>(m_repl_service.get())->start(); // Replservice starts logstore & data service + } else { + if (has_data_service()) { m_data_service->start(); } + if (has_log_service() && inp_params.auto_recovery) { + // In case of custom recovery, let consumer starts the recovery and it is consumer module's responsibilities + // to start log store + m_log_service->start(is_first_time_boot() /* format */); + } } - // In case of custom recovery, let consumer starts the recovery and it is consumer module's responsibilities - // to start log store - if (has_log_service() && inp_params.auto_recovery) { m_log_service->start(is_first_time_boot() /* format */); } - + m_cp_mgr->start_timer(); m_init_done = true; } @@ -234,8 +239,17 @@ void HomeStore::shutdown() { m_cp_mgr.reset(); if (has_repl_data_service()) { + // Log and Data services are stopped by repl service s_cast< GenericReplService* >(m_repl_service.get())->stop(); + m_log_service.reset(); + m_data_service.reset(); m_repl_service.reset(); + } else { + if (has_log_service()) { + m_log_service->stop(); + m_log_service.reset(); + } + if (has_data_service()) { m_data_service.reset(); } } if (has_index_service()) { @@ -243,18 +257,11 @@ void HomeStore::shutdown() { // m_index_service.reset(); } - if (has_log_service()) { - m_log_service->stop(); - m_log_service.reset(); - } - if (has_meta_service()) { m_meta_service->stop(); m_meta_service.reset(); } - if (has_data_service()) { m_data_service.reset(); } - m_dev_mgr->close_devices(); m_dev_mgr.reset(); diff --git a/src/lib/logstore/log_store_family.cpp b/src/lib/logstore/log_store_family.cpp index 0af47625d..33e1e4313 100644 --- a/src/lib/logstore/log_store_family.cpp +++ b/src/lib/logstore/log_store_family.cpp @@ -63,7 +63,7 @@ void LogStoreFamily::start(bool format, JournalVirtualDev* blk_store) { // Also call the logstore to inform that start/replay is completed. 
if (!format) { for (auto& p : m_id_logstore_map) { - auto& lstore{p.second.m_log_store}; + auto& lstore{p.second.log_store}; if (lstore && lstore->get_log_replay_done_cb()) { lstore->get_log_replay_done_cb()(lstore, lstore->seq_num() - 1); lstore->truncate(lstore->truncated_upto()); @@ -90,18 +90,25 @@ std::shared_ptr< HomeLogStore > LogStoreFamily::create_new_log_store(bool append folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); const auto it = m_id_logstore_map.find(store_id); HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_family_id, store_id); - m_id_logstore_map.insert(std::make_pair<>(store_id, logstore_info_t{lstore, nullptr, append_mode})); + m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode})); } LOGINFO("Created log store id {}-{}", m_family_id, store_id); return lstore; } -void LogStoreFamily::open_log_store(logstore_id_t store_id, bool append_mode, const log_store_opened_cb_t& on_open_cb) { +folly::Future< shared< HomeLogStore > > LogStoreFamily::open_log_store(logstore_id_t store_id, bool append_mode) { folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); - const auto it = m_id_logstore_map.find(store_id); - HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_family_id, store_id); - LOGINFO("Opening log store id {}-{}", m_family_id, store_id); - m_id_logstore_map.insert(std::make_pair<>(store_id, logstore_info_t{nullptr, on_open_cb, append_mode})); + auto it = m_id_logstore_map.find(store_id); + if (it == m_id_logstore_map.end()) { + bool happened; + std::tie(it, happened) = m_id_logstore_map.insert(std::pair(store_id, + logstore_info{ + .log_store = nullptr, + .append_mode = append_mode, + })); + HS_REL_ASSERT_EQ(happened, true, "Unable to insert logstore into id_logstore_map"); + } + return it->second.promise.getFuture(); } void LogStoreFamily::remove_log_store(logstore_id_t store_id) { @@ 
-143,9 +150,10 @@ void LogStoreFamily::device_truncate(const std::shared_ptr< truncate_req >& treq void LogStoreFamily::on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb) { folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); - const auto it = m_id_logstore_map.find(store_id); + auto it = m_id_logstore_map.find(store_id); if (it == m_id_logstore_map.end()) { - LOGERROR("Store Id {}-{} found but not opened yet.", m_family_id, store_id); + LOGERROR("Store Id {}-{} found but not opened yet, it will be discarded after logstore is started", m_family_id, + store_id); m_unopened_store_id.insert(store_id); m_unopened_store_io.insert(std::make_pair<>(store_id, 0)); return; @@ -153,9 +161,9 @@ void LogStoreFamily::on_log_store_found(logstore_id_t store_id, const logstore_s LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_family_id, store_id, sb.m_first_seq_num); - auto& l_info = const_cast< logstore_info_t& >(it->second); - l_info.m_log_store = std::make_shared< HomeLogStore >(*this, store_id, l_info.append_mode, sb.m_first_seq_num); - if (l_info.m_on_log_store_opened) l_info.m_on_log_store_opened(l_info.m_log_store); + logstore_info& info = it->second; + info.log_store = std::make_shared< HomeLogStore >(*this, store_id, info.append_mode, sb.m_first_seq_num); + info.promise.setValue(info.log_store); } static thread_local std::vector< std::shared_ptr< HomeLogStore > > s_cur_flush_batch_stores; @@ -189,7 +197,7 @@ void LogStoreFamily::on_logfound(logstore_id_t id, logstore_seq_num_t seq_num, l ++unopened_it->second; return; } - log_store = it->second.m_log_store.get(); + log_store = it->second.log_store.get(); } if (!log_store) { return; } log_store->on_log_found(seq_num, ld_key, flush_ld_key, buf); @@ -232,7 +240,7 @@ logdev_key LogStoreFamily::do_device_truncate(bool dry_run) { { folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); for (auto& id_logstore : 
m_id_logstore_map) { - auto& store_ptr = id_logstore.second.m_log_store; + auto& store_ptr = id_logstore.second.log_store; const auto& trunc_info = store_ptr->pre_device_truncation(); if (!trunc_info.pending_dev_truncation && !trunc_info.active_writes_not_part_of_truncation) { @@ -290,8 +298,8 @@ nlohmann::json LogStoreFamily::dump_log_store(const log_dump_req& dump_req) { if (dump_req.log_store == nullptr) { folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); for (auto& id_logstore : m_id_logstore_map) { - auto store_ptr{id_logstore.second.m_log_store}; - const std::string id{std::to_string(store_ptr->get_store_id())}; + auto store_ptr = id_logstore.second.log_store; + const std::string id = std::to_string(store_ptr->get_store_id()); // must use operator= construction as copy construction results in error nlohmann::json val = store_ptr->dump_log_store(dump_req); json_dump[id] = std::move(val); @@ -320,7 +328,7 @@ nlohmann::json LogStoreFamily::get_status(int verbosity) const { { folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx); for (const auto& [id, lstore] : m_id_logstore_map) { - js["logstore_id_" + std::to_string(id)] = lstore.m_log_store->get_status(verbosity); + js["logstore_id_" + std::to_string(id)] = lstore.log_store->get_status(verbosity); } } return js; diff --git a/src/lib/logstore/log_store_family.hpp b/src/lib/logstore/log_store_family.hpp index c92d05633..b451a70bf 100644 --- a/src/lib/logstore/log_store_family.hpp +++ b/src/lib/logstore/log_store_family.hpp @@ -28,7 +28,7 @@ #include #include -#include +#include #include #include @@ -37,10 +37,10 @@ namespace homestore { struct log_dump_req; -struct logstore_info_t { - std::shared_ptr< HomeLogStore > m_log_store; - log_store_opened_cb_t m_on_log_store_opened; +struct logstore_info { + std::shared_ptr< HomeLogStore > log_store; bool append_mode; + folly::SharedPromise< std::shared_ptr< HomeLogStore > > promise{}; }; struct truncate_req { @@ -71,8 +71,9 @@ class 
LogStoreFamily { void start(const bool format, JournalVirtualDev* blk_store); void stop(); - std::shared_ptr< HomeLogStore > create_new_log_store(bool append_mode = false); - void open_log_store(logstore_id_t store_id, bool append_mode, const log_store_opened_cb_t& on_open_cb); + shared< HomeLogStore > create_new_log_store(bool append_mode = false); + folly::Future< shared< HomeLogStore > > open_log_store(logstore_id_t store_id, bool append_mode); + bool close_log_store(logstore_id_t store_id) { // TODO: Implement this method return true; @@ -101,8 +102,8 @@ class LogStoreFamily { void on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key); private: - folly::SharedMutexWritePriority m_store_map_mtx; - std::unordered_map< logstore_id_t, logstore_info_t > m_id_logstore_map; + mutable folly::SharedMutexWritePriority m_store_map_mtx; + std::unordered_map< logstore_id_t, logstore_info > m_id_logstore_map; std::unordered_map< logstore_id_t, uint64_t > m_unopened_store_io; std::unordered_set< logstore_id_t > m_unopened_store_id; std::unordered_map< logstore_id_t, logid_t > m_last_flush_info; diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 67f2dc1dd..e8f74d60e 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -105,18 +105,18 @@ void LogStoreService::stop() { } } -std::shared_ptr< HomeLogStore > LogStoreService::create_new_log_store(const logstore_family_id_t family_id, - const bool append_mode) { +shared< HomeLogStore > LogStoreService::create_new_log_store(const logstore_family_id_t family_id, + const bool append_mode) { HS_REL_ASSERT_LT(family_id, num_log_families); COUNTER_INCREMENT(m_metrics, logstores_count, 1); return m_logstore_families[family_id]->create_new_log_store(append_mode); } -void LogStoreService::open_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id, - const bool append_mode, const 
log_store_opened_cb_t& on_open_cb) { +folly::Future< shared< HomeLogStore > > LogStoreService::open_log_store(logstore_family_id_t family_id, + logstore_id_t store_id, bool append_mode) { HS_REL_ASSERT_LT(family_id, num_log_families); COUNTER_INCREMENT(m_metrics, logstores_count, 1); - return m_logstore_families[family_id]->open_log_store(store_id, append_mode, on_open_cb); + return m_logstore_families[family_id]->open_log_store(store_id, append_mode); } void LogStoreService::remove_log_store(const logstore_family_id_t family_id, const logstore_id_t store_id) { diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index bb3b7d2fb..a8718053d 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -67,13 +67,14 @@ HomeRaftLogStore::HomeRaftLogStore(logstore_id_t logstore_id) { } else { m_logstore_id = logstore_id; LOGDEBUGMOD(replication, "Opening existing home log store id={}", logstore_id); - logstore_service().open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, logstore_id, true, - [this](shared< HomeLogStore > log_store) { - m_log_store = std::move(log_store); - DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(), - "Mismatch in passed and create logstore id"); - REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully"); - }); + logstore_service() + .open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, logstore_id, true) + .thenValue([this](auto log_store) { + m_log_store = std::move(log_store); + DEBUG_ASSERT_EQ(m_logstore_id, m_log_store->get_store_id(), + "Mismatch in passed and create logstore id"); + REPL_STORE_LOG(DEBUG, "Home Log store created/opened successfully"); + }); } } @@ -85,20 +86,17 @@ void HomeRaftLogStore::remove_store() { ulong HomeRaftLogStore::next_slot() const { uint64_t next_slot = to_repl_lsn(m_log_store->get_contiguous_issued_seq_num(m_last_durable_lsn)) + 1; - 
REPL_STORE_LOG(DEBUG, "next_slot()={}", next_slot); return next_slot; } ulong HomeRaftLogStore::start_index() const { // start_index starts from 1. ulong start_index = std::max((repl_lsn_t)1, to_repl_lsn(m_log_store->truncated_upto()) + 1); - REPL_STORE_LOG(DEBUG, "start_index()={}", start_index); return start_index; } nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::last_entry() const { store_lsn_t max_seq = m_log_store->get_contiguous_issued_seq_num(m_last_durable_lsn); - REPL_STORE_LOG(DEBUG, "last_entry() store seqnum={}", max_seq); if (max_seq < 0) { return m_dummy_log_entry; } nuraft::ptr< nuraft::log_entry > nle; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 0ee5885e8..6ecf7dfd8 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -38,11 +38,12 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk } if (m_rd_sb->is_timeline_consistent) { - logstore_service().open_log_store(LogStoreService::CTRL_LOG_FAMILY_IDX, m_rd_sb->free_blks_journal_id, - false, [this](shared< HomeLogStore > log_store) { - m_free_blks_journal = std::move(log_store); - m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id(); - }); + logstore_service() + .open_log_store(LogStoreService::CTRL_LOG_FAMILY_IDX, m_rd_sb->free_blks_journal_id, false) + .thenValue([this](auto log_store) { + m_free_blks_journal = std::move(log_store); + m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id(); + }); } } else { m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine); @@ -66,6 +67,18 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk // m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 2)); } +bool RaftReplDev::join_group() { + auto raft_result = + m_msg_mgr.join_group(m_group_id, "homestore_replication", + 
std::dynamic_pointer_cast< nuraft_mesg::mesg_state_mgr >(shared_from_this())); + if (!raft_result) { + HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", boost::uuids::to_string(m_group_id), + raft_result.error()); + return false; + } + return true; +} + void RaftReplDev::use_config(json_superblk raft_config_sb) { m_raft_config_sb = std::move(raft_config_sb); } void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, @@ -85,18 +98,29 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& auto status = data_service().alloc_blks(uint32_cast(rreq->value.size), m_listener->get_blk_alloc_hints(rreq->header, rreq->value.size), rreq->local_blkid); - HS_REL_ASSERT_EQ(status, BlkAllocStatus::SUCCESS); + if (status != BlkAllocStatus::SUCCESS) { + HS_DBG_ASSERT_EQ(status, BlkAllocStatus::SUCCESS, "Unable to allocate blks"); + handle_error(rreq, ReplServiceError::NO_SPACE_LEFT); + return; + } + rreq->state.fetch_or(uint32_cast(repl_req_state_t::BLK_ALLOCATED)); // Write the data data_service().async_write(rreq->value, rreq->local_blkid).thenValue([this, rreq](auto&& err) { - HS_REL_ASSERT(!err, "Error in writing data"); // TODO: Find a way to return error to the Listener - rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); - m_state_machine->propose_to_raft(std::move(rreq)); + if (!err) { + rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); + auto raft_status = m_state_machine->propose_to_raft(std::move(rreq)); + if (raft_status != ReplServiceError::OK) { handle_error(rreq, raft_status); } + } else { + HS_DBG_ASSERT(false, "Error in writing data"); + handle_error(rreq, ReplServiceError::DRIVE_WRITE_ERROR); + } }); } else { RD_LOG(INFO, "Skipping data channel send since value size is 0"); rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); - m_state_machine->propose_to_raft(std::move(rreq)); + auto raft_status = 
m_state_machine->propose_to_raft(std::move(rreq)); + if (raft_status != ReplServiceError::OK) { handle_error(rreq, raft_status); } } } @@ -120,8 +144,13 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { group_msg_service() ->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, PUSH_DATA, rreq->pkts) - .via(&folly::InlineExecutor::instance()) - .thenValue([this, rreq = std::move(rreq)](auto e) { + .deferValue([this, rreq = std::move(rreq)](auto e) { + if (e.hasError()) { + RD_LOG(ERROR, "Data Channel: Error in pushing data to all followers: rreq=[{}] error={}", + rreq->to_compact_string(), e.error()); + handle_error(rreq, RaftReplService::to_repl_error(e.error())); + return; + } // Release the buffer which holds the packets RD_LOG(INFO, "Data Channel: Data push completed for rreq=[{}]", rreq->to_compact_string()); rreq->fb_builder.Release(); @@ -129,6 +158,46 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq) { }); } +void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) { + if (err == ReplServiceError::OK) { return; } + + auto s = rreq->state.load(); + if ((s & uint32_cast(repl_req_state_t::ERRORED)) || + !(rreq->state.compare_exchange_strong(s, s | uint32_cast(repl_req_state_t::ERRORED)))) { + RD_LOG(INFO, "Raft Channel: Error in processing rreq=[{}] error={} already errored", rreq->to_compact_string(), + err); + return; + } + + // Free the blks which is allocated already + RD_LOG(INFO, "Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_compact_string(), err); + if (rreq->state.load() & uint32_cast(repl_req_state_t::BLK_ALLOCATED)) { + auto blkid = rreq->local_blkid; + data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) { + HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak", blkid.to_string()); + }); + } + + HS_DBG_ASSERT(!(rreq->state.load() & uint32_cast(repl_req_state_t::LOG_FLUSHED)), + "Unexpected state, received error 
after log is flushed for rreq=[{}]", rreq->to_compact_string()); + + if (rreq->is_proposer) { + // Notify the proposer about the error + m_listener->on_error(err, rreq->header, rreq->key, rreq); + rreq->fb_builder.Release(); + rreq->pkts.clear(); + } else { + // Complete the response hence proposer can free up its resources + rreq->header = sisl::blob{}; + rreq->key = sisl::blob{}; + rreq->pkts = sisl::io_blob_list_t{}; + if (rreq->rpc_data) { + rreq->rpc_data->send_response(); + rreq->rpc_data = nullptr; + } + } +} + void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) { auto const& incoming_buf = rpc_data->request_blob(); auto const fb_size = @@ -171,10 +240,14 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d data_service() .async_write(r_cast< const char* >(data), push_req->data_size(), rreq->local_blkid) .thenValue([this, rreq](auto&& err) { - RD_REL_ASSERT(!err, "Error in writing data"); // TODO: Find a way to return error to the Listener - rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); - rreq->data_written_promise.setValue(); - RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string()); + if (err) { + RD_DBG_ASSERT(false, "Error in writing data"); + handle_error(rreq, ReplServiceError::DRIVE_WRITE_ERROR); + } else { + rreq->state.fetch_or(uint32_cast(repl_req_state_t::DATA_WRITTEN)); + rreq->data_written_promise.setValue(); + RD_LOG(INFO, "Data Channel: Data Write completed rreq=[{}]", rreq->to_compact_string()); + } }); } @@ -289,6 +362,16 @@ void RaftReplDev::async_free_blks(int64_t, MultiBlkId const& bid) { data_service().async_free_blk(bid); } +AsyncReplResult<> RaftReplDev::become_leader() { + return m_msg_mgr.become_leader(m_group_id).deferValue([this](auto&& e) { + if (e.hasError()) { + RD_LOG(ERROR, "Error in becoming leader: {}", e.error()); + return make_async_error<>(RaftReplService::to_repl_error(e.error())); + } + return 
make_async_success<>(); + }); +} + bool RaftReplDev::is_leader() const { return m_repl_svc_ctx->is_raft_leader(); } uint32_t RaftReplDev::get_blk_size() const { return data_service().get_blk_size(); } @@ -418,6 +501,8 @@ void RaftReplDev::leave() { /////////////////////////////////// Private metohds //////////////////////////////////// void RaftReplDev::report_committed(repl_req_ptr_t rreq) { + if (rreq->local_blkid.is_valid()) { data_service().commit_blk(rreq->local_blkid); } + auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn); RD_DBG_ASSERT_GT(rreq->lsn, prev_lsn, "Out of order commit of lsns, it is not expected in RaftReplDev"); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 64336d9a9..066c34174 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -31,7 +31,9 @@ using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >; class RaftReplService; class CP; -class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { +class RaftReplDev : public ReplDev, + public nuraft_mesg::mesg_state_mgr, + public std::enable_shared_from_this< RaftReplDev > { private: shared< RaftStateMachine > m_state_machine; RaftReplService& m_repl_svc; @@ -64,6 +66,7 @@ class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk >&& rd_sb, bool load_existing); virtual ~RaftReplDev() = default; + bool join_group(); void destroy(); //////////////// All ReplDev overrides/implementation /////////////////////// @@ -72,6 +75,7 @@ class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { folly::Future< std::error_code > async_read(MultiBlkId const& blkid, sisl::sg_list& sgs, uint32_t size, bool part_of_batch = false) override; void async_free_blks(int64_t lsn, MultiBlkId const& blkid) override; + AsyncReplResult<> become_leader() override; bool is_leader() const override; 
group_id_t group_id() const override { return m_group_id; } std::string group_id_str() const { return boost::uuids::to_string(m_group_id); } @@ -113,6 +117,7 @@ class RaftReplDev : public ReplDev, public nuraft_mesg::mesg_state_mgr { shared< nuraft::log_store > data_journal() { return m_data_journal; } void push_data_to_all_followers(repl_req_ptr_t rreq); void on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data); + void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err); }; } // namespace homestore diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index e25f965f3..f1811fd77 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -3,6 +3,7 @@ #include #include +#include "service/raft_repl_service.h" #include "repl_dev/raft_state_machine.h" #include "repl_dev/raft_repl_dev.h" @@ -59,7 +60,7 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); } -void RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { +ReplServiceError RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { uint32_t val_size = rreq->value.size ? 
rreq->local_blkid.serialized_size() : 0; uint32_t entry_size = sizeof(repl_journal_entry) + rreq->header.size() + rreq->key.size() + val_size; rreq->alloc_journal_entry(entry_size, true /* raft_buf */); @@ -98,8 +99,15 @@ void RaftStateMachine::propose_to_raft(repl_req_ptr_t rreq) { RD_LOG(TRACE, "Raft Channel: journal_entry=[{}] ", rreq->journal_entry->to_string()); - m_rd.raft_server()->append_entries_ext(*vec, param); + auto append_status = m_rd.raft_server()->append_entries_ext(*vec, param); sisl::VectorPool< raft_buf_ptr_t >::free(vec); + + if (append_status && !append_status->get_accepted()) { + RD_LOG(ERROR, "Raft Channel: Failed to propose rreq=[{}] result_code={}", rreq->to_compact_string(), + append_status->get_result_code()); + return RaftReplService::to_repl_error(append_status->get_result_code()); + } + return ReplServiceError::OK; } repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::log_entry >& lentry) { @@ -109,25 +117,41 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo // We don't want to transform anything that is not an app log if (lentry->get_val_type() != nuraft::log_val_type::app_log) { return nullptr; } - repl_journal_entry* jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin()); - RELEASE_ASSERT_EQ(jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, - "Mismatched version of journal entry received from RAFT peer"); + // Validate the journal entry and see if it needs to be processed + { + repl_journal_entry* tmp_jentry = r_cast< repl_journal_entry* >(lentry->get_buf().data_begin()); + RELEASE_ASSERT_EQ(tmp_jentry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, + "Mismatched version of journal entry received from RAFT peer"); + + RD_LOG(TRACE, "Received Raft log_entry=[term={}], journal_entry=[{}] ", lentry->get_term(), + tmp_jentry->to_string()); - RD_LOG(TRACE, "Received Raft log_entry=[term={}], journal_entry=[{}] ", lentry->get_term(), 
jentry->to_string()); + // For inline data we don't need to transform anything + if (tmp_jentry->code != journal_type_t::HS_LARGE_DATA) { return nullptr; } + + DEBUG_ASSERT_GT(tmp_jentry->value_size, 0, "Entry marked as large data, but value size is notified as 0"); + } - // For inline data we don't need to transform anything - if (jentry->code != journal_type_t::HS_LARGE_DATA) { return nullptr; } + auto log_to_journal_entry = [](raft_buf_ptr_t const& log_buf, auto const log_buf_data_offset) { + repl_journal_entry* jentry = r_cast< repl_journal_entry* >(log_buf->data_begin() + log_buf_data_offset); + sisl::blob const header = + sisl::blob{uintptr_cast(jentry) + sizeof(repl_journal_entry), jentry->user_header_size}; + sisl::blob const key = sisl::blob{header.cbytes() + header.size(), jentry->key_size}; + return std::make_tuple(jentry, header, key); + }; - sisl::blob const header = sisl::blob{uintptr_cast(jentry) + sizeof(repl_journal_entry), jentry->user_header_size}; - sisl::blob const key = sisl::blob{header.cbytes() + header.size(), jentry->key_size}; - DEBUG_ASSERT_GT(jentry->value_size, 0, "Entry marked as large data, but value size is notified as 0"); + // Serialize the log_entry buffer which returns the actual raft log_entry buffer. + auto log_buf = lentry->serialize(); + auto const log_buf_data_offset = log_buf->size() - lentry->get_buf().size(); + auto const [jentry, header, key] = log_to_journal_entry(log_buf, log_buf_data_offset); // From the repl_key, get the repl_req. In cases where log stream got here first, this method will create a new // repl_req and return that back. 
Fill up all of the required journal entry inside the repl_req auto rreq = m_rd.follower_create_req( repl_key{.server_id = jentry->server_id, .term = lentry->get_term(), .dsn = jentry->dsn}, header, key, jentry->value_size); - rreq->journal_buf = lentry->serialize(); + rreq->journal_buf = std::move(log_buf); + rreq->journal_entry = jentry; MultiBlkId entry_blkid; entry_blkid.deserialize(sisl::blob{key.cbytes() + key.size(), jentry->value_size}, true /* copy */); @@ -141,6 +165,7 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo auto new_buf = nuraft::buffer::expand(*rreq->raft_journal_buf(), rreq->raft_journal_buf()->size() + local_size - remote_size); blkid_location = uintptr_cast(new_buf->data_begin()) + rreq->raft_journal_buf()->size() - jentry->value_size; + std::tie(rreq->journal_entry, rreq->header, rreq->key) = log_to_journal_entry(new_buf, log_buf_data_offset); rreq->journal_buf = std::move(new_buf); } else { // Can do in-place replace of remote blkid with local blkid. 
@@ -148,7 +173,6 @@ repl_req_ptr_t RaftStateMachine::transform_journal_entry(nuraft::ptr< nuraft::lo jentry->value_size; } std::memcpy(blkid_location, rreq->local_blkid.serialize().cbytes(), local_size); - rreq->journal_entry = r_cast< repl_journal_entry* >(rreq->raft_journal_buf()->data_begin()); return rreq; } diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index c341ebd3b..3f9d10eaf 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -108,7 +108,7 @@ class RaftStateMachine : public nuraft::state_machine { nuraft::ptr< nuraft::snapshot > last_snapshot() override { return nullptr; } ////////// APIs outside of nuraft::state_machine requirements //////////////////// - void propose_to_raft(repl_req_ptr_t rreq); + ReplServiceError propose_to_raft(repl_req_ptr_t rreq); repl_req_ptr_t transform_journal_entry(nuraft::ptr< nuraft::log_entry >& lentry); void link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn); repl_req_ptr_t lsn_to_req(int64_t lsn); diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index 3d62182ea..6277c2368 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -10,8 +10,13 @@ namespace homestore { SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existing) : m_rd_sb{std::move(rd_sb)}, m_group_id{m_rd_sb->group_id} { if (load_existing) { - logstore_service().open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, m_rd_sb->data_journal_id, true, - bind_this(SoloReplDev::on_data_journal_created, 1)); + logstore_service() + .open_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, m_rd_sb->data_journal_id, true /* append_mode */) + .thenValue([this](auto log_store) { + m_data_journal = std::move(log_store); + m_rd_sb->data_journal_id = m_data_journal->get_store_id(); + 
m_data_journal->register_log_found_cb(bind_this(SoloReplDev::on_log_found, 3)); + }); } else { m_data_journal = logstore_service().create_new_log_store(LogStoreService::DATA_LOG_FAMILY_IDX, true /* append_mode */); @@ -20,12 +25,6 @@ SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existi } } -void SoloReplDev::on_data_journal_created(shared< HomeLogStore > log_store) { - m_data_journal = std::move(log_store); - m_rd_sb->data_journal_id = m_data_journal->get_store_id(); - m_data_journal->register_log_found_cb(bind_this(SoloReplDev::on_log_found, 3)); -} - void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, repl_req_ptr_t rreq) { if (!rreq) { auto rreq = repl_req_ptr_t(new repl_req_ctx{}); } diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 331003f4a..1c104c2fc 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -46,6 +46,7 @@ class SoloReplDev : public ReplDev { void async_free_blks(int64_t lsn, MultiBlkId const& blkid) override; + AsyncReplResult<> become_leader() override { return make_async_error(ReplServiceError::OK); } bool is_leader() const override { return true; } uuid_t group_id() const override { return m_group_id; } @@ -56,7 +57,6 @@ class SoloReplDev : public ReplDev { void cp_cleanup(CP* cp); private: - void on_data_journal_created(shared< HomeLogStore > log_store); void write_journal(repl_req_ptr_t rreq); void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx); }; diff --git a/src/lib/replication/service/generic_repl_svc.cpp b/src/lib/replication/service/generic_repl_svc.cpp index d169d4ce2..451c355dc 100644 --- a/src/lib/replication/service/generic_repl_svc.cpp +++ b/src/lib/replication/service/generic_repl_svc.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include "common/homestore_assert.hpp" #include 
"replication/service/generic_repl_svc.h" #include "replication/service/raft_repl_service.h" @@ -36,15 +37,20 @@ std::shared_ptr< GenericReplService > GenericReplService::create(cshared< ReplAp GenericReplService::GenericReplService(cshared< ReplApplication >& repl_app) : m_repl_app{repl_app}, m_my_uuid{repl_app->get_my_repl_id()} { + m_sb_bufs.reserve(100); meta_service().register_handler( get_meta_blk_name(), - [this](meta_blk* mblk, sisl::byte_view buf, size_t) { load_repl_dev(std::move(buf), voidptr_cast(mblk)); }, + [this](meta_blk* mblk, sisl::byte_view buf, size_t) { + m_sb_bufs.emplace_back(std::pair(std::move(buf), voidptr_cast(mblk))); + }, nullptr); } void GenericReplService::stop() { - std::unique_lock lg{m_rd_map_mtx}; - m_rd_map.clear(); + { + std::unique_lock lg{m_rd_map_mtx}; + m_rd_map.clear(); + } } ReplResult< shared< ReplDev > > GenericReplService::get_repl_dev(group_id_t group_id) const { @@ -77,10 +83,23 @@ hs_stats GenericReplService::get_cap_stats() const { SoloReplService::SoloReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} {} void SoloReplService::start() { + for (auto const& [buf, mblk] : m_sb_bufs) { + load_repl_dev(buf, voidptr_cast(mblk)); + } + m_sb_bufs.clear(); + + hs()->data_service().start(); + hs()->logstore_service().start(hs()->is_first_time_boot()); + // Register to CP to flush the super blk and truncate the logstore hs()->cp_mgr().register_consumer(cp_consumer_t::REPLICATION_SVC, std::make_unique< SoloReplServiceCPHandler >()); } +void SoloReplService::stop() { + GenericReplService::stop(); + hs()->logstore_service().stop(); +} + AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t group_id, std::set< replica_id_t > const& members) { superblk< repl_dev_superblk > rd_sb{get_meta_blk_name()}; @@ -120,7 +139,7 @@ void SoloReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki { std::unique_lock lg(m_rd_map_mtx); auto [_, happened] = 
m_rd_map.emplace(group_id, rdev); - (void) happened; + (void)happened; HS_DBG_ASSERT(happened, "Unable to put the repl_dev in rd map for group_id={}", group_id); } } diff --git a/src/lib/replication/service/generic_repl_svc.h b/src/lib/replication/service/generic_repl_svc.h index 64b8ea47a..4169e5f80 100644 --- a/src/lib/replication/service/generic_repl_svc.h +++ b/src/lib/replication/service/generic_repl_svc.h @@ -41,6 +41,7 @@ class GenericReplService : public ReplicationService { std::shared_mutex m_rd_map_mtx; std::map< group_id_t, shared< ReplDev > > m_rd_map; replica_id_t m_my_uuid; + std::vector< std::pair< sisl::byte_view, void* > > m_sb_bufs; public: static std::shared_ptr< GenericReplService > create(cshared< ReplApplication >& repl_app); @@ -65,6 +66,7 @@ class SoloReplService : public GenericReplService { public: SoloReplService(cshared< ReplApplication >& repl_app); void start() override; + void stop() override; AsyncReplResult< shared< ReplDev > > create_repl_dev(group_id_t group_id, std::set< replica_id_t > const& members) override; diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index d25b49bc6..dde2da93c 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -17,6 +17,8 @@ #include #include +#include +#include #include "common/homestore_config.hpp" #include "common/homestore_assert.hpp" #include "replication/service/raft_repl_service.h" @@ -58,32 +60,31 @@ ReplServiceError RaftReplService::to_repl_error(nuraft::cmd_result_code code) { } RaftReplService::RaftReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} { + m_config_sb_bufs.reserve(100); meta_service().register_handler( get_meta_blk_name() + "_raft_config", [this](meta_blk* mblk, sisl::byte_view buf, size_t) { - raft_group_config_found(std::move(buf), voidptr_cast(mblk)); + m_config_sb_bufs.emplace_back(std::pair(std::move(buf), 
voidptr_cast(mblk))); }, nullptr, false, std::optional< meta_subtype_vec_t >({get_meta_blk_name()})); } void RaftReplService::start() { - /*auto params = nuraft_mesg::Manager::Params{ + // Step 1: Initialize the Nuraft messaging service, which starts the nuraft service + auto params = nuraft_mesg::Manager::Params{ .server_uuid_ = m_my_uuid, .mesg_port_ = m_repl_app->lookup_peer(m_my_uuid).second, .default_group_type_ = "homestore_replication", .ssl_key_ = ioenvironment.get_ssl_key(), .ssl_cert_ = ioenvironment.get_ssl_cert(), .token_verifier_ = std::dynamic_pointer_cast< sisl::GrpcTokenVerifier >(ioenvironment.get_token_verifier()), - .token_client_ = std::dynamic_pointer_cast< sisl::GrpcTokenClient >(ioenvironment.get_token_client())};*/ - auto params = nuraft_mesg::Manager::Params(); - params.server_uuid_ = m_my_uuid; - params.mesg_port_ = m_repl_app->lookup_peer(m_my_uuid).second; - params.default_group_type_ = "homestore_replication"; + .token_client_ = std::dynamic_pointer_cast< sisl::GrpcTokenClient >(ioenvironment.get_token_client())}; m_msg_mgr = nuraft_mesg::init_messaging(params, weak_from_this(), true /* with_data_channel */); LOGINFOMOD(replication, "Starting RaftReplService with server_uuid={} port={}", boost::uuids::to_string(params.server_uuid_), params.mesg_port_); + // Step 2: Register all RAFT parameters. At the end of this step, raft is ready to be created/join group auto r_params = nuraft::raft_params() .with_election_timeout_lower(HS_DYNAMIC_CONFIG(consensus.elect_to_low_ms)) .with_election_timeout_upper(HS_DYNAMIC_CONFIG(consensus.elect_to_high_ms)) @@ -100,22 +101,68 @@ void RaftReplService::start() { r_params.return_method_ = nuraft::raft_params::async_handler; m_msg_mgr->register_mgr_type(params.default_group_type_, r_params); + // Step 3: Load all the repl devs from the cached superblks. This step creates the ReplDev instances and adds to + // list. 
It is still not joined the Raft group yet + for (auto const& [buf, mblk] : m_sb_bufs) { + load_repl_dev(buf, voidptr_cast(mblk)); + } + m_sb_bufs.clear(); + + // Step 4: Load all the raft group configs from the cached superblks. We have 2 superblks for each raft group + // a) repl_dev configuration (loaded in step 3). This block is updated on every append and persisted on next cp. + // b) raft group configuration (loaded in this step). This block is updated on every config change and persisted + // instantly + // + // We need to first load the repl_dev with its config and then attach the raft config to that repl dev. + for (auto const& [buf, mblk] : m_config_sb_bufs) { + raft_group_config_found(buf, voidptr_cast(mblk)); + } + m_config_sb_bufs.clear(); + + // Step 5: Start the data and logstore service now. This step is essential before we can ask Raft to join groups etc + hs()->data_service().start(); + hs()->logstore_service().start(hs()->is_first_time_boot()); + + // Step 6: Iterate all the repl devs and ask each one of them to join the raft group. + for (auto it = m_rd_map.begin(); it != m_rd_map.end();) { + auto rdev = std::dynamic_pointer_cast< RaftReplDev >(it->second); + if (!rdev->join_group()) { + it = m_rd_map.erase(it); + } else { + ++it; + } + } + + // Step 7: Register to CPManager to ensure we can flush the superblk.
hs()->cp_mgr().register_consumer(cp_consumer_t::REPLICATION_SVC, std::make_unique< RaftReplServiceCPHandler >()); } +void RaftReplService::stop() { + GenericReplService::stop(); + m_msg_mgr.reset(); + hs()->logstore_service().stop(); +} + void RaftReplService::raft_group_config_found(sisl::byte_view const& buf, void* meta_cookie) { json_superblk group_config; auto& js = group_config.load(buf, meta_cookie); + + DEBUG_ASSERT(js.contains("group_id"), "Missing group_id field in raft_config superblk"); std::string gid_str = js["group_id"]; RELEASE_ASSERT(!gid_str.empty(), "Invalid raft_group config found"); boost::uuids::string_generator gen; - uuid_t uuid = gen(gid_str); + uuid_t group_id = gen(gid_str); - auto v = get_repl_dev(uuid); - RELEASE_ASSERT(bool(v), "Not able to find the group_id corresponding, has repl_dev superblk not loaded yet?"); + auto v = get_repl_dev(group_id); + RELEASE_ASSERT(bool(v), "Can't find the group_id={}, has repl_dev superblk not loaded yet?", + boost::uuids::to_string(group_id)); - (std::dynamic_pointer_cast< RaftReplDev >(*v))->use_config(std::move(group_config)); + auto rdev = std::dynamic_pointer_cast< RaftReplDev >(*v); + auto listener = m_repl_app->create_repl_dev_listener(group_id); + listener->set_repl_dev(rdev.get()); + rdev->attach_listener(std::move(listener)); + rdev->use_config(std::move(group_config)); } std::string RaftReplService::lookup_peer(nuraft_mesg::peer_id_t const& peer) { @@ -125,6 +172,8 @@ std::string RaftReplService::lookup_peer(nuraft_mesg::peer_id_t const& peer) { shared< nuraft_mesg::mesg_state_mgr > RaftReplService::create_state_mgr(int32_t srv_id, nuraft_mesg::group_id_t const& group_id) { + LOGINFO("Creating RAFT state manager for server_id={} group_id={}", srv_id, boost::uuids::to_string(group_id)); + auto result = get_repl_dev(group_id); if (result) { return std::dynamic_pointer_cast< nuraft_mesg::mesg_state_mgr >(result.value()); } @@ -136,7 +185,12 @@ shared< nuraft_mesg::mesg_state_mgr > 
RaftReplService::create_state_mgr(int32_t // Create a new instance of Raft ReplDev (which is the state manager this method is looking for) auto rdev = std::make_shared< RaftReplDev >(*this, std::move(rd_sb), false /* load_existing */); - rdev->use_config(json_superblk{get_meta_blk_name() + "_raft_config"}); + + // Create a raft config for this repl_dev and assign it to the repl_dev + auto raft_config_sb = json_superblk{get_meta_blk_name() + "_raft_config"}; + (*raft_config_sb)["group_id"] = boost::uuids::to_string(group_id); + raft_config_sb.write(); + rdev->use_config(std::move(raft_config_sb)); // Attach the listener to the raft auto listener = m_repl_app->create_repl_dev_listener(group_id); @@ -170,6 +224,8 @@ AsyncReplResult< shared< ReplDev > > RaftReplService::create_repl_dev(group_id_t boost::uuids::to_string(member)); break; } else if (result.error() != nuraft::CONFIG_CHANGING) { + LOGWARN("Groupid={}, add member={} failed with error={}", boost::uuids::to_string(group_id), + boost::uuids::to_string(member), result.error()); return make_async_error< shared< ReplDev > >(to_repl_error(result.error())); } else { LOGWARN("Config is changing for group_id={} while adding member={}, retry operation in a second", @@ -205,14 +261,6 @@ void RaftReplService::load_repl_dev(sisl::byte_view const& buf, void* meta_cooki // Create an instance of ReplDev from loaded superblk auto rdev = std::make_shared< RaftReplDev >(*this, std::move(rd_sb), true /* load_existing */); - // Try to join the RAFT group - auto raft_result = m_msg_mgr->join_group(group_id, "homestore_replication", - std::dynamic_pointer_cast< nuraft_mesg::mesg_state_mgr >(rdev)); - if (!raft_result) { - HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", boost::uuids::to_string(group_id).c_str(), - raft_result.error()); - } - // Add the RaftReplDev to the list of repl_devs add_repl_dev(group_id, rdev); } diff --git a/src/lib/replication/service/raft_repl_service.h 
b/src/lib/replication/service/raft_repl_service.h index fa12cd07e..48d496a23 100644 --- a/src/lib/replication/service/raft_repl_service.h +++ b/src/lib/replication/service/raft_repl_service.h @@ -37,6 +37,7 @@ class RaftReplService : public GenericReplService, private: shared< nuraft_mesg::Manager > m_msg_mgr; json_superblk m_config_sb; + std::vector< std::pair< sisl::byte_view, void* > > m_config_sb_bufs; public: RaftReplService(cshared< ReplApplication >& repl_app); @@ -52,6 +53,8 @@ class RaftReplService : public GenericReplService, protected: ///////////////////// Overrides of GenericReplService //////////////////// void start() override; + void stop() override; + AsyncReplResult< shared< ReplDev > > create_repl_dev(group_id_t group_id, std::set< replica_id_t > const& members) override; void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override; diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp index b108358dc..2cb946bd8 100644 --- a/src/tests/test_common/homestore_test_common.hpp +++ b/src/tests/test_common/homestore_test_common.hpp @@ -173,12 +173,6 @@ class HSTestHelper { vdev_size_type_t vdev_size_type{vdev_size_type_t::VDEV_SIZE_STATIC}; }; -#if 0 - static void start_homestore(const std::string& test_name, float meta_pct, float data_log_pct, float ctrl_log_pct, - float data_pct, float index_pct, hs_before_services_starting_cb_t cb, - bool restart = false, std::unique_ptr< IndexServiceCallbacks > index_svc_cb = nullptr, - bool default_data_svc_alloc_type = true); -#endif static void start_homestore(const std::string& test_name, std::map< uint32_t, test_params >&& svc_params, hs_before_services_starting_cb_t cb = nullptr, bool fake_restart = false, bool init_device = true) { @@ -190,6 +184,7 @@ class HSTestHelper { if (fake_restart) { shutdown_homestore(false); + // sisl::GrpcAsyncClientWorker::shutdown_all(); std::this_thread::sleep_for(std::chrono::seconds{5}); } diff --git 
a/src/tests/test_common/hs_repl_test_common.hpp b/src/tests/test_common/hs_repl_test_common.hpp index 19dec9959..005ea97db 100644 --- a/src/tests/test_common/hs_repl_test_common.hpp +++ b/src/tests/test_common/hs_repl_test_common.hpp @@ -59,6 +59,7 @@ class HSReplTestHelper { struct IPCData { bip::interprocess_mutex mtx_; bip::interprocess_condition cv_; + bip::interprocess_mutex exec_mtx_; repl_test_phase_t phase_{repl_test_phase_t::REGISTER}; uint32_t registered_count_{0}; @@ -79,8 +80,10 @@ class HSReplTestHelper { if (count == SISL_OPTIONS["replicas"].as< uint32_t >()) { phase_ = new_phase; cv_.notify_all(); - } else + } else { cv_.wait(lg, [this, new_phase]() { return (phase_ == new_phase); }); + } + count = 0; } }; @@ -123,6 +126,7 @@ class HSReplTestHelper { void setup() { replica_num_ = SISL_OPTIONS["replica_num"].as< uint16_t >(); sisl::logging::SetLogger(name_ + std::string("_replica_") + std::to_string(replica_num_)); + sisl::logging::SetLogPattern("[%D %T%z] [%^%L%$] [%n] [%t] %v"); auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >(); boost::uuids::string_generator gen; @@ -149,7 +153,8 @@ class HSReplTestHelper { for (uint32_t i{1}; i < num_replicas; ++i) { LOGINFO("Spawning Homestore replica={} instance", i); - boost::process::child c(argv_[0], "--replica_num", std::to_string(i), proc_grp_); + boost::process::child c(argv_[0], "--log_mods", "replication:trace", "--replica_num", std::to_string(i), + proc_grp_); c.detach(); } } else { @@ -182,6 +187,27 @@ class HSReplTestHelper { setup(); } + void restart() { + test_common::HSTestHelper::start_homestore( + name_ + std::to_string(replica_num_), + {{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< TestReplApplication >(*this)}}, + {HS_SERVICE::LOG_REPLICATED, {}}, + {HS_SERVICE::LOG_LOCAL, {}}}, + nullptr, true /* restart */); + } + + void restart_one_by_one() { + exclusive_replica([&]() { + LOGINFO("Restarting Homestore replica={}", replica_num_); + 
test_common::HSTestHelper::start_homestore( + name_ + std::to_string(replica_num_), + {{HS_SERVICE::REPLICATION, {.repl_app = std::make_unique< TestReplApplication >(*this)}}, + {HS_SERVICE::LOG_REPLICATED, {}}, + {HS_SERVICE::LOG_LOCAL, {}}}, + nullptr, true /* restart */); + }); + } + uint16_t replica_num() const { return replica_num_; } Runner& runner() { return io_runner_; } @@ -222,12 +248,24 @@ class HSReplTestHelper { return listener; } + void unregister_listener(shared< ReplDevListener > listener) { + { + std::unique_lock lg(groups_mtx_); + repl_groups_.erase(listener->repl_dev()->group_id()); + } + } + void sync_for_test_start() { ipc_data_->sync_for_test_start(); } void sync_for_verify_start() { ipc_data_->sync_for_verify_start(); } void sync_for_cleanup_start() { ipc_data_->sync_for_cleanup_start(); } void sync_dataset_size(uint64_t dataset_size) { ipc_data_->test_dataset_size_ = dataset_size; } uint64_t dataset_size() const { return ipc_data_->test_dataset_size_; } + void exclusive_replica(std::function< void() > const& f) { + std::unique_lock< bip::interprocess_mutex > lg(ipc_data_->exec_mtx_); + f(); + } + void check_and_kill(int port) { std::string command = "lsof -t -i:" + std::to_string(port); if (system(command.c_str())) { diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 2cfb94b74..bf71cfa66 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -442,10 +442,9 @@ class SampleDB { if (restart) { for (uint32_t i{0}; i < n_log_stores; ++i) { SampleLogStoreClient* client = m_log_store_clients[i].get(); - logstore_service().open_log_store(client->m_family, client->m_store_id, false /* append_mode */, - [i, this, client](std::shared_ptr< HomeLogStore > log_store) { - client->set_log_store(log_store); - }); + logstore_service() + .open_log_store(client->m_family, client->m_store_id, false /* append_mode */) + .thenValue([i, this, client](auto log_store) { client->set_log_store(log_store); }); } } }, 
diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index b42f04d94..2ffaf2d70 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -45,8 +45,10 @@ SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) SISL_OPTION_GROUP(test_raft_repl_dev, (block_size, "", "block_size", "block size to io", - ::cxxopts::value< uint32_t >()->default_value("4096"), "number")); -SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, test_common_setup, test_repl_common_setup) + ::cxxopts::value< uint32_t >()->default_value("4096"), "number"), + (num_raft_groups, "", "num_raft_groups", "number of raft groups per test", + ::cxxopts::value< uint32_t >()->default_value("1"), "number")); +SISL_OPTIONS_ENABLE(logging, test_raft_repl_dev, iomgr, config, test_common_setup, test_repl_common_setup) static std::unique_ptr< test_common::HSReplTestHelper > g_helper; @@ -100,7 +102,6 @@ class TestReplicatedDB : public homestore::ReplDevListener { void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, MultiBlkId const& blkids, cintrusive< repl_req_ctx >& ctx) override { - LOGINFO("[Replica={}] Received commit on lsn={}", g_helper->replica_num(), lsn); ASSERT_EQ(header.size(), sizeof(test_req::journal_header)); auto jheader = r_cast< test_req::journal_header const* >(header.cbytes()); @@ -108,6 +109,9 @@ class TestReplicatedDB : public homestore::ReplDevListener { Value v{ .lsn_ = lsn, .data_size_ = jheader->data_size, .data_pattern_ = jheader->data_pattern, .blkid_ = blkids}; + LOGINFO("[Replica={}] Received commit on lsn={} key={} value[blkid={} pattern={}]", g_helper->replica_num(), + lsn, k.id_, v.blkid_.to_string(), v.data_pattern_); + { std::unique_lock lk(db_mtx_); inmem_db_.insert_or_assign(k, v); @@ -127,6 +131,12 @@ class TestReplicatedDB : public homestore::ReplDevListener { LOGINFO("[Replica={}] Received rollback on lsn={}", g_helper->replica_num(), lsn); } + void on_error(ReplServiceError error, const sisl::blob& 
header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) override { + LOGINFO("[Replica={}] Received error={} on key={}", g_helper->replica_num(), enum_name(error), + *(r_cast< uint64_t const* >(key.cbytes()))); + } + blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override { return blk_alloc_hints{}; } @@ -134,9 +144,10 @@ class TestReplicatedDB : public homestore::ReplDevListener { void on_replica_stop() override {} void db_write(uint64_t data_size, uint32_t max_size_per_iov) { + static std::atomic< uint32_t > s_uniq_num{0}; auto req = intrusive< test_req >(new test_req()); req->jheader.data_size = data_size; - req->jheader.data_pattern = ((long long)rand() << 32) | rand(); + req->jheader.data_pattern = ((long long)rand() << 32) | ++s_uniq_num; auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); if (data_size != 0) { @@ -171,6 +182,8 @@ class TestReplicatedDB : public homestore::ReplDevListener { v.data_pattern_); iomanager.iobuf_free(uintptr_cast(iov.iov_base)); } + LOGINFO("Validated successfully key={} value[blkid={} pattern={}]", k.id_, v.blkid_.to_string(), + v.data_pattern_); g_helper->runner().next_task(); }); }); @@ -191,9 +204,11 @@ class RaftReplDevTest : public testing::Test { public: void SetUp() override { // By default it will create one db - auto db = std::make_shared< TestReplicatedDB >(); - g_helper->register_listener(db); - dbs_.emplace_back(std::move(db)); + for (uint32_t i{0}; i < SISL_OPTIONS["num_raft_groups"].as< uint32_t >(); ++i) { + auto db = std::make_shared< TestReplicatedDB >(); + g_helper->register_listener(db); + dbs_.emplace_back(std::move(db)); + } } void generate_writes(uint64_t data_size, uint32_t max_size_per_iov) { @@ -220,28 +235,59 @@ class RaftReplDevTest : public testing::Test { TestReplicatedDB& pick_one_db() { return *dbs_[0]; } + void switch_all_db_leader() { + for (auto const& db : dbs_) { + do { + auto result = db->repl_dev()->become_leader().get(); + if 
(result.hasError()) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } else { + break; + } + } while (true); + } + } + private: std::vector< std::shared_ptr< TestReplicatedDB > > dbs_; }; -TEST_F(RaftReplDevTest, All_Append) { +TEST_F(RaftReplDevTest, All_Append_Restart_Append) { LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); + uint64_t exp_entries = SISL_OPTIONS["num_io"].as< uint64_t >(); if (g_helper->replica_num() == 0) { - g_helper->sync_dataset_size(SISL_OPTIONS["num_io"].as< uint64_t >()); auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size); g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); }); g_helper->runner().execute().get(); } - - this->wait_for_all_writes(g_helper->dataset_size()); + this->wait_for_all_writes(exp_entries); g_helper->sync_for_verify_start(); LOGINFO("Validate all data written so far by reading them"); this->validate_all_data(); + g_helper->sync_for_cleanup_start(); + LOGINFO("Restart all the homestore replicas"); + g_helper->restart(); + g_helper->sync_for_test_start(); + + exp_entries += SISL_OPTIONS["num_io"].as< uint64_t >(); + if (g_helper->replica_num() == 0) { + LOGINFO("Switch the leader to replica_num = 0"); + this->switch_all_db_leader(); + + LOGINFO("Post restart write the data again"); + auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); + g_helper->runner().set_task([this, block_size]() { this->generate_writes(block_size, block_size); }); + g_helper->runner().execute().get(); + } + this->wait_for_all_writes(exp_entries); + + LOGINFO("Validate all data written (including pre-restart data) by reading them"); + this->validate_all_data(); g_helper->sync_for_cleanup_start(); } @@ -250,7 +296,8 @@ int main(int argc, char* argv[]) { char** orig_argv = argv; ::testing::InitGoogleTest(&parsed_argc, 
argv); - SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_raft_repl_dev, iomgr, test_common_setup, test_repl_common_setup); + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, config, test_raft_repl_dev, iomgr, test_common_setup, + test_repl_common_setup); FLAGS_folly_global_cpu_executor_threads = 4; g_helper = std::make_unique< test_common::HSReplTestHelper >("test_raft_repl_dev", orig_argv); diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 492a8006a..90b5bcca0 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -121,6 +121,10 @@ class SoloReplDevTest : public testing::Test { return blk_alloc_hints{}; } + void on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, + cintrusive< repl_req_ctx >& ctx) override { + LOGINFO("Received error={} on repl_dev", enum_name(error)); + } void on_replica_stop() override {} };