Skip to content

Commit

Permalink
[Feature][Spark] Support SeaTunnel Time Type.
Browse files Browse the repository at this point in the history
  • Loading branch information
CheneyYin committed Jul 31, 2023
1 parent 1db9f45 commit d2c362d
Show file tree
Hide file tree
Showing 20 changed files with 889 additions and 51 deletions.
2 changes: 2 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,5 @@ seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/z
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFilter.java from https://github.com/JSQLParser/JSqlParser
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/spark/sql/seatunnel/util/CanANSIStoreAssign.java from https://github.com/apache/spark
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/ReconcileSchema.java from https://github.com/apache/spark
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
- [Core] [API] Add copy method to Catalog codes (#4414)
- [Core] [API] Add options check before create source and sink and transform in FactoryUtil (#4424)
- [Core] [Shade] Add guava shade module (#4358)
- [Core] [Spark] Support SeaTunnel Time Type (#5188)

### Connector-V2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
import org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter;
import org.apache.seatunnel.translation.spark.utils.DatasetUtils;
import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;

import org.apache.spark.api.java.function.MapPartitionsFunction;
Expand Down Expand Up @@ -129,23 +130,27 @@ private Dataset<Row> sparkTransform(SeaTunnelTransform transform, Dataset<Row> s
SeaTunnelRowConverter outputRowConverter =
new SeaTunnelRowConverter(transform.getProducedType());
ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
return stream.mapPartitions(
(MapPartitionsFunction<Row, Row>)
(Iterator<Row> rowIterator) -> {
TransformIterator iterator =
new TransformIterator(
rowIterator,
transform,
structType,
inputRowConverter,
outputRowConverter);
return iterator;
},
encoder)
.filter(
(Row row) -> {
return row != null;
});
Dataset<Row> result =
stream.mapPartitions(
(MapPartitionsFunction<Row, Row>)
(Iterator<Row> rowIterator) -> {
TransformIterator iterator =
new TransformIterator(
rowIterator,
transform,
structType,
inputRowConverter,
outputRowConverter);
return iterator;
},
encoder)
.filter(
(Row row) -> {
return row != null;
});

result = DatasetUtils.to(result, structType);
return result;
}

private static class TransformIterator implements Iterator<Row>, Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ source {
fields {
name = "string"
age = "int"
c_time = "time"
}
}
}
Expand All @@ -46,7 +47,7 @@ transform {
Filter {
source_table_name = "fake"
result_table_name = "fake1"
fields = ["name", "age"]
fields = ["name", "age", "c_time"]
}
}

Expand Down Expand Up @@ -97,10 +98,17 @@ sink {
rule_value = 2147483647
}
]
}, {
field_name = c_time
field_type = time
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}

]
}
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -95,6 +97,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
+ " `c_longtext` longtext,\n"
+ " `c_date` date DEFAULT NULL,\n"
+ " `c_datetime` datetime DEFAULT NULL,\n"
+ " `c_time` time DEFAULT NULL,\n"
+ " `c_timestamp` timestamp NULL DEFAULT NULL,\n"
+ " `c_tinyblob` tinyblob,\n"
+ " `c_mediumblob` mediumblob,\n"
Expand Down Expand Up @@ -187,6 +190,7 @@ Pair<String[], List<SeaTunnelRow>> initTestData() {
"c_longtext",
"c_date",
"c_datetime",
"c_time",
"c_timestamp",
"c_tinyblob",
"c_mediumblob",
Expand Down Expand Up @@ -244,6 +248,7 @@ Pair<String[], List<SeaTunnelRow>> initTestData() {
String.format("f1_%s", i),
Date.valueOf(LocalDate.now()),
Timestamp.valueOf(LocalDateTime.now()),
Time.valueOf(LocalTime.now()),
new Timestamp(System.currentTimeMillis()),
"test".getBytes(),
"test".getBytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ sink {
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary,
c_binary, c_year, c_int_unsigned, c_integer_unsigned)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""

# Non-root users need to grant XA_RECOVER_ADMIN permission on is_exactly_once = "true"
is_exactly_once = "true"
Expand All @@ -57,4 +57,4 @@ sink {
max_commit_attempts = 3
transaction_timeout_sec = 86400
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ source {
id = "int"
name = "string"
age = "int"
c_time = "time"
c_timestamp = "timestamp"
c_date = "date"
c_map = "map<string, string>"
Expand All @@ -51,7 +52,7 @@ transform {
source_table_name = "fake"
result_table_name = "fake1"
# the query table name must same as field 'source_table_name'
query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake"
query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_time, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake"
}
# The SQL transform support base function and criteria operation
# But the complex SQL unsupported yet, include: multi source table/rows JOIN and AGGREGATE operation and the like
Expand Down Expand Up @@ -116,6 +117,15 @@ sink {
}
]
},
{
field_name = c_time
field_type = time
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = c_timestamp
field_type = timestamp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.seatunnel.util;

import org.apache.spark.sql.catalyst.expressions.Alias;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.types.Metadata;

import scala.Option;
import scala.collection.Seq;
import scala.collection.Seq$;

/**
 * Factory for Spark Catalyst {@link Alias} expressions.
 *
 * <p>Lives under the {@code org.apache.spark.sql} package tree so it can reach Spark-internal
 * constructors; adapted from https://github.com/apache/spark. Each created alias gets a freshly
 * generated expression id from {@code NamedExpressionId.newExprId()} and an empty qualifier.
 */
public class AliasExpressionCreator {

    /** Utility class; suppress the implicit public constructor. */
    private AliasExpressionCreator() {}

    /**
     * Wraps {@code col} in an {@link Alias} named {@code name}.
     *
     * @param col the expression to alias
     * @param name the alias (output attribute) name
     * @param explicitMetadata optional column metadata to attach; {@code Option.empty()} for none
     * @return a new {@link Alias} with a fresh expression id and no qualifier
     */
    @SuppressWarnings("unchecked")
    protected static Alias create(Expression col, String name, Option<Metadata> explicitMetadata) {
        return new Alias(
                col,
                name,
                NamedExpressionId.newExprId(),
                // Seq$.MODULE$.empty() is untyped when called from Java, hence the checked-safe
                // cast: an empty Seq trivially satisfies Seq<String>.
                (Seq<String>) Seq$.MODULE$.empty(),
                explicitMetadata);
    }
}
Loading

0 comments on commit d2c362d

Please sign in to comment.