SKIP_FIRST_LINE =
+ key(READER_PREFIX + "skip_first_line")
+ .defaultValue(false);
+
+ /**
+ * JSON Options
+ *
+ * Tips:
+ * CONVERT_ERROR_COLUMN_AS_NULL is set above
+ */
+ // whether to be insensitive to upper or lower case
+ ConfigOption CASE_INSENSITIVE =
+ key(READER_PREFIX + "case_insensitive")
+ .defaultValue(false);
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/OssSource.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/OssSource.java
new file mode 100644
index 000000000..a06f975ac
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/OssSource.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.source;
+
+import com.bytedance.bitsail.base.connector.reader.v1.Boundedness;
+import com.bytedance.bitsail.base.connector.reader.v1.Source;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
+import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState;
+import com.bytedance.bitsail.base.execution.ExecutionEnviron;
+import com.bytedance.bitsail.base.extension.ParallelismComputable;
+import com.bytedance.bitsail.base.parallelism.ParallelismAdvice;
+import com.bytedance.bitsail.base.serializer.BinarySerializer;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.common.type.TypeInfoConverter;
+import com.bytedance.bitsail.common.type.filemapping.FileMappingTypeInfoConverter;
+import com.bytedance.bitsail.connector.oss.constant.OssConstants;
+import com.bytedance.bitsail.connector.oss.option.OssReaderOptions;
+import com.bytedance.bitsail.connector.oss.source.reader.OssSourceReader;
+import com.bytedance.bitsail.connector.oss.source.split.OssSourceSplit;
+import com.bytedance.bitsail.connector.oss.source.split.OssSourceSplitCoordinator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OssSource implements Source, ParallelismComputable {
+ private static final Logger LOG = LoggerFactory.getLogger(OssSource.class);
+ private BitSailConfiguration jobConf;
+
+ @Override
+ public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) {
+ this.jobConf = readerConfiguration;
+ }
+
+ @Override
+ public Boundedness getSourceBoundedness() {
+ return Boundedness.BOUNDEDNESS;
+ }
+
+ @Override
+ public SourceReader createReader(SourceReader.Context readerContext) {
+ return new OssSourceReader(jobConf, readerContext);
+ }
+
+ @Override
+ public SourceSplitCoordinator createSplitCoordinator(SourceSplitCoordinator.Context coordinatorContext) {
+ return new OssSourceSplitCoordinator(coordinatorContext, jobConf);
+ }
+
+ @Override
+ public BinarySerializer getSplitSerializer() {
+ return Source.super.getSplitSerializer();
+ }
+
+ @Override
+ public BinarySerializer getSplitCoordinatorCheckpointSerializer() {
+ return Source.super.getSplitCoordinatorCheckpointSerializer();
+ }
+
+ @Override
+ public TypeInfoConverter createTypeInfoConverter() {
+ return new FileMappingTypeInfoConverter(getReaderName());
+ }
+
+ @Override
+ public String getReaderName() {
+ return OssConstants.OSS_CONNECTOR_NAME;
+ }
+
+ @Override
+ public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception {
+ int parallelism;
+ if (selfConf.fieldExists(OssReaderOptions.READER_PARALLELISM_NUM)) {
+ parallelism = selfConf.get(OssReaderOptions.READER_PARALLELISM_NUM);
+ } else {
+ parallelism = 1;
+ }
+ return ParallelismAdvice.builder()
+ .adviceParallelism(parallelism)
+ .enforceDownStreamChain(false)
+ .build();
+
+ }
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/DeserializationSchemaFactory.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/DeserializationSchemaFactory.java
new file mode 100644
index 000000000..7a0748747
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/DeserializationSchemaFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.source.reader;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
+import com.bytedance.bitsail.base.format.DeserializationSchema;
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema;
+import com.bytedance.bitsail.component.format.json.JsonRowDeserializationSchema;
+import com.bytedance.bitsail.connector.oss.config.OssConfig;
+import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode;
+
+public class DeserializationSchemaFactory {
+ public static DeserializationSchema createDeserializationSchema(BitSailConfiguration jobConf, SourceReader.Context context,
+ OssConfig ossConfig) {
+ if (ossConfig.getContentType() == OssConfig.ContentType.CSV) {
+ return new CsvDeserializationSchema(
+ jobConf,
+ context.getRowTypeInfo());
+ } else if (ossConfig.getContentType() == OssConfig.ContentType.JSON) {
+ return new JsonRowDeserializationSchema(
+ jobConf,
+ context.getRowTypeInfo());
+ } else {
+ throw BitSailException.asBitSailException(OssConnectorErrorCode.UNSUPPORTED_TYPE,
+ "Content type only supports CSV and JSON");
+ }
+ }
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java
new file mode 100644
index 000000000..129cbb176
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.source.reader;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
+import com.bytedance.bitsail.base.format.DeserializationSchema;
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.connector.oss.config.HadoopConf;
+import com.bytedance.bitsail.connector.oss.config.OssConf;
+import com.bytedance.bitsail.connector.oss.config.OssConfig;
+import com.bytedance.bitsail.connector.oss.constant.OssConstants;
+import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode;
+import com.bytedance.bitsail.connector.oss.source.split.OssSourceSplit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+public class OssSourceReader implements SourceReader {
+ private static final Logger LOG = LoggerFactory.getLogger(OssSourceReader.class);
+ protected HadoopConf hadoopConf;
+ private final transient DeserializationSchema deserializationSchema;
+ private final OssConfig ossConfig;
+ private final transient Context context;
+ private long currentReadCount = 0;
+ private final Deque splits;
+ private boolean skipFirstLine = false;
+ private boolean hasNoMoreSplits = false;
+ private int totalSplitNum = 0;
+ private int skipFirstLineNums = 0;
+ private OssSourceSplit currentSplit;
+ FileSystem fs;
+
+ public OssSourceReader(BitSailConfiguration jobConf, Context context) {
+ this.ossConfig = new OssConfig(jobConf);
+ this.context = context;
+ this.deserializationSchema = DeserializationSchemaFactory.createDeserializationSchema(jobConf, context, ossConfig);
+ this.splits = new LinkedList<>();
+ this.hadoopConf = OssConf.buildWithConfig(jobConf);
+ LOG.info("OssSourceReader is initialized.");
+ }
+
+ @Override
+ public void start() {
+ if (this.ossConfig.getSkipFirstLine()) {
+ this.skipFirstLine = true;
+ this.skipFirstLineNums = 1;
+ }
+ }
+
+ @Override
+ public void pollNext(SourcePipeline pipeline) throws Exception {
+ if (splits.isEmpty()) {
+ LOG.info("pollnext no splits");
+ Thread.sleep(OssConstants.OSS_SOURCE_SLEEP_MILL_SECS);
+ return;
+ }
+ LOG.info("pollnext split size {}", this.splits.size());
+ Configuration conf = getConfiguration();
+ fs = FileSystem.get(conf);
+ this.currentSplit = this.splits.poll();
+ LOG.info("split {} path {}", currentSplit, currentSplit.getPath());
+ Path filePath = new Path(currentSplit.getPath());
+ try (BufferedReader reader =
+ new BufferedReader(
+ new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
+ reader.lines()
+ .skip(skipFirstLineNums)
+ .forEach(
+ line -> {
+ try {
+ if (line != null) {
+ Row row = deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8));
+ pipeline.output(row);
+ this.currentReadCount++;
+ }
+ } catch (IOException e) {
+ String errorMsg =
+ String.format(
+ "Read data from this file [%s] failed",
+ filePath);
+ throw BitSailException.asBitSailException(
+ OssConnectorErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
+ }
+ });
+ }
+ }
+
+ public List getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException {
+ LOG.info("start getFileNamesByPath path: {}", path);
+ Configuration configuration = getConfiguration(hadoopConf);
+ FileSystem hdfs = FileSystem.get(configuration);
+ ArrayList fileNames = new ArrayList<>();
+ Path listFiles = new Path(path);
+ FileStatus[] stats = hdfs.listStatus(listFiles);
+ for (FileStatus fileStatus : stats) {
+ if (fileStatus.isDirectory()) {
+ LOG.info("getFileNamesByPath dir: {}", fileStatus.getPath());
+ fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString()));
+ continue;
+ }
+ if (fileStatus.isFile()) {
+ if (!fileStatus.getPath().getName().equals("_SUCCESS")) {
+ String filePath = fileStatus.getPath().toString();
+ fileNames.add(filePath);
+ }
+ }
+ }
+ return fileNames;
+ }
+
+ Configuration getConfiguration() {
+ return getConfiguration(this.hadoopConf);
+ }
+
+ public Configuration getConfiguration(HadoopConf hadoopConf) {
+ Configuration configuration = new Configuration();
+ configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
+ configuration.set(
+ String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getHdfsImpl());
+ hadoopConf.setExtraOptionsForConfiguration(configuration);
+ return configuration;
+ }
+
+ @Override
+ public void addSplits(List splitList) {
+ totalSplitNum += splitList.size();
+ this.splits.addAll(splitList);
+ }
+
+ @Override
+ public boolean hasMoreElements() {
+ if (hasNoMoreSplits && splits.isEmpty()) {
+ LOG.info("Finish reading all {} splits.", totalSplitNum);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {
+ this.hasNoMoreSplits = true;
+ LOG.info("No more splits will be assigned.");
+ }
+
+ @Override
+ public List snapshotState(long checkpointId) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ SourceReader.super.notifyCheckpointComplete(checkpointId);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (fs != null) {
+ fs.close();
+ }
+ }
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplit.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplit.java
new file mode 100644
index 000000000..c106e894e
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplit.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.source.split;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+public class OssSourceSplit implements SourceSplit {
+ public static final String OSS_SOURCE_SPLIT_PREFIX = "Oss_source_split_";
+ private final String splitId;
+
+ public OssSourceSplit(String splitId) {
+ this.splitId = OSS_SOURCE_SPLIT_PREFIX + splitId;
+ this.path = splitId;
+ }
+
+ @Setter
+ private String path;
+
+ @Override
+ public String uniqSplitId() {
+ return splitId;
+ }
+
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java
new file mode 100644
index 000000000..9371681f9
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.source.split;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourceEvent;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
+import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState;
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.oss.config.HadoopConf;
+import com.bytedance.bitsail.connector.oss.config.OssConf;
+import com.bytedance.bitsail.connector.oss.config.OssConfig;
+import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode;
+import com.bytedance.bitsail.connector.oss.util.OssUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class OssSourceSplitCoordinator implements SourceSplitCoordinator {
+ private static final Logger LOG = LoggerFactory.getLogger(OssSourceSplitCoordinator.class);
+ private final SourceSplitCoordinator.Context context;
+ private final BitSailConfiguration jobConf;
+ private OssConfig ossConfig;
+ private final Map> splitAssignmentPlan;
+
+ public OssSourceSplitCoordinator(SourceSplitCoordinator.Context context, BitSailConfiguration jobConf) {
+ this.context = context;
+ this.jobConf = jobConf;
+ this.ossConfig = new OssConfig(jobConf);
+ this.splitAssignmentPlan = Maps.newConcurrentMap();
+ }
+
+ private List constructSplit() throws IOException {
+ HadoopConf conf = OssConf.buildWithConfig(this.jobConf);
+ String path = ossConfig.getFilePath();
+ List fileList = OssUtil.getFileNamesByPath(conf, path);
+ LOG.info("OssSourceSplitCoordinator fileList: {}", fileList);
+ List fileSourceSplits = new ArrayList<>();
+ fileList.forEach(file -> fileSourceSplits.add(new OssSourceSplit(file)));
+ return fileSourceSplits;
+ }
+
+ @Override
+ public void start() {
+ List splits = new ArrayList<>();
+ try {
+ splits = constructSplit();
+ } catch (IOException e) {
+ throw BitSailException.asBitSailException(OssConnectorErrorCode.SPLIT_ERROR, "Failed to create splits.", e);
+ }
+ int readerNum = context.totalParallelism();
+ LOG.info("Found {} readers and {} splits.", readerNum, splits.size());
+ if (readerNum > splits.size()) {
+ LOG.error("Reader number {} is larger than split number {}.", readerNum, splits.size());
+ }
+ for (OssSourceSplit split : splits) {
+ int readerIndex = ReaderSelector.getReaderIndex(readerNum);
+ splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
+ LOG.info("Will assign split {} to the {}-th reader", split.uniqSplitId(), readerIndex);
+ }
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ LOG.info("Found reader {}", subtaskId);
+ tryAssignSplitsToReader();
+ }
+
+ private void tryAssignSplitsToReader() {
+ Map> splitsToAssign = new HashMap<>();
+ for (Integer readerIndex : splitAssignmentPlan.keySet()) {
+ if (CollectionUtils.isNotEmpty(splitAssignmentPlan.get(readerIndex)) && context.registeredReaders().contains(readerIndex)) {
+ splitsToAssign.put(readerIndex, Lists.newArrayList(splitAssignmentPlan.get(readerIndex)));
+ }
+ }
+ for (Integer readerIndex : splitsToAssign.keySet()) {
+ LOG.info("Try assigning splits reader {}, splits are: [{}]", readerIndex,
+ splitsToAssign.get(readerIndex).stream().map(OssSourceSplit::uniqSplitId).collect(Collectors.toList()));
+ splitAssignmentPlan.remove(readerIndex);
+ context.assignSplit(readerIndex, splitsToAssign.get(readerIndex));
+ context.signalNoMoreSplits(readerIndex);
+ LOG.info("Finish assigning splits reader {}", readerIndex);
+ }
+ }
+
+ @Override
+ public void addSplitsBack(List splits, int subtaskId) {
+ LOG.info("Source reader {} return splits {}.", subtaskId, splits);
+ int readerNum = context.totalParallelism();
+ for (OssSourceSplit split : splits) {
+ int readerIndex = ReaderSelector.getReaderIndex(readerNum);
+ splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
+ LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex);
+ }
+ tryAssignSplitsToReader();
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ SourceSplitCoordinator.super.handleSourceEvent(subtaskId, sourceEvent);
+ }
+
+ @Override
+ public EmptyState snapshotState(long checkpoint) throws Exception {
+ return new EmptyState();
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ SourceSplitCoordinator.super.notifyCheckpointComplete(checkpointId);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @NoArgsConstructor
+ static class ReaderSelector {
+ private static long readerIndex = 0;
+
+ public static int getReaderIndex(int totalReaderNum) {
+ return (int) (readerIndex++ % totalReaderNum);
+ }
+ }
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java
new file mode 100644
index 000000000..1b96b4a5c
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.util;
+
+import com.bytedance.bitsail.connector.oss.config.HadoopConf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class OssUtil {
+
+ public static Configuration getConfiguration(HadoopConf hadoopConf) {
+ Configuration configuration = new Configuration();
+ configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
+ configuration.set(
+ String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getHdfsImpl());
+ hadoopConf.setExtraOptionsForConfiguration(configuration);
+ return configuration;
+ }
+
+ public static List getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException {
+ Configuration configuration = getConfiguration(hadoopConf);
+ FileSystem fs = FileSystem.get(configuration);
+ ArrayList fileNames = new ArrayList<>();
+ Path listFiles = new Path(path);
+ FileStatus[] stats = fs.listStatus(listFiles);
+ for (FileStatus fileStatus : stats) {
+ if (fileStatus.isDirectory()) {
+ fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString()));
+ continue;
+ }
+ if (fileStatus.isFile()) {
+ if (!fileStatus.getPath().getName().equals("_SUCCESS")) {
+ String filePath = fileStatus.getPath().toString();
+ fileNames.add(filePath);
+ }
+ }
+ }
+ return fileNames;
+ }
+}
diff --git a/bitsail-connectors/pom.xml b/bitsail-connectors/pom.xml
index 7f423d273..445acc1dd 100644
--- a/bitsail-connectors/pom.xml
+++ b/bitsail-connectors/pom.xml
@@ -52,6 +52,7 @@
connector-cdc
connector-kafka
connector-mongodb
+ connector-oss
diff --git a/bitsail-dist/pom.xml b/bitsail-dist/pom.xml
index 276586b39..d8f6c415f 100644
--- a/bitsail-dist/pom.xml
+++ b/bitsail-dist/pom.xml
@@ -273,6 +273,13 @@
${revision}
provided
+
+
+ com.bytedance.bitsail
+ connector-oss
+ ${revision}
+ provided
+
From 7e6069dd311e85463360c4d6eb996f5511b20d5e Mon Sep 17 00:00:00 2001
From: CodingGPT <3173405212@qq.com>
Date: Sun, 16 Apr 2023 10:34:01 +0800
Subject: [PATCH 2/2] add resource files and delete unnecessary code
---
bitsail-connectors/connector-oss/pom.xml | 29 +-----
.../connector/oss/config/HadoopConf.java | 58 ------------
.../bitsail/connector/oss/config/OssConf.java | 36 ++++++--
.../connector/oss/constant/OssConstants.java | 1 +
.../oss/source/reader/OssSourceReader.java | 44 ++--------
.../split/OssSourceSplitCoordinator.java | 7 +-
.../bitsail/connector/oss/util/OssUtil.java | 18 ++--
.../bitsail-connector-unified-oss.json | 9 ++
.../main/resources/oss-type-converter.yaml | 88 +++++++++++++++++++
9 files changed, 148 insertions(+), 142 deletions(-)
delete mode 100644 bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java
create mode 100644 bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json
create mode 100644 bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml
diff --git a/bitsail-connectors/connector-oss/pom.xml b/bitsail-connectors/connector-oss/pom.xml
index 1c9ef16d5..211a54169 100644
--- a/bitsail-connectors/connector-oss/pom.xml
+++ b/bitsail-connectors/connector-oss/pom.xml
@@ -27,46 +27,23 @@
connector-oss
- 2.9.2
+ 3.1.1
8
8
-
- hadoop-common
-
-
- log4j
- log4j
-
-
- slf4j-log4j12
- org.slf4j
-
-
- org.apache.hadoop
- 2.9.2
-
org.apache.hadoop
hadoop-aliyun
${hadoop-aliyun.version}
- commons-configuration
- commons-configuration
-
-
- slf4j-log4j12
org.slf4j
+ slf4j-log4j12
- log4j
log4j
-
-
- hadoop-common
- org.apache.hadoop
+ log4j
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java
deleted file mode 100644
index 33b948295..000000000
--- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/HadoopConf.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.bytedance.bitsail.connector.oss.config;
-
-import lombok.Data;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-@Data
-public class HadoopConf implements Serializable {
- private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
- private static final String SCHEMA = "hdfs";
- protected Map extraOptions = new HashMap<>();
- protected String hdfsNameKey;
- protected String hdfsSitePath;
- protected String kerberosPrincipal;
- protected String kerberosKeytabPath;
-
- public HadoopConf(String hdfsNameKey) {
- this.hdfsNameKey = hdfsNameKey;
- }
-
- public String getHdfsImpl() {
- return HDFS_IMPL;
- }
-
- public String getSchema() {
- return SCHEMA;
- }
-
- public void setExtraOptionsForConfiguration(Configuration configuration) {
- if (!extraOptions.isEmpty()) {
- extraOptions.forEach(configuration::set);
- }
- if (hdfsSitePath != null) {
- configuration.addResource(new Path(hdfsSitePath));
- }
- }
-}
-
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java
index ee619c272..38357258d 100644
--- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java
@@ -19,36 +19,54 @@
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.oss.option.OssReaderOptions;
+import lombok.Data;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.Constants;
+import java.io.Serializable;
import java.util.HashMap;
+import java.util.Map;
-public class OssConf extends HadoopConf {
+@Data
+public class OssConf implements Serializable {
private static final String HDFS_IMPL = "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
private static final String SCHEMA = "oss";
+ protected Map extraOptions = new HashMap<>();
+ protected String hdfsNameKey;
+ protected String hdfsSitePath;
+ protected String kerberosPrincipal;
+ protected String kerberosKeytabPath;
- @Override
public String getHdfsImpl() {
return HDFS_IMPL;
}
- @Override
public String getSchema() {
return SCHEMA;
}
public OssConf(String hdfsNameKey) {
- super(hdfsNameKey);
+ this.hdfsNameKey = hdfsNameKey;
}
- public static HadoopConf buildWithConfig(BitSailConfiguration config) {
- HadoopConf hadoopConf = new OssConf(config.getString(OssReaderOptions.BUCKET.key()));
+ public static OssConf buildWithConfig(BitSailConfiguration config) {
+ OssConf hadoopConf = new OssConf(config.get(OssReaderOptions.BUCKET));
HashMap ossOptions = new HashMap<>();
- ossOptions.put(Constants.ACCESS_KEY_ID, config.getString(OssReaderOptions.ACCESS_KEY.key()));
+ ossOptions.put(Constants.ACCESS_KEY_ID, config.get(OssReaderOptions.ACCESS_KEY));
ossOptions.put(
- Constants.ACCESS_KEY_SECRET, config.getString(OssReaderOptions.ACCESS_SECRET.key()));
- ossOptions.put(Constants.ENDPOINT_KEY, config.getString(OssReaderOptions.ENDPOINT.key()));
+ Constants.ACCESS_KEY_SECRET, config.get(OssReaderOptions.ACCESS_SECRET));
+ ossOptions.put(Constants.ENDPOINT_KEY, config.get(OssReaderOptions.ENDPOINT));
hadoopConf.setExtraOptions(ossOptions);
return hadoopConf;
}
+
+ public void setExtraOptionsForConfiguration(Configuration configuration) {
+ if (!extraOptions.isEmpty()) {
+ extraOptions.forEach(configuration::set);
+ }
+ if (hdfsSitePath != null) {
+ configuration.addResource(new Path(hdfsSitePath));
+ }
+ }
}
\ No newline at end of file
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java
index 34f941693..5bc2e6e10 100644
--- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java
@@ -19,4 +19,5 @@
public class OssConstants {
public static String OSS_CONNECTOR_NAME = "oss";
public static final long OSS_SOURCE_SLEEP_MILL_SECS = 1000L;
+ public static final String OSS_SOURCE_IGNORE_FILENAME = "_SUCCESS";
}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java
index 129cbb176..a380a3eac 100644
--- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java
@@ -22,7 +22,6 @@
import com.bytedance.bitsail.common.BitSailException;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.row.Row;
-import com.bytedance.bitsail.connector.oss.config.HadoopConf;
import com.bytedance.bitsail.connector.oss.config.OssConf;
import com.bytedance.bitsail.connector.oss.config.OssConfig;
import com.bytedance.bitsail.connector.oss.constant.OssConstants;
@@ -31,7 +30,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -41,7 +39,6 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
@@ -49,11 +46,10 @@
public class OssSourceReader implements SourceReader {
private static final Logger LOG = LoggerFactory.getLogger(OssSourceReader.class);
- protected HadoopConf hadoopConf;
+ protected OssConf ossConf;
private final transient DeserializationSchema deserializationSchema;
private final OssConfig ossConfig;
private final transient Context context;
- private long currentReadCount = 0;
private final Deque splits;
private boolean skipFirstLine = false;
private boolean hasNoMoreSplits = false;
@@ -67,14 +63,14 @@ public OssSourceReader(BitSailConfiguration jobConf, Context context) {
this.context = context;
this.deserializationSchema = DeserializationSchemaFactory.createDeserializationSchema(jobConf, context, ossConfig);
this.splits = new LinkedList<>();
- this.hadoopConf = OssConf.buildWithConfig(jobConf);
+ this.ossConf = OssConf.buildWithConfig(jobConf);
LOG.info("OssSourceReader is initialized.");
}
@Override
public void start() {
if (this.ossConfig.getSkipFirstLine()) {
- this.skipFirstLine = true;
+ skipFirstLine = true;
this.skipFirstLineNums = 1;
}
}
@@ -103,7 +99,6 @@ public void pollNext(SourcePipeline pipeline) throws Exception {
if (line != null) {
Row row = deserializationSchema.deserialize(line.getBytes());
pipeline.output(row);
- this.currentReadCount++;
}
} catch (IOException e) {
String errorMsg =
@@ -117,39 +112,16 @@ public void pollNext(SourcePipeline pipeline) throws Exception {
}
}
- public List getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException {
- LOG.info("start getFileNamesByPath path: {}", path);
- Configuration configuration = getConfiguration(hadoopConf);
- FileSystem hdfs = FileSystem.get(configuration);
- ArrayList fileNames = new ArrayList<>();
- Path listFiles = new Path(path);
- FileStatus[] stats = hdfs.listStatus(listFiles);
- for (FileStatus fileStatus : stats) {
- if (fileStatus.isDirectory()) {
- LOG.info("getFileNamesByPath dir: {}", fileStatus.getPath());
- fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString()));
- continue;
- }
- if (fileStatus.isFile()) {
- if (!fileStatus.getPath().getName().equals("_SUCCESS")) {
- String filePath = fileStatus.getPath().toString();
- fileNames.add(filePath);
- }
- }
- }
- return fileNames;
- }
-
Configuration getConfiguration() {
- return getConfiguration(this.hadoopConf);
+ return getConfiguration(this.ossConf);
}
- public Configuration getConfiguration(HadoopConf hadoopConf) {
+ public Configuration getConfiguration(OssConf ossConf) {
Configuration configuration = new Configuration();
- configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
+ configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, ossConf.getHdfsNameKey());
configuration.set(
- String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getHdfsImpl());
- hadoopConf.setExtraOptionsForConfiguration(configuration);
+ String.format("fs.%s.impl", ossConf.getSchema()), ossConf.getHdfsImpl());
+ ossConf.setExtraOptionsForConfiguration(configuration);
return configuration;
}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java
index 9371681f9..b5c8a7690 100644
--- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java
@@ -21,7 +21,6 @@
import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState;
import com.bytedance.bitsail.common.BitSailException;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
-import com.bytedance.bitsail.connector.oss.config.HadoopConf;
import com.bytedance.bitsail.connector.oss.config.OssConf;
import com.bytedance.bitsail.connector.oss.config.OssConfig;
import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode;
@@ -60,7 +59,7 @@ public OssSourceSplitCoordinator(SourceSplitCoordinator.Context constructSplit() throws IOException {
- HadoopConf conf = OssConf.buildWithConfig(this.jobConf);
+ OssConf conf = OssConf.buildWithConfig(this.jobConf);
String path = ossConfig.getFilePath();
List fileList = OssUtil.getFileNamesByPath(conf, path);
LOG.info("OssSourceSplitCoordinator fileList: {}", fileList);
@@ -71,7 +70,7 @@ private List constructSplit() throws IOException {
@Override
public void start() {
- List splits = new ArrayList<>();
+ List splits;
try {
splits = constructSplit();
} catch (IOException e) {
@@ -80,7 +79,7 @@ public void start() {
int readerNum = context.totalParallelism();
LOG.info("Found {} readers and {} splits.", readerNum, splits.size());
if (readerNum > splits.size()) {
- LOG.error("Reader number {} is larger than split number {}.", readerNum, splits.size());
+ LOG.warn("Reader number {} is larger than split number {}.", readerNum, splits.size());
}
for (OssSourceSplit split : splits) {
int readerIndex = ReaderSelector.getReaderIndex(readerNum);
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java
index 1b96b4a5c..d8c78b742 100644
--- a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java
@@ -16,7 +16,8 @@
package com.bytedance.bitsail.connector.oss.util;
-import com.bytedance.bitsail.connector.oss.config.HadoopConf;
+import com.bytedance.bitsail.connector.oss.config.OssConf;
+import com.bytedance.bitsail.connector.oss.constant.OssConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -29,19 +30,18 @@
import java.util.List;
public class OssUtil {
- static FileSystem fs;
- public static Configuration getConfiguration(HadoopConf hadoopConf) {
+ public static Configuration getConfiguration(OssConf ossConf) {
Configuration configuration = new Configuration();
- configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
+ configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, ossConf.getHdfsNameKey());
configuration.set(
- String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getHdfsImpl());
- hadoopConf.setExtraOptionsForConfiguration(configuration);
+ String.format("fs.%s.impl", ossConf.getSchema()), ossConf.getHdfsImpl());
+ ossConf.setExtraOptionsForConfiguration(configuration);
return configuration;
}
- public static List getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException {
+ public static List getFileNamesByPath(OssConf hadoopConf, String path) throws IOException {
Configuration configuration = getConfiguration(hadoopConf);
- fs = FileSystem.get(configuration);
+ FileSystem fs = FileSystem.get(configuration);
ArrayList fileNames = new ArrayList<>();
Path listFiles = new Path(path);
FileStatus[] stats = fs.listStatus(listFiles);
@@ -51,7 +51,7 @@ public static List getFileNamesByPath(HadoopConf hadoopConf, String path
continue;
}
if (fileStatus.isFile()) {
- if (!fileStatus.getPath().getName().equals("_SUCCESS")) {
+ if (!fileStatus.getPath().getName().equals(OssConstants.OSS_SOURCE_IGNORE_FILENAME)) {
String filePath = fileStatus.getPath().toString();
fileNames.add(filePath);
}
diff --git a/bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json b/bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json
new file mode 100644
index 000000000..a8be6a9df
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json
@@ -0,0 +1,9 @@
+{
+ "name": "bitsail-connector-unified-oss",
+ "classes": [
+ "com.bytedance.bitsail.connector.oss.source.OssSource"
+ ],
+ "libs": [
+ "connector-oss-${version}.jar"
+ ]
+}
diff --git a/bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml b/bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml
new file mode 100644
index 000000000..503ee65b4
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml
@@ -0,0 +1,88 @@
+engine.type.to.bitsail.type.converter:
+ - source.type: tinyint
+ target.type: int
+
+ - source.type: smallint
+ target.type: int
+
+ - source.type: int
+ target.type: int
+
+ - source.type: long
+ target.type: bigint
+
+ - source.type: bigint
+ target.type: bigint
+
+ - source.type: float
+ target.type: float
+
+ - source.type: double
+ target.type: double
+
+ - source.type: decimal
+ target.type: bigdecimal
+
+ - source.type: timestamp
+ target.type: timestamp
+
+ - source.type: date
+ target.type: date
+
+ - source.type: string
+ target.type: string
+
+ - source.type: varchar
+ target.type: string
+
+ - source.type: char
+ target.type: string
+
+ - source.type: boolean
+ target.type: boolean
+
+ - source.type: binary
+ target.type: bytes
+
+bitsail.type.to.engine.type.converter:
+ - source.type: byte
+ target.type: tinyint
+
+ - source.type: short
+ target.type: smallint
+
+ - source.type: int
+ target.type: int
+
+ - source.type: long
+ target.type: bigint
+
+ - source.type: bigint
+ target.type: bigint
+
+ - source.type: double
+ target.type: double
+
+ - source.type: float
+ target.type: float
+
+ - source.type: bigdecimal
+ target.type: decimal
+
+ - source.type: string
+ target.type: string
+
+ - source.type: boolean
+ target.type: boolean
+
+ - source.type: date.date
+ target.type: string
+
+ - source.type: date.time
+ target.type: string
+
+ - source.type: date.datetime
+ target.type: bigint
+
+ - source.type: bytes
+ target.type: binary
\ No newline at end of file