Skip to content

Commit

Permalink
[BugFix] avoid hudi context npe when cache reload in background (#50987)
Browse files Browse the repository at this point in the history
Signed-off-by: yanz <[email protected]>
(cherry picked from commit 7e9eb06)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/connector/RemoteFileScanContext.java
#	fe/fe-core/src/main/java/com/starrocks/connector/RemotePathKey.java
#	fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiRemoteFileIO.java
#	fe/fe-core/src/test/java/com/starrocks/connector/RemoteFileOperationsTest.java
  • Loading branch information
dirtysalt authored and mergify[bot] committed Sep 19, 2024
1 parent fa22ef6 commit 9bb3eca
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector;

import com.starrocks.catalog.Table;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/*
When doing concurrent loading remote files, some variables can be shared and reused to save costs.
And in this context, we maintain fields can be shared and reused.
*/
public class RemoteFileScanContext {
public RemoteFileScanContext(Table table) {
this.tableLocation = table.getTableLocation();
}

public RemoteFileScanContext(String tableLocation) {
this.tableLocation = tableLocation;
}

public String tableLocation = null;

// ---- concurrent initialization -----
public AtomicBoolean init = new AtomicBoolean(false);
public ReentrantLock lock = new ReentrantLock();

// ---- hudi related fields -----
public HoodieTableFileSystemView hudiFsView = null;
public HoodieTimeline hudiTimeline = null;
public HoodieInstant hudiLastInstant = null;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
public class RemotePathKey {
private final String path;
private final boolean isRecursive;
<<<<<<< HEAD
=======
private RemoteFileScanContext scanContext;
private String tableLocation;
>>>>>>> 7e9eb06217 ([BugFix] avoid hudi context npe when cache reload in background (#50987))

// The table location must exist in HudiTable
private final Optional<String> hudiTableLocation;
Expand Down Expand Up @@ -53,7 +58,12 @@ public static RemotePathKey of(String path, boolean isRecursive, Optional<String
public RemotePathKey(String path, boolean isRecursive, Optional<String> hudiTableLocation) {
this.path = path;
this.isRecursive = isRecursive;
<<<<<<< HEAD
this.hudiTableLocation = hudiTableLocation;
=======
this.scanContext = null;
this.tableLocation = null;
>>>>>>> 7e9eb06217 ([BugFix] avoid hudi context npe when cache reload in background (#50987))
}

public boolean approximateMatchPath(String basePath, boolean isRecursive) {
Expand All @@ -66,6 +76,10 @@ public String getPath() {
return path;
}

public String getTableLocation() {
return tableLocation;
}

public boolean isRecursive() {
return isRecursive;
}
Expand Down Expand Up @@ -111,8 +125,14 @@ public void drop() {
}
}

<<<<<<< HEAD
public void setHudiContext(HudiContext ctx) {
hudiContext = ctx;
=======
public void setScanContext(RemoteFileScanContext ctx) {
scanContext = ctx;
tableLocation = ctx.tableLocation;
>>>>>>> 7e9eb06217 ([BugFix] avoid hudi context npe when cache reload in background (#50987))
}

public HudiContext getHudiContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.starrocks.connector.hudi;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -37,6 +38,7 @@
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -70,7 +72,11 @@ private void createHudiContext(RemotePathKey.HudiContext ctx, String hudiTableLo
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(configuration);
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieTableMetaClient metaClient =
<<<<<<< HEAD
HoodieTableMetaClient.builder().setConf(configuration).setBasePath(hudiTableLocation).build();
=======
HoodieTableMetaClient.builder().setConf(configuration).setBasePath(ctx.tableLocation).build();
>>>>>>> 7e9eb06217 ([BugFix] avoid hudi context npe when cache reload in background (#50987))
// metaClient.reloadActiveTimeline();
HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Option<HoodieInstant> lastInstant = timeline.lastInstant();
Expand All @@ -87,8 +93,18 @@ private void createHudiContext(RemotePathKey.HudiContext ctx, String hudiTableLo

@Override
public Map<RemotePathKey, List<RemoteFileDesc>> getRemoteFiles(RemotePathKey pathKey) {
<<<<<<< HEAD
String tableLocation = pathKey.getHudiTableLocation().orElseThrow(() ->
new StarRocksConnectorException("Missing hudi table base location on %s", pathKey));
=======
String tableLocation = pathKey.getTableLocation();
if (tableLocation == null) {
throw new StarRocksConnectorException("Missing hudi table base location on %s", pathKey);
}
// The scan context lets `getRemoteFiles` calls over a set of `pathKey`s share one context and avoid duplicated work.
// In most cases the scan context has already been created and set by the caller, so it is not null here.
RemoteFileScanContext scanContext = getScanContext(pathKey, tableLocation);
>>>>>>> 7e9eb06217 ([BugFix] avoid hudi context npe when cache reload in background (#50987))

String partitionPath = pathKey.getPath();
String partitionName = FSUtils.getRelativePartitionPath(new StoragePath(tableLocation), new StoragePath(partitionPath));
Expand Down Expand Up @@ -124,4 +140,22 @@ public Map<RemotePathKey, List<RemoteFileDesc>> getRemoteFiles(RemotePathKey pat
return resultPartitions.put(pathKey, fileDescs).build();
}

<<<<<<< HEAD
=======
@NotNull
@VisibleForTesting
public static RemoteFileScanContext getScanContext(RemotePathKey pathKey, String tableLocation) {
RemoteFileScanContext scanContext = pathKey.getScanContext();
// The scan context is null when the cache reloads in the background, since there is no place to set it beforehand.
if (scanContext == null) {
scanContext = new RemoteFileScanContext(tableLocation);
}
return scanContext;
}

@Override
public FileStatus[] getFileStatus(Path... files) {
throw new UnsupportedOperationException("getFileStatus");
}
>>>>>>> 7e9eb06217 ([BugFix] avoid hudi context npe when cache reload in background (#50987))
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
package com.starrocks.connector;

import com.google.common.collect.Lists;
<<<<<<< HEAD
=======
import com.google.common.collect.Maps;
import com.starrocks.catalog.HudiTable;
>>>>>>> 7e9eb06217 ([BugFix] avoid hudi context npe when cache reload in background (#50987))
import com.starrocks.common.ExceptionChecker;
import com.starrocks.common.FeConstants;
import com.starrocks.connector.exception.StarRocksConnectorException;
Expand All @@ -26,6 +31,11 @@
import com.starrocks.connector.hive.MockedRemoteFileSystem;
import com.starrocks.connector.hive.Partition;
import com.starrocks.connector.hive.RemoteFileInputFormat;
<<<<<<< HEAD
=======
import com.starrocks.connector.hive.TextFileFormatDesc;
import com.starrocks.connector.hudi.HudiRemoteFileIO;
>>>>>>> 7e9eb06217 ([BugFix] avoid hudi context npe when cache reload in background (#50987))
import mockit.Mock;
import mockit.MockUp;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -69,7 +79,13 @@ public void testGetHiveRemoteFiles() {
List<String> partitionNames = Lists.newArrayList("col1=1", "col1=2");
Map<String, Partition> partitions = metastore.getPartitionsByNames("db1", "table1", partitionNames);

<<<<<<< HEAD
List<RemoteFileInfo> remoteFileInfos = ops.getRemoteFiles(Lists.newArrayList(partitions.values()));
=======
List<RemoteFileInfo> remoteFileInfos =
ops.getRemoteFiles(new HudiTable(), Lists.newArrayList(partitions.values()),
GetRemoteFilesParams.newBuilder().build());
>>>>>>> 7e9eb06217 ([BugFix] avoid hudi context npe when cache reload in background (#50987))
Assert.assertEquals(2, remoteFileInfos.size());
Assert.assertTrue(remoteFileInfos.get(0).toString().contains("emoteFileInfo{format=ORC, files=["));

Expand Down Expand Up @@ -273,4 +289,78 @@ public FileSystem get(URI uri, Configuration conf) throws IOException {
"file name or query id is invalid",
() -> ops.removeNotCurrentQueryFiles(targetPath, "aaa"));
}
<<<<<<< HEAD
=======

@Test
public void testGetRemotePartitions() {
List<String> partitionNames = Lists.newArrayList("dt=20200101", "dt=20200102", "dt=20200103");
List<Partition> partitionList = Lists.newArrayList();
List<FileStatus> fileStatusList = Lists.newArrayList();
long modificationTime = 1000;
for (String name : partitionNames) {
Map<String, String> parameters = Maps.newHashMap();
TextFileFormatDesc formatDesc = new TextFileFormatDesc("a", "b", "c", "d");
String fullPath = "hdfs://path_to_table/" + name;
Partition partition = new Partition(parameters, RemoteFileInputFormat.PARQUET, formatDesc, fullPath, true);
partitionList.add(partition);

Path filePath = new Path(fullPath + "/00000_0");
FileStatus fileStatus = new FileStatus(100000, false, 1, 256, modificationTime++, filePath);
fileStatusList.add(fileStatus);
}

FileStatus[] fileStatuses = fileStatusList.toArray(new FileStatus[0]);

new MockUp<RemoteFileOperations>() {
@Mock
public FileStatus[] getFileStatus(Path... paths) {
return fileStatuses;
}
};

RemoteFileOperations ops = new RemoteFileOperations(null, null, null,
false, true, null);
List<PartitionInfo> partitions = ops.getRemotePartitions(partitionList);
Assert.assertEquals(3, partitions.size());
for (int i = 0; i < partitionNames.size(); i++) {
Assert.assertEquals(partitions.get(i).getFullPath(), "hdfs://path_to_table/" + partitionNames.get((i)));
}
}

@Test
public void testAnonPartitionInfo() {
{
PartitionInfo x = new PartitionInfo() {
@Override
public long getModifiedTime() {
return 0;
}
};
Assert.assertThrows(UnsupportedOperationException.class, () -> {
x.getFileFormat();
});
Assert.assertThrows(UnsupportedOperationException.class, () -> {
x.getFullPath();
});
}
}

@Test
public void testRemotePathKeySetFileScanContext() {
RemotePathKey pathKey = new RemotePathKey("hello", true);
Assert.assertNull(pathKey.getTableLocation());
Assert.assertNull(pathKey.getScanContext());

RemoteFileScanContext scanContext = null;
scanContext = HudiRemoteFileIO.getScanContext(pathKey, "tableLocation");
Assert.assertNotNull(scanContext);
pathKey.setScanContext(scanContext);
Assert.assertEquals(pathKey.getTableLocation(), "tableLocation");
Assert.assertTrue(pathKey.getScanContext() == scanContext);

RemoteFileScanContext scanContext1 = HudiRemoteFileIO.getScanContext(pathKey, "null");
Assert.assertTrue(pathKey.getScanContext() == scanContext1);
}
>>>>>>> 7e9eb06217 ([BugFix] avoid hudi context npe when cache reload in background (#50987))
}

0 comments on commit 9bb3eca

Please sign in to comment.