Skip to content

Commit

Permalink
[BugFix] Capture resource group for scan task (#51121)
Browse files Browse the repository at this point in the history
Signed-off-by: zihe.liu <[email protected]>
(cherry picked from commit 3317f49)

# Conflicts:
#	be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp
#	be/src/exec/pipeline/hashjoin/spillable_hash_join_probe_operator.cpp
#	be/src/exec/spill/executor.h
#	be/src/exec/spill/spiller.hpp
#	be/src/exec/workgroup/scan_task_queue.cpp
#	be/src/exec/workgroup/scan_task_queue.h
#	be/src/udf/java/utils.cpp
  • Loading branch information
ZiheLiu authored and mergify[bot] committed Sep 20, 2024
1 parent 2ad8771 commit c30ac3d
Show file tree
Hide file tree
Showing 8 changed files with 668 additions and 3 deletions.
567 changes: 567 additions & 0 deletions be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,14 @@ Status SpillableHashJoinProbeOperator::_load_all_partition_build_side(RuntimeSta
_latch.count_down();
}
};
<<<<<<< HEAD
RETURN_IF_ERROR(_executor->submit(std::move(task)));
=======
auto yield_func = [&](workgroup::ScanTask&& task) { spill::IOTaskExecutor::force_submit(std::move(task)); };
auto io_task =
workgroup::ScanTask(_join_builder->spiller()->options().wg, std::move(task), std::move(yield_func));
RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task)));
>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
}
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in
int32_t driver_id = CurrentThread::current().get_driver_id();

workgroup::ScanTask task;
task.workgroup = _workgroup.get();
task.workgroup = _workgroup;
// TODO: consider more factors, such as scan bytes and i/o time.
task.priority = OlapScanNode::compute_priority(_submit_task_counter->value());
task.task_group = down_cast<const ScanOperatorFactory*>(_factory)->scan_task_group();
Expand Down
25 changes: 25 additions & 0 deletions be/src/exec/spill/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct ResourceMemTrackerGuard {
};

struct IOTaskExecutor {
<<<<<<< HEAD
workgroup::ScanExecutor* pool;
workgroup::WorkGroupPtr wg;

Expand All @@ -98,12 +99,36 @@ struct IOTaskExecutor {
template <class Func>
Status submit(Func&& func) {
workgroup::ScanTask task(wg.get(), func);
=======
static Status submit(workgroup::ScanTask task) {
const auto& task_ctx = task.get_work_context();
bool use_local_io_executor = true;
if (task_ctx.task_context_data.has_value()) {
auto io_ctx = std::any_cast<SpillIOTaskContextPtr>(task_ctx.task_context_data);
use_local_io_executor = io_ctx->use_local_io_executor;
}
auto* pool = get_executor(task.workgroup.get(), use_local_io_executor);
>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
if (pool->submit(std::move(task))) {
return Status::OK();
} else {
return Status::InternalError("offer task failed");
}
}
<<<<<<< HEAD
=======
static void force_submit(workgroup::ScanTask task) {
const auto& task_ctx = task.get_work_context();
auto io_ctx = std::any_cast<SpillIOTaskContextPtr>(task_ctx.task_context_data);
auto* pool = get_executor(task.workgroup.get(), io_ctx->use_local_io_executor);
pool->force_submit(std::move(task));
}

private:
inline static workgroup::ScanExecutor* get_executor(workgroup::WorkGroup* wg, bool use_local_io_executor) {
return use_local_io_executor ? wg->executors()->scan_executor() : wg->executors()->connector_scan_executor();
}
>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
};

struct SyncTaskExecutor {
Expand Down
22 changes: 22 additions & 0 deletions be/src/exec/spill/spiller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,15 @@ Status RawSpillerWriter::flush(RuntimeState* state, TaskExecutor&& executor, Mem
_spiller->update_spilled_task_status(flush_task(state, mem_table));
return Status::OK();
};
<<<<<<< HEAD
// submit io task
RETURN_IF_ERROR(executor.submit(std::move(task)));
=======

auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); };
auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func));
RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1);
COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks);
return Status::OK();
Expand Down Expand Up @@ -219,7 +226,16 @@ Status SpillerReader::trigger_restore(RuntimeState* state, TaskExecutor&& execut
};
return Status::OK();
};
<<<<<<< HEAD
RETURN_IF_ERROR(executor.submit(std::move(restore_task)));
=======
auto yield_func = [&](workgroup::ScanTask&& task) {
auto ctx = std::any_cast<SpillIOTaskContextPtr>(task.get_work_context().task_context_data);
TaskExecutor::force_submit(std::move(task));
};
auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(restore_task), std::move(yield_func));
RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
COUNTER_UPDATE(_spiller->metrics().restore_io_task_count, 1);
COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_restore_tasks);
}
Expand Down Expand Up @@ -297,8 +313,14 @@ Status PartitionedSpillerWriter::flush(RuntimeState* state, bool is_final_flush,
_spiller->update_spilled_task_status(_flush_task(splitting_partitions, spilling_partitions));
return Status::OK();
};
<<<<<<< HEAD

RETURN_IF_ERROR(executor.submit(std::move(task)));
=======
auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); };
auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func));
RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1);
COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks);

Expand Down
26 changes: 24 additions & 2 deletions be/src/exec/workgroup/scan_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ bool WorkGroupScanTaskQueue::try_offer(ScanTask task) {
task.peak_scan_task_queue_size_counter->set(_num_tasks);
}

auto* wg_entity = _sched_entity(task.workgroup);
auto* wg_entity = _sched_entity(task.workgroup.get());
wg_entity->set_in_queue(this);
RETURN_IF_UNLIKELY(!wg_entity->queue()->try_offer(std::move(task)), false);

Expand All @@ -213,9 +213,31 @@ bool WorkGroupScanTaskQueue::try_offer(ScanTask task) {
return true;
}

<<<<<<< HEAD
=======
void WorkGroupScanTaskQueue::force_put(ScanTask task) {
std::lock_guard<std::mutex> lock(_global_mutex);

if (task.peak_scan_task_queue_size_counter != nullptr) {
task.peak_scan_task_queue_size_counter->set(_num_tasks);
}

auto* wg_entity = _sched_entity(task.workgroup.get());
wg_entity->set_in_queue(this);
wg_entity->queue()->force_put(std::move(task));

if (_wg_entities.find(wg_entity) == _wg_entities.end()) {
_enqueue_workgroup(wg_entity);
}

_num_tasks++;
_cv.notify_one();
}

>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
void WorkGroupScanTaskQueue::update_statistics(ScanTask& task, int64_t runtime_ns) {
std::lock_guard<std::mutex> lock(_global_mutex);
auto* wg = task.workgroup;
auto* wg = task.workgroup.get();
auto* wg_entity = _sched_entity(wg);

// Update bandwidth control information.
Expand Down
16 changes: 16 additions & 0 deletions be/src/exec/workgroup/scan_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <queue>
#include <set>
#include <unordered_set>
#include <utility>

#include "common/statusor.h"
#include "exec/workgroup/work_group_fwd.h"
Expand All @@ -38,9 +39,19 @@ struct ScanTask {
using WorkFunction = std::function<void()>;

ScanTask() : ScanTask(nullptr, nullptr) {}
<<<<<<< HEAD
explicit ScanTask(WorkFunction work_function) : workgroup(nullptr), work_function(std::move(work_function)) {}
ScanTask(WorkGroup* workgroup, WorkFunction work_function)
: workgroup(workgroup), work_function(std::move(work_function)) {}
=======
explicit ScanTask(WorkFunction work_function) : ScanTask(nullptr, std::move(work_function)) {}
ScanTask(WorkGroupPtr workgroup, WorkFunction work_function)
: workgroup(std::move(workgroup)), work_function(std::move(work_function)) {}
ScanTask(WorkGroupPtr workgroup, WorkFunction work_function, YieldFunction yield_function)
: workgroup(std::move(workgroup)),
work_function(std::move(work_function)),
yield_function(std::move(yield_function)) {}
>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
~ScanTask() = default;

DISALLOW_COPY(ScanTask);
Expand All @@ -55,7 +66,12 @@ struct ScanTask {
}

public:
<<<<<<< HEAD
WorkGroup* workgroup;
=======
WorkGroupPtr workgroup;
YieldContext work_context;
>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
WorkFunction work_function;
int priority = 0;
std::shared_ptr<ScanTaskGroup> task_group = nullptr;
Expand Down
6 changes: 6 additions & 0 deletions be/src/udf/java/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,15 @@ PromiseStatusPtr call_function_in_pthread(RuntimeState* state, const std::functi
PromiseStatusPtr call_hdfs_scan_function_in_pthread(const std::function<Status()>& func) {
PromiseStatusPtr ms = std::make_unique<PromiseStatus>();
if (bthread_self()) {
<<<<<<< HEAD
ExecEnv::GetInstance()->connector_scan_executor()->submit(
workgroup::ScanTask(workgroup::WorkGroupManager::instance()->get_default_workgroup().get(),
[promise = ms.get(), func]() { promise->set_value(func()); }));
=======
ExecEnv::GetInstance()->connector_scan_executor()->submit(workgroup::ScanTask(
ExecEnv::GetInstance()->workgroup_manager()->get_default_workgroup(),
[promise = ms.get(), func](workgroup::YieldContext&) { promise->set_value(func()); }));
>>>>>>> 3317f49811 ([BugFix] Capture resource group for scan task (#51121))
} else {
ms->set_value(func());
}
Expand Down

0 comments on commit c30ac3d

Please sign in to comment.