Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](profile) Operator of same pipeline should have same indent level #47921

Merged
merged 3 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,9 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() + name_suffix()));
_runtime_profile->set_metadata(_parent->node_id());
_runtime_profile->set_is_sink(false);
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
// indent is false so that source operator will have same
// indentation_level with its parent operator.
info.parent_profile->add_child(_runtime_profile.get(), /*indent=*/false, nullptr);
constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, FakeSharedState>;
if constexpr (!is_fake_shared) {
if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) {
Expand Down Expand Up @@ -550,7 +552,10 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1);
_close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
// indentation is true
// The parent profile of sink operator is usually a RuntimeProfile called PipelineTask.
// So we should set the indentation to true.
info.parent_profile->add_child(_profile, /*indent=*/true, nullptr);
_memory_used_counter = _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
this.queryId = queryId;
root = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
root.addChild(fragmentsProfile);
root.addChild(fragmentsProfile, true);
fragmentProfiles = Maps.newHashMap();
multiBeProfile = Maps.newHashMap();
fragmentIdBeNum = Maps.newHashMap();
Expand All @@ -93,14 +93,14 @@ public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
for (int fragmentId : fragmentIds) {
RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + i);
fragmentProfiles.put(fragmentId, runtimeProfile);
fragmentsProfile.addChild(runtimeProfile);
fragmentsProfile.addChild(runtimeProfile, true);
multiBeProfile.put(fragmentId, Maps.newHashMap());
fragmentIdBeNum.put(fragmentId, 0);
seqNoToFragmentId.put(i, fragmentId);
++i;
}
loadChannelProfile = new RuntimeProfile("LoadChannels");
root.addChild(loadChannelProfile);
root.addChild(loadChannelProfile, true);
}

private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) {
Expand Down Expand Up @@ -158,7 +158,7 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i);
fragmentsProfile.addChild(newFragmentProfile);
fragmentsProfile.addChild(newFragmentProfile, true);
// All pipeline profiles of this fragment on all BEs
List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(seqNoToFragmentId.get(i));
int pipelineIdx = 0;
Expand All @@ -177,7 +177,7 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
allPipelineTask.get(0).nodeId());
RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap);
}
newFragmentProfile.addChild(mergedpipelineProfile);
newFragmentProfile.addChild(mergedpipelineProfile, true);
pipelineIdx++;
fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap);
}
Expand Down Expand Up @@ -257,7 +257,7 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr

profileNode.update(pipelineProfile.profile);
profileNode.setIsDone(isDone);
fragmentProfiles.get(fragmentId).addChild(profileNode);
fragmentProfiles.get(fragmentId).addChild(profileNode, true);
}
setMultiBeProfile(fragmentId, backendHBAddress, taskProfile);
}
Expand Down Expand Up @@ -323,7 +323,7 @@ public String toString() {
sb.append("ExecutionProfile: ").append(DebugUtil.printId(queryId)).append("\n");
for (Entry<Integer, RuntimeProfile> entry : fragmentProfiles.entrySet()) {
sb.append("Fragment ").append(entry.getKey()).append(":\n");
entry.getValue().prettyPrint(sb, " ");
entry.getValue().prettyPrint(sb, " ");
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,10 @@ private void waitProfileCompleteIfNeeded() {

private RuntimeProfile composeRootProfile() {
RuntimeProfile rootProfile = new RuntimeProfile(getId());
rootProfile.addChild(summaryProfile.getSummary());
rootProfile.addChild(summaryProfile.getExecutionSummary());
rootProfile.addChild(summaryProfile.getSummary(), true);
rootProfile.addChild(summaryProfile.getExecutionSummary(), true);
for (ExecutionProfile executionProfile : executionProfiles) {
rootProfile.addChild(executionProfile.getRoot());
rootProfile.addChild(executionProfile.getRoot(), true);
}
rootProfile.computeTimeInProfile();
return rootProfile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ public static void mergeProfiles(List<RuntimeProfile> profiles,
mergeProfiles(allChilds, newCreatedMergedChildProfile, planNodeMap);
// RuntimeProfile has at least one counter named TotalTime, should exclude it.
if (newCreatedMergedChildProfile.counterMap.size() > 1) {
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap);
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap, templateProfile.childList.get(i).second);
simpleProfile.rowsProducedMap.putAll(newCreatedMergedChildProfile.rowsProducedMap);
}
}
Expand Down Expand Up @@ -662,7 +662,7 @@ public static String printCounter(long value, TUnit type) {
return builder.toString();
}

public void addChild(RuntimeProfile child) {
public void addChild(RuntimeProfile child, boolean indent) {
if (child == null) {
return;
}
Expand All @@ -679,21 +679,21 @@ public void addChild(RuntimeProfile child) {
}
}
this.childMap.put(child.name, child);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, indent);
this.childList.add(pair);
} finally {
childLock.writeLock().unlock();
}
}

public void addChildWithCheck(RuntimeProfile child, Map<Integer, String> planNodeMap) {
public void addChildWithCheck(RuntimeProfile child, Map<Integer, String> planNodeMap, boolean indent) {
// check name
if (child.name.startsWith("PipelineTask") || child.name.startsWith("PipelineContext")) {
return;
}
childLock.writeLock().lock();
try {
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, indent);
this.childList.add(pair);
} finally {
childLock.writeLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public void testSortChildren() {
profile1.getCounterTotalTime().setValue(TUnit.TIME_NS, 1);
profile2.getCounterTotalTime().setValue(TUnit.TIME_NS, 3);
profile3.getCounterTotalTime().setValue(TUnit.TIME_NS, 2);
profile.addChild(profile1);
profile.addChild(profile2);
profile.addChild(profile3);
profile.addChild(profile1, true);
profile.addChild(profile2, true);
profile.addChild(profile3, true);
// compare
profile.sortChildren();
// check result
Expand Down