
Commit

[Enhancement](ExternalTable) Optimize the performance of getCachedRowCount when reading ExternalTable (apache#41659)

## Proposed changes
`getCachedRowCount()` used to initialize a not-yet-initialized ExternalTable, which is unnecessary work for a metadata-only call. For an uninitialized table we now return -1 directly.
This speeds up queries against `information_schema.tables`.
hubgeter committed Oct 16, 2024
1 parent d1d2a78 commit 3c4425b
Showing 6 changed files with 38 additions and 57 deletions.
ExternalTable.java

@@ -202,14 +202,15 @@ public long getRowCount() {

     @Override
     public long getCachedRowCount() {
-        // Return -1 if makeSureInitialized throw exception.
-        // For example, init hive table may throw NotSupportedException.
-        try {
-            makeSureInitialized();
-        } catch (Exception e) {
-            LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e);
+        // Return -1 if uninitialized.
+        // Before this, for uninitialized tables, we would call makeSureInitialized(), just like the implementation of
+        // ExternalTable.getRowCount(), but this is not very meaningful and time-consuming.
+        // The getCachedRowCount() function is only used when `show table` and querying `information_schema.tables`.
+        if (!isObjectCreated()) {
             return -1;
         }
+        // getExtMetaCacheMgr().getRowCountCache().getCachedRowCount() is an asynchronous non-blocking operation.
+        // For tables that are not in the cache, it will load asynchronously and return -1.
         return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id);
     }
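
An aside on the "asynchronous non-blocking" behavior the new comments describe: the pattern can be sketched with Caffeine, which this codebase already uses for its metadata caches. The sketch below is illustrative, not the Doris implementation — RowCountCacheSketch and fetchRowCountFromRemote are hypothetical names — but it shows how an AsyncLoadingCache can return the -1 sentinel for an entry that is still loading while the load proceeds on a background executor:

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class RowCountCacheSketch {
    // Entries expire so stale row counts are eventually refreshed.
    private final AsyncLoadingCache<Long, Long> cache = Caffeine.newBuilder()
            .expireAfterWrite(1, TimeUnit.HOURS)
            .buildAsync(this::fetchRowCountFromRemote);

    public long getCachedRowCount(long tableId) {
        // On a miss, get() schedules the load and returns a future at once;
        // it never blocks the caller.
        CompletableFuture<Long> future = cache.get(tableId);
        Long loaded = future.getNow(null); // non-blocking peek at the result
        return loaded != null ? loaded : -1; // still loading -> count unknown
    }

    // Stand-in for the real work, e.g. asking a metastore for statistics.
    private Long fetchRowCountFromRemote(Long tableId) {
        return 42L;
    }
}

The new guard above composes with this: a table nobody has queried skips the cache entirely, and an initialized table gets either a cached count or -1 without ever blocking the `information_schema.tables` scan.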

HMSExternalCatalog.java

@@ -34,6 +34,7 @@
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.operations.ExternalMetadataOperations;
 import org.apache.doris.datasource.property.PropertyConverter;
@@ -53,6 +54,7 @@
 import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -85,6 +87,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private int hmsEventsBatchSizePerRpc = -1;
     private boolean enableHmsEventsIncrementalSync = false;
 
+    //for "type" = "hms" , but is iceberg table.
+    private HiveCatalog icebergHiveCatalog;
 
     @VisibleForTesting
     public HMSExternalCatalog() {
@@ -195,6 +199,8 @@ protected void initLocalObjectsImpl() {
         transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps, fileSystemProvider,
                 fileSystemExecutor);
         metadataOps = hiveOps;
+
+        icebergHiveCatalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
     }

     @Override
@@ -331,6 +337,10 @@ public boolean isEnableHmsEventsIncrementalSync() {
         return enableHmsEventsIncrementalSync;
     }
 
+    public HiveCatalog getIcebergHiveCatalog() {
+        return icebergHiveCatalog;
+    }
+
     /**
      * Enum for meta tables in hive catalog.
      * eg: tbl$partitions
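
Together with the IcebergMetadataCache change further down, this moves the Iceberg HiveCatalog from a build-per-load object to a build-once-per-catalog object. A rough sketch of the difference in lifetime, with illustrative names rather than Doris code:

import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hive.HiveCatalog;

public class CatalogLifetimeSketch {
    // After this commit: one instance per external catalog, built during init.
    private final HiveCatalog shared = newHiveCatalog();

    static HiveCatalog newHiveCatalog() {
        HiveCatalog catalog = new HiveCatalog();
        catalog.setConf(new Configuration());
        catalog.initialize("hive", new HashMap<>()); // conf parsing, client pool setup
        return catalog;
    }

    HiveCatalog perLoad() {
        return newHiveCatalog(); // old behavior: repeated setup cost on every loadTable()
    }

    HiveCatalog reused() {
        return shared; // new behavior: hand out the shared instance
    }
}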
IcebergHMSExternalCatalog.java

@@ -19,10 +19,6 @@

 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.property.PropertyConverter;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-
-import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.hive.HiveCatalog;
 
 import java.util.Map;

@@ -38,14 +34,7 @@ public IcebergHMSExternalCatalog(long catalogId, String name, String resource, M
     @Override
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_HMS;
-        HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
-        hiveCatalog.setConf(getConfiguration());
-        // initialize hive catalog
-        Map<String, String> catalogProperties = catalogProperty.getProperties();
-        String metastoreUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
-        catalogProperties.put(CatalogProperties.URI, metastoreUris);
-        hiveCatalog.initialize(getName(), catalogProperties);
-        catalog = hiveCatalog;
+        catalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
     }
 }

IcebergMetadataCache.java

@@ -22,25 +22,22 @@
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.thrift.TIcebergMetadataParams;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.hive.HiveCatalog;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.HashMap;
@@ -97,11 +94,6 @@ public Table getAndCloneTable(CatalogIf catalog, String dbName, String tbName) {
         return restTable;
     }
 
-    public Table getRemoteTable(CatalogIf catalog, String dbName, String tbName) {
-        IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName);
-        return loadTable(key);
-    }
-
     @NotNull
     private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
         Table icebergTable = getIcebergTable(key.catalog, key.dbName, key.tableName);
@@ -114,17 +106,13 @@ private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
     private Table loadTable(IcebergMetadataCacheKey key) {
         Catalog icebergCatalog;
         if (key.catalog instanceof HMSExternalCatalog) {
-            HMSExternalCatalog ctg = (HMSExternalCatalog) key.catalog;
-            icebergCatalog = createIcebergHiveCatalog(
-                    ctg.getHiveMetastoreUris(),
-                    ctg.getCatalogProperty().getHadoopProperties(),
-                    ctg.getProperties());
+            icebergCatalog = ((HMSExternalCatalog) key.catalog).getIcebergHiveCatalog();
         } else if (key.catalog instanceof IcebergExternalCatalog) {
             icebergCatalog = ((IcebergExternalCatalog) key.catalog).getCatalog();
         } else {
             throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table");
         }
-        Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(key.catalog.getId(),
+        Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(((ExternalCatalog) key.catalog).getConfiguration(),
                 () -> icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName)));
         initIcebergTableFileIO(icebergTable, key.catalog.getProperties());
         return icebergTable;
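
Note the signature change on the last hunk: ugiDoAs now takes the catalog's Hadoop Configuration rather than a catalog id. The underlying pattern is Hadoop's UserGroupInformation.doAs — run the metadata call as the identity derived from that configuration (for example a Kerberos principal). A hedged sketch of the idea; the real HiveMetaStoreClientHelper plumbing may differ:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiDoAsSketch {
    // Run `action` as the user implied by `conf` (simple or Kerberos auth).
    public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) {
        try {
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
            return ugi.doAs(action);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}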
@@ -177,29 +165,6 @@ public void invalidateDbCache(long catalogId, String dbName) {
         });
     }
 
-    private Catalog createIcebergHiveCatalog(String uri, Map<String, String> hdfsConf, Map<String, String> props) {
-        // set hdfs configure
-        Configuration conf = DFSFileSystem.getHdfsConf(
-                hdfsConf.getOrDefault(DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "").isEmpty());
-        for (Map.Entry<String, String> entry : hdfsConf.entrySet()) {
-            conf.set(entry.getKey(), entry.getValue());
-        }
-        HiveCatalog hiveCatalog = new HiveCatalog();
-        hiveCatalog.setConf(conf);
-
-        if (props.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
-            props.put(HMSProperties.HIVE_METASTORE_URIS, uri);
-            props.put("uri", uri);
-            hiveCatalog.initialize("hive", props);
-        } else {
-            Map<String, String> catalogProperties = new HashMap<>();
-            catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, uri);
-            catalogProperties.put("uri", uri);
-            hiveCatalog.initialize("hive", catalogProperties);
-        }
-        return hiveCatalog;
-    }
-
     private static void initIcebergTableFileIO(Table table, Map<String, String> props) {
         Map<String, String> ioConf = new HashMap<>();
         table.properties().forEach((key, value) -> {
IcebergUtils.java

@@ -47,10 +47,12 @@
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.google.common.collect.Lists;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
Expand All @@ -63,6 +65,7 @@
import org.apache.iceberg.expressions.Not;
import org.apache.iceberg.expressions.Or;
import org.apache.iceberg.expressions.Unbound;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.LocationUtil;
@@ -553,7 +556,7 @@ public static org.apache.iceberg.Table getRemoteTable(ExternalCatalog catalog, S
         return Env.getCurrentEnv()
                 .getExtMetaCacheMgr()
                 .getIcebergMetadataCache()
-                .getRemoteTable(catalog, tableInfo.getDbName(), tableInfo.getTbName());
+                .getIcebergTable(catalog, tableInfo.getDbName(), tableInfo.getTbName());
     }
 
     private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog catalog, String dbName,
@@ -665,4 +668,16 @@ public static String dataLocation(Table table) {
         }
         return dataLocation;
     }
+
+    public static HiveCatalog createIcebergHiveCatalog(ExternalCatalog externalCatalog, String name) {
+        HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
+        hiveCatalog.setConf(externalCatalog.getConfiguration());
+
+        Map<String, String> catalogProperties = externalCatalog.getProperties();
+        String metastoreUris = catalogProperties.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
+        catalogProperties.put(CatalogProperties.URI, metastoreUris);
+
+        hiveCatalog.initialize(name, catalogProperties);
+        return hiveCatalog;
+    }
 }
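
Both call sites in this commit now delegate to this helper: HMSExternalCatalog.initLocalObjectsImpl() stores the result in icebergHiveCatalog, and IcebergHMSExternalCatalog.initCatalog() assigns it to catalog. One visible difference from the private helper removed from IcebergMetadataCache: that version special-cased BIND_BROKER_NAME and always initialized the catalog under the fixed name "hive", whereas the shared helper always uses the external catalog's own properties and name.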
test_iceberg_table_stats.groovy

@@ -40,6 +40,7 @@ suite("test_iceberg_table_stats", "p0,external,doris,external_docker,external_do
     def retry = 0
     def act = ""
     while (retry < 10) {
+        sql """ select * from ${table_name} """
         def result = sql """ show table stats ${table_name} """
         act = result[0][2]
         if (act != "-1") {
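
The added select is what keeps this test meaningful under the new guard in getCachedRowCount(): a table no query has touched stays uninitialized and keeps reporting -1 from show table stats. Querying it first forces initialization and kicks off the asynchronous row-count load that this retry loop then polls.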
