Skip to content

Commit

Permalink
[Enhancement] Optimize memory tracker (StarRocks#49841)
Browse files Browse the repository at this point in the history
Currently, memory statistics consume a lot of performance because they are all calculated each time.

Get some samples to estimate the manager's memory, even though it is inaccurate, but it's helpful to solve memory leak bugs.

Signed-off-by: gengjun-git <[email protected]>
(cherry picked from commit f0cb5e9)

resolve

Signed-off-by: gengjun-git <[email protected]>

fix

Signed-off-by: gengjun-git <[email protected]>
  • Loading branch information
gengjun-git committed Sep 5, 2024
1 parent 44058fe commit bc62d16
Show file tree
Hide file tree
Showing 28 changed files with 483 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -755,6 +756,10 @@ public void removeOldJobs() throws DdlException {
public Map<String, Long> estimateCount() {
return ImmutableMap.of("BackupOrRestoreJob", (long) dbIdToBackupOrRestoreJob.size());
}
}


@Override
public List<Pair<List<Object>, Long>> getSamples() {
List<Object> jobSamples = new ArrayList<>(dbIdToBackupOrRestoreJob.values());
return Lists.newArrayList(Pair.create(jobSamples, (long) dbIdToBackupOrRestoreJob.size()));
}
}
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -920,4 +921,21 @@ public void setExist(boolean exist) {
public boolean isExist() {
return exist;
}

public List<PhysicalPartition> getPartitionSamples() {
return this.idToTable.values()
.stream()
.filter(table -> table instanceof OlapTable)
.map(table -> ((OlapTable) table).getPartitionSample())
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

public int getOlapPartitionsCount() {
return this.idToTable.values()
.stream()
.filter(table -> table instanceof OlapTable)
.mapToInt(table -> ((OlapTable) table).getPartitionsCount())
.sum();
}
}
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3113,4 +3113,15 @@ private boolean isEnableFillDataCacheImpl(Partition partition) throws AnalysisEx
return true;
}
// ------ for lake table and lake materialized view end ------
public int getPartitionsCount() {
return physicalPartitionIdToPartitionId.size();
}

public PhysicalPartition getPartitionSample() {
if (!idToPartition.isEmpty()) {
return idToPartition.values().iterator().next().getSubPartitions().iterator().next();
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -879,5 +879,24 @@ public void clear() {
public Map<String, Long> estimateCount() {
return ImmutableMap.of("TabletMeta", (long) tabletMetaMap.size());
}
}

@Override
public List<Pair<List<Object>, Long>> getSamples() {
readLock();
try {
List<Object> tabletMetaSamples = tabletMetaMap.values()
.stream()
.limit(1)
.collect(Collectors.toList());

List<Object> longSamples = Lists.newArrayList(0L);
long longSize = tabletMetaMap.size() + replicaToTabletMap.size() * 2L + forceDeleteTablets.size() * 4L
+ replicaMetaTable.size() * 2L + backingReplicaMetaTable.size() * 2L;

return Lists.newArrayList(Pair.create(tabletMetaSamples, (long) tabletMetaMap.size()),
Pair.create(longSamples, longSize));
} finally {
readUnlock();
}
}
}
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1232,7 +1232,7 @@ public class Config extends ConfigBase {
* If set to true, memory tracker feature will open
*/
@ConfField(mutable = true)
public static boolean memory_tracker_enable = false;
public static boolean memory_tracker_enable = true;

/**
* Decide how often to track the memory usage of the FE process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.common.Config;
import com.starrocks.common.Pair;
import com.starrocks.common.util.concurrent.FairReentrantReadWriteLock;
import com.starrocks.memory.MemoryTrackable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.util.SizeEstimator;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -54,6 +54,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.stream.Collectors;

/*
* if you want to visit the atrribute(such as queryID,defaultDb)
Expand All @@ -80,20 +81,12 @@ public class ProfileManager implements MemoryTrackable {
public static final String VARIABLES = "Variables";
public static final String PROFILE_COLLECT_TIME = "Collect Profile Time";

private static final int MEMORY_PROFILE_SAMPLES = 10;

public static final ArrayList<String> PROFILE_HEADERS = new ArrayList<>(
Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE,
START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE));

@Override
public long estimateSize() {
return SizeEstimator.estimate(profileMap) + SizeEstimator.estimate(loadProfileMap);
}

@Override
public Map<String, Long> estimateCount() {
return ImmutableMap.of("Profile", (long) profileMap.size(),
"LoadProfile", (long) loadProfileMap.size());
}

public static class ProfileElement {
public Map<String, String> infoStrings = Maps.newHashMap();
Expand Down Expand Up @@ -299,6 +292,7 @@ public List<ProfileElement> getAllProfileElements() {
return result;
}


public long getQueryProfileCount() {
readLock.lock();
try {
Expand All @@ -316,4 +310,30 @@ public long getLoadProfileCount() {
readLock.unlock();
}
}

@Override
public Map<String, Long> estimateCount() {
return ImmutableMap.of("QueryProfile", (long) profileMap.size(),
"LoadProfile", (long) loadProfileMap.size());
}

@Override
public List<Pair<List<Object>, Long>> getSamples() {
readLock.lock();
try {
List<Object> profileSamples = profileMap.values()
.stream()
.limit(MEMORY_PROFILE_SAMPLES)
.collect(Collectors.toList());
List<Object> loadProfileSamples = loadProfileMap.values()
.stream()
.limit(MEMORY_PROFILE_SAMPLES)
.collect(Collectors.toList());

return Lists.newArrayList(Pair.create(profileSamples, (long) profileMap.size()),
Pair.create(loadProfileSamples, (long) loadProfileMap.size()));
} finally {
readLock.unlock();
}
}
}
31 changes: 21 additions & 10 deletions fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.util.SizeEstimator;
import org.apache.thrift.TException;

import java.util.ArrayList;
Expand All @@ -135,18 +134,30 @@

public class ReportHandler extends Daemon implements MemoryTrackable {
@Override
public long estimateSize() {
return SizeEstimator.estimate(reportQueue) + SizeEstimator.estimate(pendingTaskMap);
public List<Pair<List<Object>, Long>> getSamples() {
synchronized (pendingTaskMap) {
List<Pair<List<Object>, Long>> result = new ArrayList<>();
for (Map<Long, ReportTask> taskMap : pendingTaskMap.values()) {
result.add(Pair.create(taskMap.values()
.stream()
.limit(1)
.collect(Collectors.toList()),
(long) taskMap.size()));
}
return result;
}
}

@Override
public Map<String, Long> estimateCount() {
long count = 0;
for (Map<Long, ReportTask> taskMap : pendingTaskMap.values()) {
count += taskMap.size();
synchronized (pendingTaskMap) {
long count = 0;
for (Map<Long, ReportTask> taskMap : pendingTaskMap.values()) {
count += taskMap.size();
}
return ImmutableMap.of("PendingTask", count,
"ReportQueue", (long) reportQueue.size());
}
return ImmutableMap.of("PendingTask", count,
"ReportQueue", (long) reportQueue.size());
}

public enum ReportType {
Expand All @@ -165,9 +176,9 @@ public enum ReportType {

private static final Logger LOG = LogManager.getLogger(ReportHandler.class);

private BlockingQueue<Pair<Long, ReportType>> reportQueue = Queues.newLinkedBlockingQueue();
private final BlockingQueue<Pair<Long, ReportType>> reportQueue = Queues.newLinkedBlockingQueue();

private Map<ReportType, Map<Long, ReportTask>> pendingTaskMap = Maps.newHashMap();
private final Map<ReportType, Map<Long, ReportTask>> pendingTaskMap = Maps.newHashMap();

/**
* Record the mapping of <tablet id, backend id> to the to be dropped time of tablet.
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.DateUtils;
Expand Down Expand Up @@ -912,4 +913,14 @@ public Map<String, Long> estimateCount() {
return ImmutableMap.of("DeleteInfo", count);
}

@Override
public List<Pair<List<Object>, Long>> getSamples() {
List<Object> samples = dbToDeleteInfos.values()
.stream()
.filter(infos -> !infos.isEmpty())
.map(infos -> infos.stream().findAny().get())
.collect(Collectors.toList());
long size = dbToDeleteInfos.values().stream().mapToInt(List::size).sum();
return Lists.newArrayList(Pair.create(samples, size));
}
}
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/ExportMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.common.util.ListComparator;
import com.starrocks.common.util.OrderByPair;
Expand Down Expand Up @@ -465,4 +466,9 @@ public void loadExportJobV2(SRMetaBlockReader reader) throws IOException, SRMeta
public Map<String, Long> estimateCount() {
return ImmutableMap.of("ExportJob", (long) idToJob.size());
}

@Override
public List<Pair<List<Object>, Long>> getSamples() {
return Lists.newArrayList(Pair.create(new ArrayList<>(idToJob.values()), (long) idToJob.size()));
}
}
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.starrocks.common.LabelAlreadyUsedException;
import com.starrocks.common.LoadException;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.Pair;
import com.starrocks.common.TimeoutException;
import com.starrocks.common.UserException;
import com.starrocks.common.io.Writable;
Expand Down Expand Up @@ -108,6 +109,7 @@
*/
public class LoadMgr implements Writable, MemoryTrackable {
private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
private static final int MEMORY_JOB_SAMPLES = 10;

private final Map<Long, LoadJob> idToLoadJob = Maps.newConcurrentMap();
private final Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap();
Expand Down Expand Up @@ -815,4 +817,13 @@ public void saveLoadJobsV2JsonFormat(DataOutputStream out) throws IOException, S
public Map<String, Long> estimateCount() {
return ImmutableMap.of("LoadJob", (long) idToLoadJob.size());
}

@Override
public List<Pair<List<Object>, Long>> getSamples() {
List<Object> samples = idToLoadJob.values()
.stream()
.limit(MEMORY_JOB_SAMPLES)
.collect(Collectors.toList());
return Lists.newArrayList(Pair.create(samples, (long) idToLoadJob.size()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.InternalErrorCode;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.LogBuilder;
Expand Down Expand Up @@ -92,6 +93,7 @@

public class RoutineLoadMgr implements Writable, MemoryTrackable {
private static final Logger LOG = LogManager.getLogger(RoutineLoadMgr.class);
private static final int MEMORY_JOB_SAMPLES = 10;

// be => running tasks num
private Map<Long, Integer> beTasksNum = Maps.newHashMap();
Expand Down Expand Up @@ -724,4 +726,13 @@ public Map<String, Long> estimateCount() {
return ImmutableMap.of("RoutineLoad", (long) idToRoutineLoadJob.size());
}

@Override
public List<Pair<List<Object>, Long>> getSamples() {
List<Object> samples = idToRoutineLoadJob.values()
.stream()
.limit(MEMORY_JOB_SAMPLES)
.collect(Collectors.toList());

return Lists.newArrayList(Pair.create(samples, (long) idToRoutineLoadJob.size()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.common.util.LogBuilder;
import com.starrocks.common.util.LogKey;
Expand Down Expand Up @@ -57,6 +58,7 @@

public class StreamLoadMgr implements MemoryTrackable {
private static final Logger LOG = LogManager.getLogger(StreamLoadMgr.class);
private static final int MEMORY_JOB_SAMPLES = 10;

// label -> streamLoadTask
private Map<String, StreamLoadTask> idToStreamLoadTask;
Expand Down Expand Up @@ -641,4 +643,13 @@ public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockExcept
public Map<String, Long> estimateCount() {
return ImmutableMap.of("StreamLoad", (long) idToStreamLoadTask.size());
}

@Override
public List<Pair<List<Object>, Long>> getSamples() {
List<Object> samples = idToStreamLoadTask.values()
.stream()
.limit(MEMORY_JOB_SAMPLES)
.collect(Collectors.toList());
return Lists.newArrayList(Pair.create(samples, (long) idToStreamLoadTask.size()));
}
}
Loading

0 comments on commit bc62d16

Please sign in to comment.