Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] fix DataDir invalid access, refactor DataDir management (backport #51063) #51205

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/storage/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Status StorageEngine::start_bg_threads() {
// convert store map to vector
std::vector<DataDir*> data_dirs;
for (auto& tmp_store : _store_map) {
data_dirs.push_back(tmp_store.second);
data_dirs.push_back(tmp_store.second.get());
}
const auto data_dir_num = static_cast<int32_t>(data_dirs.size());

Expand Down
49 changes: 16 additions & 33 deletions be/src/storage/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ StorageEngine::~StorageEngine() {
// tablet manager need to destruct before set storage engine instance to nullptr because tablet may access storage
// engine instance during their destruction.
_tablet_manager.reset();

// _store can be still referenced by any tablet, make sure it is destroyed after `_tablet_manager`
_store_map.clear();
#ifdef BE_TEST
if (_s_instance == this) {
_s_instance = _p_instance;
Expand Down Expand Up @@ -263,22 +266,16 @@ Status StorageEngine::_open(const EngineOptions& options) {
}

Status StorageEngine::_init_store_map() {
std::vector<std::pair<bool, DataDir*>> tmp_stores;
ScopedCleanup release_guard([&] {
for (const auto& item : tmp_stores) {
if (item.first) {
delete item.second;
}
}
});
std::map<std::string, std::unique_ptr<DataDir>> tmp_stores;
std::vector<std::thread> threads;
SpinLock error_msg_lock;
std::string error_msg;
for (auto& path : _options.store_paths) {
auto* store = new DataDir(path.path, path.storage_medium, _tablet_manager.get(), _txn_manager.get());
ScopedCleanup store_release_guard([&]() { delete store; });
tmp_stores.emplace_back(true, store);
store_release_guard.cancel();
auto store_ptr =
std::make_unique<DataDir>(path.path, path.storage_medium, _tablet_manager.get(), _txn_manager.get());
DataDir* store = store_ptr.get();
tmp_stores.emplace(path.path, std::move(store_ptr));
// store_ptr will be invalid ever since
threads.emplace_back([store, &error_msg_lock, &error_msg]() {
auto st = store->init();
if (!st.ok()) {
Expand All @@ -300,12 +297,7 @@ Status StorageEngine::_init_store_map() {
return Status::InternalError(strings::Substitute("init path failed, error=$0", error_msg));
}

for (auto& store : tmp_stores) {
_store_map.emplace(store.second->path(), store.second);
store.first = false;
}

release_guard.cancel();
_store_map.swap(tmp_stores);
return Status::OK();
}

Expand Down Expand Up @@ -352,12 +344,12 @@ std::vector<DataDir*> StorageEngine::get_stores() {
std::lock_guard<std::mutex> l(_store_lock);
if (include_unused) {
for (auto& it : _store_map) {
stores.push_back(it.second);
stores.push_back(it.second.get());
}
} else {
for (auto& it : _store_map) {
if (it.second->is_used()) {
stores.push_back(it.second);
stores.push_back(it.second.get());
}
}
}
Expand Down Expand Up @@ -496,7 +488,7 @@ std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet(TStorageMedium
if (it.second->get_state() == DiskState::ONLINE) {
if (_available_storage_medium_type_count == 1 || it.second->storage_medium() == storage_medium) {
if (!it.second->capacity_limit_reached(0)) {
stores.push_back(it.second);
stores.push_back(it.second.get());
}
}
}
Expand Down Expand Up @@ -537,14 +529,14 @@ DataDir* StorageEngine::get_store(const std::string& path) {
if (it == std::end(_store_map)) {
return nullptr;
}
return it->second;
return it->second.get();
}

DataDir* StorageEngine::get_store(int64_t path_hash) {
std::lock_guard<std::mutex> l(_store_lock);
for (auto& it : _store_map) {
if (it.second->path_hash() == path_hash) {
return it.second;
return it.second.get();
}
}
return nullptr;
Expand Down Expand Up @@ -670,15 +662,6 @@ void StorageEngine::stop() {
#undef JOIN_THREADS
#undef JOIN_THREAD

{
std::lock_guard<std::mutex> l(_store_lock);
for (auto& store_pair : _store_map) {
delete store_pair.second;
store_pair.second = nullptr;
}
_store_map.clear();
}

_checker_cv.notify_all();
if (_compaction_checker_thread.joinable()) {
_compaction_checker_thread.join();
Expand Down Expand Up @@ -1413,7 +1396,7 @@ void StorageEngine::increase_update_compaction_thread(const int num_threads_per_
// convert store map to vector
std::vector<DataDir*> data_dirs;
for (auto& tmp_store : _store_map) {
data_dirs.push_back(tmp_store.second);
data_dirs.push_back(tmp_store.second.get());
}
const auto data_dir_num = static_cast<int32_t>(data_dirs.size());
const int32_t cur_threads_per_disk = _update_compaction_threads.size() / data_dir_num;
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ class StorageEngine {
private:
EngineOptions _options;
std::mutex _store_lock;
std::map<std::string, DataDir*> _store_map;
std::map<std::string, std::unique_ptr<DataDir>> _store_map;
uint32_t _available_storage_medium_type_count;
bool _is_all_cluster_id_exist;

Expand Down
Loading