Skip to content

Commit

Permalink
添加HDFS文件检查接口并优化相关功能
Browse files Browse the repository at this point in the history
  • Loading branch information
“v_kkhuang” committed Jan 23, 2025
1 parent 1e78a3c commit 7a66e10
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public String getDefaultFolderPerm() {

public abstract String checkSum(FsPath dest) throws IOException;

public abstract long getBlockSize(FsPath dest) throws IOException;

public abstract boolean canExecute(FsPath dest) throws IOException;

public abstract boolean setOwner(FsPath dest, String user, String group) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,4 +513,13 @@ public String checkSum(FsPath dest) throws IOException {
(MD5MD5CRC32FileChecksum) fs.getFileChecksum(new Path(path));
return fileChecksum.toString().split(":")[1];
}

@Override
public long getBlockSize(FsPath dest) throws IOException {
String path = checkHDFSPath(dest.getPath());
if (!exists(dest)) {
throw new IOException("directory or file not exists: " + path);
}
return fs.getBlockSize(new Path(path));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -515,4 +515,9 @@ public long getLength(FsPath dest) throws IOException {
public String checkSum(FsPath dest) {
return null;
}

@Override
public long getBlockSize(FsPath dest) {
return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class WorkSpaceConstants {
"File permissions prohibit modification of unreadable files: {0} (文件权限禁止修改不可读: {0})";
public static final String FILEPATH_ILLEGAL_SYMBOLS =
"File path illegal symbols : {0} (文件路径结果集路径包含非法字符 : {0})";
public static final String FILEPATH_ILLEGAL =
"Result set path encryption failed, please check if the path is legal : {0} (文件路径加密失败,请检查路径是否合法 : {0})";

public static final String HIVE_FILEPATH_ILLEGAL_SYMBOLS =
"Hive file HDFS path is illegal: {0}, please use/live/workplace/related path (Hive文件hdfs路径非法: {0},请使用/hive/warehouse/相关路径并指定到表级别)";
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.linkis.storage.source.FileSource$;
import org.apache.linkis.storage.utils.StorageUtils;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BOMInputStream;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -66,6 +67,9 @@
import java.nio.file.Paths;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.JsonNode;
import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
Expand Down Expand Up @@ -1414,7 +1418,7 @@ public Message encryptPath(
throws WorkSpaceException, IOException {
String username = ModuleUserUtils.getOperationUser(req, "encrypt-path " + filePath);
if (StringUtils.isEmpty(filePath)) {
return Message.error(MessageFormat.format(PARAMETER_NOT_BLANK, "restultPath"));
return Message.error(MessageFormat.format(PARAMETER_NOT_BLANK, "filePath"));
}
if (!WorkspaceUtil.filePathRegexPattern.matcher(filePath).find()) {
return Message.error(MessageFormat.format(FILEPATH_ILLEGAL_SYMBOLS, filePath));
Expand All @@ -1423,4 +1427,64 @@ public Message encryptPath(
String fileMD5Str = fs.checkSum(new FsPath(filePath));
return Message.ok().data("data", fileMD5Str);
}

@ApiOperation(value = "check-hdfs-files", notes = "encrypt file path", response = Message.class)
@ApiImplicitParams({
@ApiImplicitParam(name = "filePath", required = true, dataType = "String", value = "Path")
})
@RequestMapping(path = "/check-hdfs-files", method = RequestMethod.GET)
public Message checkHdfsFiles(
HttpServletRequest req, @RequestParam(value = "filePath", required = false) String filePath)
throws WorkSpaceException, IOException {
String username = ModuleUserUtils.getOperationUser(req, "check-hdfs-files " + filePath);
if (StringUtils.isEmpty(filePath)) {
return Message.error(MessageFormat.format(PARAMETER_NOT_BLANK, "filePath"));
}
if (!WorkspaceUtil.filePathRegexPattern.matcher(filePath).find()) {
return Message.error(MessageFormat.format(FILEPATH_ILLEGAL_SYMBOLS, filePath));
}

if (!WorkspaceUtil.hiveFilePathRegexPattern.matcher(filePath).find()) {
return Message.error(MessageFormat.format(HIVE_FILEPATH_ILLEGAL_SYMBOLS, filePath));
}

FsPath fsPath = new FsPath(filePath);
FileSystem fs = fsService.getFileSystem(username, fsPath);
if (!fs.exists(fsPath)) {
return Message.error(MessageFormat.format(FILEPATH_ILLEGAL_SYMBOLS, filePath));
}
List<Map<String, Object>> resultMap = new ArrayList<>();
List<FsPath> list = fs.list(fsPath);
if (CollectionUtils.isNotEmpty(list)) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
list.forEach(
path -> {
executorService.submit(
() -> {
try {
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("checkSum", fs.checkSum(path));
dataMap.put("blockSize", fs.getBlockSize(path));
dataMap.put("path", path.getPath());
synchronized (resultMap) {
resultMap.add(dataMap);
}
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
});
});
// 关闭线程池并等待所有任务完成
executorService.shutdown();
try {
if (!executorService.awaitTermination(60L, TimeUnit.MINUTES)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
return Message.ok().data("fileDataList", resultMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ public class WorkspaceUtil {
public static String allReg = "(.*?)";

public static String filePathReg = "^[a-zA-Z0-9-\\d_.:/]+$";
public static String hiveFilePathReg = "\\/hive\\/warehouse\\/.*db\\/.*\\/";

public static Pattern filePathRegexPattern = Pattern.compile(filePathReg);
public static Pattern hiveFilePathRegexPattern = Pattern.compile(hiveFilePathReg);

public static List<LogLevel.Type> logReg = new ArrayList<>();

Expand Down

0 comments on commit 7a66e10

Please sign in to comment.