Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] More unit test for profile #47623

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,9 @@ Status StorageEngine::create_tablet(const TCreateTabletReq& request, RuntimeProf
// Get all available stores, use ref_root_path if the caller specified
std::vector<DataDir*> stores;
{
SCOPED_TIMER(ADD_TIMER(profile, "GetStores"));
RuntimeProfile::Counter* getStoresCounter = profile->get_counter("GetStores");
DCHECK(getStoresCounter != nullptr);
SCOPED_TIMER(getStoresCounter);
stores = get_stores_for_create_tablet(request.partition_id, request.storage_medium);
}
if (stores.empty()) {
Expand Down
11 changes: 7 additions & 4 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,9 @@ template <typename SharedStateArg>
Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalStateInfo& info) {
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() + name_suffix()));
_runtime_profile->set_metadata(_parent->node_id());
_runtime_profile->set_is_sink(false);
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
// indent is false so that source operator will have same
// indentation_level with its parent operator.
info.parent_profile->add_child(_runtime_profile.get(), /*indent*/false, nullptr);
constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, FakeSharedState>;
if constexpr (!is_fake_shared) {
if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) {
Expand Down Expand Up @@ -527,7 +528,6 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() + name_suffix()));
_profile->set_metadata(_parent->node_id());
_profile->set_is_sink(true);
_wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency");
constexpr auto is_fake_shared = std::is_same_v<SharedState, FakeSharedState>;
if constexpr (!is_fake_shared) {
Expand All @@ -550,7 +550,10 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1);
_close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1);
_exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1);
info.parent_profile->add_child(_profile, true, nullptr);
// indentation is true
// The parent profile of sink operator is usually a RuntimeProfile called PipelineTask.
// So we should set the indentation to true.
info.parent_profile->add_child(_profile, /*indent=*/true, nullptr);
_memory_used_counter = _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
return Status::OK();
}
Expand Down
8 changes: 2 additions & 6 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <memory>
#include <string>

#include "common/exception.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "gutil/integral_types.h"
Expand Down Expand Up @@ -300,9 +301,7 @@ RuntimeProfile* RuntimeProfile::create_child(const std::string& name, bool inden
if (this->is_set_metadata()) {
child->set_metadata(this->metadata());
}
if (this->is_set_sink()) {
child->set_is_sink(this->is_sink());
}

if (_children.empty()) {
add_child_unlock(child, indent, nullptr);
} else {
Expand Down Expand Up @@ -558,9 +557,6 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes, int64 pr
node.metadata = _metadata;
node.timestamp = _timestamp;
node.indent = true;
if (this->is_set_sink()) {
node.__set_is_sink(this->is_sink());
}

{
std::lock_guard<std::mutex> l(_counter_map_lock);
Expand Down
9 changes: 0 additions & 9 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,15 +422,6 @@ class RuntimeProfile {

bool is_set_metadata() const { return _is_set_metadata; }

void set_is_sink(bool is_sink) {
_is_set_sink = true;
_is_sink = is_sink;
}

bool is_sink() const { return _is_sink; }

bool is_set_sink() const { return _is_set_sink; }

time_t timestamp() const { return _timestamp; }
void set_timestamp(time_t ss) { _timestamp = ss; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
this.queryId = queryId;
root = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
root.addChild(fragmentsProfile);
root.addChild(fragmentsProfile, true);
fragmentProfiles = Maps.newHashMap();
multiBeProfile = Maps.newHashMap();
fragmentIdBeNum = Maps.newHashMap();
Expand All @@ -93,14 +93,14 @@ public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
for (int fragmentId : fragmentIds) {
RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + i);
fragmentProfiles.put(fragmentId, runtimeProfile);
fragmentsProfile.addChild(runtimeProfile);
fragmentsProfile.addChild(runtimeProfile, true);
multiBeProfile.put(fragmentId, Maps.newHashMap());
fragmentIdBeNum.put(fragmentId, 0);
seqNoToFragmentId.put(i, fragmentId);
++i;
}
loadChannelProfile = new RuntimeProfile("LoadChannels");
root.addChild(loadChannelProfile);
root.addChild(loadChannelProfile, true);
}

private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) {
Expand Down Expand Up @@ -158,7 +158,7 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i);
fragmentsProfile.addChild(newFragmentProfile);
fragmentsProfile.addChild(newFragmentProfile, true);
// All pipeline profiles of this fragment on all BEs
List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(seqNoToFragmentId.get(i));
int pipelineIdx = 0;
Expand All @@ -177,7 +177,7 @@ private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNod
allPipelineTask.get(0).nodeId());
RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap);
}
newFragmentProfile.addChild(mergedpipelineProfile);
newFragmentProfile.addChild(mergedpipelineProfile, true);
pipelineIdx++;
fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap);
}
Expand Down Expand Up @@ -257,7 +257,7 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr

profileNode.update(pipelineProfile.profile);
profileNode.setIsDone(isDone);
fragmentProfiles.get(fragmentId).addChild(profileNode);
fragmentProfiles.get(fragmentId).addChild(profileNode, true);
}
setMultiBeProfile(fragmentId, backendHBAddress, taskProfile);
}
Expand Down Expand Up @@ -323,7 +323,7 @@ public String toString() {
sb.append("ExecutionProfile: ").append(DebugUtil.printId(queryId)).append("\n");
for (Entry<Integer, RuntimeProfile> entry : fragmentProfiles.entrySet()) {
sb.append("Fragment ").append(entry.getKey()).append(":\n");
entry.getValue().prettyPrint(sb, " ");
entry.getValue().prettyPrint(sb, " ");
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,10 @@ private void waitProfileCompleteIfNeeded() {

private RuntimeProfile composeRootProfile() {
RuntimeProfile rootProfile = new RuntimeProfile(getId());
rootProfile.addChild(summaryProfile.getSummary());
rootProfile.addChild(summaryProfile.getExecutionSummary());
rootProfile.addChild(summaryProfile.getSummary(), true);
rootProfile.addChild(summaryProfile.getExecutionSummary(), true);
for (ExecutionProfile executionProfile : executionProfiles) {
rootProfile.addChild(executionProfile.getRoot());
rootProfile.addChild(executionProfile.getRoot(), true);
}
rootProfile.computeTimeInProfile();
return rootProfile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ private void update(List<TRuntimeProfileNode> nodes, Reference<Integer> idx) {
// update this level's counters
if (node.counters != null) {
for (TCounter tcounter : node.counters) {
Counter counter = counterMap.get(tcounter.name);
// If different node has counter with the same name, it will lead to chaos.
Counter counter = this.counterMap.get(tcounter.name);
if (counter == null) {
counterMap.put(tcounter.name, new Counter(tcounter.type, tcounter.value, tcounter.level));
} else {
Expand Down Expand Up @@ -460,10 +461,10 @@ private void printPlanNodeInfo(String prefix, StringBuilder builder) {
if (planNodeInfos.isEmpty()) {
return;
}
builder.append(prefix + "- " + "PlanInfo\n");
builder.append(prefix).append("- ").append("PlanInfo\n");

for (String info : planNodeInfos) {
builder.append(prefix + " - " + info + "\n");
builder.append(prefix).append(" - ").append(info).append("\n");
}
}

Expand Down Expand Up @@ -497,19 +498,20 @@ public static void mergeProfiles(List<RuntimeProfile> profiles,
RuntimeProfile templateProfile = profiles.get(0);
for (int i = 0; i < templateProfile.childList.size(); i++) {
RuntimeProfile templateChildProfile = templateProfile.childList.get(i).first;
// Traverse all profiles and get the child profile with the same name
List<RuntimeProfile> allChilds = getChildListFromLists(templateChildProfile.name, profiles);
RuntimeProfile newCreatedMergedChildProfile = new RuntimeProfile(templateChildProfile.name,
templateChildProfile.nodeId());
mergeProfiles(allChilds, newCreatedMergedChildProfile, planNodeMap);
// RuntimeProfile has at least one counter named TotalTime, should exclude it.
if (newCreatedMergedChildProfile.counterMap.size() > 1) {
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap);
simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap, templateProfile.childList.get(i).second);
simpleProfile.rowsProducedMap.putAll(newCreatedMergedChildProfile.rowsProducedMap);
}
}
}

private static void mergeCounters(String parentCounterName, List<RuntimeProfile> profiles,
static void mergeCounters(String parentCounterName, List<RuntimeProfile> profiles,
RuntimeProfile simpleProfile) {
if (profiles.isEmpty()) {
return;
Expand Down Expand Up @@ -539,7 +541,7 @@ private static void mergeCounters(String parentCounterName, List<RuntimeProfile>
Counter oldCounter = templateCounterMap.get(childCounterName);
AggCounter aggCounter = new AggCounter(oldCounter.getType());
for (RuntimeProfile profile : profiles) {
// orgCounter could be null if counter structure is changed
// orgCounter could be null if counter-structure is changed
// change of counter structure happens when NonZeroCounter is involved.
// So here we have to ignore the counter if it is not found in the profile.
Counter orgCounter = profile.counterMap.get(childCounterName);
Expand Down Expand Up @@ -662,7 +664,7 @@ public static String printCounter(long value, TUnit type) {
return builder.toString();
}

public void addChild(RuntimeProfile child) {
public void addChild(RuntimeProfile child, boolean indent) {
if (child == null) {
return;
}
Expand All @@ -679,26 +681,26 @@ public void addChild(RuntimeProfile child) {
}
}
this.childMap.put(child.name, child);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, indent);
this.childList.add(pair);
} finally {
childLock.writeLock().unlock();
}
}

public void addChildWithCheck(RuntimeProfile child, Map<Integer, String> planNodeMap) {
public void addChildWithCheck(RuntimeProfile child, Map<Integer, String> planNodeMap, boolean indent) {
// check name
if (child.name.startsWith("PipelineTask") || child.name.startsWith("PipelineContext")) {
return;
}
childLock.writeLock().lock();
try {
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, indent);
this.childList.add(pair);
} finally {
childLock.writeLock().unlock();
}
// insert plan node info to profile strinfo
// insert plan node info to profile string info
if (planNodeMap == null || !planNodeMap.containsKey(child.nodeId())) {
return;
}
Expand All @@ -722,30 +724,6 @@ public void addPlanNodeInfos(String infos) {
}
}

public void addFirstChild(RuntimeProfile child) {
if (child == null) {
return;
}
childLock.writeLock().lock();
try {
if (childMap.containsKey(child.name)) {
childList.removeIf(e -> e.first.name.equals(child.name));
}
this.childMap.put(child.name, child);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
this.childList.addFirst(pair);
} finally {
childLock.writeLock().unlock();
}
}

// Because the profile of summary and child fragment is not a real parent-child
// relationship
// Each child profile needs to calculate the time proportion consumed by itself
public void computeTimeInChildProfile() {
childMap.values().forEach(RuntimeProfile::computeTimeInProfile);
}

public void computeTimeInProfile() {
computeTimeInProfile(this.counterTotalTime.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ public Object profile(@PathVariable(value = ID) String id) {
if (profile == null) {
return ResponseEntityBuilder.okWithCommonError("ID " + id + " does not exist");
}
profile = profile.replaceAll("\n", "</br>");
profile = profile.replaceAll(" ", "&nbsp;&nbsp;");
return ResponseEntityBuilder.ok(profile);
}

Expand Down
Loading