Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Capture resource group for scan task (backport #51121) #51184

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ Status MemLimitedChunkQueue::_submit_flush_task() {
}
};

auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup().get(), std::move(flush_task));
auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup(), std::move(flush_task));
RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task)));
return Status::OK();
}
Expand Down Expand Up @@ -551,7 +551,7 @@ Status MemLimitedChunkQueue::_submit_load_task(Block* block) {
_update_io_task_status(status);
}
};
auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup().get(), std::move(load_task));
auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup(), std::move(load_task));
RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task)));
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ Status SpillableHashJoinProbeOperator::_load_all_partition_build_side(RuntimeSta
}
};
auto yield_func = [&](workgroup::ScanTask&& task) { spill::IOTaskExecutor::force_submit(std::move(task)); };
auto io_task = workgroup::ScanTask(_join_builder->spiller()->options().wg.get(), std::move(task),
std::move(yield_func));
auto io_task =
workgroup::ScanTask(_join_builder->spiller()->options().wg, std::move(task), std::move(yield_func));
RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task)));
}
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in
int32_t driver_id = CurrentThread::current().get_driver_id();

workgroup::ScanTask task;
task.workgroup = _workgroup.get();
task.workgroup = _workgroup;
// TODO: consider more factors, such as scan bytes and i/o time.
task.priority = OlapScanNode::compute_priority(_submit_task_counter->value());
task.task_group = down_cast<const ScanOperatorFactory*>(_factory)->scan_task_group();
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/spill/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct IOTaskExecutor {
auto io_ctx = std::any_cast<SpillIOTaskContextPtr>(task_ctx.task_context_data);
use_local_io_executor = io_ctx->use_local_io_executor;
}
auto* pool = get_executor(task.workgroup, use_local_io_executor);
auto* pool = get_executor(task.workgroup.get(), use_local_io_executor);
if (pool->submit(std::move(task))) {
return Status::OK();
} else {
Expand All @@ -114,7 +114,7 @@ struct IOTaskExecutor {
static void force_submit(workgroup::ScanTask task) {
const auto& task_ctx = task.get_work_context();
auto io_ctx = std::any_cast<SpillIOTaskContextPtr>(task_ctx.task_context_data);
auto* pool = get_executor(task.workgroup, io_ctx->use_local_io_executor);
auto* pool = get_executor(task.workgroup.get(), io_ctx->use_local_io_executor);
pool->force_submit(std::move(task));
}

Expand Down
7 changes: 3 additions & 4 deletions be/src/exec/spill/spiller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ Status RawSpillerWriter::flush(RuntimeState* state, MemGuard&& guard) {
};

auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); };
auto io_task = workgroup::ScanTask(_spiller->options().wg.get(), std::move(task), std::move(yield_func));
auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func));
RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1);
COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks);
Expand Down Expand Up @@ -255,8 +255,7 @@ Status SpillerReader::trigger_restore(RuntimeState* state, MemGuard&& guard) {
auto ctx = std::any_cast<SpillIOTaskContextPtr>(task.get_work_context().task_context_data);
TaskExecutor::force_submit(std::move(task));
};
auto io_task =
workgroup::ScanTask(_spiller->options().wg.get(), std::move(restore_task), std::move(yield_func));
auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(restore_task), std::move(yield_func));
RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
COUNTER_UPDATE(_spiller->metrics().restore_io_task_count, 1);
COUNTER_SET(_spiller->metrics().peak_restore_io_task_count, _running_restore_tasks);
Expand Down Expand Up @@ -347,7 +346,7 @@ Status PartitionedSpillerWriter::flush(RuntimeState* state, bool is_final_flush,
return Status::OK();
};
auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); };
auto io_task = workgroup::ScanTask(_spiller->options().wg.get(), std::move(task), std::move(yield_func));
auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func));
RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1);
COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks);
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/workgroup/scan_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ bool WorkGroupScanTaskQueue::try_offer(ScanTask task) {
task.peak_scan_task_queue_size_counter->set(_num_tasks);
}

auto* wg_entity = _sched_entity(task.workgroup);
auto* wg_entity = _sched_entity(task.workgroup.get());
wg_entity->set_in_queue(this);
RETURN_IF_UNLIKELY(!wg_entity->queue()->try_offer(std::move(task)), false);

Expand All @@ -137,7 +137,7 @@ void WorkGroupScanTaskQueue::force_put(ScanTask task) {
task.peak_scan_task_queue_size_counter->set(_num_tasks);
}

auto* wg_entity = _sched_entity(task.workgroup);
auto* wg_entity = _sched_entity(task.workgroup.get());
wg_entity->set_in_queue(this);
wg_entity->queue()->force_put(std::move(task));

Expand All @@ -151,7 +151,7 @@ void WorkGroupScanTaskQueue::force_put(ScanTask task) {

void WorkGroupScanTaskQueue::update_statistics(ScanTask& task, int64_t runtime_ns) {
std::lock_guard<std::mutex> lock(_global_mutex);
auto* wg = task.workgroup;
auto* wg = task.workgroup.get();
auto* wg_entity = _sched_entity(wg);

// Update sched entity information.
Expand Down
13 changes: 7 additions & 6 deletions be/src/exec/workgroup/scan_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <queue>
#include <set>
#include <unordered_set>
#include <utility>

#include "common/statusor.h"
#include "exec/workgroup/work_group_fwd.h"
Expand Down Expand Up @@ -69,11 +70,11 @@ struct ScanTask {
using YieldFunction = std::function<void(ScanTask&&)>;

ScanTask() : ScanTask(nullptr, nullptr) {}
explicit ScanTask(WorkFunction work_function) : workgroup(nullptr), work_function(std::move(work_function)) {}
ScanTask(WorkGroup* workgroup, WorkFunction work_function)
: workgroup(workgroup), work_function(std::move(work_function)) {}
ScanTask(WorkGroup* workgroup, WorkFunction work_function, YieldFunction yield_function)
: workgroup(workgroup),
explicit ScanTask(WorkFunction work_function) : ScanTask(nullptr, std::move(work_function)) {}
ScanTask(WorkGroupPtr workgroup, WorkFunction work_function)
: workgroup(std::move(workgroup)), work_function(std::move(work_function)) {}
ScanTask(WorkGroupPtr workgroup, WorkFunction work_function, YieldFunction yield_function)
: workgroup(std::move(workgroup)),
work_function(std::move(work_function)),
yield_function(std::move(yield_function)) {}
~ScanTask() = default;
Expand Down Expand Up @@ -103,7 +104,7 @@ struct ScanTask {
const YieldContext& get_work_context() const { return work_context; }

public:
WorkGroup* workgroup;
WorkGroupPtr workgroup;
YieldContext work_context;
WorkFunction work_function;
YieldFunction yield_function;
Expand Down
2 changes: 1 addition & 1 deletion be/src/udf/java/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ PromiseStatusPtr call_hdfs_scan_function_in_pthread(const std::function<Status()
PromiseStatusPtr ms = std::make_unique<PromiseStatus>();
if (bthread_self()) {
ExecEnv::GetInstance()->connector_scan_executor()->submit(workgroup::ScanTask(
ExecEnv::GetInstance()->workgroup_manager()->get_default_workgroup().get(),
ExecEnv::GetInstance()->workgroup_manager()->get_default_workgroup(),
[promise = ms.get(), func](workgroup::YieldContext&) { promise->set_value(func()); }));
} else {
ms->set_value(func());
Expand Down
Loading