Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: zihe.liu <[email protected]>
  • Loading branch information
ZiheLiu committed Sep 18, 2024
1 parent b3a6939 commit c258e32
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 2 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/spill/spiller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ Status PartitionedSpillerWriter::flush(RuntimeState* state, bool is_final_flush,
return Status::OK();
};
auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); };
auto io_task = workgroup::ScanTask(_spiller->options().wg.get(), std::move(task), std::move(yield_func));
auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func));
RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1);
COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks);
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/topn_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ std::vector<std::shared_ptr<pipeline::OperatorFactory>> TopNNode::_decompose_to_
ops_sink_with_sort = context->maybe_interpolate_local_passthrough_exchange(
runtime_state(), id(), ops_sink_with_sort, context->degree_of_parallelism(), is_partition_skewed);
}
} else if (is_partition_topn) {
ops_sink_with_sort = context->maybe_interpolate_local_passthrough_exchange(
runtime_state(), id(), ops_sink_with_sort, context->degree_of_parallelism(), is_partition_skewed);
} else {
// prepend local shuffle to PartitionSortSinkOperator
ops_sink_with_sort = context->maybe_interpolate_local_shuffle_exchange(
Expand Down
2 changes: 1 addition & 1 deletion be/src/udf/java/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ PromiseStatusPtr call_hdfs_scan_function_in_pthread(const std::function<Status()
PromiseStatusPtr ms = std::make_unique<PromiseStatus>();
if (bthread_self()) {
ExecEnv::GetInstance()->connector_scan_executor()->submit(workgroup::ScanTask(
ExecEnv::GetInstance()->workgroup_manager()->get_default_workgroup().get(),
ExecEnv::GetInstance()->workgroup_manager()->get_default_workgroup(),
[promise = ms.get(), func](workgroup::YieldContext&) { promise->set_value(func()); }));
} else {
ms->set_value(func());
Expand Down

0 comments on commit c258e32

Please sign in to comment.