Skip to content

Commit

Permalink
Merge branch 'yk_repl_metrix' of github.com:yamingk/HomeStore into yk…
Browse files Browse the repository at this point in the history
…_repl_metrix
  • Loading branch information
yamingk committed Feb 28, 2024
2 parents b39418f + 4cf7ad9 commit 563398b
Show file tree
Hide file tree
Showing 19 changed files with 346 additions and 246 deletions.
2 changes: 1 addition & 1 deletion .jenkins/jenkinsfile_nightly
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pipeline {
}
stage("Build") {
steps {
sh "conan create --build missing -o sisl:prerelease=True -o homestore:sanitize=True -o homestore:skip_testing=True -pr debug . ${PROJECT}/${VER}@"
sh "conan create --build missing -o homestore:sanitize=True -o homestore:skip_testing=True -pr debug . ${PROJECT}/${VER}@"
sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_index_btree' -exec cp {} .jenkins/test_index_btree \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_meta_blk_mgr' -exec cp {} .jenkins/test_meta_blk_mgr \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*bin/test_log_store' -exec cp {} .jenkins/test_log_store \\;"
Expand Down
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.1.7"
version = "5.1.10"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
26 changes: 20 additions & 6 deletions src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,30 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {

template < typename ReqT >
btree_status_t put(ReqT& put_req) {
auto cpg = hs()->cp_mgr().cp_guard();
put_req.m_op_context = (void*)cpg.context(cp_consumer_t::INDEX_SVC);
return Btree< K, V >::put(put_req);
auto ret = btree_status_t::success;
do {
auto cpg = hs()->cp_mgr().cp_guard();
put_req.m_op_context = (void*)cpg.context(cp_consumer_t::INDEX_SVC);
ret = Btree< K, V >::put(put_req);
if (ret == btree_status_t::cp_mismatch) {
LOGTRACEMOD(wbcache, "CP Mismatch, retrying put");
}
} while (ret == btree_status_t::cp_mismatch);
return ret;
}

template < typename ReqT >
btree_status_t remove(ReqT& remove_req) {
auto cpg = hs()->cp_mgr().cp_guard();
remove_req.m_op_context = (void*)cpg.context(cp_consumer_t::INDEX_SVC);
return Btree< K, V >::remove(remove_req);
auto ret = btree_status_t::success;
do {
auto cpg = hs()->cp_mgr().cp_guard();
remove_req.m_op_context = (void*)cpg.context(cp_consumer_t::INDEX_SVC);
ret = Btree< K, V >::remove(remove_req);
if (ret == btree_status_t::cp_mismatch) {
LOGTRACEMOD(wbcache, "CP Mismatch, retrying remove");
}
} while (ret == btree_status_t::cp_mismatch);
return ret;
}

protected:
Expand Down
11 changes: 6 additions & 5 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
repl_key rkey; // Unique key for the request
sisl::blob header; // User header
sisl::blob key; // User supplied key for this req
int64_t lsn{0}; // Lsn for this replication req
int64_t lsn{-1}; // Lsn for this replication req
bool is_proposer{false}; // Is the repl_req proposed by this node

//////////////// Value related section /////////////////
sisl::sg_list value; // Raw value - applicable only to leader req
MultiBlkId local_blkid; // Local BlkId for the value
RemoteBlkId remote_blkid; // Corresponding remote blkid for the value
sisl::sg_list value; // Raw value - applicable only to leader req
MultiBlkId local_blkid; // Local BlkId for the value
RemoteBlkId remote_blkid; // Corresponding remote blkid for the value
bool value_inlined{false}; // Is the value inlined in the header itself

//////////////// Journal/Buf related section /////////////////
std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > journal_buf; // Buf for the journal entry
Expand Down Expand Up @@ -244,7 +245,7 @@ class ReplDev {

/// @brief get replication status. If called on follower member
/// this API can return empty result.
virtual std::vector<peer_info> get_replication_status() const = 0;
virtual std::vector< peer_info > get_replication_status() const = 0;

/// @brief Gets the group_id this repldev is working for
/// @return group_id
Expand Down
2 changes: 1 addition & 1 deletion src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ table LogStore {

// Logdev will flush the logs only in a dedicated thread. Turn this on if flush IO should not
// interfere with the data IO path.
flush_only_in_dedicated_thread: bool = false;
flush_only_in_dedicated_thread: bool = true;
}

table Generic {
Expand Down
6 changes: 1 addition & 5 deletions src/lib/homestore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ bool HomeStore::start(const hs_input_params& input, hs_before_services_starting_
if (has_repl_data_service()) {
m_log_service = std::make_unique< LogStoreService >();
m_data_service = std::make_unique< BlkDataService >(std::move(s_custom_chunk_selector));
m_repl_service = GenericReplService::create(std::move(s_repl_app));
} else {
if (has_log_service()) { m_log_service = std::make_unique< LogStoreService >(); }
if (has_data_service()) {
Expand Down Expand Up @@ -191,11 +192,6 @@ void HomeStore::format_and_start(std::map< uint32_t, hs_format_params >&& format
}

void HomeStore::do_start() {
// when coming here:
// 1 if this is the first_time_boot, , the repl app already gets its uuid from upper layer
// 2 if this is not the first_time_boot, the repl app already gets its uuid from the metaservice
// now , we can safely initialize GenericReplService , which will get a correct uuid through get_my_repl_uuid()
if (has_repl_data_service()) m_repl_service = GenericReplService::create(std::move(s_repl_app));
const auto& inp_params = HomeStoreStaticConfig::instance().input;

uint64_t cache_size = resource_mgr().get_cache_size();
Expand Down
15 changes: 7 additions & 8 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,11 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) {
m_last_flush_idx = m_log_idx - 1;
}

m_flush_timer_hdl = iomanager.schedule_global_timer(
HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, true, nullptr /* cookie */,
iomgr::reactor_regex::all_worker,
[this](void*) {
if (m_pending_flush_size.load() && !m_is_flushing.load(std::memory_order_relaxed)) { flush_if_needed(); }
},
true /* wait_to_schedule */);
iomanager.run_on_wait(logstore_service().flush_thread(), [this]() {
m_flush_timer_hdl = iomanager.schedule_thread_timer(HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000,
true /* recurring */, nullptr /* cookie */,
[this](void*) { flush_if_needed(); });
});

handle_unopened_log_stores(format);

Expand Down Expand Up @@ -133,7 +131,8 @@ void LogDev::stop() {
}

// cancel the timer
iomanager.cancel_timer(m_flush_timer_hdl, true);
iomanager.run_on_wait(logstore_service().flush_thread(),
[this]() { iomanager.cancel_timer(m_flush_timer_hdl, true); });

{
folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx);
Expand Down
61 changes: 40 additions & 21 deletions src/lib/replication/log_store/repl_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,62 @@
namespace homestore {

uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
// We don't want to transform anything that is not an app log
if (entry->get_val_type() != nuraft::log_val_type::app_log) { return HomeRaftLogStore::append(entry); }

repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry);
ulong lsn;
if (rreq) {
lsn = HomeRaftLogStore::append(rreq->raft_journal_buf());
m_sm.link_lsn_to_req(rreq, int64_cast(lsn));
RD_LOG(INFO, "Raft Channel: Received log entry rreq=[{}]", rreq->to_compact_string());
} else {
if (rreq->is_proposer || rreq->value_inlined) {
// No need of any transformation for proposer or inline data, since the entry is already meaningful
lsn = HomeRaftLogStore::append(entry);
} else {
lsn = HomeRaftLogStore::append(rreq->raft_journal_buf());
}
m_sm.link_lsn_to_req(rreq, int64_cast(lsn));
RD_LOG(DEBUG, "Raft Channel: Received append log entry rreq=[{}]", rreq->to_compact_string());
return lsn;
}

void ReplLogStore::write_at(ulong index, nuraft::ptr< nuraft::log_entry >& entry) {
// We don't want to transform anything that is not an app log
if (entry->get_val_type() != nuraft::log_val_type::app_log) {
HomeRaftLogStore::write_at(index, entry);
return;
}

repl_req_ptr_t rreq = m_sm.transform_journal_entry(entry);
if (rreq) {
HomeRaftLogStore::write_at(index, rreq->raft_journal_buf());
m_sm.link_lsn_to_req(rreq, int64_cast(index));
RD_LOG(INFO, "Raft Channel: Received log entry rreq=[{}]", rreq->to_compact_string());
} else {
if (rreq->is_proposer || rreq->value_inlined) {
// No need of any transformation for proposer or inline data, since the entry is already meaningful
HomeRaftLogStore::write_at(index, entry);
} else {
HomeRaftLogStore::write_at(index, rreq->raft_journal_buf());
}
m_sm.link_lsn_to_req(rreq, int64_cast(index));
RD_LOG(DEBUG, "Raft Channel: Received write_at log entry rreq=[{}]", rreq->to_compact_string());
}

void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
// Skip this call in leader, since this method will synchronously flush the data, which is not required for
// leader. Leader will call the flush as part of commit after receiving quorum, upon which time, there is a high
// possibility the log entry is already flushed.
if (!m_rd.is_leader()) {
int64_t end_lsn = int64_cast(start_lsn + count - 1);
int64_t end_lsn = int64_cast(start_lsn + count - 1);

// Start fetch the batch of data for this lsn range from remote if its not available yet.
auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
reqs->emplace_back(m_sm.lsn_to_req(lsn));
// Start fetching the batch of data for this lsn range from remote if it's not available yet.
auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
auto rreq = m_sm.lsn_to_req(lsn);
// Skip requests proposed by this node, since this method will synchronously flush the data, which is not
// required for the proposer. The proposer will call the flush as part of commit after receiving quorum, by which
// time there is a high possibility the log entry is already flushed.
if (rreq && rreq->is_proposer) {
RD_LOG(TRACE, "Raft Channel: Ignoring to flush proposer request rreq=[{}]", rreq->to_compact_string());
continue;
}
reqs->emplace_back(std::move(rreq));
}

RD_LOG(TRACE, "Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
reqs->size());

// If reqs is empty, all requests in this batch are from the proposer for data write, so as mentioned above we can
// skip the flush for now
if (!reqs->empty()) {
// Check the map if data corresponding to all of these requests have been received and written. If not, schedule
// a fetch and write. Once all requests are completed and written, these requests are popped out of the map and
// the future will be ready.
Expand All @@ -60,9 +80,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
for (auto const& rreq : *reqs) {
if (rreq) { rreq->state.fetch_or(uint32_cast(repl_req_state_t::LOG_FLUSHED)); }
}

sisl::VectorPool< repl_req_ptr_t >::free(reqs);
}
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
}

std::string ReplLogStore::rdev_name() const { return m_rd.rdev_name(); }
Expand Down
4 changes: 2 additions & 2 deletions src/lib/replication/repl_dev/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ std::string repl_req_ctx::to_string() const {
}

std::string repl_req_ctx::to_compact_string() const {
return fmt::format("dsn={} term={} lsn={} state={} ref={}", rkey.dsn, rkey.term, lsn, req_state_name(state.load()),
this->use_count());
return fmt::format("dsn={} term={} lsn={} Blkid={} state=[{}]", rkey.dsn, rkey.term, lsn, local_blkid.to_string(),
req_state_name(state.load()));
}

} // namespace homestore
Loading

0 comments on commit 563398b

Please sign in to comment.