Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(meta): fix null pointer while deleting environment variables after table was dropped #2170

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 93 additions & 40 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/

// IWYU pragma: no_include <boost/detail/basic_pointerbuf.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/lexical_cast.hpp>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
Expand All @@ -33,6 +34,7 @@
#include <chrono>
#include <cstdint>
#include <cstring>
#include <iterator>
#include <set>
#include <sstream> // IWYU pragma: keep
#include <string>
Expand Down Expand Up @@ -2986,17 +2988,17 @@ void server_state::do_update_app_info(const std::string &app_path,

void server_state::set_app_envs(const app_env_rpc &env_rpc)
{
const configuration_update_app_env_request &request = env_rpc.request();
const auto &request = env_rpc.request();
if (!request.__isset.keys || !request.__isset.values ||
request.keys.size() != request.values.size() || request.keys.size() <= 0) {
request.keys.size() != request.values.size() || request.keys.empty()) {
env_rpc.response().err = ERR_INVALID_PARAMETERS;
LOG_WARNING("set app envs failed with invalid request");
return;
}

const std::vector<std::string> &keys = request.keys;
const std::vector<std::string> &values = request.values;
const std::string &app_name = request.app_name;
const auto &keys = request.keys;
const auto &values = request.values;
const auto &app_name = request.app_name;

std::ostringstream os;
for (size_t i = 0; i < keys.size(); ++i) {
Expand Down Expand Up @@ -3078,7 +3080,7 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc)
}
});

std::shared_ptr<app_state> app = get_app(app_name);
auto app = get_app(app_name);

// The table might be removed just before the callback function is invoked, thus we must
// check if this table still exists.
Expand All @@ -3095,6 +3097,8 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc)
return;
}

env_rpc.response().err = ERR_OK;

const auto &old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');

// Update envs of local memory.
Expand All @@ -3109,70 +3113,119 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc)

void server_state::del_app_envs(const app_env_rpc &env_rpc)
{
const configuration_update_app_env_request &request = env_rpc.request();
if (!request.__isset.keys || request.keys.size() <= 0) {
const auto &request = env_rpc.request();
if (!request.__isset.keys || request.keys.empty()) {
env_rpc.response().err = ERR_INVALID_PARAMETERS;
LOG_WARNING("del app envs failed with invalid request");
return;
}
const std::vector<std::string> &keys = request.keys;
const std::string &app_name = request.app_name;

std::ostringstream os;
for (int i = 0; i < keys.size(); i++) {
if (i != 0)
os << ",";
os << keys[i];
}
const auto &keys = request.keys;
const auto &app_name = request.app_name;

LOG_INFO("del app envs for app({}) from remote({}): keys = {}",
app_name,
env_rpc.remote_address(),
os.str());
boost::join(keys, ","));

app_info ainfo;
std::string app_path;
{
FAIL_POINT_INJECT_NOT_RETURN_F("del_app_envs_failed", [app_name, this](std::string_view s) {
zauto_write_lock l(_lock);

if (s == "not_found") {
CHECK_EQ(_exist_apps.erase(app_name), 1);
return;
}

if (s == "dropping") {
gutil::FindOrDie(_exist_apps, app_name)->status = app_status::AS_DROPPING;
return;
}
});

zauto_read_lock l(_lock);
std::shared_ptr<app_state> app = get_app(app_name);
if (app == nullptr) {
LOG_WARNING("del app envs failed with invalid app_name({})", app_name);
env_rpc.response().err = ERR_INVALID_PARAMETERS;
env_rpc.response().hint_message = "invalid app name";

const auto &app = get_app(app_name);
if (!app) {
LOG_WARNING("del app envs failed since app_name({}) cannot be found", app_name);
env_rpc.response().err = ERR_APP_NOT_EXIST;
env_rpc.response().hint_message = "app cannot be found";
return;
} else {
ainfo = *(reinterpret_cast<app_info *>(app.get()));
app_path = get_app_path(*app);
}

if (app->status == app_status::AS_DROPPING) {
LOG_WARNING("del app envs failed since app(name={}, id={}) is being dropped",
app_name,
app->app_id);
env_rpc.response().err = ERR_BUSY_DROPPING;
env_rpc.response().hint_message = "app is being dropped";
return;
}

ainfo = *app;
app_path = get_app_path(*app);
}

std::ostringstream oss;
oss << "deleted keys:";
int deleted = 0;
std::string deleted_keys_info("deleted keys:");
size_t deleted_count = 0;
for (const auto &key : keys) {
if (ainfo.envs.erase(key) > 0) {
oss << std::endl << " " << key;
deleted++;
if (ainfo.envs.erase(key) == 0) {
continue;
}

fmt::format_to(std::back_inserter(deleted_keys_info), "\n {}", key);
++deleted_count;
}

if (deleted == 0) {
LOG_INFO("no key need to delete");
env_rpc.response().hint_message = "no key need to delete";
if (deleted_count == 0) {
LOG_INFO("no key needs to be deleted for app({})", app_name);
env_rpc.response().err = ERR_OK;
env_rpc.response().hint_message = "no key needs to be deleted";
return;
} else {
env_rpc.response().hint_message = oss.str();
}

env_rpc.response().hint_message = std::move(deleted_keys_info);

do_update_app_info(app_path, ainfo, [this, app_name, keys, env_rpc](error_code ec) {
CHECK_EQ_MSG(ec, ERR_OK, "update app info to remote storage failed");
CHECK_EQ_MSG(ec, ERR_OK, "update app({}) info to remote storage failed", app_name);

zauto_write_lock l(_lock);
std::shared_ptr<app_state> app = get_app(app_name);
std::string old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');

FAIL_POINT_INJECT_NOT_RETURN_F("del_app_envs_failed", [app_name, this](std::string_view s) {
if (s == "dropped_after") {
CHECK_EQ(_exist_apps.erase(app_name), 1);
return;
}
});

auto app = get_app(app_name);

// The table might be removed just before the callback function is invoked, thus we must
// check if this table still exists.
//
// TODO(wangdan): should make updates to remote storage sequential by supporting atomic
// set, otherwise update might be missing. For example, an update is setting the envs
// while another is dropping a table. The update setting the envs does not contain the
// dropped state. Once it is applied by remote storage after another update dropping
// the table, the state of the table would always be non-dropped on remote storage.
if (!app) {
LOG_ERROR("del app envs failed since app({}) has just been dropped", app_name);
env_rpc.response().err = ERR_APP_DROPPED;
env_rpc.response().hint_message = "app has just been dropped";
return;
}

env_rpc.response().err = ERR_OK;

const auto &old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');

for (const auto &key : keys) {
app->envs.erase(key);
}
std::string new_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');

const auto &new_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');
LOG_INFO("app envs changed: old_envs = {}, new_envs = {}", old_envs, new_envs);
});
}
Expand Down
94 changes: 69 additions & 25 deletions src/meta/test/server_state_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,21 @@ class server_state_test
return rpc;
}

void test_set_app_envs(const std::string &app_name,
const std::vector<std::string> &env_keys,
const std::vector<std::string> &env_vals,
const error_code expected_err)
{
configuration_update_app_env_request request;
request.__set_app_name(app_name);
request.__set_op(app_env_operation::type::APP_ENV_OP_SET);
request.__set_keys(env_keys);
request.__set_values(env_vals);

auto rpc = set_app_envs(request);
ASSERT_EQ(expected_err, rpc.response().err);
}

app_env_rpc del_app_envs(const configuration_update_app_env_request &request)
{
auto rpc = create_app_env_rpc(request);
Expand All @@ -141,6 +156,19 @@ class server_state_test
return rpc;
}

void test_del_app_envs(const std::string &app_name,
const std::vector<std::string> &env_keys,
const error_code expected_err)
{
configuration_update_app_env_request request;
request.__set_app_name(app_name);
request.__set_op(app_env_operation::type::APP_ENV_OP_DEL);
request.__set_keys(env_keys);

auto rpc = del_app_envs(request);
ASSERT_EQ(expected_err, rpc.response().err);
}

app_env_rpc clear_app_envs(const configuration_update_app_env_request &request)
{
auto rpc = create_app_env_rpc(request);
Expand Down Expand Up @@ -220,22 +248,21 @@ void meta_service_test_app::app_envs_basic_test()
test.load_apps({"test_app1",
"test_set_app_envs_not_found",
"test_set_app_envs_dropping",
"test_set_app_envs_dropped_after"});
"test_set_app_envs_dropped_after",
"test_del_app_envs_not_found",
"test_del_app_envs_dropping",
"test_del_app_envs_dropped_after"});

#define TEST_SET_APP_ENVS_FAILED(action, err_code) \
std::cout << "test server_state::set_app_envs(" #action ")..." << std::endl; \
do { \
configuration_update_app_env_request request; \
request.__set_app_name("test_set_app_envs_" #action); \
request.__set_op(app_env_operation::type::APP_ENV_OP_SET); \
request.__set_keys({replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}); \
request.__set_values({"67108864"}); \
\
fail::setup(); \
fail::cfg("set_app_envs_failed", "void(" #action ")"); \
\
auto rpc = test.set_app_envs(request); \
ASSERT_EQ(err_code, rpc.response().err); \
test.test_set_app_envs("test_set_app_envs_" #action, \
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}, \
{"67108864"}, \
err_code); \
\
fail::teardown(); \
} while (0)
Expand All @@ -255,14 +282,7 @@ void meta_service_test_app::app_envs_basic_test()
// Normal case for setting envs.
std::cout << "test server_state::set_app_envs(success)..." << std::endl;
{
configuration_update_app_env_request request;
request.__set_app_name("test_app1");
request.__set_op(app_env_operation::type::APP_ENV_OP_SET);
request.__set_keys(keys);
request.__set_values(values);

auto rpc = test.set_app_envs(request);
ASSERT_EQ(ERR_OK, rpc.response().err);
test.test_set_app_envs("test_app1", keys, values, ERR_OK);

const auto &app = test.get_app("test_app1");
ASSERT_TRUE(app);
Expand All @@ -276,15 +296,39 @@ void meta_service_test_app::app_envs_basic_test()
}
}

std::cout << "test server_state::del_app_envs()..." << std::endl;
{
configuration_update_app_env_request request;
request.__set_app_name("test_app1");
request.__set_op(app_env_operation::type::APP_ENV_OP_DEL);
request.__set_keys(del_keys);
#define TEST_DEL_APP_ENVS_FAILED(action, err_code) \
std::cout << "test server_state::del_app_envs(" #action ")..." << std::endl; \
do { \
test.test_set_app_envs("test_del_app_envs_" #action, \
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}, \
{"67108864"}, \
ERR_OK); \
\
fail::setup(); \
fail::cfg("del_app_envs_failed", "void(" #action ")"); \
\
test.test_del_app_envs( \
"test_del_app_envs_" #action, {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}, err_code); \
\
fail::teardown(); \
} while (0)

// Failed to deleting envs while table was not found.
TEST_DEL_APP_ENVS_FAILED(not_found, ERR_APP_NOT_EXIST);

// Failed to deleting envs while table was being dropped as the intermediate state.
TEST_DEL_APP_ENVS_FAILED(dropping, ERR_BUSY_DROPPING);

// The table was found dropped after the new envs had been persistent on the remote
// meta storage.
TEST_DEL_APP_ENVS_FAILED(dropped_after, ERR_APP_DROPPED);

auto rpc = test.del_app_envs(request);
ASSERT_EQ(ERR_OK, rpc.response().err);
#undef TEST_DEL_APP_ENVS_FAILED

// Normal case for deleting envs.
std::cout << "test server_state::del_app_envs(success)..." << std::endl;
{
test.test_del_app_envs("test_app1", del_keys, ERR_OK);

const auto &app = test.get_app("test_app1");
ASSERT_TRUE(app);
Expand Down
Loading