Skip to content

Commit

Permalink
[Feature][Spark] Support SeaTunnel Time Type.
Browse files Browse the repository at this point in the history
  • Loading branch information
CheneyYin committed Aug 18, 2023
1 parent 51c0d70 commit b51384f
Show file tree
Hide file tree
Showing 17 changed files with 163 additions and 64 deletions.
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
- [Core] [API] Add copy method to Catalog codes (#4414)
- [Core] [API] Add options check before create source and sink and transform in FactoryUtil (#4424)
- [Core] [Shade] Add guava shade module (#4358)
- [Core] [Spark] Support SeaTunnel Time Type (#5188)

### Connector-V2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

import com.google.common.collect.Lists;

Expand Down Expand Up @@ -92,6 +93,12 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
Dataset<Row> dataset =
fromSourceTable(sinkConfig, sparkRuntimeEnvironment).orElse(input);
StructType inputSchema =
sourceSchema(sinkConfig, sparkRuntimeEnvironment)
.orElseGet(
() -> {
return dataset.schema();
});
int parallelism;
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
Expand All @@ -105,8 +112,7 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
}
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism);
// TODO modify checkpoint location
seaTunnelSink.setTypeInfo(
(SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(inputSchema));
if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.StructType;

import com.google.common.collect.Lists;

Expand Down Expand Up @@ -93,6 +94,12 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
Dataset<Row> dataset =
fromSourceTable(sinkConfig, sparkRuntimeEnvironment).orElse(input);
StructType inputSchema =
sourceSchema(sinkConfig, sparkRuntimeEnvironment)
.orElseGet(
() -> {
return dataset.schema();
});
int parallelism;
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
Expand All @@ -106,8 +113,7 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
}
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism);
// TODO modify checkpoint location
seaTunnelSink.setTypeInfo(
(SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(inputSchema));
if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
CommonOptions.PARALLELISM.key(),
CommonOptions.PARALLELISM.defaultValue());
}
StructType schema = (StructType) TypeConverterUtils.convert(source.getProducedType());
Dataset<Row> dataset =
sparkRuntimeEnvironment
.getSparkSession()
Expand All @@ -77,12 +78,13 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
.option(
Constants.SOURCE_SERIALIZATION,
SerializationUtils.objectToString(source))
.schema(
(StructType)
TypeConverterUtils.convert(source.getProducedType()))
.schema(schema)
.load();
sources.add(dataset);
registerInputTempView(pluginConfigs.get(i), dataset);

Config config = pluginConfigs.get(i);
stageSchema(config, schema);
registerInputTempView(config, dataset);
}
return sources;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -72,6 +73,26 @@ protected Optional<Dataset<Row>> fromSourceTable(
return Optional.of(sparkRuntimeEnvironment.getSparkSession().read().table(sourceTableName));
}

protected Optional<StructType> sourceSchema(
Config pluginConfig, SparkRuntimeEnvironment sparkRuntimeEnvironment) {
if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
return Optional.empty();
}
String sourceTableName = pluginConfig.getString(SOURCE_TABLE_NAME);
return this.sparkRuntimeEnvironment.schema(sourceTableName);
}

protected void stageSchema(Config pluginConfig, StructType schema) {
if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
String tableName = pluginConfig.getString(RESULT_TABLE_NAME);
stageSchema(tableName, schema);
}
}

protected void stageSchema(String tableName, StructType schema) {
this.sparkRuntimeEnvironment.stageSchema(tableName, schema);
}

private void registerTempView(String tableName, Dataset<Row> ds) {
ds.createOrReplaceTempView(tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;

import lombok.extern.slf4j.Slf4j;

import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@Slf4j
public class SparkRuntimeEnvironment implements RuntimeEnvironment {
Expand All @@ -55,9 +59,12 @@ public class SparkRuntimeEnvironment implements RuntimeEnvironment {

private String jobName = Constants.LOGO;

private Map<String, StructType> schemas;

private SparkRuntimeEnvironment(Config config) {
this.setEnableHive(checkIsContainHive(config));
this.initialize(config);
this.schemas = new HashMap<>();
}

public void setEnableHive(boolean enableHive) {
Expand Down Expand Up @@ -176,4 +183,13 @@ public static SparkRuntimeEnvironment getInstance(Config config) {
}
return INSTANCE;
}

public Optional<StructType> schema(String tblName) {
StructType schema = this.schemas.get(tblName);
return schema == null ? Optional.empty() : Optional.of(schema);
}

public void stageSchema(String tblName, StructType schema) {
this.schemas.put(tblName, schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
Config pluginConfig = pluginConfigs.get(i);
Dataset<Row> stream =
fromSourceTable(pluginConfig, sparkRuntimeEnvironment).orElse(input);
input = sparkTransform(transform, stream);
input = sparkTransform(pluginConfig, transform, stream);
registerInputTempView(pluginConfig, input);
result.add(input);
} catch (Exception e) {
Expand All @@ -119,33 +119,45 @@ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
return result;
}

private Dataset<Row> sparkTransform(SeaTunnelTransform transform, Dataset<Row> stream)
private Dataset<Row> sparkTransform(
Config pluginConfig, SeaTunnelTransform transform, Dataset<Row> stream)
throws IOException {
SeaTunnelDataType<?> seaTunnelDataType = TypeConverterUtils.convert(stream.schema());
StructType inputSchema =
sourceSchema(pluginConfig, sparkRuntimeEnvironment)
.orElseGet(
() -> {
return stream.schema();
});

SeaTunnelDataType<?> seaTunnelDataType = TypeConverterUtils.convert(inputSchema);
transform.setTypeInfo(seaTunnelDataType);
StructType structType =
StructType outputSchema =
(StructType) TypeConverterUtils.convert(transform.getProducedType());
SeaTunnelRowConverter inputRowConverter = new SeaTunnelRowConverter(seaTunnelDataType);
SeaTunnelRowConverter outputRowConverter =
new SeaTunnelRowConverter(transform.getProducedType());
ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
return stream.mapPartitions(
(MapPartitionsFunction<Row, Row>)
(Iterator<Row> rowIterator) -> {
TransformIterator iterator =
new TransformIterator(
rowIterator,
transform,
structType,
inputRowConverter,
outputRowConverter);
return iterator;
},
encoder)
.filter(
(Row row) -> {
return row != null;
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);
Dataset<Row> result =
stream.mapPartitions(
(MapPartitionsFunction<Row, Row>)
(Iterator<Row> rowIterator) -> {
TransformIterator iterator =
new TransformIterator(
rowIterator,
transform,
outputSchema,
inputRowConverter,
outputRowConverter);
return iterator;
},
encoder)
.filter(
(Row row) -> {
return row != null;
});

stageSchema(pluginConfig, outputSchema);
return result;
}

private static class TransformIterator implements Iterator<Row>, Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ source {
fields {
name = "string"
age = "int"
c_time = "time"
}
}
}
Expand All @@ -46,7 +47,7 @@ transform {
Filter {
source_table_name = "fake"
result_table_name = "fake1"
fields = ["name", "age"]
fields = ["name", "age", "c_time"]
}
}

Expand Down Expand Up @@ -97,10 +98,17 @@ sink {
rule_value = 2147483647
}
]
}, {
field_name = c_time
field_type = time
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}

]
}
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -95,6 +97,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
+ " `c_longtext` longtext,\n"
+ " `c_date` date DEFAULT NULL,\n"
+ " `c_datetime` datetime DEFAULT NULL,\n"
+ " `c_time` time DEFAULT NULL,\n"
+ " `c_timestamp` timestamp NULL DEFAULT NULL,\n"
+ " `c_tinyblob` tinyblob,\n"
+ " `c_mediumblob` mediumblob,\n"
Expand Down Expand Up @@ -187,6 +190,7 @@ Pair<String[], List<SeaTunnelRow>> initTestData() {
"c_longtext",
"c_date",
"c_datetime",
"c_time",
"c_timestamp",
"c_tinyblob",
"c_mediumblob",
Expand Down Expand Up @@ -244,6 +248,7 @@ Pair<String[], List<SeaTunnelRow>> initTestData() {
String.format("f1_%s", i),
Date.valueOf(LocalDate.now()),
Timestamp.valueOf(LocalDateTime.now()),
Time.valueOf(LocalTime.now()),
new Timestamp(System.currentTimeMillis()),
"test".getBytes(),
"test".getBytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""

# Non-root users need to grant XA_RECOVER_ADMIN permission on is_exactly_once = "true"
is_exactly_once = "true"
Expand All @@ -57,4 +57,4 @@ sink {
max_commit_attempts = 3
transaction_timeout_sec = 86400
}
}
}
Loading

0 comments on commit b51384f

Please sign in to comment.