Skip to content

Commit

Permalink
[BugFix] Avoid hdfs fs manager interrupting the thread when exception…
Browse files Browse the repository at this point in the history
… occurs (#48403)

Signed-off-by: xiangguangyxg <[email protected]>
(cherry picked from commit 3fcafac)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java
  • Loading branch information
xiangguangyxg authored and mergify[bot] committed Jul 22, 2024
1 parent 833c723 commit 705f7f3
Showing 1 changed file with 96 additions and 0 deletions.
96 changes: 96 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/fs/hdfs/HdfsFsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@

import java.io.FileNotFoundException;
import java.io.IOException;
<<<<<<< HEAD
import java.net.URISyntaxException;
=======
import java.io.InterruptedIOException;
>>>>>>> 3fcafac8b9 ([BugFix] Avoid hdfs fs manager interrupting the thread when exception occurs (#48403))
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Enumeration;
Expand Down Expand Up @@ -1003,8 +1007,33 @@ public void getTProperties(String path, Map<String, String> loadProperties, THdf
return;
}

<<<<<<< HEAD
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> loadProperties)
throws UserException {
=======
public List<FileStatus> listFileMeta(String path, Map<String, String> properties) throws UserException {
WildcardURI pathUri = new WildcardURI(path);
HdfsFs fileSystem = getFileSystem(path, properties, null);
Path pathPattern = new Path(pathUri.getPath());
try {
FileStatus[] files = fileSystem.getDFSFileSystem().globStatus(pathPattern);
return Lists.newArrayList(files);
} catch (FileNotFoundException e) {
LOG.info("file not found: " + path, e);
throw new UserException("file not found: " + path, e);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while get file status: " + path, e);
throw new UserException("Failed to get file status: " + path, e); // throw unified user exception
} catch (Exception e) {
LOG.error("errors while get file status ", e);
throw new UserException("Fail to get file status: " + e.getMessage(), e);
}
}

public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> loadProperties)
throws UserException {
>>>>>>> 3fcafac8b9 ([BugFix] Avoid hdfs fs manager interrupting the thread when exception occurs (#48403))
List<TBrokerFileStatus> resultFileStatus = null;
WildcardURI pathUri = new WildcardURI(path);
HdfsFs fileSystem = getFileSystem(path, loadProperties, null);
Expand Down Expand Up @@ -1036,8 +1065,22 @@ public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<S
resultFileStatus.add(brokerFileStatus);
}
} catch (FileNotFoundException e) {
<<<<<<< HEAD
LOG.info("file not found: " + e.getMessage());
throw new UserException("file not found: " + e.getMessage());
=======
LOG.info("file not found: " + path, e);
throw new UserException("file not found: " + path, e);
} catch (IllegalArgumentException e) {
LOG.error("The arguments of blob store(S3/Azure) may be wrong. You can check " +
"the arguments like region, IAM, instance profile and so on.");
throw new UserException("The arguments of blob store(S3/Azure) may be wrong. " +
"You can check the arguments like region, IAM, instance profile and so on.", e);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while list path: " + path, e);
throw new UserException("Failed to list path: " + path, e); // throw unified user exception
>>>>>>> 3fcafac8b9 ([BugFix] Avoid hdfs fs manager interrupting the thread when exception occurs (#48403))
} catch (Exception e) {
LOG.error("errors while get file status ", e);
throw new UserException("unknown error when get file status: " + e.getMessage());
Expand All @@ -1051,6 +1094,10 @@ public void deletePath(String path, Map<String, String> loadProperties) throws U
Path filePath = new Path(pathUri.getPath());
try {
fileSystem.getDFSFileSystem().delete(filePath, true);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while delete path: " + path, e);
throw new UserException("Failed to delete path: " + path, e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while delete path " + path, e);
throw new UserException("delete path " + path + "error", e);
Expand Down Expand Up @@ -1081,6 +1128,11 @@ public void renamePath(String srcPath, String destPath, Map<String, String> load
if (!isRenameSuccess) {
throw new UserException("failed to rename path from " + srcPath + " to " + destPath);
}
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while rename path from " + srcPath + " to " + destPath, e);
// throw unified user exception
throw new UserException("Failed to rename path from " + srcPath + " to " + destPath, e);
} catch (IOException e) {
LOG.error("errors while rename path from " + srcPath + " to " + destPath, e);
throw new UserException("errors while rename " + srcPath + "to " + destPath, e);
Expand All @@ -1092,8 +1144,16 @@ public boolean checkPathExist(String path, Map<String, String> loadProperties) t
HdfsFs fileSystem = getFileSystem(path, loadProperties, null);
Path filePath = new Path(pathUri.getPath());
try {
<<<<<<< HEAD
boolean isPathExist = fileSystem.getDFSFileSystem().exists(filePath);
return isPathExist;
=======
return fileSystem.getDFSFileSystem().exists(filePath);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while check path exist: " + path, e);
throw new UserException("Failed to check path exist: " + path, e); // throw unified user exception
>>>>>>> 3fcafac8b9 ([BugFix] Avoid hdfs fs manager interrupting the thread when exception occurs (#48403))
} catch (IOException e) {
LOG.error("errors while check path exist: " + path, e);
throw new UserException("errors while check if path " + path + " exist", e);
Expand All @@ -1111,6 +1171,10 @@ public TBrokerFD openReader(String path, long startOffset, Map<String, String> l
TBrokerFD fd = parseUUIDToFD(uuid);
ioStreamManager.putNewInputStream(fd, fsDataInputStream, fileSystem);
return fd;
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while open file " + path, e);
throw new UserException("Failed to open file " + path, e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while open path", e);
throw new UserException("could not open file " + path, e);
Expand All @@ -1123,6 +1187,11 @@ public byte[] pread(TBrokerFD fd, long offset, long length) throws UserException
long currentStreamOffset;
try {
currentStreamOffset = fsDataInputStream.getPos();
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while get file pos from output stream", e);
// throw unified user exception
throw new UserException("Failed to get file pos from output stream", e);
} catch (IOException e) {
LOG.error("errors while get file pos from output stream", e);
throw new UserException("errors while get file pos from output stream");
Expand All @@ -1134,6 +1203,11 @@ public byte[] pread(TBrokerFD fd, long offset, long length) throws UserException
+ offset + " seek to it");
try {
fsDataInputStream.seek(offset);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while seek file pos from output stream", e);
// throw unified user exception
throw new UserException("Failed to seek file pos from output stream", e);
} catch (IOException e) {
throw new UserException("current read offset " + currentStreamOffset + " is not equal to "
+ offset + ", and could not seek to it");
Expand Down Expand Up @@ -1161,6 +1235,10 @@ public byte[] pread(TBrokerFD fd, long offset, long length) throws UserException
System.arraycopy(buf, 0, smallerBuf, 0, readLength);
return smallerBuf;
}
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while read data from stream", e);
throw new UserException("Failed to read data from stream", e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while read data from stream", e);
throw new UserException("errors while read data from stream", e);
Expand All @@ -1177,6 +1255,10 @@ public void closeReader(TBrokerFD fd) throws UserException {
synchronized (fsDataInputStream) {
try {
fsDataInputStream.close();
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while close file input stream", e);
throw new UserException("Failed to close file input stream", e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while close file input stream", e);
throw new UserException("errors while close file input stream", e);
Expand All @@ -1198,6 +1280,10 @@ public TBrokerFD openWriter(String path, Map<String, String> loadProperties) thr
LOG.info("finish a open writer request. fd: " + fd);
ioStreamManager.putNewOutputStream(fd, fsDataOutputStream, fileSystem);
return fd;
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while open file " + path, e);
throw new UserException("Failed to open file " + path, e); // throw unified user exception
} catch (IOException e) {
LOG.error("errors while open path", e);
throw new UserException("could not open file " + path, e);
Expand All @@ -1214,6 +1300,11 @@ public void pwrite(TBrokerFD fd, long offset, byte[] data) throws UserException
}
try {
fsDataOutputStream.write(data);
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while write file " + fd + " to output stream", e);
// throw unified user exception
throw new UserException("Failed to write file " + fd + " to output stream", e);
} catch (IOException e) {
LOG.error("errors while write file " + fd + " to output stream", e);
throw new UserException("errors while write data to output stream", e);
Expand All @@ -1227,6 +1318,11 @@ public void closeWriter(TBrokerFD fd) throws UserException {
try {
fsDataOutputStream.hsync();
fsDataOutputStream.close();
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while close file " + fd + " output stream", e);
// throw unified user exception
throw new UserException("Failed to close file " + fd + " output stream", e);
} catch (IOException e) {
LOG.error("errors while close file " + fd + " output stream", e);
throw new UserException("errors while close file output stream", e);
Expand Down

0 comments on commit 705f7f3

Please sign in to comment.