Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(duplication): add a new RPC to list duplications for one or multiple tables with specified pattern #2164

Merged
merged 14 commits into from
Dec 6, 2024
2 changes: 1 addition & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
CheckOptions: []
# Disable some checks that are not useful for us now.
# They are sorted by names, and should be consistent to build_tools/clang_tidy.py.
Checks: 'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines-macro-usage,-cppcoreguidelines-non-private-member-variables-in-classes,-cppcoreguidelines-owning-memory,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-cppcoreguidelines-pro-bounds-pointer-arithmetic,-cppcoreguidelines-pro-type-const-cast,-cppcoreguidelines-pro-type-union-access,-fuchsia-default-arguments-calls,-fuchsia-overloaded-operator,-fuchsia-statically-constructed-objects,-google-readability-avoid-underscore-in-googletest-name,-hicpp-avoid-c-arrays,-hicpp-named-parameter,-hicpp-no-array-decay,-llvm-include-order,-misc-definitions-in-headers,-misc-non-private-member-variables-in-classes,-modernize-avoid-c-arrays,-modernize-replace-disallow-copy-and-assign-macro,-modernize-use-trailing-return-type,-performance-unnecessary-value-param,-readability-function-cognitive-complexity,-readability-identifier-length,-readability-magic-numbers,-readability-named-parameter'
Checks: 'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-bugprone-easily-swappable-parameters,-bugprone-lambda-function-name,-bugprone-macro-parentheses,-cert-err58-cpp,-concurrency-mt-unsafe,-cppcoreguidelines-avoid-c-arrays,-cppcoreguidelines-avoid-magic-numbers,-cppcoreguidelines-avoid-non-const-global-variables,-cppcoreguidelines-macro-usage,-cppcoreguidelines-non-private-member-variables-in-classes,-cppcoreguidelines-owning-memory,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-cppcoreguidelines-pro-bounds-pointer-arithmetic,-cppcoreguidelines-pro-type-const-cast,-cppcoreguidelines-pro-type-union-access,-fuchsia-default-arguments-calls,-fuchsia-overloaded-operator,-fuchsia-statically-constructed-objects,-google-readability-avoid-underscore-in-googletest-name,-hicpp-avoid-c-arrays,-hicpp-named-parameter,-hicpp-no-array-decay,-llvm-include-order,-misc-definitions-in-headers,-misc-non-private-member-variables-in-classes,-misc-unused-parameters,-modernize-avoid-c-arrays,-modernize-replace-disallow-copy-and-assign-macro,-modernize-use-trailing-return-type,-performance-unnecessary-value-param,-readability-function-cognitive-complexity,-readability-identifier-length,-readability-magic-numbers,-readability-named-parameter'
ExtraArgs:
ExtraArgsBefore: []
FormatStyle: none
Expand Down
1 change: 1 addition & 0 deletions build_tools/clang_tidy.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def tidy_on_path(path):
"-llvm-include-order,"
"-misc-definitions-in-headers,"
"-misc-non-private-member-variables-in-classes,"
"-misc-unused-parameters,"
"-modernize-avoid-c-arrays,"
"-modernize-replace-disallow-copy-and-assign-macro,"
"-modernize-use-trailing-return-type,"
Expand Down
28 changes: 28 additions & 0 deletions idl/duplication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

include "dsn.thrift"
include "dsn.layer2.thrift"
include "utils.thrift"

namespace cpp dsn.replication
namespace go admin
Expand Down Expand Up @@ -179,6 +180,15 @@ struct duplication_entry
11:optional map<i32, duplication_partition_state> partition_states;
}

// States for the duplications of a table.
struct duplication_app_state
{
1:i32 appid;

// dup id => per-duplication properties
2:map<i32, duplication_entry> duplications;
}

// This request is sent from client to meta.
struct duplication_query_request
{
Expand Down Expand Up @@ -234,3 +244,21 @@ struct duplication_sync_response
// this rpc will not return the apps that were not assigned duplication.
2:map<i32, map<i32, duplication_entry>> dup_map;
}

// This request is sent from client to meta server, to list duplications with their
// per-duplication info and progress of each partition for one or multiple tables.
struct duplication_list_request
{
// The pattern used to match an app name, whose type is specified by `match_type`.
1:string app_name_pattern;
2:utils.pattern_match_type match_type;
}

struct duplication_list_response
{
1:dsn.error_code err;
2:string hint_message;

// app name => duplications owned by an app
3:map<string, duplication_app_state> app_states;
}
2 changes: 2 additions & 0 deletions idl/meta_admin.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ service admin_client

duplication.duplication_modify_response modify_duplication(1: duplication.duplication_modify_request req);

duplication.duplication_list_response list_duplication(1: duplication.duplication_list_request req);

query_app_info_response query_app_info(1: query_app_info_request req);

configuration_update_app_env_response update_app_env(1: configuration_update_app_env_request req);
Expand Down
44 changes: 44 additions & 0 deletions idl/utils.thrift
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

namespace cpp dsn.utils
namespace go utils
namespace java org.apache.pegasus.utils

// How an string matches to a given pattern.
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
enum pattern_match_type
{
PMT_INVALID = 0,

// The string always matches no matter what the given pattern is.
PMT_MATCH_ALL,

// The string must exactly equal to the given pattern.
PMT_MATCH_EXACT,

// The string must appear anywhere in the given pattern.
PMT_MATCH_ANYWHERE,

// The string must start with the given pattern.
PMT_MATCH_PREFIX,

// The string must end with the given pattern.
PMT_MATCH_POSTFIX,

// The string must match the given pattern as a regular expression.
PMT_MATCH_REGEX,
}
10 changes: 10 additions & 0 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,16 @@ replication_ddl_client::query_dup(const std::string &app_name)
return call_rpc_sync(duplication_query_rpc(std::move(req), RPC_CM_QUERY_DUPLICATION));
}

error_with<duplication_list_response>
replication_ddl_client::list_dups(const std::string &app_name_pattern,
utils::pattern_match_type::type match_type)
{
auto req = std::make_unique<duplication_list_request>();
req->app_name_pattern = app_name_pattern;
req->match_type = match_type;
return call_rpc_sync(duplication_list_rpc(std::move(req), RPC_CM_LIST_DUPLICATION));
}

namespace {

bool need_retry(uint32_t attempt_count, const dsn::error_code &err)
Expand Down
4 changes: 4 additions & 0 deletions src/client/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
#include "utils_types.h"

DSN_DECLARE_uint32(ddl_client_max_attempt_count);
DSN_DECLARE_uint32(ddl_client_retry_interval_ms);
Expand Down Expand Up @@ -154,6 +155,9 @@ class replication_ddl_client

error_with<duplication_query_response> query_dup(const std::string &app_name);

error_with<duplication_list_response> list_dups(const std::string &app_name_pattern,
utils::pattern_match_type::type match_type);

dsn::error_code do_restore(const std::string &backup_provider_name,
const std::string &cluster_name,
const std::string &policy_name,
Expand Down
6 changes: 6 additions & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@

set(MY_PROJ_NAME dsn_replication_common)

thrift_generate_cpp(
METADATA_THRIFT_SRCS
METADATA_THRIFT_HDRS
${PROJECT_ROOT}/idl/utils.thrift
)

thrift_generate_cpp(
METADATA_THRIFT_SRCS
METADATA_THRIFT_HDRS
Expand Down
17 changes: 8 additions & 9 deletions src/common/duplication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@

DSN_DECLARE_uint32(duplicate_log_batch_bytes);

namespace dsn {
namespace replication {
namespace dsn::replication {

typedef rpc_holder<duplication_modify_request, duplication_modify_response> duplication_modify_rpc;
typedef rpc_holder<duplication_add_request, duplication_add_response> duplication_add_rpc;
typedef rpc_holder<duplication_query_request, duplication_query_response> duplication_query_rpc;
typedef rpc_holder<duplication_sync_request, duplication_sync_response> duplication_sync_rpc;
using duplication_modify_rpc = rpc_holder<duplication_modify_request, duplication_modify_response>;
using duplication_add_rpc = rpc_holder<duplication_add_request, duplication_add_response>;
using duplication_query_rpc = rpc_holder<duplication_query_request, duplication_query_response>;
using duplication_sync_rpc = rpc_holder<duplication_sync_request, duplication_sync_response>;
using duplication_list_rpc = rpc_holder<duplication_list_request, duplication_list_response>;

typedef int32_t dupid_t;
using dupid_t = int32_t;

extern const char *duplication_status_to_string(duplication_status::type status);

Expand Down Expand Up @@ -92,5 +92,4 @@ struct duplication_constants
USER_DEFINED_ENUM_FORMATTER(duplication_fail_mode::type)
USER_DEFINED_ENUM_FORMATTER(duplication_status::type)

} // namespace replication
} // namespace dsn
} // namespace dsn::replication
1 change: 1 addition & 0 deletions src/common/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_ADD_DUPLICATION, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_MODIFY_DUPLICATION, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_DUPLICATION, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_DUPLICATION_SYNC, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_LIST_DUPLICATION, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_UPDATE_APP_ENV, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_DDD_DIAGNOSE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_PARTITION_SPLIT, TASK_PRIORITY_COMMON)
Expand Down
2 changes: 1 addition & 1 deletion src/meta/duplication/duplication_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ void duplication_info::append_as_entry(std::vector<duplication_entry> &entry_lis
{
zauto_read_lock l(_lock);

entry_list.emplace_back(to_duplication_level_entry());
entry_list.emplace_back(to_partition_level_entry_for_list());
}

} // namespace dsn::replication
67 changes: 59 additions & 8 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,69 @@ void meta_duplication_service::query_duplication_info(const duplication_query_re
LOG_INFO("query duplication info for app: {}", request.app_name);

response.err = ERR_OK;
{
zauto_read_lock l(app_lock());
std::shared_ptr<app_state> app = _state->get_app(request.app_name);
if (!app || app->status != app_status::AS_AVAILABLE) {
response.err = ERR_APP_NOT_EXIST;

zauto_read_lock l(app_lock());

std::shared_ptr<app_state> app = _state->get_app(request.app_name);
if (!app || app->status != app_status::AS_AVAILABLE) {
response.err = ERR_APP_NOT_EXIST;
return;
}

response.appid = app->app_id;
for (const auto &[_, dup] : app->duplications) {
dup->append_as_entry(response.entry_list);
}
}

// ThreadPool(READ): THREAD_POOL_META_SERVER
void meta_duplication_service::list_duplication_info(const duplication_list_request &request,
duplication_list_response &response)
{
LOG_INFO("list duplication info: app_name_pattern={}, match_type={}",
request.app_name_pattern,
enum_to_string(request.match_type));

response.err = ERR_OK;

zauto_read_lock l(app_lock());

for (const auto &[app_name, app] : _state->_exist_apps) {
if (app->status != app_status::AS_AVAILABLE) {
// Unavailable tables would not be listed for duplications.
continue;
}

const auto &err =
utils::pattern_match(app_name, request.app_name_pattern, request.match_type);
if (err == ERR_NOT_MATCHED) {
continue;
}

if (err == ERR_NOT_IMPLEMENTED) {
const auto &msg = fmt::format("match_type {} is not supported now",
static_cast<int>(request.match_type));
response.err = err;
response.hint_message = msg;

LOG_ERROR("{}: app_name_pattern={}", msg, request.app_name_pattern);

return;
}

if (err != ERR_OK) {
response.err = err;
return;
}

response.appid = app->app_id;
for (const auto &[_, dup] : app->duplications) {
dup->append_as_entry(response.entry_list);
duplication_app_state dup_app;
dup_app.appid = app->app_id;

for (const auto &[dup_id, dup] : app->duplications) {
dup_app.duplications.emplace(dup_id, dup->to_partition_level_entry_for_list());
}

response.app_states.emplace(app_name, dup_app);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/meta/duplication/meta_duplication_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class zrwlock_nr;
namespace replication {
class configuration_create_app_response;
class duplication_confirm_entry;
class duplication_list_request;
class duplication_list_response;
class duplication_query_request;
class duplication_query_response;
class meta_service;
Expand Down Expand Up @@ -69,8 +71,12 @@ class meta_duplication_service

/// See replication.thrift for possible errors for each rpc.

// Query duplications for one table.
void query_duplication_info(const duplication_query_request &, duplication_query_response &);

// List duplications for one or multiple tables.
void list_duplication_info(const duplication_list_request &, duplication_list_response &);

void add_duplication(duplication_add_rpc rpc);

void modify_duplication(duplication_modify_rpc rpc);
Expand Down
16 changes: 16 additions & 0 deletions src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,20 @@ void meta_service::on_duplication_sync(duplication_sync_rpc rpc)
server_state::sStateHash);
}

void meta_service::on_list_duplication_info(duplication_list_rpc rpc)
{
if (!check_status_and_authz(rpc)) {
return;
}

if (!_dup_svc) {
rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
return;
}

_dup_svc->list_duplication_info(rpc.request(), rpc.response());
}

void meta_service::recover_duplication_from_meta_state()
{
if (_dup_svc) {
Expand All @@ -1056,6 +1070,8 @@ void meta_service::register_duplication_rpc_handlers()
&meta_service::on_query_duplication_info);
register_rpc_handler_with_rpc_holder(
RPC_CM_DUPLICATION_SYNC, "sync duplication", &meta_service::on_duplication_sync);
register_rpc_handler_with_rpc_holder(
RPC_CM_LIST_DUPLICATION, "list_duplication_info", &meta_service::on_list_duplication_info);
}

void meta_service::initialize_duplication_service()
Expand Down
1 change: 1 addition & 0 deletions src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class meta_service : public serverlet<meta_service>
void on_modify_duplication(duplication_modify_rpc rpc);
void on_query_duplication_info(duplication_query_rpc rpc);
void on_duplication_sync(duplication_sync_rpc rpc);
void on_list_duplication_info(duplication_list_rpc rpc);
void register_duplication_rpc_handlers();
void recover_duplication_from_meta_state();
void initialize_duplication_service();
Expand Down
Loading
Loading