From 65360bfb10372210be56ec4887f92d3d7083e7c4 Mon Sep 17 00:00:00 2001 From: RyanZ Date: Thu, 19 Sep 2024 14:08:14 +0800 Subject: [PATCH] [BugFix] avoid hudi context npe when cache reload in background (#50987) Signed-off-by: yanz Signed-off-by: zhiminr.ren <1240388654@qq.com> --- .../connector/RemoteFileScanContext.java | 9 ++++++-- .../starrocks/connector/RemotePathKey.java | 8 +++++++ .../connector/hudi/HudiRemoteFileIO.java | 21 ++++++++++++++++--- .../connector/RemoteFileOperationsTest.java | 21 ++++++++++++++++++- 4 files changed, 53 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/RemoteFileScanContext.java b/fe/fe-core/src/main/java/com/starrocks/connector/RemoteFileScanContext.java index ee00f471d341b..5037a5970cbc5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/RemoteFileScanContext.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/RemoteFileScanContext.java @@ -28,10 +28,15 @@ */ public class RemoteFileScanContext { public RemoteFileScanContext(Table table) { - this.table = table; + this.tableLocation = table.getTableLocation(); } - public Table table = null; + public RemoteFileScanContext(String tableLocation) { + this.tableLocation = tableLocation; + } + + public String tableLocation = null; + // ---- concurrent initialization ----- public AtomicBoolean init = new AtomicBoolean(false); public ReentrantLock lock = new ReentrantLock(); diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/RemotePathKey.java b/fe/fe-core/src/main/java/com/starrocks/connector/RemotePathKey.java index deccf5161cb8c..3bc32f2beba77 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/RemotePathKey.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/RemotePathKey.java @@ -20,6 +20,7 @@ public class RemotePathKey { private final String path; private final boolean isRecursive; private RemoteFileScanContext scanContext; + private String tableLocation; public static RemotePathKey of(String 
path, boolean isRecursive) { return new RemotePathKey(path, isRecursive); @@ -28,6 +29,8 @@ public static RemotePathKey of(String path, boolean isRecursive) { public RemotePathKey(String path, boolean isRecursive) { this.path = path; this.isRecursive = isRecursive; + this.scanContext = null; + this.tableLocation = null; } public boolean approximateMatchPath(String basePath, boolean isRecursive) { @@ -40,6 +43,10 @@ public String getPath() { return path; } + public String getTableLocation() { + return tableLocation; + } + public boolean isRecursive() { return isRecursive; } @@ -79,6 +86,7 @@ public void drop() { public void setScanContext(RemoteFileScanContext ctx) { scanContext = ctx; + tableLocation = ctx.tableLocation; } public RemoteFileScanContext getScanContext() { diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiRemoteFileIO.java b/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiRemoteFileIO.java index ec94802788494..7bf30f84b9c1a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiRemoteFileIO.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiRemoteFileIO.java @@ -14,6 +14,7 @@ package com.starrocks.connector.hudi; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -40,6 +41,7 @@ import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import java.util.Iterator; import java.util.List; @@ -69,7 +71,7 @@ private void createHudiContext(RemoteFileScanContext ctx) { HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(configuration); HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build(); HoodieTableMetaClient metaClient = - 
HoodieTableMetaClient.builder().setConf(configuration).setBasePath(ctx.table.getTableLocation()).build(); + HoodieTableMetaClient.builder().setConf(configuration).setBasePath(ctx.tableLocation).build(); // metaClient.reloadActiveTimeline(); HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); Option lastInstant = timeline.lastInstant(); @@ -86,11 +88,13 @@ private void createHudiContext(RemoteFileScanContext ctx) { @Override public Map> getRemoteFiles(RemotePathKey pathKey) { - RemoteFileScanContext scanContext = pathKey.getScanContext(); - String tableLocation = scanContext.table.getTableLocation(); + String tableLocation = pathKey.getTableLocation(); if (tableLocation == null) { throw new StarRocksConnectorException("Missing hudi table base location on %s", pathKey); } + // scan context lets `getRemoteFiles` calls on a set of `pathKey`s share the same context and avoid duplicated function calls. + // in most cases the scan context has already been created and set by the caller, so it is not null. + RemoteFileScanContext scanContext = getScanContext(pathKey, tableLocation); String partitionPath = pathKey.getPath(); String partitionName = FSUtils.getRelativePartitionPath(new StoragePath(tableLocation), new StoragePath(partitionPath)); @@ -124,6 +128,17 @@ public Map> getRemoteFiles(RemotePathKey pat return resultPartitions.put(pathKey, fileDescs).build(); } + @NotNull + @VisibleForTesting + public static RemoteFileScanContext getScanContext(RemotePathKey pathKey, String tableLocation) { + RemoteFileScanContext scanContext = pathKey.getScanContext(); + // scan context is null when the cache is reloading in the background, and there is no place to set the scan context. + if (scanContext == null) { + scanContext = new RemoteFileScanContext(tableLocation); + } + return scanContext; + } + @Override public FileStatus[] getFileStatus(Path... 
files) { throw new UnsupportedOperationException("getFileStatus"); diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/RemoteFileOperationsTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/RemoteFileOperationsTest.java index 4c5aa8ff7221b..68c3c74d6b840 100644 --- a/fe/fe-core/src/test/java/com/starrocks/connector/RemoteFileOperationsTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/connector/RemoteFileOperationsTest.java @@ -16,6 +16,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.starrocks.catalog.HudiTable; import com.starrocks.common.ExceptionChecker; import com.starrocks.common.FeConstants; import com.starrocks.connector.exception.StarRocksConnectorException; @@ -28,6 +29,7 @@ import com.starrocks.connector.hive.Partition; import com.starrocks.connector.hive.RemoteFileInputFormat; import com.starrocks.connector.hive.TextFileFormatDesc; +import com.starrocks.connector.hudi.HudiRemoteFileIO; import mockit.Mock; import mockit.MockUp; import org.apache.hadoop.conf.Configuration; @@ -73,7 +75,8 @@ public void testGetHiveRemoteFiles() { Map partitions = metastore.getPartitionsByNames("db1", "table1", partitionNames); List remoteFileInfos = - ops.getRemoteFiles(null, Lists.newArrayList(partitions.values()), GetRemoteFilesParams.newBuilder().build()); + ops.getRemoteFiles(new HudiTable(), Lists.newArrayList(partitions.values()), + GetRemoteFilesParams.newBuilder().build()); Assert.assertEquals(2, remoteFileInfos.size()); Assert.assertTrue(remoteFileInfos.get(0).toString().contains("emoteFileInfo{format=ORC, files=[")); @@ -332,4 +335,20 @@ public long getModifiedTime() { } } + @Test + public void testRemotePathKeySetFileScanContext() { + RemotePathKey pathKey = new RemotePathKey("hello", true); + Assert.assertNull(pathKey.getTableLocation()); + Assert.assertNull(pathKey.getScanContext()); + + RemoteFileScanContext scanContext = null; + scanContext = HudiRemoteFileIO.getScanContext(pathKey, 
"tableLocation"); + Assert.assertNotNull(scanContext); + pathKey.setScanContext(scanContext); + Assert.assertEquals(pathKey.getTableLocation(), "tableLocation"); + Assert.assertTrue(pathKey.getScanContext() == scanContext); + + RemoteFileScanContext scanContext1 = HudiRemoteFileIO.getScanContext(pathKey, "null"); + Assert.assertTrue(pathKey.getScanContext() == scanContext1); + } }