diff --git a/src/include/homestore/logstore_service.hpp b/src/include/homestore/logstore_service.hpp index 9eb971eea..4baede278 100644 --- a/src/include/homestore/logstore_service.hpp +++ b/src/include/homestore/logstore_service.hpp @@ -158,6 +158,8 @@ class LogStoreService { uint32_t used_size() const; uint32_t total_size() const; iomgr::io_fiber_t flush_thread() { return m_flush_fiber; } + + // called by LogDev truncate; iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; } private: diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index d42ccc7ee..9430349f0 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -255,6 +255,8 @@ class ReplDev { /// @return Block size virtual uint32_t get_blk_size() const = 0; + virtual void truncate_if_needed() = 0; + virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); } virtual void detach_listener() { diff --git a/src/include/homestore/replication_service.hpp b/src/include/homestore/replication_service.hpp index 19ee11701..c44ec777c 100644 --- a/src/include/homestore/replication_service.hpp +++ b/src/include/homestore/replication_service.hpp @@ -20,7 +20,6 @@ VENUM(repl_impl_type, uint8_t, solo // For single node - no replication ); - class ReplApplication; class ReplicationService { @@ -53,6 +52,8 @@ class ReplicationService { virtual hs_stats get_cap_stats() const = 0; virtual meta_sub_type get_meta_blk_name() const = 0; + + virtual void resource_audit() = 0; }; //////////////// Application which uses Replication needs to be provide the following callbacks //////////////// diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index fba5a6099..7915a9fd8 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -37,7 +37,7 @@ CPManager::CPManager() : nullptr); resource_mgr().register_dirty_buf_exceed_cb( - [this]([[maybe_unused]] int64_t dirty_buf_count) { this->trigger_cp_flush(false /* false */); }); + [this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) { this->trigger_cp_flush(false /* force */); }); start_cp_thread(); } diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index 7be5f659b..3dfa0add1 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -158,9 +158,15 @@ table ResourceLimits { /* precentage of memory used during recovery */ memory_in_recovery_precent: uint32 = 40; - /* journal size used percentage */ - journal_size_percent: uint32 = 50; + /* journal size used percentage high watermark -- trigger cp */ + journal_vdev_size_percent: uint32 = 50; + /* journal size used percentage critical watermark -- trigger truncation */ + journal_vdev_size_percent_critical: uint32 = 90; + + /* logdev num entries that will trigger mark this ready for truncation */ + logdev_num_log_entries_threadhold: uint32 = 2000000(hotswap); + /* We crash if volume is 95 percent filled and no disk space left */ vol_threshhold_used_size_p: uint32 = 95; } @@ -199,8 +205,8 @@ table Consensus { heartbeat_period_ms: uint32 = 250; // Re-election timeout low and high mark - elect_to_low_ms: uint32 = 900; - elect_to_high_ms: uint32 = 1400; + elect_to_low_ms: uint32 = 800; + elect_to_high_ms: uint32 = 1700; // When a new member is being synced, the batch size of number of logs to be shipped log_sync_batch_size: int32 = 100; @@ -228,6 +234,8 @@ table Consensus { // data fetch max size limit in MB data_fetch_max_size_mb: uint32 = 2; + + } table HomeStoreSettings { diff --git a/src/lib/common/resource_mgr.cpp b/src/lib/common/resource_mgr.cpp index 71a2e97d4..532cd5a83 100644 --- a/src/lib/common/resource_mgr.cpp +++ b/src/lib/common/resource_mgr.cpp @@ -20,7 +20,26 @@ namespace homestore { ResourceMgr& resource_mgr() { return hs()->resource_mgr(); } -void ResourceMgr::set_total_cap(uint64_t total_cap) { m_total_cap = total_cap; } +void ResourceMgr::start(uint64_t total_cap) { + m_total_cap = total_cap; + start_timer(); +} + +void ResourceMgr::start_timer() { + auto const res_mgr_timer_us = HS_DYNAMIC_CONFIG(generic.res_mgr_timer_us); + LOGINFO("resource audit timer is set to {} usec", res_mgr_timer_us); + + m_res_audit_timer_hdl = iomanager.schedule_global_timer( + res_mgr_timer_us * 1000, true /* recurring */, nullptr /* cookie */, iomgr::reactor_regex::all_worker, + [this](void*) { + // all resource timely audit routine should arrive here; + hs()->logstore_service().device_truncate(); + + // TODO: add device_truncate callback to audit how much space was freed per each LogDev and add related + // metrics; + }, + true /* wait_to_schedule */); +} /* monitor dirty buffer count */ void ResourceMgr::inc_dirty_buf_size(const uint32_t size) { @@ -106,22 +125,37 @@ uint64_t ResourceMgr::get_cache_size() const { return ((HS_STATIC_CONFIG(input.io_mem_size()) * HS_DYNAMIC_CONFIG(resource_limits.cache_size_percent)) / 100); } -/* monitor journal size */ -bool ResourceMgr::check_journal_size(const uint64_t used_size, const uint64_t total_size) { - if (m_journal_exceed_cb) { +bool ResourceMgr::check_journal_descriptor_size(const uint64_t used_size) { + return (used_size >= get_journal_descriptor_size_limit()); +} + +/* monitor journal vdev size */ +bool ResourceMgr::check_journal_vdev_size(const uint64_t used_size, const uint64_t total_size) { + if (m_journal_vdev_exceed_cb) { const uint32_t used_pct = (100 * used_size / total_size); - if (used_pct >= HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent)) { - m_journal_exceed_cb(used_size); + if (used_pct >= get_journal_vdev_size_limit()) { + m_journal_vdev_exceed_cb(used_size, used >= get_journal_vdev_size_critical_limit() /* is_critical */); HS_LOG_EVERY_N(WARN, base, 50, "high watermark hit, used percentage: {}, high watermark percentage: {}", - used_pct, HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent)); + used_pct, get_journal_vdev_size_limit()); return true; } } return false; } -void ResourceMgr::register_journal_exceed_cb(exceed_limit_cb_t cb) { m_journal_exceed_cb = std::move(cb); } -uint32_t ResourceMgr::get_journal_size_limit() const { return HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent); } +void ResourceMgr::register_journal_vdev_exceed_cb(exceed_limit_cb_t cb) { m_journal_vdev_exceed_cb = std::move(cb); } + +uint32_t ResourceMgr::get_journal_descriptor_size_limit() const { + return HS_DYNAMIC_CONFIG(resource_limits.journal_descriptor_size_threshold_mb) * 1024 * 1024; +} + +uint32_t ResourceMgr::get_journal_vdev_size_critical_limit() const { + return HS_DYNAMIC_CONFIG(resource_limits.journal_vdev_size_percent_critical); +} + +uint32_t ResourceMgr::get_journal_vdev_size_limit() const { + return HS_DYNAMIC_CONFIG(resource_limits.journal_vdev_size_percent); +} /* monitor chunk size */ void ResourceMgr::check_chunk_free_size_and_trigger_cp(uint64_t free_size, uint64_t alloc_size) {} diff --git a/src/lib/common/resource_mgr.hpp b/src/lib/common/resource_mgr.hpp index 54fc459b6..d57c65ed2 100644 --- a/src/lib/common/resource_mgr.hpp +++ b/src/lib/common/resource_mgr.hpp @@ -39,12 +39,12 @@ class RsrcMgrMetrics : public sisl::MetricsGroup { ~RsrcMgrMetrics() { deregister_me_from_farm(); } }; -typedef std::function< void(int64_t /* dirty_buf_cnt */) > exceed_limit_cb_t; +typedef std::function< void(int64_t /* dirty_buf_cnt */, bool /* critical */ = false) > exceed_limit_cb_t; const uint32_t max_qd_multiplier = 32; class ResourceMgr { public: - void set_total_cap(uint64_t total_cap); + void start(uint64_t total_cap); /* monitor dirty buffer count */ void inc_dirty_buf_size(const uint32_t size); @@ -76,10 +76,11 @@ class ResourceMgr { uint64_t get_cache_size() const; /* monitor journal size */ - bool check_journal_size(const uint64_t used_size, const uint64_t total_size); - void register_journal_exceed_cb(exceed_limit_cb_t cb); + bool check_journal_vdev_size(const uint64_t used_size, const uint64_t total_size); + void register_journal_vdev_exceed_cb(exceed_limit_cb_t cb); - uint32_t get_journal_size_limit() const; + uint32_t get_journal_vdev_size_limit() const; + uint32_t get_journal_vdev_size_critical_limit() const; /* monitor chunk size */ void check_chunk_free_size_and_trigger_cp(uint64_t free_size, uint64_t alloc_size); @@ -92,7 +93,9 @@ class ResourceMgr { private: int64_t get_dirty_buf_limit() const; + void start_timer(); +private: std::atomic< int64_t > m_hs_dirty_buf_cnt; std::atomic< int64_t > m_hs_fb_cnt; // free count std::atomic< int64_t > m_hs_fb_size; // free size @@ -100,10 +103,14 @@ class ResourceMgr { std::atomic< int64_t > m_memory_used_in_recovery; std::atomic< uint32_t > m_flush_dirty_buf_q_depth{64}; uint64_t m_total_cap; + + // TODO: make it event_cb exceed_limit_cb_t m_dirty_buf_exceed_cb; exceed_limit_cb_t m_free_blks_exceed_cb; - exceed_limit_cb_t m_journal_exceed_cb; + exceed_limit_cb_t m_journal_vdev_exceed_cb; RsrcMgrMetrics m_metrics; + + iomgr::timer_handle_t m_res_audit_timer_hdl; }; extern ResourceMgr& resource_mgr(); diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index d6063ae54..2b1f832c1 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -51,6 +51,15 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo return private_blob; }, m_vdev_info.hs_dev_type, m_vdev_info.vdev_id, m_vdev_info.chunk_size}); + + resource_mgr().register_journal_vdev_exceed_cb([this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) { + this->trigger_cp_flush(false /* force */); + + if (critical) { + // call resource autit to replicaiton service to free up some space immediately + hs()->repl_service().resource_audit(); + } + }); } JournalVirtualDev::~JournalVirtualDev() {} @@ -561,6 +570,21 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { m_write_sz_in_total.fetch_sub(size_to_truncate, std::memory_order_relaxed); m_truncate_done = true; + // + // Conceptually in rare case(not poosible for NuObject, possibly true for NuBlox2.0) truncate itself can't garunteen + // the space is freed up upto satisfy resource manager. e.g. multiple log stores on this same descriptor and one + // logstore lagging really behind and not able to truncate much space. Doing multiple truncation won't help in this + // case. + // + // In this rare case, the next write on this descrptor will set ready flag again. + // + // And any write on any other descriptor will trigger a high_watermark_check, and if it were to trigger critial + // alert on this vdev, truncation will be made immediately on all descriptors; + // + // If still no space can be freed, there is nothing we can't here to back pressure to above layer by rejecting log + // writes on this descriptor; + // + unset_ready_for_truncate(); HS_PERIODIC_LOG(DEBUG, journalvdev, "After truncate desc {}", to_string()); } @@ -625,8 +649,18 @@ bool JournalVirtualDev::Descriptor::is_offset_at_last_chunk(off_t bytes_offset) return false; } +// +// This API is ways called in single thread +// void JournalVirtualDev::Descriptor::high_watermark_check() { - if (resource_mgr().check_journal_size(used_size(), size())) { + // high watermark check for the individual journal descriptor; + if (resource_mgr()->check_journal_descriptor_size(used_size())) { + // the next resource manager audit will call truncation for this descriptor; + set_ready_for_truncation(); + } + + // high watermark check for the entire journal vdev; + if (resource_mgr().check_journal_vdev_size(m_vdev.used_size(), m_vdev.size())) { COUNTER_INCREMENT(m_vdev.m_metrics, vdev_high_watermark_count, 1); if (m_vdev.m_event_cb && m_truncate_done) { diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index 18bc9608d..fd7e896b1 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -69,6 +69,7 @@ class JournalVirtualDev : public VirtualDev { uint64_t m_total_size{0}; // Total size of all chunks. off_t m_end_offset{0}; // Offset right to window. Never reduced. Increased in multiple of chunk size. bool m_end_offset_set{false}; // Adjust the m_end_offset only once during init. + std::atomic< bool > m_ready_for_truncate{false}; // reset by truncation thread and set by append thread; friend class JournalVirtualDev; public: @@ -78,16 +79,21 @@ class JournalVirtualDev : public VirtualDev { // Create and append the chunk to m_journal_chunks. void append_chunk(); + void set_ready_for_truncate() { m_ready_for_truncate.store(true, std::memory_order_relaxed); } + + void unset_ready_for_truncate() { m_ready_for_truncate.store(false, std::memory_order_relaxed); } + /** * @brief : allocate space specified by input size. + * this API will always be called in single thread; * * @param size : size to be allocated * * @return : the start unique offset of the allocated space * * Possible calling sequence: - * offset_1 = reserve(size1); - * offset_2 = reserve(size2); + * offset_1 = alloc_next_append_blk(size1); + * offset_2 = alloc_next_append_blk(size2); * write_at_offset(offset_2); * write_at_offset(offset_1); */ diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index 6438986d3..0adfec16c 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -205,7 +205,6 @@ void HomeStore::do_start() { m_meta_service->start(m_dev_mgr->is_first_time_boot()); m_cp_mgr->start(is_first_time_boot()); - m_resource_mgr->set_total_cap(m_dev_mgr->total_capacity()); if (has_index_service()) { m_index_service->start(); } @@ -221,6 +220,8 @@ void HomeStore::do_start() { } m_cp_mgr->start_timer(); + + m_resource_mgr->start(m_dev_mgr->total_capacity()); m_init_done = true; } diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 541c54768..593468096 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -683,25 +683,27 @@ void LogDev::remove_log_store(logstore_id_t store_id) { } void LogDev::device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq) { - run_under_flush_lock([this, treq]() { - iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() { - const logdev_key trunc_upto = do_device_truncate(treq->dry_run); - bool done{false}; - if (treq->cb || treq->wait_till_done) { - { - std::lock_guard< std::mutex > lk{treq->mtx}; - done = (--treq->trunc_outstanding == 0); - treq->m_trunc_upto_result[m_logdev_id] = trunc_upto; + if (m_vdev_jd->ready_for_truncate()) { + run_under_flush_lock([this, treq]() { + iomanager.run_on_forget(logstore_service().truncate_thread(), [this, treq]() { + const logdev_key trunc_upto = do_device_truncate(treq->dry_run); + bool done{false}; + if (treq->cb || treq->wait_till_done) { + { + std::lock_guard< std::mutex > lk{treq->mtx}; + done = (--treq->trunc_outstanding == 0); + treq->m_trunc_upto_result[m_logdev_id] = trunc_upto; + } } - } - if (done) { - if (treq->cb) { treq->cb(treq->m_trunc_upto_result); } - if (treq->wait_till_done) { treq->cv.notify_one(); } - } - unlock_flush(); + if (done) { + if (treq->cb) { treq->cb(treq->m_trunc_upto_result); } + if (treq->wait_till_done) { treq->cv.notify_one(); } + } + unlock_flush(); + }); + return false; // Do not release the flush lock yet, the scheduler will unlock it. }); - return false; // Do not release the flush lock yet, the scheduler will unlock it. - }); + } } void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& sb) { @@ -782,6 +784,12 @@ void LogDev::on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in } } +uint32_t LogDev::get_reserved_log_truncation_idx() const { + // TODO: are there any holes between m_log_idx and m_last_truncate_idx; + auto const total_in_use_ids = m_log_idx.load() - m_last_truncate_idx; + return std::min(total_in_use_ids, HS_DYNAMIC_CONFIG(resource_limits.logdev_num_log_entries_threadhold)); +} + logdev_key LogDev::do_device_truncate(bool dry_run) { static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_min_trunc_stores; static thread_local std::vector< std::shared_ptr< HomeLogStore > > m_non_participating_stores; @@ -828,6 +836,8 @@ logdev_key LogDev::do_device_truncate(bool dry_run) { return min_safe_ld_key; } + min_safe_ld_key = std::min(min_safe_ld_key.idx, get_reserved_log_truncation_idx()); + // Got the safest log id to truncate and actually truncate upto the safe log idx to the log device if (!dry_run) { truncate(min_safe_ld_key); } HS_PERIODIC_LOG(INFO, logstore, diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index f356102a0..a9ec55b5c 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -743,14 +743,6 @@ class LogDev : public std::enable_shared_from_this< LogDev > { */ void unlock_flush(bool do_flush = true); - /** - * @brief : truncate up to input log id; - * - * @param key : the key containing log id that needs to be truncate up to; - * @return number of records to truncate - */ - uint64_t truncate(const logdev_key& key); - /** * @brief Rollback the logid range specific to the given store id. This method persists the information * synchronously to the underlying storage. Once rolledback those logids in this range are ignored (only for @@ -793,11 +785,20 @@ class LogDev : public std::enable_shared_from_this< LogDev > { log_buffer buf, uint32_t nremaining_in_batch); void on_batch_completion(HomeLogStore* log_store, uint32_t nremaining_in_batch, logdev_key flush_ld_key); void device_truncate_under_lock(const std::shared_ptr< truncate_req >& treq); - logdev_key do_device_truncate(bool dry_run = false); void handle_unopened_log_stores(bool format); logdev_id_t get_id() { return m_logdev_id; } private: + /** + * @brief : truncate up to input log id; + * + * @param key : the key containing log id that needs to be truncate up to; + * @return number of records to truncate + */ + uint64_t truncate(const logdev_key& key); + + logdev_key do_device_truncate(bool dry_run = false); + LogGroup* make_log_group(uint32_t estimated_records) { m_log_group_pool[m_log_group_idx].reset(estimated_records); return &m_log_group_pool[m_log_group_idx]; @@ -823,6 +824,8 @@ class LogDev : public std::enable_shared_from_this< LogDev > { void set_flush_status(bool flush_status); bool get_flush_status(); + logid_t get_reserved_log_truncation_idx() const; + private: std::unique_ptr< sisl::StreamTracker< log_record > > m_log_records; // The container which stores all in-memory log records diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index 48023f0e5..8e3b1b1bc 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -287,7 +287,7 @@ const truncation_info& HomeLogStore::pre_device_truncation() { // NOTE: This method assumes the flush lock is already acquired by the caller void HomeLogStore::post_device_truncation(const logdev_key& trunc_upto_loc) { - if (trunc_upto_loc.idx >= m_safe_truncation_boundary.ld_key.idx) { + if (trunc_upto_loc.idx >= m_safe_truncation_boundary.ld_key.idx) { // ???: Why it is >=, not == ? // This method is expected to be called always with this m_safe_truncation_boundary.pending_dev_truncation = false; m_safe_truncation_boundary.ld_key = trunc_upto_loc; diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 68f08d275..0a1796b92 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -106,7 +106,7 @@ void LogStoreService::start(bool format) { } void LogStoreService::stop() { - device_truncate(nullptr, true, false); + // device_truncate(nullptr, true, false); for (auto& [id, logdev] : m_id_logdev_map) { logdev->stop(); } @@ -238,6 +238,7 @@ void LogStoreService::device_truncate(const device_truncate_cb_t& cb, bool wait_ for (auto& [id, logdev] : m_id_logdev_map) { logdev->device_truncate_under_lock(treq); } + if (treq->wait_till_done) { std::unique_lock< std::mutex > lk{treq->mtx}; treq->cv.wait(lk, [&] { return (treq->trunc_outstanding == 0); }); diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index 41b2ba1c4..f63dc4dc8 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -249,7 +249,7 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) { } } m_log_store->flush_sync(to_store_lsn(compact_lsn)); - m_log_store->truncate(to_store_lsn(compact_lsn)); + // m_log_store->truncate(to_store_lsn(compact_lsn)); return true; } diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp index 1020258ba..8041bf8f2 100644 --- a/src/lib/replication/log_store/repl_log_store.cpp +++ b/src/lib/replication/log_store/repl_log_store.cpp @@ -86,4 +86,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); } +bool ReplLogStore::compact(ulong last_lsn) { + m_rd.on_compact(last_lsn); + return HomeRaftLogStore::compact(last_lsn); +} } // namespace homestore diff --git a/src/lib/replication/log_store/repl_log_store.h b/src/lib/replication/log_store/repl_log_store.h index c2fb615f2..1ae0b2826 100644 --- a/src/lib/replication/log_store/repl_log_store.h +++ b/src/lib/replication/log_store/repl_log_store.h @@ -22,9 +22,11 @@ class ReplLogStore : public HomeRaftLogStore { ReplLogStore(RaftReplDev& rd, RaftStateMachine& sm, Args&&... args) : HomeRaftLogStore{std::forward< Args >(args)...}, m_rd{rd}, m_sm{sm} {} + //////////////////////// function override //////////////////////// uint64_t append(nuraft::ptr< nuraft::log_entry >& entry) override; void write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry) override; void end_of_append_batch(ulong start_lsn, ulong count) override; + bool compact(ulong last_lsn) override; private: std::string rdev_name() const; diff --git a/src/lib/replication/repl_dev/common.h b/src/lib/replication/repl_dev/common.h index a39e44c12..5aec5e9b5 100644 --- a/src/lib/replication/repl_dev/common.h +++ b/src/lib/replication/repl_dev/common.h @@ -64,7 +64,8 @@ struct repl_dev_superblk { logdev_id_t logdev_id; logstore_id_t logstore_id; // Logstore id for the data journal int64_t commit_lsn; // LSN upto which this replica has committed - int64_t checkpoint_lsn; // LSN upto which this replica have checkpointed the data + int64_t checkpoint_lsn; // LSN upto which this replica have checkpointed the Data + int64_t compact_lsn; // maximum LSN that can be compacted to uint64_t group_ordinal; // Ordinal number which will be used to indicate the rdevXYZ for debugging uint64_t get_magic() const { return magic; } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 73ebf9fd1..707f9a6c1 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -867,12 +867,14 @@ void RaftReplDev::report_committed(repl_req_ptr_t rreq) { } void RaftReplDev::cp_flush(CP*) { - auto lsn = m_commit_upto_lsn.load(); + auto const lsn = m_commit_upto_lsn.load(); + auto const clsn = m_compact_lsn.load(); + if (lsn == m_last_flushed_commit_lsn) { // Not dirtied since last flush ignore return; } - + m_rd_sb->compact_lsn = clsn; m_rd_sb->commit_lsn = lsn; m_rd_sb->checkpoint_lsn = lsn; m_rd_sb->last_applied_dsn = m_next_dsn.load(); @@ -881,4 +883,7 @@ void RaftReplDev::cp_flush(CP*) { } void RaftReplDev::cp_cleanup(CP*) {} +#if 0 +void RaftReplDev::truncate_if_needed() { m_data_journal->truncate_if_needed(); } +#endif } // namespace homestore diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index e7e56c1ef..a07e1b346 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -75,6 +75,8 @@ class RaftReplDev : public ReplDev, raft_repl_dev_superblk m_sb_in_mem; // Cached version which is used to read and for staging std::atomic< repl_lsn_t > m_commit_upto_lsn{0}; // LSN which was lastly written, to track flushes + std::atomic< repl_lsn_t > m_compact_lsn{0}; // LSN upto which it was compacted, it is used to track where to + // maximum lsn the data journal can truncate to; repl_lsn_t m_last_flushed_commit_lsn{0}; // LSN upto which it was flushed to persistent store iomgr::timer_handle_t m_sb_flush_timer_hdl; @@ -114,6 +116,8 @@ class RaftReplDev : public ReplDev, uint32_t get_blk_size() const override; repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); } + // void truncate_if_needed() override; + //////////////// Accessor/shortcut methods /////////////////////// nuraft_mesg::repl_service_ctx* group_msg_service(); nuraft::raft_server* raft_server(); @@ -128,6 +132,11 @@ class RaftReplDev : public ReplDev, void cp_flush(CP* cp); void cp_cleanup(CP* cp); + /// @brief This method is called when the data journal is compacted + /// + /// @param upto_lsn : LSN upto which the data journal was compacted + void on_compact(repl_lsn_t upto_lsn) { m_compact_lsn.store(upto_lsn); } + protected: //////////////// All nuraft::state_mgr overrides /////////////////////// nuraft::ptr< nuraft::cluster_config > load_config() override; diff --git a/src/lib/replication/service/generic_repl_svc.h b/src/lib/replication/service/generic_repl_svc.h index e55ac3f05..343159fa1 100644 --- a/src/lib/replication/service/generic_repl_svc.h +++ b/src/lib/replication/service/generic_repl_svc.h @@ -57,6 +57,8 @@ class GenericReplService : public ReplicationService { hs_stats get_cap_stats() const override; replica_id_t get_my_repl_uuid() const { return m_my_uuid; } + void resource_audit() override; + protected: virtual void add_repl_dev(group_id_t group_id, shared< ReplDev > rdev); virtual void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) = 0; @@ -73,7 +75,6 @@ class SoloReplService : public GenericReplService { void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override; AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out, replica_id_t member_in) const override; - }; class SoloReplServiceCPHandler : public CPCallbacks { diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index 22384917f..d56d498fb 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -280,7 +280,7 @@ class RaftReplDevTest : public testing::Test { flip::FlipClient m_fc{iomgr_flip::instance()}; #endif }; - +#if 0 TEST_F(RaftReplDevTest, All_Append_Restart_Append) { LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); g_helper->sync_for_test_start(); @@ -354,7 +354,7 @@ TEST_F(RaftReplDevTest, All_Append_Fetch_Remote_Data) { g_helper->sync_for_cleanup_start(); } - +#endif // do some io before restart; TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) { LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); @@ -407,6 +407,7 @@ TEST_F(RaftReplDevTest, All_restart_one_follower_inc_resync) { this->validate_all_data(); g_helper->sync_for_cleanup_start(); } +#if 0 // // staging the fetch remote data with flip point; // @@ -524,7 +525,7 @@ TEST_F(RaftReplDevTest, All_restart_leader) { this->validate_all_data(); g_helper->sync_for_cleanup_start(); } - +#endif // TODO // double restart: // 1. restart one follower(F1) while I/O keep running.