[Feature][Spark] Support SeaTunnel Time Type. #5188

Merged: 1 commit, Oct 18, 2023

1 change: 1 addition & 0 deletions release-note.md
@@ -151,6 +151,7 @@
- [Core] [API] Add copy method to Catalog codes (#4414)
- [Core] [API] Add options check before create source and sink and transform in FactoryUtil (#4424)
- [Core] [Shade] Add guava shade module (#4358)
- [Core] [Spark] Support SeaTunnel Time Type (#5188)
- [Core] [Flink] Support Decimal Type with configurable precision and scale (#5419)

### Connector-V2
(next changed file)
@@ -29,6 +29,7 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -37,7 +38,6 @@
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -94,7 +94,10 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
DatasetTableInfo datasetTableInfo =
fromSourceTable(sinkConfig, sparkRuntimeEnvironment, upstreamDataStreams)
.orElse(input);
SeaTunnelDataType<?> inputType =
datasetTableInfo.getCatalogTable().getSeaTunnelRowType();
Dataset<Row> dataset = datasetTableInfo.getDataset();

int parallelism;
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
@@ -120,7 +123,7 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
sink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
sink.setTypeInfo((SeaTunnelRowType) inputType);
} else {
TableSinkFactoryContext context =
new TableSinkFactoryContext(
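The hunk above is the substance of this PR on the sink path: the SeaTunnelRowType handed to the sink is now taken from the CatalogTable carried in DatasetTableInfo instead of being rebuilt from the Spark Dataset schema. Spark SQL has no TIME (time-of-day) type, which is presumably why the declared row type is now carried alongside the Dataset rather than recovered from its schema. A minimal sketch of the contrast, using only calls visible in this diff (the import for DatasetTableInfo is omitted because its package is not shown in this view):

```java
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Illustrative sketch, not code from this PR.
public class SinkRowTypeSketch {

    // Before: the row type was rebuilt from the Spark schema. Spark has no TIME
    // type, so whatever substitute type the converter used for a TIME column on
    // the way into Spark cannot be mapped back to TIME here.
    static SeaTunnelRowType beforeThisPr(Dataset<Row> dataset) {
        return (SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema());
    }

    // After: the declared type travels with the Dataset in DatasetTableInfo, so
    // the sink sees the original TIME column (and any other lossy mapping) intact.
    static SeaTunnelRowType afterThisPr(DatasetTableInfo tableInfo) {
        SeaTunnelDataType<?> inputType = tableInfo.getCatalogTable().getSeaTunnelRowType();
        return (SeaTunnelRowType) inputType;
    }
}
```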
(next changed file)
@@ -29,6 +29,7 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -37,7 +38,6 @@
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -95,7 +95,10 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
DatasetTableInfo datasetTableInfo =
fromSourceTable(sinkConfig, sparkRuntimeEnvironment, upstreamDataStreams)
.orElse(input);
SeaTunnelDataType<?> inputType =
datasetTableInfo.getCatalogTable().getSeaTunnelRowType();
Dataset<Row> dataset = datasetTableInfo.getDataset();

int parallelism;
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
@@ -121,7 +124,7 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
sink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
sink.setTypeInfo((SeaTunnelRowType) inputType);
} else {
TableSinkFactoryContext context =
new TableSinkFactoryContext(
(next changed file)
@@ -76,6 +76,7 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
CommonOptions.PARALLELISM.key(),
CommonOptions.PARALLELISM.defaultValue());
}
StructType schema = (StructType) TypeConverterUtils.convert(source.getProducedType());
Dataset<Row> dataset =
sparkRuntimeEnvironment
.getSparkSession()
@@ -85,9 +86,7 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
.option(
Constants.SOURCE_SERIALIZATION,
SerializationUtils.objectToString(source))
.schema(
(StructType)
TypeConverterUtils.convert(source.getProducedType()))
.schema(schema)
.load();
sources.add(
new DatasetTableInfo(
(next changed file)
@@ -50,6 +50,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
@@ -107,7 +108,7 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
SeaTunnelTransform transform = factory.createTransform(context).createTransform();

Dataset<Row> inputDataset = sparkTransform(transform, dataset.getDataset());
Dataset<Row> inputDataset = sparkTransform(transform, dataset);
registerInputTempView(pluginConfig, inputDataset);
upstreamDataStreams.add(
new DatasetTableInfo(
@@ -127,33 +128,33 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
return upstreamDataStreams;
}

private Dataset<Row> sparkTransform(SeaTunnelTransform transform, Dataset<Row> stream)
private Dataset<Row> sparkTransform(SeaTunnelTransform transform, DatasetTableInfo tableInfo)
throws IOException {
SeaTunnelDataType<?> seaTunnelDataType = TypeConverterUtils.convert(stream.schema());
Dataset<Row> stream = tableInfo.getDataset();
SeaTunnelDataType<?> seaTunnelDataType = tableInfo.getCatalogTable().getSeaTunnelRowType();
transform.setTypeInfo(seaTunnelDataType);
StructType structType =
StructType outputSchema =
(StructType) TypeConverterUtils.convert(transform.getProducedType());
SeaTunnelRowConverter inputRowConverter = new SeaTunnelRowConverter(seaTunnelDataType);
SeaTunnelRowConverter outputRowConverter =
new SeaTunnelRowConverter(transform.getProducedType());
ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
return stream.mapPartitions(
(MapPartitionsFunction<Row, Row>)
(Iterator<Row> rowIterator) -> {
TransformIterator iterator =
new TransformIterator(
rowIterator,
transform,
structType,
inputRowConverter,
outputRowConverter);
return iterator;
},
encoder)
.filter(
(Row row) -> {
return row != null;
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);
Dataset<Row> result =
stream.mapPartitions(
(MapPartitionsFunction<Row, Row>)
(Iterator<Row> rowIterator) -> {
TransformIterator iterator =
new TransformIterator(
rowIterator,
transform,
outputSchema,
inputRowConverter,
outputRowConverter);
return iterator;
},
encoder)
.filter(Objects::nonNull);
return result;
}

private static class TransformIterator implements Iterator<Row>, Serializable {
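In the transform processor, the input row type now also comes from the catalog table, the output encoder is built from the schema the transform produces, and null rows are dropped afterwards. For readers unfamiliar with the mapPartitions-plus-RowEncoder pattern this code relies on, here is a standalone, Spark-only sketch; the column names and the trivial "transform" are made up, and none of this is SeaTunnel code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Standalone illustration of the mapPartitions + RowEncoder pattern; the
// "transform" just upper-cases a name and bumps an age.
public class MapPartitionsSketch {
    public static void main(String[] args) {
        SparkSession spark =
                SparkSession.builder().master("local[1]").appName("sketch").getOrCreate();

        StructType inputSchema =
                new StructType()
                        .add("name", DataTypes.StringType)
                        .add("age", DataTypes.IntegerType);
        Dataset<Row> input =
                spark.createDataFrame(
                        Arrays.asList(RowFactory.create("alice", 30), RowFactory.create("bob", 40)),
                        inputSchema);

        // The encoder must describe the rows the partition function emits; in the
        // real processor it is built from transform.getProducedType().
        StructType outputSchema = inputSchema;
        ExpressionEncoder<Row> encoder = RowEncoder.apply(outputSchema);

        Dataset<Row> result =
                input.mapPartitions(
                                (MapPartitionsFunction<Row, Row>)
                                        (Iterator<Row> it) -> {
                                            List<Row> out = new ArrayList<>();
                                            while (it.hasNext()) {
                                                Row row = it.next();
                                                out.add(
                                                        RowFactory.create(
                                                                row.getString(0).toUpperCase(),
                                                                row.getInt(1) + 1));
                                            }
                                            return out.iterator();
                                        },
                                encoder)
                        // Defensive null filter, mirroring filter(Objects::nonNull) above.
                        .filter((FilterFunction<Row>) Objects::nonNull);

        result.show();
        spark.stop();
    }
}
```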
(next changed file)
@@ -35,6 +35,7 @@ source {
fields {
name = "string"
age = "int"
c_time = "time"
}
}
}
@@ -46,7 +47,7 @@
Filter {
source_table_name = "fake"
result_table_name = "fake1"
fields = ["name", "age"]
fields = ["name", "age", "c_time"]
}
}

@@ -97,10 +98,17 @@ sink {
rule_value = 2147483647
}
]
}, {
field_name = c_time
field_type = time
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}

]
}
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
(next changed file)
@@ -60,9 +60,11 @@
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -127,6 +129,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
+ " `c_longtext` longtext,\n"
+ " `c_date` date DEFAULT NULL,\n"
+ " `c_datetime` datetime DEFAULT NULL,\n"
+ " `c_time` time DEFAULT NULL,\n"
+ " `c_timestamp` timestamp NULL DEFAULT NULL,\n"
+ " `c_tinyblob` tinyblob,\n"
+ " `c_mediumblob` mediumblob,\n"
@@ -221,6 +224,7 @@ Pair<String[], List<SeaTunnelRow>> initTestData() {
"c_longtext",
"c_date",
"c_datetime",
"c_time",
"c_timestamp",
"c_tinyblob",
"c_mediumblob",
@@ -278,6 +282,7 @@ Pair<String[], List<SeaTunnelRow>> initTestData() {
String.format("f1_%s", i),
Date.valueOf(LocalDate.now()),
Timestamp.valueOf(LocalDateTime.now()),
Time.valueOf(LocalTime.now()),
new Timestamp(System.currentTimeMillis()),
"test".getBytes(),
"test".getBytes(),
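One JDK detail worth remembering when building TIME test rows like the Time.valueOf(LocalTime.now()) call added above: java.sql.Time only carries hour, minute, and second, so any nanoseconds in the LocalTime are dropped. A small self-contained illustration:

```java
import java.sql.Time;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

public class TimeConversionSketch {
    public static void main(String[] args) {
        LocalTime now = LocalTime.now();            // e.g. 13:47:05.123456789
        Time sqlTime = Time.valueOf(now);           // nanoseconds are lost: 13:47:05
        LocalTime roundTripped = sqlTime.toLocalTime();

        System.out.println("original   = " + now);
        System.out.println("sql time   = " + sqlTime);
        System.out.println("round trip = " + roundTripped);

        // The round trip matches the original only at second precision.
        assert roundTripped.equals(now.truncatedTo(ChronoUnit.SECONDS));
    }
}
```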
(next changed file)
@@ -50,12 +50,12 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
properties {
useSSL=false
rewriteBatchedStatements=true
}
}
}
}
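The rewritten insert statement above adds the c_time column and one more ? placeholder. For readers wiring up a similar JDBC sink test by hand, a generic JDBC sketch of binding such a row follows; the URL, credentials, table, and column list are placeholders rather than values from this PR, and it assumes the TIME column is bound as java.sql.Time:

```java
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Generic JDBC sketch, not SeaTunnel code: binding date, datetime, and a time
// column of one row. Connection details and table/column names are made up.
public class TimeInsertSketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://localhost:3306/test?useSSL=false&rewriteBatchedStatements=true";
        String sql = "insert into sink_table (c_date, c_datetime, c_time) values (?, ?, ?)";
        try (Connection conn = DriverManager.getConnection(url, "user", "password");
                PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setDate(1, Date.valueOf(LocalDate.now()));
            ps.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
            ps.setTime(3, Time.valueOf(LocalTime.now())); // the added c_time parameter
            ps.executeUpdate();
        }
    }
}
```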
(next changed file)
@@ -49,8 +49,8 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
}
}
}
(next changed file)
@@ -50,8 +50,8 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
}
}
}
(next changed file)
@@ -50,9 +50,9 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""

# Non-root users must be granted the XA_RECOVER_ADMIN privilege when is_exactly_once = "true"
is_exactly_once = "true"
@@ -65,4 +65,4 @@
rewriteBatchedStatements=true
}
}
}
}
(next changed file)
@@ -31,6 +31,7 @@ source {
id = "int"
name = "string"
age = "int"
c_time = "time"
c_timestamp = "timestamp"
c_date = "date"
c_map = "map<string, string>"
@@ -51,7 +52,7 @@
source_table_name = "fake"
result_table_name = "fake1"
# the query table name must be the same as the 'source_table_name' field
query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake"
query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_time, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake"
}
# The SQL transform supports basic functions and criteria operations,
# but complex SQL is not supported yet, including multi-source table/row JOINs, AGGREGATE operations, and the like
@@ -116,6 +117,15 @@ sink {
}
]
},
{
field_name = c_time
field_type = time
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = c_timestamp
field_type = timestamp