From b5bdb0dbb2c6098f0e93555e0c8aac31f7f58e42 Mon Sep 17 00:00:00 2001 From: "zihe.liu" Date: Fri, 20 Sep 2024 10:07:48 +0800 Subject: [PATCH] [BugFix] Capture resource group for scan task (#51121) Signed-off-by: zihe.liu (cherry picked from commit 3317f49811a181e4697041e5359484549cb4271b) --- .../pipeline/exchange/mem_limited_chunk_queue.cpp | 4 ++-- .../hashjoin/spillable_hash_join_probe_operator.cpp | 4 ++-- be/src/exec/pipeline/scan/scan_operator.cpp | 2 +- be/src/exec/spill/executor.h | 4 ++-- be/src/exec/spill/spiller.hpp | 7 +++---- be/src/exec/workgroup/scan_task_queue.cpp | 6 +++--- be/src/exec/workgroup/scan_task_queue.h | 13 +++++++------ be/src/udf/java/utils.cpp | 2 +- 8 files changed, 21 insertions(+), 21 deletions(-) diff --git a/be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp b/be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp index 109a12a0f6af1..b14de6744aa66 100644 --- a/be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp +++ b/be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp @@ -479,7 +479,7 @@ Status MemLimitedChunkQueue::_submit_flush_task() { } }; - auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup().get(), std::move(flush_task)); + auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup(), std::move(flush_task)); RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task))); return Status::OK(); } @@ -551,7 +551,7 @@ Status MemLimitedChunkQueue::_submit_load_task(Block* block) { _update_io_task_status(status); } }; - auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup().get(), std::move(load_task)); + auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup(), std::move(load_task)); RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task))); return Status::OK(); } diff --git a/be/src/exec/pipeline/hashjoin/spillable_hash_join_probe_operator.cpp b/be/src/exec/pipeline/hashjoin/spillable_hash_join_probe_operator.cpp index 7238d77940814..bf6bace124eef 100644 --- a/be/src/exec/pipeline/hashjoin/spillable_hash_join_probe_operator.cpp +++ b/be/src/exec/pipeline/hashjoin/spillable_hash_join_probe_operator.cpp @@ -315,8 +315,8 @@ Status SpillableHashJoinProbeOperator::_load_all_partition_build_side(RuntimeSta } }; auto yield_func = [&](workgroup::ScanTask&& task) { spill::IOTaskExecutor::force_submit(std::move(task)); }; - auto io_task = workgroup::ScanTask(_join_builder->spiller()->options().wg.get(), std::move(task), - std::move(yield_func)); + auto io_task = + workgroup::ScanTask(_join_builder->spiller()->options().wg, std::move(task), std::move(yield_func)); RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task))); } return Status::OK(); diff --git a/be/src/exec/pipeline/scan/scan_operator.cpp b/be/src/exec/pipeline/scan/scan_operator.cpp index f0498a31530dc..c31562b67e427 100644 --- a/be/src/exec/pipeline/scan/scan_operator.cpp +++ b/be/src/exec/pipeline/scan/scan_operator.cpp @@ -395,7 +395,7 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in int32_t driver_id = CurrentThread::current().get_driver_id(); workgroup::ScanTask task; - task.workgroup = _workgroup.get(); + task.workgroup = _workgroup; // TODO: consider more factors, such as scan bytes and i/o time. task.priority = OlapScanNode::compute_priority(_submit_task_counter->value()); task.task_group = down_cast(_factory)->scan_task_group(); diff --git a/be/src/exec/spill/executor.h b/be/src/exec/spill/executor.h index fd48d44fba263..5ca23b5415f17 100644 --- a/be/src/exec/spill/executor.h +++ b/be/src/exec/spill/executor.h @@ -104,7 +104,7 @@ struct IOTaskExecutor { auto io_ctx = std::any_cast(task_ctx.task_context_data); use_local_io_executor = io_ctx->use_local_io_executor; } - auto* pool = get_executor(task.workgroup, use_local_io_executor); + auto* pool = get_executor(task.workgroup.get(), use_local_io_executor); if (pool->submit(std::move(task))) { return Status::OK(); } else { @@ -114,7 +114,7 @@ struct IOTaskExecutor { static void force_submit(workgroup::ScanTask task) { const auto& task_ctx = task.get_work_context(); auto io_ctx = std::any_cast(task_ctx.task_context_data); - auto* pool = get_executor(task.workgroup, io_ctx->use_local_io_executor); + auto* pool = get_executor(task.workgroup.get(), io_ctx->use_local_io_executor); pool->force_submit(std::move(task)); } diff --git a/be/src/exec/spill/spiller.hpp b/be/src/exec/spill/spiller.hpp index 1ce11a67404c0..ab7695183c4b4 100644 --- a/be/src/exec/spill/spiller.hpp +++ b/be/src/exec/spill/spiller.hpp @@ -187,7 +187,7 @@ Status RawSpillerWriter::flush(RuntimeState* state, MemGuard&& guard) { }; auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); }; - auto io_task = workgroup::ScanTask(_spiller->options().wg.get(), std::move(task), std::move(yield_func)); + auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func)); RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task))); COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1); COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks); @@ -255,8 +255,7 @@ Status SpillerReader::trigger_restore(RuntimeState* state, MemGuard&& guard) { auto ctx = std::any_cast(task.get_work_context().task_context_data); TaskExecutor::force_submit(std::move(task)); }; - auto io_task = - workgroup::ScanTask(_spiller->options().wg.get(), std::move(restore_task), std::move(yield_func)); + auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(restore_task), std::move(yield_func)); RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task))); COUNTER_UPDATE(_spiller->metrics().restore_io_task_count, 1); COUNTER_SET(_spiller->metrics().peak_restore_io_task_count, _running_restore_tasks); @@ -347,7 +346,7 @@ Status PartitionedSpillerWriter::flush(RuntimeState* state, bool is_final_flush, return Status::OK(); }; auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); }; - auto io_task = workgroup::ScanTask(_spiller->options().wg.get(), std::move(task), std::move(yield_func)); + auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func)); RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task))); COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1); COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks); diff --git a/be/src/exec/workgroup/scan_task_queue.cpp b/be/src/exec/workgroup/scan_task_queue.cpp index 7cb5d5950ede5..74cf6dbb6c560 100644 --- a/be/src/exec/workgroup/scan_task_queue.cpp +++ b/be/src/exec/workgroup/scan_task_queue.cpp @@ -117,7 +117,7 @@ bool WorkGroupScanTaskQueue::try_offer(ScanTask task) { task.peak_scan_task_queue_size_counter->set(_num_tasks); } - auto* wg_entity = _sched_entity(task.workgroup); + auto* wg_entity = _sched_entity(task.workgroup.get()); wg_entity->set_in_queue(this); RETURN_IF_UNLIKELY(!wg_entity->queue()->try_offer(std::move(task)), false); @@ -137,7 +137,7 @@ void WorkGroupScanTaskQueue::force_put(ScanTask task) { task.peak_scan_task_queue_size_counter->set(_num_tasks); } - auto* wg_entity = _sched_entity(task.workgroup); + auto* wg_entity = _sched_entity(task.workgroup.get()); wg_entity->set_in_queue(this); wg_entity->queue()->force_put(std::move(task)); @@ -151,7 +151,7 @@ void WorkGroupScanTaskQueue::force_put(ScanTask task) { void WorkGroupScanTaskQueue::update_statistics(ScanTask& task, int64_t runtime_ns) { std::lock_guard lock(_global_mutex); - auto* wg = task.workgroup; + auto* wg = task.workgroup.get(); auto* wg_entity = _sched_entity(wg); // Update sched entity information. diff --git a/be/src/exec/workgroup/scan_task_queue.h b/be/src/exec/workgroup/scan_task_queue.h index c4a47fff13a10..46785fe6b3740 100644 --- a/be/src/exec/workgroup/scan_task_queue.h +++ b/be/src/exec/workgroup/scan_task_queue.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "common/statusor.h" #include "exec/workgroup/work_group_fwd.h" @@ -69,11 +70,11 @@ struct ScanTask { using YieldFunction = std::function; ScanTask() : ScanTask(nullptr, nullptr) {} - explicit ScanTask(WorkFunction work_function) : workgroup(nullptr), work_function(std::move(work_function)) {} - ScanTask(WorkGroup* workgroup, WorkFunction work_function) - : workgroup(workgroup), work_function(std::move(work_function)) {} - ScanTask(WorkGroup* workgroup, WorkFunction work_function, YieldFunction yield_function) - : workgroup(workgroup), + explicit ScanTask(WorkFunction work_function) : ScanTask(nullptr, std::move(work_function)) {} + ScanTask(WorkGroupPtr workgroup, WorkFunction work_function) + : workgroup(std::move(workgroup)), work_function(std::move(work_function)) {} + ScanTask(WorkGroupPtr workgroup, WorkFunction work_function, YieldFunction yield_function) + : workgroup(std::move(workgroup)), work_function(std::move(work_function)), yield_function(std::move(yield_function)) {} ~ScanTask() = default; @@ -103,7 +104,7 @@ struct ScanTask { const YieldContext& get_work_context() const { return work_context; } public: - WorkGroup* workgroup; + WorkGroupPtr workgroup; YieldContext work_context; WorkFunction work_function; YieldFunction yield_function; diff --git a/be/src/udf/java/utils.cpp b/be/src/udf/java/utils.cpp index 95cd4e41f2fb5..4869d5cd2eadf 100644 --- a/be/src/udf/java/utils.cpp +++ b/be/src/udf/java/utils.cpp @@ -51,7 +51,7 @@ PromiseStatusPtr call_hdfs_scan_function_in_pthread(const std::function(); if (bthread_self()) { ExecEnv::GetInstance()->connector_scan_executor()->submit(workgroup::ScanTask( - ExecEnv::GetInstance()->workgroup_manager()->get_default_workgroup().get(), + ExecEnv::GetInstance()->workgroup_manager()->get_default_workgroup(), [promise = ms.get(), func](workgroup::YieldContext&) { promise->set_value(func()); })); } else { ms->set_value(func());