Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BitSail][Connector]add oss source connector #467

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions bitsail-connectors/connector-oss/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bitsail-connectors</artifactId>
<groupId>com.bytedance.bitsail</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-oss</artifactId>

<properties>
<hadoop-aliyun.version>2.9.2</hadoop-aliyun.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
<groupId>org.apache.hadoop</groupId>
<version>2.9.2</version>
</dependency>
CodingGPT marked this conversation as resolved.
Show resolved Hide resolved
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>${hadoop-aliyun.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-configuration</artifactId>
<groupId>commons-configuration</groupId>
CodingGPT marked this conversation as resolved.
Show resolved Hide resolved
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-component-format-json</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-component-format-csv</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</project>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OssConf and HadoopCoonf can be merged into one, just one OssConf

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.oss.config;

import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Data
public class HadoopConf implements Serializable {
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
private static final String SCHEMA = "hdfs";
protected Map<String, String> extraOptions = new HashMap<>();
protected String hdfsNameKey;
protected String hdfsSitePath;
protected String kerberosPrincipal;
protected String kerberosKeytabPath;

public HadoopConf(String hdfsNameKey) {
this.hdfsNameKey = hdfsNameKey;
}

public String getHdfsImpl() {
return HDFS_IMPL;
}

public String getSchema() {
return SCHEMA;
}

public void setExtraOptionsForConfiguration(Configuration configuration) {
if (!extraOptions.isEmpty()) {
extraOptions.forEach(configuration::set);
}
if (hdfsSitePath != null) {
configuration.addResource(new Path(hdfsSitePath));
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.oss.config;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.oss.option.OssReaderOptions;

import org.apache.hadoop.fs.aliyun.oss.Constants;

import java.util.HashMap;

public class OssConf extends HadoopConf {
private static final String HDFS_IMPL = "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
private static final String SCHEMA = "oss";

@Override
public String getHdfsImpl() {
return HDFS_IMPL;
}

@Override
public String getSchema() {
return SCHEMA;
}

public OssConf(String hdfsNameKey) {
super(hdfsNameKey);
}

public static HadoopConf buildWithConfig(BitSailConfiguration config) {
HadoopConf hadoopConf = new OssConf(config.getString(OssReaderOptions.BUCKET.key()));
HashMap<String, String> ossOptions = new HashMap<>();
ossOptions.put(Constants.ACCESS_KEY_ID, config.getString(OssReaderOptions.ACCESS_KEY.key()));
ossOptions.put(
Constants.ACCESS_KEY_SECRET, config.getString(OssReaderOptions.ACCESS_SECRET.key()));
ossOptions.put(Constants.ENDPOINT_KEY, config.getString(OssReaderOptions.ENDPOINT.key()));
CodingGPT marked this conversation as resolved.
Show resolved Hide resolved
hadoopConf.setExtraOptions(ossOptions);
return hadoopConf;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.oss.config;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode;
import com.bytedance.bitsail.connector.oss.option.OssReaderOptions;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

@Getter
@Setter
public class OssConfig implements Serializable {
private String bucket;
private String accessKey;
private String accessSecret;
private String endpoint;
private ContentType contentType;
private Boolean skipFirstLine;
private String filePath;

public OssConfig() {
}

public enum ContentType {
CSV,
JSON
}

public OssConfig(BitSailConfiguration jobConf) {
this.bucket = jobConf.getNecessaryOption(OssReaderOptions.BUCKET, OssConnectorErrorCode.REQUIRED_VALUE);
this.accessKey = jobConf.getNecessaryOption(OssReaderOptions.ACCESS_KEY, OssConnectorErrorCode.REQUIRED_VALUE);
this.accessSecret = jobConf.getNecessaryOption(OssReaderOptions.ACCESS_SECRET, OssConnectorErrorCode.REQUIRED_VALUE);
this.endpoint = jobConf.getNecessaryOption(OssReaderOptions.ENDPOINT, OssConnectorErrorCode.REQUIRED_VALUE);
this.contentType = OssConfig.ContentType.valueOf(jobConf.getNecessaryOption(OssReaderOptions.CONTENT_TYPE, OssConnectorErrorCode.UNSUPPORTED_TYPE).toUpperCase());
this.skipFirstLine = jobConf.get(OssReaderOptions.SKIP_FIRST_LINE);
this.filePath = jobConf.get(OssReaderOptions.FILE_PATH);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.oss.constant;

public class OssConstants {
public static String OSS_CONNECTOR_NAME = "oss";
public static final long OSS_SOURCE_SLEEP_MILL_SECS = 1000L;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.oss.exception;

import com.bytedance.bitsail.common.exception.ErrorCode;

public enum OssConnectorErrorCode implements ErrorCode {
REQUIRED_VALUE("Oss-01", "You missed parameter which is required, please check your configuration."),
CONFIG_ERROR("Oss-02", "Config parameter is error."),
UNSUPPORTED_TYPE("Oss-07", "Content Type is not supported"),
FILE_OPERATION_FAILED("Oss-04", "File Operation Failed"),
SPLIT_ERROR("Oss-05", "Something wrong with creating splits.");
private final String code;
private final String description;

OssConnectorErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}

@Override
public String toString() {
return super.toString();
}
}
Loading