[Feature][S3 File] Make S3 File Connector support multiple table write
EricJoy2048 authored May 15, 2024
1 parent f108a5e commit 8f2049b
Showing 56 changed files with 3,566 additions and 137 deletions.
docs/en/connector-v2/sink/S3File.md (2 changes: 1 addition & 1 deletion)
@@ -474,7 +474,7 @@ transform {
sink {
S3File {
bucket = "s3a://seatunnel-test"
-tmp_path = "/tmp/seatunnel"
+tmp_path = "/tmp/seatunnel/${table_name}"
path="/test/${table_name}"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
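For context: the ${table_name} placeholder above is what lets a single S3File sink fan out writes per upstream table. A minimal sketch of a complete multi-table job follows; the Jdbc source, table names, credentials, and endpoint are illustrative assumptions, not part of this commit.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  # Any source that emits multiple tables works; Jdbc with a table_list is one option.
  Jdbc {
    url = "jdbc:mysql://localhost:3306/seatunnel"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    table_list = [
      { table_path = "seatunnel.orders" },
      { table_path = "seatunnel.users" }
    ]
  }
}

sink {
  S3File {
    bucket = "s3a://seatunnel-test"
    tmp_path = "/tmp/seatunnel/${table_name}"   # staging area, resolved per table
    path = "/test/${table_name}"                # resolves to /test/orders and /test/users
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxx"
    secret_key = "xxxxxxxx"
    file_format_type = "parquet"
  }
}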
seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml
@@ -30,7 +30,7 @@
<name>SeaTunnel : Connectors V2 : File : S3</name>

<properties>
-<hadoop-aws.version>2.6.5</hadoop-aws.version>
+<hadoop-aws.version>3.1.4</hadoop-aws.version>
<guava.version>27.0-jre</guava.version>
</properties>

S3FileCatalogFactory.java
@@ -24,15 +24,15 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
-import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class S3FileCatalogFactory implements CatalogFactory {
@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
-HadoopConf hadoopConf = S3Conf.buildWithReadOnlyConfig(options);
+HadoopConf hadoopConf = S3HadoopConf.buildWithReadOnlyConfig(options);
HadoopFileSystemProxy fileSystemUtils = new HadoopFileSystemProxy(hadoopConf);
return new S3FileCatalog(fileSystemUtils, options);
}
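A usage sketch for the factory above. ReadonlyConfig.fromMap is the map-based helper in seatunnel-api; the option keys follow S3ConfigOptions, the values are invented, and the factory's import path is assumed from this module's layout.

import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
// Package assumed; the diff does not show the factory's package declaration.
import org.apache.seatunnel.connectors.seatunnel.file.s3.catalog.S3FileCatalogFactory;

public class S3FileCatalogExample {
    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("bucket", "s3a://seatunnel-test");
        raw.put("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn");
        raw.put("access_key", "xxxxxxxx");
        raw.put("secret_key", "xxxxxxxx");

        // createCatalog now routes through S3HadoopConf.buildWithReadOnlyConfig.
        Catalog catalog =
                new S3FileCatalogFactory().createCatalog("S3File", ReadonlyConfig.fromMap(raw));
        System.out.println(catalog.getClass().getSimpleName()); // S3FileCatalog
    }
}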
S3Conf.java → S3HadoopConf.java
@@ -17,16 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.file.s3.config;

-import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

import java.util.HashMap;
import java.util.Map;

-public class S3Conf extends HadoopConf {
+public class S3HadoopConf extends HadoopConf {
private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
protected static final String S3A_SCHEMA = "s3a";
@@ -47,39 +44,33 @@ public void setSchema(String schema) {
this.schema = schema;
}

-protected S3Conf(String hdfsNameKey) {
+public S3HadoopConf(String hdfsNameKey) {
super(hdfsNameKey);
}

-public static HadoopConf buildWithConfig(Config config) {
+public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig config) {

-String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());
-S3Conf hadoopConf = new S3Conf(bucketName);
+String bucketName = config.get(S3ConfigOptions.S3_BUCKET);
+S3HadoopConf hadoopConf = new S3HadoopConf(bucketName);
if (bucketName.startsWith(S3A_SCHEMA)) {
hadoopConf.setSchema(S3A_SCHEMA);
}
HashMap<String, String> s3Options = new HashMap<>();
hadoopConf.putS3SK(s3Options, config);
-if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
-    config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
-            .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
+if (config.getOptional(S3ConfigOptions.S3_PROPERTIES).isPresent()) {
+    config.get(S3ConfigOptions.S3_PROPERTIES)
+            .forEach((key, value) -> s3Options.put(key, String.valueOf(value)));
}

s3Options.put(
S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
-        config.getString(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key()));
+        config.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());
s3Options.put(
-        S3ConfigOptions.FS_S3A_ENDPOINT.key(),
-        config.getString(S3ConfigOptions.FS_S3A_ENDPOINT.key()));
+        S3ConfigOptions.FS_S3A_ENDPOINT.key(), config.get(S3ConfigOptions.FS_S3A_ENDPOINT));
hadoopConf.setExtraOptions(s3Options);
return hadoopConf;
}

-public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
-    Config config = readonlyConfig.toConfig();
-    return buildWithConfig(config);
-}

protected String switchHdfsImpl() {
switch (this.schema) {
case S3A_SCHEMA:
@@ -89,13 +80,13 @@ protected String switchHdfsImpl() {
}
}

-private void putS3SK(Map<String, String> s3Options, Config config) {
-    if (!CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_ACCESS_KEY.key())
-            && !CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_SECRET_KEY.key())) {
+private void putS3SK(Map<String, String> s3Options, ReadonlyConfig config) {
+    if (!config.getOptional(S3ConfigOptions.S3_ACCESS_KEY).isPresent()
+            && !config.getOptional(S3ConfigOptions.S3_SECRET_KEY).isPresent()) {
return;
}
-String accessKey = config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());
-String secretKey = config.getString(S3ConfigOptions.S3_SECRET_KEY.key());
+String accessKey = config.get(S3ConfigOptions.S3_ACCESS_KEY);
+String secretKey = config.get(S3ConfigOptions.S3_SECRET_KEY);
if (S3A_SCHEMA.equals(this.schema)) {
s3Options.put("fs.s3a.access.key", accessKey);
s3Options.put("fs.s3a.secret.key", secretKey);
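The net effect of the rewrite: S3HadoopConf exposes one public entry point that consumes seatunnel-api's ReadonlyConfig instead of the shaded typesafe Config. A sketch of a caller, assuming ReadonlyConfig.fromMap and a generated getter for the extra options (the setExtraOptions call in the diff suggests one exists); all values are invented.

import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;

public class S3HadoopConfExample {
    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("bucket", "s3a://seatunnel-test"); // the s3a:// prefix selects the s3a schema
        raw.put("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn");
        raw.put("access_key", "xxxxxxxx"); // mapped by putS3SK to fs.s3a.access.key
        raw.put("secret_key", "xxxxxxxx"); // mapped by putS3SK to fs.s3a.secret.key

        HadoopConf hadoopConf =
                S3HadoopConf.buildWithReadOnlyConfig(ReadonlyConfig.fromMap(raw));
        // The extra options now carry the endpoint, credentials provider, and keys.
        System.out.println(hadoopConf.getExtraOptions());
    }
}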
S3FileSink.java
@@ -34,8 +34,8 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;

import java.util.Optional;
@@ -55,7 +55,7 @@ public String getPluginName() {
}

public S3FileSink(CatalogTable catalogTable, ReadonlyConfig readonlyConfig) {
-super(S3Conf.buildWithConfig(readonlyConfig.toConfig()), readonlyConfig, catalogTable);
+super(S3HadoopConf.buildWithReadOnlyConfig(readonlyConfig), readonlyConfig, catalogTable);
this.catalogTable = catalogTable;
this.readonlyConfig = readonlyConfig;
Config pluginConfig = readonlyConfig.toConfig();
S3FileSource.java
@@ -17,111 +17,19 @@

package org.apache.seatunnel.connectors.seatunnel.file.s3.source;

-import org.apache.seatunnel.shade.com.typesafe.config.Config;

-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
-import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
-import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.source.config.MultipleTableS3FileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;

-import com.google.auto.service.AutoService;
-
-import java.io.IOException;
-
-@AutoService(SeaTunnelSource.class)
-public class S3FileSource extends BaseFileSource {
+public class S3FileSource extends BaseMultipleTableFileSource {
+
+    public S3FileSource(ReadonlyConfig readonlyConfig) {
+        super(new MultipleTableS3FileSourceConfig(readonlyConfig));
+    }
@Override
public String getPluginName() {
return FileSystemType.S3.getFileSystemPluginName();
}

-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        S3ConfigOptions.FILE_PATH.key(),
-                        S3ConfigOptions.FILE_FORMAT_TYPE.key(),
-                        S3ConfigOptions.S3_BUCKET.key());
-        if (!result.isSuccess()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, result.getMsg()));
-        }
-        String path = pluginConfig.getString(S3ConfigOptions.FILE_PATH.key());
-        hadoopConf = S3Conf.buildWithConfig(pluginConfig);
-        readStrategy =
-                ReadStrategyFactory.of(
-                        pluginConfig.getString(S3ConfigOptions.FILE_FORMAT_TYPE.key()));
-        readStrategy.setPluginConfig(pluginConfig);
-        readStrategy.init(hadoopConf);
-        try {
-            filePaths = readStrategy.getFileNamesByPath(path);
-        } catch (IOException e) {
-            String errorMsg = String.format("Get file list from this path [%s] failed", path);
-            throw new FileConnectorException(
-                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
-        }
-        // support user-defined schema
-        FileFormat fileFormat =
-                FileFormat.valueOf(
-                        pluginConfig
-                                .getString(S3ConfigOptions.FILE_FORMAT_TYPE.key())
-                                .toUpperCase());
-        // only json text csv type support user-defined schema now
-        if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            switch (fileFormat) {
-                case CSV:
-                case TEXT:
-                case JSON:
-                case EXCEL:
-                case XML:
-                    SeaTunnelRowType userDefinedSchema =
-                            CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
-                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
-                    break;
-                case ORC:
-                case PARQUET:
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
-                            "SeaTunnel does not support user-defined schema for [parquet, orc] files");
-                default:
-                    // never got in there
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                            "SeaTunnel does not supported this file format");
-            }
-        } else {
-            if (filePaths.isEmpty()) {
-                // When the directory is empty, distribute default behavior schema
-                rowType = CatalogTableUtil.buildSimpleTextSchema();
-                return;
-            }
-            try {
-                rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
-            } catch (FileConnectorException e) {
-                String errorMsg =
-                        String.format("Get table schema from file [%s] failed", filePaths.get(0));
-                throw new FileConnectorException(
-                        CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
-            }
-        }
-    }
}
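Everything the deleted prepare() did, from config validation to file listing to schema resolution, now happens inside the per-table source config, so constructing the source is a single call. A sketch with invented values; with real credentials the constructor would contact S3 while resolving files and schema.

import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.s3.source.S3FileSource;

public class S3FileSourceExample {
    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("bucket", "s3a://seatunnel-test");
        raw.put("path", "/test/orders");
        raw.put("file_format_type", "json");
        raw.put("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn");
        raw.put("access_key", "xxxxxxxx");
        raw.put("secret_key", "xxxxxxxx");

        // No prepare() phase anymore: MultipleTableS3FileSourceConfig does the work.
        S3FileSource source = new S3FileSource(ReadonlyConfig.fromMap(raw));
        System.out.println(source.getPluginName()); // S3File
    }
}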
S3FileSourceFactory.java
@@ -19,16 +19,20 @@

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;

import com.google.auto.service.AutoService;

+import java.io.Serializable;
import java.util.Arrays;

@AutoService(Factory.class)
@@ -38,6 +42,12 @@ public String factoryIdentifier() {
return FileSystemType.S3.getFileSystemPluginName();
}

+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new S3FileSource(context.getOptions());
+    }

@Override
public OptionRule optionRule() {
return OptionRule.builder()
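With createSource in place, the engine reaches S3FileSource through factory discovery instead of the @AutoService registration the old source class carried. A rough sketch of that wiring; the TableSourceFactoryContext constructor arguments and the factory's package are assumptions, so check seatunnel-api for the exact signatures in your version.

import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
// Package assumed; the diff does not show the factory's package declaration.
import org.apache.seatunnel.connectors.seatunnel.file.s3.source.S3FileSourceFactory;

public class FactoryWiringExample {
    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        raw.put("bucket", "s3a://seatunnel-test");
        raw.put("path", "/test/orders");
        raw.put("file_format_type", "json");

        ReadonlyConfig options = ReadonlyConfig.fromMap(raw);
        // Assumed constructor shape: (options, classLoader).
        TableSourceFactoryContext context =
                new TableSourceFactoryContext(
                        options, Thread.currentThread().getContextClassLoader());

        // createSource(...) returns a lazy TableSource; calling createSource()
        // on it instantiates S3FileSource with the options.
        SeaTunnelSource<?, ?, ?> source =
                new S3FileSourceFactory().createSource(context).createSource();
        System.out.println(source.getPluginName()); // S3File
    }
}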
MultipleTableS3FileSourceConfig.java (new file)
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.s3.source.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;

public class MultipleTableS3FileSourceConfig extends BaseMultipleTableFileSourceConfig {

public MultipleTableS3FileSourceConfig(ReadonlyConfig s3FileSourceRootConfig) {
super(s3FileSourceRootConfig);
}

@Override
public BaseFileSourceConfig getBaseSourceConfig(ReadonlyConfig readonlyConfig) {
return new S3FileSourceConfig(readonlyConfig);
}
}
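The subclass stays deliberately thin: it supplies only the connector-specific leaf config, while the base class presumably fans one root config out into per-table configs. A self-contained illustration of that template-method shape, with hypothetical names rather than the real BaseMultipleTableFileSourceConfig internals:

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins; only the shape of the pattern matches the code above.
abstract class MultiTableConfigPattern {
    interface PerTableConfig {} // the S3 analogue is S3FileSourceConfig

    // The single method each connector overrides (cf. getBaseSourceConfig above).
    protected abstract PerTableConfig buildOne(String perTableRawConfig);

    // Base-class fan-out: one leaf config per configured table.
    public List<PerTableConfig> buildAll(List<String> perTableRawConfigs) {
        return perTableRawConfigs.stream().map(this::buildOne).collect(Collectors.toList());
    }
}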
S3FileSourceConfig.java (new file)
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.s3.source.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;

import lombok.Getter;

@Getter
public class S3FileSourceConfig extends BaseFileSourceConfig {

private static final long serialVersionUID = 1L;

@Override
public HadoopConf getHadoopConfig() {
return S3HadoopConf.buildWithReadOnlyConfig(getBaseFileSourceConfig());
}

@Override
public String getPluginName() {
return FileSystemType.S3.getFileSystemPluginName();
}

public S3FileSourceConfig(ReadonlyConfig readonlyConfig) {
super(readonlyConfig);
}
}
HiveOnS3Conf.java
@@ -19,10 +19,10 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;

-public class HiveOnS3Conf extends S3Conf {
+public class HiveOnS3Conf extends S3HadoopConf {
protected static final String S3_SCHEMA = "s3";
// The emr of amazon on s3 use this EmrFileSystem as the file system
protected static final String HDFS_S3_IMPL = "com.amazon.ws.emr.hadoop.fs.EmrFileSystem";
@@ -43,7 +43,7 @@ protected String switchHdfsImpl() {
}

public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
-S3Conf s3Conf = (S3Conf) S3Conf.buildWithReadOnlyConfig(readonlyConfig);
+S3HadoopConf s3Conf = (S3HadoopConf) S3HadoopConf.buildWithReadOnlyConfig(readonlyConfig);
String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);
if (bucketName.startsWith(DEFAULT_SCHEMA)) {
s3Conf.setSchema(DEFAULT_SCHEMA);
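HiveOnS3Conf exists for one behavioral point: plain s3:// buckets on Amazon EMR go through EmrFileSystem, while s3a and s3n keep the stock Hadoop implementations. A standalone rendition of the mapping, derived from the constants visible in this diff:

public class S3SchemeImplExample {
    // Mirrors HDFS_S3A_IMPL / HDFS_S3N_IMPL in S3HadoopConf and HDFS_S3_IMPL in HiveOnS3Conf.
    static String fsImplFor(String schema) {
        switch (schema) {
            case "s3a":
                return "org.apache.hadoop.fs.s3a.S3AFileSystem";
            case "s3n":
                return "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
            case "s3": // Amazon EMR's filesystem for plain s3:// buckets
                return "com.amazon.ws.emr.hadoop.fs.EmrFileSystem";
            default:
                throw new IllegalArgumentException("Unsupported S3 schema: " + schema);
        }
    }

    public static void main(String[] args) {
        System.out.println(fsImplFor("s3")); // com.amazon.ws.emr.hadoop.fs.EmrFileSystem
    }
}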
(Diff truncated: the remaining changed files in this commit are not shown.)
