diff --git a/release-note.md b/release-note.md index d647bdbad935..1c415b1f716e 100644 --- a/release-note.md +++ b/release-note.md @@ -144,6 +144,7 @@ - [Core] [API] Add copy method to Catalog codes (#4414) - [Core] [API] Add options check before create source and sink and transform in FactoryUtil (#4424) - [Core] [Shade] Add guava shade module (#4358) +- [Core] [Spark] Support SeaTunnel Time Type (#5188) ### Connector-V2 diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 503f76b87a16..14f825590d32 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -34,6 +34,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.StructType; import com.google.common.collect.Lists; @@ -92,6 +93,12 @@ public List> execute(List> upstreamDataStreams) SeaTunnelSink seaTunnelSink = plugins.get(i); Dataset dataset = fromSourceTable(sinkConfig, sparkRuntimeEnvironment).orElse(input); + StructType inputSchema = + sourceSchema(sinkConfig, sparkRuntimeEnvironment) + .orElseGet( + () -> { + return dataset.schema(); + }); int parallelism; if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) { parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key()); @@ -105,8 +112,7 @@ public List> execute(List> upstreamDataStreams) } dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism); // TODO modify checkpoint location - seaTunnelSink.setTypeInfo( - (SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema())); + seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(inputSchema)); if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) { SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink; DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode(); diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 8afffe1add0b..351ddda4c648 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -35,6 +35,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.types.StructType; import com.google.common.collect.Lists; @@ -93,6 +94,12 @@ public List> execute(List> upstreamDataStreams) SeaTunnelSink seaTunnelSink = plugins.get(i); Dataset dataset = fromSourceTable(sinkConfig, sparkRuntimeEnvironment).orElse(input); + StructType inputSchema = + sourceSchema(sinkConfig, sparkRuntimeEnvironment) + .orElseGet( + () -> { + return dataset.schema(); + }); int parallelism; if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) { parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key()); @@ -106,8 +113,7 @@ public List> execute(List> upstreamDataStreams) } dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism); // TODO modify checkpoint location - seaTunnelSink.setTypeInfo( - (SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema())); + seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(inputSchema)); if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) { SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink; DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode(); diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java index d68aec3c2323..0c4b2c8fd892 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java @@ -68,6 +68,7 @@ public List> execute(List> upstreamDataStreams) { CommonOptions.PARALLELISM.key(), CommonOptions.PARALLELISM.defaultValue()); } + StructType schema = (StructType) TypeConverterUtils.convert(source.getProducedType()); Dataset dataset = sparkRuntimeEnvironment .getSparkSession() @@ -77,12 +78,13 @@ public List> execute(List> upstreamDataStreams) { .option( Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source)) - .schema( - (StructType) - TypeConverterUtils.convert(source.getProducedType())) + .schema(schema) .load(); sources.add(dataset); - registerInputTempView(pluginConfigs.get(i), dataset); + + Config config = pluginConfigs.get(i); + stageSchema(config, schema); + registerInputTempView(config, dataset); } return sources; } diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java index ebfcaf6e91fd..2913b24af24f 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java @@ -24,6 +24,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.StructType; import java.util.List; import java.util.Optional; @@ -72,6 +73,26 @@ protected Optional> fromSourceTable( return Optional.of(sparkRuntimeEnvironment.getSparkSession().read().table(sourceTableName)); } + protected Optional sourceSchema( + Config pluginConfig, SparkRuntimeEnvironment sparkRuntimeEnvironment) { + if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) { + return Optional.empty(); + } + String sourceTableName = pluginConfig.getString(SOURCE_TABLE_NAME); + return this.sparkRuntimeEnvironment.schema(sourceTableName); + } + + protected void stageSchema(Config pluginConfig, StructType schema) { + if (pluginConfig.hasPath(RESULT_TABLE_NAME)) { + String tableName = pluginConfig.getString(RESULT_TABLE_NAME); + stageSchema(tableName, schema); + } + } + + protected void stageSchema(String tableName, StructType schema) { + this.sparkRuntimeEnvironment.stageSchema(tableName, schema); + } + private void registerTempView(String tableName, Dataset ds) { ds.createOrReplaceTempView(tableName); } diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java index 7e31ca463bc3..0254415f6011 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java @@ -27,13 +27,17 @@ import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructType; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import lombok.extern.slf4j.Slf4j; import java.net.URL; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; @Slf4j public class SparkRuntimeEnvironment implements RuntimeEnvironment { @@ -55,9 +59,12 @@ public class SparkRuntimeEnvironment implements RuntimeEnvironment { private String jobName = Constants.LOGO; + private Map schemas; + private SparkRuntimeEnvironment(Config config) { this.setEnableHive(checkIsContainHive(config)); this.initialize(config); + this.schemas = new HashMap<>(); } public void setEnableHive(boolean enableHive) { @@ -176,4 +183,13 @@ public static SparkRuntimeEnvironment getInstance(Config config) { } return INSTANCE; } + + public Optional schema(String tblName) { + StructType schema = this.schemas.get(tblName); + return schema == null ? Optional.empty() : Optional.of(schema); + } + + public void stageSchema(String tblName, StructType schema) { + this.schemas.put(tblName, schema); + } } diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java index fc9be559257f..e7b2ca79d448 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java @@ -105,7 +105,7 @@ public List> execute(List> upstreamDataStreams) Config pluginConfig = pluginConfigs.get(i); Dataset stream = fromSourceTable(pluginConfig, sparkRuntimeEnvironment).orElse(input); - input = sparkTransform(transform, stream); + input = sparkTransform(pluginConfig, transform, stream); registerInputTempView(pluginConfig, input); result.add(input); } catch (Exception e) { @@ -119,33 +119,45 @@ public List> execute(List> upstreamDataStreams) return result; } - private Dataset sparkTransform(SeaTunnelTransform transform, Dataset stream) + private Dataset sparkTransform( + Config pluginConfig, SeaTunnelTransform transform, Dataset stream) throws IOException { - SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(stream.schema()); + StructType inputSchema = + sourceSchema(pluginConfig, sparkRuntimeEnvironment) + .orElseGet( + () -> { + return stream.schema(); + }); + + SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(inputSchema); transform.setTypeInfo(seaTunnelDataType); - StructType structType = + StructType outputSchema = (StructType) TypeConverterUtils.convert(transform.getProducedType()); SeaTunnelRowConverter inputRowConverter = new SeaTunnelRowConverter(seaTunnelDataType); SeaTunnelRowConverter outputRowConverter = new SeaTunnelRowConverter(transform.getProducedType()); - ExpressionEncoder encoder = RowEncoder.apply(structType); - return stream.mapPartitions( - (MapPartitionsFunction) - (Iterator rowIterator) -> { - TransformIterator iterator = - new TransformIterator( - rowIterator, - transform, - structType, - inputRowConverter, - outputRowConverter); - return iterator; - }, - encoder) - .filter( - (Row row) -> { - return row != null; - }); + ExpressionEncoder encoder = RowEncoder.apply(outputSchema); + Dataset result = + stream.mapPartitions( + (MapPartitionsFunction) + (Iterator rowIterator) -> { + TransformIterator iterator = + new TransformIterator( + rowIterator, + transform, + outputSchema, + inputRowConverter, + outputRowConverter); + return iterator; + }, + encoder) + .filter( + (Row row) -> { + return row != null; + }); + + stageSchema(pluginConfig, outputSchema); + return result; } private static class TransformIterator implements Iterator, Serializable { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf index edd78432862d..ba2e311eeeac 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf @@ -35,6 +35,7 @@ source { fields { name = "string" age = "int" + c_time = "time" } } } @@ -46,7 +47,7 @@ transform { Filter { source_table_name = "fake" result_table_name = "fake1" - fields = ["name", "age"] + fields = ["name", "age", "c_time"] } } @@ -97,10 +98,17 @@ sink { rule_value = 2147483647 } ] + }, { + field_name = c_time + field_type = time + field_value = [ + { + rule_type = NOT_NULL + } + ] } - ] - } - + ] + } } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java index f4b1338b15b5..8dfe75acc6b8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java @@ -33,9 +33,11 @@ import java.math.BigDecimal; import java.sql.Date; +import java.sql.Time; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -95,6 +97,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT { + " `c_longtext` longtext,\n" + " `c_date` date DEFAULT NULL,\n" + " `c_datetime` datetime DEFAULT NULL,\n" + + " `c_time` time DEFAULT NULL,\n" + " `c_timestamp` timestamp NULL DEFAULT NULL,\n" + " `c_tinyblob` tinyblob,\n" + " `c_mediumblob` mediumblob,\n" @@ -187,6 +190,7 @@ Pair> initTestData() { "c_longtext", "c_date", "c_datetime", + "c_time", "c_timestamp", "c_tinyblob", "c_mediumblob", @@ -244,6 +248,7 @@ Pair> initTestData() { String.format("f1_%s", i), Date.valueOf(LocalDate.now()), Timestamp.valueOf(LocalDateTime.now()), + Time.valueOf(LocalTime.now()), new Timestamp(System.currentTimeMillis()), "test".getBytes(), "test".getBytes(), diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf index b91ee9d31779..f23351bdb84a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf @@ -46,8 +46,8 @@ sink { c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, - c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf index c393e69cee2d..5bec8bc54668 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf @@ -49,8 +49,8 @@ sink { c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, - c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf index 0460ccdf3ba7..1b092f1e91ce 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf @@ -50,8 +50,8 @@ sink { c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, - c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf index bf0d3afab151..2a4542d2a923 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf @@ -46,9 +46,9 @@ sink { c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, - c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" # Non-root users need to grant XA_RECOVER_ADMIN permission on is_exactly_once = "true" is_exactly_once = "true" @@ -57,4 +57,4 @@ sink { max_commit_attempts = 3 transaction_timeout_sec = 86400 } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf index 78e21280f0de..33a5a5315006 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf @@ -31,6 +31,7 @@ source { id = "int" name = "string" age = "int" + c_time = "time" c_timestamp = "timestamp" c_date = "date" c_map = "map" @@ -51,7 +52,7 @@ transform { source_table_name = "fake" result_table_name = "fake1" # the query table name must same as field 'source_table_name' - query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake" + query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_time, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake" } # The SQL transform support base function and criteria operation # But the complex SQL unsupported yet, include: multi source table/rows JOIN and AGGREGATE operation and the like @@ -116,6 +117,15 @@ sink { } ] }, + { + field_name = c_time + field_type = time + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { field_name = c_timestamp field_type = timestamp diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java index 7f8fadc4e4d3..a0d0b36da73d 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java @@ -57,6 +57,7 @@ import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -87,9 +88,7 @@ private static Object convert(Object field, SeaTunnelDataType dataType) { case DATE: return (int) ((LocalDate) field).toEpochDay(); case TIME: - // TODO: Support TIME Type - throw new RuntimeException( - "time type is not supported now, but will be supported in the future."); + return ((LocalTime) field).toNanoOfDay(); case TIMESTAMP: return InstantConverterUtils.toEpochMicro( Timestamp.valueOf((LocalDateTime) field).toInstant()); @@ -202,6 +201,7 @@ private static MutableValue createMutableValue(SeaTunnelDataType dataType) { case DATE: return new MutableInt(); case BIGINT: + case TIME: case TIMESTAMP: return new MutableLong(); case FLOAT: @@ -231,9 +231,7 @@ private static Object reconvert(Object field, SeaTunnelDataType dataType) { } return LocalDate.ofEpochDay((int) field); case TIME: - // TODO: Support TIME Type - throw new RuntimeException( - "SeaTunnel not support time type, it will be supported in the future."); + return LocalTime.ofNanoOfDay((long) field); case TIMESTAMP: if (field instanceof Timestamp) { return ((Timestamp) field).toLocalDateTime(); diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java index 15357204cd3e..11496747ec64 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.sql.Date; -import java.sql.Time; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; @@ -51,6 +50,7 @@ public SeaTunnelRowConverter(SeaTunnelDataType dataType) { super(dataType); } + // SeaTunnelRow To GenericRow @Override public SeaTunnelRow convert(SeaTunnelRow seaTunnelRow) throws IOException { validate(seaTunnelRow); @@ -75,7 +75,7 @@ private Object convert(Object field, SeaTunnelDataType dataType) { case TIMESTAMP: return Timestamp.valueOf((LocalDateTime) field); case TIME: - return Time.valueOf((LocalTime) field); + return ((LocalTime) field).toNanoOfDay(); case STRING: return field.toString(); case MAP: @@ -145,6 +145,7 @@ private WrappedArray.ofRef convertArray(Object[] arrayData, ArrayType a return new WrappedArray.ofRef<>(arrayData); } + // GenericRow To SeaTunnel @Override public SeaTunnelRow reconvert(SeaTunnelRow engineRow) throws IOException { return (SeaTunnelRow) reconvert(engineRow, dataType); @@ -166,7 +167,7 @@ private Object reconvert(Object field, SeaTunnelDataType dataType) { case TIMESTAMP: return ((Timestamp) field).toLocalDateTime(); case TIME: - return ((Time) field).toLocalTime(); + return LocalTime.ofNanoOfDay((Long) field); case STRING: return field.toString(); case MAP: diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java index 72fba0238244..a4dbcd0c729e 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java @@ -25,10 +25,12 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -38,9 +40,11 @@ import static com.google.common.base.Preconditions.checkNotNull; public class TypeConverterUtils { + private static final Map> TO_SEA_TUNNEL_TYPES = new HashMap<>(16); public static final String ROW_KIND_FIELD = "op"; + public static final String LOGICAL_TIME_TYPE_FLAG = "logical_time_type"; static { TO_SEA_TUNNEL_TYPES.put(DataTypes.NullType, BasicType.VOID_TYPE); @@ -87,8 +91,8 @@ public static DataType convert(SeaTunnelDataType dataType) { return DataTypes.BinaryType; case DATE: return DataTypes.DateType; - // case TIME: - // TODO: not support now, how reconvert? + case TIME: + return DataTypes.LongType; case TIMESTAMP: return DataTypes.TimestampType; case ARRAY: @@ -113,12 +117,14 @@ private static StructType convert(SeaTunnelRowType rowType) { // TODO: row kind StructField[] fields = new StructField[rowType.getFieldNames().length]; for (int i = 0; i < rowType.getFieldNames().length; i++) { + SeaTunnelDataType fieldType = rowType.getFieldTypes()[i]; + Metadata metadata = + fieldType.getSqlType() == SqlType.TIME + ? new MetadataBuilder().putBoolean(LOGICAL_TIME_TYPE_FLAG, true).build() + : Metadata.empty(); + fields[i] = - new StructField( - rowType.getFieldNames()[i], - convert(rowType.getFieldTypes()[i]), - true, - Metadata.empty()); + new StructField(rowType.getFieldNames()[i], convert(fieldType), true, metadata); } return new StructType(fields); } @@ -178,7 +184,14 @@ private static SeaTunnelRowType convert(StructType structType) { SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[structFields.length]; for (int i = 0; i < structFields.length; i++) { fieldNames[i] = structFields[i].name(); - fieldTypes[i] = convert(structFields[i].dataType()); + Metadata metadata = structFields[i].metadata(); + if (metadata != null + && metadata.contains(LOGICAL_TIME_TYPE_FLAG) + && metadata.getBoolean(LOGICAL_TIME_TYPE_FLAG)) { + fieldTypes[i] = LocalTimeType.LOCAL_TIME_TYPE; + } else { + fieldTypes[i] = convert(structFields[i].dataType()); + } } return new SeaTunnelRowType(fieldNames, fieldTypes); }