From 4dd3f0d01353ff7e1aa4c5da2c38ef3b2c6bd1eb Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Mon, 23 Dec 2024 11:41:22 +0800
Subject: [PATCH] fix

---
 .../datasource/ExternalMetaCacheMgr.java      |   5 +
 .../datasource/hive/HMSExternalTable.java     |  22 +-
 .../hive/HiveMetaStoreClientHelper.java       |   9 +
 .../source/HudiCachedMetaClientProcessor.java | 152 ++++++++++++++
 .../hudi/source/HudiMetadataCacheMgr.java     |  32 +++
 .../doris/datasource/hudi/HudiUtilsTest.java  | 196 ++++++++++++++++++
 6 files changed, 402 insertions(+), 14 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index d4e4ff9edb18449..3b8f7b3071c5910 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -25,6 +25,7 @@
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor;
+import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
 import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
 import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;
 import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
@@ -172,6 +173,10 @@ public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) {
         return hudiMetadataCacheMgr.getFsViewProcessor(catalog);
     }
 
+    public HudiCachedMetaClientProcessor getMetaClientProcessor(ExternalCatalog catalog) {
+        return hudiMetadataCacheMgr.getHudiMetaClientProcessor(catalog);
+    }
+
     public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
         return hudiMetadataCacheMgr;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index c0db5cf9c9bbc69..d6c14c0fab1c76f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -120,9 +120,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf
     private static final String USE_HIVE_SYNC_PARTITION = "use_hive_sync_partition";
 
-    private HoodieTableMetaClient hudiClient = null;
-    private final byte[] hudiClientLock = new byte[0];
-
     static {
         SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
         SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
@@ -1024,16 +1021,13 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
     }
 
     public HoodieTableMetaClient getHudiClient() {
-        if (hudiClient != null) {
-            return hudiClient;
-        }
-        synchronized (hudiClientLock) {
-            if (hudiClient != null) {
-                return hudiClient;
-            }
-            hudiClient = HudiUtils.buildHudiTableMetaClient(
-                    getRemoteTable().getSd().getLocation(), catalog.getConfiguration());
-            return hudiClient;
-        }
+        return Env.getCurrentEnv()
+                .getExtMetaCacheMgr()
+                .getMetaClientProcessor(getCatalog())
+                .getHoodieTableMetaClient(
+                        getDbName(),
+                        getName(),
+                        getRemoteTable().getSd().getLocation(),
+                        getCatalog().getConfiguration());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index ea5af4b854f5935..a66bf2cb3906b75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -807,6 +807,15 @@ public static Schema getHudiTableSchema(HMSExternalTable table) {
         HoodieTableMetaClient metaClient = table.getHudiClient();
         TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
         Schema hudiSchema;
+
+        // Reload the active timeline here.
+        // When hudi resolves the schema in `getTableAvroSchema`, it reads the commit file
+        // recorded in the `metaClient`.
+        // But the `metaClient` is obtained from a cache, so the commit file it references
+        // may be stale: the hudi clean task may already have deleted it, and the read would fail.
+        // Reloading the timeline ensures we read the latest commit files.
+        metaClient.reloadActiveTimeline();
+
         try {
             hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
         } catch (Exception e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
new file mode 100644
index 000000000000000..07726a54ffe5f10
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedMetaClientProcessor.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi.source;
+
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.common.Config;
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+
+public class HudiCachedMetaClientProcessor {
+    private static final Logger LOG = LogManager.getLogger(HudiCachedMetaClientProcessor.class);
+    private final LoadingCache<HudiCachedClientKey, HoodieTableMetaClient> hudiTableMetaClientCache;
+
+    public HudiCachedMetaClientProcessor(ExecutorService executor) {
+        CacheFactory partitionCacheFactory = new CacheFactory(
+                OptionalLong.of(28800L),
+                OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
+                Config.max_external_table_cache_num,
+                true,
+                null);
+
+        this.hudiTableMetaClientCache =
+                partitionCacheFactory.buildCache(
+                        this::createHoodieTableMetaClient,
+                        null,
+                        executor);
+    }
+
+    private HoodieTableMetaClient createHoodieTableMetaClient(HudiCachedClientKey key) {
+        LOG.debug("create hudi table meta client for {}.{}", key.getDbName(), key.getTbName());
+        HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(key.getConf());
+        return HiveMetaStoreClientHelper.ugiDoAs(
+                key.getConf(),
+                () -> HoodieTableMetaClient
+                        .builder()
+                        .setConf(hadoopStorageConfiguration)
+                        .setBasePath(key.getHudiBasePath())
+                        .build());
+    }
+
+    public HoodieTableMetaClient getHoodieTableMetaClient(
+            String dbName, String tbName, String hudiBasePath, Configuration conf) {
+        return hudiTableMetaClientCache.get(new HudiCachedClientKey(dbName, tbName, hudiBasePath, conf));
+    }
+
+    public void cleanUp() {
+        hudiTableMetaClientCache.cleanUp();
+    }
+
+    public void invalidateAll() {
+        hudiTableMetaClientCache.invalidateAll();
+    }
+
+    public void invalidateDbCache(String dbName) {
+        hudiTableMetaClientCache.asMap().forEach((k, v) -> {
+            if (k.getDbName().equals(dbName)) {
+                hudiTableMetaClientCache.invalidate(k);
+            }
+        });
+    }
+
+    public void invalidateTableCache(String dbName, String tbName) {
+        hudiTableMetaClientCache.asMap().forEach((k, v) -> {
+            if (k.getDbName().equals(dbName) && k.getTbName().equals(tbName)) {
+                hudiTableMetaClientCache.invalidate(k);
+            }
+        });
+    }
+
+    private static class HudiCachedClientKey {
+        String dbName;
+        String tbName;
+        String hudiBasePath;
+        Configuration conf;
+
+        public HudiCachedClientKey(String dbName, String tbName, String hudiBasePath, Configuration conf) {
+            this.dbName = dbName;
+            this.tbName = tbName;
+            this.hudiBasePath = hudiBasePath;
+            this.conf = conf;
+        }
+
+        public String getDbName() {
+            return dbName;
+        }
+
+        public String getTbName() {
+            return tbName;
+        }
+
+        public String getHudiBasePath() {
+            return hudiBasePath;
+        }
+
+        public Configuration getConf() {
+            return conf;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            HudiCachedClientKey that = (HudiCachedClientKey) o;
+            return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, that.tbName)
+                    && Objects.equals(hudiBasePath, that.hudiBasePath);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dbName, tbName, hudiBasePath);
+        }
+    }
+
+    public Map<String, Map<String, String>> getCacheStats() {
+        Map<String, Map<String, String>> res = Maps.newHashMap();
+        res.put("hudi_meta_client_cache", ExternalMetaCacheMgr.getCacheStats(hudiTableMetaClientCache.stats(),
+                hudiTableMetaClientCache.estimatedSize()));
+        return res;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
index bcc41f18c0c3125..4ede5c73cfaa977 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiMetadataCacheMgr.java
@@ -28,6 +28,8 @@ public class HudiMetadataCacheMgr {
     private final Map<Long, HudiPartitionProcessor> partitionProcessors = Maps.newConcurrentMap();
     private final Map<Long, HudiCachedFsViewProcessor> fsViewProcessors = Maps.newConcurrentMap();
+    private final Map<Long, HudiCachedMetaClientProcessor> metaClientProcessors = Maps.newConcurrentMap();
+
     private final ExecutorService executor;
 
     public HudiMetadataCacheMgr(ExecutorService executor) {
@@ -54,6 +56,16 @@ public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) {
         });
     }
 
+    public HudiCachedMetaClientProcessor getHudiMetaClientProcessor(ExternalCatalog catalog) {
+        return metaClientProcessors.computeIfAbsent(catalog.getId(), catalogId -> {
+            if (catalog instanceof HMSExternalCatalog) {
+                return new HudiCachedMetaClientProcessor(executor);
+            } else {
+                throw new RuntimeException("Hudi only supports hive(or compatible) catalog now");
+            }
+        });
+    }
+
     public void removeCache(long catalogId) {
         HudiPartitionProcessor partitionProcessor = partitionProcessors.remove(catalogId);
         if (partitionProcessor != null) {
@@ -63,6 +75,10 @@ public void removeCache(long catalogId) {
         if (fsViewProcessor != null) {
             fsViewProcessor.cleanUp();
         }
+        HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.remove(catalogId);
+        if (metaClientProcessor != null) {
+            metaClientProcessor.cleanUp();
+        }
     }
 
     public void invalidateCatalogCache(long catalogId) {
@@ -74,6 +90,10 @@ public void invalidateCatalogCache(long catalogId) {
         if (fsViewProcessor != null) {
             fsViewProcessor.invalidateAll();
         }
+        HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.get(catalogId);
+        if (metaClientProcessor != null) {
+            metaClientProcessor.invalidateAll();
+        }
     }
 
     public void invalidateDbCache(long catalogId, String dbName) {
@@ -85,6 +105,10 @@ public void invalidateDbCache(long catalogId, String dbName) {
         if (fsViewProcessor != null) {
             fsViewProcessor.invalidateDbCache(dbName);
         }
+        HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.get(catalogId);
+        if (metaClientProcessor != null) {
+            metaClientProcessor.invalidateDbCache(dbName);
+        }
     }
 
     public void invalidateTableCache(long catalogId, String dbName, String tblName) {
@@ -96,6 +120,10 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName)
         if (fsViewProcessor != null) {
             fsViewProcessor.invalidateTableCache(dbName, tblName);
        }
+        HudiCachedMetaClientProcessor metaClientProcessor = metaClientProcessors.get(catalogId);
+        if (metaClientProcessor != null) {
+            metaClientProcessor.invalidateTableCache(dbName, tblName);
+        }
     }
 
     public Map<String, Map<String, String>> getCacheStats(ExternalCatalog catalog) {
@@ -106,6 +134,10 @@ public Map<String, Map<String, String>> getCacheStats(ExternalCatalog catalog) {
 
         HudiCachedFsViewProcessor fsViewProcessor = getFsViewProcessor(catalog);
         res.putAll(fsViewProcessor.getCacheStats());
+
+        HudiCachedMetaClientProcessor metaClientProcessor = getHudiMetaClientProcessor(catalog);
+        res.putAll(metaClientProcessor.getCacheStats());
+
         return res;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
new file mode 100644
index 000000000000000..88b29f892cc02fa
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi;
+
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class HudiUtilsTest {
+
+    @Test
+    public void testGetHudiSchemaWithCleanCommit() throws IOException {
+
+        /*
+         example table:
+         CREATE TABLE tbx (
+           c1 INT)
+         USING hudi
+         TBLPROPERTIES (
+           'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS',
+           'hoodie.clean.automatic' = 'true',
+           'hoodie.cleaner.commits.retained' = '2'
+         );
+        */
+
+        String commitContent1 = "{\n"
+                + "  \"partitionToWriteStats\" : {\n"
+                + "    \"\" : [ {\n"
+                + "      \"fileId\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0\",\n"
+                + "      \"path\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0_0-2164-2318_20241219214517936.parquet\",\n"
+                + "      \"cdcStats\" : null,\n"
+                + "      \"prevCommit\" : \"20241219214431757\",\n"
+                + "      \"numWrites\" : 2,\n"
+                + "      \"numDeletes\" : 0,\n"
+                + "      \"numUpdateWrites\" : 0,\n"
+                + "      \"numInserts\" : 1,\n"
+                + "      \"totalWriteBytes\" : 434370,\n"
+                + "      \"totalWriteErrors\" : 0,\n"
+                + "      \"tempPath\" : null,\n"
+                + "      \"partitionPath\" : \"\",\n"
+                + "      \"totalLogRecords\" : 0,\n"
+                + "      \"totalLogFilesCompacted\" : 0,\n"
+                + "      \"totalLogSizeCompacted\" : 0,\n"
+                + "      \"totalUpdatedRecordsCompacted\" : 0,\n"
+                + "      \"totalLogBlocks\" : 0,\n"
+                + "      \"totalCorruptLogBlock\" : 0,\n"
+                + "      \"totalRollbackBlocks\" : 0,\n"
+                + "      \"fileSizeInBytes\" : 434370,\n"
+                + "      \"minEventTime\" : null,\n"
+                + "      \"maxEventTime\" : null,\n"
+                + "      \"runtimeStats\" : {\n"
+                + "        \"totalScanTime\" : 0,\n"
+                + "        \"totalUpsertTime\" : 87,\n"
+                + "        \"totalCreateTime\" : 0\n"
+                + "      }\n"
+                + "    } ]\n"
+                + "  },\n"
+                + "  \"compacted\" : false,\n"
+                + "  \"extraMetadata\" : {\n"
+                + "    \"schema\" : \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"tbx_record\\\",\\\"namespace\\\":\\\"hoodie.tbx\\\",\\\"fields\\\":[{\\\"name\\\":\\\"c1\\\",\\\"type\\\":[\\\"null\\\",\\\"int\\\"],\\\"default\\\":null}]}\"\n"
+                + "  },\n"
+                + "  \"operationType\" : \"INSERT\"\n"
+                + "}";
+
+        String commitContent2 = "{\n"
+                + "  \"partitionToWriteStats\" : {\n"
+                + "    \"\" : [ {\n"
+                + "      \"fileId\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0\",\n"
+                + "      \"path\" : \"91b75cdf-e851-4524-b579-a9b08edd61d8-0_0-2180-2334_20241219214518880.parquet\",\n"
+                + "      \"cdcStats\" : null,\n"
+                + "      \"prevCommit\" : \"20241219214517936\",\n"
+                + "      \"numWrites\" : 3,\n"
+                + "      \"numDeletes\" : 0,\n"
+                + "      \"numUpdateWrites\" : 0,\n"
+                + "      \"numInserts\" : 1,\n"
+                + "      \"totalWriteBytes\" : 434397,\n"
+                + "      \"totalWriteErrors\" : 0,\n"
+                + "      \"tempPath\" : null,\n"
+                + "      \"partitionPath\" : \"\",\n"
+                + "      \"totalLogRecords\" : 0,\n"
+                + "      \"totalLogFilesCompacted\" : 0,\n"
+                + "      \"totalLogSizeCompacted\" : 0,\n"
+                + "      \"totalUpdatedRecordsCompacted\" : 0,\n"
+                + "      \"totalLogBlocks\" : 0,\n"
+                + "      \"totalCorruptLogBlock\" : 0,\n"
+                + "      \"totalRollbackBlocks\" : 0,\n"
+                + "      \"fileSizeInBytes\" : 434397,\n"
+                + "      \"minEventTime\" : null,\n"
+                + "      \"maxEventTime\" : null,\n"
+                + "      \"runtimeStats\" : {\n"
+                + "        \"totalScanTime\" : 0,\n"
+                + "        \"totalUpsertTime\" : 86,\n"
+                + "        \"totalCreateTime\" : 0\n"
+                + "      }\n"
+                + "    } ]\n"
+                + "  },\n"
+                + "  \"compacted\" : false,\n"
+                + "  \"extraMetadata\" : {\n"
+                + "    \"schema\" : \"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"tbx_record\\\",\\\"namespace\\\":\\\"hoodie.tbx\\\",\\\"fields\\\":[{\\\"name\\\":\\\"c1\\\",\\\"type\\\":[\\\"null\\\",\\\"int\\\"],\\\"default\\\":null}]}\"\n"
+                + "  },\n"
+                + "  \"operationType\" : \"INSERT\"\n"
+                + "}";
+
+        String propContent = "#Updated at 2024-12-19T13:44:32.166Z\n"
+                + "#Thu Dec 19 21:44:32 CST 2024\n"
+                + "hoodie.datasource.write.drop.partition.columns=false\n"
+                + "hoodie.table.type=COPY_ON_WRITE\n"
+                + "hoodie.archivelog.folder=archived\n"
+                + "hoodie.timeline.layout.version=1\n"
+                + "hoodie.table.version=6\n"
+                + "hoodie.table.metadata.partitions=files\n"
+                + "hoodie.database.name=mmc_hudi\n"
+                + "hoodie.datasource.write.partitionpath.urlencode=false\n"
+                + "hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator\n"
+                + "hoodie.table.name=tbx\n"
+                + "hoodie.table.metadata.partitions.inflight=\n"
+                + "hoodie.datasource.write.hive_style_partitioning=true\n"
+                + "hoodie.table.checksum=1632286010\n"
+                + "hoodie.table.create.schema={\"type\"\\:\"record\",\"name\"\\:\"tbx_record\",\"namespace\"\\:\"hoodie.tbx\",\"fields\"\\:[{\"name\"\\:\"c1\",\"type\"\\:[\"int\",\"null\"]}]}";
+
+        // 1. prepare table path
+        Path hudiTable = Files.createTempDirectory("hudiTable");
+        File meta = new File(hudiTable + "/.hoodie");
+        Assert.assertTrue(meta.mkdirs());
+
+        new MockUp<HMSExternalTable>(HMSExternalTable.class) {
+            @Mock
+            public org.apache.hadoop.hive.metastore.api.Table getRemoteTable() {
+                Table table = new Table();
+                StorageDescriptor storageDescriptor = new StorageDescriptor();
+                storageDescriptor.setLocation("file://" + hudiTable.toAbsolutePath());
+                table.setSd(storageDescriptor);
+                return table;
+            }
+        };
+
+        // 2. generate properties and commit
+        File prop = new File(meta + "/hoodie.properties");
+        Files.write(prop.toPath(), propContent.getBytes());
+        File commit1 = new File(meta + "/1.commit");
+        Files.write(commit1.toPath(), commitContent1.getBytes());
+
+        // 3. now, we can get the schema from this table.
+        HMSExternalCatalog catalog = new HMSExternalCatalog();
+        HMSExternalTable hmsExternalTable = new HMSExternalTable(1, "tb", "db", catalog);
+        HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable);
+
+        // 4. delete the commit file;
+        // this imitates hudi's clean operation
+        Assert.assertTrue(commit1.delete());
+
+        // 5. generate a new commit
+        File commit2 = new File(meta + "/2.commit");
+        Files.write(commit2.toPath(), commitContent2.getBytes());
+
+        // 6. we should still get the schema correctly,
+        // because `getHudiTableSchema` reloads the timeline,
+        // so we read the latest commit and the error
+        // `Could not read commit details from file /.hoodie/1.commit` is not reported.
+        HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable);
+
+        // 7. clean up
+        Assert.assertTrue(commit2.delete());
+        Assert.assertTrue(prop.delete());
+        Assert.assertTrue(meta.delete());
+        Files.delete(hudiTable);
+    }
+}
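
Reviewer note, not part of the patch: a minimal sketch of how the pieces above fit together, using only names introduced in this diff (`getMetaClientProcessor`, `getHoodieTableMetaClient`, `reloadActiveTimeline`); the wrapper class `HudiSchemaSketch` is hypothetical. The explicit timeline reload is the crux of the fix: a `HoodieTableMetaClient` served from the cache can still reference commit files that hudi's clean task has already deleted, so it must be refreshed before the schema is resolved.

    // Sketch only: mirrors HMSExternalTable.getHudiClient() plus the reload added
    // to HiveMetaStoreClientHelper.getHudiTableSchema() in this patch.
    import org.apache.avro.Schema;
    import org.apache.doris.catalog.Env;
    import org.apache.doris.datasource.hive.HMSExternalTable;
    import org.apache.hudi.avro.HoodieAvroUtils;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.TableSchemaResolver;

    class HudiSchemaSketch {
        static Schema latestSchema(HMSExternalTable table) throws Exception {
            // Fetch the (possibly cached) meta client keyed by (db, table, base path).
            HoodieTableMetaClient metaClient = Env.getCurrentEnv()
                    .getExtMetaCacheMgr()
                    .getMetaClientProcessor(table.getCatalog())
                    .getHoodieTableMetaClient(
                            table.getDbName(),
                            table.getName(),
                            table.getRemoteTable().getSd().getLocation(),
                            table.getCatalog().getConfiguration());
            // A cached client may point at commits the clean task has removed;
            // reload the active timeline before resolving the schema.
            metaClient.reloadActiveTimeline();
            return HoodieAvroUtils.createHoodieWriteSchema(
                    new TableSchemaResolver(metaClient).getTableAvroSchema());
        }
    }

Design note grounded in the patch: `HudiCachedClientKey.equals`/`hashCode` deliberately exclude the `Configuration`, so cache entries are identified by (db, table, base path) alone, which is what lets `invalidateDbCache` and `invalidateTableCache` match entries by scanning `asMap()`.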