diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 04ac28d8e6..02c6d938ff 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -288,6 +288,18 @@ public class AmoroManagementConf { .withDescription( "The number of hours that self-optimizing runtime data expire interval."); + public static final ConfigOption OVERVIEW_CACHE_REFRESH_INTERVAL = + ConfigOptions.key("overview-cache.refresh-interval") + .durationType() + .defaultValue(Duration.ofSeconds(180)) + .withDescription("Interval for refreshing overview cache."); + + public static final ConfigOption OVERVIEW_CACHE_MAX_SIZE = + ConfigOptions.key("overview-cache.max-size") + .intType() + .defaultValue(3360) + .withDescription("Max size of overview cache."); + public static final ConfigOption DB_TYPE = ConfigOptions.key("database.type") .stringType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java index d21c838135..acb13e55f4 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java @@ -117,7 +117,8 @@ public DashboardServer( new TableController(catalogManager, tableManager, tableDescriptor, serviceConfig); this.terminalController = new TerminalController(terminalManager); this.versionController = new VersionController(); - this.overviewController = new OverviewController(); + OverviewManager manager = new OverviewManager(serviceConfig); + this.overviewController = new OverviewController(manager); this.authType = serviceConfig.get(AmoroManagementConf.HTTP_SERVER_REST_AUTH_TYPE); this.basicAuthUser = serviceConfig.get(AmoroManagementConf.ADMIN_USERNAME); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewCache.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewCache.java deleted file mode 100644 index 50994b541a..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewCache.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.dashboard; - -import static org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_COMMITTING_TABLES; -import static org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_EXECUTING_TABLES; -import static org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_IDLE_TABLES; -import static org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_MEMORY_BYTES_ALLOCATED; -import static org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_PENDING_TABLES; -import static org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_PLANING_TABLES; -import static org.apache.amoro.server.optimizing.OptimizerGroupMetrics.OPTIMIZER_GROUP_THREADS; -import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_HEALTH_SCORE; -import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_TOTAL_FILES; -import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_TOTAL_FILES_SIZE; - -import org.apache.amoro.metrics.Counter; -import org.apache.amoro.metrics.Gauge; -import org.apache.amoro.metrics.Metric; -import org.apache.amoro.metrics.MetricDefine; -import org.apache.amoro.metrics.MetricKey; -import org.apache.amoro.metrics.MetricSet; -import org.apache.amoro.server.dashboard.model.OverviewDataSizeItem; -import org.apache.amoro.server.dashboard.model.OverviewResourceUsageItem; -import org.apache.amoro.server.dashboard.model.OverviewTopTableItem; -import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; -import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; -import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Deque; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; -import java.util.stream.Collectors; - -public class OverviewCache { - - public static final String STATUS_PENDING = "Pending"; - public static final String STATUS_PLANING = "Planing"; - public static final String STATUS_EXECUTING = "Executing"; - public static final String STATUS_IDLE = "Idle"; - public static final String STATUS_COMMITTING = "Committing"; - - private static final Logger LOG = LoggerFactory.getLogger(OverviewCache.class); - - private volatile List allTopTableItem = new ArrayList<>(); - private final Map optimizingStatusCountMap = new ConcurrentHashMap<>(); - private final ConcurrentLinkedDeque resourceUsageHistory = - new ConcurrentLinkedDeque<>(); - private final ConcurrentLinkedDeque dataSizeHistory = - new ConcurrentLinkedDeque<>(); - private final AtomicInteger totalCatalog = new AtomicInteger(); - private final AtomicLong totalDataSize = new AtomicLong(); - private final AtomicInteger totalTableCount = new AtomicInteger(); - private final AtomicInteger totalCpu = new AtomicInteger(); - private final AtomicLong totalMemory = new AtomicLong(); - - private MetricSet metricSet; - private int maxRecordCount; - private static volatile OverviewCache INSTANCE; - - private OverviewCache() {} - - /** @return Get the singleton object. */ - public static OverviewCache getInstance() { - if (INSTANCE == null) { - synchronized (OverviewCache.class) { - if (INSTANCE == null) { - INSTANCE = new OverviewCache(); - } - } - } - return INSTANCE; - } - - public void initialize(int maxRecordCount, MetricSet globalMetricSet) { - this.maxRecordCount = maxRecordCount; - this.metricSet = globalMetricSet; - } - - public List getAllTopTableItem() { - return ImmutableList.copyOf(allTopTableItem); - } - - public int getTotalCatalog() { - return totalCatalog.get(); - } - - public int getTotalTableCount() { - return totalTableCount.get(); - } - - public long getTotalDataSize() { - return totalDataSize.get(); - } - - public int getTotalCpu() { - return totalCpu.get(); - } - - public long getTotalMemory() { - return totalMemory.get(); - } - - public List getResourceUsageHistory(long startTime) { - return resourceUsageHistory.stream() - .filter(item -> item.getTs() >= startTime) - .collect(Collectors.toList()); - } - - public List getDataSizeHistory(long startTime) { - return dataSizeHistory.stream() - .filter(item -> item.getTs() >= startTime) - .collect(Collectors.toList()); - } - - public Map getOptimizingStatus() { - return optimizingStatusCountMap; - } - - public void refresh() { - long start = System.currentTimeMillis(); - LOG.info("Updating overview cache"); - try { - // Already registered metrics may change, - // so the metricDefineMap needs to be recalculated at each refresh - Map> metricDefineMap = - metricSet.getMetrics().keySet().stream() - .collect( - Collectors.groupingBy( - MetricKey::getDefine, - Collectors.mapping(Function.identity(), Collectors.toList()))); - - updateResourceUsage(start, metricDefineMap); - updateTableDetail(start, metricDefineMap); - updateOptimizingStatus(metricDefineMap); - - } catch (Exception e) { - LOG.error("OverviewRefresher error", e); - } finally { - long end = System.currentTimeMillis(); - LOG.info("Refresher overview cache took {} ms.", end - start); - } - } - - private void updateResourceUsage(long ts, Map> metricDefineMap) { - int optimizerGroupThreadCount = - (int) sumMetricValuesByDefine(metricDefineMap, OPTIMIZER_GROUP_THREADS); - long optimizerGroupMemoryBytes = - sumMetricValuesByDefine(metricDefineMap, OPTIMIZER_GROUP_MEMORY_BYTES_ALLOCATED); - - this.totalCpu.set(optimizerGroupThreadCount); - this.totalMemory.set(optimizerGroupMemoryBytes); - addAndCheck( - new OverviewResourceUsageItem(ts, optimizerGroupThreadCount, optimizerGroupMemoryBytes)); - } - - private void updateTableDetail(long ts, Map> metricDefineMap) { - Map topTableItemMap = Maps.newHashMap(); - Set allCatalogs = Sets.newHashSet(); - Map registeredMetrics = metricSet.getMetrics(); - long totalTableSize = 0L; - - // table size - List metricKeys = - metricDefineMap.getOrDefault(TABLE_SUMMARY_TOTAL_FILES_SIZE, ImmutableList.of()); - for (MetricKey metricKey : metricKeys) { - String tableName = fullTableName(metricKey); - allCatalogs.add(catalog(metricKey)); - OverviewTopTableItem tableItem = - topTableItemMap.computeIfAbsent(tableName, ignore -> new OverviewTopTableItem(tableName)); - long tableSize = covertValue(registeredMetrics.get(metricKey)); - tableItem.setTableSize(tableSize); - totalTableSize += tableSize; - } - - // file count - metricKeys = metricDefineMap.getOrDefault(TABLE_SUMMARY_TOTAL_FILES, ImmutableList.of()); - for (MetricKey metricKey : metricKeys) { - String tableName = fullTableName(metricKey); - allCatalogs.add(catalog(metricKey)); - OverviewTopTableItem tableItem = - topTableItemMap.computeIfAbsent(tableName, ignore -> new OverviewTopTableItem(tableName)); - int fileCount = (int) covertValue(registeredMetrics.get(metricKey)); - tableItem.setFileCount(fileCount); - tableItem.setAverageFileSize(fileCount == 0 ? 0 : tableItem.getTableSize() / fileCount); - } - - // health score - metricKeys = metricDefineMap.getOrDefault(TABLE_SUMMARY_HEALTH_SCORE, ImmutableList.of()); - for (MetricKey metricKey : metricKeys) { - String tableName = fullTableName(metricKey); - allCatalogs.add(catalog(metricKey)); - OverviewTopTableItem tableItem = - topTableItemMap.computeIfAbsent(tableName, ignore -> new OverviewTopTableItem(tableName)); - int healthScore = (int) covertValue(registeredMetrics.get(metricKey)); - tableItem.setHealthScore(healthScore); - } - - this.totalDataSize.set(totalTableSize); - this.totalCatalog.set(allCatalogs.size()); - this.totalTableCount.set(topTableItemMap.size()); - this.allTopTableItem = new ArrayList<>(topTableItemMap.values()); - addAndCheck(new OverviewDataSizeItem(ts, totalTableSize)); - } - - private void updateOptimizingStatus(Map> metricDefineMap) { - optimizingStatusCountMap.put( - STATUS_PENDING, sumMetricValuesByDefine(metricDefineMap, OPTIMIZER_GROUP_PENDING_TABLES)); - optimizingStatusCountMap.put( - STATUS_PLANING, sumMetricValuesByDefine(metricDefineMap, OPTIMIZER_GROUP_PLANING_TABLES)); - optimizingStatusCountMap.put( - STATUS_EXECUTING, - sumMetricValuesByDefine(metricDefineMap, OPTIMIZER_GROUP_EXECUTING_TABLES)); - optimizingStatusCountMap.put( - STATUS_IDLE, sumMetricValuesByDefine(metricDefineMap, OPTIMIZER_GROUP_IDLE_TABLES)); - optimizingStatusCountMap.put( - STATUS_COMMITTING, - sumMetricValuesByDefine(metricDefineMap, OPTIMIZER_GROUP_COMMITTING_TABLES)); - } - - private void addAndCheck(OverviewDataSizeItem dataSizeItem) { - dataSizeHistory.add(dataSizeItem); - checkSize(dataSizeHistory); - } - - private void addAndCheck(OverviewResourceUsageItem resourceUsageItem) { - resourceUsageHistory.add(resourceUsageItem); - checkSize(resourceUsageHistory); - } - - private void checkSize(Deque deque) { - if (deque.size() > maxRecordCount) { - deque.poll(); - } - } - - private String fullTableName(MetricKey metricKey) { - return catalog(metricKey) - .concat(".") - .concat(database(metricKey)) - .concat(".") - .concat(table(metricKey)); - } - - private String catalog(MetricKey metricKey) { - return metricKey.valueOfTag("catalog"); - } - - private String database(MetricKey metricKey) { - return metricKey.valueOfTag("database"); - } - - private String table(MetricKey metricKey) { - return metricKey.valueOfTag("table"); - } - - private long sumMetricValuesByDefine( - Map> metricDefineMap, MetricDefine metricDefine) { - List metricKeys = metricDefineMap.get(metricDefine); - if ((metricKeys == null)) { - return 0; - } - return metricKeys.stream() - .map(metricKey -> covertValue(metricSet.getMetrics().get(metricKey))) - .mapToLong(Long::longValue) - .sum(); - } - - private long covertValue(Metric metric) { - if (metric instanceof Counter) { - return ((Counter) metric).getCount(); - } else if (metric instanceof Gauge) { - return ((Gauge) metric).getValue().longValue(); - } else { - throw new IllegalStateException( - "unknown metric implement class:" + metric.getClass().getName()); - } - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewManager.java new file mode 100644 index 0000000000..3de5adabcf --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewManager.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.dashboard; + +import org.apache.amoro.config.Configurations; +import org.apache.amoro.server.AmoroManagementConf; +import org.apache.amoro.server.dashboard.model.OverviewDataSizeItem; +import org.apache.amoro.server.dashboard.model.OverviewResourceUsageItem; +import org.apache.amoro.server.dashboard.model.OverviewTopTableItem; +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.TableRuntimeMeta; +import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; +import org.apache.amoro.server.persistence.mapper.OptimizerMapper; +import org.apache.amoro.server.persistence.mapper.TableMetaMapper; +import org.apache.amoro.server.resource.OptimizerInstance; +import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; +import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +public class OverviewManager extends PersistentBase { + + public static final String STATUS_PENDING = "Pending"; + public static final String STATUS_PLANING = "Planing"; + public static final String STATUS_EXECUTING = "Executing"; + public static final String STATUS_IDLE = "Idle"; + public static final String STATUS_COMMITTING = "Committing"; + + private static final Logger LOG = LoggerFactory.getLogger(OverviewManager.class); + private final List allTopTableItem = new ArrayList<>(); + private final Map optimizingStatusCountMap = new ConcurrentHashMap<>(); + private final ConcurrentLinkedDeque resourceUsageHistory = + new ConcurrentLinkedDeque<>(); + private final ConcurrentLinkedDeque dataSizeHistory = + new ConcurrentLinkedDeque<>(); + private final AtomicInteger totalCatalog = new AtomicInteger(); + private final AtomicLong totalDataSize = new AtomicLong(); + private final AtomicInteger totalTableCount = new AtomicInteger(); + private final AtomicInteger totalCpu = new AtomicInteger(); + private final AtomicLong totalMemory = new AtomicLong(); + + private final int maxRecordCount; + + public OverviewManager(Configurations serverConfigs) { + this( + serverConfigs.getInteger(AmoroManagementConf.OVERVIEW_CACHE_MAX_SIZE), + serverConfigs.get(AmoroManagementConf.OVERVIEW_CACHE_REFRESH_INTERVAL)); + } + + @VisibleForTesting + public OverviewManager(int maxRecordCount, Duration refreshInterval) { + this.maxRecordCount = maxRecordCount; + ScheduledExecutorService overviewUpdaterScheduler = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("overview-refresh-scheduler-%d") + .setDaemon(true) + .build()); + resetStatusMap(); + + if (refreshInterval.toMillis() > 0) { + overviewUpdaterScheduler.scheduleAtFixedRate( + this::refresh, 1000L, refreshInterval.toMillis(), TimeUnit.MILLISECONDS); + } + } + + public List getAllTopTableItem() { + return ImmutableList.copyOf(allTopTableItem); + } + + public int getTotalCatalog() { + return totalCatalog.get(); + } + + public int getTotalTableCount() { + return totalTableCount.get(); + } + + public long getTotalDataSize() { + return totalDataSize.get(); + } + + public int getTotalCpu() { + return totalCpu.get(); + } + + public long getTotalMemory() { + return totalMemory.get(); + } + + public List getResourceUsageHistory(long startTime) { + return resourceUsageHistory.stream() + .filter(item -> item.getTs() >= startTime) + .collect(Collectors.toList()); + } + + public List getDataSizeHistory(long startTime) { + return dataSizeHistory.stream() + .filter(item -> item.getTs() >= startTime) + .collect(Collectors.toList()); + } + + public Map getOptimizingStatus() { + return optimizingStatusCountMap; + } + + @VisibleForTesting + public void refresh() { + long start = System.currentTimeMillis(); + LOG.info("Refreshing overview cache"); + try { + refreshTableCache(start); + refreshResourceUsage(start); + + } catch (Exception e) { + LOG.error("Refreshed overview cache failed", e); + } finally { + long end = System.currentTimeMillis(); + LOG.info("Refreshed overview cache in {} ms.", end - start); + } + } + + private void refreshTableCache(long ts) { + int totalCatalogs = getAs(CatalogMetaMapper.class, CatalogMetaMapper::selectCatalogCount); + + List metas = + getAs(TableMetaMapper.class, TableMetaMapper::selectTableRuntimeMetas); + AtomicLong totalDataSize = new AtomicLong(); + AtomicInteger totalFileCounts = new AtomicInteger(); + Map topTableItemMap = Maps.newHashMap(); + Map optimizingStatusMap = Maps.newHashMap(); + for (TableRuntimeMeta meta : metas) { + Optional optItem = toTopTableItem(meta); + optItem.ifPresent( + tableItem -> { + topTableItemMap.put(tableItem.getTableName(), tableItem); + totalDataSize.addAndGet(tableItem.getTableSize()); + totalFileCounts.addAndGet(tableItem.getFileCount()); + }); + String status = statusToMetricString(meta.getTableStatus()); + if (StringUtils.isNotEmpty(status)) { + optimizingStatusMap.putIfAbsent(status, 0L); + optimizingStatusMap.computeIfPresent(status, (k, v) -> v + 1); + } + } + + this.totalCatalog.set(totalCatalogs); + this.totalTableCount.set(topTableItemMap.size()); + this.totalDataSize.set(totalDataSize.get()); + this.allTopTableItem.clear(); + this.allTopTableItem.addAll(topTableItemMap.values()); + addAndCheck(new OverviewDataSizeItem(ts, this.totalDataSize.get())); + resetStatusMap(); + this.optimizingStatusCountMap.putAll(optimizingStatusMap); + } + + private Optional toTopTableItem(TableRuntimeMeta meta) { + if (meta == null) { + return Optional.empty(); + } + OverviewTopTableItem tableItem = new OverviewTopTableItem(fullTableName(meta)); + if (meta.getPendingInput() != null) { + tableItem.setTableSize(meta.getPendingInput().getTotalFileSize()); + tableItem.setFileCount(meta.getPendingInput().getTotalFileCount()); + tableItem.setHealthScore(meta.getPendingInput().getHealthScore()); + } + tableItem.setAverageFileSize( + tableItem.getFileCount() == 0 ? 0 : tableItem.getTableSize() / tableItem.getFileCount()); + return Optional.of(tableItem); + } + + private String statusToMetricString(OptimizingStatus status) { + if (status == null) { + return null; + } + switch (status) { + case PENDING: + return STATUS_PENDING; + case PLANNING: + return STATUS_PLANING; + case MINOR_OPTIMIZING: + case MAJOR_OPTIMIZING: + case FULL_OPTIMIZING: + return STATUS_EXECUTING; + case IDLE: + return STATUS_IDLE; + case COMMITTING: + return STATUS_COMMITTING; + default: + return null; + } + } + + private void resetStatusMap() { + optimizingStatusCountMap.clear(); + optimizingStatusCountMap.put(STATUS_PENDING, 0L); + optimizingStatusCountMap.put(STATUS_PLANING, 0L); + optimizingStatusCountMap.put(STATUS_EXECUTING, 0L); + optimizingStatusCountMap.put(STATUS_IDLE, 0L); + optimizingStatusCountMap.put(STATUS_COMMITTING, 0L); + } + + private void refreshResourceUsage(long ts) { + List instances = getAs(OptimizerMapper.class, OptimizerMapper::selectAll); + AtomicInteger cpuCount = new AtomicInteger(); + AtomicLong memoryBytes = new AtomicLong(); + for (OptimizerInstance instance : instances) { + cpuCount.addAndGet(instance.getThreadCount()); + memoryBytes.addAndGet(instance.getMemoryMb() * 1024L * 1024L); + } + this.totalCpu.set(cpuCount.get()); + this.totalMemory.set(memoryBytes.get()); + addAndCheck(new OverviewResourceUsageItem(ts, cpuCount.get(), memoryBytes.get())); + } + + private void addAndCheck(OverviewDataSizeItem dataSizeItem) { + dataSizeHistory.add(dataSizeItem); + checkSize(dataSizeHistory); + } + + private void addAndCheck(OverviewResourceUsageItem resourceUsageItem) { + resourceUsageHistory.add(resourceUsageItem); + checkSize(resourceUsageHistory); + } + + private void checkSize(Deque deque) { + if (deque.size() > maxRecordCount) { + deque.poll(); + } + } + + private String fullTableName(TableRuntimeMeta meta) { + return meta.getCatalogName() + .concat(".") + .concat(meta.getDbName()) + .concat(".") + .concat(meta.getTableName()); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewMetricsReporter.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewMetricsReporter.java deleted file mode 100644 index 42a9facdc0..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/OverviewMetricsReporter.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.dashboard; - -import org.apache.amoro.metrics.MetricReporter; -import org.apache.amoro.metrics.MetricSet; -import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; - -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** Overview exporter */ -public class OverviewMetricsReporter implements MetricReporter { - - public static final String REFRESH_INTERVAL = "refresh-interval"; - public static final String MAX_HISTORY_RECORDS = "max-history-records"; - - private final ScheduledExecutorService overviewUpdaterScheduler = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat("overview-updater-scheduler-%d") - .setDaemon(true) - .build()); - - private long overviewRefreshingInterval; - private int maxRecordCount; - - @Override - public void open(Map properties) { - overviewRefreshingInterval = - Optional.ofNullable(properties.get(REFRESH_INTERVAL)) - .map(Long::valueOf) - .orElseThrow( - () -> new IllegalArgumentException("Lack required property: " + REFRESH_INTERVAL)); - - maxRecordCount = - Optional.ofNullable(properties.get(MAX_HISTORY_RECORDS)) - .map(Integer::valueOf) - .orElseThrow( - () -> - new IllegalArgumentException("Lack required property: " + MAX_HISTORY_RECORDS)); - } - - @Override - public void close() {} - - @Override - public String name() { - return "overview-exporter"; - } - - @Override - public void setGlobalMetricSet(MetricSet globalMetricSet) { - OverviewCache overviewCache = OverviewCache.getInstance(); - overviewCache.initialize(maxRecordCount, globalMetricSet); - overviewUpdaterScheduler.scheduleAtFixedRate( - overviewCache::refresh, 1000L, overviewRefreshingInterval, TimeUnit.MILLISECONDS); - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OverviewController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OverviewController.java index 484cf42bb3..e541de9896 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OverviewController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OverviewController.java @@ -19,7 +19,7 @@ package org.apache.amoro.server.dashboard.controller; import io.javalin.http.Context; -import org.apache.amoro.server.dashboard.OverviewCache; +import org.apache.amoro.server.dashboard.OverviewManager; import org.apache.amoro.server.dashboard.model.OverviewDataSizeItem; import org.apache.amoro.server.dashboard.model.OverviewResourceUsageItem; import org.apache.amoro.server.dashboard.model.OverviewSummary; @@ -38,17 +38,17 @@ /** The controller that handles overview page requests. */ public class OverviewController { - private OverviewCache overviewCache; + private final OverviewManager manager; - public OverviewController() { - this.overviewCache = OverviewCache.getInstance(); + public OverviewController(OverviewManager manager) { + this.manager = manager; } public void getResourceUsageHistory(Context ctx) { String startTime = ctx.queryParam("startTime"); Preconditions.checkArgument(StringUtils.isNumeric(startTime), "invalid startTime!"); List resourceUsageHistory = - overviewCache.getResourceUsageHistory(Long.parseLong(startTime)); + manager.getResourceUsageHistory(Long.parseLong(startTime)); ctx.json(OkResponse.of(resourceUsageHistory)); } @@ -56,7 +56,7 @@ public void getDataSizeHistory(Context ctx) { String startTime = ctx.queryParam("startTime"); Preconditions.checkArgument(StringUtils.isNumeric(startTime), "invalid startTime!"); List dataSizeHistory = - overviewCache.getDataSizeHistory(Long.parseLong(startTime)); + manager.getDataSizeHistory(Long.parseLong(startTime)); ctx.json(OkResponse.of(dataSizeHistory)); } @@ -98,7 +98,7 @@ public void getTopTables(Context ctx) { private List getTopTables( boolean asc, Comparator comparator, int limit) { - return overviewCache.getAllTopTableItem().stream() + return manager.getAllTopTableItem().stream() .sorted( asc ? comparator.thenComparing(OverviewTopTableItem::getTableName) @@ -108,11 +108,11 @@ private List getTopTables( } public void getSummary(Context ctx) { - int totalCatalog = overviewCache.getTotalCatalog(); - int totalTableCount = overviewCache.getTotalTableCount(); - long totalDataSize = overviewCache.getTotalDataSize(); - int totalCpu = overviewCache.getTotalCpu(); - long totalMemory = overviewCache.getTotalMemory(); + int totalCatalog = manager.getTotalCatalog(); + int totalTableCount = manager.getTotalTableCount(); + long totalDataSize = manager.getTotalDataSize(); + int totalCpu = manager.getTotalCpu(); + long totalMemory = manager.getTotalMemory(); OverviewSummary overviewSummary = new OverviewSummary(totalCatalog, totalTableCount, totalDataSize, totalCpu, totalMemory); @@ -120,7 +120,7 @@ public void getSummary(Context ctx) { } public void getOptimizingStatus(Context ctx) { - Map optimizingStatus = overviewCache.getOptimizingStatus(); + Map optimizingStatus = manager.getOptimizingStatus(); List> optimizingStatusList = optimizingStatus.entrySet().stream() .map(status -> ImmutableMap.of("name", status.getKey(), "value", status.getValue())) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/OverviewTopTableItem.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/OverviewTopTableItem.java index 2fffc9b8ea..64e5fa86cf 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/OverviewTopTableItem.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/OverviewTopTableItem.java @@ -25,7 +25,7 @@ public class OverviewTopTableItem { private long tableSize; private int fileCount; private long averageFileSize; - private int healthScore; + private int healthScore = -1; public OverviewTopTableItem() {} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java index ee62866aef..4058002b2c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/CatalogMetaMapper.java @@ -114,6 +114,9 @@ public interface CatalogMetaMapper { @Select("SELECT database_count FROM " + TABLE_NAME + " WHERE catalog_name = #{catalogName}") Integer selectDatabaseCount(@Param("catalogName") String catalogName); + @Select("SELECT COUNT(*) FROM " + TABLE_NAME) + Integer selectCatalogCount(); + @Update( "UPDATE " + TABLE_NAME diff --git a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.metrics.MetricReporter b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.metrics.MetricReporter index 57053ec933..01d7057208 100644 --- a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.metrics.MetricReporter +++ b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.metrics.MetricReporter @@ -16,4 +16,3 @@ # limitations under the License. # -org.apache.amoro.server.dashboard.OverviewMetricsReporter \ No newline at end of file diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java similarity index 62% rename from amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java rename to amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java index 5ac3c85c3d..acffaa942b 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewCache.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestOverviewManager.java @@ -18,11 +18,11 @@ package org.apache.amoro.server.dashboard; -import static org.apache.amoro.server.dashboard.OverviewCache.STATUS_COMMITTING; -import static org.apache.amoro.server.dashboard.OverviewCache.STATUS_EXECUTING; -import static org.apache.amoro.server.dashboard.OverviewCache.STATUS_IDLE; -import static org.apache.amoro.server.dashboard.OverviewCache.STATUS_PENDING; -import static org.apache.amoro.server.dashboard.OverviewCache.STATUS_PLANING; +import static org.apache.amoro.server.dashboard.OverviewManager.STATUS_COMMITTING; +import static org.apache.amoro.server.dashboard.OverviewManager.STATUS_EXECUTING; +import static org.apache.amoro.server.dashboard.OverviewManager.STATUS_IDLE; +import static org.apache.amoro.server.dashboard.OverviewManager.STATUS_PENDING; +import static org.apache.amoro.server.dashboard.OverviewManager.STATUS_PLANING; import org.apache.amoro.BasicTableTestHelper; import org.apache.amoro.TableFormat; @@ -31,7 +31,6 @@ import org.apache.amoro.catalog.CatalogTestHelper; import org.apache.amoro.io.MixedDataTestHelpers; import org.apache.amoro.server.dashboard.model.OverviewTopTableItem; -import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.table.AMSTableTestBase; import org.apache.amoro.server.table.TableRuntime; import org.apache.amoro.server.table.executor.TableRuntimeRefreshExecutor; @@ -48,13 +47,14 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.time.Duration; import java.util.ArrayList; import java.util.List; @RunWith(Parameterized.class) -public class TestOverviewCache extends AMSTableTestBase { +public class TestOverviewManager extends AMSTableTestBase { - private OverviewCache overviewCache; + private OverviewManager overviewManager; @Parameterized.Parameters(name = "{0}, {1}") public static Object[] parameters() { @@ -63,7 +63,7 @@ public static Object[] parameters() { }; } - public TestOverviewCache(CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { + public TestOverviewManager(CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { super(catalogTestHelper, tableTestHelper, false); } @@ -71,9 +71,8 @@ public TestOverviewCache(CatalogTestHelper catalogTestHelper, TableTestHelper ta public void prepare() { createDatabase(); createTable(); - this.overviewCache = OverviewCache.getInstance(); - this.overviewCache.initialize(10, MetricManager.getInstance().getGlobalRegistry()); - this.overviewCache.refresh(); + this.overviewManager = new OverviewManager(10, Duration.ofMinutes(0)); + this.overviewManager.refresh(); } @After @@ -117,38 +116,38 @@ void refreshPending() { @Test public void testOverviewCache() { // empty table - Assertions.assertEquals(1, overviewCache.getTotalCatalog()); - Assertions.assertEquals(1, overviewCache.getTotalTableCount()); - Assertions.assertEquals(0, overviewCache.getTotalDataSize()); - Assertions.assertEquals(0, overviewCache.getTotalCpu()); - Assertions.assertEquals(0, overviewCache.getTotalMemory()); - - Assertions.assertEquals(0, overviewCache.getOptimizingStatus().get(STATUS_PENDING)); - Assertions.assertEquals(0, overviewCache.getOptimizingStatus().get(STATUS_COMMITTING)); - Assertions.assertEquals(0, overviewCache.getOptimizingStatus().get(STATUS_EXECUTING)); - Assertions.assertEquals(0, overviewCache.getOptimizingStatus().get(STATUS_PLANING)); - Assertions.assertEquals(1, overviewCache.getOptimizingStatus().get(STATUS_IDLE)); - - Assertions.assertEquals(1, overviewCache.getDataSizeHistory(0).size()); - Assertions.assertEquals(1, overviewCache.getResourceUsageHistory(0).size()); - - List allTopTableItem = overviewCache.getAllTopTableItem(); + Assertions.assertEquals(1, overviewManager.getTotalCatalog()); + Assertions.assertEquals(1, overviewManager.getTotalTableCount()); + Assertions.assertEquals(0, overviewManager.getTotalDataSize()); + Assertions.assertEquals(0, overviewManager.getTotalCpu()); + Assertions.assertEquals(0, overviewManager.getTotalMemory()); + + Assertions.assertEquals(0, overviewManager.getOptimizingStatus().get(STATUS_PENDING)); + Assertions.assertEquals(0, overviewManager.getOptimizingStatus().get(STATUS_COMMITTING)); + Assertions.assertEquals(0, overviewManager.getOptimizingStatus().get(STATUS_EXECUTING)); + Assertions.assertEquals(0, overviewManager.getOptimizingStatus().get(STATUS_PLANING)); + Assertions.assertEquals(1, overviewManager.getOptimizingStatus().get(STATUS_IDLE)); + + Assertions.assertEquals(1, overviewManager.getDataSizeHistory(0).size()); + Assertions.assertEquals(1, overviewManager.getResourceUsageHistory(0).size()); + + List allTopTableItem = overviewManager.getAllTopTableItem(); Assertions.assertEquals(1, allTopTableItem.size()); Assertions.assertEquals(-1, allTopTableItem.get(0).getHealthScore()); // insert data initTableWithFiles(); refreshPending(); - overviewCache.refresh(); + overviewManager.refresh(); - Assertions.assertTrue(overviewCache.getTotalDataSize() > 0); + Assertions.assertTrue(overviewManager.getTotalDataSize() > 0); - Assertions.assertEquals(1, overviewCache.getOptimizingStatus().get(STATUS_PENDING)); - Assertions.assertEquals(0, overviewCache.getOptimizingStatus().get(STATUS_IDLE)); + Assertions.assertEquals(1, overviewManager.getOptimizingStatus().get(STATUS_PENDING)); + Assertions.assertEquals(0, overviewManager.getOptimizingStatus().get(STATUS_IDLE)); - Assertions.assertEquals(2, overviewCache.getDataSizeHistory(0).size()); - Assertions.assertEquals(2, overviewCache.getResourceUsageHistory(0).size()); - allTopTableItem = overviewCache.getAllTopTableItem(); + Assertions.assertEquals(2, overviewManager.getDataSizeHistory(0).size()); + Assertions.assertEquals(2, overviewManager.getResourceUsageHistory(0).size()); + allTopTableItem = overviewManager.getAllTopTableItem(); Assertions.assertEquals(100, allTopTableItem.get(0).getHealthScore()); } } diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml index f3e8796010..a4d6b29f29 100644 --- a/dist/src/main/amoro-bin/conf/config.yaml +++ b/dist/src/main/amoro-bin/conf/config.yaml @@ -99,6 +99,10 @@ ams: identifier: default # Built-in support for default/base64. Defaults to "default", indicating no encryption sensitive-keywords: admin-password;database.password + overview-cache: + refresh-interval: 3m # 3 min + max-size: 3360 # Keep 7 days history by default, 7 * 24 * 60 / 3 = 3360 + database: type: derby jdbc-driver-class: org.apache.derby.jdbc.EmbeddedDriver diff --git a/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml b/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml index 75727df476..40842e8177 100644 --- a/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml @@ -19,12 +19,6 @@ # configurations of metric reporters metric-reporters: - - name: overview-exporter # configs for overview exporter - enabled: true # if false, overview page will not fetch data - properties: - refresh-interval: 180000 # 3 min - max-history-records: 3360 # Keep 7 days history by default, 7 * 24 * 60 / 3 = 3360 - # - name: prometheus-exporter # configs for prometheus exporter # enabled: false # properties: