Skip to content

Commit

Permalink
[Feature] add be config brpc_connection_type (backport #42824) (#49055)
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Jul 29, 2024
1 parent 9b2e58b commit 3240a8e
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 3 deletions.
8 changes: 8 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,14 @@ CONF_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
CONF_Int64(brpc_max_body_size, "2147483648");
// Max unwritten bytes in each socket, if the limit is reached, Socket.Write fails with EOVERCROWDED.
CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
// brpc connection types, "single", "pooled", "short".
CONF_String_enum(brpc_connection_type, "single", "single,pooled,short");
// If the amount of data to be sent by a single channel of brpc exceeds brpc_socket_max_unwritten_bytes
// it will cause rpc to report an error. We add configuration to ignore rpc overload.
// This may cause process memory usage to rise.
// In the future we need to count the memory on each channel of the rpc. To do application layer rpc flow limiting.
CONF_mBool(brpc_query_ignore_overcrowded, "false");
CONF_mBool(brpc_load_ignore_overcrowded, "true");

// Max number of txns for every txn_partition_map in txn manager.
// this is a self-protection to avoid too many txns saving in manager.
Expand Down
46 changes: 44 additions & 2 deletions be/src/common/configbase.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

Expand Down Expand Up @@ -168,7 +169,7 @@ template <typename T, typename = void>
class FieldImpl;

template <typename T>
class FieldImpl<T> final : public Field {
class FieldImpl<T> : public Field {
public:
FieldImpl(const char* type, const char* name, void* storage, const char* defval, bool valmutable)
: Field(type, name, storage, defval, valmutable) {}
Expand All @@ -180,7 +181,7 @@ class FieldImpl<T> final : public Field {

//// FieldImpl<std::vector<T>>
template <typename T>
class FieldImpl<std::vector<T>> final : public Field {
class FieldImpl<std::vector<T>> : public Field {
public:
FieldImpl(const char* type, const char* name, void* storage, const char* defval, bool valmutable)
: Field(type, name, storage, defval, valmutable) {}
Expand Down Expand Up @@ -224,6 +225,39 @@ class Alias {
}
};

template <typename T>
class EnumField : public FieldImpl<T> {
using Base = FieldImpl<T>;

public:
EnumField(const char* type, const char* name, void* storage, const char* defval, bool valmutable,
std::string enums_)
: FieldImpl<T>(type, name, storage, defval, valmutable), raw_enum_values(std::move(enums_)) {}

bool parse_value(const std::string& valstr) override {
if (enums.empty()) {
std::vector<std::string> parts = strings::Split(raw_enum_values, ",");
for (auto& part : parts) {
StripWhiteSpace(&part);
if (!Base::parse_value(part)) {
return false;
}
auto v = *reinterpret_cast<T*>(Field::_storage);
enums.emplace(std::move(v));
}
}
if (!Base::parse_value(valstr)) {
return false;
}
auto value = *reinterpret_cast<T*>(Field::_storage);
return enums.find(value) != enums.end();
}

private:
std::set<T> enums;
std::string raw_enum_values;
};

#endif // __IN_CONFIGBASE_CPP__

#define DEFINE_FIELD(FIELD_TYPE, FIELD_NAME, FIELD_DEFAULT, VALMUTABLE, TYPE_NAME) \
Expand All @@ -234,6 +268,11 @@ class Alias {

#define DECLARE_FIELD(FIELD_TYPE, FIELD_NAME) extern FIELD_TYPE FIELD_NAME;

#define DEFINE_ENUM_FIELD(FIELD_TYPE, FIELD_NAME, FIELD_DEFAULT, VALMUTABLE, TYPE_NAME, ENUM_SET) \
FIELD_TYPE FIELD_NAME; \
static EnumField<FIELD_TYPE> field_##FIELD_NAME(TYPE_NAME, #FIELD_NAME, &FIELD_NAME, FIELD_DEFAULT, VALMUTABLE, \
ENUM_SET);

#ifdef __IN_CONFIGBASE_CPP__
// NOTE: alias configs must be defined after the true config, otherwise there will be a compile error
#define CONF_Alias(name, alias) DEFINE_ALIAS(name, alias)
Expand All @@ -243,6 +282,8 @@ class Alias {
#define CONF_Int64(name, defaultstr) DEFINE_FIELD(int64_t, name, defaultstr, false, "int64")
#define CONF_Double(name, defaultstr) DEFINE_FIELD(double, name, defaultstr, false, "double")
#define CONF_String(name, defaultstr) DEFINE_FIELD(std::string, name, defaultstr, false, "string")
#define CONF_String_enum(name, defaultstr, enums) \
DEFINE_ENUM_FIELD(std::string, name, defaultstr, false, "string", enums)
#define CONF_Bools(name, defaultstr) DEFINE_FIELD(std::vector<bool>, name, defaultstr, false, "list<bool>")
#define CONF_Int16s(name, defaultstr) DEFINE_FIELD(std::vector<int16_t>, name, defaultstr, false, "list<int16>")
#define CONF_Int32s(name, defaultstr) DEFINE_FIELD(std::vector<int32_t>, name, defaultstr, false, "list<int32>")
Expand All @@ -264,6 +305,7 @@ class Alias {
#define CONF_Int64(name, defaultstr) DECLARE_FIELD(int64_t, name)
#define CONF_Double(name, defaultstr) DECLARE_FIELD(double, name)
#define CONF_String(name, defaultstr) DECLARE_FIELD(std::string, name)
#define CONF_String_enum(name, defaultstr, enums) DECLARE_FIELD(std::string, name)
#define CONF_Bools(name, defaultstr) DECLARE_FIELD(std::vector<bool>, name)
#define CONF_Int16s(name, defaultstr) DECLARE_FIELD(std::vector<int16_t>, name)
#define CONF_Int32s(name, defaultstr) DECLARE_FIELD(std::vector<int32_t>, name)
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/exchange/sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun

closure->cntl.Reset();
closure->cntl.set_timeout_ms(_brpc_timeout_ms);
SET_IGNORE_OVERCROWDED(closure->cntl, query);

Status st;
if (bthread_self()) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/service/brpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@
#include <butil/strings/string_piece.h>

#include "common/compiler_util.h"
#include "common/config.h"

// ignore brpc overcrowded error
#define SET_IGNORE_OVERCROWDED(ctnl, module) \
if (config::brpc_##module##_ignore_overcrowded) { \
ctnl.ignore_eovercrowded(); \
}
2 changes: 1 addition & 1 deletion be/src/storage/segment_replicate_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ void ReplicateChannel::_send_request(SegmentPB* segment, butil::IOBuf& data, boo
_closure->ref();
_closure->reset();
_closure->cntl.set_timeout_ms(_opt->timeout_ms);
_closure->cntl.ignore_eovercrowded();
SET_IGNORE_OVERCROWDED(_closure->cntl, load);

if (segment != nullptr) {
request.set_allocated_segment(segment);
Expand Down
1 change: 1 addition & 0 deletions be/src/util/brpc_stub_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class BrpcStubCache {
// Explicitly set the max_retry
// TODO(meegoo): The retry strategy can be customized in the future
options.max_retry = 3;
options.connection_type = config::brpc_connection_type;
std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
if (channel->Init(endpoint, &options)) {
return nullptr;
Expand Down
12 changes: 12 additions & 0 deletions be/test/common/config_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ TEST_F(ConfigTest, test_init) {
CONF_Strings(cfg_strings, "s1,s2,s3");
CONF_String(cfg_string_env, "prefix/${ConfigTestEnv1}/suffix");
CONF_Bool(cfg_bool_env, "false");
CONF_String_enum(cfg_string_enum, "true", "true,false");
// Invalid config file name
{ EXPECT_FALSE(config::init("/path/to/nonexist/file")); }
// Invalid bool value
Expand Down Expand Up @@ -88,6 +89,14 @@ TEST_F(ConfigTest, test_init) {

EXPECT_FALSE(config::init(ss));
}
// Invalid enum value
{
std::stringstream ss;
ss << R"DEL(
cfg_string_enum = unknown
)DEL";
EXPECT_FALSE(config::init(ss));
}

// Valid input
{
Expand All @@ -111,6 +120,8 @@ TEST_F(ConfigTest, test_init) {
cfg_strings = text1, hello world , StarRocks
cfg_bool_env = ${ConfigTestEnv2}
cfg_string_enum = false
)DEL";

ASSERT_EQ(0, ::setenv("ConfigTestEnv1", "env1_value", 1));
Expand All @@ -137,6 +148,7 @@ TEST_F(ConfigTest, test_init) {
EXPECT_THAT(cfg_strings, ElementsAre("text1", "hello world", "StarRocks"));
EXPECT_EQ("prefix/env1_value/suffix", cfg_string_env);
EXPECT_EQ(true, cfg_bool_env);
EXPECT_EQ("false", cfg_string_enum);
}

TEST_F(ConfigTest, test_invalid_default_value) {
Expand Down

0 comments on commit 3240a8e

Please sign in to comment.