diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f43190ebb36b97..b7bbaf8f206702 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -271,12 +271,17 @@ void FragmentMgr::stop() { { std::lock_guard lock(_lock); _fragment_instance_map.clear(); - _query_ctx_map.clear(); for (auto& pipeline : _pipeline_map) { pipeline.second->close_sink(); } _pipeline_map.clear(); } + + { + std::unique_lock lock(_query_ctx_map_lock); + _query_ctx_map.clear(); + } + _async_report_thread_pool->shutdown(); } @@ -620,11 +625,11 @@ void FragmentMgr::_exec_actual(std::shared_ptr fragment_ex _fragment_instance_map.erase(fragment_executor->fragment_instance_id()); LOG_INFO("Instance {} finished", print_id(fragment_executor->fragment_instance_id())); - - if (all_done && query_ctx) { - _query_ctx_map.erase(query_ctx->query_id()); - LOG_INFO("Query {} finished", print_id(query_ctx->query_id())); - } + } + if (all_done && query_ctx) { + std::unique_lock lock(_query_ctx_map_lock); + _query_ctx_map.erase(query_ctx->query_id()); + LOG_INFO("Query {} finished", print_id(query_ctx->query_id())); } // Callback after remove from this id @@ -713,8 +718,10 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r query_id.__set_lo(request->query_id().lo()); std::shared_ptr q_ctx = nullptr; { - std::lock_guard lock(_lock); - + TUniqueId query_id; + query_id.__set_hi(request->query_id().hi()); + query_id.__set_lo(request->query_id().lo()); + std::shared_lock lock(_query_ctx_map_lock); auto search = _query_ctx_map.find(query_id); if (search == _query_ctx_map.end()) { return Status::InternalError( @@ -732,22 +739,24 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r void FragmentMgr::remove_pipeline_context( std::shared_ptr f_context) { auto* q_context = f_context->get_query_ctx(); + bool all_done = false; + TUniqueId query_id = f_context->get_query_id(); { std::lock_guard lock(_lock); - auto query_id = f_context->get_query_id(); std::vector ins_ids; f_context->instance_ids(ins_ids); - bool all_done = q_context->countdown(ins_ids.size()); + all_done = q_context->countdown(ins_ids.size()); for (const auto& ins_id : ins_ids) { LOG_INFO("Removing query {} instance {}, all done? {}", print_id(query_id), print_id(ins_id), all_done); _pipeline_map.erase(ins_id); g_pipeline_fragment_instances_count << -1; } - if (all_done) { - LOG_INFO("Query {} finished", print_id(query_id)); - _query_ctx_map.erase(query_id); - } + } + if (all_done) { + std::unique_lock lock(_query_ctx_map_lock); + _query_ctx_map.erase(query_id); + LOG_INFO("Query {} finished", print_id(query_id)); } } @@ -759,7 +768,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); }); if (params.is_simplified_param) { // Get common components from _query_ctx_map - std::lock_guard lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto search = _query_ctx_map.find(query_id); if (search == _query_ctx_map.end()) { return Status::InternalError( @@ -771,7 +780,16 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } else { // Find _query_ctx_map, in case some other request has already // create the query fragments context. - std::lock_guard lock(_lock); + { + std::shared_lock lock(_query_ctx_map_lock); + auto search = _query_ctx_map.find(query_id); + if (search != _query_ctx_map.end()) { + query_ctx = search->second; + return Status::OK(); + } + } + + std::unique_lock lock(_query_ctx_map_lock); auto search = _query_ctx_map.find(query_id); if (search != _query_ctx_map.end()) { query_ctx = search->second; @@ -1170,7 +1188,7 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query } std::shared_ptr FragmentMgr::get_query_context(const TUniqueId& query_id) { - std::lock_guard state_lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto ctx = _query_ctx_map.find(query_id); if (ctx != _query_ctx_map.end()) { return ctx->second; @@ -1184,7 +1202,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan std::shared_ptr query_ctx; std::vector all_instance_ids; { - std::lock_guard state_lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto ctx_iter = _query_ctx_map.find(query_id); if (ctx_iter == _query_ctx_map.end()) { @@ -1251,7 +1269,7 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id, void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id, const PPlanFragmentCancelReason& reason, const std::string& msg) { - std::unique_lock lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto q_ctx_iter = _query_ctx_map.find(query_id); if (q_ctx_iter != _query_ctx_map.end()) { // Has to use value to keep the shared ptr not deconstructed. @@ -1315,6 +1333,9 @@ void FragmentMgr::cancel_worker() { pipeline_itr.second->clear_finished_tasks(); } } + } + { + std::unique_lock lock(_query_ctx_map_lock); for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { if (it->second->is_timeout(now)) { LOG_WARNING("Query {} is timeout", print_id(it->first)); @@ -1335,7 +1356,9 @@ void FragmentMgr::cancel_worker() { ++it; } } - + } + { + std::shared_lock lock(_query_ctx_map_lock); // We use a very conservative cancel strategy. // 0. If there are no running frontends, do not cancel any queries. // 1. If query's process uuid is zero, do not cancel @@ -1773,7 +1796,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto iter = _query_ctx_map.find(query_id); if (iter == _query_ctx_map.end()) { return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished", @@ -1796,7 +1819,7 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto iter = _query_ctx_map.find(query_id); if (iter == _query_ctx_map.end()) { return Status::InvalidArgument("query-id: {}", queryid.to_string()); @@ -1819,7 +1842,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); auto iter = _query_ctx_map.find(query_id); if (iter == _query_ctx_map.end()) { return Status::InvalidArgument("query-id: {}", queryid.to_string()); @@ -1914,7 +1937,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag void FragmentMgr::get_runtime_query_info(std::vector* query_info_list) { { - std::lock_guard lock(_lock); + std::shared_lock lock(_query_ctx_map_lock); for (const auto& q : _query_ctx_map) { WorkloadQueryInfo workload_query_info; workload_query_info.query_id = print_id(q.first); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 0c1bb3033d9db9..53cea30686fa22 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -143,7 +143,7 @@ class FragmentMgr : public RestMonitorIface { std::shared_ptr get_query_context(const TUniqueId& query_id); int32_t running_query_num() { - std::unique_lock ctx_lock(_lock); + std::shared_lock ctx_lock(_query_ctx_map_lock); return _query_ctx_map.size(); } @@ -201,8 +201,9 @@ class FragmentMgr : public RestMonitorIface { std::unordered_map> _pipeline_map; + std::shared_mutex _query_ctx_map_lock; // query id -> QueryContext - std::unordered_map> _query_ctx_map; + phmap::flat_hash_map> _query_ctx_map; std::unordered_map> _bf_size_map; CountDownLatch _stop_background_threads_latch;