diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 337a7aa41fc16be..b89c3701123b7b5 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -267,8 +267,11 @@ void FragmentMgr::stop() { // Only me can delete { - std::lock_guard lock(_lock); + std::unique_lock lock(_query_ctx_map_mutex); _query_ctx_map.clear(); + } + { + std::unique_lock lock(_pipeline_map_mutex); _pipeline_map.clear(); } _thread_pool->shutdown(); @@ -620,11 +623,7 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r TUniqueId query_id; query_id.__set_hi(request->query_id().hi()); query_id.__set_lo(request->query_id().lo()); - std::shared_ptr q_ctx = nullptr; - { - std::lock_guard lock(_lock); - q_ctx = _get_or_erase_query_ctx(query_id); - } + auto q_ctx = get_query_ctx(query_id); if (q_ctx) { q_ctx->set_ready_to_execute(Status::OK()); LOG_INFO("Query {} start execution", print_id(query_id)); @@ -639,116 +638,110 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r void FragmentMgr::remove_pipeline_context( std::shared_ptr f_context) { - { - std::lock_guard lock(_lock); - auto query_id = f_context->get_query_id(); - int64 now = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - g_fragment_executing_count << -1; - g_fragment_last_active_time.set_value(now); - // this log will show when a query is really finished in BEs - LOG_INFO("Removing query {} fragment {}", print_id(query_id), f_context->get_fragment_id()); - _pipeline_map.erase({query_id, f_context->get_fragment_id()}); - } + auto query_id = f_context->get_query_id(); + int64 now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + g_fragment_executing_count << -1; + g_fragment_last_active_time.set_value(now); + + // this log will show when a query is really finished in BEs + LOG_INFO("Removing query {} fragment {}", print_id(query_id), f_context->get_fragment_id()); + + std::unique_lock lock(_pipeline_map_mutex); + _pipeline_map.erase({query_id, f_context->get_fragment_id()}); } -std::shared_ptr FragmentMgr::_get_or_erase_query_ctx(const TUniqueId& query_id) { +std::shared_ptr FragmentMgr::get_query_ctx(const TUniqueId& query_id) { + std::shared_lock lock(_query_ctx_map_mutex); auto search = _query_ctx_map.find(query_id); if (search != _query_ctx_map.end()) { if (auto q_ctx = search->second.lock()) { return q_ctx; - } else { - LOG(WARNING) << "Query context (query id = " << print_id(query_id) - << ") has been released."; - _query_ctx_map.erase(search); - return nullptr; } } return nullptr; } -std::shared_ptr FragmentMgr::get_or_erase_query_ctx_with_lock( - const TUniqueId& query_id) { - std::unique_lock lock(_lock); - return _get_or_erase_query_ctx(query_id); -} - -template -Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, - QuerySource query_source, - std::shared_ptr& query_ctx) { +Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& params, + TUniqueId query_id, bool pipeline, + QuerySource query_source, + std::shared_ptr& query_ctx) { DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", { return Status::InternalError("FragmentMgr._get_query_ctx.failed, query id {}", print_id(query_id)); }); + + // Find _query_ctx_map, in case some other request has already + // create the query fragments context. + query_ctx = get_query_ctx(query_id); if (params.is_simplified_param) { // Get common components from _query_ctx_map - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { - query_ctx = q_ctx; - } else { + if (!query_ctx) { return Status::InternalError( "Failed to get query fragments context. Query {} may be timeout or be " "cancelled. host: {}", print_id(query_id), BackendOptions::get_localhost()); } } else { - // Find _query_ctx_map, in case some other request has already - // create the query fragments context. - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { - query_ctx = q_ctx; - return Status::OK(); - } + if (!query_ctx) { + std::unique_lock lock(_query_ctx_map_mutex); + // Only one thread need create query ctx. other thread just get query_ctx in _query_ctx_map. + auto search = _query_ctx_map.find(query_id); + if (search != _query_ctx_map.end()) { + query_ctx = search->second.lock(); + } - // First time a fragment of a query arrived. print logs. - LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord - << ", total fragment num on current host: " << params.fragment_num_on_host - << ", fe process uuid: " << params.query_options.fe_process_uuid - << ", query type: " << params.query_options.query_type - << ", report audit fe:" << params.current_connect_fe; - - // This may be a first fragment request of the query. - // Create the query fragments context. - query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options, - params.coord, pipeline, params.is_nereids, - params.current_connect_fe, query_source); - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); - RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, - &(query_ctx->desc_tbl))); - // set file scan range params - if (params.__isset.file_scan_params) { - query_ctx->file_scan_range_params_map = params.file_scan_params; - } + if (!query_ctx) { + // First time a fragment of a query arrived. print logs. + LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord + << ", total fragment num on current host: " << params.fragment_num_on_host + << ", fe process uuid: " << params.query_options.fe_process_uuid + << ", query type: " << params.query_options.query_type + << ", report audit fe:" << params.current_connect_fe; + + // This may be a first fragment request of the query. + // Create the query fragments context. + query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options, + params.coord, params.is_nereids, + params.current_connect_fe, query_source); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker); + RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, + &(query_ctx->desc_tbl))); + // set file scan range params + if (params.__isset.file_scan_params) { + query_ctx->file_scan_range_params_map = params.file_scan_params; + } - query_ctx->query_globals = params.query_globals; + query_ctx->query_globals = params.query_globals; - if (params.__isset.resource_info) { - query_ctx->user = params.resource_info.user; - query_ctx->group = params.resource_info.group; - query_ctx->set_rsc_info = true; - } + if (params.__isset.resource_info) { + query_ctx->user = params.resource_info.user; + query_ctx->group = params.resource_info.group; + query_ctx->set_rsc_info = true; + } - _set_scan_concurrency(params, query_ctx.get()); - - if (params.__isset.workload_groups && !params.workload_groups.empty()) { - uint64_t tg_id = params.workload_groups[0].id; - WorkloadGroupPtr workload_group_ptr = - _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id); - if (workload_group_ptr != nullptr) { - RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); - RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr)); - _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id), - tg_id); - } else { - LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id()) - << "can't find its workload group " << tg_id; + _set_scan_concurrency(params, query_ctx.get()); + + if (params.__isset.workload_groups && !params.workload_groups.empty()) { + uint64_t tg_id = params.workload_groups[0].id; + WorkloadGroupPtr workload_group_ptr = + _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id); + if (workload_group_ptr != nullptr) { + RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx)); + RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr)); + _exec_env->runtime_query_statistics_mgr()->set_workload_group_id( + print_id(query_id), tg_id); + } else { + LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id()) + << "can't find its workload group " << tg_id; + } + } + // There is some logic in query ctx's dctor, we could not check if exists and delete the + // temp query ctx now. For example, the query id maybe removed from workload group's queryset. + _query_ctx_map.insert({query_id, query_ctx}); } } - // There is some logic in query ctx's dctor, we could not check if exists and delete the - // temp query ctx now. For example, the query id maybe removed from workload group's queryset. - _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx)); } return Status::OK(); } @@ -762,13 +755,13 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) { fmt::memory_buffer debug_string_buffer; size_t i = 0; { - std::lock_guard lock(_lock); fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts are still running! duration_limit={}\n", _pipeline_map.size(), duration); - timespec now; clock_gettime(CLOCK_MONOTONIC, &now); + + std::shared_lock lock(_pipeline_map_mutex); for (auto& it : _pipeline_map) { auto elapsed = it.second->elapsed_time() / 1000000000.0; if (elapsed < duration) { @@ -787,7 +780,7 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) { } std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) { - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + if (auto q_ctx = get_query_ctx(query_id)) { return q_ctx->print_all_pipeline_context(); } else { return fmt::format( @@ -806,7 +799,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, << apache::thrift::ThriftDebugString(params.query_options).c_str(); std::shared_ptr query_ctx; - RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_source, query_ctx)); + RETURN_IF_ERROR( + _get_or_create_query_ctx(params, params.query_id, true, query_source, query_ctx)); SCOPED_ATTACH_TASK(query_ctx.get()); int64_t duration_ns = 0; std::shared_ptr context = @@ -839,16 +833,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, } { - // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map. - std::lock_guard lock(_lock); for (const auto& local_param : params.local_params) { const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; - auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); - if (iter != _pipeline_map.end()) { - return Status::InternalError( - "exec_plan_fragment query_id({}) input duplicated fragment_id({})", - print_id(params.query_id), params.fragment_id); - } query_ctx->fragment_instance_ids.push_back(fragment_instance_id); } @@ -857,7 +843,15 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, .count(); g_fragment_executing_count << 1; g_fragment_last_active_time.set_value(now); - // TODO: simplify this mapping + + // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map. + std::unique_lock lock(_pipeline_map_mutex); + auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); + if (iter != _pipeline_map.end()) { + return Status::InternalError( + "exec_plan_fragment query_id({}) input duplicated fragment_id({})", + print_id(params.query_id), params.fragment_id); + } _pipeline_map.insert({{params.query_id, params.fragment_id}, context}); } @@ -887,8 +881,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) { std::shared_ptr query_ctx = nullptr; std::vector all_instance_ids; { - std::lock_guard state_lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + if (auto q_ctx = get_query_ctx(query_id)) { query_ctx = q_ctx; // Copy instanceids to avoid concurrent modification. // And to reduce the scope of lock. @@ -901,7 +894,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) { } query_ctx->cancel(reason); { - std::lock_guard state_lock(_lock); + std::unique_lock l(_query_ctx_map_mutex); _query_ctx_map.erase(query_id); } LOG(INFO) << "Query " << print_id(query_id) @@ -937,7 +930,7 @@ void FragmentMgr::cancel_worker() { std::vector> ctx; { - std::lock_guard lock(_lock); + std::shared_lock lock(_pipeline_map_mutex); ctx.reserve(_pipeline_map.size()); for (auto& pipeline_itr : _pipeline_map) { ctx.push_back(pipeline_itr.second); @@ -948,21 +941,24 @@ void FragmentMgr::cancel_worker() { } { - std::lock_guard lock(_lock); - for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { - if (auto q_ctx = it->second.lock()) { - if (q_ctx->is_timeout(now)) { - LOG_WARNING("Query {} is timeout", print_id(it->first)); - queries_timeout.push_back(it->first); + { + // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must + // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok + std::unique_lock lock(_query_ctx_map_mutex); + for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { + if (auto q_ctx = it->second.lock()) { + if (q_ctx->is_timeout(now)) { + LOG_WARNING("Query {} is timeout", print_id(it->first)); + queries_timeout.push_back(it->first); + } ++it; } else { - ++it; + it = _query_ctx_map.erase(it); } - } else { - it = _query_ctx_map.erase(it); } } + std::shared_lock lock(_query_ctx_map_mutex); // We use a very conservative cancel strategy. // 0. If there are no running frontends, do not cancel any queries. // 1. If query's process uuid is zero, do not cancel @@ -1186,7 +1182,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, const auto& fragment_ids = request->fragment_ids(); { - std::unique_lock lock(_lock); + std::shared_lock lock(_pipeline_map_mutex); for (auto fragment_id : fragment_ids) { if (is_pipeline) { auto iter = _pipeline_map.find( @@ -1242,8 +1238,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + if (auto q_ctx = get_query_ctx(query_id)) { query_ctx = q_ctx; } else { return Status::EndOfFile( @@ -1266,8 +1261,7 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) { TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + if (auto q_ctx = get_query_ctx(query_id)) { query_ctx = q_ctx; } else { return Status::EndOfFile( @@ -1287,8 +1281,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, TUniqueId query_id; query_id.__set_hi(queryid.hi); query_id.__set_lo(queryid.lo); - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { + if (auto q_ctx = get_query_ctx(query_id)) { query_ctx = q_ctx; } else { return Status::EndOfFile( @@ -1305,7 +1298,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, void FragmentMgr::get_runtime_query_info(std::vector* query_info_list) { { - std::lock_guard lock(_lock); + std::unique_lock lock(_query_ctx_map_mutex); for (auto iter = _query_ctx_map.begin(); iter != _query_ctx_map.end();) { if (auto q_ctx = iter->second.lock()) { WorkloadQueryInfo workload_query_info; @@ -1328,19 +1321,9 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id, return Status::InvalidArgument("exes_status is nullptr"); } - std::shared_ptr query_context = nullptr; - - { - std::lock_guard lock(_lock); - if (auto q_ctx = _get_or_erase_query_ctx(query_id)) { - query_context = q_ctx; - } else { - return Status::NotFound("Query {} has been released", print_id(query_id)); - } - } - + std::shared_ptr query_context = get_query_ctx(query_id); if (query_context == nullptr) { - return Status::NotFound("Query {} not found", print_id(query_id)); + return Status::NotFound("Query {} not found or released", print_id(query_id)); } *exec_status = query_context->get_realtime_exec_status(); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 20b2fd8cdc20631..41ea67844e0cf30 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -133,7 +133,7 @@ class FragmentMgr : public RestMonitorIface { ThreadPool* get_thread_pool() { return _thread_pool.get(); } int32_t running_query_num() { - std::unique_lock ctx_lock(_lock); + std::shared_lock lock(_query_ctx_map_mutex); return _query_ctx_map.size(); } @@ -145,35 +145,41 @@ class FragmentMgr : public RestMonitorIface { Status get_realtime_exec_status(const TUniqueId& query_id, TReportExecStatusParams* exec_status); - std::shared_ptr get_or_erase_query_ctx_with_lock(const TUniqueId& query_id); + std::shared_ptr get_query_ctx(const TUniqueId& query_id); private: std::shared_ptr _get_or_erase_query_ctx(const TUniqueId& query_id); + struct BrpcItem { + TNetworkAddress network_address; + std::vector> queries; + }; + template void _set_scan_concurrency(const Param& params, QueryContext* query_ctx); - template - Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, - QuerySource query_type, std::shared_ptr& query_ctx); + Status _get_or_create_query_ctx(const TPipelineFragmentParams& params, TUniqueId query_id, + bool pipeline, QuerySource query_type, + std::shared_ptr& query_ctx); // This is input params ExecEnv* _exec_env = nullptr; + // The lock protect the `_pipeline_map` + std::shared_mutex _pipeline_map_mutex; + // (QueryID, FragmentID) -> PipelineFragmentContext + phmap::flat_hash_map, + std::shared_ptr> + _pipeline_map; + // The lock should only be used to protect the structures in fragment manager. Has to be // used in a very small scope because it may dead lock. For example, if the _lock is used // in prepare stage, the call path is prepare --> expr prepare --> may call allocator // when allocate failed, allocator may call query_is_cancelled, query is callced will also // call _lock, so that there is dead lock. - std::mutex _lock; - - // (QueryID, FragmentID) -> PipelineFragmentContext - std::unordered_map, - std::shared_ptr> - _pipeline_map; - + std::shared_mutex _query_ctx_map_mutex; // query id -> QueryContext - std::unordered_map> _query_ctx_map; + phmap::flat_hash_map> _query_ctx_map; std::unordered_map> _bf_size_map; CountDownLatch _stop_background_threads_latch; diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 9369c0c833c53cc..4ff83ff93dfe6f8 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -45,8 +45,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig _backend_id(backend_id), _enable_profile(enable_profile) { std::shared_ptr query_context = - ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock( - _load_id.to_thrift()); + ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(_load_id.to_thrift()); std::shared_ptr mem_tracker = nullptr; WorkloadGroupPtr wg_ptr = nullptr; diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 752e2ff95b29174..60da45fa685fbf0 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -428,7 +428,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool e TUniqueId load_tid = ((UniqueId)load_id).to_thrift(); #ifndef BE_TEST std::shared_ptr query_context = - ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(load_tid); + ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid); if (query_context != nullptr) { _query_thread_context = {load_tid, query_context->query_mem_tracker, query_context->workload_group()}; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 4f24824ac70547b..34d9be9bcb2b4da 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -299,7 +299,7 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt } std::weak_ptr RuntimeState::get_query_ctx_weak() { - return _exec_env->fragment_mgr()->get_or_erase_query_ctx_with_lock(_query_ctx->query_id()); + return _exec_env->fragment_mgr()->get_query_ctx(_query_ctx->query_id()); } void RuntimeState::init_mem_trackers(const std::string& name, const TUniqueId& id) {