Skip to content

Commit

Permalink
Add get_leader_id/get_replication_status into replicationService
Browse files Browse the repository at this point in the history
Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen committed Jan 31, 2024
1 parent 198657e commit 8af95cf
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 1 deletion.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.0.7"
version = "5.1.1"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
9 changes: 9 additions & 0 deletions src/include/homestore/replication/repl_decls.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ using remote_blkid_list_t = folly::small_vector< RemoteBlkId, 4 >;
using replica_id_t = uuid_t;
using group_id_t = uuid_t;

struct peer_info {
// Peer ID.
replica_id_t id_;
// The last replication index that the peer has, from this server's point of view.
uint64_t replication_idx_;
// The elapsed time since the last successful response from this peer, set to 0 on leader
uint64_t last_succ_resp_us_;
};

} // namespace homestore

// hash function definitions
Expand Down
7 changes: 7 additions & 0 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ class ReplDev {
/// @return true or false
virtual bool is_leader() const = 0;

/// @brief get the leader replica_id of given group
virtual const replica_id_t get_leader_id() const = 0;

/// @brief get replication status. If called on follower member
/// this API can return empty result.
virtual std::vector<peer_info> get_replication_status() const = 0;

/// @brief Gets the group_id this repldev is working for
/// @return group_id
virtual group_id_t group_id() const = 0;
Expand Down
1 change: 1 addition & 0 deletions src/include/homestore/replication_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ VENUM(repl_impl_type, uint8_t,
solo // For single node - no replication
);


class ReplApplication;

class ReplicationService {
Expand Down
17 changes: 17 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "replication/service/raft_repl_service.h"
#include "replication/repl_dev/raft_repl_dev.h"
#include "push_data_rpc_generated.h"
#include "boost/lexical_cast.hpp"

namespace homestore {
std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1};
Expand Down Expand Up @@ -374,6 +375,22 @@ AsyncReplResult<> RaftReplDev::become_leader() {

bool RaftReplDev::is_leader() const { return m_repl_svc_ctx->is_raft_leader(); }

const replica_id_t RaftReplDev::get_leader_id() const {
auto leader = m_repl_svc_ctx->raft_leader_id();
return boost::lexical_cast< replica_id_t >(leader);
}

std::vector< peer_info > RaftReplDev::get_replication_status() const {
std::vector< peer_info > pi;
auto rep_status = m_repl_svc_ctx->get_raft_status();
for (auto const& pinfo : rep_status) {
pi.emplace_back(peer_info{.id_ = boost::lexical_cast< replica_id_t >(pinfo.id_),
.replication_idx_ = pinfo.last_log_idx_,
.last_succ_resp_us_ = pinfo.last_succ_resp_us_});
}
return pi;
}

uint32_t RaftReplDev::get_blk_size() const { return data_service().get_blk_size(); }

nuraft_mesg::repl_service_ctx* RaftReplDev::group_msg_service() { return m_repl_svc_ctx.get(); }
Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class RaftReplDev : public ReplDev,
void async_free_blks(int64_t lsn, MultiBlkId const& blkid) override;
AsyncReplResult<> become_leader() override;
bool is_leader() const override;
const replica_id_t get_leader_id() const override;
std::vector<peer_info> get_replication_status() const override;
group_id_t group_id() const override { return m_group_id; }
std::string group_id_str() const { return boost::uuids::to_string(m_group_id); }
std::string rdev_name() const { return m_rdev_name; }
Expand Down
6 changes: 6 additions & 0 deletions src/lib/replication/repl_dev/solo_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class SoloReplDev : public ReplDev {

AsyncReplResult<> become_leader() override { return make_async_error(ReplServiceError::OK); }
bool is_leader() const override { return true; }
const replica_id_t get_leader_id() const override { return m_group_id; }
std::vector<peer_info> get_replication_status() const override {
return std::vector<peer_info>{peer_info {.id_ = m_group_id, .replication_idx_ = 0, .last_succ_resp_us_ = 0}};
}


uuid_t group_id() const override { return m_group_id; }

Expand All @@ -56,6 +61,7 @@ class SoloReplDev : public ReplDev {
void cp_flush(CP* cp);
void cp_cleanup(CP* cp);


private:
void write_journal(repl_req_ptr_t rreq);
void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/service/generic_repl_svc.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class SoloReplService : public GenericReplService {
void load_repl_dev(sisl::byte_view const& buf, void* meta_cookie) override;
AsyncReplResult<> replace_member(group_id_t group_id, replica_id_t member_out,
replica_id_t member_in) const override;

};

class SoloReplServiceCPHandler : public CPCallbacks {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ AsyncReplResult<> RaftReplService::replace_member(group_id_t group_id, replica_i
return make_async_error<>(ReplServiceError::NOT_IMPLEMENTED);
}



///////////////////// RaftReplService CP Callbacks /////////////////////////////
std::unique_ptr< CPContext > RaftReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { return nullptr; }

Expand Down
34 changes: 34 additions & 0 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,40 @@ TEST_F(RaftReplDevTest, All_Append_Restart_Append) {
g_helper->sync_for_cleanup_start();
}


TEST_F(RaftReplDevTest, All_ReplService) {
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();
auto repl_dev = dynamic_cast< RaftReplDev* >(pick_one_db().repl_dev());
auto group_id = repl_dev->group_id();
auto my_id_str = repl_dev->my_replica_id_str();

auto leader = repl_dev->get_leader_id();
ASSERT_TRUE(leader != replica_id_t())
<< "Error getting leader id for group_id=" << boost::uuids::to_string(group_id).c_str();
auto leader_str = boost::uuids::to_string(leader);
LOGINFO("Got raft leader {} for group {}", leader_str, group_id);

if (g_helper->replica_num() == 0) {
ASSERT_TRUE(leader_str == my_id_str)
<< "Leader id " << leader_str.c_str() << " should equals to my ID " << my_id_str.c_str();
} else {
ASSERT_TRUE(leader_str != my_id_str) << "I am a follower, Leader id " << leader_str.c_str()
<< " should not equals to my ID " << my_id_str.c_str();
}

auto peers_info = repl_dev->get_replication_status();
LOGINFO("Got peers_info size {} for group {}", peers_info.size(), group_id);
if (g_helper->replica_num() == 0) {
auto const num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
EXPECT_TRUE(peers_info.size() == num_replicas)
<< "Expecting peers_info size " << peers_info.size() << " but got " << peers_info.size();
} else {
EXPECT_TRUE(peers_info.size() == 0) << "Expecting zero length on follower, got " << peers_info.size();
}
g_helper->sync_for_cleanup_start();
}

int main(int argc, char* argv[]) {
int parsed_argc{argc};
char** orig_argv = argv;
Expand Down

0 comments on commit 8af95cf

Please sign in to comment.