Skip to content

Commit

Permalink
M
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Feb 14, 2025
1 parent eee78fd commit df88a81
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 22 deletions.
9 changes: 7 additions & 2 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,9 @@ template <typename SharedStateArg>
Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalStateInfo& info) {
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() + name_suffix()));
_runtime_profile->set_metadata(_parent->node_id());
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
// indent is false so that source operator will have same
// indentation_level with its parent operator.
info.parent_profile->add_child(_runtime_profile.get(), /*indent*/false, nullptr);
constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, FakeSharedState>;
if constexpr (!is_fake_shared) {
if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) {
Expand Down Expand Up @@ -548,7 +550,10 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1);
_close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
// indentation is true
// The parent profile of sink operator is usually a RuntimeProfile called PipelineTask.
// So we should set the indentation to true.
info.parent_profile->add_child(_profile, /*indent=*/true, nullptr);
_memory_used_counter = _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
return Status::OK();
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,7 @@ RuntimeProfile::Counter* RuntimeProfile::add_counter(const std::string& name, TU
// TODO: FIX DUPLICATE COUNTERS
// In production, we will return the existing counter.
// This will not make be crash, but the result may be wrong.
throw Exception(
Status::InvalidArgument("Add Duplicate counter name {} to {} ", name, this->_name));
return _counter_map[name];
}

// Parent counter must already exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
this.queryId = queryId;
root = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
root.addChild(fragmentsProfile);
root.addChild(fragmentsProfile, true);
fragmentProfiles = Maps.newHashMap();
multiBeProfile = Maps.newHashMap();
fragmentIdBeNum = Maps.newHashMap();
Expand All @@ -93,14 +93,14 @@ public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
for (int fragmentId : fragmentIds) {
RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + i);
fragmentProfiles.put(fragmentId, runtimeProfile);
fragmentsProfile.addChild(runtimeProfile);
fragmentsProfile.addChild(runtimeProfile, true);
multiBeProfile.put(fragmentId, Maps.newHashMap());
fragmentIdBeNum.put(fragmentId, 0);
seqNoToFragmentId.put(i, fragmentId);
++i;
}
loadChannelProfile = new RuntimeProfile("LoadChannels");
root.addChild(loadChannelProfile);
root.addChild(loadChannelProfile, true);
}

private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) {
Expand Down Expand Up @@ -158,7 +158,7 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i);
fragmentsProfile.addChild(newFragmentProfile);
fragmentsProfile.addChild(newFragmentProfile, true);
// All pipeline profiles of this fragment on all BEs
List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(seqNoToFragmentId.get(i));
int pipelineIdx = 0;
Expand All @@ -177,7 +177,7 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
allPipelineTask.get(0).nodeId());
RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap);
}
newFragmentProfile.addChild(mergedpipelineProfile);
newFragmentProfile.addChild(mergedpipelineProfile, true);
pipelineIdx++;
fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap);
}
Expand Down Expand Up @@ -257,7 +257,7 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr

profileNode.update(pipelineProfile.profile);
profileNode.setIsDone(isDone);
fragmentProfiles.get(fragmentId).addChild(profileNode);
fragmentProfiles.get(fragmentId).addChild(profileNode, true);
}
setMultiBeProfile(fragmentId, backendHBAddress, taskProfile);
}
Expand Down Expand Up @@ -323,7 +323,7 @@ public String toString() {
sb.append("ExecutionProfile: ").append(DebugUtil.printId(queryId)).append("\n");
for (Entry<Integer, RuntimeProfile> entry : fragmentProfiles.entrySet()) {
sb.append("Fragment ").append(entry.getKey()).append(":\n");
entry.getValue().prettyPrint(sb, " ");
entry.getValue().prettyPrint(sb, " ");
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,10 @@ private void waitProfileCompleteIfNeeded() {

private RuntimeProfile composeRootProfile() {
RuntimeProfile rootProfile = new RuntimeProfile(getId());
rootProfile.addChild(summaryProfile.getSummary());
rootProfile.addChild(summaryProfile.getExecutionSummary());
rootProfile.addChild(summaryProfile.getSummary(), true);
rootProfile.addChild(summaryProfile.getExecutionSummary(), true);
for (ExecutionProfile executionProfile : executionProfiles) {
rootProfile.addChild(executionProfile.getRoot());
rootProfile.addChild(executionProfile.getRoot(), true);
}
rootProfile.computeTimeInProfile();
return rootProfile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ public static void mergeProfiles(List<RuntimeProfile> profiles,
mergeProfiles(allChilds, newCreatedMergedChildProfile, planNodeMap);
// RuntimeProfile has at least one counter named TotalTime, should exclude it.
if (newCreatedMergedChildProfile.counterMap.size() > 1) {
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap);
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap, templateProfile.childList.get(i).second);
simpleProfile.rowsProducedMap.putAll(newCreatedMergedChildProfile.rowsProducedMap);
}
}
Expand Down Expand Up @@ -664,7 +664,7 @@ public static String printCounter(long value, TUnit type) {
return builder.toString();
}

public void addChild(RuntimeProfile child) {
public void addChild(RuntimeProfile child, boolean indent) {
if (child == null) {
return;
}
Expand All @@ -681,21 +681,21 @@ public void addChild(RuntimeProfile child) {
}
}
this.childMap.put(child.name, child);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, indent);
this.childList.add(pair);
} finally {
childLock.writeLock().unlock();
}
}

public void addChildWithCheck(RuntimeProfile child, Map<Integer, String> planNodeMap) {
public void addChildWithCheck(RuntimeProfile child, Map<Integer, String> planNodeMap, boolean indent) {
// check name
if (child.name.startsWith("PipelineTask") || child.name.startsWith("PipelineContext")) {
return;
}
childLock.writeLock().lock();
try {
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, indent);
this.childList.add(pair);
} finally {
childLock.writeLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public void testSortChildren() {
profile1.getCounterTotalTime().setValue(TUnit.TIME_NS, 1);
profile2.getCounterTotalTime().setValue(TUnit.TIME_NS, 3);
profile3.getCounterTotalTime().setValue(TUnit.TIME_NS, 2);
profile.addChild(profile1);
profile.addChild(profile2);
profile.addChild(profile3);
profile.addChild(profile1, true);
profile.addChild(profile2, true);
profile.addChild(profile3, true);
// compare
profile.sortChildren();
// check result
Expand Down

0 comments on commit df88a81

Please sign in to comment.