[opt](profile) Operator of same pipeline should have same indent level (#47921)

### What problem does this PR solve?

Operators of the same pipeline should share the same indentation level in the merged profile. On master, a pipeline's source operator is printed one level deeper than its sink operator; after this change, all operators of a pipeline are printed at the same level.

Master:
```text
MergedProfile
     Fragments:
       Fragment 0:
         Pipeline : 0(instance_num=16):
            - WaitWorkerTime: avg 29.266us, max 40.353us, min 22.437us
           RESULT_SINK_OPERATOR (id=0):
              - CloseTime: avg 126.883us, max 923.120us, min 9.218us
              - ExecTime: avg 391.625us, max 1.224ms, min 173.281us
              ...
              - WaitForDependency[RESULT_SINK_OPERATOR_DEPENDENCY]Time: avg 0ns, max 0ns, min 0ns
             LOCAL_EXCHANGE_OPERATOR (PASSTHROUGH) (id=-1):
                - BlocksProduced: sum 0, avg 0, max 0, min 0
                - CloseTime: avg 0ns, max 0ns, min 0ns
                - ExecTime: avg 29.423us, max 38.958us, min 20.25us
                ...
                - WaitForDependency[LOCAL_EXCHANGE_OPERATOR_DEPENDENCY]Time: avg 33.716ms, max 33.805ms, min 33.531ms
         Pipeline : 1(instance_num=1):
            - WaitWorkerTime: avg 26.452us, max 26.452us, min 26.452us
           LOCAL_EXCHANGE_SINK_OPERATOR (PASSTHROUGH) (id=-1):
              - CloseTime: avg 146.343us, max 146.343us, min 146.343us
              - ExecTime: avg 276.967us, max 276.967us, min 276.967us
              - InitTime: avg 117.846us, max 117.846us, min 117.846us
              - InputRows: sum 0, avg 0, max 0, min 0
              ...
              - WaitForDependency[LOCAL_EXCHANGE_SINK_DEPENDENCY]Time: avg 0ns, max 0ns, min 0ns
             OLAP_SCAN_OPERATOR (id=0. nereids_id=122. table name = test_bloom_filter_hit(test_bloom_filter_hit)):
                - BlocksProduced: sum 0, avg 0, max 0, min 0
                - CloseTime: avg 127.942us, max 127.942us, min 127.942us
                - ExecTime: avg 37.62ms, max 37.62ms, min 37.62ms
                - InitTime: avg 3.267ms, max 3.267ms, min 3.267ms
                ...
                - WaitForDependency[OLAP_SCAN_OPERATOR_DEPENDENCY]Time: avg 20.935ms, max 20.935ms, min 20.935ms
```
After this optimization:
```text
MergedProfile
     Fragments:
       Fragment 0:
         Pipeline : 0(instance_num=1):
            - WaitWorkerTime: avg 25.288us, max 25.288us, min 25.288us
           RESULT_SINK_OPERATOR (id=0):
              - CloseTime: avg 3.262us, max 3.262us, min 3.262us
              - ExecTime: avg 181.520us, max 181.520us, min 181.520us
              - InputRows: sum 100, avg 100, max 100, min 100
              ...
              - WaitForDependency[RESULT_SINK_OPERATOR_DEPENDENCY]Time: avg 0ns, max 0ns, min 0ns
           EXCHANGE_OPERATOR (id=17):
              - BlocksProduced: sum 1, avg 1, max 1, min 1
              - ExecTime: avg 149.70us, max 149.70us, min 149.70us
              ...
              - RowsProduced: sum 100, avg 100, max 100, min 100
              - WaitForDependencyTime: avg 0ns, max 0ns, min 0ns
                - WaitForData0: avg 5sec473ms, max 5sec473ms, min 5sec473ms
       Fragment 1:
         Pipeline : 0(instance_num=48):
            - WaitWorkerTime: avg 15.408us, max 46.623us, min 5.290us
           DATA_STREAM_SINK_OPERATOR (id=17,dst_id=17):
              - BlocksProduced: sum 48, avg 1, max 1, min 1
              - ExecTime: avg 23.363us, max 40.711us, min 13.443us
              - InputRows: sum 100, avg 2, max 100, min 0
              ...
              - WaitForDependencyTime: avg 0ns, max 0ns, min 0ns
                - WaitForRpcBufferQueue: avg 0ns, max 0ns, min 0ns
           LOCAL_EXCHANGE_OPERATOR (LOCAL_MERGE_SORT) (id=-4):
              - BlocksProduced: sum 1, avg 0, max 1, min 0
              - CloseTime: avg 0ns, max 0ns, min 0ns
              - ExecTime: avg 15.815us, max 699.196us, min 620ns
              ...
              - RowsProduced: sum 100, avg 2, max 100, min 0
              - WaitForDependencyTime: avg 5sec472ms, max 5sec472ms, min 5sec472
```
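The flatter layout comes from threading an explicit `indent` flag through `addChild` and only deepening the print prefix when that flag is true. The sketch below is a minimal, self-contained illustration of that idea, not the actual Doris `RuntimeProfile` class: the `Node` class, its fields, and the two-space prefix step are made up for the example, and it only assumes the `addChild(child, indent)` / `prettyPrint(sb, prefix)` shape visible in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the indent-flag mechanism; NOT the real Doris RuntimeProfile.
public class ProfileIndentDemo {
    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        final List<Boolean> indentFlags = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }

        // Mirrors the new addChild(RuntimeProfile child, boolean indent) signature:
        // the flag is stored next to the child instead of being hard-coded to true.
        void addChild(Node child, boolean indent) {
            children.add(child);
            indentFlags.add(indent);
        }

        // Children added with indent=true get a deeper prefix; children added with
        // indent=false are printed at the same level as this node.
        void prettyPrint(StringBuilder sb, String prefix) {
            sb.append(prefix).append(name).append(":\n");
            for (int i = 0; i < children.size(); i++) {
                String childPrefix = indentFlags.get(i) ? prefix + "  " : prefix;
                children.get(i).prettyPrint(sb, childPrefix);
            }
        }
    }

    public static void main(String[] args) {
        Node pipeline = new Node("Pipeline : 0(instance_num=1)");
        Node sink = new Node("RESULT_SINK_OPERATOR (id=0)");
        Node source = new Node("EXCHANGE_OPERATOR (id=17)");

        // Sink operator hangs under the pipeline task with indent=true,
        // as in PipelineXSinkLocalState::init.
        pipeline.addChild(sink, true);
        // Source operator is registered on its parent operator with indent=false,
        // as in PipelineXLocalState::init, so it prints at the sink's level.
        sink.addChild(source, false);

        StringBuilder sb = new StringBuilder();
        pipeline.prettyPrint(sb, "  ");
        System.out.print(sb);
    }
}
```

Running the sketch prints both operators with the same leading whitespace, matching the "After this optimization" output above; with both flags set to true (the old behavior) the exchange operator would be pushed one level deeper, as in the master output.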
zhiqiang-hhhh authored Feb 18, 2025
1 parent 44d9aa4 commit 9647eba
Showing 5 changed files with 26 additions and 20 deletions.
9 changes: 7 additions & 2 deletions be/src/pipeline/exec/operator.cpp
@@ -446,7 +446,9 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() + name_suffix()));
_runtime_profile->set_metadata(_parent->node_id());
_runtime_profile->set_is_sink(false);
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
// indent is false so that the source operator has the same
// indentation level as its parent operator.
info.parent_profile->add_child(_runtime_profile.get(), /*indent=*/false, nullptr);
constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, FakeSharedState>;
if constexpr (!is_fake_shared) {
if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) {
@@ -547,7 +549,10 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1);
_close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
// indent is true here.
// The parent profile of a sink operator is usually a RuntimeProfile named PipelineTask,
// so the sink's profile should be indented one level under it.
info.parent_profile->add_child(_profile, /*indent=*/true, nullptr);
_memory_used_counter = _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
return Status::OK();
}
@@ -84,7 +84,7 @@ public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
this.queryId = queryId;
root = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
root.addChild(fragmentsProfile);
root.addChild(fragmentsProfile, true);
fragmentProfiles = Maps.newHashMap();
multiBeProfile = Maps.newHashMap();
fragmentIdBeNum = Maps.newHashMap();
@@ -93,14 +93,14 @@ public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
for (int fragmentId : fragmentIds) {
RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + i);
fragmentProfiles.put(fragmentId, runtimeProfile);
fragmentsProfile.addChild(runtimeProfile);
fragmentsProfile.addChild(runtimeProfile, true);
multiBeProfile.put(fragmentId, Maps.newHashMap());
fragmentIdBeNum.put(fragmentId, 0);
seqNoToFragmentId.put(i, fragmentId);
++i;
}
loadChannelProfile = new RuntimeProfile("LoadChannels");
root.addChild(loadChannelProfile);
root.addChild(loadChannelProfile, true);
}

private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) {
@@ -158,7 +158,7 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i);
fragmentsProfile.addChild(newFragmentProfile);
fragmentsProfile.addChild(newFragmentProfile, true);
// All pipeline profiles of this fragment on all BEs
List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(seqNoToFragmentId.get(i));
int pipelineIdx = 0;
@@ -177,7 +177,7 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
allPipelineTask.get(0).nodeId());
RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap);
}
newFragmentProfile.addChild(mergedpipelineProfile);
newFragmentProfile.addChild(mergedpipelineProfile, true);
pipelineIdx++;
fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap);
}
@@ -257,7 +257,7 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr

profileNode.update(pipelineProfile.profile);
profileNode.setIsDone(isDone);
fragmentProfiles.get(fragmentId).addChild(profileNode);
fragmentProfiles.get(fragmentId).addChild(profileNode, true);
}
setMultiBeProfile(fragmentId, backendHBAddress, taskProfile);
}
@@ -323,7 +323,7 @@ public String toString() {
sb.append("ExecutionProfile: ").append(DebugUtil.printId(queryId)).append("\n");
for (Entry<Integer, RuntimeProfile> entry : fragmentProfiles.entrySet()) {
sb.append("Fragment ").append(entry.getKey()).append(":\n");
entry.getValue().prettyPrint(sb, " ");
entry.getValue().prettyPrint(sb, " ");
}
return sb.toString();
}
@@ -346,10 +346,10 @@ private void waitProfileCompleteIfNeeded() {

private RuntimeProfile composeRootProfile() {
RuntimeProfile rootProfile = new RuntimeProfile(getId());
rootProfile.addChild(summaryProfile.getSummary());
rootProfile.addChild(summaryProfile.getExecutionSummary());
rootProfile.addChild(summaryProfile.getSummary(), true);
rootProfile.addChild(summaryProfile.getExecutionSummary(), true);
for (ExecutionProfile executionProfile : executionProfiles) {
rootProfile.addChild(executionProfile.getRoot());
rootProfile.addChild(executionProfile.getRoot(), true);
}
rootProfile.computeTimeInProfile();
return rootProfile;
@@ -503,7 +503,8 @@ public static void mergeProfiles(List<RuntimeProfile> profiles,
mergeProfiles(allChilds, newCreatedMergedChildProfile, planNodeMap);
// RuntimeProfile has at least one counter named TotalTime, should exclude it.
if (newCreatedMergedChildProfile.counterMap.size() > 1) {
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap);
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap,
templateProfile.childList.get(i).second);
simpleProfile.rowsProducedMap.putAll(newCreatedMergedChildProfile.rowsProducedMap);
}
}
@@ -662,7 +663,7 @@ public static String printCounter(long value, TUnit type) {
return builder.toString();
}

public void addChild(RuntimeProfile child) {
public void addChild(RuntimeProfile child, boolean indent) {
if (child == null) {
return;
}
@@ -679,21 +680,21 @@ public void addChild(RuntimeProfile child) {
}
}
this.childMap.put(child.name, child);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, indent);
this.childList.add(pair);
} finally {
childLock.writeLock().unlock();
}
}

public void addChildWithCheck(RuntimeProfile child, Map<Integer, String> planNodeMap) {
public void addChildWithCheck(RuntimeProfile child, Map<Integer, String> planNodeMap, boolean indent) {
// check name
if (child.name.startsWith("PipelineTask") || child.name.startsWith("PipelineContext")) {
return;
}
childLock.writeLock().lock();
try {
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, indent);
this.childList.add(pair);
} finally {
childLock.writeLock().unlock();
@@ -45,9 +45,9 @@ public void testSortChildren() {
profile1.getCounterTotalTime().setValue(TUnit.TIME_NS, 1);
profile2.getCounterTotalTime().setValue(TUnit.TIME_NS, 3);
profile3.getCounterTotalTime().setValue(TUnit.TIME_NS, 2);
profile.addChild(profile1);
profile.addChild(profile2);
profile.addChild(profile3);
profile.addChild(profile1, true);
profile.addChild(profile2, true);
profile.addChild(profile3, true);
// compare
profile.sortChildren();
// check result
