Skip to content

Commit

Permalink
fix(meta): fix null pointer while deleting environment variables afte…
Browse files Browse the repository at this point in the history
…r table was dropped (#2170)

#2149.

Previously we've fixed the problem that meta server failed due to null pointer while
deleting environment variables locally immediately after a table was dropped in
#2148. There's the same problem
while deleting environment variables.
  • Loading branch information
empiredan authored Dec 17, 2024
1 parent a65d1f4 commit 7ceef5d
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 65 deletions.
133 changes: 93 additions & 40 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/

// IWYU pragma: no_include <boost/detail/basic_pointerbuf.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/lexical_cast.hpp>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
Expand All @@ -33,6 +34,7 @@
#include <chrono>
#include <cstdint>
#include <cstring>
#include <iterator>
#include <set>
#include <sstream> // IWYU pragma: keep
#include <string>
Expand Down Expand Up @@ -2986,17 +2988,17 @@ void server_state::do_update_app_info(const std::string &app_path,

void server_state::set_app_envs(const app_env_rpc &env_rpc)
{
const configuration_update_app_env_request &request = env_rpc.request();
const auto &request = env_rpc.request();
if (!request.__isset.keys || !request.__isset.values ||
request.keys.size() != request.values.size() || request.keys.size() <= 0) {
request.keys.size() != request.values.size() || request.keys.empty()) {
env_rpc.response().err = ERR_INVALID_PARAMETERS;
LOG_WARNING("set app envs failed with invalid request");
return;
}

const std::vector<std::string> &keys = request.keys;
const std::vector<std::string> &values = request.values;
const std::string &app_name = request.app_name;
const auto &keys = request.keys;
const auto &values = request.values;
const auto &app_name = request.app_name;

std::ostringstream os;
for (size_t i = 0; i < keys.size(); ++i) {
Expand Down Expand Up @@ -3078,7 +3080,7 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc)
}
});

std::shared_ptr<app_state> app = get_app(app_name);
auto app = get_app(app_name);

// The table might be removed just before the callback function is invoked, thus we must
// check if this table still exists.
Expand All @@ -3095,6 +3097,8 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc)
return;
}

env_rpc.response().err = ERR_OK;

const auto &old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');

// Update envs of local memory.
Expand All @@ -3109,70 +3113,119 @@ void server_state::set_app_envs(const app_env_rpc &env_rpc)

void server_state::del_app_envs(const app_env_rpc &env_rpc)
{
const configuration_update_app_env_request &request = env_rpc.request();
if (!request.__isset.keys || request.keys.size() <= 0) {
const auto &request = env_rpc.request();
if (!request.__isset.keys || request.keys.empty()) {
env_rpc.response().err = ERR_INVALID_PARAMETERS;
LOG_WARNING("del app envs failed with invalid request");
return;
}
const std::vector<std::string> &keys = request.keys;
const std::string &app_name = request.app_name;

std::ostringstream os;
for (int i = 0; i < keys.size(); i++) {
if (i != 0)
os << ",";
os << keys[i];
}
const auto &keys = request.keys;
const auto &app_name = request.app_name;

LOG_INFO("del app envs for app({}) from remote({}): keys = {}",
app_name,
env_rpc.remote_address(),
os.str());
boost::join(keys, ","));

app_info ainfo;
std::string app_path;
{
FAIL_POINT_INJECT_NOT_RETURN_F("del_app_envs_failed", [app_name, this](std::string_view s) {
zauto_write_lock l(_lock);

if (s == "not_found") {
CHECK_EQ(_exist_apps.erase(app_name), 1);
return;
}

if (s == "dropping") {
gutil::FindOrDie(_exist_apps, app_name)->status = app_status::AS_DROPPING;
return;
}
});

zauto_read_lock l(_lock);
std::shared_ptr<app_state> app = get_app(app_name);
if (app == nullptr) {
LOG_WARNING("del app envs failed with invalid app_name({})", app_name);
env_rpc.response().err = ERR_INVALID_PARAMETERS;
env_rpc.response().hint_message = "invalid app name";

const auto &app = get_app(app_name);
if (!app) {
LOG_WARNING("del app envs failed since app_name({}) cannot be found", app_name);
env_rpc.response().err = ERR_APP_NOT_EXIST;
env_rpc.response().hint_message = "app cannot be found";
return;
} else {
ainfo = *(reinterpret_cast<app_info *>(app.get()));
app_path = get_app_path(*app);
}

if (app->status == app_status::AS_DROPPING) {
LOG_WARNING("del app envs failed since app(name={}, id={}) is being dropped",
app_name,
app->app_id);
env_rpc.response().err = ERR_BUSY_DROPPING;
env_rpc.response().hint_message = "app is being dropped";
return;
}

ainfo = *app;
app_path = get_app_path(*app);
}

std::ostringstream oss;
oss << "deleted keys:";
int deleted = 0;
std::string deleted_keys_info("deleted keys:");
size_t deleted_count = 0;
for (const auto &key : keys) {
if (ainfo.envs.erase(key) > 0) {
oss << std::endl << " " << key;
deleted++;
if (ainfo.envs.erase(key) == 0) {
continue;
}

fmt::format_to(std::back_inserter(deleted_keys_info), "\n {}", key);
++deleted_count;
}

if (deleted == 0) {
LOG_INFO("no key need to delete");
env_rpc.response().hint_message = "no key need to delete";
if (deleted_count == 0) {
LOG_INFO("no key needs to be deleted for app({})", app_name);
env_rpc.response().err = ERR_OK;
env_rpc.response().hint_message = "no key needs to be deleted";
return;
} else {
env_rpc.response().hint_message = oss.str();
}

env_rpc.response().hint_message = std::move(deleted_keys_info);

do_update_app_info(app_path, ainfo, [this, app_name, keys, env_rpc](error_code ec) {
CHECK_EQ_MSG(ec, ERR_OK, "update app info to remote storage failed");
CHECK_EQ_MSG(ec, ERR_OK, "update app({}) info to remote storage failed", app_name);

zauto_write_lock l(_lock);
std::shared_ptr<app_state> app = get_app(app_name);
std::string old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');

FAIL_POINT_INJECT_NOT_RETURN_F("del_app_envs_failed", [app_name, this](std::string_view s) {
if (s == "dropped_after") {
CHECK_EQ(_exist_apps.erase(app_name), 1);
return;
}
});

auto app = get_app(app_name);

// The table might be removed just before the callback function is invoked, thus we must
// check if this table still exists.
//
// TODO(wangdan): should make updates to remote storage sequential by supporting atomic
// set, otherwise update might be missing. For example, an update is setting the envs
// while another is dropping a table. The update setting the envs does not contain the
// dropped state. Once it is applied by remote storage after another update dropping
// the table, the state of the table would always be non-dropped on remote storage.
if (!app) {
LOG_ERROR("del app envs failed since app({}) has just been dropped", app_name);
env_rpc.response().err = ERR_APP_DROPPED;
env_rpc.response().hint_message = "app has just been dropped";
return;
}

env_rpc.response().err = ERR_OK;

const auto &old_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');

for (const auto &key : keys) {
app->envs.erase(key);
}
std::string new_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');

const auto &new_envs = dsn::utils::kv_map_to_string(app->envs, ',', '=');
LOG_INFO("app envs changed: old_envs = {}, new_envs = {}", old_envs, new_envs);
});
}
Expand Down
94 changes: 69 additions & 25 deletions src/meta/test/server_state_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,21 @@ class server_state_test
return rpc;
}

void test_set_app_envs(const std::string &app_name,
const std::vector<std::string> &env_keys,
const std::vector<std::string> &env_vals,
const error_code expected_err)
{
configuration_update_app_env_request request;
request.__set_app_name(app_name);
request.__set_op(app_env_operation::type::APP_ENV_OP_SET);
request.__set_keys(env_keys);
request.__set_values(env_vals);

auto rpc = set_app_envs(request);
ASSERT_EQ(expected_err, rpc.response().err);
}

app_env_rpc del_app_envs(const configuration_update_app_env_request &request)
{
auto rpc = create_app_env_rpc(request);
Expand All @@ -141,6 +156,19 @@ class server_state_test
return rpc;
}

void test_del_app_envs(const std::string &app_name,
const std::vector<std::string> &env_keys,
const error_code expected_err)
{
configuration_update_app_env_request request;
request.__set_app_name(app_name);
request.__set_op(app_env_operation::type::APP_ENV_OP_DEL);
request.__set_keys(env_keys);

auto rpc = del_app_envs(request);
ASSERT_EQ(expected_err, rpc.response().err);
}

app_env_rpc clear_app_envs(const configuration_update_app_env_request &request)
{
auto rpc = create_app_env_rpc(request);
Expand Down Expand Up @@ -220,22 +248,21 @@ void meta_service_test_app::app_envs_basic_test()
test.load_apps({"test_app1",
"test_set_app_envs_not_found",
"test_set_app_envs_dropping",
"test_set_app_envs_dropped_after"});
"test_set_app_envs_dropped_after",
"test_del_app_envs_not_found",
"test_del_app_envs_dropping",
"test_del_app_envs_dropped_after"});

#define TEST_SET_APP_ENVS_FAILED(action, err_code) \
std::cout << "test server_state::set_app_envs(" #action ")..." << std::endl; \
do { \
configuration_update_app_env_request request; \
request.__set_app_name("test_set_app_envs_" #action); \
request.__set_op(app_env_operation::type::APP_ENV_OP_SET); \
request.__set_keys({replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}); \
request.__set_values({"67108864"}); \
\
fail::setup(); \
fail::cfg("set_app_envs_failed", "void(" #action ")"); \
\
auto rpc = test.set_app_envs(request); \
ASSERT_EQ(err_code, rpc.response().err); \
test.test_set_app_envs("test_set_app_envs_" #action, \
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}, \
{"67108864"}, \
err_code); \
\
fail::teardown(); \
} while (0)
Expand All @@ -255,14 +282,7 @@ void meta_service_test_app::app_envs_basic_test()
// Normal case for setting envs.
std::cout << "test server_state::set_app_envs(success)..." << std::endl;
{
configuration_update_app_env_request request;
request.__set_app_name("test_app1");
request.__set_op(app_env_operation::type::APP_ENV_OP_SET);
request.__set_keys(keys);
request.__set_values(values);

auto rpc = test.set_app_envs(request);
ASSERT_EQ(ERR_OK, rpc.response().err);
test.test_set_app_envs("test_app1", keys, values, ERR_OK);

const auto &app = test.get_app("test_app1");
ASSERT_TRUE(app);
Expand All @@ -276,15 +296,39 @@ void meta_service_test_app::app_envs_basic_test()
}
}

std::cout << "test server_state::del_app_envs()..." << std::endl;
{
configuration_update_app_env_request request;
request.__set_app_name("test_app1");
request.__set_op(app_env_operation::type::APP_ENV_OP_DEL);
request.__set_keys(del_keys);
#define TEST_DEL_APP_ENVS_FAILED(action, err_code) \
std::cout << "test server_state::del_app_envs(" #action ")..." << std::endl; \
do { \
test.test_set_app_envs("test_del_app_envs_" #action, \
{replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}, \
{"67108864"}, \
ERR_OK); \
\
fail::setup(); \
fail::cfg("del_app_envs_failed", "void(" #action ")"); \
\
test.test_del_app_envs( \
"test_del_app_envs_" #action, {replica_envs::ROCKSDB_WRITE_BUFFER_SIZE}, err_code); \
\
fail::teardown(); \
} while (0)

// Failed to deleting envs while table was not found.
TEST_DEL_APP_ENVS_FAILED(not_found, ERR_APP_NOT_EXIST);

// Failed to deleting envs while table was being dropped as the intermediate state.
TEST_DEL_APP_ENVS_FAILED(dropping, ERR_BUSY_DROPPING);

// The table was found dropped after the new envs had been persistent on the remote
// meta storage.
TEST_DEL_APP_ENVS_FAILED(dropped_after, ERR_APP_DROPPED);

auto rpc = test.del_app_envs(request);
ASSERT_EQ(ERR_OK, rpc.response().err);
#undef TEST_DEL_APP_ENVS_FAILED

// Normal case for deleting envs.
std::cout << "test server_state::del_app_envs(success)..." << std::endl;
{
test.test_del_app_envs("test_app1", del_keys, ERR_OK);

const auto &app = test.get_app("test_app1");
ASSERT_TRUE(app);
Expand Down

0 comments on commit 7ceef5d

Please sign in to comment.