Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication Snapshot and Compact #364

Closed
wants to merge 14 commits into from
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.0.1"
version = "6.1.0"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
64 changes: 64 additions & 0 deletions src/include/homestore/logstore/log_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
* to set this to true on cases where there are multiple log stores, so that once all in-memory truncation is
* completed, a device truncation can be triggered for all the logstores. The device truncation is more
* expensive and grouping them together yields better results.
*
* Note: this flag is currently unused, which means every truncate is in-memory only.
* @return number of records to truncate
*/
void truncate(logstore_seq_num_t upto_seq_num, bool in_memory_truncate_only = true);
Expand Down Expand Up @@ -274,18 +276,80 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {

nlohmann::json get_status(int verbosity) const;

/**
* Retrieves the truncation information before device truncation.
*
* @return A constant reference to the truncation_info object representing the truncation information.
*/
const truncation_info& pre_device_truncation();

/**
* \brief post device truncation processing.
*
* This function is used to update safe truncation boundary to the specified `trunc_upto_key`.
*
* \param trunc_upto_key The key indicating the log entry up to which truncation has been performed.
*/
void post_device_truncation(const logdev_key& trunc_upto_key);

/**
* Handles the completion of a write operation in the log store.
*
* @param req The logstore_req object representing the completed write operation.
* @param ld_key The logdev_key associated with the completed write operation.
*/
void on_write_completion(logstore_req* req, const logdev_key& ld_key);

/**
* \brief Handles the completion of a read operation in the log store.
*
* This function is called when a read operation in the log store has completed.
* It takes a pointer to a logstore_req object and a logdev_key object as parameters.
*
* \param req The pointer to the logstore_req object representing the read request.
* \param ld_key The logdev_key object representing the key used for the read operation.
*/
void on_read_completion(logstore_req* req, const logdev_key& ld_key);

/**
* @brief Handles the event when a log is found.
*
* This function is called when a log is found in the log store. It takes the sequence number of the log,
* the log device key, the flush log device key, and the log buffer as parameters.
*
* During LogDev::do_load during recovery boot, whenever a log is found, the associated logstore's on_log_found
* method is called.
*
* @param seq_num The sequence number of the log.
* @param ld_key The log device key.
* @param flush_ld_key The flush log device key.
* @param buf The log buffer.
*/
void on_log_found(logstore_seq_num_t seq_num, const logdev_key& ld_key, const logdev_key& flush_ld_key,
log_buffer buf);
/**
* @brief Handles the completion of a batch flush operation to update internal state.
*
* This function is called when a batch flush operation is completed.
* It takes a `logdev_key` parameter that represents the key of the flushed batch.
*
* This function is also called during log store recovery;
*
* @param flush_batch_ld_key The key of the flushed batch.
*/
void on_batch_completion(const logdev_key& flush_batch_ld_key);

private:
/**
* Truncates the log store up to the specified sequence number.
*
* @param upto_seq_num The sequence number up to which the log store should be truncated.
*/
void do_truncate(logstore_seq_num_t upto_seq_num);

// NOTE(review): the name suggests this looks up the largest stored sequence number that is <= input_sn; the
// definition is not visible here -- confirm the exact semantics and what is returned when nothing matches.
int search_max_le(logstore_seq_num_t input_sn);

private:
logstore_id_t m_store_id;
std::shared_ptr< LogDev > m_logdev;
sisl::StreamTracker< logstore_record > m_records;
Expand Down
6 changes: 6 additions & 0 deletions src/include/homestore/logstore_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ class LogStoreService {
uint32_t used_size() const;
uint32_t total_size() const;
iomgr::io_fiber_t flush_thread() { return m_flush_fiber; }

/**
* This is used when the actual LogDev truncate is triggered;
*
* @return The IO fiber associated with the truncate thread.
*/
iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; }

private:
Expand Down
3 changes: 3 additions & 0 deletions src/include/homestore/replication/repl_decls.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ using remote_blkid_list_t = folly::small_vector< RemoteBlkId, 4 >;
using replica_id_t = uuid_t;
using group_id_t = uuid_t;

// Log sequence number (LSN) aliases; both are signed 64-bit integers.
// NOTE(review): presumably store_lsn_t is an LSN of the local log store and repl_lsn_t is an LSN of the
// replication log -- confirm against the code that uses them.
using store_lsn_t = int64_t;
using repl_lsn_t = int64_t;

struct peer_info {
// Peer ID.
replica_id_t id_;
Expand Down
8 changes: 8 additions & 0 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ struct repl_key {
std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); }
};

/// @brief Plain descriptor of a replication snapshot; a default-constructed instance has both fields set to 0.
struct repl_snapshot {
    /// NOTE(review): presumably the index of the last log entry covered by the snapshot -- confirm with producer.
    uint64_t last_log_idx_ = 0;
    /// NOTE(review): presumably the term of that last log entry -- confirm with producer.
    uint64_t last_log_term_ = 0;
};

struct repl_journal_entry;
struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::thread_safe_counter > {
friend class SoloReplDev;
Expand Down Expand Up @@ -192,6 +197,9 @@ class ReplDevListener {
/// @brief Called when the replica set is being stopped
virtual void on_replica_stop() = 0;

/// @brief Called when the snapshot is being created by nuraft;
virtual AsyncReplResult<> on_create_snapshot(repl_snapshot& s) = 0;

private:
std::weak_ptr< ReplDev > m_repl_dev;
};
Expand Down
3 changes: 2 additions & 1 deletion src/include/homestore/replication_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ VENUM(repl_impl_type, uint8_t,
solo // For single node - no replication
);


class ReplApplication;

class ReplicationService {
Expand Down Expand Up @@ -53,6 +52,8 @@ class ReplicationService {
virtual hs_stats get_cap_stats() const = 0;

virtual meta_sub_type get_meta_blk_name() const = 0;

// virtual void resource_audit() = 0;
};

//////////////// Application which uses Replication needs to provide the following callbacks ////////////////
Expand Down
2 changes: 1 addition & 1 deletion src/lib/checkpoint/cp_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ CPManager::CPManager() :
nullptr);

resource_mgr().register_dirty_buf_exceed_cb(
[this]([[maybe_unused]] int64_t dirty_buf_count) { this->trigger_cp_flush(false /* false */); });
[this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) { this->trigger_cp_flush(false /* force */); });

start_cp_thread();
}
Expand Down
26 changes: 21 additions & 5 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,20 @@ table ResourceLimits {
/* percentage of memory used during recovery */
memory_in_recovery_precent: uint32 = 40;

/* journal size used percentage */
journal_size_percent: uint32 = 50;
/* journal size used percentage high watermark -- trigger cp */
journal_vdev_size_percent: uint32 = 50;

/* journal size used percentage critical watermark -- trigger truncation */
journal_vdev_size_percent_critical: uint32 = 90;

/* journal descriptor size (NuObject: Per PG) Threshold in MB -- ready for truncation */
journal_descriptor_size_threshold_mb: uint32 = 2048(hotswap);

/* num entries that raft logstore wants to reserve -- its truncation should not cross this */
raft_logstore_reserve_threshold: uint32 = 2000000(hotswap);

/* resource audit timer in ms */
resource_audit_timer_ms: uint32 = 120000;

/* We crash if volume is 95 percent filled and no disk space left */
vol_threshhold_used_size_p: uint32 = 95;
Expand Down Expand Up @@ -199,14 +211,14 @@ table Consensus {
heartbeat_period_ms: uint32 = 250;

// Re-election timeout low and high mark
elect_to_low_ms: uint32 = 900;
elect_to_high_ms: uint32 = 1400;
elect_to_low_ms: uint32 = 800;
elect_to_high_ms: uint32 = 1700;

// When a new member is being synced, the batch size of number of logs to be shipped
log_sync_batch_size: int32 = 100;

// Log distance with which snapshot/compact needs to happen. 0 means snapshot is disabled
snapshot_freq_distance: int32 = 0;
snapshot_freq_distance: int32 = 20000;

// Max append batch size
max_append_batch_size: int32 = 64;
Expand All @@ -226,6 +238,10 @@ table Consensus {
// Leadership expiry (=0 indicates 20 times heartbeat period), set -1 to never expire
leadership_expiry_ms: int32 = 0;

// Number of reserved log items when compaction is triggered from the raft server.
// This is not the actual number of reserved log items; it is the maximum truncation barrier (truncation can't cross it).
num_reserved_log_items: uint32 = 2000;

// data fetch max size limit in KB (2MB by default)
data_fetch_max_size_kb: uint32 = 2048;

Expand Down
89 changes: 79 additions & 10 deletions src/lib/common/resource_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,75 @@
*
*********************************************************************************/
#include <homestore/homestore.hpp>
#include <homestore/logstore_service.hpp>
#include <homestore/replication_service.hpp>
#include <iomgr/iomgr_flip.hpp>
#include "resource_mgr.hpp"
#include "homestore_assert.hpp"
#include "replication/repl_dev/raft_repl_dev.h"

namespace homestore {
ResourceMgr& resource_mgr() { return hs()->resource_mgr(); }

void ResourceMgr::set_total_cap(uint64_t total_cap) { m_total_cap = total_cap; }
/* Records the total capacity and then starts the recurring resource audit timer (see start_timer()). */
void ResourceMgr::start(uint64_t total_cap) {
m_total_cap = total_cap;
start_timer();
}

/* Cancels the resource audit timer and resets the stored handle to the null timer handle. */
void ResourceMgr::stop() {
LOGINFO("Cancel resource manager timer.");
iomanager.cancel_timer(m_res_audit_timer_hdl);
// Reset the handle so it no longer refers to the cancelled timer.
m_res_audit_timer_hdl = iomgr::null_timer_handle;
}

//
// 1. Conceptually, in rare cases (not possible for NuObject, possibly true for NuBlox2.0) truncation by itself can't
// guarantee that enough space is freed up to satisfy the resource manager, e.g. multiple log stores share the same
// descriptor and one log store lags far behind and is not able to truncate much space. Doing multiple truncations
// won't help in this case.
// 2. Any write on any other descriptor will trigger a high_watermark_check, and if it triggers a critical
// alert on this vdev, truncation will be made immediately on all descriptors;
// 3. If still no space can be freed, there is nothing we can do here other than back-pressure the layer above by
// rejecting log writes on this descriptor;
//
/**
 * Runs one truncation pass.
 *
 * When the replication data service is present, every raft-backed repl dev is first asked to truncate with the
 * configured reservation (resource_limits.raft_logstore_reserve_threshold), and then a device truncate is
 * triggered through the logstore service. When the replication data service is absent this is a no-op.
 */
void ResourceMgr::trigger_truncate() {
    if (!hs()->has_repl_data_service()) { return; }

    // first make sure all repl dev's underlying raft log store make corresponding reservation during
    // truncate -- set the safe truncate boundary for each raft log store;
    hs()->repl_service().iterate_repl_devs([](cshared< ReplDev >& rd) {
        // lock is already taken by repl service layer;
        auto const raft_rd = std::dynamic_pointer_cast< RaftReplDev >(rd);

        // dynamic_pointer_cast yields nullptr when rd is not a RaftReplDev; such a dev has no raft log store
        // to reserve on, so skip it instead of dereferencing a null pointer.
        if (!raft_rd) { return; }

        raft_rd->truncate(HS_DYNAMIC_CONFIG(resource_limits.raft_logstore_reserve_threshold));
    });

    // next do device truncate which go through all logdevs and truncate them;
    hs()->logstore_service().device_truncate();

    // TODO: add device_truncate callback to audit how much space was freed per each LogDev and add related
    // metrics;
}

/**
 * Schedules the recurring global resource audit timer on all worker reactors and stores its handle in
 * m_res_audit_timer_hdl. Each expiry runs trigger_truncate().
 *
 * The period comes from resource_limits.resource_audit_timer_ms (milliseconds) and is scaled by 1000 * 1000
 * before being handed to the scheduler (presumably nanoseconds -- confirm against iomgr).
 */
void ResourceMgr::start_timer() {
    auto const res_mgr_timer_ms = HS_DYNAMIC_CONFIG(resource_limits.resource_audit_timer_ms);
    LOGINFO("resource audit timer is set to {} ms", res_mgr_timer_ms);

    // Widen before scaling: the config field is a 32-bit unsigned value, and the default of 120000 multiplied by
    // 1000 * 1000 does not fit in 32 bits, so the unwidened product would silently wrap to a much shorter period.
    auto const timer_period = static_cast< uint64_t >(res_mgr_timer_ms) * 1000 * 1000;

    m_res_audit_timer_hdl = iomanager.schedule_global_timer(
        timer_period, true /* recurring */, nullptr /* cookie */, iomgr::reactor_regex::all_worker,
        [this](void*) {
            // all resource timely audit routine should arrive here;
            this->trigger_truncate();
        },
        true /* wait_to_schedule */);
}

/* monitor dirty buffer count */
void ResourceMgr::inc_dirty_buf_size(const uint32_t size) {
HS_REL_ASSERT_GT(size, 0);
const auto dirty_buf_cnt = m_hs_dirty_buf_cnt.fetch_add(size, std::memory_order_relaxed);
COUNTER_INCREMENT(m_metrics, dirty_buf_cnt, size);
if (m_dirty_buf_exceed_cb && ((dirty_buf_cnt + size) > get_dirty_buf_limit())) {
m_dirty_buf_exceed_cb(dirty_buf_cnt + size);
m_dirty_buf_exceed_cb(dirty_buf_cnt + size, false /* critical */);
}
}

Expand Down Expand Up @@ -106,22 +160,37 @@ uint64_t ResourceMgr::get_cache_size() const {
return ((HS_STATIC_CONFIG(input.io_mem_size()) * HS_DYNAMIC_CONFIG(resource_limits.cache_size_percent)) / 100);
}

/* monitor journal size */
bool ResourceMgr::check_journal_size(const uint64_t used_size, const uint64_t total_size) {
if (m_journal_exceed_cb) {
/* Reports whether a journal descriptor's used size has reached (inclusive) the per-descriptor size limit. */
bool ResourceMgr::check_journal_descriptor_size(const uint64_t used_size) const {
    auto const limit_bytes = get_journal_descriptor_size_limit();
    return !(used_size < limit_bytes);
}

/* monitor journal vdev size */
bool ResourceMgr::check_journal_vdev_size(const uint64_t used_size, const uint64_t total_size) {
if (m_journal_vdev_exceed_cb) {
const uint32_t used_pct = (100 * used_size / total_size);
if (used_pct >= HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent)) {
m_journal_exceed_cb(used_size);
if (used_pct >= get_journal_vdev_size_limit()) {
m_journal_vdev_exceed_cb(used_size, used_pct >= get_journal_vdev_size_critical_limit() /* is_critical */);
HS_LOG_EVERY_N(WARN, base, 50, "high watermark hit, used percentage: {}, high watermark percentage: {}",
used_pct, HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent));
used_pct, get_journal_vdev_size_limit());
return true;
}
}
return false;
}
void ResourceMgr::register_journal_exceed_cb(exceed_limit_cb_t cb) { m_journal_exceed_cb = std::move(cb); }

uint32_t ResourceMgr::get_journal_size_limit() const { return HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent); }
/* Stores the callback used to report journal vdev usage crossing its limit; replaces any previously stored one. */
void ResourceMgr::register_journal_vdev_exceed_cb(exceed_limit_cb_t cb) { m_journal_vdev_exceed_cb = std::move(cb); }

/**
 * Returns the journal descriptor size limit in bytes, derived from
 * resource_limits.journal_descriptor_size_threshold_mb.
 *
 * The conversion is done in 64-bit arithmetic and saturated to the largest value the uint32_t return type can
 * hold. Without this, a threshold of 4096 MB or more would wrap around (4096 MB would yield 0), which would make
 * every descriptor appear to be over the limit.
 */
uint32_t ResourceMgr::get_journal_descriptor_size_limit() const {
    constexpr uint64_t bytes_per_mb = 1024ull * 1024ull;
    constexpr uint64_t max_limit = 0xFFFFFFFFull; // largest value representable by the uint32_t return type

    auto const threshold_mb =
        static_cast< uint64_t >(HS_DYNAMIC_CONFIG(resource_limits.journal_descriptor_size_threshold_mb));
    auto const limit_bytes = threshold_mb * bytes_per_mb;
    return static_cast< uint32_t >(limit_bytes < max_limit ? limit_bytes : max_limit);
}

/* Returns the configured critical watermark for journal vdev usage, as a percentage. */
uint32_t ResourceMgr::get_journal_vdev_size_critical_limit() const {
    auto const critical_pct = HS_DYNAMIC_CONFIG(resource_limits.journal_vdev_size_percent_critical);
    return critical_pct;
}

/* Returns the configured high watermark for journal vdev usage, as a percentage. */
uint32_t ResourceMgr::get_journal_vdev_size_limit() const {
    auto const high_watermark_pct = HS_DYNAMIC_CONFIG(resource_limits.journal_vdev_size_percent);
    return high_watermark_pct;
}

/* monitor chunk size */
// NOTE: currently a no-op -- the body is empty and both arguments are ignored.
void ResourceMgr::check_chunk_free_size_and_trigger_cp(uint64_t free_size, uint64_t alloc_size) {}
Expand Down
Loading
Loading