Skip to content

[test](profile) More unit test for profile on FE #47623

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ template <typename SharedStateArg>
Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalStateInfo& info) {
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() + name_suffix()));
_runtime_profile->set_metadata(_parent->node_id());
_runtime_profile->set_is_sink(false);
// indent is false so that source operator will have same
// indentation_level with its parent operator.
info.parent_profile->add_child(_runtime_profile.get(), /*indent=*/false, nullptr);
Expand Down Expand Up @@ -540,7 +539,6 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() + name_suffix()));
_profile->set_metadata(_parent->node_id());
_profile->set_is_sink(true);
_wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency");
constexpr auto is_fake_shared = std::is_same_v<SharedState, FakeSharedState>;
if constexpr (!is_fake_shared) {
Expand Down
7 changes: 1 addition & 6 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,7 @@ RuntimeProfile* RuntimeProfile::create_child(const std::string& name, bool inden
if (this->is_set_metadata()) {
child->set_metadata(this->metadata());
}
if (this->is_set_sink()) {
child->set_is_sink(this->is_sink());
}

if (_children.empty()) {
add_child_unlock(child, indent, nullptr);
} else {
Expand Down Expand Up @@ -558,9 +556,6 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes, int64 pr
node.metadata = _metadata;
node.timestamp = _timestamp;
node.indent = true;
if (this->is_set_sink()) {
node.__set_is_sink(this->is_sink());
}

{
std::lock_guard<std::mutex> l(_counter_map_lock);
Expand Down
9 changes: 0 additions & 9 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,15 +422,6 @@ class RuntimeProfile {

bool is_set_metadata() const { return _is_set_metadata; }

void set_is_sink(bool is_sink) {
_is_set_sink = true;
_is_sink = is_sink;
}

bool is_sink() const { return _is_sink; }

bool is_set_sink() const { return _is_set_sink; }

time_t timestamp() const { return _timestamp; }
void set_timestamp(time_t ss) { _timestamp = ss; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ public class Profile {
private SummaryProfile summaryProfile = new SummaryProfile();
// executionProfiles will be stored to storage as text, when getting profile content, we will read
// from storage directly.
private List<ExecutionProfile> executionProfiles = Lists.newArrayList();
List<ExecutionProfile> executionProfiles = Lists.newArrayList();
// profileStoragePath will only be assigned when:
// 1. profile is stored to storage
// 2. or profile is loaded from storage
private String profileStoragePath = "";
// isQueryFinished means the coordinator or stmt executor is finished.
// does not mean the profile report has finished, since the report is async.
// finish of collection of profile is marked by isCompleted of ExecutionProfiles.
private boolean isQueryFinished = false;
boolean isQueryFinished = false;
// when coordinator finishes, it will mark finish time.
// we will wait for about 5 seconds to see if all profiles have been reported.
// if not, we will store the profile to storage, and release the memory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MasterDaemon;
Expand Down Expand Up @@ -71,7 +70,7 @@
public class ProfileManager extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(ProfileManager.class);
private static volatile ProfileManager INSTANCE = null;
private static final String PROFILE_STORAGE_PATH = Config.spilled_profile_storage_path;
static String PROFILE_STORAGE_PATH = Config.spilled_profile_storage_path;

public enum ProfileType {
QUERY,
Expand Down Expand Up @@ -131,10 +130,10 @@ public void deleteFromStorage() {

// profile id is long string for broker load
// is TUniqueId for others.
private Map<String, ProfileElement> queryIdToProfileMap;
final Map<String, ProfileElement> queryIdToProfileMap;
// Sometimes one Profile is related with multiple execution profiles(Broker-load), so that
// execution profile's query id is not related with Profile's query id.
private Map<TUniqueId, ExecutionProfile> queryIdToExecutionProfiles;
final Map<TUniqueId, ExecutionProfile> queryIdToExecutionProfiles;

private final ExecutorService fetchRealTimeProfileExecutor;
private final ExecutorService profileIOExecutor;
Expand Down Expand Up @@ -168,9 +167,9 @@ public static ProfileManager getInstance() {
private ProfileElement createElement(Profile profile) {
ProfileElement element = new ProfileElement(profile);
element.infoStrings.putAll(profile.getSummaryProfile().getAsInfoStings());
// Not init builder any more, we will not maintain it since 2.1.0, because the structure
// Not init builder anymore, we will not maintain it since 2.1.0, because the structure
// assume that the execution profiles structure is already known before execution. But in
// PipelineX Engine, it will changed during execution.
// PipelineX Engine, it will be changed during execution.
return element;
}

Expand Down Expand Up @@ -421,10 +420,6 @@ public ProfileElement findProfileElementObject(String queryId) {

/**
* Check if the query with specific query id is queried by specific user.
*
* @param user
* @param queryId
* @throws DdlException
*/
public void checkAuthByUserAndQueryId(String user, String queryId) throws AuthenticationException {
readLock.lock();
Expand Down Expand Up @@ -484,7 +479,7 @@ protected void runAfterCatalogReady() {

// List PROFILE_STORAGE_PATH and return all dir names
// string will contain profile id and its storage timestamp
private List<String> getOnStorageProfileInfos() {
List<String> getOnStorageProfileInfos() {
List<String> res = Lists.newArrayList();
try {
File profileDir = new File(PROFILE_STORAGE_PATH);
Expand All @@ -509,7 +504,7 @@ private List<String> getOnStorageProfileInfos() {
// read profile file on storage
// deserialize to an object Profile
// push them to memory structure of ProfileManager for index
private void loadProfilesFromStorageIfFirstTime() {
void loadProfilesFromStorageIfFirstTime() {
if (this.isProfileLoaded) {
return;
}
Expand Down Expand Up @@ -556,7 +551,7 @@ private void loadProfilesFromStorageIfFirstTime() {
}
}

private void createProfileStorageDirIfNecessary() {
void createProfileStorageDirIfNecessary() {
File profileDir = new File(PROFILE_STORAGE_PATH);
if (profileDir.exists()) {
return;
Expand All @@ -570,7 +565,7 @@ private void createProfileStorageDirIfNecessary() {
}
}

private List<ProfileElement> getProfilesNeedStore() {
List<ProfileElement> getProfilesNeedStore() {
List<ProfileElement> profilesToBeStored = Lists.newArrayList();

queryIdToProfileMap.forEach((queryId, profileElement) -> {
Expand All @@ -585,7 +580,7 @@ private List<ProfileElement> getProfilesNeedStore() {
// Collect profiles that need to be stored to storage
// Store them to storage
// Release the memory
private void writeProfileToStorage() {
void writeProfileToStorage() {
try {
if (Strings.isNullOrEmpty(PROFILE_STORAGE_PATH)) {
LOG.error("Logical error, PROFILE_STORAGE_PATH is empty");
Expand Down Expand Up @@ -639,7 +634,7 @@ private void writeProfileToStorage() {
}
}

private List<ProfileElement> getProfilesToBeRemoved() {
List<ProfileElement> getProfilesToBeRemoved() {
// By order of query finish timestamp
// The profile with the least storage timestamp will be on the top of heap
PriorityQueue<ProfileElement> profileDeque = new PriorityQueue<>(Comparator.comparingLong(
Expand Down Expand Up @@ -671,7 +666,7 @@ private List<ProfileElement> getProfilesToBeRemoved() {

// We can not store all profiles on storage, because the storage space is limited
// So we need to remove the outdated profiles
private void deleteOutdatedProfilesFromStorage() {
void deleteOutdatedProfilesFromStorage() {
try {
List<ProfileElement> queryIdToBeRemoved = Lists.newArrayList();
readLock.lock();
Expand Down Expand Up @@ -723,7 +718,7 @@ private void deleteOutdatedProfilesFromStorage() {
}
}

private List<String> getBrokenProfiles() {
List<String> getBrokenProfiles() {
List<String> profilesOnStorage = getOnStorageProfileInfos();
List<String> brokenProfiles = Lists.newArrayList();

Expand Down Expand Up @@ -767,7 +762,7 @@ private List<String> getBrokenProfiles() {
return brokenProfiles;
}

private void deleteBrokenProfiles() {
void deleteBrokenProfiles() {
List<String> brokenProfiles = getBrokenProfiles();
List<Future<?>> profileDeleteFutures = Lists.newArrayList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,13 @@ private void update(List<TRuntimeProfileNode> nodes, Reference<Integer> idx) {
if (node.isSetMetadata()) {
this.nodeid = (int) node.getMetadata();
}
if (node.isSetIsSink()) {
this.isSinkOperator = node.is_sink;
}

Preconditions.checkState(timestamp == -1 || node.timestamp != -1);
// update this level's counters
if (node.counters != null) {
for (TCounter tcounter : node.counters) {
Counter counter = counterMap.get(tcounter.name);
// If different node has counter with the same name, it will lead to chaos.
Counter counter = this.counterMap.get(tcounter.name);
if (counter == null) {
counterMap.put(tcounter.name, new Counter(tcounter.type, tcounter.value, tcounter.level));
} else {
Expand Down Expand Up @@ -460,10 +459,10 @@ private void printPlanNodeInfo(String prefix, StringBuilder builder) {
if (planNodeInfos.isEmpty()) {
return;
}
builder.append(prefix + "- " + "PlanInfo\n");
builder.append(prefix).append("- ").append("PlanInfo\n");

for (String info : planNodeInfos) {
builder.append(prefix + " - " + info + "\n");
builder.append(prefix).append(" - ").append(info).append("\n");
}
}

Expand Down Expand Up @@ -497,6 +496,7 @@ public static void mergeProfiles(List<RuntimeProfile> profiles,
RuntimeProfile templateProfile = profiles.get(0);
for (int i = 0; i < templateProfile.childList.size(); i++) {
RuntimeProfile templateChildProfile = templateProfile.childList.get(i).first;
// Traverse all profiles and get the child profile with the same name
List<RuntimeProfile> allChilds = getChildListFromLists(templateChildProfile.name, profiles);
RuntimeProfile newCreatedMergedChildProfile = new RuntimeProfile(templateChildProfile.name,
templateChildProfile.nodeId());
Expand All @@ -510,7 +510,7 @@ public static void mergeProfiles(List<RuntimeProfile> profiles,
}
}

private static void mergeCounters(String parentCounterName, List<RuntimeProfile> profiles,
static void mergeCounters(String parentCounterName, List<RuntimeProfile> profiles,
RuntimeProfile simpleProfile) {
if (profiles.isEmpty()) {
return;
Expand Down Expand Up @@ -540,7 +540,7 @@ private static void mergeCounters(String parentCounterName, List<RuntimeProfile>
Counter oldCounter = templateCounterMap.get(childCounterName);
AggCounter aggCounter = new AggCounter(oldCounter.getType());
for (RuntimeProfile profile : profiles) {
// orgCounter could be null if counter structure is changed
// orgCounter could be null if counter-structure is changed
// change of counter structure happens when NonZeroCounter is involved.
// So here we have to ignore the counter if it is not found in the profile.
Counter orgCounter = profile.counterMap.get(childCounterName);
Expand Down Expand Up @@ -699,7 +699,7 @@ public void addChildWithCheck(RuntimeProfile child, Map<Integer, String> planNod
} finally {
childLock.writeLock().unlock();
}
// insert plan node info to profile strinfo
// insert plan node info to profile string info
if (planNodeMap == null || !planNodeMap.containsKey(child.nodeId())) {
return;
}
Expand All @@ -723,30 +723,6 @@ public void addPlanNodeInfos(String infos) {
}
}

public void addFirstChild(RuntimeProfile child) {
if (child == null) {
return;
}
childLock.writeLock().lock();
try {
if (childMap.containsKey(child.name)) {
childList.removeIf(e -> e.first.name.equals(child.name));
}
this.childMap.put(child.name, child);
Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true);
this.childList.addFirst(pair);
} finally {
childLock.writeLock().unlock();
}
}

// Because the profile of summary and child fragment is not a real parent-child
// relationship
// Each child profile needs to calculate the time proportion consumed by itself
public void computeTimeInChildProfile() {
childMap.values().forEach(RuntimeProfile::computeTimeInProfile);
}

public void computeTimeInProfile() {
computeTimeInProfile(this.counterTotalTime.getValue());
}
Expand Down
Loading
Loading