Skip to content

Commit

Permalink
[BugFix] avoid hudi context npe when cache reload in background (Star…
Browse files Browse the repository at this point in the history
…Rocks#50987)

Signed-off-by: yanz <[email protected]>
Signed-off-by: zhiminr.ren <[email protected]>
  • Loading branch information
dirtysalt authored and renzhimin7 committed Nov 7, 2024
1 parent 10cd446 commit 65360bf
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@
*/
public class RemoteFileScanContext {
public RemoteFileScanContext(Table table) {
this.table = table;
this.tableLocation = table.getTableLocation();
}

public Table table = null;
public RemoteFileScanContext(String tableLocation) {
this.tableLocation = tableLocation;
}

public String tableLocation = null;

// ---- concurrent initialization -----
public AtomicBoolean init = new AtomicBoolean(false);
public ReentrantLock lock = new ReentrantLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class RemotePathKey {
private final String path;
private final boolean isRecursive;
private RemoteFileScanContext scanContext;
private String tableLocation;

public static RemotePathKey of(String path, boolean isRecursive) {
return new RemotePathKey(path, isRecursive);
Expand All @@ -28,6 +29,8 @@ public static RemotePathKey of(String path, boolean isRecursive) {
public RemotePathKey(String path, boolean isRecursive) {
this.path = path;
this.isRecursive = isRecursive;
this.scanContext = null;
this.tableLocation = null;
}

public boolean approximateMatchPath(String basePath, boolean isRecursive) {
Expand All @@ -40,6 +43,10 @@ public String getPath() {
return path;
}

public String getTableLocation() {
return tableLocation;
}

public boolean isRecursive() {
return isRecursive;
}
Expand Down Expand Up @@ -79,6 +86,7 @@ public void drop() {

public void setScanContext(RemoteFileScanContext ctx) {
scanContext = ctx;
tableLocation = ctx.tableLocation;
}

public RemoteFileScanContext getScanContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.starrocks.connector.hudi;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -40,6 +41,7 @@
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -69,7 +71,7 @@ private void createHudiContext(RemoteFileScanContext ctx) {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(configuration);
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(configuration).setBasePath(ctx.table.getTableLocation()).build();
HoodieTableMetaClient.builder().setConf(configuration).setBasePath(ctx.tableLocation).build();
// metaClient.reloadActiveTimeline();
HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Option<HoodieInstant> lastInstant = timeline.lastInstant();
Expand All @@ -86,11 +88,13 @@ private void createHudiContext(RemoteFileScanContext ctx) {

@Override
public Map<RemotePathKey, List<RemoteFileDesc>> getRemoteFiles(RemotePathKey pathKey) {
RemoteFileScanContext scanContext = pathKey.getScanContext();
String tableLocation = scanContext.table.getTableLocation();
String tableLocation = pathKey.getTableLocation();
if (tableLocation == null) {
throw new StarRocksConnectorException("Missing hudi table base location on %s", pathKey);
}
// scan context allows `getRemoteFiles` on set of `pathKey` to share a same context and avoid duplicated function calls.
// so in most cases, scan context has been created and set outside, so scan context is not nullptr.
RemoteFileScanContext scanContext = getScanContext(pathKey, tableLocation);

String partitionPath = pathKey.getPath();
String partitionName = FSUtils.getRelativePartitionPath(new StoragePath(tableLocation), new StoragePath(partitionPath));
Expand Down Expand Up @@ -124,6 +128,17 @@ public Map<RemotePathKey, List<RemoteFileDesc>> getRemoteFiles(RemotePathKey pat
return resultPartitions.put(pathKey, fileDescs).build();
}

@NotNull
@VisibleForTesting
public static RemoteFileScanContext getScanContext(RemotePathKey pathKey, String tableLocation) {
RemoteFileScanContext scanContext = pathKey.getScanContext();
// scan context is nullptr when cache is doing reload, and we don't have place to set scan context.
if (scanContext == null) {
scanContext = new RemoteFileScanContext(tableLocation);
}
return scanContext;
}

@Override
public FileStatus[] getFileStatus(Path... files) {
throw new UnsupportedOperationException("getFileStatus");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.catalog.HudiTable;
import com.starrocks.common.ExceptionChecker;
import com.starrocks.common.FeConstants;
import com.starrocks.connector.exception.StarRocksConnectorException;
Expand All @@ -28,6 +29,7 @@
import com.starrocks.connector.hive.Partition;
import com.starrocks.connector.hive.RemoteFileInputFormat;
import com.starrocks.connector.hive.TextFileFormatDesc;
import com.starrocks.connector.hudi.HudiRemoteFileIO;
import mockit.Mock;
import mockit.MockUp;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -73,7 +75,8 @@ public void testGetHiveRemoteFiles() {
Map<String, Partition> partitions = metastore.getPartitionsByNames("db1", "table1", partitionNames);

List<RemoteFileInfo> remoteFileInfos =
ops.getRemoteFiles(null, Lists.newArrayList(partitions.values()), GetRemoteFilesParams.newBuilder().build());
ops.getRemoteFiles(new HudiTable(), Lists.newArrayList(partitions.values()),
GetRemoteFilesParams.newBuilder().build());
Assert.assertEquals(2, remoteFileInfos.size());
Assert.assertTrue(remoteFileInfos.get(0).toString().contains("emoteFileInfo{format=ORC, files=["));

Expand Down Expand Up @@ -332,4 +335,20 @@ public long getModifiedTime() {
}
}

@Test
public void testRemotePathKeySetFileScanContext() {
RemotePathKey pathKey = new RemotePathKey("hello", true);
Assert.assertNull(pathKey.getTableLocation());
Assert.assertNull(pathKey.getScanContext());

RemoteFileScanContext scanContext = null;
scanContext = HudiRemoteFileIO.getScanContext(pathKey, "tableLocation");
Assert.assertNotNull(scanContext);
pathKey.setScanContext(scanContext);
Assert.assertEquals(pathKey.getTableLocation(), "tableLocation");
Assert.assertTrue(pathKey.getScanContext() == scanContext);

RemoteFileScanContext scanContext1 = HudiRemoteFileIO.getScanContext(pathKey, "null");
Assert.assertTrue(pathKey.getScanContext() == scanContext1);
}
}

0 comments on commit 65360bf

Please sign in to comment.