From ad1313cce67e6342f253bed0da6fe0ca42197492 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Thu, 26 Oct 2023 10:32:23 +0800 Subject: [PATCH] [refactor](pipelineX) refine _build_side_pipelines (#25871) --- .../pipeline_x_fragment_context.cpp | 38 ++++--------------- .../pipeline_x/pipeline_x_fragment_context.h | 25 +++++++++--- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index a1f69e6f02b0bb..78a888af9dfb9a 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -473,9 +473,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _runtime_states[i].get()}); } } - _build_side_pipelines.clear(); - _union_child_pipelines.clear(); - _set_child_pipelines.clear(); + _pipeline_parent_map.clear(); _dag.clear(); _op_id_to_le_state.clear(); @@ -589,16 +587,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx) { - if (_build_side_pipelines.find(parent_idx) != _build_side_pipelines.end() && child_idx > 0) { - cur_pipe = _build_side_pipelines[parent_idx]; - } - if (_union_child_pipelines.find(parent_idx) != _union_child_pipelines.end()) { - cur_pipe = _union_child_pipelines[parent_idx][child_idx]; - } - if (_set_child_pipelines.find(parent_idx) != _set_child_pipelines.end()) { - cur_pipe = _set_child_pipelines[parent_idx][child_idx]; - } - + _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); std::stringstream error_msg; switch (tnode.node_type) { case TPlanNodeType::OLAP_SCAN_NODE: { @@ -704,7 +693,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); - _build_side_pipelines.insert({sink->node_id(), build_side_pipe}); + _pipeline_parent_map.push(op->node_id(), cur_pipe); + _pipeline_parent_map.push(op->node_id(), build_side_pipe); break; } case TPlanNodeType::CROSS_JOIN_NODE: { @@ -723,7 +713,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); - _build_side_pipelines.insert({sink->node_id(), build_side_pipe}); + _pipeline_parent_map.push(op->node_id(), cur_pipe); + _pipeline_parent_map.push(op->node_id(), build_side_pipe); break; } case TPlanNodeType::UNION_NODE: { @@ -735,7 +726,6 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN if (_dag.find(downstream_pipeline_id) == _dag.end()) { _dag.insert({downstream_pipeline_id, {}}); } - int father_id = tnode.node_id; for (int i = 0; i < child_count; i++) { PipelinePtr build_side_pipe = add_pipeline(); _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); @@ -745,13 +735,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build. - if (_union_child_pipelines.find(father_id) == _union_child_pipelines.end()) { - _union_child_pipelines.insert({father_id, {build_side_pipe}}); - } else { - _union_child_pipelines[father_id].push_back(build_side_pipe); - } + _pipeline_parent_map.push(op->node_id(), build_side_pipe); } - break; } case TPlanNodeType::SORT_NODE: { @@ -877,8 +862,6 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node( _dag.insert({downstream_pipeline_id, {}}); } - int parent_id = tnode.node_id; - for (int child_id = 0; child_id < tnode.num_children; child_id++) { PipelinePtr probe_side_pipe = add_pipeline(); _dag[downstream_pipeline_id].push_back(probe_side_pipe->id()); @@ -895,12 +878,7 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node( RETURN_IF_ERROR(probe_side_pipe->set_sink(sink)); RETURN_IF_ERROR(probe_side_pipe->sink_x()->init(tnode, _runtime_state.get())); // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build. - if (child_id == 0) { - DCHECK(_set_child_pipelines.find(parent_id) == _set_child_pipelines.end()); - _set_child_pipelines.insert({parent_id, {probe_side_pipe}}); - } else { - _set_child_pipelines[parent_id].push_back(probe_side_pipe); - } + _pipeline_parent_map.push(op->node_id(), probe_side_pipe); } return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 45eb7f48cc533f..4d2a59277e9e04 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -175,14 +175,29 @@ class PipelineXFragmentContext : public PipelineFragmentContext { // build probe operator and build operator in separate pipelines. To do this, we should build // ProbeSide first, and use `_pipelines_to_build` to store which pipeline the build operator // is in, so we can build BuildSide once we complete probe side. - std::map _build_side_pipelines; + struct pipeline_parent_map { + std::map> _build_side_pipelines; + void push(int parent_node_id, PipelinePtr pipeline) { + if (!_build_side_pipelines.contains(parent_node_id)) { + _build_side_pipelines.insert({parent_node_id, {pipeline}}); + } else { + _build_side_pipelines[parent_node_id].push_back(pipeline); + } + } + void pop(PipelinePtr& cur_pipe, int parent_node_id, int child_idx) { + if (!_build_side_pipelines.contains(parent_node_id)) { + return; + } + DCHECK(_build_side_pipelines.contains(parent_node_id)); + auto& child_pipeline = _build_side_pipelines[parent_node_id]; + DCHECK(child_idx < child_pipeline.size()); + cur_pipe = child_pipeline[child_idx]; + } + void clear() { _build_side_pipelines.clear(); } + } _pipeline_parent_map; std::map _instance_id_to_runtime_state; std::mutex _state_map_lock; - - // TODO: Unify `_union_child_pipelines`, `_set_child_pipelines`, `_build_side_pipelines`. - std::map> _union_child_pipelines; - std::map> _set_child_pipelines; // We can guarantee that a plan node ID can correspond to an operator ID, // but some operators do not have a corresponding plan node ID. // We set these IDs as negative numbers, which are not visible to the user.