Skip to content

Commit

Permalink
refactor(FQDN): feather refator on idl/dsn.layer2.thrift
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Sep 23, 2024
1 parent 2bd86c1 commit 9b91b8a
Show file tree
Hide file tree
Showing 39 changed files with 615 additions and 315 deletions.
1 change: 0 additions & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ JavaScriptQuotes: Leave
JavaScriptWrapImports: true
KeepEmptyLinesAtTheStartOfBlocks: true
LambdaBodyIndentation: Signature
Language: Cpp
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 1
Expand Down
10 changes: 7 additions & 3 deletions src/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,18 @@ void partition_resolver_simple::handle_pending_requests(std::deque<request_conte
host_port partition_resolver_simple::get_host_port(const partition_configuration &pc) const
{
if (_app_is_stateful) {
return pc.hp_primary;
host_port primary;
GET_HOST_PORT(pc, primary, primary);
return primary;
}

if (pc.hp_last_drops.empty()) {
std::vector<host_port> last_drops;
GET_HOST_PORTS(pc, last_drops, last_drops);
if (last_drops.empty()) {
return host_port();
}

return pc.hp_last_drops[rand::next_u32(0, pc.last_drops.size() - 1)];
return last_drops[rand::next_u32(0, last_drops.size() - 1)];
}

error_code partition_resolver_simple::get_host_port(int partition_index, /*out*/ host_port &hp)
Expand Down
56 changes: 40 additions & 16 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,19 @@ dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_na
int ready_count = 0;
for (int i = 0; i < partition_count; i++) {
const auto &pc = query_resp.partitions[i];
if (pc.hp_primary && (pc.hp_secondaries.size() + 1 >= max_replica_count)) {
ready_count++;
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (!primary) {
continue;
}

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
if (secondaries.size() + 1 < max_replica_count) {
continue;
}

ready_count++;
}
if (ready_count == partition_count) {
std::cout << app_name << " is ready now: (" << ready_count << "/" << partition_count
Expand Down Expand Up @@ -435,11 +445,16 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
int read_unhealthy = 0;
for (const auto &pc : pcs) {
int replica_count = 0;
if (pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (primary) {
replica_count++;
}
replica_count += pc.hp_secondaries.size();
if (pc.hp_primary) {

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
replica_count += secondaries.size();
if (primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
} else if (replica_count < 2) {
Expand Down Expand Up @@ -573,13 +588,18 @@ dsn::error_code replication_ddl_client::list_nodes(const dsn::replication::node_
}

for (const auto &pc : pcs) {
if (pc.hp_primary) {
auto find = tmp_map.find(pc.hp_primary);
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (primary) {
auto find = tmp_map.find(primary);
if (find != tmp_map.end()) {
find->second.primary_count++;
}
}
for (const auto &secondary : pc.hp_secondaries) {

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
for (const auto &secondary : secondaries) {
auto find = tmp_map.find(secondary);
if (find != tmp_map.end()) {
find->second.secondary_count++;
Expand Down Expand Up @@ -766,14 +786,18 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int read_unhealthy = 0;
for (const auto &pc : pcs) {
int replica_count = 0;
if (pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (primary) {
replica_count++;
node_stat[pc.hp_primary].first++;
node_stat[primary].first++;
total_prim_count++;
}
replica_count += pc.hp_secondaries.size();
total_sec_count += pc.hp_secondaries.size();
if (pc.hp_primary) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
replica_count += secondaries.size();
total_sec_count += secondaries.size();
if (primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
} else if (replica_count < 2) {
Expand All @@ -783,14 +807,14 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
write_unhealthy++;
read_unhealthy++;
}
for (const auto &secondary : pc.hp_secondaries) {
for (const auto &secondary : secondaries) {
node_stat[secondary].second++;
}
tp_details.add_row(pc.pid.get_partition_index());
tp_details.append_data(pc.ballot);
tp_details.append_data(fmt::format("{}/{}", replica_count, pc.max_replica_count));
tp_details.append_data(pc.hp_primary ? pc.hp_primary.to_string() : "-");
tp_details.append_data(fmt::format("[{}]", fmt::join(pc.hp_secondaries, ",")));
tp_details.append_data(primary ? primary.to_string() : "-");
tp_details.append_data(fmt::format("[{}]", fmt::join(secondaries, ",")));
}
mtp.add(std::move(tp_details));

Expand Down
8 changes: 6 additions & 2 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,16 @@ int32_t replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_
rc.learner_signature = invalid_signature;
SET_OBJ_IP_AND_HOST_PORT(rc, primary, pc, primary);

if (node == pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (node == primary) {
rc.status = partition_status::PS_PRIMARY;
return true;
}

if (utils::contains(pc.hp_secondaries, node)) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
if (utils::contains(secondaries, node)) {
rc.status = partition_status::PS_SECONDARY;
return true;
}
Expand Down
30 changes: 23 additions & 7 deletions src/common/replication_other_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "consensus_types.h"
#include "replica_admin_types.h"
#include "common/replication_enums.h"
#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"

Expand Down Expand Up @@ -78,18 +79,33 @@ inline bool is_member(const partition_configuration &pc, const rpc_address &node
inline bool is_partition_config_equal(const partition_configuration &pc1,
const partition_configuration &pc2)
{
// secondaries no need to be same order
for (const auto &pc1_secondary : pc1.hp_secondaries) {
if (pc1.ballot != pc2.ballot || pc1.pid != pc2.pid ||
pc1.max_replica_count != pc2.max_replica_count ||
pc1.last_committed_decree != pc2.last_committed_decree) {
return false;
}

host_port pc1_primary;
GET_HOST_PORT(pc1, primary, pc1_primary);
host_port pc2_primary;
GET_HOST_PORT(pc2, primary, pc2_primary);
if (pc1_primary != pc2_primary) {
return false;
}

// secondaries no need to be in the same order.
std::vector<host_port> pc1_secondaries;
GET_HOST_PORTS(pc1, secondaries, pc1_secondaries);
for (const auto &pc1_secondary : pc1_secondaries) {
if (!is_secondary(pc2, pc1_secondary)) {
return false;
}
}

std::vector<host_port> pc2_secondaries;
GET_HOST_PORTS(pc2, secondaries, pc2_secondaries);
// last_drops is not considered into equality check
return pc1.ballot == pc2.ballot && pc1.pid == pc2.pid &&
pc1.max_replica_count == pc2.max_replica_count && pc1.primary == pc2.primary &&
pc1.hp_primary == pc2.hp_primary && pc1.secondaries.size() == pc2.secondaries.size() &&
pc1.hp_secondaries.size() == pc2.hp_secondaries.size() &&
pc1.last_committed_decree == pc2.last_committed_decree;
return pc1_secondaries.size() == pc2_secondaries.size();
}

class replica_helper
Expand Down
11 changes: 8 additions & 3 deletions src/meta/cluster_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,17 @@ bool cluster_balance_policy::get_app_migration_info(std::shared_ptr<app_state> a
info.partitions.reserve(app->pcs.size());
for (const auto &pc : app->pcs) {
std::map<host_port, partition_status::type> pstatus_map;
pstatus_map[pc.hp_primary] = partition_status::PS_PRIMARY;
if (pc.hp_secondaries.size() != pc.max_replica_count - 1) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
pstatus_map[primary] = partition_status::PS_PRIMARY;

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
if (secondaries.size() != pc.max_replica_count - 1) {
// partition is unhealthy
return false;
}
for (const auto &secondary : pc.hp_secondaries) {
for (const auto &secondary : secondaries) {
pstatus_map[secondary] = partition_status::PS_SECONDARY;
}
info.partitions.push_back(std::move(pstatus_map));
Expand Down
15 changes: 10 additions & 5 deletions src/meta/load_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "meta_admin_types.h"
#include "rpc/dns_resolver.h" // IWYU pragma: keep
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
#include "utils/command_manager.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
Expand Down Expand Up @@ -172,14 +173,16 @@ generate_balancer_request(const app_mapper &apps,
new_proposal_action(to, to, config_type::CT_UPGRADE_TO_PRIMARY));
result.action_list.emplace_back(new_proposal_action(to, from, config_type::CT_REMOVE));
break;
case balance_type::COPY_SECONDARY:
case balance_type::COPY_SECONDARY: {
ans = "copy_secondary";
result.balance_type = balancer_request_type::copy_secondary;
host_port primary;
GET_HOST_PORT(pc, primary, primary);
result.action_list.emplace_back(
new_proposal_action(pc.hp_primary, to, config_type::CT_ADD_SECONDARY_FOR_LB));
result.action_list.emplace_back(
new_proposal_action(pc.hp_primary, from, config_type::CT_REMOVE));
new_proposal_action(primary, to, config_type::CT_ADD_SECONDARY_FOR_LB));
result.action_list.emplace_back(new_proposal_action(primary, from, config_type::CT_REMOVE));
break;
}
default:
CHECK(false, "");
}
Expand Down Expand Up @@ -566,7 +569,9 @@ void ford_fulkerson::update_decree(int node_id, const node_state &ns)
{
ns.for_each_primary(_app->app_id, [&, this](const gpid &pid) {
const auto &pc = _app->pcs[pid.get_partition_index()];
for (const auto &secondary : pc.hp_secondaries) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
for (const auto &secondary : secondaries) {
auto i = _host_port_id.find(secondary);
CHECK(i != _host_port_id.end(), "invalid secondary: {}", secondary);
_network[node_id][i->second]++;
Expand Down
8 changes: 6 additions & 2 deletions src/meta/meta_bulk_load_ingestion_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ void ingestion_context::partition_node_info::create(const partition_configuratio
{
pid = pc.pid;
std::unordered_set<host_port> current_nodes;
current_nodes.insert(pc.hp_primary);
for (const auto &secondary : pc.hp_secondaries) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
current_nodes.insert(primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
for (const auto &secondary : secondaries) {
current_nodes.insert(secondary);
}
for (const auto &node : current_nodes) {
Expand Down
24 changes: 16 additions & 8 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,9 @@ bool bulk_load_service::check_partition_status(
}

pc = app->pcs[pid.get_partition_index()];
if (!pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary, primary);
if (!primary) {
LOG_WARNING("app({}) partition({}) primary is invalid, try it later", app_name, pid);
tasking::enqueue(
LPC_META_STATE_NORMAL,
Expand All @@ -382,7 +384,9 @@ bool bulk_load_service::check_partition_status(
return false;
}

if (pc.hp_secondaries.size() < pc.max_replica_count - 1) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
if (secondaries.size() < pc.max_replica_count - 1) {
bulk_load_status::type p_status;
{
zauto_read_lock l(_lock);
Expand Down Expand Up @@ -434,7 +438,7 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()];
req->pid = pid;
req->app_name = app_name;
SET_IP_AND_HOST_PORT(*req, primary, pc.primary, pc.hp_primary);
SET_OBJ_IP_AND_HOST_PORT(*req, primary, pc, primary);
req->remote_provider_name = ainfo.file_provider_type;
req->cluster_name = ainfo.cluster_name;
req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid);
Expand Down Expand Up @@ -1201,8 +1205,12 @@ bool bulk_load_service::check_ever_ingestion_succeed(const partition_configurati
}

std::vector<host_port> current_nodes;
current_nodes.emplace_back(pc.hp_primary);
for (const auto &secondary : pc.hp_secondaries) {
dsn::host_port primary;
GET_HOST_PORT(pc, primary, primary);
current_nodes.emplace_back(primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries, secondaries);
for (const auto &secondary : secondaries) {
current_nodes.emplace_back(secondary);
}

Expand Down Expand Up @@ -1270,13 +1278,13 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
return;
}

const auto &primary = pc.hp_primary;
ballot meta_ballot = pc.ballot;
host_port primary;
GET_HOST_PORT(pc, primary, primary);
tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(
&bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot),
&bulk_load_service::send_ingestion_request, this, app_name, pid, primary, pc.ballot),
0,
std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
}
Expand Down
6 changes: 4 additions & 2 deletions src/meta/meta_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,11 @@ bool construct_replica(meta_view view, const gpid &pid, int max_replica_count)
// we put max_replica_count-1 recent replicas to last_drops, in case of the DDD-state when the
// only primary dead
// when add node to pc.last_drops, we don't remove it from our cc.drop_list
CHECK(pc.hp_last_drops.empty(), "last_drops of partition({}) must be empty", pid);
std::vector<host_port> last_drops;
GET_HOST_PORTS(pc, last_drops, last_drops);
CHECK(last_drops.empty(), "last_drops of partition({}) must be empty", pid);
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
// hp_last_drops is added in the steps bellow.
if (pc.hp_last_drops.size() + 1 >= max_replica_count) {
break;
}
Expand Down Expand Up @@ -540,7 +543,6 @@ app_state::app_state(const app_info &info) : app_info(info), helpers(new app_sta
for (int i = 0; i != app_info::partition_count; ++i) {
pcs[i].pid.set_partition_index(i);
}

helpers->on_init_partitions();
}

Expand Down
Loading

0 comments on commit 9b91b8a

Please sign in to comment.