diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 2ffd108534598b..ae861f1f6f3035 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -599,6 +599,10 @@ DEFINE_Int32(fragment_mgr_asynic_work_pool_thread_num_min, "16"); DEFINE_Int32(fragment_mgr_asynic_work_pool_thread_num_max, "512"); DEFINE_Int32(fragment_mgr_asynic_work_pool_queue_size, "4096"); +// Fragment thread pool for prepare +DEFINE_Int32(fragment_mgr_prepare_work_pool_thread_num, "16"); +DEFINE_Int32(fragment_mgr_prepare_work_pool_queue_size, "512"); + // Control the number of disks on the machine. If 0, this comes from the system settings. DEFINE_Int32(num_disks, "0"); // The read size is the size of the reads sent to os. diff --git a/be/src/common/config.h b/be/src/common/config.h index 6c84d89fabac50..02d265e95e8dbe 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -637,6 +637,10 @@ DECLARE_Int32(fragment_mgr_asynic_work_pool_thread_num_min); DECLARE_Int32(fragment_mgr_asynic_work_pool_thread_num_max); DECLARE_Int32(fragment_mgr_asynic_work_pool_queue_size); +// Fragment thread pool for prepare +DECLARE_Int32(fragment_mgr_prepare_work_pool_thread_num); +DECLARE_Int32(fragment_mgr_prepare_work_pool_queue_size); + // Control the number of disks on the machine. If 0, this comes from the system settings. DECLARE_Int32(num_disks); // The read size is the size of the reads sent to os. 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 352fca89f8479a..b89705ac1a4fae 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -229,7 +229,7 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { } Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request, - ThreadPool* thread_pool) { + FifoThreadPool* thread_pool_for_prepare) { if (_prepared) { return Status::InternalError("Already prepared"); } @@ -348,7 +348,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re { SCOPED_TIMER(_build_tasks_timer); // 6. Build pipeline tasks and initialize local state. - RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool)); + RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool_for_prepare)); } _init_next_report_time(); @@ -358,7 +358,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re } Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFragmentParams& request, - ThreadPool* thread_pool) { + FifoThreadPool* thread_pool_for_prepare) { _total_tasks = 0; const auto target_size = request.local_params.size(); _tasks.resize(target_size); @@ -524,7 +524,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag std::condition_variable cv; int prepare_done = 0; for (int i = 0; i < target_size; i++) { - RETURN_IF_ERROR(thread_pool->submit_func([&, i]() { + thread_pool_for_prepare->offer([&, i]() { SCOPED_ATTACH_TASK(_query_ctx.get()); prepare_status[i] = pre_and_submit(i, this); std::unique_lock lock(m); @@ -532,7 +532,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag if (prepare_done == target_size) { cv.notify_one(); } - })); + }); } std::unique_lock lock(m); if (prepare_done != target_size) { diff --git 
a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index bd3a350d0a240a..9db149ab80dcef 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -86,7 +86,8 @@ class PipelineFragmentContext : public TaskExecutionContext { // should be protected by lock? [[nodiscard]] bool is_canceled() const { return _query_ctx->is_cancelled(); } - Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool); + Status prepare(const doris::TPipelineFragmentParams& request, + FifoThreadPool* thread_pool_for_prepare); Status submit(); @@ -168,7 +169,7 @@ class PipelineFragmentContext : public TaskExecutionContext { const std::map<int, int>& shuffle_idx_to_instance_idx); Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request, - ThreadPool* thread_pool); + FifoThreadPool* thread_pool_for_prepare); void _close_fragment_instance(); void _init_next_report_time(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 788993f4c2afd8..8a9e55e35fde1c 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -325,6 +325,11 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) .set_max_threads(config::fragment_mgr_asynic_work_pool_thread_num_max) .set_max_queue_size(config::fragment_mgr_asynic_work_pool_queue_size) .build(&_thread_pool); + + _thread_pool_for_prepare = std::make_unique<FifoThreadPool>( + config::fragment_mgr_prepare_work_pool_thread_num, + config::fragment_mgr_prepare_work_pool_queue_size, "for task prepare"); + CHECK(s.ok()) << s.to_string(); } @@ -849,8 +854,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, { SCOPED_RAW_TIMER(&duration_ns); Status prepare_st = Status::OK(); - ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params, _thread_pool.get()), - prepare_st); + ASSIGN_STATUS_IF_CATCH_EXCEPTION( + prepare_st = context->prepare(params, 
_thread_pool_for_prepare.get()), prepare_st); + if (!prepare_st.ok()) { query_ctx->cancel(prepare_st, params.fragment_id); query_ctx->set_execution_dependency_ready(); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 78f117a15dbcb6..792ddd4b38307e 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -216,6 +216,7 @@ class FragmentMgr : public RestMonitorIface { scoped_refptr<Thread> _cancel_thread; // every job is a pool std::unique_ptr<ThreadPool> _thread_pool; + std::unique_ptr<FifoThreadPool> _thread_pool_for_prepare; std::shared_ptr<MetricEntity> _entity; UIntGauge* timeout_canceled_fragment_count = nullptr;