From 2ea5aab986dbc35084c3e45f0cb4208a9cffdf81 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Thu, 8 Aug 2024 17:25:29 +0800 Subject: [PATCH] [improvement](statistics)Return -1 when external table row count is unknown. (#38990) Return -1 when external table row count is unknown. Don't cache any row count value when loading row count for external table get exception. --- .../datasource/ExternalRowCountCache.java | 37 +++++----- .../doris/datasource/ExternalTable.java | 12 +-- .../datasource/hive/HMSExternalTable.java | 3 +- .../datasource/iceberg/IcebergUtils.java | 28 ++++--- .../paimon/PaimonExternalTable.java | 27 +++---- .../datasource/ExternalRowCountCacheTest.java | 74 +++++++++++++++++++ .../iceberg/test_iceberg_table_stats.groovy | 2 +- .../paimon/test_paimon_table_stats.groovy | 2 +- 8 files changed, 126 insertions(+), 59 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java index 44aecbca1d556c..4602c594571f56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java @@ -86,9 +86,17 @@ protected Optional doLoad(RowCountKey rowCountKey) { TableIf table = StatisticsUtil.findTable(rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId); return Optional.of(table.fetchRowCount()); } catch (Exception e) { - LOG.warn("Failed to get table with catalogId {}, dbId {}, tableId {}", rowCountKey.catalogId, - rowCountKey.dbId, rowCountKey.tableId); - return Optional.empty(); + LOG.warn("Failed to get table row count with catalogId {}, dbId {}, tableId {}. Reason {}", + rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId, e.getMessage()); + LOG.debug(e); + // Return Optional.empty() will cache this empty value in memory, + // so we can't try to load the row count until the cache expire. + // Throw an exception here will cause too much stack log in fe.out. + // So we return null when exception happen. + // Null may raise NPE in caller, but that is expected. + // We catch that NPE and return a default value -1 without keep the value in cache, + // so we can trigger the load function to fetch row count again next time in this exception case. + return null; } } } @@ -96,46 +104,39 @@ protected Optional doLoad(RowCountKey rowCountKey) { /** * Get cached row count for the given table. Return 0 if cached not loaded or table not exists. * Cached will be loaded async. - * @param catalogId - * @param dbId - * @param tableId - * @return Cached row count or 0 if not exist + * @return Cached row count or -1 if not exist */ public long getCachedRowCount(long catalogId, long dbId, long tableId) { RowCountKey key = new RowCountKey(catalogId, dbId, tableId); try { CompletableFuture> f = rowCountCache.get(key); if (f.isDone()) { - return f.get().orElse(0L); + return f.get().orElse(-1L); } } catch (Exception e) { LOG.warn("Unexpected exception while returning row count", e); } - return 0; + return -1; } /** - * Get cached row count for the given table if present. Return 0 if cached not loaded. + * Get cached row count for the given table if present. Return -1 if cached not loaded. * This method will not trigger async loading if cache is missing. - * - * @param catalogId - * @param dbId - * @param tableId - * @return + * @return Cached row count or -1 if not exist */ public long getCachedRowCountIfPresent(long catalogId, long dbId, long tableId) { RowCountKey key = new RowCountKey(catalogId, dbId, tableId); try { CompletableFuture> f = rowCountCache.getIfPresent(key); if (f == null) { - return 0; + return -1; } else if (f.isDone()) { - return f.get().orElse(0L); + return f.get().orElse(-1L); } } catch (Exception e) { LOG.warn("Unexpected exception while returning row count if present", e); } - return 0; + return -1; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 499e9195ec4694..2d5689a54687c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -187,13 +187,13 @@ public String getMysqlType() { @Override public long getRowCount() { - // Return 0 if makeSureInitialized throw exception. + // Return -1 if makeSureInitialized throw exception. // For example, init hive table may throw NotSupportedException. try { makeSureInitialized(); } catch (Exception e) { LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e); - return 0; + return -1; } // All external table should get external row count from cache. return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id); @@ -201,24 +201,24 @@ public long getRowCount() { @Override public long getCachedRowCount() { - // Return 0 if makeSureInitialized throw exception. + // Return -1 if makeSureInitialized throw exception. // For example, init hive table may throw NotSupportedException. try { makeSureInitialized(); } catch (Exception e) { LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e); - return 0; + return -1; } return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id); } @Override /** - * Default return 0. Subclass need to implement this interface. + * Default return -1. Subclass need to implement this interface. * This is called by ExternalRowCountCache to load row count cache. */ public long fetchRowCount() { - return 0; + return -1; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 5692f61df0ca5f..6752ae5dcbe606 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -320,7 +320,7 @@ public long getCreateTime() { } private long getRowCountFromExternalSource() { - long rowCount; + long rowCount = -1; switch (dlaType) { case HIVE: rowCount = StatisticsUtil.getHiveRowCount(this); @@ -332,7 +332,6 @@ private long getRowCountFromExternalSource() { if (LOG.isDebugEnabled()) { LOG.debug("getRowCount for dlaType {} is not supported.", dlaType); } - rowCount = -1; } return rowCount; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index 512e6a3ee93087..f7280f5721f79d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -592,22 +592,20 @@ public static List getSchema(ExternalCatalog catalog, String dbName, Str * @return estimated row count */ public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, String tbName) { - try { - Table icebergTable = Env.getCurrentEnv() - .getExtMetaCacheMgr() - .getIcebergMetadataCache() - .getIcebergTable(catalog, dbName, tbName); - Snapshot snapshot = icebergTable.currentSnapshot(); - if (snapshot == null) { - // empty table - return 0; - } - Map summary = snapshot.summary(); - return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES)); - } catch (Exception e) { - LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e); + // the table may be null when the iceberg metadata cache is not loaded.But I don't think it's a problem, + // because the NPE would be caught in the caller and return the default value -1. + // Meanwhile, it will trigger iceberg metadata cache to load the table, so we can get it next time. + Table icebergTable = Env.getCurrentEnv() + .getExtMetaCacheMgr() + .getIcebergMetadataCache() + .getIcebergTable(catalog, dbName, tbName); + Snapshot snapshot = icebergTable.currentSnapshot(); + if (snapshot == null) { + // empty table + return 0; } - return -1; + Map summary = snapshot.summary(); + return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index c9579d03e94086..a3406dcbb57a99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -187,22 +187,17 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { @Override public long fetchRowCount() { makeSureInitialized(); - try { - long rowCount = 0; - Optional schemaCacheValue = getSchemaCacheValue(); - Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) - .orElse(null); - if (paimonTable == null) { - return -1; - } - List splits = paimonTable.newReadBuilder().newScan().plan().splits(); - for (Split split : splits) { - rowCount += split.rowCount(); - } - return rowCount; - } catch (Exception e) { - LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e); + long rowCount = 0; + Optional schemaCacheValue = getSchemaCacheValue(); + Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) + .orElse(null); + if (paimonTable == null) { + return -1; + } + List splits = paimonTable.newReadBuilder().newScan().plan().splits(); + for (Split split : splits) { + rowCount += split.rowCount(); } - return -1; + return rowCount; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java new file mode 100644 index 00000000000000..10b6b01527cb98 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.ThreadPoolManager; + +import mockit.Mock; +import mockit.MockUp; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +public class ExternalRowCountCacheTest { + @Test + public void testLoadWithException() throws Exception { + ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool( + 1, Integer.MAX_VALUE, "TEST", true); + AtomicInteger counter = new AtomicInteger(0); + + new MockUp() { + @Mock + protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + return null; + } + }; + ExternalRowCountCache cache = new ExternalRowCountCache(executor); + long cachedRowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(-1, cachedRowCount); + for (int i = 0; i < 60; i++) { + if (counter.get() == 1) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(1, counter.get()); + + new MockUp() { + @Mock + protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + return Optional.of(100L); + } + }; + cache.getCachedRowCount(1, 1, 1); + for (int i = 0; i < 60; i++) { + cachedRowCount = cache.getCachedRowCount(1, 1, 1); + if (cachedRowCount != -1) { + Assertions.assertEquals(100, cachedRowCount); + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(2, counter.get()); + } +} diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_stats.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_stats.groovy index b8eacf6d9e24e7..064139d22c9dea 100644 --- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_stats.groovy +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_stats.groovy @@ -42,7 +42,7 @@ suite("test_iceberg_table_stats", "p0,external,doris,external_docker,external_do while (retry < 10) { def result = sql """ show table stats ${table_name} """ act = result[0][2] - if (act != "0") { + if (act != "-1") { break; } Thread.sleep(2000) diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_table_stats.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_table_stats.groovy index a747eea67e7467..bae158f66ee5c2 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_table_stats.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table_stats.groovy @@ -36,7 +36,7 @@ suite("test_paimon_table_stats", "p0,external,doris,external_docker,external_doc while (retry < 10) { def result = sql """ show table stats ${table_name} """ act = result[0][2] - if (act != "0") { + if (act != "-1") { break; } Thread.sleep(2000)