
Commit

test
MadSchemas committed Feb 16, 2024
1 parent a783b04 commit d3b81e1
Showing 41 changed files with 606 additions and 301 deletions.
5 changes: 5 additions & 0 deletions cpp_src/client/rpcclient.cc
@@ -680,6 +680,11 @@ Error RPCClient::GetSnapshot(std::string_view nsName, const SnapshotOpts& opts,
int64_t count = int64_t(args[1]);
snapshot = Snapshot(&conn_, int(args[0]), count, int64_t(args[2]), lsn_t(int64_t(args[3])),
count > 0 ? p_string(args[4]) : p_string(), config_.NetTimeout);
const unsigned nextArgNum = count > 0 ? 5 : 4;
if (args.size() >= nextArgNum + 1) {
snapshot.ClusterizationStat(ClusterizationStatus{.leaderId = int(args[nextArgNum]),
.role = reindexer::ClusterizationStatus::Role(int(args[nextArgNum + 1]))});
}
}
} catch (const Error& err) {
return err;
4 changes: 4 additions & 0 deletions cpp_src/client/snapshot.h
@@ -1,5 +1,6 @@
#pragma once

#include "core/namespace/namespacestat.h"
#include "core/namespace/snapshot/snapshotrecord.h"
#include "net/cproto/coroclientconnection.h"

@@ -40,6 +41,8 @@ class Snapshot {
Iterator end() noexcept { return Iterator{this, i_.count_}; }
size_t Size() const noexcept { return i_.count_; }
bool HasRawData() const noexcept { return i_.rawCount_; }
void ClusterizationStat(ClusterizationStatus &&clusterStatus) noexcept { i_.clusterStatus_ = std::move(clusterStatus); }
std::optional<ClusterizationStatus> ClusterizationStat() const noexcept { return i_.clusterStatus_; }

private:
friend class RPCClient;
@@ -66,6 +69,7 @@ class Snapshot {
net::cproto::CoroClientConnection *conn_;
std::chrono::milliseconds requestTimeout_;
std::chrono::steady_clock::time_point sessionTs_;
std::optional<ClusterizationStatus> clusterStatus_;
};

Impl i_;
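On the client side, the leader ID and role parsed in rpcclient.cc above are exposed through the new Snapshot::ClusterizationStat() getter. The following is a minimal sketch of reading them, not part of the commit; the reindexer::client namespace qualification and the way `snapshot` gets filled (via RPCClient::GetSnapshot, whose full signature is truncated in the hunk above) are assumptions.

// Sketch only: assumes <iostream> and that `snapshot` was already filled by RPCClient::GetSnapshot.
void printClusterStatus(const reindexer::client::Snapshot& snapshot) {
	if (auto status = snapshot.ClusterizationStat(); status.has_value()) {
		// leaderId and role come from the optional trailing RPC args parsed in rpcclient.cc
		std::cout << "leader id: " << status->leaderId << ", role: " << int(status->role) << "\n";
	}
}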
9 changes: 7 additions & 2 deletions cpp_src/cluster/config.cc
@@ -865,6 +865,7 @@ Error ShardingConfig::FromYAML(const std::string &yaml) {
thisShardId = root["this_shard_id"].as<int>();
reconnectTimeout = std::chrono::milliseconds(root["reconnect_timeout_msec"].as<int>(reconnectTimeout.count()));
shardsAwaitingTimeout = std::chrono::seconds(root["shards_awaiting_timeout_sec"].as<int>(shardsAwaitingTimeout.count()));
configRollbackTimeout = std::chrono::seconds(root["config_rollback_timeout_sec"].as<int>(configRollbackTimeout.count()));
proxyConnCount = root["proxy_conn_count"].as<int>(proxyConnCount);
proxyConnConcurrency = root["proxy_conn_concurrency"].as<int>(proxyConnConcurrency);
proxyConnThreads = root["proxy_conn_threads"].as<int>(proxyConnThreads);
@@ -921,6 +922,7 @@ Error ShardingConfig::FromJSON(const gason::JsonNode &root) {
thisShardId = root["this_shard_id"].As<int>();
reconnectTimeout = std::chrono::milliseconds(root["reconnect_timeout_msec"].As<int>(reconnectTimeout.count()));
shardsAwaitingTimeout = std::chrono::seconds(root["shards_awaiting_timeout_sec"].As<int>(shardsAwaitingTimeout.count()));
configRollbackTimeout = std::chrono::seconds(root["config_rollback_timeout_sec"].As<int>(configRollbackTimeout.count()));
proxyConnCount = root["proxy_conn_count"].As<int>(proxyConnCount);
proxyConnConcurrency = root["proxy_conn_concurrency"].As<int>(proxyConnConcurrency);
proxyConnThreads = root["proxy_conn_threads"].As<int>(proxyConnThreads);
@@ -936,8 +938,9 @@ Error ShardingConfig::FromJSON(const gason::JsonNode &root) {
bool operator==(const ShardingConfig &lhs, const ShardingConfig &rhs) {
return lhs.namespaces == rhs.namespaces && lhs.thisShardId == rhs.thisShardId && lhs.shards == rhs.shards &&
lhs.reconnectTimeout == rhs.reconnectTimeout && lhs.shardsAwaitingTimeout == rhs.shardsAwaitingTimeout &&
lhs.proxyConnCount == rhs.proxyConnCount && lhs.proxyConnConcurrency == rhs.proxyConnConcurrency &&
rhs.proxyConnThreads == lhs.proxyConnThreads && rhs.sourceId == lhs.sourceId;
lhs.configRollbackTimeout == rhs.configRollbackTimeout && lhs.proxyConnCount == rhs.proxyConnCount &&
lhs.proxyConnConcurrency == rhs.proxyConnConcurrency && rhs.proxyConnThreads == lhs.proxyConnThreads &&
rhs.sourceId == lhs.sourceId;
}
bool operator==(const ShardingConfig::Key &lhs, const ShardingConfig::Key &rhs) {
return lhs.shardId == rhs.shardId && lhs.algorithmType == rhs.algorithmType && lhs.RelaxCompare(rhs.values) == 0;
@@ -971,6 +974,7 @@ YAML::Node ShardingConfig::GetYAMLObj() const {
yaml["this_shard_id"] = thisShardId;
yaml["reconnect_timeout_msec"] = reconnectTimeout.count();
yaml["shards_awaiting_timeout_sec"] = shardsAwaitingTimeout.count();
yaml["config_rollback_timeout_sec"] = configRollbackTimeout.count();
yaml["proxy_conn_count"] = proxyConnCount;
yaml["proxy_conn_concurrency"] = proxyConnConcurrency;
yaml["proxy_conn_threads"] = proxyConnThreads;
@@ -1014,6 +1018,7 @@ void ShardingConfig::GetJSON(JsonBuilder &jb) const {
jb.Put("this_shard_id", thisShardId);
jb.Put("reconnect_timeout_msec", reconnectTimeout.count());
jb.Put("shards_awaiting_timeout_sec", shardsAwaitingTimeout.count());
jb.Put("config_rollback_timeout_sec", configRollbackTimeout.count());
jb.Put("proxy_conn_count", proxyConnCount);
jb.Put("proxy_conn_concurrency", proxyConnConcurrency);
jb.Put("proxy_conn_threads", proxyConnThreads);
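Taken together, the config.cc changes wire a new config_rollback_timeout_sec key through YAML and JSON parsing, serialization, and operator==. Below is a hedged sketch showing how the key sits next to the existing timeout keys; the YAML fragment is illustrative only and is not a complete sharding config.

// Sketch only, not part of the commit: feeds an illustrative YAML fragment through the
// parser touched above. A real sharding config also needs namespaces/shards sections,
// so the call may return a validation error; the point here is only the new key.
reindexer::cluster::ShardingConfig cfg;
auto err = cfg.FromYAML(R"(
this_shard_id: 0
reconnect_timeout_msec: 3000
shards_awaiting_timeout_sec: 30
config_rollback_timeout_sec: 45
proxy_conn_count: 8
proxy_conn_concurrency: 8
proxy_conn_threads: 4
)");
// On success cfg.configRollbackTimeout holds std::chrono::seconds(45); when the key is
// omitted it keeps the kDefaultRollbackTimeout default added in config.h below.
(void)err;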
4 changes: 4 additions & 0 deletions cpp_src/cluster/config.h
@@ -27,6 +27,8 @@ namespace cluster {

inline uint32_t GetConsensusForN(uint32_t n) noexcept { return n / 2 + 1; }
constexpr auto kStatusCmdTimeout = std::chrono::seconds(3);
constexpr size_t kMaxRetriesOnRoleSwitchAwait = 50;
constexpr auto kRoleSwitchStepTime = std::chrono::milliseconds(150);

struct NodeData {
int serverId = -1;
@@ -191,6 +193,7 @@ constexpr uint32_t kDefaultShardingProxyConnThreads = 4;
struct ShardingConfig {
static constexpr unsigned serverIdPos = 53;
static constexpr int64_t serverIdMask = (((1ll << 10) - 1) << serverIdPos); // 01111111111000...000
static constexpr auto kDefaultRollbackTimeout = std::chrono::seconds(30);

struct Key {
Error FromYAML(const YAML::Node& yaml, const std::map<int, std::vector<std::string>>& shards, KeyValueType& valuesType,
Expand Down Expand Up @@ -236,6 +239,7 @@ struct ShardingConfig {
int thisShardId = ShardingKeyType::ProxyOff;
std::chrono::milliseconds reconnectTimeout = std::chrono::milliseconds(3000);
std::chrono::seconds shardsAwaitingTimeout = std::chrono::seconds(30);
std::chrono::seconds configRollbackTimeout = kDefaultRollbackTimeout;
int proxyConnCount = kDefaultShardingProxyConnCount;
int proxyConnConcurrency = kDefaultShardingProxyCoroPerConn;
int proxyConnThreads = kDefaultShardingProxyConnThreads;
12 changes: 12 additions & 0 deletions cpp_src/cluster/logger.h
@@ -4,6 +4,9 @@
#include <string>
#include "core/type_consts.h"

#include <iostream>
#include "tools/assertrx.h"

namespace reindexer {
namespace cluster {

@@ -44,11 +47,20 @@ class Logger {
template <typename F>
void Log(LogLevel l, F&& f) const {
if (l <= GetLevel()) {
try {
std::string str = f();
if (!str.empty()) {
const auto outLevel = minOutputLogLevel_ < l ? minOutputLogLevel_ : l;
print(outLevel, str);
}
} catch (std::exception& e) {
std::cout << "!!!!!" << e.what() << std::endl;
assertrx(false);
}
catch (...) {
std::cout << "!!!!!<unknown error>" << std::endl;
assertrx(false);
}
}
}

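Logger::Log() takes a callable that builds the message string and evaluates it only when the requested level passes GetLevel(); the new try/catch keeps an exception thrown by that callable from escaping the logger. A minimal usage sketch follows; the LogTrace constant from core/type_consts.h and the reindexer::cluster namespace are assumptions based on the includes and namespaces visible in this file.

// Sketch only: the lambda runs only if LogTrace passes the configured level, and any
// exception it throws while formatting is now handled inside Log() itself.
void logElectionsStep(const reindexer::cluster::Logger& log, int serverId) {
	log.Log(LogTrace, [serverId] {
		return "server " + std::to_string(serverId) + ": elections step";	// built lazily
	});
}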
170 changes: 82 additions & 88 deletions cpp_src/cluster/raftmanager.cc
@@ -33,84 +33,25 @@ void RaftManager::Configure(const ReplicationConfigData& baseConfig, const Clust
}
}

Error RaftManager::SendDesiredLeaderId(int nextServerId) {
logTrace("%d SendDesiredLeaderId nextLeaderId = %d", serverId_, nextServerId);
size_t nextServerNodeIndex = nodes_.size();
for (size_t i = 0; i < nodes_.size(); i++) {
if (nodes_[i].serverId == nextServerId) {
nextServerNodeIndex = i;
break;
}
}

std::shared_ptr<void> CloseConnection(nullptr, [this](void*) {
if (GetLeaderId() != serverId_) { // leader node clients, used for pinging
coroutine::wait_group wgStop;
for (auto& node : nodes_) {
loop_.spawn(wgStop, [&node]() { node.client.Stop(); });
}
wgStop.wait();
}
});

if (nextServerNodeIndex != nodes_.size()) {
Error err = clientStatus(nextServerNodeIndex, kDesiredLeaderTimeout);
if (!err.ok()) {
return Error(err.code(), "Target node %s is not available.", nodes_[nextServerNodeIndex].dsn);
}
}

uint32_t okCount = 1;
coroutine::wait_group wg;
std::string errString;

for (size_t nodeId = 0; nodeId < nodes_.size(); ++nodeId) {
if (nodeId == nextServerNodeIndex) {
continue;
}

loop_.spawn(wg, [this, nodeId, nextServerId, &errString, &okCount] {
try {
const auto err = sendDesiredServerIdToNode(nodeId, nextServerId);
if (err.ok()) {
++okCount;
} else {
errString += "[" + err.what() + "]";
}
} catch (...) {
logInfo("%d: Unable to send desired leader: got unknonw exception", serverId_);
}
});
}
wg.wait();
if (nextServerNodeIndex != nodes_.size()) {
Error err = sendDesiredServerIdToNode(nextServerNodeIndex, nextServerId);
if (!err.ok()) {
return err;
}
okCount++;
}

if (okCount >= GetConsensusForN(nodes_.size() + 1)) {
return errOK;
}

return Error(errNetwork, "Can't send nextLeaderId to servers okCount %d err: %s", okCount, errString);
void RaftManager::SetDesiredLeaderId(int serverId) {
logInfo("%d Set (%d) as a desired leader", serverId_, serverId);
nextServerId_.SetNextServerId(serverId);
lastLeaderPingTs_ = {ClockT::time_point()};
}

RaftInfo::Role RaftManager::Elections() {
std::vector<coroutine::routine_t> succeedRoutines;
succeedRoutines.reserve(nodes_.size());
while (!terminate_.load()) {
const int nextServerId = nextServerId_.GetNextServerId();
int32_t term = beginElectionsTerm(nextServerId);
logInfo("Starting new elections term for %d. Term number: %d", serverId_, term);
if (nextServerId != -1 && nextServerId != serverId_) {
endElections(term, RaftInfo::Role::Follower);
const bool isDesiredLeader = (nextServerId == serverId_);
if (!isDesiredLeader && nextServerId != -1) {
endElections(GetTerm(), RaftInfo::Role::Follower);
logInfo("Skipping elections (desired leader id is %d)", serverId_, nextServerId);
return RaftInfo::Role::Follower;
}
const bool isDesiredLeader = (nextServerId == serverId_);
int32_t term = beginElectionsTerm(nextServerId);
logInfo("Starting new elections term for %d. Term number: %d", serverId_, term);
coroutine::wait_group wg;
succeedRoutines.resize(0);
struct {
@@ -122,7 +63,7 @@ RaftInfo::Role RaftManager::Elections() {
loop_.spawn(wg, [this, &electionsStat, nodeId, term, &succeedRoutines, isDesiredLeader] {
auto& node = nodes_[nodeId];
if (!node.client.Status().ok()) {
node.client.Connect(node.dsn, loop_, client::ConnectOpts().WithExpectedClusterID(clusterID_));
node.client.Connect(node.dsn, loop_, createConnectionOpts());
}
NodeData suggestion, result;
suggestion.serverId = serverId_;
@@ -159,10 +100,8 @@ RaftInfo::Role RaftManager::Elections() {

const bool leaderIsAvailable = !isDesiredLeader && LeaderIsAvailable(ClockT::now());
if (leaderIsAvailable || !isConsensus(electionsStat.succeedPhase1)) {
logInfo(
"%d: Skip leaders ping. Elections are outdated. leaderIsAvailable: %d. Successfull "
"responses: %d",
serverId_, leaderIsAvailable ? 1 : 0, electionsStat.succeedPhase1);
logInfo("%d: Skip leaders ping. Elections are outdated. leaderIsAvailable: %d. Successfull responses: %d", serverId_,
leaderIsAvailable ? 1 : 0, electionsStat.succeedPhase1);
return; // This elections are outdated
}
err = node.client.LeadersPing(suggestion);
@@ -201,8 +140,8 @@ RaftInfo::Role RaftManager::Elections() {
return RaftInfo::Role::Follower;
}

bool RaftManager::LeaderIsAvailable(ClockT::time_point now) {
return ((now - lastLeaderPingTs_.load()) < kMinLeaderAwaitInterval) || (GetRole() == RaftInfo::Role::Leader);
bool RaftManager::LeaderIsAvailable(ClockT::time_point now) const noexcept {
return hasRecentLeadersPing(now) || (GetRole() == RaftInfo::Role::Leader);
}

bool RaftManager::FollowersAreAvailable() {
@@ -392,24 +331,79 @@ bool RaftManager::endElections(int32_t term, RaftInfo::Role result) {

bool RaftManager::isConsensus(size_t num) const noexcept { return num >= GetConsensusForN(nodes_.size() + 1); }

Error RaftManager::clientStatus(size_t index, std::chrono::seconds timeout) {
Error err;
if (!nodes_[index].client.WithTimeout(timeout).Status(true).ok()) {
err = nodes_[index].client.Connect(nodes_[index].dsn, loop_, client::ConnectOpts().WithExpectedClusterID(clusterID_));
if (err.ok()) {
err = nodes_[index].client.WithTimeout(timeout).Status(true);
bool RaftManager::hasRecentLeadersPing(RaftManager::ClockT::time_point now) const noexcept {
return (now - lastLeaderPingTs_.load()) < kMinLeaderAwaitInterval;
}

RaftManager::DesiredLeaderIdSender::DesiredLeaderIdSender(net::ev::dynamic_loop& loop, const std::vector<RaftNode>& nodes, int serverId,
int nextServerId, const Logger& log)
: loop_(loop), nodes_(nodes), log_(log), thisServerId_(serverId), nextServerId_(nextServerId), nextServerNodeIndex_(nodes_.size()) {
client::ReindexerConfig rpcCfg;
rpcCfg.AppName = "raft_manager_tmp";
rpcCfg.NetTimeout = kRaftTimeout;
rpcCfg.EnableCompression = false;
rpcCfg.RequestDedicatedThread = false; // No dedicated threads required for the leader switch command
clients_.reserve(nodes_.size());
for (size_t i = 0; i < nodes_.size(); ++i) {
auto& client = clients_.emplace_back(rpcCfg);
auto err = client.Connect(nodes_[i].dsn, loop_);
(void)err; // Ignore connection errors. Handle them on the status phase
if (nodes_[i].serverId == nextServerId_) {
nextServerNodeIndex_ = i;
err = client.WithTimeout(kDesiredLeaderTimeout).Status(true);
if (!err.ok()) {
throw Error(err.code(), "Target node %s is not available.", nodes_[i].dsn);
}
}
}
return err;
}

Error RaftManager::sendDesiredServerIdToNode(size_t index, int nextServerId) {
Error err = clientStatus(index, kDesiredLeaderTimeout);
if (!err.ok()) {
return err;
Error RaftManager::DesiredLeaderIdSender::operator()() {
uint32_t okCount = 1;
coroutine::wait_group wg;
std::string errString;

const bool thisNodeIsNext = (nextServerNodeIndex_ == nodes_.size());
if (!thisNodeIsNext) {
logTrace("%d Checking if node with desired server ID (%d) is available", thisServerId_, nextServerId_);
if (auto err = clients_[nextServerNodeIndex_].WithTimeout(kDesiredLeaderTimeout).Status(true); !err.ok()) {
return Error(err.code(), "Target node %s is not available.", nodes_[nextServerNodeIndex_].dsn);
}
}
for (size_t nodeId = 0; nodeId < clients_.size(); ++nodeId) {
if (nodeId == nextServerNodeIndex_) {
continue;
}

loop_.spawn(wg, [this, nodeId, &errString, &okCount] {
try {
logTrace("%d Sending desired server ID (%d) to node with server ID %d", thisServerId_, nextServerId_,
nodes_[nodeId].serverId);
if (auto err = sendDesiredServerIdToNode(nodeId); err.ok()) {
++okCount;
} else {
errString += "[" + err.what() + "]";
}
} catch (...) {
logInfo("%d: Unable to send desired leader: got unknonw exception", thisServerId_);
}
});
}
wg.wait();

if (!thisNodeIsNext) {
logTrace("%d Sending desired server ID (%d) to node with server ID %d", thisServerId_, nextServerId_,
nodes_[nextServerNodeIndex_].serverId);
if (auto err = sendDesiredServerIdToNode(nextServerNodeIndex_); !err.ok()) {
return err;
}
++okCount;
}

if (okCount < GetConsensusForN(nodes_.size() + 1)) {
return Error(errNetwork, "Can't send nextLeaderId to servers okCount %d err: %s", okCount, errString);
}
logTrace("%d Sending desired server ID (%d) to node with server ID %d", serverId_, nextServerId, nodes_[index].serverId);
return nodes_[index].client.WithTimeout(kDesiredLeaderTimeout).SetDesiredLeaderId(nextServerId);
return Error();
}

} // namespace cluster
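The old SendDesiredLeaderId body above is replaced by the DesiredLeaderIdSender helper: it opens short-lived connections with a dedicated client config (no compression, no dedicated threads), checks that the target node answers Status() first, then broadcasts the desired server ID and requires consensus. A hedged sketch of a call site follows; the commit's actual caller is outside the visible hunks, and the log_ member name is an assumption (loop_, nodes_ and serverId_ appear elsewhere in this file).

// Sketch only, not taken from the commit: hypothetical wrapper driving the new helper.
Error RaftManager::SendDesiredLeaderId(int nextServerId) {
	try {
		return DesiredLeaderIdSender(loop_, nodes_, serverId_, nextServerId, log_)();
	} catch (const Error& err) {
		return err;	// the helper's constructor throws if the target node is unreachable
	}
}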