Skip to content

Commit

Permalink
[refactor](pipelineX) refine _build_side_pipelines (apache#25871)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Oct 26, 2023
1 parent 78165a3 commit ad1313c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 35 deletions.
38 changes: 8 additions & 30 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i].get()});
}
}
_build_side_pipelines.clear();
_union_child_pipelines.clear();
_set_child_pipelines.clear();
_pipeline_parent_map.clear();
_dag.clear();
_op_id_to_le_state.clear();

Expand Down Expand Up @@ -589,16 +587,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
const DescriptorTbl& descs, OperatorXPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx) {
if (_build_side_pipelines.find(parent_idx) != _build_side_pipelines.end() && child_idx > 0) {
cur_pipe = _build_side_pipelines[parent_idx];
}
if (_union_child_pipelines.find(parent_idx) != _union_child_pipelines.end()) {
cur_pipe = _union_child_pipelines[parent_idx][child_idx];
}
if (_set_child_pipelines.find(parent_idx) != _set_child_pipelines.end()) {
cur_pipe = _set_child_pipelines[parent_idx][child_idx];
}

_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
std::stringstream error_msg;
switch (tnode.node_type) {
case TPlanNodeType::OLAP_SCAN_NODE: {
Expand Down Expand Up @@ -704,7 +693,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
_build_side_pipelines.insert({sink->node_id(), build_side_pipe});
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
break;
}
case TPlanNodeType::CROSS_JOIN_NODE: {
Expand All @@ -723,7 +713,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
_build_side_pipelines.insert({sink->node_id(), build_side_pipe});
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
break;
}
case TPlanNodeType::UNION_NODE: {
Expand All @@ -735,7 +726,6 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
int father_id = tnode.node_id;
for (int i = 0; i < child_count; i++) {
PipelinePtr build_side_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
Expand All @@ -745,13 +735,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
// preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
if (_union_child_pipelines.find(father_id) == _union_child_pipelines.end()) {
_union_child_pipelines.insert({father_id, {build_side_pipe}});
} else {
_union_child_pipelines[father_id].push_back(build_side_pipe);
}
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
}

break;
}
case TPlanNodeType::SORT_NODE: {
Expand Down Expand Up @@ -877,8 +862,6 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node(
_dag.insert({downstream_pipeline_id, {}});
}

int parent_id = tnode.node_id;

for (int child_id = 0; child_id < tnode.num_children; child_id++) {
PipelinePtr probe_side_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
Expand All @@ -895,12 +878,7 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node(
RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
RETURN_IF_ERROR(probe_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
// prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
if (child_id == 0) {
DCHECK(_set_child_pipelines.find(parent_id) == _set_child_pipelines.end());
_set_child_pipelines.insert({parent_id, {probe_side_pipe}});
} else {
_set_child_pipelines[parent_id].push_back(probe_side_pipe);
}
_pipeline_parent_map.push(op->node_id(), probe_side_pipe);
}

return Status::OK();
Expand Down
25 changes: 20 additions & 5 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,29 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
// build probe operator and build operator in separate pipelines. To do this, we should build
// ProbeSide first, and use `_pipelines_to_build` to store which pipeline the build operator
// is in, so we can build BuildSide once we complete probe side.
std::map<int, PipelinePtr> _build_side_pipelines;
struct pipeline_parent_map {
std::map<int, std::vector<PipelinePtr>> _build_side_pipelines;
void push(int parent_node_id, PipelinePtr pipeline) {
if (!_build_side_pipelines.contains(parent_node_id)) {
_build_side_pipelines.insert({parent_node_id, {pipeline}});
} else {
_build_side_pipelines[parent_node_id].push_back(pipeline);
}
}
void pop(PipelinePtr& cur_pipe, int parent_node_id, int child_idx) {
if (!_build_side_pipelines.contains(parent_node_id)) {
return;
}
DCHECK(_build_side_pipelines.contains(parent_node_id));
auto& child_pipeline = _build_side_pipelines[parent_node_id];
DCHECK(child_idx < child_pipeline.size());
cur_pipe = child_pipeline[child_idx];
}
void clear() { _build_side_pipelines.clear(); }
} _pipeline_parent_map;

std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
std::mutex _state_map_lock;

// TODO: Unify `_union_child_pipelines`, `_set_child_pipelines`, `_build_side_pipelines`.
std::map<int, std::vector<PipelinePtr>> _union_child_pipelines;
std::map<int, std::vector<PipelinePtr>> _set_child_pipelines;
// We can guarantee that a plan node ID can correspond to an operator ID,
// but some operators do not have a corresponding plan node ID.
// We set these IDs as negative numbers, which are not visible to the user.
Expand Down

0 comments on commit ad1313c

Please sign in to comment.