From 3f98cd8a16085ec3e8c3ed027a0d88b7cade1f63 Mon Sep 17 00:00:00 2001
From: corgy-w <73771213+corgy-w@users.noreply.github.com>
Date: Fri, 20 Sep 2024 18:09:57 +0800
Subject: [PATCH] [Improve][Connector-V2] Support read archive compress file
(#7633)
---
docs/en/connector-v2/source/CosFile.md | 12 +
docs/en/connector-v2/source/FtpFile.md | 12 +
docs/en/connector-v2/source/HdfsFile.md | 14 +-
docs/en/connector-v2/source/LocalFile.md | 12 +
docs/en/connector-v2/source/OssJindoFile.md | 12 +
docs/en/connector-v2/source/S3File.md | 40 ++-
docs/en/connector-v2/source/SftpFile.md | 12 +
.../file/config/ArchiveCompressFormat.java | 48 +++
.../seatunnel/file/config/BaseSinkConfig.java | 7 +
.../file/config/BaseSourceConfigOptions.java | 6 +
.../seatunnel/file/config/FileFormat.java | 16 +-
.../source/reader/AbstractReadStrategy.java | 144 +++++++++
.../file/source/reader/ExcelReadStrategy.java | 25 +-
.../file/source/reader/JsonReadStrategy.java | 37 ++-
.../file/source/reader/TextReadStrategy.java | 22 +-
.../file/source/reader/XmlReadStrategy.java | 19 +-
.../file/writer/XmlReadStrategyTest.java | 1 -
.../file/cos/source/CosFileSourceFactory.java | 1 +
.../file/ftp/source/FtpFileSourceFactory.java | 1 +
.../hdfs/source/HdfsFileSourceFactory.java | 1 +
.../local/source/LocalFileSourceFactory.java | 1 +
.../file/oss/source/OssFileSourceFactory.java | 1 +
.../file/s3/source/S3FileSourceFactory.java | 1 +
.../sftp/source/SftpFileSourceFactory.java | 1 +
.../e2e/connector/file/ftp/FtpFileIT.java | 7 +
.../src/test/resources/text/e2e-txt.zip | Bin 0 -> 1981 bytes
.../text/ftp_file_zip_text_to_assert.conf | 122 ++++++++
.../e2e/connector/file/local/LocalFileIT.java | 277 ++++++++++++++++++
.../local_excel_multi_zip_to_assert.conf | 119 ++++++++
.../excel/local_excel_zip_to_assert.conf | 119 ++++++++
.../local_file_json_multi_zip_to_assert.conf | 117 ++++++++
.../json/local_file_json_zip_to_assert.conf | 117 ++++++++
...ocal_file_multi_tar_gz_text_to_assert.conf | 117 ++++++++
.../local_file_multi_tar_text_to_assert.conf | 117 ++++++++
.../local_file_multi_zip_text_to_assert.conf | 117 ++++++++
.../local_file_tar_gz_text_to_assert.conf | 117 ++++++++
.../text/local_file_tar_text_to_assert.conf | 117 ++++++++
.../text/local_file_zip_text_to_assert.conf | 117 ++++++++
.../src/test/resources/xml/e2e.xml | 24 ++
.../xml/local_file_xml_to_assert.conf | 101 +++++++
.../xml/local_file_zip_xml_to_assert.conf | 102 +++++++
.../e2e/connector/file/oss/OssFileIT.java | 3 +
.../src/test/resources/text/e2e-text.zip | Bin 0 -> 1977 bytes
.../text/oss_file_zip_text_to_assert.conf | 115 ++++++++
.../e2e/connector/file/s3/S3FileIT.java | 3 +
.../src/test/resources/text/e2e-text.zip | Bin 0 -> 1977 bytes
.../text/s3_file_zip_text_to_assert.conf | 120 ++++++++
.../e2e/connector/file/fstp/SftpFileIT.java | 7 +
.../src/test/resources/text/e2e-text.zip | Bin 0 -> 1977 bytes
.../text/sftp_file_zip_text_to_assert.conf | 122 ++++++++
50 files changed, 2579 insertions(+), 44 deletions(-)
create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ArchiveCompressFormat.java
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/e2e-txt.zip
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_file_zip_text_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_multi_zip_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_zip_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_multi_zip_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_zip_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_tar_gz_text_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_tar_text_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_zip_text_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_tar_gz_text_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_tar_text_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_zip_text_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/e2e.xml
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_xml_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_zip_xml_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/e2e-text.zip
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/oss_file_zip_text_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/e2e-text.zip
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/s3_file_zip_text_to_assert.conf
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/e2e-text.zip
create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_zip_text_to_assert.conf
diff --git a/docs/en/connector-v2/source/CosFile.md b/docs/en/connector-v2/source/CosFile.md
index a6de672d378..702439c3062 100644
--- a/docs/en/connector-v2/source/CosFile.md
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -66,6 +66,7 @@ To use this connector you need put hadoop-cos-{hadoop.version}-{version}.jar and
| xml_use_attr_format | boolean | no | - |
| file_filter_pattern | string | no | - |
| compress_codec | string | no | none |
+| archive_compress_codec | string | no | none |
| encoding | string | no | UTF-8 |
| common-options | | no | - |
@@ -284,6 +285,17 @@ The compress codec of files and the details that supported as the following show
- orc/parquet:
automatically recognizes the compression type, no additional settings required.
+### archive_compress_codec [string]
+
+The compress codec of archive files; the supported combinations are shown below:
+
+| archive_compress_codec | file_format | archive_compress_suffix |
+|------------------------|--------------------|-------------------------|
+| ZIP | txt,json,excel,xml | .zip |
+| TAR | txt,json,excel,xml | .tar |
+| TAR_GZ | txt,json,excel,xml | .tar.gz |
+| NONE | all | .* |
+
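+For example, setting `archive_compress_codec = "zip"` together with `file_format_type = "text"` lets the source read the `.txt` entries inside each `.zip` archive under `path`.
+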
### encoding [string]
Only used when file_format_type is json,text,csv,xml.
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 6a18d62f194..656f7a00422 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -60,6 +60,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| xml_use_attr_format | boolean | no | - |
| file_filter_pattern | string | no | - |
| compress_codec | string | no | none |
+| archive_compress_codec | string | no | none |
| encoding | string | no | UTF-8 |
| common-options | | no | - |
@@ -265,6 +266,17 @@ The compress codec of files and the details that supported as the following show
- orc/parquet:
automatically recognizes the compression type, no additional settings required.
+### archive_compress_codec [string]
+
+The compress codec of archive files; the supported combinations are shown below:
+
+| archive_compress_codec | file_format | archive_compress_suffix |
+|------------------------|--------------------|-------------------------|
+| ZIP | txt,json,excel,xml | .zip |
+| TAR | txt,json,excel,xml | .tar |
+| TAR_GZ | txt,json,excel,xml | .tar.gz |
+| NONE | all | .* |
+
### encoding [string]
Only used when file_format_type is json,text,csv,xml.
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 0c0ea61ef11..7413c0428b8 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -63,7 +63,8 @@ Read data from hdfs file system.
| xml_row_tag | string | no | - | Specifies the tag name of the data rows within the XML file, only used when file_format is xml. |
| xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only used when file_format is xml. |
| compress_codec | string | no | none | The compress codec of files |
-| encoding | string | no | UTF-8 |
+| archive_compress_codec | string | no | none | The compress codec of archive files |
+| encoding | string | no | UTF-8 | |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
### delimiter/field_delimiter [string]
@@ -80,6 +81,17 @@ The compress codec of files and the details that supported as the following show
- orc/parquet:
automatically recognizes the compression type, no additional settings required.
+### archive_compress_codec [string]
+
+The compress codec of archive files; the supported combinations are shown below:
+
+| archive_compress_codec | file_format | archive_compress_suffix |
+|------------------------|--------------------|-------------------------|
+| ZIP | txt,json,excel,xml | .zip |
+| TAR | txt,json,excel,xml | .tar |
+| TAR_GZ | txt,json,excel,xml | .tar.gz |
+| NONE | all | .* |
+
### encoding [string]
Only used when file_format_type is json,text,csv,xml.
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index 570331bba54..6d11b992e3a 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -60,6 +60,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| xml_use_attr_format | boolean | no | - |
| file_filter_pattern | string | no | - |
| compress_codec | string | no | none |
+| archive_compress_codec | string | no | none |
| encoding | string | no | UTF-8 |
| common-options | | no | - |
| tables_configs | list | no | used to define a multiple table task |
@@ -263,6 +264,17 @@ The compress codec of files and the details that supported as the following show
- orc/parquet:
automatically recognizes the compression type, no additional settings required.
+### archive_compress_codec [string]
+
+The compress codec of archive files; the supported combinations are shown below:
+
+| archive_compress_codec | file_format | archive_compress_suffix |
+|------------------------|--------------------|-------------------------|
+| ZIP | txt,json,excel,xml | .zip |
+| TAR | txt,json,excel,xml | .tar |
+| TAR_GZ | txt,json,excel,xml | .tar.gz |
+| NONE | all | .* |
+
### encoding [string]
Only used when file_format_type is json,text,csv,xml.
diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
index 83b51d1462a..d5bd6d14fa3 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -70,6 +70,7 @@ It only supports hadoop version **2.9.X+**.
| xml_use_attr_format | boolean | no | - |
| file_filter_pattern | string | no | - |
| compress_codec | string | no | none |
+| archive_compress_codec | string | no | none |
| encoding | string | no | UTF-8 |
| common-options | | no | - |
@@ -276,6 +277,17 @@ The compress codec of files and the details that supported as the following show
- orc/parquet:
automatically recognizes the compression type, no additional settings required.
+### archive_compress_codec [string]
+
+The compress codec of archive files; the supported combinations are shown below:
+
+| archive_compress_codec | file_format | archive_compress_suffix |
+|------------------------|--------------------|-------------------------|
+| ZIP | txt,json,excel,xml | .zip |
+| TAR | txt,json,excel,xml | .tar |
+| TAR_GZ | txt,json,excel,xml | .tar.gz |
+| NONE | all | .* |
+
### encoding [string]
Only used when file_format_type is json,text,csv,xml.
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index bd755068c2a..d280d6dc7f2 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -20,14 +20,14 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] file format type
- - [x] text
- - [x] csv
- - [x] parquet
- - [x] orc
- - [x] json
- - [x] excel
- - [x] xml
- - [x] binary
+ - [x] text
+ - [x] csv
+ - [x] parquet
+ - [x] orc
+ - [x] json
+ - [x] excel
+ - [x] xml
+ - [x] binary
## Description
@@ -196,7 +196,7 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
## Options
-| name | type | required | default value | Description |
+| name | type | required | default value | Description |
|---------------------------------|---------|----------|-------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| path | string | yes | - | The s3 path that needs to be read can have sub paths, but the sub paths need to meet certain format requirements. Specific requirements can be referred to "parse_partition_from_path" option |
| file_format_type | string | yes | - | File type, supported as the following file types: `text` `csv` `parquet` `orc` `json` `excel` `xml` `binary` |
@@ -217,8 +217,9 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
| sheet_name | string | no | - | Reader the sheet of the workbook,Only used when file_format is excel. |
| xml_row_tag | string | no | - | Specifies the tag name of the data rows within the XML file, only valid for XML files. |
| xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only valid for XML files. |
-| compress_codec | string | no | none |
-| encoding | string | no | UTF-8 |
+| compress_codec | string | no | none | |
+| archive_compress_codec | string | no | none | |
+| encoding | string | no | UTF-8 | |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
### delimiter/field_delimiter [string]
@@ -235,6 +236,17 @@ The compress codec of files and the details that supported as the following show
- orc/parquet:
automatically recognizes the compression type, no additional settings required.
+### archive_compress_codec [string]
+
+The compress codec of archive files; the supported combinations are shown below:
+
+| archive_compress_codec | file_format | archive_compress_suffix |
+|------------------------|--------------------|-------------------------|
+| ZIP | txt,json,excel,xml | .zip |
+| TAR | txt,json,excel,xml | .tar |
+| TAR_GZ | txt,json,excel,xml | .tar.gz |
+| NONE | all | .* |
+
### encoding [string]
Only used when file_format_type is json,text,csv,xml.
@@ -346,8 +358,8 @@ sink {
### Next version
- [Feature] Support S3A protocol ([3632](https://github.com/apache/seatunnel/pull/3632))
- - Allow user to add additional hadoop-s3 parameters
- - Allow the use of the s3a protocol
- - Decouple hadoop-aws dependencies
+ - Allow user to add additional hadoop-s3 parameters
+ - Allow the use of the s3a protocol
+ - Decouple hadoop-aws dependencies
- [Feature]Set S3 AK to optional ([3688](https://github.com/apache/seatunnel/pull/))
diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
index 6cf19053c6f..6d6ec5ea8db 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -92,6 +92,7 @@ The File does not have a specific type list, and we can indicate which SeaTunnel
| xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only used when file_format is xml. |
| schema | Config | No | - | Please check #schema below |
| compress_codec | String | No | None | The compress codec of files and the details that supported as the following shown:<br/> - txt: `lzo` `None`<br/> - json: `lzo` `None`<br/> - csv: `lzo` `None`<br/> - orc: `lzo` `snappy` `lz4` `zlib` `None`<br/> - parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `None`<br/> Tips: excel type does Not support any compression format |
+| archive_compress_codec | String | No | None | The compress codec of archive files |
| encoding | string | no | UTF-8 |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
@@ -176,6 +177,17 @@ The compress codec of files and the details that supported as the following show
- orc/parquet:
automatically recognizes the compression type, no additional settings required.
+### archive_compress_codec [string]
+
+The compress codec of archive files; the supported combinations are shown below:
+
+| archive_compress_codec | file_format | archive_compress_suffix |
+|------------------------|--------------------|-------------------------|
+| ZIP | txt,json,excel,xml | .zip |
+| TAR | txt,json,excel,xml | .tar |
+| TAR_GZ | txt,json,excel,xml | .tar.gz |
+| NONE | all | .* |
+
### encoding [string]
Only used when file_format_type is json,text,csv,xml.
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ArchiveCompressFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ArchiveCompressFormat.java
new file mode 100644
index 00000000000..da30887a824
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ArchiveCompressFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+/**
+ * ZIP etc.:
+ *
+ * <p>Archive format: ZIP can compress multiple files and directories into a single archive.
+ *
+ * <p>Gzip etc.:
+ *
+ * <p>Single file compression: Gzip compresses only one file at a time, without creating an archive.
+ *
+ * <p>Distinction: {@link org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat}
+ */
+public enum ArchiveCompressFormat {
+ NONE(""),
+ ZIP(".zip"),
+ TAR(".tar"),
+ TAR_GZ(".tar.gz"),
+ ;
+ private final String archiveCompressCodec;
+
+ ArchiveCompressFormat(String archiveCompressCodec) {
+ this.archiveCompressCodec = archiveCompressCodec;
+ }
+
+ public String getArchiveCompressCodec() {
+ return archiveCompressCodec;
+ }
+}
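
A note on how this enum could be consumed — a minimal sketch, not part of this patch, that resolves an `ArchiveCompressFormat` from a file name via the suffixes declared above (`ArchiveFormatDetector` is a hypothetical helper, assumed to sit in the same package as the enum):

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical helper, not in this patch: map a file name to the enum constant
// whose suffix matches, falling back to NONE.
class ArchiveFormatDetector {
    static ArchiveCompressFormat detect(String fileName) {
        return Arrays.stream(ArchiveCompressFormat.values())
                // NONE carries an empty suffix and would match every name; skip it
                .filter(f -> !f.getArchiveCompressCodec().isEmpty())
                .filter(f -> fileName.endsWith(f.getArchiveCompressCodec()))
                // prefer the longest suffix in case formats ever overlap
                .max(Comparator.comparingInt(f -> f.getArchiveCompressCodec().length()))
                .orElse(ArchiveCompressFormat.NONE);
    }
}
```

The patch itself takes the reverse route — the user names the codec in `archive_compress_codec` and the reader trusts it — but suffix detection would be the natural extension point if auto-detection is ever wanted.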
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 35f1e4ba167..2ec8ac5db4c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -54,6 +54,13 @@ public class BaseSinkConfig extends KerberosConfig {
.defaultValue(CompressFormat.NONE)
.withDescription("Compression codec");
+    // TODO: Support archive compression during write
+    public static final Option<ArchiveCompressFormat> ARCHIVE_COMPRESS_CODEC =
+            Options.key("archive_compress_codec")
+                    .enumType(ArchiveCompressFormat.class)
+                    .defaultValue(ArchiveCompressFormat.NONE)
+                    .withDescription("Archive compression codec");
+
    public static final Option<CompressFormat> TXT_COMPRESS =
            Options.key("compress_codec")
                    .singleChoice(
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
index a96fc4658c3..ddcc13d47d4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
@@ -164,6 +164,12 @@ public class BaseSourceConfigOptions {
.defaultValue(CompressFormat.NONE)
.withDescription("Compression codec");
+    public static final Option<ArchiveCompressFormat> ARCHIVE_COMPRESS_CODEC =
+            Options.key("archive_compress_codec")
+                    .enumType(ArchiveCompressFormat.class)
+                    .defaultValue(ArchiveCompressFormat.NONE)
+                    .withDescription("Archive compression codec");
+
    public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
            Options.key("tables_configs")
                    .type(new TypeReference<List<Map<String, Object>>>() {})
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 819dc03e870..2aa7cad99cf 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -36,6 +36,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
import java.io.Serializable;
+import java.util.Arrays;
public enum FileFormat implements Serializable {
CSV("csv") {
@@ -94,7 +95,7 @@ public ReadStrategy getReadStrategy() {
return new JsonReadStrategy();
}
},
- EXCEL("xlsx") {
+ EXCEL("xlsx", "xls") {
@Override
public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
return new ExcelWriteStrategy(fileSinkConfig);
@@ -128,14 +129,21 @@ public ReadStrategy getReadStrategy() {
}
};
- private final String suffix;
+ private final String[] suffix;
- FileFormat(String suffix) {
+ FileFormat(String... suffix) {
this.suffix = suffix;
}
public String getSuffix() {
- return "." + suffix;
+ if (suffix.length > 0) {
+ return "." + suffix[0];
+ }
+ return "";
+ }
+
+ public String[] getAllSuffix() {
+        return Arrays.stream(suffix).map(s -> "." + s).toArray(String[]::new);
}
public ReadStrategy getReadStrategy() {
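
For orientation, the effect of the variadic-suffix change in one place — a sketch assuming the `FileFormat` import below; the `SuffixDemo` class is invented and the output comments show the expected values:

```java
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;

public class SuffixDemo {
    public static void main(String[] args) {
        // The first declared suffix remains the write-side default...
        System.out.println(FileFormat.EXCEL.getSuffix()); // .xlsx
        // ...while getAllSuffix() lets readers accept both Excel variants.
        System.out.println(String.join(",", FileFormat.EXCEL.getAllSuffix())); // .xlsx,.xls
    }
}
```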
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 33b688af060..3e71a3b2932 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -19,18 +19,28 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.hadoop.fs.FileStatus;
import lombok.extern.slf4j.Slf4j;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -42,6 +52,8 @@
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
@Slf4j
public abstract class AbstractReadStrategy implements ReadStrategy {
@@ -68,6 +80,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
protected long skipHeaderNumber = BaseSourceConfigOptions.SKIP_HEADER_ROW_NUMBER.defaultValue();
protected transient boolean isKerberosAuthorization = false;
protected HadoopFileSystemProxy hadoopFileSystemProxy;
+ protected ArchiveCompressFormat archiveCompressFormat =
+ BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC.defaultValue();
protected Pattern pattern;
@@ -124,6 +138,13 @@ public List<String> getFileNamesByPath(String path) throws IOException {
@Override
public void setPluginConfig(Config pluginConfig) {
this.pluginConfig = pluginConfig;
+        // Resolve the archive compression format from the config, if set
+ if (pluginConfig.hasPath(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC.key())) {
+ String archiveCompressCodec =
+ pluginConfig.getString(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC.key());
+ archiveCompressFormat =
+ ArchiveCompressFormat.valueOf(archiveCompressCodec.toUpperCase());
+ }
if (pluginConfig.hasPath(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH.key())) {
isMergePartition =
pluginConfig.getBoolean(
@@ -153,6 +174,104 @@ public SeaTunnelRowType getActualSeaTunnelRowTypeInfo() {
return isMergePartition ? seaTunnelRowTypeWithPartition : seaTunnelRowType;
}
+ protected void resolveArchiveCompressedInputStream(
+ String path,
+ String tableId,
+            Collector<SeaTunnelRow> output,
+            Map<String, String> partitionsMap,
+ FileFormat fileFormat)
+ throws IOException {
+ switch (archiveCompressFormat) {
+ case ZIP:
+ try (ZipInputStream zis =
+ new ZipInputStream(hadoopFileSystemProxy.getInputStream(path))) {
+ ZipEntry entry;
+ while ((entry = zis.getNextEntry()) != null) {
+ if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
+ readProcess(
+ path,
+ tableId,
+ output,
+ copyInputStream(zis),
+ partitionsMap,
+ entry.getName());
+ }
+ zis.closeEntry();
+ }
+ }
+ break;
+ case TAR:
+ try (TarArchiveInputStream tarInput =
+ new TarArchiveInputStream(hadoopFileSystemProxy.getInputStream(path))) {
+ TarArchiveEntry entry;
+ while ((entry = tarInput.getNextTarEntry()) != null) {
+ if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
+ readProcess(
+ path,
+ tableId,
+ output,
+ copyInputStream(tarInput),
+ partitionsMap,
+ entry.getName());
+ }
+ }
+ }
+ break;
+ case TAR_GZ:
+ try (GzipCompressorInputStream gzipIn =
+ new GzipCompressorInputStream(
+ hadoopFileSystemProxy.getInputStream(path));
+ TarArchiveInputStream tarIn = new TarArchiveInputStream(gzipIn)) {
+
+ TarArchiveEntry entry;
+ while ((entry = tarIn.getNextTarEntry()) != null) {
+ if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
+ readProcess(
+ path,
+ tableId,
+ output,
+ copyInputStream(tarIn),
+ partitionsMap,
+ entry.getName());
+ }
+ }
+ }
+ break;
+ case NONE:
+ readProcess(
+ path,
+ tableId,
+ output,
+ hadoopFileSystemProxy.getInputStream(path),
+ partitionsMap,
+ path);
+ break;
+ default:
+ log.warn(
+ "The file does not support this archive compress type: {}",
+ archiveCompressFormat);
+ readProcess(
+ path,
+ tableId,
+ output,
+ hadoopFileSystemProxy.getInputStream(path),
+ partitionsMap,
+ path);
+ }
+ }
+
+ protected void readProcess(
+ String path,
+ String tableId,
+            Collector<SeaTunnelRow> output,
+            InputStream inputStream,
+            Map<String, String> partitionsMap,
+ String currentFileName)
+ throws IOException {
+        throw new UnsupportedOperationException(
+                "This file format does not support reading from archive-compressed files");
+ }
+
    protected Map<String, String> parsePartitionsByPath(String path) {
        LinkedHashMap<String, String> partitions = new LinkedHashMap<>();
Arrays.stream(path.split("/", -1))
@@ -202,6 +321,31 @@ protected boolean filterFileByPattern(FileStatus fileStatus) {
return true;
}
+ protected static InputStream copyInputStream(InputStream inputStream) throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ byte[] buffer = new byte[1024];
+ int bytesRead;
+
+ while ((bytesRead = inputStream.read(buffer)) != -1) {
+ byteArrayOutputStream.write(buffer, 0, bytesRead);
+ }
+
+ return new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
+ }
+
+ protected boolean checkFileType(String fileName, FileFormat fileFormat) {
+ for (String suffix : fileFormat.getAllSuffix()) {
+ if (fileName.endsWith(suffix)) {
+ return true;
+ }
+ }
+
+        log.warn(
+                "Skipped entry {}: its suffix does not match the expected file format. Please check the files inside the archive.",
+                fileName);
+ return false;
+ }
+
@Override
public void close() throws IOException {
try {
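
Taken together, the hunks above turn `read()` into a template method: each strategy parses partitions, defers to `resolveArchiveCompressedInputStream`, and that method calls the per-format `readProcess` override once per matching archive entry, handing it an in-memory copy of the entry produced by `copyInputStream`; the base `readProcess` throws `UnsupportedOperationException` for formats that do not override it. A standalone sketch of the ZIP branch under that flow — hypothetical local `data.zip`, plain `java.nio` instead of the Hadoop proxy:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class ZipEntryScan {
    public static void main(String[] args) throws IOException {
        try (ZipInputStream zis =
                new ZipInputStream(Files.newInputStream(Paths.get("data.zip")))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                // mirrors checkFileType(): skip directories and non-matching suffixes
                if (entry.isDirectory() || !entry.getName().endsWith(".txt")) {
                    continue;
                }
                // mirrors copyInputStream(): buffer the whole entry into memory
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                for (int n; (n = zis.read(buffer)) != -1; ) {
                    bos.write(buffer, 0, n);
                }
                // the readProcess stand-in: consume the buffered entry line by line
                try (BufferedReader reader =
                        new BufferedReader(
                                new InputStreamReader(
                                        new ByteArrayInputStream(bos.toByteArray()),
                                        StandardCharsets.UTF_8))) {
                    reader.lines().forEach(System.out::println);
                }
            }
        }
    }
}
```

One design consequence worth keeping in mind: `copyInputStream` buffers every entry fully on the heap, so archives containing very large individual files cost memory proportional to the largest entry.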
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
index 06225f9d33b..b794879ee70 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
@@ -30,9 +30,9 @@
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellType;
@@ -44,6 +44,8 @@
import lombok.SneakyThrows;
+import java.io.IOException;
+import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
@@ -73,12 +75,23 @@ public class ExcelReadStrategy extends AbstractReadStrategy {
@Override
    public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
        Map<String, String> partitionsMap = parsePartitionsByPath(path);
- FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path);
+ resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.EXCEL);
+ }
+
+ @Override
+ protected void readProcess(
+ String path,
+ String tableId,
+            Collector<SeaTunnelRow> output,
+            InputStream inputStream,
+            Map<String, String> partitionsMap,
+ String currentFileName)
+ throws IOException {
Workbook workbook;
- if (path.endsWith(".xls")) {
- workbook = new HSSFWorkbook(file);
- } else if (path.endsWith(".xlsx")) {
- workbook = new XSSFWorkbook(file);
+ if (currentFileName.endsWith(".xls")) {
+ workbook = new HSSFWorkbook(inputStream);
+ } else if (currentFileName.endsWith(".xlsx")) {
+ workbook = new XSSFWorkbook(inputStream);
} else {
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index 6c58e368721..982419266f5 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -22,11 +22,12 @@
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
@@ -76,24 +77,36 @@ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
    public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
        Map<String, String> partitionsMap = parsePartitionsByPath(path);
- InputStream inputStream;
+ resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.JSON);
+ }
+
+ @Override
+ public void readProcess(
+ String path,
+ String tableId,
+            Collector<SeaTunnelRow> output,
+            InputStream inputStream,
+            Map<String, String> partitionsMap,
+ String currentFileName)
+ throws IOException {
+ InputStream actualInputStream;
switch (compressFormat) {
case LZO:
LzopCodec lzo = new LzopCodec();
- inputStream = lzo.createInputStream(hadoopFileSystemProxy.getInputStream(path));
+ actualInputStream = lzo.createInputStream(inputStream);
break;
case NONE:
- inputStream = hadoopFileSystemProxy.getInputStream(path);
+ actualInputStream = inputStream;
break;
default:
log.warn(
- "Text file does not support this compress type: {}",
+ "Json file does not support this compress type: {}",
compressFormat.getCompressCodec());
- inputStream = hadoopFileSystemProxy.getInputStream(path);
+ actualInputStream = inputStream;
break;
}
try (BufferedReader reader =
- new BufferedReader(new InputStreamReader(inputStream, encoding))) {
+ new BufferedReader(new InputStreamReader(actualInputStream, encoding))) {
reader.lines()
.forEach(
line -> {
@@ -110,8 +123,14 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
- throw CommonError.fileOperationFailed(
- "JsonFile", "read", path, e);
+ String errorMsg =
+ String.format(
+ "Deserialize this jsonFile data [%s] failed, please check the origin data",
+ line);
+ throw new FileConnectorException(
+ FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
+ errorMsg,
+ e);
}
});
}
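
Two things worth noting in this reader refactor: the LZO/NONE `switch` no longer opens the file itself but wraps whatever stream the archive layer passes in, so `compress_codec` (per-file codec) and `archive_compress_codec` (container format) can in principle compose, e.g. an LZO-compressed text file inside a `.zip`; and a JSON line that fails to deserialize now surfaces as a `FileConnectorException` with `FileConnectorErrorCode.DATA_DESERIALIZE_FAILED`, echoing the offending line in the message instead of the generic file-operation error.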
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index bd6809aad1c..2b722593770 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -67,25 +67,37 @@ public class TextReadStrategy extends AbstractReadStrategy {
    public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
        Map<String, String> partitionsMap = parsePartitionsByPath(path);
- InputStream inputStream;
+ resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.TEXT);
+ }
+
+ @Override
+ public void readProcess(
+ String path,
+ String tableId,
+            Collector<SeaTunnelRow> output,
+            InputStream inputStream,
+            Map<String, String> partitionsMap,
+ String currentFileName)
+ throws IOException {
+ InputStream actualInputStream;
switch (compressFormat) {
case LZO:
LzopCodec lzo = new LzopCodec();
- inputStream = lzo.createInputStream(hadoopFileSystemProxy.getInputStream(path));
+ actualInputStream = lzo.createInputStream(inputStream);
break;
case NONE:
- inputStream = hadoopFileSystemProxy.getInputStream(path);
+ actualInputStream = inputStream;
break;
default:
log.warn(
"Text file does not support this compress type: {}",
compressFormat.getCompressCodec());
- inputStream = hadoopFileSystemProxy.getInputStream(path);
+ actualInputStream = inputStream;
break;
}
try (BufferedReader reader =
- new BufferedReader(new InputStreamReader(inputStream, encoding))) {
+ new BufferedReader(new InputStreamReader(actualInputStream, encoding))) {
reader.lines()
.skip(skipHeaderNumber)
.forEach(
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
index 814a9fbdb5a..a553a4f9d06 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
@@ -32,6 +32,7 @@
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
@@ -50,6 +51,7 @@
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
@@ -88,13 +90,22 @@ public void init(HadoopConf conf) {
    public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException {
        Map<String, String> partitionsMap = parsePartitionsByPath(path);
+ resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.XML);
+ }
+
+ @Override
+ public void readProcess(
+ String path,
+ String tableId,
+            Collector<SeaTunnelRow> output,
+            InputStream inputStream,
+            Map<String, String> partitionsMap,
+ String currentFileName)
+ throws IOException {
SAXReader saxReader = new SAXReader();
Document document;
try {
- document =
- saxReader.read(
- new InputStreamReader(
- hadoopFileSystemProxy.getInputStream(path), encoding));
+ document = saxReader.read(new InputStreamReader(inputStream, encoding));
} catch (DocumentException e) {
throw new FileConnectorException(
FileConnectorErrorCode.FILE_READ_FAILED, "Failed to read xml file: " + path, e);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
index 0679bade2d6..8bb2e483896 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
@@ -128,7 +128,6 @@ public static class TestCollector implements Collector<SeaTunnelRow> {
@Override
public void collect(SeaTunnelRow record) {
- System.out.println(record);
rows.add(record);
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
index b8405ace96e..388d245047b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
@@ -71,6 +71,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 249deac26da..112cccc3afa 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -72,6 +72,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
.optional(FtpConfigOptions.FTP_CONNECTION_MODE)
+ .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index e02f7ad42c2..88e46841801 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -68,6 +68,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 499ebf45f19..fb76d276d58 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -77,6 +77,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.DATETIME_FORMAT)
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
+ .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 6fb4a55c5d2..0eddf05693a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -84,6 +84,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index 93d879e559c..d1107d46cf7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -86,6 +86,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index f66db3996b7..c0f6aefda30 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -71,6 +71,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
.build();
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 1b89a0bcc7c..165d48e6313 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -104,6 +104,11 @@ public void startUp() throws Exception {
"/home/vsftpd/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
ftpContainer);
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e-txt.zip",
+ "/home/vsftpd/seatunnel/tmp/seatunnel/read/zip/txt/single/e2e-txt.zip",
+ ftpContainer);
+
ContainerUtil.copyFileIntoContainers(
"/excel/e2e.xlsx",
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
@@ -141,6 +146,8 @@ public void testFtpFileReadAndWrite(TestContainer container)
helper.execute("/text/ftp_file_text_to_assert.conf");
// test read ftp text file with projection
helper.execute("/text/ftp_file_text_projection_to_assert.conf");
+ // test read ftp zip text file
+ helper.execute("/text/ftp_file_zip_text_to_assert.conf");
// test write ftp json file
helper.execute("/json/fake_to_ftp_file_json.conf");
// test read ftp json file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/e2e-txt.zip b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/e2e-txt.zip
new file mode 100644
index 0000000000000000000000000000000000000000..421e71a8125c3bff1db277336ee74e5b659275c2
GIT binary patch
literal 1981
[binary data truncated]
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
[earlier hunks truncated]
+    public Path convertToZipFile(List<File> files, String name) throws IOException {
+ if (files == null || files.isEmpty()) {
+ throw new IllegalArgumentException("File list is empty or invalid");
+ }
+
+ File firstFile = files.get(0);
+ Path zipFilePath = Paths.get(firstFile.getParent(), String.format("%s.zip", name));
+
+ try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(zipFilePath))) {
+ for (File file : files) {
+ if (file.isDirectory()) {
+ Path dirPath = file.toPath();
+ Files.walkFileTree(
+ dirPath,
+                        new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(
+ Path file, BasicFileAttributes attrs) throws IOException {
+ addToZipFile(file, dirPath.getParent(), zos);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ } else {
+ addToZipFile(file.toPath(), file.getParentFile().toPath(), zos);
+ }
+ }
+ }
+
+ return zipFilePath;
+ }
+
+    private void addToZipFile(Path file, Path baseDir, ZipOutputStream zos) throws IOException {
+        Path relativePath = baseDir.relativize(file);
+        ZipEntry zipEntry;
+
+        if (relativePath.toString().contains(".")) {
+            // Randomize the entry name so repeated archives never collide on entry names
+            String fileName = relativePath.toString().split("\\.")[0];
+            String suffix = relativePath.toString().split("\\.")[1];
+            zipEntry =
+                    new ZipEntry(
+                            new Random().nextInt()
+                                    + fileName
+                                    + "_"
+                                    + System.currentTimeMillis()
+                                    + "."
+                                    + suffix);
+        } else {
+            // No extension: keep the relative path as the entry name. An entry must
+            // be opened before Files.copy writes into the stream.
+            zipEntry = new ZipEntry(relativePath.toString());
+        }
+        zos.putNextEntry(zipEntry);
+        Files.copy(file, zos);
+        zos.closeEntry();
+    }
+
+    public Path convertToTarFile(List<File> files, String name) throws IOException {
+ if (files == null || files.isEmpty()) {
+ throw new IllegalArgumentException("File list is empty or invalid");
+ }
+
+ File firstFile = files.get(0);
+ Path tarFilePath = Paths.get(firstFile.getParent(), String.format("%s.tar", name));
+
+ try (TarArchiveOutputStream tarOut =
+ new TarArchiveOutputStream(Files.newOutputStream(tarFilePath))) {
+ for (File file : files) {
+ if (file.isDirectory()) {
+ Path dirPath = file.toPath();
+ Files.walkFileTree(
+ dirPath,
+                        new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(
+ Path file, BasicFileAttributes attrs) throws IOException {
+ addToTarFile(file, dirPath.getParent(), tarOut);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ } else {
+ addToTarFile(file.toPath(), file.getParentFile().toPath(), tarOut);
+ }
+ }
+ }
+
+ return tarFilePath;
+ }
+
+ private void addToTarFile(Path file, Path baseDir, TarArchiveOutputStream tarOut)
+ throws IOException {
+ Path relativePath = baseDir.relativize(file);
+
+ TarArchiveEntry tarEntry;
+ if (relativePath.toString().contains(".")) {
+ String fileName = relativePath.toString().split("\\.")[0];
+ String suffix = relativePath.toString().split("\\.")[1];
+ String entryName =
+ new Random().nextInt()
+ + fileName
+ + "_"
+ + System.currentTimeMillis()
+ + "."
+ + suffix;
+ tarEntry = new TarArchiveEntry(file.toFile(), entryName);
+ } else {
+ tarEntry = new TarArchiveEntry(file.toFile(), relativePath.toString());
+ }
+
+ tarOut.putArchiveEntry(tarEntry);
+ Files.copy(file, tarOut);
+ tarOut.closeArchiveEntry();
+ }
+
+    public Path convertToTarGzFile(List<File> files, String name) throws IOException {
+ if (files == null || files.isEmpty()) {
+ throw new IllegalArgumentException("File list is empty or invalid");
+ }
+
+ File firstFile = files.get(0);
+ Path tarGzFilePath = Paths.get(firstFile.getParent(), String.format("%s.tar.gz", name));
+
+ // Create a GZIP output stream wrapping the tar output stream
+ try (GZIPOutputStream gzipOut = new GZIPOutputStream(Files.newOutputStream(tarGzFilePath));
+ TarArchiveOutputStream tarOut = new TarArchiveOutputStream(gzipOut)) {
+
+ for (File file : files) {
+ if (file.isDirectory()) {
+ Path dirPath = file.toPath();
+ Files.walkFileTree(
+ dirPath,
+                        new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(
+ Path file, BasicFileAttributes attrs) throws IOException {
+ addToTarFile(file, dirPath.getParent(), tarOut);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ } else {
+ addToTarFile(file.toPath(), file.getParentFile().toPath(), tarOut);
+ }
+ }
+ }
+
+ return tarGzFilePath;
+ }
}
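
For orientation, a hypothetical call site for the helpers above — the helper names come from this patch, while the wrapper method and the file paths are invented for illustration:

```java
// Hypothetical snippet inside LocalFileIT: bundle generated test files into the
// archives that the zip/tar/tar.gz read cases consume. Paths are illustrative.
private void buildArchivesForReadCases() throws IOException {
    List<File> files =
            Arrays.asList(new File("/tmp/seatunnel/a.txt"), new File("/tmp/seatunnel/b.txt"));
    Path zip = convertToZipFile(files, "e2e-txt");     // -> /tmp/seatunnel/e2e-txt.zip
    Path tar = convertToTarFile(files, "e2e-txt");     // -> /tmp/seatunnel/e2e-txt.tar
    Path tarGz = convertToTarGzFile(files, "e2e-txt"); // -> /tmp/seatunnel/e2e-txt.tar.gz
}
```

Note the entry names are randomized (`Random().nextInt()` plus a timestamp), so re-running a test never collides with entries from a previous archive.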
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_multi_zip_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_multi_zip_to_assert.conf
new file mode 100644
index 00000000000..9a7fd249839
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_multi_zip_to_assert.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/zip/excel/multifile"
+ result_table_name = "fake"
+ file_format_type = excel
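+ # the files under "path" are zip archives; they are unpacked before the excel rows are parsed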
+ archive_compress_codec = "zip"
+ field_delimiter = ;
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_zip_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_zip_to_assert.conf
new file mode 100644
index 00000000000..18a6d6d64de
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_zip_to_assert.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/zip/excel/single"
+ result_table_name = "fake"
+ file_format_type = excel
+ archive_compress_codec = "zip"
+ field_delimiter = ;
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_multi_zip_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_multi_zip_to_assert.conf
new file mode 100644
index 00000000000..85fda2433e9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_multi_zip_to_assert.conf
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/zip/json/multifile"
+ file_format_type = "json"
+ archive_compress_codec = "zip"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ C_MAP = "map"
+ C_ARRAY = "array"
+ C_STRING = string
+ C_BOOLEAN = boolean
+ C_TINYINT = tinyint
+ C_SMALLINT = smallint
+ C_INT = int
+ C_BIGINT = bigint
+ C_FLOAT = float
+ C_DOUBLE = double
+ C_BYTES = bytes
+ C_DATE = date
+ C_DECIMAL = "decimal(38, 18)"
+ C_TIMESTAMP = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_zip_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_zip_to_assert.conf
new file mode 100644
index 00000000000..c7288afb507
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_zip_to_assert.conf
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/zip/json/single"
+ file_format_type = "json"
+ archive_compress_codec = "zip"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ C_MAP = "map"
+ C_ARRAY = "array"
+ C_STRING = string
+ C_BOOLEAN = boolean
+ C_TINYINT = tinyint
+ C_SMALLINT = smallint
+ C_INT = int
+ C_BIGINT = bigint
+ C_FLOAT = float
+ C_DOUBLE = double
+ C_BYTES = bytes
+ C_DATE = date
+ C_DECIMAL = "decimal(38, 18)"
+ C_TIMESTAMP = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_tar_gz_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_tar_gz_text_to_assert.conf
new file mode 100644
index 00000000000..b0170755cd1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_tar_gz_text_to_assert.conf
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/tar_gz/txt/multifile"
+ file_format_type = "text"
+ archive_compress_codec = "tar_gz"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_tar_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_tar_text_to_assert.conf
new file mode 100644
index 00000000000..71fff16843f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_tar_text_to_assert.conf
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/tar/txt/multifile"
+ file_format_type = "text"
+ archive_compress_codec = "tar"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_zip_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_zip_text_to_assert.conf
new file mode 100644
index 00000000000..f6d2f9931a9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_multi_zip_text_to_assert.conf
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/zip/txt/multifile"
+ file_format_type = "text"
+ archive_compress_codec = "zip"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 10
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_tar_gz_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_tar_gz_text_to_assert.conf
new file mode 100644
index 00000000000..ba947fc21cf
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_tar_gz_text_to_assert.conf
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/tar_gz/txt/single"
+ file_format_type = "text"
+ archive_compress_codec = "tar_gz"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_tar_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_tar_text_to_assert.conf
new file mode 100644
index 00000000000..d4c872a7836
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_tar_text_to_assert.conf
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/tar/txt/single"
+ file_format_type = "text"
+ archive_compress_codec = "tar"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_zip_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_zip_text_to_assert.conf
new file mode 100644
index 00000000000..ac4929ef91a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_zip_text_to_assert.conf
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/zip/txt/single"
+ file_format_type = "text"
+ archive_compress_codec = "zip"
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/e2e.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/e2e.xml
new file mode 100644
index 00000000000..3f75fe390a2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/e2e.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- root element containing a single attribute-format RECORD row; element markup lost in extraction -->
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_xml_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_xml_to_assert.conf
new file mode 100644
index 00000000000..310d0ed0b83
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_xml_to_assert.conf
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/xml"
+ file_format_type = "xml"
+ xml_row_tag = "RECORD"
+ xml_use_attr_format = true
+ schema = {
+ fields {
+ c_bytes = "tinyint"
+ c_short = "smallint"
+ c_int = "int"
+ c_bigint = "bigint"
+ c_string = "string"
+ c_double = "double"
+ c_float = "float"
+ c_decimal = "decimal(10, 2)"
+ c_boolean = "boolean"
+ c_map = "map"
+ c_array = "array"
+ c_date = "date"
+ c_datetime = "timestamp"
+ c_time = "time"
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_zip_xml_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_zip_xml_to_assert.conf
new file mode 100644
index 00000000000..bd3f11b71f1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/xml/local_file_zip_xml_to_assert.conf
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/zip/xml/single"
+ file_format_type = "xml"
+ archive_compress_codec = "zip"
+ xml_row_tag = "RECORD"
+ xml_use_attr_format = true
+ schema = {
+ fields {
+ c_bytes = "tinyint"
+ c_short = "smallint"
+ c_int = "int"
+ c_bigint = "bigint"
+ c_string = "string"
+ c_double = "double"
+ c_float = "float"
+ c_decimal = "decimal(10, 2)"
+ c_boolean = "boolean"
+ c_map = "map"
+ c_array = "array"
+ c_date = "date"
+ c_datetime = "timestamp"
+ c_time = "time"
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java
index 6ff7ae9c1e8..3750d82d9b5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java
@@ -118,6 +118,8 @@ public void testOssFileReadAndWrite(TestContainer container)
"/text/e2e_time_format.txt",
"test/seatunnel/read/text_time_format/e2e.txt",
true);
+ ossUtils.uploadTestFiles(
+ "text/e2e-text.zip", "test/seatunnel/read/zip/text/e2e-text.zip", true);
Path txtLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/text/e2e.txt"));
ossUtils.uploadTestFiles(
txtLzo.toString(), "test/seatunnel/read/lzo_text/e2e.txt", false);
@@ -156,6 +158,7 @@ public void testOssFileReadAndWrite(TestContainer container)
helper.execute("/text/oss_file_text_skip_headers.conf");
// test read oss text file
helper.execute("/text/oss_file_text_to_assert.conf");
+ helper.execute("/text/oss_file_zip_text_to_assert.conf");
// test read oss text file with projection
helper.execute("/text/oss_file_text_projection_to_assert.conf");
// test write oss json file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/e2e-text.zip b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/resources/text/e2e-text.zip
new file mode 100644
index 0000000000000000000000000000000000000000..c9c57bd8d15e892a58dd3608d7d39f76fc36ef12
GIT binary patch
literal 1977
[1977-byte base85-encoded zip literal; payload corrupted and triplicated in extraction, omitted here]