[fix][hive-source][bug] fix An error occurred reading an empty directory (#5427)

* [fix][hive-source][bug] fix An error occurred reading an empty directory

* [fix][hive-source][bug] fix An error occurred reading an empty directory
zhilinli123 committed Sep 12, 2023
1 parent cef03f6 commit de7b86a
Showing 3 changed files with 12 additions and 16 deletions.
@@ -24,8 +24,6 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -153,15 +151,9 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
                 }
             }
         }
-
-        if (fileNames.isEmpty()) {
-            throw new FileConnectorException(
-                    FileConnectorErrorCode.FILE_LIST_EMPTY,
-                    "The target file list is empty,"
-                            + "SeaTunnel will not be able to sync empty table, "
-                            + "please check the configuration parameters such as: [file_filter_pattern]");
+        if (this.fileNames.isEmpty()) {
+            log.error("The current directory is empty " + path);
         }
 
         return fileNames;
     }
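
To make the behaviour change in this hunk easy to see in isolation, here is a small, hedged sketch; the class name, the directoryListing parameter, and the demo path are invented for illustration and are not part of the patch. It mirrors the patched control flow: an empty directory now produces an error log entry and an empty file list instead of a thrown FileConnectorException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

public class EmptyDirectoryDemo {
    private static final Logger LOG = Logger.getLogger(EmptyDirectoryDemo.class.getName());

    // Mirrors the patched control flow: an empty listing is reported via a log
    // entry and an empty result, rather than by throwing a connector exception.
    static List<String> getFileNamesByPath(List<String> directoryListing, String path) {
        List<String> fileNames = new ArrayList<>(directoryListing);
        if (fileNames.isEmpty()) {
            LOG.severe("The current directory is empty " + path);
        }
        return fileNames;
    }

    public static void main(String[] args) {
        // Before the patch this situation raised FILE_LIST_EMPTY; now it only logs.
        System.out.println(getFileNamesByPath(new ArrayList<>(), "/warehouse/empty_table"));
    }
}
```

With the old code, a Hive source pointed at an empty table directory failed at this listing step; after the patch the listing simply comes back empty.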

@@ -196,10 +188,12 @@ public SeaTunnelRowType getActualSeaTunnelRowTypeInfo() {
 
     protected Map<String, String> parsePartitionsByPath(String path) {
         LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
-        Arrays.stream(path.split("/", -1))
-                .filter(split -> split.contains("="))
-                .map(split -> split.split("=", -1))
-                .forEach(kv -> partitions.put(kv[0], kv[1]));
+        if (path != null && !path.isEmpty()) {
+            Arrays.stream(path.split("/", -1))
+                    .filter(split -> split.contains("="))
+                    .map(split -> split.split("=", -1))
+                    .forEach(kv -> partitions.put(kv[0], kv[1]));
+        }
         return partitions;
     }
 
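
As a quick illustration of what the new null/empty guard protects, here is a standalone copy of the partition parsing with an invented sample path; the PartitionPathDemo class and the example values are assumptions, only the parsing logic is taken from the diff.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionPathDemo {
    // Same guarded parsing as the patched parsePartitionsByPath, copied here
    // so the effect of the null/empty check can be run in isolation.
    static Map<String, String> parsePartitionsByPath(String path) {
        LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
        if (path != null && !path.isEmpty()) {
            Arrays.stream(path.split("/", -1))
                    .filter(split -> split.contains("="))
                    .map(split -> split.split("=", -1))
                    .forEach(kv -> partitions.put(kv[0], kv[1]));
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Hive-style path: prints {dt=2023-09-12, region=cn}
        System.out.println(parsePartitionsByPath("/warehouse/db/tbl/dt=2023-09-12/region=cn/part-0000"));
        // Null path (possible when the directory is empty): prints {} instead of throwing
        System.out.println(parsePartitionsByPath(null));
    }
}
```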
@@ -136,7 +136,8 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
                     "Schmea information is not set or incorrect schmea settings");
         }
         SeaTunnelRowType userDefinedRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+                mergePartitionTypes(
+                        fileNames.size() > 0 ? fileNames.get(0) : null, seaTunnelRowType);
         // column projection
         if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
             // get the read column index from user-defined row type
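
This hunk and the identical one in the next file guard the first-file lookup passed into mergePartitionTypes. A rough sketch under the assumption that a null first file means "no partition columns to merge"; FirstFileGuardDemo and the stubbed mergePartitionTypes below are illustrative, not the connector's real implementation.

```java
import java.util.Collections;
import java.util.List;

public class FirstFileGuardDemo {
    // Stand-in for mergePartitionTypes: a null file path simply means there are
    // no partition columns to merge into the user-defined row type.
    static String mergePartitionTypes(String firstFilePath, String rowType) {
        return firstFilePath == null ? rowType : rowType + " + partitions(" + firstFilePath + ")";
    }

    public static void main(String[] args) {
        List<String> fileNames = Collections.emptyList();
        // Old code: fileNames.get(0) on an empty list -> IndexOutOfBoundsException.
        // Patched code: pass null when the listing is empty.
        String merged =
                mergePartitionTypes(
                        fileNames.size() > 0 ? fileNames.get(0) : null, "userDefinedRowType");
        System.out.println(merged); // prints "userDefinedRowType"
    }
}
```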
@@ -138,7 +138,8 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String pa
     @Override
     public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         SeaTunnelRowType userDefinedRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+                mergePartitionTypes(
+                        fileNames.size() > 0 ? fileNames.get(0) : null, seaTunnelRowType);
         if (pluginConfig.hasPath(BaseSourceConfig.DELIMITER.key())) {
             fieldDelimiter = pluginConfig.getString(BaseSourceConfig.DELIMITER.key());
         } else {
