diff --git a/fe/fe-core/src/main/java/com/starrocks/backup/BackupHandler.java b/fe/fe-core/src/main/java/com/starrocks/backup/BackupHandler.java index 574a9e453c728..c3bf8f1bf8da5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/com/starrocks/backup/BackupHandler.java @@ -91,6 +91,7 @@ import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -744,6 +745,10 @@ public void removeOldJobs() throws DdlException { public Map estimateCount() { return ImmutableMap.of("BackupOrRestoreJob", (long) dbIdToBackupOrRestoreJob.size()); } -} - + @Override + public List, Long>> getSamples() { + List jobSamples = new ArrayList<>(dbIdToBackupOrRestoreJob.values()); + return Lists.newArrayList(Pair.create(jobSamples, (long) dbIdToBackupOrRestoreJob.size())); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/Database.java b/fe/fe-core/src/main/java/com/starrocks/catalog/Database.java index 3c014993468a3..ef7b95635cbfb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/Database.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/Database.java @@ -72,6 +72,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -730,4 +731,21 @@ public void setExist(boolean exist) { public boolean getExist() { return exist; } + + public List getPartitionSamples() { + return this.idToTable.values() + .stream() + .filter(table -> table instanceof OlapTable) + .map(table -> ((OlapTable) table).getPartitionSample()) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + public int getOlapPartitionsCount() { + return this.idToTable.values() + .stream() + .filter(table -> table instanceof OlapTable) + .mapToInt(table -> ((OlapTable) table).getPartitionsCount()) + .sum(); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java index 985f81adea8c9..e602c1272534d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java @@ -3529,4 +3529,15 @@ public void unlockCreatePartition(String partitionName) { } } + public int getPartitionsCount() { + return physicalPartitionIdToPartitionId.size(); + } + + public PhysicalPartition getPartitionSample() { + if (!idToPartition.isEmpty()) { + return idToPartition.values().iterator().next().getSubPartitions().iterator().next(); + } else { + return null; + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/com/starrocks/catalog/TabletInvertedIndex.java index 78bd25a9f1e19..95a9cdfbe2ce1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/TabletInvertedIndex.java @@ -888,5 +888,24 @@ public Map estimateCount() { "TabletCount", getTabletCount(), "ReplicateCount", getReplicaCount()); } -} + @Override + public List, Long>> getSamples() { + readLock(); + try { + List tabletMetaSamples = tabletMetaMap.values() + .stream() + .limit(1) + .collect(Collectors.toList()); + + List longSamples = Lists.newArrayList(0L); + long longSize = tabletMetaMap.size() + replicaToTabletMap.size() * 2L + forceDeleteTablets.size() * 4L + + replicaMetaTable.size() * 2L + backingReplicaMetaTable.size() * 2L; + + return Lists.newArrayList(Pair.create(tabletMetaSamples, (long) tabletMetaMap.size()), + Pair.create(longSamples, longSize)); + } finally { + readUnlock(); + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index fbc13cf564ce9..99fe540e29e8a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -1340,7 +1340,7 @@ public class Config extends ConfigBase { * If set to true, memory tracker feature will open */ @ConfField(mutable = true) - public static boolean memory_tracker_enable = false; + public static boolean memory_tracker_enable = true; /** * Decide how often to track the memory usage of the FE process diff --git a/fe/fe-core/src/main/java/com/starrocks/common/util/ProfileManager.java b/fe/fe-core/src/main/java/com/starrocks/common/util/ProfileManager.java index d432a59e0d625..d701cada38309 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/util/ProfileManager.java @@ -39,10 +39,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.starrocks.common.Config; +import com.starrocks.common.Pair; import com.starrocks.memory.MemoryTrackable; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.spark.util.SizeEstimator; import java.io.IOException; import java.util.ArrayList; @@ -53,6 +53,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.stream.Collectors; /* * if you want to visit the atrribute(such as queryID,defaultDb) @@ -79,20 +80,12 @@ public class ProfileManager implements MemoryTrackable { public static final String VARIABLES = "Variables"; public static final String PROFILE_COLLECT_TIME = "Collect Profile Time"; + private static final int MEMORY_PROFILE_SAMPLES = 10; + public static final ArrayList PROFILE_HEADERS = new ArrayList<>( Arrays.asList(QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE, START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE)); - @Override - public long estimateSize() { - return SizeEstimator.estimate(profileMap) + SizeEstimator.estimate(loadProfileMap); - } - - @Override - public Map estimateCount() { - return ImmutableMap.of("QueryProfile", (long) profileMap.size(), - "LoadProfile", (long) loadProfileMap.size()); - } public static class ProfileElement { public Map infoStrings = Maps.newHashMap(); @@ -298,19 +291,27 @@ public List getAllProfileElements() { return result; } - public long getQueryProfileCount() { - readLock.lock(); - try { - return profileMap.size(); - } finally { - readLock.unlock(); - } + @Override + public Map estimateCount() { + return ImmutableMap.of("QueryProfile", (long) profileMap.size(), + "LoadProfile", (long) loadProfileMap.size()); } - public long getLoadProfileCount() { + @Override + public List, Long>> getSamples() { readLock.lock(); try { - return loadProfileMap.size(); + List profileSamples = profileMap.values() + .stream() + .limit(MEMORY_PROFILE_SAMPLES) + .collect(Collectors.toList()); + List loadProfileSamples = loadProfileMap.values() + .stream() + .limit(MEMORY_PROFILE_SAMPLES) + .collect(Collectors.toList()); + + return Lists.newArrayList(Pair.create(profileSamples, (long) profileMap.size()), + Pair.create(loadProfileSamples, (long) loadProfileMap.size())); } finally { readLock.unlock(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnector.java index b80f00565d74e..b067b10ddff12 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/CatalogConnector.java @@ -14,9 +14,11 @@ package com.starrocks.connector; +import com.starrocks.common.Pair; import com.starrocks.connector.informationschema.InformationSchemaConnector; import com.starrocks.connector.metadata.TableMetaConnector; +import java.util.List; import java.util.Map; import static com.google.common.base.Preconditions.checkArgument; @@ -56,16 +58,20 @@ public boolean supportMemoryTrack() { } @Override - public long estimateSize() { - return normalConnector.estimateSize(); + public Map estimateCount() { + return normalConnector.estimateCount(); } @Override - public Map estimateCount() { - return normalConnector.estimateCount(); + public List, Long>> getSamples() { + return normalConnector.getSamples(); } public String normalConnectorClassName() { - return normalConnector.getClass().getSimpleName(); + if (normalConnector instanceof LazyConnector) { + return ((LazyConnector) normalConnector).getRealConnectorClassName(); + } else { + return normalConnector.getClass().getSimpleName(); + } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/Connector.java b/fe/fe-core/src/main/java/com/starrocks/connector/Connector.java index d40670ed1c8f7..098d90b773d08 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/Connector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/Connector.java @@ -14,9 +14,15 @@ package com.starrocks.connector; +import com.starrocks.common.Pair; import com.starrocks.connector.config.ConnectorConfig; import com.starrocks.memory.MemoryTrackable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public interface Connector extends MemoryTrackable { /** * Get the connector meta of connector @@ -44,4 +50,12 @@ default void bindConfig(ConnectorConfig config) { default boolean supportMemoryTrack() { return false; } + + default Map estimateCount() { + return new HashMap<>(); + } + + default List, Long>> getSamples() { + return new ArrayList<>(); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/LazyConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/LazyConnector.java index 189ebeb974087..adf5ea7761dad 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/LazyConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/LazyConnector.java @@ -14,10 +14,14 @@ package com.starrocks.connector; +import com.starrocks.common.Pair; import com.starrocks.connector.exception.StarRocksConnectorException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; +import java.util.Map; + public class LazyConnector implements Connector { private static final Logger LOG = LogManager.getLogger(LazyConnector.class); private Connector delegate; @@ -56,4 +60,30 @@ public void shutdown() { } } } + + @Override + public boolean supportMemoryTrack() { + initIfNeeded(); + return delegate.supportMemoryTrack(); + } + + @Override + public Map estimateCount() { + initIfNeeded(); + return delegate.estimateCount(); + } + + @Override + public List, Long>> getSamples() { + initIfNeeded(); + return delegate.getSamples(); + } + + public String getRealConnectorClassName() { + if (delegate != null) { + return delegate.getClass().getSimpleName(); + } else { + return LazyConnector.class.getSimpleName(); + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/CachingDeltaLakeMetastore.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/CachingDeltaLakeMetastore.java index 6d992202ad16a..fc466b507be8b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/CachingDeltaLakeMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/CachingDeltaLakeMetastore.java @@ -19,6 +19,7 @@ import com.starrocks.catalog.Database; import com.starrocks.catalog.Table; import com.starrocks.common.Config; +import com.starrocks.common.Pair; import com.starrocks.connector.DatabaseTableName; import com.starrocks.connector.exception.StarRocksConnectorException; import com.starrocks.connector.metastore.CachingMetastore; @@ -37,11 +38,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.stream.Collectors; import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; public class CachingDeltaLakeMetastore extends CachingMetastore implements IDeltaLakeMetastore { private static final Logger LOG = LogManager.getLogger(CachingDeltaLakeMetastore.class); + private static final int MEMORY_META_SAMPLES = 10; public final IDeltaLakeMetastore delegate; private final Map lastAccessTimeMap; @@ -180,4 +183,29 @@ public void invalidateAll() { ((DeltaLakeMetastore) delegate).invalidateAll(); } } + + @Override + public List, Long>> getSamples() { + List dbSamples = databaseCache.asMap().values() + .stream() + .limit(MEMORY_META_SAMPLES) + .collect(Collectors.toList()); + List tableSamples = tableCache.asMap().values() + .stream() + .limit(MEMORY_META_SAMPLES) + .collect(Collectors.toList()); + + List, Long>> samples = delegate.getSamples(); + samples.add(Pair.create(dbSamples, databaseCache.size())); + samples.add(Pair.create(tableSamples, tableCache.size())); + return samples; + } + + @Override + public Map estimateCount() { + Map delegateCount = Maps.newHashMap(delegate.estimateCount()); + delegateCount.put("databaseCache", databaseCache.size()); + delegateCount.put("tableCache", tableCache.size()); + return delegateCount; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java index d22b94887845d..07f9dac068043 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java @@ -14,6 +14,7 @@ package com.starrocks.connector.delta; +import com.starrocks.common.Pair; import com.starrocks.connector.Connector; import com.starrocks.connector.ConnectorContext; import com.starrocks.connector.ConnectorMetadata; @@ -24,6 +25,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -81,4 +83,19 @@ public void onCreate() { updateProcessor.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor() .registerCacheUpdateProcessor(catalogName, updateProcessor.get())); } + + @Override + public boolean supportMemoryTrack() { + return metastore != null; + } + + @Override + public List, Long>> getSamples() { + return metastore.getSamples(); + } + + @Override + public Map estimateCount() { + return metastore.estimateCount(); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetastore.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetastore.java index a3ee0dd5e95f9..ea4ca772c4c95 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetastore.java @@ -50,6 +50,7 @@ public abstract class DeltaLakeMetastore implements IDeltaLakeMetastore { private static final Logger LOG = LogManager.getLogger(DeltaLakeMetastore.class); + private static final int MEMORY_META_SAMPLES = 10; protected final String catalogName; protected final IMetastore delegate; protected final Configuration hdfsConfiguration; @@ -173,4 +174,25 @@ public void invalidateAll() { checkpointCache.invalidateAll(); jsonCache.invalidateAll(); } + + @Override + public Map estimateCount() { + return Map.of("checkpointCache", checkpointCache.size(), "jsonCache", jsonCache.size()); + } + + @Override + public List, Long>> getSamples() { + List jsonSamples = jsonCache.asMap().values() + .stream() + .limit(MEMORY_META_SAMPLES) + .collect(Collectors.toList()); + + List checkpointSamples = checkpointCache.asMap().values() + .stream() + .limit(MEMORY_META_SAMPLES) + .collect(Collectors.toList()); + + return Lists.newArrayList(Pair.create(jsonSamples, jsonCache.size()), + Pair.create(checkpointSamples, checkpointCache.size())); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/IDeltaLakeMetastore.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/IDeltaLakeMetastore.java index 307bfce9c7e62..56087d6d48523 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/IDeltaLakeMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/IDeltaLakeMetastore.java @@ -16,10 +16,11 @@ import com.starrocks.catalog.Table; import com.starrocks.connector.metastore.IMetastore; +import com.starrocks.memory.MemoryTrackable; import java.util.List; -public interface IDeltaLakeMetastore extends IMetastore { +public interface IDeltaLakeMetastore extends IMetastore, MemoryTrackable { Table getTable(String dbName, String tableName); List getPartitionKeys(String dbName, String tableName); diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/elasticsearch/ElasticsearchConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/elasticsearch/ElasticsearchConnector.java index d0f73eb5d6c6a..be124f6948ac4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/elasticsearch/ElasticsearchConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/elasticsearch/ElasticsearchConnector.java @@ -59,5 +59,4 @@ public void bindConfig(ConnectorConfig config) { this.esRestClient = new EsRestClient(esConfig.getNodes(), esConfig.getUserName(), esConfig.getPassword(), esConfig.isEnableSsl()); } - } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java index 775af0c6168b9..f5814969ecec2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/CachingIcebergCatalog.java @@ -21,6 +21,7 @@ import com.starrocks.catalog.Database; import com.starrocks.common.Config; import com.starrocks.common.MetaNotFoundException; +import com.starrocks.common.Pair; import com.starrocks.connector.ConnectorViewDefinition; import com.starrocks.connector.PlanMode; import com.starrocks.connector.exception.StarRocksConnectorException; @@ -44,7 +45,6 @@ import org.apache.iceberg.view.View; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.spark.util.SizeEstimator; import java.io.IOException; import java.util.ArrayList; @@ -65,6 +65,8 @@ public class CachingIcebergCatalog implements IcebergCatalog { private static final Logger LOG = LogManager.getLogger(CachingIcebergCatalog.class); public static final long NEVER_CACHE = 0; public static final long DEFAULT_CACHE_NUM = 100000; + private static final int MEMORY_META_SAMPLES = 10; + private static final int MEMORY_FILE_SAMPLES = 100; private final String catalogName; private final IcebergCatalog delegate; private final Cache tables; @@ -429,13 +431,51 @@ public String toString() { } @Override - public long estimateSize() { - return SizeEstimator.estimate(databases) + - SizeEstimator.estimate(tables) + - SizeEstimator.estimate(partitionNames) + - SizeEstimator.estimate(dataFileCache) + - SizeEstimator.estimate(deleteFileCache); + public List, Long>> getSamples() { + Pair, Long> dbSamples = Pair.create(databases.asMap().values() + .stream() + .limit(MEMORY_META_SAMPLES) + .collect(Collectors.toList()), + databases.size()); + + Pair, Long> tableSamples = Pair.create(tables.asMap().values() + .stream() + .limit(MEMORY_META_SAMPLES) + .collect(Collectors.toList()), + tables.size()); + + List partitions = partitionNames.asMap().values() + .stream() + .flatMap(List::stream) + .limit(MEMORY_FILE_SAMPLES) + .collect(Collectors.toList()); + long partitionTotal = partitionNames.asMap().values() + .stream() + .mapToLong(List::size) + .sum(); + Pair, Long> partitionSamples = Pair.create(partitions, partitionTotal); + + List dataFiles = dataFileCache.asMap().values() + .stream().flatMap(Set::stream) + .limit(MEMORY_FILE_SAMPLES) + .collect(Collectors.toList()); + long dataFilesTotal = dataFileCache.asMap().values() + .stream() + .mapToLong(Set::size) + .sum(); + Pair, Long> dataFileSamples = Pair.create(dataFiles, dataFilesTotal); + + List deleteFiles = deleteFileCache.asMap().values() + .stream().flatMap(Set::stream) + .limit(MEMORY_FILE_SAMPLES) + .collect(Collectors.toList()); + long deleteFilesTotal = deleteFileCache.asMap().values() + .stream() + .mapToLong(Set::size) + .sum(); + Pair, Long> deleteFileSamples = Pair.create(deleteFiles, deleteFilesTotal); + return Lists.newArrayList(dbSamples, tableSamples, partitionSamples, dataFileSamples, deleteFileSamples); } @Override @@ -443,9 +483,18 @@ public Map estimateCount() { Map counter = new HashMap<>(); counter.put("Database", databases.size()); counter.put("Table", tables.size()); - counter.put("PartitionNames", partitionNames.size()); - counter.put("ManifestOfDataFile", dataFileCache.size()); - counter.put("ManifestOfDeleteFile", deleteFileCache.size()); + counter.put("PartitionNames", partitionNames.asMap().values() + .stream() + .mapToLong(List::size) + .sum()); + counter.put("ManifestOfDataFile", dataFileCache.asMap().values() + .stream() + .mapToLong(Set::size) + .sum()); + counter.put("ManifestOfDeleteFile", deleteFileCache.asMap().values() + .stream() + .mapToLong(Set::size) + .sum()); return counter; } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java index cb0152044b0b9..267d2897a116f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergCatalog.java @@ -18,6 +18,7 @@ import com.google.common.collect.Sets; import com.starrocks.catalog.Database; import com.starrocks.common.MetaNotFoundException; +import com.starrocks.common.Pair; import com.starrocks.connector.ConnectorViewDefinition; import com.starrocks.connector.exception.StarRocksConnectorException; import com.starrocks.memory.MemoryTrackable; @@ -151,4 +152,12 @@ default String defaultTableLocation(String dbName, String tableName) { default Map loadNamespaceMetadata(String dbName) { return new HashMap<>(); } + + default Map estimateCount() { + return new HashMap<>(); + } + + default List, Long>> getSamples() { + return new ArrayList<>(); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergConnector.java index b97d1258c8c90..53ab78da05f64 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/iceberg/IcebergConnector.java @@ -15,6 +15,7 @@ package com.starrocks.connector.iceberg; import com.starrocks.common.Config; +import com.starrocks.common.Pair; import com.starrocks.connector.Connector; import com.starrocks.connector.ConnectorContext; import com.starrocks.connector.ConnectorMetadata; @@ -32,6 +33,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -144,12 +146,12 @@ public boolean supportMemoryTrack() { } @Override - public long estimateSize() { - return icebergNativeCatalog.estimateSize(); + public Map estimateCount() { + return icebergNativeCatalog.estimateCount(); } @Override - public Map estimateCount() { - return icebergNativeCatalog.estimateCount(); + public List, Long>> getSamples() { + return icebergNativeCatalog.getSamples(); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java index f0a2065dab1f3..d320544412253 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionMgr.java @@ -16,8 +16,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import com.starrocks.common.Config; +import com.starrocks.common.Pair; import com.starrocks.memory.MemoryTrackable; import com.starrocks.persist.ImageWriter; import com.starrocks.persist.metablock.SRMetaBlockEOFException; @@ -231,4 +233,13 @@ public PartitionStatistics triggerManualCompaction(PartitionIdentifier partition public Map estimateCount() { return ImmutableMap.of("PartitionStats", (long) partitionStatisticsHashMap.size()); } + + @Override + public List, Long>> getSamples() { + List samples = partitionStatisticsHashMap.values() + .stream() + .limit(1) + .collect(Collectors.toList()); + return Lists.newArrayList(Pair.create(samples, (long) partitionStatisticsHashMap.size())); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java b/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java index 3c88d91e30e1a..8098403c4e866 100644 --- a/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java +++ b/fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java @@ -127,7 +127,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.spark.util.SizeEstimator; import org.apache.thrift.TException; import java.util.ArrayList; @@ -142,17 +141,30 @@ public class ReportHandler extends Daemon implements MemoryTrackable { @Override - public long estimateSize() { - return SizeEstimator.estimate(reportQueue) + SizeEstimator.estimate(pendingTaskMap); + public List, Long>> getSamples() { + synchronized (pendingTaskMap) { + List, Long>> result = new ArrayList<>(); + for (Map taskMap : pendingTaskMap.values()) { + result.add(Pair.create(taskMap.values() + .stream() + .limit(1) + .collect(Collectors.toList()), + (long) taskMap.size())); + } + return result; + } } @Override public Map estimateCount() { - long count = 0; - for (Map taskMap : pendingTaskMap.values()) { - count += taskMap.size(); + synchronized (pendingTaskMap) { + long count = 0; + for (Map taskMap : pendingTaskMap.values()) { + count += taskMap.size(); + } + return ImmutableMap.of("PendingTask", count, + "ReportQueue", (long) reportQueue.size()); } - return ImmutableMap.of("PendingTask", count, "ReportQueue", (long) reportQueue.size()); } public enum ReportType { @@ -172,9 +184,9 @@ public enum ReportType { private static final Logger LOG = LogManager.getLogger(ReportHandler.class); - private BlockingQueue> reportQueue = Queues.newLinkedBlockingQueue(); + private final BlockingQueue> reportQueue = Queues.newLinkedBlockingQueue(); - private Map> pendingTaskMap = Maps.newHashMap(); + private final Map> pendingTaskMap = Maps.newHashMap(); /** * Record the mapping of to the to be dropped time of tablet. diff --git a/fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java index de472241445b8..095d93e5e7ba2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java @@ -81,6 +81,7 @@ import com.starrocks.common.ErrorCode; import com.starrocks.common.ErrorReport; import com.starrocks.common.FeConstants; +import com.starrocks.common.Pair; import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; import com.starrocks.common.util.DateUtils; @@ -911,4 +912,14 @@ public Map estimateCount() { "DeleteJob", (long) idToDeleteJob.size()); } + @Override + public List, Long>> getSamples() { + List samples = dbToDeleteInfos.values() + .stream() + .filter(infos -> !infos.isEmpty()) + .map(infos -> infos.stream().findAny().get()) + .collect(Collectors.toList()); + long size = dbToDeleteInfos.values().stream().mapToInt(List::size).sum(); + return Lists.newArrayList(Pair.create(samples, size)); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/ExportMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/ExportMgr.java index b89765c7e4778..ac3431ea46e48 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/ExportMgr.java @@ -44,6 +44,7 @@ import com.starrocks.common.AnalysisException; import com.starrocks.common.Config; import com.starrocks.common.FeConstants; +import com.starrocks.common.Pair; import com.starrocks.common.UserException; import com.starrocks.common.util.ListComparator; import com.starrocks.common.util.OrderByPair; @@ -441,4 +442,9 @@ public void loadExportJobV2(SRMetaBlockReader reader) throws IOException, SRMeta public Map estimateCount() { return ImmutableMap.of("ExportJob", (long) idToJob.size()); } + + @Override + public List, Long>> getSamples() { + return Lists.newArrayList(Pair.create(new ArrayList<>(idToJob.values()), (long) idToJob.size())); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java index 19149019ff014..87900b468ef32 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/InsertOverwriteJobMgr.java @@ -19,6 +19,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import com.starrocks.common.Pair; import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; import com.starrocks.memory.MemoryTrackable; @@ -48,9 +49,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; public class InsertOverwriteJobMgr implements Writable, GsonPostProcessable, MemoryTrackable { private static final Logger LOG = LogManager.getLogger(InsertOverwriteJobMgr.class); + private static final int MEMORY_JOB_SAMPLES = 10; @SerializedName(value = "overwriteJobMap") private Map overwriteJobMap; @@ -272,4 +275,13 @@ public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockExcept public Map estimateCount() { return ImmutableMap.of("insertOverwriteJobs", (long) overwriteJobMap.size()); } + + @Override + public List, Long>> getSamples() { + List samples = overwriteJobMap.values() + .stream() + .limit(MEMORY_JOB_SAMPLES) + .collect(Collectors.toList()); + return Lists.newArrayList(Pair.create(samples, (long) overwriteJobMap.size())); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java index e077500673c9e..d0d2ebecffb4b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java @@ -46,6 +46,7 @@ import com.starrocks.common.LabelAlreadyUsedException; import com.starrocks.common.LoadException; import com.starrocks.common.MetaNotFoundException; +import com.starrocks.common.Pair; import com.starrocks.common.TimeoutException; import com.starrocks.common.UserException; import com.starrocks.common.io.Writable; @@ -108,6 +109,7 @@ */ public class LoadMgr implements Writable, MemoryTrackable { private static final Logger LOG = LogManager.getLogger(LoadMgr.class); + private static final int MEMORY_JOB_SAMPLES = 10; private final Map idToLoadJob = Maps.newConcurrentMap(); private final Map>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap(); @@ -821,4 +823,13 @@ public void saveLoadJobsV2JsonFormat(ImageWriter imageWriter) throws IOException public Map estimateCount() { return ImmutableMap.of("LoadJob", (long) idToLoadJob.size()); } + + @Override + public List, Long>> getSamples() { + List samples = idToLoadJob.values() + .stream() + .limit(MEMORY_JOB_SAMPLES) + .collect(Collectors.toList()); + return Lists.newArrayList(Pair.create(samples, (long) idToLoadJob.size())); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadMgr.java index 7b41156b3325d..becbd2920ec27 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadMgr.java @@ -45,6 +45,7 @@ import com.starrocks.common.DdlException; import com.starrocks.common.InternalErrorCode; import com.starrocks.common.MetaNotFoundException; +import com.starrocks.common.Pair; import com.starrocks.common.UserException; import com.starrocks.common.io.Writable; import com.starrocks.common.util.DebugUtil; @@ -93,6 +94,7 @@ public class RoutineLoadMgr implements Writable, MemoryTrackable { private static final Logger LOG = LogManager.getLogger(RoutineLoadMgr.class); + private static final int MEMORY_JOB_SAMPLES = 10; // warehouse ==> {be : running tasks num} private Map> warehouseNodeTasksNum = Maps.newHashMap(); @@ -803,4 +805,13 @@ public Map estimateCount() { return ImmutableMap.of("RoutineLoad", (long) idToRoutineLoadJob.size()); } + @Override + public List, Long>> getSamples() { + List samples = idToRoutineLoadJob.values() + .stream() + .limit(MEMORY_JOB_SAMPLES) + .collect(Collectors.toList()); + + return Lists.newArrayList(Pair.create(samples, (long) idToRoutineLoadJob.size())); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMgr.java index e1b31b4e41eb8..10fd1e1e3cf25 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMgr.java @@ -26,6 +26,7 @@ import com.starrocks.common.ErrorCode; import com.starrocks.common.ErrorReport; import com.starrocks.common.MetaNotFoundException; +import com.starrocks.common.Pair; import com.starrocks.common.UserException; import com.starrocks.common.util.LogBuilder; import com.starrocks.common.util.LogKey; @@ -60,6 +61,7 @@ public class StreamLoadMgr implements MemoryTrackable { private static final Logger LOG = LogManager.getLogger(StreamLoadMgr.class); + private static final int MEMORY_JOB_SAMPLES = 10; // label -> streamLoadTask private Map idToStreamLoadTask; @@ -672,4 +674,13 @@ public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockExcept public Map estimateCount() { return ImmutableMap.of("StreamLoad", (long) idToStreamLoadTask.size()); } + + @Override + public List, Long>> getSamples() { + List samples = idToStreamLoadTask.values() + .stream() + .limit(MEMORY_JOB_SAMPLES) + .collect(Collectors.toList()); + return Lists.newArrayList(Pair.create(samples, (long) idToStreamLoadTask.size())); + } } \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/memory/AgentTaskTracker.java b/fe/fe-core/src/main/java/com/starrocks/memory/AgentTaskTracker.java index a0df133b40021..21a23f54beeb8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/memory/AgentTaskTracker.java +++ b/fe/fe-core/src/main/java/com/starrocks/memory/AgentTaskTracker.java @@ -15,19 +15,22 @@ package com.starrocks.memory; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.starrocks.common.Pair; import com.starrocks.task.AgentTaskQueue; -import org.apache.spark.util.SizeEstimator; +import java.util.List; import java.util.Map; public class AgentTaskTracker implements MemoryTrackable { @Override - public long estimateSize() { - return SizeEstimator.estimate(AgentTaskQueue.tasks); + public Map estimateCount() { + return ImmutableMap.of("AgentTask", (long) AgentTaskQueue.getTaskNum()); } @Override - public Map estimateCount() { - return ImmutableMap.of("AgentTask", (long) AgentTaskQueue.getTaskNum()); + public List, Long>> getSamples() { + return Lists.newArrayList(Pair.create(AgentTaskQueue.getSamplesForMemoryTracker(), + (long) AgentTaskQueue.getTaskNum())); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/memory/InternalCatalogMemoryTracker.java b/fe/fe-core/src/main/java/com/starrocks/memory/InternalCatalogMemoryTracker.java deleted file mode 100644 index 1d3a51e409532..0000000000000 --- a/fe/fe-core/src/main/java/com/starrocks/memory/InternalCatalogMemoryTracker.java +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2021-present StarRocks, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.starrocks.memory; - -import com.google.common.collect.ImmutableMap; -import com.starrocks.catalog.Database; -import com.starrocks.catalog.Partition; -import com.starrocks.catalog.Table; -import com.starrocks.server.GlobalStateMgr; -import org.apache.spark.util.SizeEstimator; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class InternalCatalogMemoryTracker implements MemoryTrackable { - - @Override - public long estimateSize() { - long estimateSize = 0L; - List databases = new ArrayList<>(GlobalStateMgr.getCurrentState().getLocalMetastore().getIdToDb().values()); - for (Database database : databases) { - List tables = GlobalStateMgr.getCurrentState().getLocalMetastore().getTables(database.getId()); - for (Table table : tables) { - Collection partitions = table.getPartitions(); - Iterator iterator = partitions.iterator(); - if (iterator.hasNext()) { - estimateSize += SizeEstimator.estimate(iterator.next()) * partitions.size(); - } - } - } - return estimateSize; - } - - @Override - public Map estimateCount() { - long estimateCount = 0; - List databases = new ArrayList<>(GlobalStateMgr.getCurrentState().getLocalMetastore().getIdToDb().values()); - for (Database database : databases) { - List
tables = GlobalStateMgr.getCurrentState().getLocalMetastore().getTables(database.getId()); - for (Table table : tables) { - Collection partitions = table.getPartitions(); - estimateCount += partitions.size(); - } - } - return ImmutableMap.of("Catalog", GlobalStateMgr.getCurrentState().getCatalogMgr().getCatalogCount(), - "Partition", estimateCount); - } - -} diff --git a/fe/fe-core/src/main/java/com/starrocks/memory/MemoryTrackable.java b/fe/fe-core/src/main/java/com/starrocks/memory/MemoryTrackable.java index efd2bafab636f..e3470fec5f19d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/memory/MemoryTrackable.java +++ b/fe/fe-core/src/main/java/com/starrocks/memory/MemoryTrackable.java @@ -14,18 +14,34 @@ package com.starrocks.memory; +import com.starrocks.common.Pair; import org.apache.spark.util.SizeEstimator; -import java.util.HashMap; +import java.util.List; import java.util.Map; public interface MemoryTrackable { - default long estimateSize() { - return SizeEstimator.estimate(this); - } + List, Long>> samples = getSamples(); + long totalBytes = 0; + for (Pair, Long> pair : samples) { + List sampleObjects = pair.first; + long size = pair.second; + if (!sampleObjects.isEmpty()) { + totalBytes += (long) (((double) SizeEstimator.estimate(sampleObjects)) / sampleObjects.size() * size); + } + } - default Map estimateCount() { - return new HashMap<>(); + return totalBytes; } + + Map estimateCount(); + + // Samples for estimateSize() to calculate memory size; + // Pair.fist is the sample objects, Pair.second is the total size of that module. + // For example: + // Manager has two list attributes: List, List, we get 10 objects for samples, + // this function should return: + // Pair<10 A objects, List.size()>, Pair<10 B object, List.size()> + List, Long>> getSamples(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/memory/MemoryUsageTracker.java b/fe/fe-core/src/main/java/com/starrocks/memory/MemoryUsageTracker.java index bddcde805d2d0..61eef142ea213 100644 --- a/fe/fe-core/src/main/java/com/starrocks/memory/MemoryUsageTracker.java +++ b/fe/fe-core/src/main/java/com/starrocks/memory/MemoryUsageTracker.java @@ -19,6 +19,7 @@ import com.starrocks.common.Config; import com.starrocks.common.util.FrontendDaemon; import com.starrocks.common.util.ProfileManager; +import com.starrocks.monitor.jvm.JvmStats; import com.starrocks.monitor.unit.ByteSizeValue; import com.starrocks.persist.gson.GsonUtils; import com.starrocks.qe.QeProcessor; @@ -29,6 +30,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; @@ -42,6 +45,7 @@ public class MemoryUsageTracker extends FrontendDaemon { new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER); public static final Map> MEMORY_USAGE = Maps.newConcurrentMap(); + private static MemoryMXBean memoryMXBean; private boolean initialize; public MemoryUsageTracker() { @@ -64,11 +68,11 @@ private void initMemoryTracker() { registerMemoryTracker("Task", currentState.getTaskManager()); registerMemoryTracker("Task", currentState.getTaskManager().getTaskRunManager()); registerMemoryTracker("TabletInvertedIndex", currentState.getTabletInvertedIndex()); + registerMemoryTracker("LocalMetastore", currentState.getLocalMetastore()); registerMemoryTracker("Query", new QueryTracker()); registerMemoryTracker("Profile", ProfileManager.getInstance()); registerMemoryTracker("Agent", new AgentTaskTracker()); - registerMemoryTracker("LocalCatalog", new InternalCatalogMemoryTracker()); QeProcessor qeProcessor = QeProcessorImpl.INSTANCE; if (qeProcessor instanceof QeProcessorImpl) { @@ -80,6 +84,8 @@ private void initMemoryTracker() { registerMemoryTracker("Dict", (CacheDictManager) dictManager); } + memoryMXBean = ManagementFactory.getMemoryMXBean(); + LOG.info("Memory usage tracker init success"); initialize = true; @@ -91,11 +97,30 @@ public static void registerMemoryTracker(String moduleName, MemoryTrackable obje } public static void trackMemory() { - trackMemory(REFERENCE); - trackMemory(ImmutableMap.of("Connector", GlobalStateMgr.getCurrentState().getConnectorMgr().getMemTrackers())); + long totalTracked = trackMemory(REFERENCE); + totalTracked += trackMemory(ImmutableMap.of("Connector", + GlobalStateMgr.getCurrentState().getConnectorMgr().getMemTrackers())); + + LOG.info("total tracked memory: {}, jvm: {}", new ByteSizeValue(totalTracked), getJVMMemory()); + } + + private static String getJVMMemory() { + JvmStats jvmStats = JvmStats.jvmStats(); + long directBufferUsed = 0; + for (JvmStats.BufferPool pool : jvmStats.getBufferPools()) { + if (pool.getName().equalsIgnoreCase("direct")) { + directBufferUsed = pool.getUsed(); + } + } + return String.format("Process used: %s, heap used: %s, non heap used: %s, direct buffer used: %s", + new ByteSizeValue(Runtime.getRuntime().totalMemory()), + new ByteSizeValue(jvmStats.getMem().getHeapUsed()), + new ByteSizeValue(jvmStats.getMem().getNonHeapUsed()), + new ByteSizeValue(directBufferUsed)); } - private static void trackMemory(Map> trackers) { + private static long trackMemory(Map> trackers) { + long totalTracked = 0; for (Map.Entry> entry : trackers.entrySet()) { String moduleName = entry.getKey(); Map statMap = entry.getValue(); @@ -127,11 +152,15 @@ private static void trackMemory(Map> tracke memoryStat.setCounterMap(counterMap); usageMap.put(className, memoryStat); + totalTracked += currentEstimateSize; + LOG.info("({}ms) Module {} - {} estimated {} of memory. Contains {}", endTime - startTime, moduleName, className, new ByteSizeValue(currentEstimateSize), sb.toString()); } } + + return totalTracked; } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/memory/QueryTracker.java b/fe/fe-core/src/main/java/com/starrocks/memory/QueryTracker.java index 554537a7f3b0f..3c2c25f1bb931 100644 --- a/fe/fe-core/src/main/java/com/starrocks/memory/QueryTracker.java +++ b/fe/fe-core/src/main/java/com/starrocks/memory/QueryTracker.java @@ -15,19 +15,22 @@ package com.starrocks.memory; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.starrocks.common.Pair; import com.starrocks.qe.QueryDetailQueue; -import org.apache.spark.util.SizeEstimator; +import java.util.List; import java.util.Map; public class QueryTracker implements MemoryTrackable { @Override - public long estimateSize() { - return SizeEstimator.estimate(QueryDetailQueue.TOTAL_QUERIES); + public Map estimateCount() { + return ImmutableMap.of("QueryDetail", QueryDetailQueue.getTotalQueriesCount()); } @Override - public Map estimateCount() { - return ImmutableMap.of("QueryDetail", QueryDetailQueue.getTotalQueriesCount()); + public List, Long>> getSamples() { + return Lists.newArrayList(Pair.create(QueryDetailQueue.getSamplesForMemoryTracker(), + QueryDetailQueue.getTotalQueriesCount())); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java index e2a8e001ac63d..5ef045a3a7072 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/QeProcessorImpl.java @@ -35,9 +35,11 @@ package com.starrocks.qe; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.starrocks.catalog.MvId; import com.starrocks.common.Config; +import com.starrocks.common.Pair; import com.starrocks.common.Status; import com.starrocks.common.UserException; import com.starrocks.common.util.DebugUtil; @@ -57,7 +59,6 @@ import com.starrocks.thrift.TUniqueId; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.spark.util.SizeEstimator; import java.util.Iterator; import java.util.List; @@ -68,8 +69,8 @@ import static com.starrocks.mysql.MysqlCommand.COM_STMT_EXECUTE; public final class QeProcessorImpl implements QeProcessor, MemoryTrackable { - private static final Logger LOG = LogManager.getLogger(QeProcessorImpl.class); + private static final int MEMORY_QUERY_SAMPLES = 10; private static final long ONE_MINUTE = 60 * 1000L; private final Map coordinatorMap = Maps.newConcurrentMap(); private final Map monitorQueryMap = Maps.newConcurrentMap(); @@ -313,8 +314,12 @@ public long getCoordinatorCount() { } @Override - public long estimateSize() { - return SizeEstimator.estimate(coordinatorMap) + SizeEstimator.estimate(monitorQueryMap); + public List, Long>> getSamples() { + List samples = coordinatorMap.values() + .stream() + .limit(MEMORY_QUERY_SAMPLES) + .collect(Collectors.toList()); + return Lists.newArrayList(Pair.create(samples, (long) coordinatorMap.size())); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/QueryDetailQueue.java b/fe/fe-core/src/main/java/com/starrocks/qe/QueryDetailQueue.java index cf2bf528cac5f..c1a28f75d6ca5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/QueryDetailQueue.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/QueryDetailQueue.java @@ -36,7 +36,9 @@ import com.google.common.collect.Lists; import com.starrocks.common.Config; +import com.starrocks.common.Pair; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executors; @@ -94,4 +96,17 @@ private static long getCurrentTimeNS() { return ms * 1000000; } } + + public static List getSamplesForMemoryTracker() { + List samples = new ArrayList<>(); + QueryDetail first = TOTAL_QUERIES.peekFirst(); + if (first != null) { + samples.add(first); + } + QueryDetail last = TOTAL_QUERIES.peekLast(); + if (last != null) { + samples.add(last); + } + return Lists.newArrayList(Pair.create(samples, TOTAL_QUERIES.size())); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java index 30ed5f04ff7af..470f07fdedf61 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java @@ -26,6 +26,7 @@ import com.starrocks.catalog.ScalarType; import com.starrocks.common.Config; import com.starrocks.common.DdlException; +import com.starrocks.common.Pair; import com.starrocks.common.util.LogUtil; import com.starrocks.common.util.TimeUtils; import com.starrocks.common.util.concurrent.QueryableReentrantLock; @@ -53,7 +54,6 @@ import org.apache.commons.collections4.ListUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.spark.util.SizeEstimator; import java.io.IOException; import java.time.Duration; @@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.stream.Collectors; import static com.starrocks.scheduler.SubmitResult.SubmitStatus.SUBMITTED; @@ -874,8 +875,12 @@ public Map estimateCount() { } @Override - public long estimateSize() { - return SizeEstimator.estimate(idToTaskMap.values()); + public List, Long>> getSamples() { + List taskSamples = idToTaskMap.values() + .stream() + .limit(1) + .collect(Collectors.toList()); + return Lists.newArrayList(Pair.create(taskSamples, (long) idToTaskMap.size())); } public boolean containTask(String taskName) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java index ae8b762943e3b..092855f292577 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java @@ -15,9 +15,10 @@ package com.starrocks.scheduler; -import com.google.api.client.util.Lists; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.starrocks.common.Config; +import com.starrocks.common.Pair; import com.starrocks.common.util.LogUtil; import com.starrocks.common.util.UUIDUtil; import com.starrocks.common.util.concurrent.QueryableReentrantLock; @@ -30,6 +31,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -108,7 +110,7 @@ public boolean arrangeTaskRun(TaskRun taskRun, boolean isReplay) { if (!tryTaskRunLock()) { return false; } - List mergedTaskRuns = Lists.newArrayList(); + List mergedTaskRuns = new ArrayList<>(); try { long taskId = taskRun.getTaskId(); Set taskRuns = taskRunScheduler.getPendingTaskRunsByTaskId(taskId); @@ -286,7 +288,16 @@ public Map estimateCount() { "RunningTaskRun", (long) taskRunScheduler.getRunningTaskCount(), "HistoryTaskRun", taskRunHistory.getTaskRunCount()); } - + + @Override + public List, Long>> getSamples() { + List taskRunSamples = taskRunHistory.getSamplesForMemoryTracker(); + long size = taskRunScheduler.getPendingQueueCount() + + taskRunScheduler.getRunningTaskCount() + + taskRunHistory.getTaskRunCount(); + return Lists.newArrayList(Pair.create(taskRunSamples, size)); + } + /** * For diagnosis purpose * diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistory.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistory.java index 380e53ff15fdf..455eaae21dacc 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistory.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/history/TaskRunHistory.java @@ -39,6 +39,7 @@ public class TaskRunHistory { private static final Logger LOG = LogManager.getLogger(TaskRunHistory.class); + private static final int MEMORY_TASK_RUN_SAMPLES = 10; // Thread-Safe history map: // QueryId -> TaskRunStatus @@ -86,6 +87,13 @@ public synchronized long getTaskRunCount() { return historyTaskRunMap.size(); } + public synchronized List getSamplesForMemoryTracker() { + return historyTaskRunMap.values() + .stream() + .limit(MEMORY_TASK_RUN_SAMPLES) + .collect(Collectors.toList()); + } + public List lookupHistoryByTaskNames(String dbName, Set taskNames) { List result = getInMemoryHistory().stream() .filter(x -> x.matchByTaskName(dbName, taskNames)) diff --git a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java index f8ab87e29a6f4..50125a294cea0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java @@ -132,6 +132,7 @@ import com.starrocks.lake.LakeTablet; import com.starrocks.lake.StorageInfo; import com.starrocks.load.pipe.PipeManager; +import com.starrocks.memory.MemoryTrackable; import com.starrocks.mv.MVMetaVersionRepairer; import com.starrocks.mv.MVRepairHandler; import com.starrocks.mv.analyzer.MVPartitionExprResolver; @@ -286,7 +287,7 @@ import static com.starrocks.server.GlobalStateMgr.NEXT_ID_INIT_VALUE; import static com.starrocks.server.GlobalStateMgr.isCheckpointThread; -public class LocalMetastore implements ConnectorMetadata, MVRepairHandler { +public class LocalMetastore implements ConnectorMetadata, MVRepairHandler, MemoryTrackable { private static final Logger LOG = LogManager.getLogger(LocalMetastore.class); private final ConcurrentHashMap idToDb = new ConcurrentHashMap<>(); @@ -5306,4 +5307,47 @@ public void load(SRMetaBlockReader reader) throws IOException, SRMetaBlockExcept public void handleMVRepair(Database db, Table table, List partitionRepairInfos) { MVMetaVersionRepairer.repairBaseTableVersionChanges(db, table, partitionRepairInfos); } + + @Override + public List, Long>> getSamples() { + long totalCount = idToDb.values() + .stream() + .mapToInt(database -> { + Locker locker = new Locker(); + locker.lockDatabase(database, LockType.READ); + try { + return database.getOlapPartitionsCount(); + } finally { + locker.unLockDatabase(database, LockType.READ); + } + }).sum(); + List samples = new ArrayList<>(); + // get every olap table's first partition + for (Database database : idToDb.values()) { + Locker locker = new Locker(); + locker.lockDatabase(database, LockType.READ); + try { + samples.addAll(database.getPartitionSamples()); + } finally { + locker.unLockDatabase(database, LockType.READ); + } + } + return Lists.newArrayList(Pair.create(samples, totalCount)); + } + + @Override + public Map estimateCount() { + long totalCount = idToDb.values() + .stream() + .mapToInt(database -> { + Locker locker = new Locker(); + locker.lockDatabase(database, LockType.READ); + try { + return database.getOlapPartitionsCount(); + } finally { + locker.unLockDatabase(database, LockType.READ); + } + }).sum(); + return ImmutableMap.of("Partition", totalCount); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/CacheDictManager.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/CacheDictManager.java index 8331dca6bbab9..62fe42ad6bdbf 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/CacheDictManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/CacheDictManager.java @@ -18,6 +18,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.starrocks.catalog.ColumnId; import com.starrocks.catalog.Database; @@ -35,6 +36,7 @@ import org.checkerframework.checker.nullness.qual.NonNull; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -297,4 +299,34 @@ public Optional getGlobalDict(long tableId, ColumnId columnName) { public Map estimateCount() { return ImmutableMap.of("ColumnDict", (long) dictStatistics.asMap().size()); } + + @Override + public List, Long>> getSamples() { + List samples = new ArrayList<>(); + dictStatistics.asMap().values().stream().findAny().ifPresent(future -> { + if (future.isDone()) { + try { + future.get().ifPresent(samples::add); + } catch (Exception e) { + LOG.warn("get samples failed", e); + } + } + }); + + return Lists.newArrayList(Pair.create(samples, (long) dictStatistics.asMap().size())); + } + + private List getSamplesForMemoryTracker() { + List result = new ArrayList<>(); + dictStatistics.asMap().values().stream().findAny().ifPresent(future -> { + if (future.isDone()) { + try { + future.get().ifPresent(result::add); + } catch (Exception e) { + LOG.warn("get samples failed", e); + } + } + }); + return result; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/task/AgentTaskQueue.java b/fe/fe-core/src/main/java/com/starrocks/task/AgentTaskQueue.java index e796708515c60..7dc1041742ff1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/AgentTaskQueue.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/AgentTaskQueue.java @@ -50,6 +50,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -326,5 +327,18 @@ public static synchronized List getFailedTask(long backendId, TTaskTy } return tasks; } -} + public static synchronized List getSamplesForMemoryTracker() { + List result = new ArrayList<>(); + // Get one task of each type + for (TTaskType type : TTaskType.values()) { + Map> tasksForType = tasks.column(type); + Optional> beTasks = tasksForType.values().stream().findAny(); + if (beTasks.isPresent()) { + Optional task = beTasks.get().values().stream().findAny(); + task.ifPresent(result::add); + } + } + return result; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java index 95fd4f3bee873..ee5e6cb93ce00 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/DatabaseTransactionMgr.java @@ -111,6 +111,7 @@ public class DatabaseTransactionMgr { public static final String TXN_TIMEOUT_BY_MANAGER = "timeout by txn manager"; private static final Logger LOG = LogManager.getLogger(DatabaseTransactionMgr.class); + private static final int MEMORY_TXN_SAMPLES = 10; private final TransactionStateListenerFactory stateListenerFactory = new TransactionStateListenerFactory(); private final TransactionLogApplierFactory txnLogApplierFactory = new TransactionLogApplierFactory(); private final GlobalStateMgr globalStateMgr; @@ -2036,4 +2037,25 @@ private void checkDatabaseDataQuota() throws AnalysisException { public void updateDatabaseUsedQuotaData(long usedQuotaDataBytes) { this.usedQuotaDataBytes = usedQuotaDataBytes; } + + public List getSamplesForMemoryTracker() { + readLock(); + try { + if (idToRunningTransactionState.size() > 0) { + return idToRunningTransactionState.values() + .stream() + .limit(MEMORY_TXN_SAMPLES) + .collect(Collectors.toList()); + } + if (idToFinalStatusTransactionState.size() > 0) { + return idToFinalStatusTransactionState.values() + .stream() + .limit(MEMORY_TXN_SAMPLES) + .collect(Collectors.toList()); + } + return new ArrayList<>(); + } finally { + readUnlock(); + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java index f66b2a01e1231..213f4faf9879a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java @@ -747,14 +747,6 @@ public int getTransactionNum() { return txnNum; } - public int getFinishedTransactionNum() { - int txnNum = 0; - for (DatabaseTransactionMgr dbTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { - txnNum += dbTransactionMgr.getFinishedTxnNums(); - } - return txnNum; - } - public TransactionIdGenerator getTransactionIDGenerator() { return this.idGenerator; } @@ -865,7 +857,24 @@ public boolean hasCommittedTxnOnPartition(long dbId, long tableId, long partitio @Override public Map estimateCount() { - return ImmutableMap.of("Txn", (long) getFinishedTransactionNum(), + return ImmutableMap.of("Txn", (long) getTransactionNum(), "TxnCallbackCount", getCallbackFactory().getCallBackCnt()); } + + @Override + public List, Long>> getSamples() { + List txnSamples = new ArrayList<>(); + for (DatabaseTransactionMgr mgr : dbIdToDatabaseTransactionMgrs.values()) { + List samples = mgr.getSamplesForMemoryTracker(); + if (samples.size() > 0) { + txnSamples.addAll(samples); + break; + } + } + + List callbackSamples = callbackFactory.getSamplesForMemoryTracker(); + + return Lists.newArrayList(Pair.create(txnSamples, (long) getTransactionNum()), + Pair.create(callbackSamples, callbackFactory.getCallBackCnt())); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/TxnStateCallbackFactory.java b/fe/fe-core/src/main/java/com/starrocks/transaction/TxnStateCallbackFactory.java index c36d5f3c2f175..94388ed7b2f3f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/TxnStateCallbackFactory.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/TxnStateCallbackFactory.java @@ -21,11 +21,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; // saves all TxnStateChangeListeners public class TxnStateCallbackFactory { private static final Logger LOG = LogManager.getLogger(TxnStateCallbackFactory.class); + private static final int MEMORY_CALLBACK_SAMPLES = 30; private Map callbacks = Maps.newHashMap(); @@ -53,4 +56,11 @@ public synchronized TxnStateChangeCallback getCallback(long id) { public synchronized long getCallBackCnt() { return callbacks.size(); } + + public synchronized List getSamplesForMemoryTracker() { + return callbacks.values() + .stream() + .limit(MEMORY_CALLBACK_SAMPLES) + .collect(Collectors.toList()); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/delta/CachingDeltaLakeMetastoreTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/delta/CachingDeltaLakeMetastoreTest.java index c1be6867f1267..bacce9f75788f 100644 --- a/fe/fe-core/src/test/java/com/starrocks/connector/delta/CachingDeltaLakeMetastoreTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/connector/delta/CachingDeltaLakeMetastoreTest.java @@ -161,4 +161,29 @@ public DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName, Strin Assert.fail(); } } + + @Test + public void testCacheMemoryUsage() { + new MockUp() { + @mockit.Mock + public DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName, String tblName, String path, + Engine deltaEngine, long createTime) { + return new DeltaLakeTable(1, "delta0", "db1", "table1", + Lists.newArrayList(), Lists.newArrayList("ts"), null, + "s3://bucket/path/to/table", null, 0); + } + }; + + CachingDeltaLakeMetastore cachingDeltaLakeMetastore = + CachingDeltaLakeMetastore.createCatalogLevelInstance(metastore, executor, expireAfterWriteSec, + refreshAfterWriteSec, 100); + + cachingDeltaLakeMetastore.getDb("db1"); + cachingDeltaLakeMetastore.getTable("db1", "table1"); + + Assert.assertTrue(cachingDeltaLakeMetastore.estimateSize() > 0); + Assert.assertFalse(cachingDeltaLakeMetastore.estimateCount().isEmpty()); + Assert.assertTrue(cachingDeltaLakeMetastore.estimateCount().containsKey("databaseCache")); + Assert.assertTrue(cachingDeltaLakeMetastore.estimateCount().containsKey("tableCache")); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaLakeConnectorTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaLakeConnectorTest.java index e710a47b32aed..a1ecb0040efdf 100644 --- a/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaLakeConnectorTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaLakeConnectorTest.java @@ -15,6 +15,7 @@ package com.starrocks.connector.delta; import com.google.common.collect.ImmutableMap; +import com.starrocks.connector.CatalogConnector; import com.starrocks.connector.ConnectorContext; import com.starrocks.connector.ConnectorFactory; import com.starrocks.connector.ConnectorMetadata; @@ -72,4 +73,15 @@ public void testCreateDeltaLakeConnectorWithException2() { "is not supported.", e.getMessage()); } } + + @Test + public void testDeltaLakeConnectorMemUsage() { + Map properties = ImmutableMap.of("type", "deltalake", + "hive.metastore.type", "hive", "hive.metastore.uris", "thrift://localhost:9083"); + CatalogConnector catalogConnector = ConnectorFactory.createConnector( + new ConnectorContext("delta0", "deltalake", properties), false); + Assert.assertTrue(catalogConnector.supportMemoryTrack()); + Assert.assertEquals(0, catalogConnector.estimateSize()); + Assert.assertEquals(4, catalogConnector.estimateCount().size()); + } }