Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] support rf in partitioned hash join #45637

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -857,14 +857,6 @@ class OperatorXBase : public OperatorBase {
return (_child and !is_source()) ? _child->revocable_mem_size(state) : 0;
}

Status revoke_memory(RuntimeState* state,
const std::shared_ptr<SpillContext>& spill_context) override {
if (_child and !is_source()) {
return _child->revoke_memory(state, spill_context);
}
return Status::OK();
}

// If this method is not overwrite by child, its default value is 1MB
[[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) {
return state->minimum_operator_memory_required_bytes();
Expand Down
55 changes: 16 additions & 39 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
return Status::OK();
}

Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state) {
auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
auto query_id = state->query_id();

Expand Down Expand Up @@ -208,7 +207,9 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
std::numeric_limits<size_t>::max(), _runtime_profile.get()));
}

auto merged_block = vectorized::MutableBlock::create_unique(blocks[0].clone_empty());
auto merged_block = vectorized::MutableBlock::create_unique(std::move(blocks.back()));
blocks.pop_back();

while (!blocks.empty() && !state->is_cancelled()) {
auto block = std::move(blocks.back());
blocks.pop_back();
Expand All @@ -218,17 +219,9 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
return Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_hash_join_probe spill_probe_blocks failed");
});

if (merged_block->allocated_bytes() >=
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
COUNTER_UPDATE(_spill_probe_rows, merged_block->rows());
RETURN_IF_ERROR(
spilling_stream->spill_block(state, merged_block->to_block(), false));
COUNTER_UPDATE(_spill_probe_blocks, 1);
}
}

if (!merged_block->empty()) {
if (!merged_block->empty()) [[likely]] {
COUNTER_UPDATE(_spill_probe_rows, merged_block->rows());
RETURN_IF_ERROR(
spilling_stream->spill_block(state, merged_block->to_block(), false));
Expand Down Expand Up @@ -256,18 +249,15 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
return status;
};

if (spill_context) {
spill_context->on_non_sink_task_started();
}
_spill_dependency->block();
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func", {
return Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_hash_join_probe spill_probe_blocks submit_func failed");
});

auto spill_runnable = std::make_shared<SpillNonSinkRunnable>(
state, spill_context, _spill_dependency, _runtime_profile.get(),
_shared_state->shared_from_this(), exception_catch_func);
state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(),
exception_catch_func);
return spill_io_pool->submit(std::move(spill_runnable));
}

Expand Down Expand Up @@ -856,27 +846,6 @@ size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta
return size_to_reserve;
}

Status PartitionedHashJoinProbeOperatorX::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
auto& local_state = get_local_state(state);
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
<< ", task: " << state->task_id() << ", child eos: " << local_state._child_eos;

if (local_state._child_eos) {
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
<< ", task: " << state->task_id() << ", child eos: " << local_state._child_eos
<< ", will not revoke size: " << revocable_mem_size(state);
return Status::OK();
}

RETURN_IF_ERROR(local_state.spill_probe_blocks(state, spill_context));

if (_child) {
return _child->revoke_memory(state, spill_context);
}
return Status::OK();
}

Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
auto& local_state = get_local_state(state);
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
Expand All @@ -891,7 +860,15 @@ bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat
if (local_state._shared_state->need_to_spill) {
const auto revocable_size = _revocable_mem_size(state);
const auto min_revocable_size = state->min_revocable_mem();
return revocable_size > min_revocable_size;

if (state->get_query_ctx()->low_memory_mode()) {
return revocable_size >
std::min<int64_t>(min_revocable_size,
static_cast<int64_t>(
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM));
} else {
return vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
}
}
return false;
}
Expand Down
6 changes: 1 addition & 5 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ class PartitionedHashJoinProbeLocalState final
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;

Status spill_probe_blocks(RuntimeState* state,
const std::shared_ptr<SpillContext>& spill_context = nullptr);
Status spill_probe_blocks(RuntimeState* state);

Status recover_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index,
bool& has_data);
Expand Down Expand Up @@ -181,9 +180,6 @@ class PartitionedHashJoinProbeOperatorX final
return _inner_probe_operator->require_data_distribution();
}

Status revoke_memory(RuntimeState* state,
const std::shared_ptr<SpillContext>& spill_context) override;

private:
Status _revoke_memory(RuntimeState* state);

Expand Down
39 changes: 7 additions & 32 deletions be/src/pipeline/exec/spill_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,36 +50,18 @@ struct SpillContext {

~SpillContext() {
if (running_tasks_count.load() != 0) {
LOG_EVERY_T(WARNING, 60) << "Query: " << print_id(query_id)
<< " not all spill tasks finished, remaining tasks: "
<< running_tasks_count.load();
}

if (_running_non_sink_tasks_count.load() != 0) {
LOG_EVERY_T(WARNING, 60)
<< "Query: " << print_id(query_id)
<< " not all spill tasks(non sink tasks) finished, remaining tasks: "
<< _running_non_sink_tasks_count.load();
LOG(WARNING) << "Query: " << print_id(query_id)
<< " not all spill tasks finished, remaining tasks: "
<< running_tasks_count.load();
}
}

void on_task_finished() {
auto count = running_tasks_count.fetch_sub(1);
if (count == 1 && _running_non_sink_tasks_count.load() == 0) {
all_tasks_finished_callback(this);
}
}

void on_non_sink_task_started() { _running_non_sink_tasks_count.fetch_add(1); }
void on_non_sink_task_finished() {
const auto count = _running_non_sink_tasks_count.fetch_sub(1);
if (count == 1 && running_tasks_count.load() == 0) {
if (count == 1) {
all_tasks_finished_callback(this);
}
}

private:
std::atomic_int _running_non_sink_tasks_count {0};
};

class SpillRunnable : public Runnable {
Expand Down Expand Up @@ -233,20 +215,13 @@ class SpillSinkRunnable : public SpillRunnable {

class SpillNonSinkRunnable : public SpillRunnable {
public:
SpillNonSinkRunnable(RuntimeState* state, std::shared_ptr<SpillContext> spill_context,
std::shared_ptr<Dependency> spill_dependency, RuntimeProfile* profile,
SpillNonSinkRunnable(RuntimeState* state, std::shared_ptr<Dependency> spill_dependency,
RuntimeProfile* profile,
const std::shared_ptr<BasicSpillSharedState>& shared_state,
std::function<Status()> spill_exec_func,
std::function<Status()> spill_fin_cb = {})
: SpillRunnable(state, spill_context, spill_dependency, profile, shared_state, true,
: SpillRunnable(state, nullptr, spill_dependency, profile, shared_state, true,
spill_exec_func, spill_fin_cb) {}

protected:
void _on_task_finished() override {
if (_spill_context) {
_spill_context->on_non_sink_task_finished();
}
}
};

class SpillRecoverRunnable : public SpillRunnable {
Expand Down
4 changes: 1 addition & 3 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ size_t PipelineTask::get_revocable_size() const {
return 0;
}

return _sink->revocable_mem_size(_state) + _root->revocable_mem_size(_state);
return _sink->revocable_mem_size(_state);
}

Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_context) {
Expand All @@ -632,8 +632,6 @@ Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_co
return Status::OK();
}

RETURN_IF_ERROR(_root->revoke_memory(_state, spill_context));

const auto revocable_size = _sink->revocable_mem_size(_state);
if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context));
Expand Down
Loading