[Enhancement] Optimize memory tracker #49841
Conversation
-    private Map<ReportType, Map<Long, ReportTask>> pendingTaskMap = Maps.newHashMap();
+    private final Map<ReportType, Map<Long, ReportTask>> pendingTaskMap = Maps.newHashMap();

    /**
     * Record the mapping of <tablet id, backend id> to the to be dropped time of tablet.
The most risky bug in this code is: data modification without synchronization.

You can modify the code like this:
@@ -140,11 +140,12 @@
public class ReportHandler extends Daemon implements MemoryTrackable {
private final Object lock = new Object();
@Override
public List<Pair<List<Object>, Long>> getSamples() {
- synchronized (pendingTaskMap) {
+ synchronized (lock) {
List<Pair<List<Object>, Long>> result = new ArrayList<>();
for (Map<Long, ReportTask> taskMap : pendingTaskMap.values()) {
result.add(Pair.create(taskMap.values()
.stream()
.limit(1)
.collect(Collectors.toList()),
@@ -150,20 +151,26 @@
(long) taskMap.size()));
}
return result;
}
}
@Override
public Map<String, Long> estimateCount() {
- synchronized (pendingTaskMap) {
+ synchronized (lock) {
long count = 0;
for (Map<Long, ReportTask> taskMap : pendingTaskMap.values()) {
count += taskMap.size();
}
return ImmutableMap.of("PendingTask", count,
"ReportQueue", (long) reportQueue.size());
}
}
private static final Logger LOG = LogManager.getLogger(ReportHandler.class);
private final BlockingQueue<Pair<Long, ReportType>> reportQueue = Queues.newLinkedBlockingQueue();
- private final Map<ReportType, Map<Long, ReportTask>> pendingTaskMap = Maps.newHashMap();
+ private final Map<ReportType, Map<Long, ReportTask>> pendingTaskMap = Maps.newHashMap();
}
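Note that the suggestion above only serializes the two read paths; for the dedicated lock to help, the code that mutates pendingTaskMap must hold the same monitor. A minimal sketch of that idea, where putTask is a hypothetical writer, not the real ReportHandler method:

    // Sketch only: the writer has to synchronize on the same lock object that
    // getSamples() and estimateCount() use; putTask is a hypothetical helper.
    private void putTask(ReportType type, long backendId, ReportTask task) {
        synchronized (lock) {
            pendingTaskMap.computeIfAbsent(type, k -> Maps.newHashMap()).put(backendId, task);
        }
    }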
                    .collect(Collectors.toList());

            return Lists.newArrayList(Pair.create(profileSamples, (long) profileMap.size()),
                    Pair.create(loadProfileSamples, (long) loadProfileMap.size()));
        } finally {
            readLock.unlock();
        }
The most risky bug in this code is: redefining the estimateCount method might introduce confusion, as the logic for counting profiles was initially removed but reintroduced later in the code. Additionally, there is a risk of class cast exceptions if assumptions about the types of objects within profileMap and loadProfileMap are not met during stream operations.

You can modify the code like this:
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.common.Config;
import com.starrocks.common.Pair;
import com.starrocks.memory.MemoryTrackable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.stream.Collectors;
/*
 * if you want to visit the attribute (such as queryID, defaultDb)
*/
public class ProfileManager implements MemoryTrackable {
private static final Logger LOG = LogManager.getLogger(ProfileManager.class);
private final Map<String, ProfileElement> profileMap = Maps.newConcurrentMap();
private final Map<String, ProfileElement> loadProfileMap = Maps.newConcurrentMap();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReadLock readLock = lock.readLock();
private final WriteLock writeLock = lock.writeLock();
private static final List<String> PROFILE_ELEMENT_KEYS = new ArrayList<>(
Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE,
START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE));
public static class ProfileElement {
public Map<String, String> infoStrings = Maps.newHashMap();
}
@Override
public Map<String, Long> estimateCount() {
return ImmutableMap.of("QueryProfile", (long) profileMap.size(),
"LoadProfile", (long) loadProfileMap.size());
}
@Override
public List<Pair<List<Object>, Long>> getSamples() {
readLock.lock();
try {
List<Object> profileSamples = profileMap.values()
.stream()
.limit(10)
.collect(Collectors.toList());
List<Object> loadProfileSamples = loadProfileMap.values()
.stream()
.limit(10)
.collect(Collectors.toList());
return Lists.newArrayList(Pair.create(profileSamples, (long) profileMap.size()),
Pair.create(loadProfileSamples, (long) loadProfileMap.size()));
} finally {
readLock.unlock();
}
}
}
Make sure both profileMap and loadProfileMap contain compatible object types for the stream operations.
                        .limit(100)
                        .collect(Collectors.toList()),
                deleteFileCache.size());
        return Lists.newArrayList(dbSamples, tableSamples, partitionSamples, dataFileSamples, deleteFileSamples);
    }

    @Override
The most risky bug in this code is: replacing the estimateSize method with getSamples, which changes the functionality.

You can modify the code like this:
@Override
public long estimateSize() {
return SizeEstimator.estimate(databases) +
SizeEstimator.estimate(tables) +
SizeEstimator.estimate(partitionNames) +
SizeEstimator.estimate(dataFileCache) +
SizeEstimator.estimate(deleteFileCache);
}
// Added new getSamples method without removing estimateSize
public List<Pair<List<Object>, Long>> getSamples() {
Pair<List<Object>, Long> dbSamples = Pair.create(databases.asMap().values()
.stream()
.limit(10)
.collect(Collectors.toList()),
databases.size());
Pair<List<Object>, Long> tableSamples = Pair.create(tables.asMap().values()
.stream()
.limit(10)
.collect(Collectors.toList()),
tables.size());
Pair<List<Object>, Long> partitionSamples = Pair.create(partitionNames.asMap().values()
.stream()
.limit(100)
.collect(Collectors.toList()),
partitionNames.size());
Pair<List<Object>, Long> dataFileSamples = Pair.create(dataFileCache.asMap().values()
.stream()
.limit(100)
.collect(Collectors.toList()),
dataFileCache.size());
Pair<List<Object>, Long> deleteFileSamples = Pair.create(deleteFileCache.asMap().values()
.stream()
.limit(100)
.collect(Collectors.toList()),
deleteFileCache.size());
return Lists.newArrayList(dbSamples, tableSamples, partitionSamples, dataFileSamples, deleteFileSamples);
}
Force-pushed from 94d128a to 80c39ff.
        List<Pair<List<Object>, Long>> samples = getSamples();
        long totalBytes = 0;
        for (Pair<List<Object>, Long> pair : samples) {
            List<Object> sample = pair.first;
Suggested change:
-            List<Object> sample = pair.first;
+            List<Object> sampleObjects = pair.first;
        for (Pair<List<Object>, Long> pair : samples) {
            List<Object> sample = pair.first;
            long size = pair.second;
            if (sample.size() > 0) {
Precondition.check?
No, an empty sample is normal when there are no objects in that manager.
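For context, a minimal sketch of how the loop above can turn the samples into a byte estimate, assuming the SizeEstimator utility already used elsewhere in this PR; the actual MemoryUsageTracker code may differ:

    // Sketch only: average the measured size of the sampled objects, then scale by the total count.
    long totalBytes = 0;
    for (Pair<List<Object>, Long> pair : samples) {
        List<Object> sample = pair.first;   // up to N sampled objects
        long count = pair.second;           // total number of live objects of this kind
        if (sample.size() > 0) {            // empty is normal when a manager holds nothing
            long sampledBytes = SizeEstimator.estimate(sample);
            totalBytes += sampledBytes / sample.size() * count;
        }
    }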
        totalTracked += trackMemory(ImmutableMap.of("Connector",
                GlobalStateMgr.getCurrentState().getConnectorMgr().getMemTrackers()));

        LOG.info("total tracked memory: {}, heap used memory: {}", new ByteSizeValue(totalTracked),
Add another 2 metrics?
- RSS of the java process
- Size of off-heap memory, like direct buffer pool, etc.
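Both can be read with standard JDK facilities; a rough sketch (not part of this PR), assuming a Linux /proc filesystem for RSS and the platform BufferPoolMXBean for direct/mapped buffer usage:

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ProcessMemoryMetrics {
    // Off-heap usage of the "direct" and "mapped" buffer pools.
    public static long offHeapBufferBytes() {
        long total = 0;
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            total += pool.getMemoryUsed();
        }
        return total;
    }

    // Resident set size on Linux, parsed from /proc/self/status; returns -1 if unavailable.
    public static long rssBytes() {
        try {
            for (String line : Files.readAllLines(Paths.get("/proc/self/status"))) {
                if (line.startsWith("VmRSS:")) {
                    return Long.parseLong(line.replaceAll("\\D", "")) * 1024L;  // value is in kB
                }
            }
        } catch (Exception ignored) {
            // not Linux, or /proc is not readable
        }
        return -1;
    }
}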
public class InternalCatalogMemoryTracker implements MemoryTrackable {

    @Override
    public long estimateSize() {
We still need to calculate the size of all the partitions.
Update: it should include the size of replicas, but not the replicas in the tablet inverted index.
Force-pushed from f9f3f09 to d113cc5.
@@ -65,6 +65,8 @@ public class CachingIcebergCatalog implements IcebergCatalog {
     private static final Logger LOG = LogManager.getLogger(CachingIcebergCatalog.class);
     public static final long NEVER_CACHE = 0;
     public static final long DEFAULT_CACHE_NUM = 100000;
+    private static final int MEMORY_META_SAMPLES = 10;
why not use a config?
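If it were configurable, it would presumably follow the usual FE config pattern; a sketch, assuming the @ConfField annotation in com.starrocks.common.Config, with memory_meta_samples being a made-up name rather than an existing config item:

    // Sketch only: a hypothetical FE config entry that could replace the hard-coded constant.
    @ConfField(mutable = true)
    public static int memory_meta_samples = 10;

    // The sampling sites would then read Config.memory_meta_samples instead of MEMORY_META_SAMPLES.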
@Mergifyio backport branch-3.3
@Mergifyio backport branch-3.2
@Mergifyio backport branch-3.1

✅ Backports have been created
✅ Backports have been created
✅ Backports have been created
## Why I'm doing:
Currently, memory statistics consume a lot of performance because they are all calculated each time.
## What I'm doing:
Get some samples to estimate the manager's memory, even though it is inaccurate, but it's helpful to solve memory leak bugs.
Signed-off-by: gengjun-git <[email protected]>
(cherry picked from commit f0cb5e9)
# Conflicts:
# fe/fe-core/src/main/java/com/starrocks/catalog/Database.java
# fe/fe-core/src/main/java/com/starrocks/memory/InternalCatalogMemoryTracker.java
# fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java
# fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java
# fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java

## Why I'm doing:
Currently, memory statistics consume a lot of performance because they are all calculated each time.
## What I'm doing:
Get some samples to estimate the manager's memory, even though it is inaccurate, but it's helpful to solve memory leak bugs.
Signed-off-by: gengjun-git <[email protected]>
(cherry picked from commit f0cb5e9)
# Conflicts:
# contrib/destination-starrocks/src/main/java/io/airbyte/integrations/destination/starrocks/StreamLoader.java
# fe/fe-core/src/main/java/com/starrocks/catalog/Database.java
# fe/fe-core/src/main/java/com/starrocks/common/util/ProfileManager.java
# fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnector.java
# fe/fe-core/src/main/java/com/starrocks/connector/Connector.java
# fe/fe-core/src/main/java/com/starrocks/connector/delta/CachingDeltaLakeMetastore.java
# fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetastore.java
# fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java
# fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java
# fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergConnector.java
# fe/fe-core/src/main/java/com/starrocks/connector/jdbc/JDBCTableIdCache.java
# fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java
# fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java
# fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java
# fe/fe-core/src/main/java/com/starrocks/memory/InternalCatalogMemoryTracker.java
# fe/fe-core/src/main/java/com/starrocks/memory/MemoryTrackable.java
# fe/fe-core/src/main/java/com/starrocks/memory/MemoryUsageTracker.java
# fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java
# fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java
# fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java
# fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistory.java
# fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java
# fe/fe-core/src/main/java/com/starrocks/warehouse/WarehouseClusterProcNode.java
# fe/fe-core/src/test/java/com/starrocks/connector/delta/CachingDeltaLakeMetastoreTest.java
# fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaLakeConnectorTest.java

## Why I'm doing:
Currently, memory statistics consume a lot of performance because they are all calculated each time.
## What I'm doing:
Get some samples to estimate the manager's memory, even though it is inaccurate, but it's helpful to solve memory leak bugs.
Signed-off-by: gengjun-git <[email protected]>
(cherry picked from commit f0cb5e9)
# Conflicts:
# contrib/destination-starrocks/src/main/java/io/airbyte/integrations/destination/starrocks/StreamLoader.java
# fe/fe-core/src/main/java/com/starrocks/catalog/Database.java
# fe/fe-core/src/main/java/com/starrocks/common/util/ProfileManager.java
# fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnector.java
# fe/fe-core/src/main/java/com/starrocks/connector/Connector.java
# fe/fe-core/src/main/java/com/starrocks/connector/LazyConnector.java
# fe/fe-core/src/main/java/com/starrocks/connector/delta/CachingDeltaLakeMetastore.java
# fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetastore.java
# fe/fe-core/src/main/java/com/starrocks/connector/hive/glue/metastore/DefaultAWSCredentialsProviderFactory.java
# fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java
# fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java
# fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergConnector.java
# fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java
# fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java
# fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java
# fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java
# fe/fe-core/src/main/java/com/starrocks/memory/InternalCatalogMemoryTracker.java
# fe/fe-core/src/main/java/com/starrocks/memory/MemoryTrackable.java
# fe/fe-core/src/main/java/com/starrocks/memory/MemoryUsageTracker.java
# fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java
# fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java
# fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java
# fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistory.java
# fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java
# fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java
# fe/fe-core/src/main/java/com/starrocks/warehouse/WarehouseClusterProcNode.java
# fe/fe-core/src/test/java/com/starrocks/connector/delta/CachingDeltaLakeMetastoreTest.java
# fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaLakeConnectorTest.java
Currently, memory statistics consume a lot of performance because they are all calculated each time. Get some samples to estimate the manager's memory, even though it is inaccurate, but it's helpful to solve memory leak bugs. Signed-off-by: gengjun-git <[email protected]> (cherry picked from commit f0cb5e9) resolve Signed-off-by: gengjun-git <[email protected]> fix Signed-off-by: gengjun-git <[email protected]>
backport #49841 ## Why I'm doing: Currently, memory statistics consume a lot of performance because they are all calculated each time. ## What I'm doing: Get some samples to estimate the manager's memory, even though it is inaccurate, but it's helpful to solve memory leak bugs. Signed-off-by: gengjun-git <[email protected]>
Currently, memory statistics consume a lot of performance because they are all calculated each time. Get some samples to estimate the manager's memory, even though it is inaccurate, but it's helpful to solve memory leak bugs. Signed-off-by: gengjun-git <[email protected]> (cherry picked from commit f0cb5e9) resolve Signed-off-by: gengjun-git <[email protected]> fix Signed-off-by: gengjun-git <[email protected]>
Signed-off-by: gengjun-git <[email protected]> Co-authored-by: gengjun-git <[email protected]>
…rRocks#50693) Signed-off-by: gengjun-git <[email protected]> Co-authored-by: gengjun-git <[email protected]>
Signed-off-by: gengjun-git <[email protected]>
## Why I'm doing: Currently, memory statistics consume a lot of performance because they are all calculated each time. ## What I'm doing: Get some samples to estimate the manager's memory, even though it is inaccurate, but it's helpful to solve memory leak bugs. Signed-off-by: gengjun-git <[email protected]> Signed-off-by: zhiminr.ren <[email protected]>
Why I'm doing:
Currently, collecting memory statistics is expensive because the size of every tracked object is fully calculated each time.
What I'm doing:
Take a few samples to estimate each manager's memory usage. Even though the estimate is inaccurate, it is still helpful for tracking down memory leak bugs.
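For reference, a minimal sketch of the sampling contract the trackers implement, pieced together from the diffs quoted above; the real com.starrocks.memory.MemoryTrackable interface may differ in detail:

// Sketch only: method shapes follow the quoted diffs, not the verbatim repository source.
public interface MemoryTrackable {
    // A few sampled objects plus the total object count, for each kind of tracked object.
    List<Pair<List<Object>, Long>> getSamples();

    // Object counts per kind, e.g. {"PendingTask": 42, "ReportQueue": 3}.
    Map<String, Long> estimateCount();
}

The tracker then scales the average sampled object size by the total count instead of walking every object, which is where the performance win comes from.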
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check:
Documentation PRs only:
If you are submitting a PR that adds or changes English documentation and have not included Chinese documentation, then you can check the box to request GPT to translate the English doc to Chinese. Please ensure to uncheck the Do not translate box if translation is needed. The workflow will generate a new PR with the Chinese translation after this PR is merged.