From 1e6f4f5eace679012aeb5e1e6e4c3d439d9b3403 Mon Sep 17 00:00:00 2001 From: CodingGPT <3173405212@qq.com> Date: Mon, 10 Apr 2023 00:38:04 +0800 Subject: [PATCH 1/2] [BitSail][Connector]add oss source connector --- bitsail-connectors/connector-oss/pom.xml | 84 ++++++++ .../connector/oss/config/HadoopConf.java | 58 ++++++ .../bitsail/connector/oss/config/OssConf.java | 54 +++++ .../connector/oss/config/OssConfig.java | 56 +++++ .../connector/oss/constant/OssConstants.java | 22 ++ .../oss/exception/OssConnectorErrorCode.java | 49 +++++ .../oss/option/OssReaderOptions.java | 93 +++++++++ .../connector/oss/source/OssSource.java | 99 +++++++++ .../reader/DeserializationSchemaFactory.java | 45 ++++ .../oss/source/reader/OssSourceReader.java | 193 ++++++++++++++++++ .../oss/source/split/OssSourceSplit.java | 42 ++++ .../split/OssSourceSplitCoordinator.java | 159 +++++++++++++++ .../bitsail/connector/oss/util/OssUtil.java | 62 ++++++ bitsail-connectors/pom.xml | 1 + bitsail-dist/pom.xml | 7 + 15 files changed, 1024 insertions(+) create mode 100644 bitsail-connectors/connector-oss/pom.xml create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConfig.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/exception/OssConnectorErrorCode.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/option/OssReaderOptions.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/OssSource.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/DeserializationSchemaFactory.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplit.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java create mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java diff --git a/bitsail-connectors/connector-oss/pom.xml b/bitsail-connectors/connector-oss/pom.xml new file mode 100644 index 000000000..1c9ef16d5 --- /dev/null +++ b/bitsail-connectors/connector-oss/pom.xml @@ -0,0 +1,84 @@ + + + + + bitsail-connectors + com.bytedance.bitsail + ${revision} + + 4.0.0 + + connector-oss + + + 2.9.2 + 8 + 8 + + + + hadoop-common + + + log4j + log4j + + + slf4j-log4j12 + org.slf4j + + + org.apache.hadoop + 2.9.2 + + + org.apache.hadoop + hadoop-aliyun + ${hadoop-aliyun.version} + + + commons-configuration + commons-configuration + + + slf4j-log4j12 + org.slf4j + + + log4j + log4j + + + hadoop-common + org.apache.hadoop + + + + + com.bytedance.bitsail + bitsail-component-format-json + ${revision} + + + com.bytedance.bitsail + bitsail-component-format-csv + ${revision} + + + \ No newline at end of file diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java new file mode 100644 index 000000000..33b948295 --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java @@ -0,0 +1,58 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.config; + +import lombok.Data; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +@Data +public class HadoopConf implements Serializable { + private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem"; + private static final String SCHEMA = "hdfs"; + protected Map extraOptions = new HashMap<>(); + protected String hdfsNameKey; + protected String hdfsSitePath; + protected String kerberosPrincipal; + protected String kerberosKeytabPath; + + public HadoopConf(String hdfsNameKey) { + this.hdfsNameKey = hdfsNameKey; + } + + public String getHdfsImpl() { + return HDFS_IMPL; + } + + public String getSchema() { + return SCHEMA; + } + + public void setExtraOptionsForConfiguration(Configuration configuration) { + if (!extraOptions.isEmpty()) { + extraOptions.forEach(configuration::set); + } + if (hdfsSitePath != null) { + configuration.addResource(new Path(hdfsSitePath)); + } + } +} + diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java new file mode 100644 index 000000000..ee619c272 --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java @@ -0,0 +1,54 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.config; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.oss.option.OssReaderOptions; + +import org.apache.hadoop.fs.aliyun.oss.Constants; + +import java.util.HashMap; + +public class OssConf extends HadoopConf { + private static final String HDFS_IMPL = "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"; + private static final String SCHEMA = "oss"; + + @Override + public String getHdfsImpl() { + return HDFS_IMPL; + } + + @Override + public String getSchema() { + return SCHEMA; + } + + public OssConf(String hdfsNameKey) { + super(hdfsNameKey); + } + + public static HadoopConf buildWithConfig(BitSailConfiguration config) { + HadoopConf hadoopConf = new OssConf(config.getString(OssReaderOptions.BUCKET.key())); + HashMap ossOptions = new HashMap<>(); + ossOptions.put(Constants.ACCESS_KEY_ID, config.getString(OssReaderOptions.ACCESS_KEY.key())); + ossOptions.put( + Constants.ACCESS_KEY_SECRET, config.getString(OssReaderOptions.ACCESS_SECRET.key())); + ossOptions.put(Constants.ENDPOINT_KEY, config.getString(OssReaderOptions.ENDPOINT.key())); + hadoopConf.setExtraOptions(ossOptions); + return hadoopConf; + } +} \ No newline at end of file diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConfig.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConfig.java new file mode 100644 index 000000000..b246e91ff --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConfig.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.config; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode; +import com.bytedance.bitsail.connector.oss.option.OssReaderOptions; + +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; + +@Getter +@Setter +public class OssConfig implements Serializable { + private String bucket; + private String accessKey; + private String accessSecret; + private String endpoint; + private ContentType contentType; + private Boolean skipFirstLine; + private String filePath; + + public OssConfig() { + } + + public enum ContentType { + CSV, + JSON + } + + public OssConfig(BitSailConfiguration jobConf) { + this.bucket = jobConf.getNecessaryOption(OssReaderOptions.BUCKET, OssConnectorErrorCode.REQUIRED_VALUE); + this.accessKey = jobConf.getNecessaryOption(OssReaderOptions.ACCESS_KEY, OssConnectorErrorCode.REQUIRED_VALUE); + this.accessSecret = jobConf.getNecessaryOption(OssReaderOptions.ACCESS_SECRET, OssConnectorErrorCode.REQUIRED_VALUE); + this.endpoint = jobConf.getNecessaryOption(OssReaderOptions.ENDPOINT, OssConnectorErrorCode.REQUIRED_VALUE); + this.contentType = OssConfig.ContentType.valueOf(jobConf.getNecessaryOption(OssReaderOptions.CONTENT_TYPE, OssConnectorErrorCode.UNSUPPORTED_TYPE).toUpperCase()); + this.skipFirstLine = jobConf.get(OssReaderOptions.SKIP_FIRST_LINE); + this.filePath = jobConf.get(OssReaderOptions.FILE_PATH); + } +} diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java new file mode 100644 index 000000000..34f941693 --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java @@ -0,0 +1,22 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.constant; + +public class OssConstants { + public static String OSS_CONNECTOR_NAME = "oss"; + public static final long OSS_SOURCE_SLEEP_MILL_SECS = 1000L; +} diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/exception/OssConnectorErrorCode.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/exception/OssConnectorErrorCode.java new file mode 100644 index 000000000..68064a1a0 --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/exception/OssConnectorErrorCode.java @@ -0,0 +1,49 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.exception; + +import com.bytedance.bitsail.common.exception.ErrorCode; + +public enum OssConnectorErrorCode implements ErrorCode { + REQUIRED_VALUE("Oss-01", "You missed parameter which is required, please check your configuration."), + CONFIG_ERROR("Oss-02", "Config parameter is error."), + UNSUPPORTED_TYPE("Oss-07", "Content Type is not supported"), + FILE_OPERATION_FAILED("Oss-04", "File Operation Failed"), + SPLIT_ERROR("Oss-05", "Something wrong with creating splits."); + private final String code; + private final String description; + + OssConnectorErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public String toString() { + return super.toString(); + } +} \ No newline at end of file diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/option/OssReaderOptions.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/option/OssReaderOptions.java new file mode 100644 index 000000000..2a09eeb87 --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/option/OssReaderOptions.java @@ -0,0 +1,93 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.option; + +import com.bytedance.bitsail.common.option.ConfigOption; +import com.bytedance.bitsail.common.option.ReaderOptions; + +import static com.bytedance.bitsail.common.option.ConfigOptions.key; +import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX; + +public interface OssReaderOptions extends ReaderOptions.BaseReaderOptions { + + public static final ConfigOption ACCESS_KEY = + key(READER_PREFIX + "access_key") + .noDefaultValue(String.class); + + public static final ConfigOption ACCESS_SECRET = + key(READER_PREFIX + "access_secret") + .noDefaultValue(String.class); + + public static final ConfigOption ENDPOINT = + key(READER_PREFIX + "endpoint") + .noDefaultValue(String.class); + + public static final ConfigOption BUCKET = + key(READER_PREFIX + "bucket") + .noDefaultValue(String.class); + + ConfigOption FILE_PATH = + key(READER_PREFIX + "file_path") + .noDefaultValue(String.class); + + ConfigOption CONTENT_TYPE = + key(READER_PREFIX + "content_type") + .defaultValue("csv"); + + /** + * CSV Format Options + */ + // whether to treat error column as null when parsing + ConfigOption CONVERT_ERROR_COLUMN_AS_NULL = + key(READER_PREFIX + "convert_error_column_as_null") + .defaultValue(false); + + ConfigOption CSV_DELIMITER = + key(READER_PREFIX + "csv_delimiter") + .defaultValue(","); + + ConfigOption CSV_ESCAPE = + key(READER_PREFIX + "csv_escape") + .noDefaultValue(Character.class); + + ConfigOption CSV_QUOTE = + key(READER_PREFIX + "csv_quote") + .noDefaultValue(Character.class); + + ConfigOption CSV_WITH_NULL_STRING = + key(READER_PREFIX + "csv_with_null_string") + .noDefaultValue(String.class); + + ConfigOption CSV_MULTI_DELIMITER_REPLACER = + key(READER_PREFIX + "csv_multi_delimiter_replace_char") + .defaultValue('ยง'); + + ConfigOption SKIP_FIRST_LINE = + key(READER_PREFIX + "skip_first_line") + .defaultValue(false); + + /** + * JSON Options + *

+ * Tips: + * CONVERT_ERROR_COLUMN_AS_NULL is set above + */ + // whether to be insensitive to upper or lower case + ConfigOption CASE_INSENSITIVE = + key(READER_PREFIX + "case_insensitive") + .defaultValue(false); +} diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/OssSource.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/OssSource.java new file mode 100644 index 000000000..a06f975ac --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/OssSource.java @@ -0,0 +1,99 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.source; + +import com.bytedance.bitsail.base.connector.reader.v1.Boundedness; +import com.bytedance.bitsail.base.connector.reader.v1.Source; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.base.execution.ExecutionEnviron; +import com.bytedance.bitsail.base.extension.ParallelismComputable; +import com.bytedance.bitsail.base.parallelism.ParallelismAdvice; +import com.bytedance.bitsail.base.serializer.BinarySerializer; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.common.type.TypeInfoConverter; +import com.bytedance.bitsail.common.type.filemapping.FileMappingTypeInfoConverter; +import com.bytedance.bitsail.connector.oss.constant.OssConstants; +import com.bytedance.bitsail.connector.oss.option.OssReaderOptions; +import com.bytedance.bitsail.connector.oss.source.reader.OssSourceReader; +import com.bytedance.bitsail.connector.oss.source.split.OssSourceSplit; +import com.bytedance.bitsail.connector.oss.source.split.OssSourceSplitCoordinator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OssSource implements Source, ParallelismComputable { + private static final Logger LOG = LoggerFactory.getLogger(OssSource.class); + private BitSailConfiguration jobConf; + + @Override + public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) { + this.jobConf = readerConfiguration; + } + + @Override + public Boundedness getSourceBoundedness() { + return Boundedness.BOUNDEDNESS; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) { + return new OssSourceReader(jobConf, readerContext); + } + + @Override + public SourceSplitCoordinator createSplitCoordinator(SourceSplitCoordinator.Context coordinatorContext) { + return new OssSourceSplitCoordinator(coordinatorContext, jobConf); + } + + @Override + public BinarySerializer getSplitSerializer() { + return Source.super.getSplitSerializer(); + } + + @Override + public BinarySerializer getSplitCoordinatorCheckpointSerializer() { + return Source.super.getSplitCoordinatorCheckpointSerializer(); + } + + @Override + public TypeInfoConverter createTypeInfoConverter() { + return new FileMappingTypeInfoConverter(getReaderName()); + } + + @Override + public String getReaderName() { + return OssConstants.OSS_CONNECTOR_NAME; + } + + @Override + public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception { + int parallelism; + if (selfConf.fieldExists(OssReaderOptions.READER_PARALLELISM_NUM)) { + parallelism = selfConf.get(OssReaderOptions.READER_PARALLELISM_NUM); + } else { + parallelism = 1; + } + return ParallelismAdvice.builder() + .adviceParallelism(parallelism) + .enforceDownStreamChain(false) + .build(); + + } +} diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/DeserializationSchemaFactory.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/DeserializationSchemaFactory.java new file mode 100644 index 000000000..7a0748747 --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/DeserializationSchemaFactory.java @@ -0,0 +1,45 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.source.reader; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.format.DeserializationSchema; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema; +import com.bytedance.bitsail.component.format.json.JsonRowDeserializationSchema; +import com.bytedance.bitsail.connector.oss.config.OssConfig; +import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode; + +public class DeserializationSchemaFactory { + public static DeserializationSchema createDeserializationSchema(BitSailConfiguration jobConf, SourceReader.Context context, + OssConfig ossConfig) { + if (ossConfig.getContentType() == OssConfig.ContentType.CSV) { + return new CsvDeserializationSchema( + jobConf, + context.getRowTypeInfo()); + } else if (ossConfig.getContentType() == OssConfig.ContentType.JSON) { + return new JsonRowDeserializationSchema( + jobConf, + context.getRowTypeInfo()); + } else { + throw BitSailException.asBitSailException(OssConnectorErrorCode.UNSUPPORTED_TYPE, + "Content type only supports CSV and JSON"); + } + } +} diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java new file mode 100644 index 000000000..129cbb176 --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java @@ -0,0 +1,193 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.source.reader; + +import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline; +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.base.format.DeserializationSchema; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.connector.oss.config.HadoopConf; +import com.bytedance.bitsail.connector.oss.config.OssConf; +import com.bytedance.bitsail.connector.oss.config.OssConfig; +import com.bytedance.bitsail.connector.oss.constant.OssConstants; +import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode; +import com.bytedance.bitsail.connector.oss.source.split.OssSourceSplit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; + +public class OssSourceReader implements SourceReader { + private static final Logger LOG = LoggerFactory.getLogger(OssSourceReader.class); + protected HadoopConf hadoopConf; + private final transient DeserializationSchema deserializationSchema; + private final OssConfig ossConfig; + private final transient Context context; + private long currentReadCount = 0; + private final Deque splits; + private boolean skipFirstLine = false; + private boolean hasNoMoreSplits = false; + private int totalSplitNum = 0; + private int skipFirstLineNums = 0; + private OssSourceSplit currentSplit; + FileSystem fs; + + public OssSourceReader(BitSailConfiguration jobConf, Context context) { + this.ossConfig = new OssConfig(jobConf); + this.context = context; + this.deserializationSchema = DeserializationSchemaFactory.createDeserializationSchema(jobConf, context, ossConfig); + this.splits = new LinkedList<>(); + this.hadoopConf = OssConf.buildWithConfig(jobConf); + LOG.info("OssSourceReader is initialized."); + } + + @Override + public void start() { + if (this.ossConfig.getSkipFirstLine()) { + this.skipFirstLine = true; + this.skipFirstLineNums = 1; + } + } + + @Override + public void pollNext(SourcePipeline pipeline) throws Exception { + if (currentSplit == null && splits.isEmpty()) { + LOG.info("pollnext no splits"); + Thread.sleep(OssConstants.OSS_SOURCE_SLEEP_MILL_SECS); + return; + } + LOG.info("pollnext split size {}", this.splits.size()); + Configuration conf = getConfiguration(); + fs = FileSystem.get(conf); + this.currentSplit = this.splits.poll(); + LOG.info("split {} path {}", currentSplit, currentSplit.getPath()); + Path filePath = new Path(currentSplit.getPath()); + try (BufferedReader reader = + new BufferedReader( + new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) { + reader.lines() + .skip(skipFirstLineNums) + .forEach( + line -> { + try { + if (line != null) { + Row row = deserializationSchema.deserialize(line.getBytes()); + pipeline.output(row); + this.currentReadCount++; + } + } catch (IOException e) { + String errorMsg = + String.format( + "Read data from this file [%s] failed", + filePath); + throw BitSailException.asBitSailException( + OssConnectorErrorCode.FILE_OPERATION_FAILED, errorMsg, e); + } + }); + } + } + + public List getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException { + LOG.info("start getFileNamesByPath path: {}", path); + Configuration configuration = getConfiguration(hadoopConf); + FileSystem hdfs = FileSystem.get(configuration); + ArrayList fileNames = new ArrayList<>(); + Path listFiles = new Path(path); + FileStatus[] stats = hdfs.listStatus(listFiles); + for (FileStatus fileStatus : stats) { + if (fileStatus.isDirectory()) { + LOG.info("getFileNamesByPath dir: {}", fileStatus.getPath()); + fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString())); + continue; + } + if (fileStatus.isFile()) { + if (!fileStatus.getPath().getName().equals("_SUCCESS")) { + String filePath = fileStatus.getPath().toString(); + fileNames.add(filePath); + } + } + } + return fileNames; + } + + Configuration getConfiguration() { + return getConfiguration(this.hadoopConf); + } + + public Configuration getConfiguration(HadoopConf hadoopConf) { + Configuration configuration = new Configuration(); + configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); + configuration.set( + String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getHdfsImpl()); + hadoopConf.setExtraOptionsForConfiguration(configuration); + return configuration; + } + + @Override + public void addSplits(List splitList) { + totalSplitNum += splitList.size(); + this.splits.addAll(splitList); + } + + @Override + public boolean hasMoreElements() { + if (hasNoMoreSplits && splits.isEmpty()) { + LOG.info("Finish reading all {} splits.", totalSplitNum); + return false; + } + return true; + } + + @Override + public void notifyNoMoreSplits() { + this.hasNoMoreSplits = true; + LOG.info("No more splits will be assigned."); + } + + @Override + public List snapshotState(long checkpointId) { + return Collections.emptyList(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + SourceReader.super.notifyCheckpointComplete(checkpointId); + } + + @Override + public void close() throws Exception { + if (fs != null) { + fs.close(); + } + } +} diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplit.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplit.java new file mode 100644 index 000000000..c106e894e --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplit.java @@ -0,0 +1,42 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.source.split; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit; + +import lombok.Getter; +import lombok.Setter; + +@Getter +public class OssSourceSplit implements SourceSplit { + public static final String OSS_SOURCE_SPLIT_PREFIX = "Oss_source_split_"; + private final String splitId; + + public OssSourceSplit(String splitId) { + this.splitId = OSS_SOURCE_SPLIT_PREFIX + splitId; + this.path = splitId; + } + + @Setter + private String path; + + @Override + public String uniqSplitId() { + return splitId; + } + +} diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java new file mode 100644 index 000000000..9371681f9 --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java @@ -0,0 +1,159 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.source.split; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceEvent; +import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator; +import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.oss.config.HadoopConf; +import com.bytedance.bitsail.connector.oss.config.OssConf; +import com.bytedance.bitsail.connector.oss.config.OssConfig; +import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode; +import com.bytedance.bitsail.connector.oss.util.OssUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.NoArgsConstructor; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class OssSourceSplitCoordinator implements SourceSplitCoordinator { + private static final Logger LOG = LoggerFactory.getLogger(OssSourceSplitCoordinator.class); + private final SourceSplitCoordinator.Context context; + private final BitSailConfiguration jobConf; + private OssConfig ossConfig; + private final Map> splitAssignmentPlan; + + public OssSourceSplitCoordinator(SourceSplitCoordinator.Context context, BitSailConfiguration jobConf) { + this.context = context; + this.jobConf = jobConf; + this.ossConfig = new OssConfig(jobConf); + this.splitAssignmentPlan = Maps.newConcurrentMap(); + } + + private List constructSplit() throws IOException { + HadoopConf conf = OssConf.buildWithConfig(this.jobConf); + String path = ossConfig.getFilePath(); + List fileList = OssUtil.getFileNamesByPath(conf, path); + LOG.info("OssSourceSplitCoordinator fileList: {}", fileList); + List fileSourceSplits = new ArrayList<>(); + fileList.forEach(file -> fileSourceSplits.add(new OssSourceSplit(file))); + return fileSourceSplits; + } + + @Override + public void start() { + List splits = new ArrayList<>(); + try { + splits = constructSplit(); + } catch (IOException e) { + throw new BitSailException(OssConnectorErrorCode.SPLIT_ERROR, "Failed to create splits."); + } + int readerNum = context.totalParallelism(); + LOG.info("Found {} readers and {} splits.", readerNum, splits.size()); + if (readerNum > splits.size()) { + LOG.error("Reader number {} is larger than split number {}.", readerNum, splits.size()); + } + for (OssSourceSplit split : splits) { + int readerIndex = ReaderSelector.getReaderIndex(readerNum); + splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split); + LOG.info("Will assign split {} to the {}-th reader", split.uniqSplitId(), readerIndex); + } + } + + @Override + public void addReader(int subtaskId) { + LOG.info("Found reader {}", subtaskId); + tryAssignSplitsToReader(); + } + + private void tryAssignSplitsToReader() { + Map> splitsToAssign = new HashMap<>(); + for (Integer readerIndex : splitAssignmentPlan.keySet()) { + if (CollectionUtils.isNotEmpty(splitAssignmentPlan.get(readerIndex)) && context.registeredReaders().contains(readerIndex)) { + splitsToAssign.put(readerIndex, Lists.newArrayList(splitAssignmentPlan.get(readerIndex))); + } + } + for (Integer readerIndex : splitsToAssign.keySet()) { + LOG.info("Try assigning splits reader {}, splits are: [{}]", readerIndex, + splitsToAssign.get(readerIndex).stream().map(OssSourceSplit::uniqSplitId).collect(Collectors.toList())); + splitAssignmentPlan.remove(readerIndex); + context.assignSplit(readerIndex, splitsToAssign.get(readerIndex)); + context.signalNoMoreSplits(readerIndex); + LOG.info("Finish assigning splits reader {}", readerIndex); + } + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + LOG.info("Source reader {} return splits {}.", subtaskId, splits); + int readerNum = context.totalParallelism(); + for (OssSourceSplit split : splits) { + int readerIndex = ReaderSelector.getReaderIndex(readerNum); + splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split); + LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex); + } + tryAssignSplitsToReader(); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + SourceSplitCoordinator.super.handleSourceEvent(subtaskId, sourceEvent); + } + + @Override + public EmptyState snapshotState(long checkpoint) throws Exception { + return new EmptyState(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + SourceSplitCoordinator.super.notifyCheckpointComplete(checkpointId); + } + + @Override + public void close() { + } + + @NoArgsConstructor + static class ReaderSelector { + private static long readerIndex = 0; + + public static int getReaderIndex(int totalReaderNum) { + return (int) readerIndex++ % totalReaderNum; + } + } +} diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java new file mode 100644 index 000000000..1b96b4a5c --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java @@ -0,0 +1,62 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.oss.util; + +import com.bytedance.bitsail.connector.oss.config.HadoopConf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class OssUtil { + static FileSystem fs; + public static Configuration getConfiguration(HadoopConf hadoopConf) { + Configuration configuration = new Configuration(); + configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); + configuration.set( + String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getHdfsImpl()); + hadoopConf.setExtraOptionsForConfiguration(configuration); + return configuration; + } + + public static List getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException { + Configuration configuration = getConfiguration(hadoopConf); + fs = FileSystem.get(configuration); + ArrayList fileNames = new ArrayList<>(); + Path listFiles = new Path(path); + FileStatus[] stats = fs.listStatus(listFiles); + for (FileStatus fileStatus : stats) { + if (fileStatus.isDirectory()) { + fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString())); + continue; + } + if (fileStatus.isFile()) { + if (!fileStatus.getPath().getName().equals("_SUCCESS")) { + String filePath = fileStatus.getPath().toString(); + fileNames.add(filePath); + } + } + } + return fileNames; + } +} diff --git a/bitsail-connectors/pom.xml b/bitsail-connectors/pom.xml index 7f423d273..445acc1dd 100644 --- a/bitsail-connectors/pom.xml +++ b/bitsail-connectors/pom.xml @@ -52,6 +52,7 @@ connector-cdc connector-kafka connector-mongodb + connector-oss diff --git a/bitsail-dist/pom.xml b/bitsail-dist/pom.xml index 276586b39..d8f6c415f 100644 --- a/bitsail-dist/pom.xml +++ b/bitsail-dist/pom.xml @@ -273,6 +273,13 @@ ${revision} provided + + + com.bytedance.bitsail + connector-oss + ${revision} + provided + From 7e6069dd311e85463360c4d6eb996f5511b20d5e Mon Sep 17 00:00:00 2001 From: CodingGPT <3173405212@qq.com> Date: Sun, 16 Apr 2023 10:34:01 +0800 Subject: [PATCH 2/2] add resource files and delete unnessary code --- bitsail-connectors/connector-oss/pom.xml | 29 +----- .../connector/oss/config/HadoopConf.java | 58 ------------ .../bitsail/connector/oss/config/OssConf.java | 36 ++++++-- .../connector/oss/constant/OssConstants.java | 1 + .../oss/source/reader/OssSourceReader.java | 44 ++-------- .../split/OssSourceSplitCoordinator.java | 7 +- .../bitsail/connector/oss/util/OssUtil.java | 18 ++-- .../bitsail-connector-unified-oss.json | 9 ++ .../main/resources/oss-type-converter.yaml | 88 +++++++++++++++++++ 9 files changed, 148 insertions(+), 142 deletions(-) delete mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java create mode 100644 bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json create mode 100644 bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml diff --git a/bitsail-connectors/connector-oss/pom.xml b/bitsail-connectors/connector-oss/pom.xml index 1c9ef16d5..211a54169 100644 --- a/bitsail-connectors/connector-oss/pom.xml +++ b/bitsail-connectors/connector-oss/pom.xml @@ -27,46 +27,23 @@ connector-oss - 2.9.2 + 3.1.1 8 8 - - hadoop-common - - - log4j - log4j - - - slf4j-log4j12 - org.slf4j - - - org.apache.hadoop - 2.9.2 - org.apache.hadoop hadoop-aliyun ${hadoop-aliyun.version} - commons-configuration - commons-configuration - - - slf4j-log4j12 org.slf4j + slf4j-log4j12 - log4j log4j - - - hadoop-common - org.apache.hadoop + log4j diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java deleted file mode 100644 index 33b948295..000000000 --- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.oss.config; - -import lombok.Data; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -@Data -public class HadoopConf implements Serializable { - private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem"; - private static final String SCHEMA = "hdfs"; - protected Map extraOptions = new HashMap<>(); - protected String hdfsNameKey; - protected String hdfsSitePath; - protected String kerberosPrincipal; - protected String kerberosKeytabPath; - - public HadoopConf(String hdfsNameKey) { - this.hdfsNameKey = hdfsNameKey; - } - - public String getHdfsImpl() { - return HDFS_IMPL; - } - - public String getSchema() { - return SCHEMA; - } - - public void setExtraOptionsForConfiguration(Configuration configuration) { - if (!extraOptions.isEmpty()) { - extraOptions.forEach(configuration::set); - } - if (hdfsSitePath != null) { - configuration.addResource(new Path(hdfsSitePath)); - } - } -} - diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java index ee619c272..38357258d 100644 --- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java @@ -19,36 +19,54 @@ import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.connector.oss.option.OssReaderOptions; +import lombok.Data; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.aliyun.oss.Constants; +import java.io.Serializable; import java.util.HashMap; +import java.util.Map; -public class OssConf extends HadoopConf { +@Data +public class OssConf implements Serializable { private static final String HDFS_IMPL = "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"; private static final String SCHEMA = "oss"; + protected Map extraOptions = new HashMap<>(); + protected String hdfsNameKey; + protected String hdfsSitePath; + protected String kerberosPrincipal; + protected String kerberosKeytabPath; - @Override public String getHdfsImpl() { return HDFS_IMPL; } - @Override public String getSchema() { return SCHEMA; } public OssConf(String hdfsNameKey) { - super(hdfsNameKey); + this.hdfsNameKey = hdfsNameKey; } - public static HadoopConf buildWithConfig(BitSailConfiguration config) { - HadoopConf hadoopConf = new OssConf(config.getString(OssReaderOptions.BUCKET.key())); + public static OssConf buildWithConfig(BitSailConfiguration config) { + OssConf hadoopConf = new OssConf(config.get(OssReaderOptions.BUCKET)); HashMap ossOptions = new HashMap<>(); - ossOptions.put(Constants.ACCESS_KEY_ID, config.getString(OssReaderOptions.ACCESS_KEY.key())); + ossOptions.put(Constants.ACCESS_KEY_ID, config.get(OssReaderOptions.ACCESS_KEY)); ossOptions.put( - Constants.ACCESS_KEY_SECRET, config.getString(OssReaderOptions.ACCESS_SECRET.key())); - ossOptions.put(Constants.ENDPOINT_KEY, config.getString(OssReaderOptions.ENDPOINT.key())); + Constants.ACCESS_KEY_SECRET, config.get(OssReaderOptions.ACCESS_SECRET)); + ossOptions.put(Constants.ENDPOINT_KEY, config.get(OssReaderOptions.ENDPOINT)); hadoopConf.setExtraOptions(ossOptions); return hadoopConf; } + + public void setExtraOptionsForConfiguration(Configuration configuration) { + if (!extraOptions.isEmpty()) { + extraOptions.forEach(configuration::set); + } + if (hdfsSitePath != null) { + configuration.addResource(new Path(hdfsSitePath)); + } + } } \ No newline at end of file diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java index 34f941693..5bc2e6e10 100644 --- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java @@ -19,4 +19,5 @@ public class OssConstants { public static String OSS_CONNECTOR_NAME = "oss"; public static final long OSS_SOURCE_SLEEP_MILL_SECS = 1000L; + public static final String OSS_SOURCE_IGNORE_FILENAME = "_SUCCESS"; } diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java index 129cbb176..a380a3eac 100644 --- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java @@ -22,7 +22,6 @@ import com.bytedance.bitsail.common.BitSailException; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; import com.bytedance.bitsail.common.row.Row; -import com.bytedance.bitsail.connector.oss.config.HadoopConf; import com.bytedance.bitsail.connector.oss.config.OssConf; import com.bytedance.bitsail.connector.oss.config.OssConfig; import com.bytedance.bitsail.connector.oss.constant.OssConstants; @@ -31,7 +30,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -41,7 +39,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.LinkedList; @@ -49,11 +46,10 @@ public class OssSourceReader implements SourceReader { private static final Logger LOG = LoggerFactory.getLogger(OssSourceReader.class); - protected HadoopConf hadoopConf; + protected OssConf ossConf; private final transient DeserializationSchema deserializationSchema; private final OssConfig ossConfig; private final transient Context context; - private long currentReadCount = 0; private final Deque splits; private boolean skipFirstLine = false; private boolean hasNoMoreSplits = false; @@ -67,14 +63,14 @@ public OssSourceReader(BitSailConfiguration jobConf, Context context) { this.context = context; this.deserializationSchema = DeserializationSchemaFactory.createDeserializationSchema(jobConf, context, ossConfig); this.splits = new LinkedList<>(); - this.hadoopConf = OssConf.buildWithConfig(jobConf); + this.ossConf = OssConf.buildWithConfig(jobConf); LOG.info("OssSourceReader is initialized."); } @Override public void start() { if (this.ossConfig.getSkipFirstLine()) { - this.skipFirstLine = true; + skipFirstLine = true; this.skipFirstLineNums = 1; } } @@ -103,7 +99,6 @@ public void pollNext(SourcePipeline pipeline) throws Exception { if (line != null) { Row row = deserializationSchema.deserialize(line.getBytes()); pipeline.output(row); - this.currentReadCount++; } } catch (IOException e) { String errorMsg = @@ -117,39 +112,16 @@ public void pollNext(SourcePipeline pipeline) throws Exception { } } - public List getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException { - LOG.info("start getFileNamesByPath path: {}", path); - Configuration configuration = getConfiguration(hadoopConf); - FileSystem hdfs = FileSystem.get(configuration); - ArrayList fileNames = new ArrayList<>(); - Path listFiles = new Path(path); - FileStatus[] stats = hdfs.listStatus(listFiles); - for (FileStatus fileStatus : stats) { - if (fileStatus.isDirectory()) { - LOG.info("getFileNamesByPath dir: {}", fileStatus.getPath()); - fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString())); - continue; - } - if (fileStatus.isFile()) { - if (!fileStatus.getPath().getName().equals("_SUCCESS")) { - String filePath = fileStatus.getPath().toString(); - fileNames.add(filePath); - } - } - } - return fileNames; - } - Configuration getConfiguration() { - return getConfiguration(this.hadoopConf); + return getConfiguration(this.ossConf); } - public Configuration getConfiguration(HadoopConf hadoopConf) { + public Configuration getConfiguration(OssConf ossConf) { Configuration configuration = new Configuration(); - configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); + configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, ossConf.getHdfsNameKey()); configuration.set( - String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getHdfsImpl()); - hadoopConf.setExtraOptionsForConfiguration(configuration); + String.format("fs.%s.impl", ossConf.getSchema()), ossConf.getHdfsImpl()); + ossConf.setExtraOptionsForConfiguration(configuration); return configuration; } diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java index 9371681f9..b5c8a7690 100644 --- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java @@ -21,7 +21,6 @@ import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState; import com.bytedance.bitsail.common.BitSailException; import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.connector.oss.config.HadoopConf; import com.bytedance.bitsail.connector.oss.config.OssConf; import com.bytedance.bitsail.connector.oss.config.OssConfig; import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode; @@ -60,7 +59,7 @@ public OssSourceSplitCoordinator(SourceSplitCoordinator.Context constructSplit() throws IOException { - HadoopConf conf = OssConf.buildWithConfig(this.jobConf); + OssConf conf = OssConf.buildWithConfig(this.jobConf); String path = ossConfig.getFilePath(); List fileList = OssUtil.getFileNamesByPath(conf, path); LOG.info("OssSourceSplitCoordinator fileList: {}", fileList); @@ -71,7 +70,7 @@ private List constructSplit() throws IOException { @Override public void start() { - List splits = new ArrayList<>(); + List splits; try { splits = constructSplit(); } catch (IOException e) { @@ -80,7 +79,7 @@ public void start() { int readerNum = context.totalParallelism(); LOG.info("Found {} readers and {} splits.", readerNum, splits.size()); if (readerNum > splits.size()) { - LOG.error("Reader number {} is larger than split number {}.", readerNum, splits.size()); + LOG.warn("Reader number {} is larger than split number {}.", readerNum, splits.size()); } for (OssSourceSplit split : splits) { int readerIndex = ReaderSelector.getReaderIndex(readerNum); diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java index 1b96b4a5c..d8c78b742 100644 --- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java @@ -16,7 +16,8 @@ package com.bytedance.bitsail.connector.oss.util; -import com.bytedance.bitsail.connector.oss.config.HadoopConf; +import com.bytedance.bitsail.connector.oss.config.OssConf; +import com.bytedance.bitsail.connector.oss.constant.OssConstants; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -29,19 +30,18 @@ import java.util.List; public class OssUtil { - static FileSystem fs; - public static Configuration getConfiguration(HadoopConf hadoopConf) { + public static Configuration getConfiguration(OssConf ossConf) { Configuration configuration = new Configuration(); - configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); + configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, ossConf.getHdfsNameKey()); configuration.set( - String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getHdfsImpl()); - hadoopConf.setExtraOptionsForConfiguration(configuration); + String.format("fs.%s.impl", ossConf.getSchema()), ossConf.getHdfsImpl()); + ossConf.setExtraOptionsForConfiguration(configuration); return configuration; } - public static List getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException { + public static List getFileNamesByPath(OssConf hadoopConf, String path) throws IOException { Configuration configuration = getConfiguration(hadoopConf); - fs = FileSystem.get(configuration); + FileSystem fs = FileSystem.get(configuration); ArrayList fileNames = new ArrayList<>(); Path listFiles = new Path(path); FileStatus[] stats = fs.listStatus(listFiles); @@ -51,7 +51,7 @@ public static List getFileNamesByPath(HadoopConf hadoopConf, String path continue; } if (fileStatus.isFile()) { - if (!fileStatus.getPath().getName().equals("_SUCCESS")) { + if (!fileStatus.getPath().getName().equals(OssConstants.OSS_SOURCE_IGNORE_FILENAME)) { String filePath = fileStatus.getPath().toString(); fileNames.add(filePath); } diff --git a/bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json b/bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json new file mode 100644 index 000000000..a8be6a9df --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json @@ -0,0 +1,9 @@ +{ + "name": "bitsail-connector-unified-oss", + "classes": [ + "com.bytedance.bitsail.connector.oss.source.OssSource" + ], + "libs": [ + "connector-oss-${version}.jar" + ] +} diff --git a/bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml b/bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml new file mode 100644 index 000000000..503ee65b4 --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml @@ -0,0 +1,88 @@ +engine.type.to.bitsail.type.converter: + - source.type: tinyint + target.type: int + + - source.type: smallint + target.type: int + + - source.type: int + target.type: int + + - source.type: long + target.type: bigint + + - source.type: bigint + target.type: bigint + + - source.type: float + target.type: float + + - source.type: double + target.type: double + + - source.type: decimal + target.type: bigdecimal + + - source.type: timestamp + target.type: timestamp + + - source.type: date + target.type: date + + - source.type: string + target.type: string + + - source.type: varchar + target.type: string + + - source.type: char + target.type: string + + - source.type: boolean + target.type: boolean + + - source.type: binary + target.type: bytes + +bitsail.type.to.engine.type.converter: + - source.type: byte + target.type: tinyint + + - source.type: short + target.type: smallint + + - source.type: int + target.type: int + + - source.type: long + target.type: bigint + + - source.type: bigint + target.type: bigint + + - source.type: double + target.type: double + + - source.type: float + target.type: float + + - source.type: bigdecimal + target.type: decimal + + - source.type: string + target.type: string + + - source.type: boolean + target.type: boolean + + - source.type: date.date + target.type: string + + - source.type: date.time + target.type: string + + - source.type: date.datetime + target.type: bigint + + - source.type: bytes + target.type: binary \ No newline at end of file