Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] add be config brpc_connection_type (backport #42824) #49055

Merged
merged 2 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,14 @@ CONF_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
CONF_Int64(brpc_max_body_size, "2147483648");
// Max unwritten bytes in each socket, if the limit is reached, Socket.Write fails with EOVERCROWDED.
CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
// brpc connection types, "single", "pooled", "short".
CONF_String_enum(brpc_connection_type, "single", "single,pooled,short");
// If the amount of data to be sent by a single channel of brpc exceeds brpc_socket_max_unwritten_bytes
// it will cause rpc to report an error. We add configuration to ignore rpc overload.
// This may cause process memory usage to rise.
// In the future we need to count the memory on each channel of the rpc. To do application layer rpc flow limiting.
CONF_mBool(brpc_query_ignore_overcrowded, "false");
CONF_mBool(brpc_load_ignore_overcrowded, "true");

// Max number of txns for every txn_partition_map in txn manager.
// this is a self-protection to avoid too many txns saving in manager.
Expand Down
46 changes: 44 additions & 2 deletions be/src/common/configbase.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

Expand Down Expand Up @@ -168,7 +169,7 @@ template <typename T, typename = void>
class FieldImpl;

template <typename T>
class FieldImpl<T> final : public Field {
class FieldImpl<T> : public Field {
public:
FieldImpl(const char* type, const char* name, void* storage, const char* defval, bool valmutable)
: Field(type, name, storage, defval, valmutable) {}
Expand All @@ -180,7 +181,7 @@ class FieldImpl<T> final : public Field {

//// FieldImpl<std::vector<T>>
template <typename T>
class FieldImpl<std::vector<T>> final : public Field {
class FieldImpl<std::vector<T>> : public Field {
public:
FieldImpl(const char* type, const char* name, void* storage, const char* defval, bool valmutable)
: Field(type, name, storage, defval, valmutable) {}
Expand Down Expand Up @@ -224,6 +225,39 @@ class Alias {
}
};

template <typename T>
class EnumField : public FieldImpl<T> {
using Base = FieldImpl<T>;

public:
EnumField(const char* type, const char* name, void* storage, const char* defval, bool valmutable,
std::string enums_)
: FieldImpl<T>(type, name, storage, defval, valmutable), raw_enum_values(std::move(enums_)) {}

bool parse_value(const std::string& valstr) override {
if (enums.empty()) {
std::vector<std::string> parts = strings::Split(raw_enum_values, ",");
for (auto& part : parts) {
StripWhiteSpace(&part);
if (!Base::parse_value(part)) {
return false;
}
auto v = *reinterpret_cast<T*>(Field::_storage);
enums.emplace(std::move(v));
}
}
if (!Base::parse_value(valstr)) {
return false;
}
auto value = *reinterpret_cast<T*>(Field::_storage);
return enums.find(value) != enums.end();
}

private:
std::set<T> enums;
std::string raw_enum_values;
};

#endif // __IN_CONFIGBASE_CPP__

#define DEFINE_FIELD(FIELD_TYPE, FIELD_NAME, FIELD_DEFAULT, VALMUTABLE, TYPE_NAME) \
Expand All @@ -234,6 +268,11 @@ class Alias {

#define DECLARE_FIELD(FIELD_TYPE, FIELD_NAME) extern FIELD_TYPE FIELD_NAME;

#define DEFINE_ENUM_FIELD(FIELD_TYPE, FIELD_NAME, FIELD_DEFAULT, VALMUTABLE, TYPE_NAME, ENUM_SET) \
FIELD_TYPE FIELD_NAME; \
static EnumField<FIELD_TYPE> field_##FIELD_NAME(TYPE_NAME, #FIELD_NAME, &FIELD_NAME, FIELD_DEFAULT, VALMUTABLE, \
ENUM_SET);

#ifdef __IN_CONFIGBASE_CPP__
// NOTE: alias configs must be defined after the true config, otherwise there will be a compile error
#define CONF_Alias(name, alias) DEFINE_ALIAS(name, alias)
Expand All @@ -243,6 +282,8 @@ class Alias {
#define CONF_Int64(name, defaultstr) DEFINE_FIELD(int64_t, name, defaultstr, false, "int64")
#define CONF_Double(name, defaultstr) DEFINE_FIELD(double, name, defaultstr, false, "double")
#define CONF_String(name, defaultstr) DEFINE_FIELD(std::string, name, defaultstr, false, "string")
#define CONF_String_enum(name, defaultstr, enums) \
DEFINE_ENUM_FIELD(std::string, name, defaultstr, false, "string", enums)
#define CONF_Bools(name, defaultstr) DEFINE_FIELD(std::vector<bool>, name, defaultstr, false, "list<bool>")
#define CONF_Int16s(name, defaultstr) DEFINE_FIELD(std::vector<int16_t>, name, defaultstr, false, "list<int16>")
#define CONF_Int32s(name, defaultstr) DEFINE_FIELD(std::vector<int32_t>, name, defaultstr, false, "list<int32>")
Expand All @@ -264,6 +305,7 @@ class Alias {
#define CONF_Int64(name, defaultstr) DECLARE_FIELD(int64_t, name)
#define CONF_Double(name, defaultstr) DECLARE_FIELD(double, name)
#define CONF_String(name, defaultstr) DECLARE_FIELD(std::string, name)
#define CONF_String_enum(name, defaultstr, enums) DECLARE_FIELD(std::string, name)
#define CONF_Bools(name, defaultstr) DECLARE_FIELD(std::vector<bool>, name)
#define CONF_Int16s(name, defaultstr) DECLARE_FIELD(std::vector<int16_t>, name)
#define CONF_Int32s(name, defaultstr) DECLARE_FIELD(std::vector<int32_t>, name)
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/exchange/sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun

closure->cntl.Reset();
closure->cntl.set_timeout_ms(_brpc_timeout_ms);
SET_IGNORE_OVERCROWDED(closure->cntl, query);

Status st;
if (bthread_self()) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/service/brpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@
#include <butil/strings/string_piece.h>

#include "common/compiler_util.h"
#include "common/config.h"

// ignore brpc overcrowded error
#define SET_IGNORE_OVERCROWDED(ctnl, module) \
if (config::brpc_##module##_ignore_overcrowded) { \
ctnl.ignore_eovercrowded(); \
}
2 changes: 1 addition & 1 deletion be/src/storage/segment_replicate_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ void ReplicateChannel::_send_request(SegmentPB* segment, butil::IOBuf& data, boo
_closure->ref();
_closure->reset();
_closure->cntl.set_timeout_ms(_opt->timeout_ms);
_closure->cntl.ignore_eovercrowded();
SET_IGNORE_OVERCROWDED(_closure->cntl, load);

if (segment != nullptr) {
request.set_allocated_segment(segment);
Expand Down
1 change: 1 addition & 0 deletions be/src/util/brpc_stub_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class BrpcStubCache {
// Explicitly set the max_retry
// TODO(meegoo): The retry strategy can be customized in the future
options.max_retry = 3;
options.connection_type = config::brpc_connection_type;
std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
if (channel->Init(endpoint, &options)) {
return nullptr;
Expand Down
12 changes: 12 additions & 0 deletions be/test/common/config_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ TEST_F(ConfigTest, test_init) {
CONF_Strings(cfg_strings, "s1,s2,s3");
CONF_String(cfg_string_env, "prefix/${ConfigTestEnv1}/suffix");
CONF_Bool(cfg_bool_env, "false");
CONF_String_enum(cfg_string_enum, "true", "true,false");
// Invalid config file name
{ EXPECT_FALSE(config::init("/path/to/nonexist/file")); }
// Invalid bool value
Expand Down Expand Up @@ -88,6 +89,14 @@ TEST_F(ConfigTest, test_init) {

EXPECT_FALSE(config::init(ss));
}
// Invalid enum value
{
std::stringstream ss;
ss << R"DEL(
cfg_string_enum = unknown
)DEL";
EXPECT_FALSE(config::init(ss));
}

// Valid input
{
Expand All @@ -111,6 +120,8 @@ TEST_F(ConfigTest, test_init) {
cfg_strings = text1, hello world , StarRocks

cfg_bool_env = ${ConfigTestEnv2}

cfg_string_enum = false
)DEL";

ASSERT_EQ(0, ::setenv("ConfigTestEnv1", "env1_value", 1));
Expand All @@ -137,6 +148,7 @@ TEST_F(ConfigTest, test_init) {
EXPECT_THAT(cfg_strings, ElementsAre("text1", "hello world", "StarRocks"));
EXPECT_EQ("prefix/env1_value/suffix", cfg_string_env);
EXPECT_EQ(true, cfg_bool_env);
EXPECT_EQ("false", cfg_string_enum);
}

TEST_F(ConfigTest, test_invalid_default_value) {
Expand Down
Loading