From f94da0e442ef0b4b6a3d2aa06b524defac963565 Mon Sep 17 00:00:00 2001 From: Kevin Cai Date: Fri, 20 Sep 2024 14:45:54 +0800 Subject: [PATCH] [BugFix] fix DataDir invalid access, refactor DataDir management (#51063) Signed-off-by: Kevin Xiaohua Cai (cherry picked from commit 92d1be9b2d3d7f1761b232001848da459296c08a) --- be/src/storage/olap_server.cpp | 2 +- be/src/storage/storage_engine.cpp | 49 ++++++++++--------------------- be/src/storage/storage_engine.h | 2 +- 3 files changed, 18 insertions(+), 35 deletions(-) diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp index b9186a7921247..e7fc14f149e3d 100644 --- a/be/src/storage/olap_server.cpp +++ b/be/src/storage/olap_server.cpp @@ -117,7 +117,7 @@ Status StorageEngine::start_bg_threads() { // convert store map to vector std::vector data_dirs; for (auto& tmp_store : _store_map) { - data_dirs.push_back(tmp_store.second); + data_dirs.push_back(tmp_store.second.get()); } const auto data_dir_num = static_cast(data_dirs.size()); diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index 20ecdeb7a12b2..e1aea584c5b6f 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -149,6 +149,9 @@ StorageEngine::~StorageEngine() { // tablet manager need to destruct before set storage engine instance to nullptr because tablet may access storage // engine instance during their destruction. _tablet_manager.reset(); + + // _store can be still referenced by any tablet, make sure it is destroyed after `_tablet_manager` + _store_map.clear(); #ifdef BE_TEST if (_s_instance == this) { _s_instance = _p_instance; @@ -263,22 +266,16 @@ Status StorageEngine::_open(const EngineOptions& options) { } Status StorageEngine::_init_store_map() { - std::vector> tmp_stores; - ScopedCleanup release_guard([&] { - for (const auto& item : tmp_stores) { - if (item.first) { - delete item.second; - } - } - }); + std::map> tmp_stores; std::vector threads; SpinLock error_msg_lock; std::string error_msg; for (auto& path : _options.store_paths) { - auto* store = new DataDir(path.path, path.storage_medium, _tablet_manager.get(), _txn_manager.get()); - ScopedCleanup store_release_guard([&]() { delete store; }); - tmp_stores.emplace_back(true, store); - store_release_guard.cancel(); + auto store_ptr = + std::make_unique(path.path, path.storage_medium, _tablet_manager.get(), _txn_manager.get()); + DataDir* store = store_ptr.get(); + tmp_stores.emplace(path.path, std::move(store_ptr)); + // store_ptr will be invalid ever since threads.emplace_back([store, &error_msg_lock, &error_msg]() { auto st = store->init(); if (!st.ok()) { @@ -300,12 +297,7 @@ Status StorageEngine::_init_store_map() { return Status::InternalError(strings::Substitute("init path failed, error=$0", error_msg)); } - for (auto& store : tmp_stores) { - _store_map.emplace(store.second->path(), store.second); - store.first = false; - } - - release_guard.cancel(); + _store_map.swap(tmp_stores); return Status::OK(); } @@ -352,12 +344,12 @@ std::vector StorageEngine::get_stores() { std::lock_guard l(_store_lock); if (include_unused) { for (auto& it : _store_map) { - stores.push_back(it.second); + stores.push_back(it.second.get()); } } else { for (auto& it : _store_map) { if (it.second->is_used()) { - stores.push_back(it.second); + stores.push_back(it.second.get()); } } } @@ -496,7 +488,7 @@ std::vector StorageEngine::get_stores_for_create_tablet(TStorageMedium if (it.second->get_state() == DiskState::ONLINE) { if (_available_storage_medium_type_count == 1 || it.second->storage_medium() == storage_medium) { if (!it.second->capacity_limit_reached(0)) { - stores.push_back(it.second); + stores.push_back(it.second.get()); } } } @@ -537,14 +529,14 @@ DataDir* StorageEngine::get_store(const std::string& path) { if (it == std::end(_store_map)) { return nullptr; } - return it->second; + return it->second.get(); } DataDir* StorageEngine::get_store(int64_t path_hash) { std::lock_guard l(_store_lock); for (auto& it : _store_map) { if (it.second->path_hash() == path_hash) { - return it.second; + return it.second.get(); } } return nullptr; @@ -670,15 +662,6 @@ void StorageEngine::stop() { #undef JOIN_THREADS #undef JOIN_THREAD - { - std::lock_guard l(_store_lock); - for (auto& store_pair : _store_map) { - delete store_pair.second; - store_pair.second = nullptr; - } - _store_map.clear(); - } - _checker_cv.notify_all(); if (_compaction_checker_thread.joinable()) { _compaction_checker_thread.join(); @@ -1413,7 +1396,7 @@ void StorageEngine::increase_update_compaction_thread(const int num_threads_per_ // convert store map to vector std::vector data_dirs; for (auto& tmp_store : _store_map) { - data_dirs.push_back(tmp_store.second); + data_dirs.push_back(tmp_store.second.get()); } const auto data_dir_num = static_cast(data_dirs.size()); const int32_t cur_threads_per_disk = _update_compaction_threads.size() / data_dir_num; diff --git a/be/src/storage/storage_engine.h b/be/src/storage/storage_engine.h index 4a1541fc60830..b07d390e13f1a 100644 --- a/be/src/storage/storage_engine.h +++ b/be/src/storage/storage_engine.h @@ -418,7 +418,7 @@ class StorageEngine { private: EngineOptions _options; std::mutex _store_lock; - std::map _store_map; + std::map> _store_map; uint32_t _available_storage_medium_type_count; bool _is_all_cluster_id_exist;