Skip to content

Commit

Permalink
[BugFix] fix DataDir invalid access, refactor DataDir management
Browse files Browse the repository at this point in the history
* release DataDir in StorageEngine destructor
* manage DataDir with unique_ptr to ease the memory deallocation

Signed-off-by: Kevin Xiaohua Cai <[email protected]>
  • Loading branch information
kevincai committed Sep 14, 2024
1 parent 0df6bd2 commit a9b389f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 32 deletions.
2 changes: 1 addition & 1 deletion be/src/storage/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Status StorageEngine::start_bg_threads() {
// convert store map to vector
std::vector<DataDir*> data_dirs;
for (auto& tmp_store : _store_map) {
data_dirs.push_back(tmp_store.second);
data_dirs.push_back(tmp_store.second.get());
}
const auto data_dir_num = static_cast<int32_t>(data_dirs.size());

Expand Down
48 changes: 18 additions & 30 deletions be/src/storage/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ StorageEngine::~StorageEngine() {
// tablet manager need to destruct before set storage engine instance to nullptr because tablet may access storage
// engine instance during their destruction.
_tablet_manager.reset();

// _store can be still referenced by any tablet, make sure it is destroyed after `_tablet_manager`
_store_map.clear();
#ifdef BE_TEST
if (_s_instance == this) {
_s_instance = _p_instance;
Expand Down Expand Up @@ -263,22 +266,16 @@ Status StorageEngine::_open(const EngineOptions& options) {
}

Status StorageEngine::_init_store_map() {
std::vector<std::pair<bool, DataDir*>> tmp_stores;
ScopedCleanup release_guard([&] {
for (const auto& item : tmp_stores) {
if (item.first) {
delete item.second;
}
}
});
std::vector<std::unique_ptr<DataDir>> tmp_stores;
std::vector<std::thread> threads;
SpinLock error_msg_lock;
std::string error_msg;
for (auto& path : _options.store_paths) {
auto* store = new DataDir(path.path, path.storage_medium, _tablet_manager.get(), _txn_manager.get());
ScopedCleanup store_release_guard([&]() { delete store; });
tmp_stores.emplace_back(true, store);
store_release_guard.cancel();
auto store_ptr =
std::make_unique<DataDir>(path.path, path.storage_medium, _tablet_manager.get(), _txn_manager.get());
DataDir* store = store_ptr.get();
tmp_stores.emplace_back(std::move(store_ptr));
// store_ptr will be invalid ever since
threads.emplace_back([store, &error_msg_lock, &error_msg]() {
auto st = store->init();
if (!st.ok()) {
Expand All @@ -301,11 +298,11 @@ Status StorageEngine::_init_store_map() {
}

for (auto& store : tmp_stores) {
_store_map.emplace(store.second->path(), store.second);
store.first = false;
// store will be nullptr after `store.release()`, so record the path beforehead
auto path = store->path();
_store_map.emplace(path, store.release());
}

release_guard.cancel();
return Status::OK();
}

Expand Down Expand Up @@ -352,12 +349,12 @@ std::vector<DataDir*> StorageEngine::get_stores() {
std::lock_guard<std::mutex> l(_store_lock);
if (include_unused) {
for (auto& it : _store_map) {
stores.push_back(it.second);
stores.push_back(it.second.get());
}
} else {
for (auto& it : _store_map) {
if (it.second->is_used()) {
stores.push_back(it.second);
stores.push_back(it.second.get());
}
}
}
Expand Down Expand Up @@ -496,7 +493,7 @@ std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet(TStorageMedium
if (it.second->get_state() == DiskState::ONLINE) {
if (_available_storage_medium_type_count == 1 || it.second->storage_medium() == storage_medium) {
if (!it.second->capacity_limit_reached(0)) {
stores.push_back(it.second);
stores.push_back(it.second.get());
}
}
}
Expand Down Expand Up @@ -537,14 +534,14 @@ DataDir* StorageEngine::get_store(const std::string& path) {
if (it == std::end(_store_map)) {
return nullptr;
}
return it->second;
return it->second.get();
}

DataDir* StorageEngine::get_store(int64_t path_hash) {
std::lock_guard<std::mutex> l(_store_lock);
for (auto& it : _store_map) {
if (it.second->path_hash() == path_hash) {
return it.second;
return it.second.get();
}
}
return nullptr;
Expand Down Expand Up @@ -670,15 +667,6 @@ void StorageEngine::stop() {
#undef JOIN_THREADS
#undef JOIN_THREAD

{
std::lock_guard<std::mutex> l(_store_lock);
for (auto& store_pair : _store_map) {
delete store_pair.second;
store_pair.second = nullptr;
}
_store_map.clear();
}

_checker_cv.notify_all();
if (_compaction_checker_thread.joinable()) {
_compaction_checker_thread.join();
Expand Down Expand Up @@ -1413,7 +1401,7 @@ void StorageEngine::increase_update_compaction_thread(const int num_threads_per_
// convert store map to vector
std::vector<DataDir*> data_dirs;
for (auto& tmp_store : _store_map) {
data_dirs.push_back(tmp_store.second);
data_dirs.push_back(tmp_store.second.get());
}
const auto data_dir_num = static_cast<int32_t>(data_dirs.size());
const int32_t cur_threads_per_disk = _update_compaction_threads.size() / data_dir_num;
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ class StorageEngine {
private:
EngineOptions _options;
std::mutex _store_lock;
std::map<std::string, DataDir*> _store_map;
std::map<std::string, std::unique_ptr<DataDir>> _store_map;
uint32_t _available_storage_medium_type_count;
bool _is_all_cluster_id_exist;

Expand Down

0 comments on commit a9b389f

Please sign in to comment.