diff --git a/LICENSE b/LICENSE index adabba50de63..eb24ff9d1538 100644 --- a/LICENSE +++ b/LICENSE @@ -241,3 +241,5 @@ seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/z seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLType.java from https://github.com/JSQLParser/JSqlParser seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFilter.java from https://github.com/JSQLParser/JSqlParser seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sqlengine/zeta/ZetaSQLFunction.java from https://github.com/JSQLParser/JSqlParser +seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/spark/sql/seatunnel/util/CanANSIStoreAssign.java from https://github.com/apache/spark +seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/ReconcileSchema.java from https://github.com/apache/spark diff --git a/release-note.md b/release-note.md index b542b35a8148..11b9f450928f 100644 --- a/release-note.md +++ b/release-note.md @@ -142,6 +142,7 @@ - [Core] [API] Add copy method to Catalog codes (#4414) - [Core] [API] Add options check before create source and sink and transform in FactoryUtil (#4424) - [Core] [Shade] Add guava shade module (#4358) +- [Core] [Spark] Support SeaTunnel Time Type (#5188) ### Connector-V2 diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java index fc9be559257f..4657934599d7 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; import org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter; +import org.apache.seatunnel.translation.spark.utils.DatasetUtils; import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils; import org.apache.spark.api.java.function.MapPartitionsFunction; @@ -129,23 +130,27 @@ private Dataset sparkTransform(SeaTunnelTransform transform, Dataset s SeaTunnelRowConverter outputRowConverter = new SeaTunnelRowConverter(transform.getProducedType()); ExpressionEncoder encoder = RowEncoder.apply(structType); - return stream.mapPartitions( - (MapPartitionsFunction) - (Iterator rowIterator) -> { - TransformIterator iterator = - new TransformIterator( - rowIterator, - transform, - structType, - inputRowConverter, - outputRowConverter); - return iterator; - }, - encoder) - .filter( - (Row row) -> { - return row != null; - }); + Dataset result = + stream.mapPartitions( + (MapPartitionsFunction) + (Iterator rowIterator) -> { + TransformIterator iterator = + new TransformIterator( + rowIterator, + transform, + structType, + inputRowConverter, + outputRowConverter); + return iterator; + }, + encoder) + .filter( + (Row row) -> { + return row != null; + }); + + 
result = DatasetUtils.to(result, structType); + return result; } private static class TransformIterator implements Iterator, Serializable { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf index edd78432862d..ba2e311eeeac 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf @@ -35,6 +35,7 @@ source { fields { name = "string" age = "int" + c_time = "time" } } } @@ -46,7 +47,7 @@ transform { Filter { source_table_name = "fake" result_table_name = "fake1" - fields = ["name", "age"] + fields = ["name", "age", "c_time"] } } @@ -97,10 +98,17 @@ sink { rule_value = 2147483647 } ] + }, { + field_name = c_time + field_type = time + field_value = [ + { + rule_type = NOT_NULL + } + ] } - ] - } - + ] + } } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java index f4b1338b15b5..8dfe75acc6b8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java @@ -33,9 +33,11 @@ import java.math.BigDecimal; import java.sql.Date; +import java.sql.Time; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -95,6 +97,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT { + " `c_longtext` longtext,\n" + " `c_date` date DEFAULT NULL,\n" + " `c_datetime` datetime DEFAULT NULL,\n" + + " `c_time` time DEFAULT NULL,\n" + " `c_timestamp` timestamp NULL DEFAULT NULL,\n" + " `c_tinyblob` tinyblob,\n" + " `c_mediumblob` mediumblob,\n" @@ -187,6 +190,7 @@ Pair> initTestData() { "c_longtext", "c_date", "c_datetime", + "c_time", "c_timestamp", "c_tinyblob", "c_mediumblob", @@ -244,6 +248,7 @@ Pair> initTestData() { String.format("f1_%s", i), Date.valueOf(LocalDate.now()), Timestamp.valueOf(LocalDateTime.now()), + Time.valueOf(LocalTime.now()), new Timestamp(System.currentTimeMillis()), "test".getBytes(), "test".getBytes(), diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf index b91ee9d31779..f23351bdb84a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf @@ -46,8 +46,8 @@ sink { c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, - c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf index c393e69cee2d..5bec8bc54668 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf @@ -49,8 +49,8 @@ sink { c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, - c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf index 0460ccdf3ba7..1b092f1e91ce 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf @@ -50,8 +50,8 @@ sink { c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, - c_datetime, 
c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf index bf0d3afab151..2a4542d2a923 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf @@ -46,9 +46,9 @@ sink { c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, - c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_datetime, c_time, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" # Non-root users need to grant XA_RECOVER_ADMIN permission on is_exactly_once = "true" is_exactly_once = "true" @@ -57,4 +57,4 @@ sink { max_commit_attempts = 3 transaction_timeout_sec = 86400 } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf index 78e21280f0de..33a5a5315006 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform.conf @@ -31,6 +31,7 @@ source { id = "int" name = "string" age = "int" + c_time = "time" c_timestamp = "timestamp" c_date = "date" c_map = "map" @@ -51,7 +52,7 @@ transform { source_table_name = "fake" result_table_name = "fake1" # the query table name must same as field 'source_table_name' - query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake" + query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_time, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake" } # The SQL transform support base function and criteria operation # But the complex SQL unsupported yet, include: multi source table/rows JOIN and AGGREGATE operation and the like @@ 
-116,6 +117,15 @@ sink { } ] }, + { + field_name = c_time + field_type = time + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { field_name = c_timestamp field_type = timestamp diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/spark/sql/seatunnel/util/AliasExpressionCreator.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/spark/sql/seatunnel/util/AliasExpressionCreator.java new file mode 100644 index 000000000000..5c73c9f85d0c --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/spark/sql/seatunnel/util/AliasExpressionCreator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.seatunnel.util; + +import org.apache.spark.sql.catalyst.expressions.Alias; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.Metadata; + +import scala.Option; +import scala.collection.Seq; +import scala.collection.Seq$; + +public class AliasExpressionCreator { + + protected static Alias create(Expression col, String name, Option explicitMetadata) { + return new Alias( + col, + name, + NamedExpressionId.newExprId(), + (Seq) Seq$.MODULE$.empty(), + explicitMetadata); + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/spark/sql/seatunnel/util/CanANSIStoreAssign.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/spark/sql/seatunnel/util/CanANSIStoreAssign.java new file mode 100644 index 000000000000..68c990c6c1e2 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/spark/sql/seatunnel/util/CanANSIStoreAssign.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.seatunnel.util; + +import org.apache.spark.sql.catalyst.expressions.Cast; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.CharType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.NumericType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.VarcharType; + +public class CanANSIStoreAssign { + /** + * Returns true if we can cast the `from` type to `to` type as per the ANSI SQL. + * + *
<p>
This function is developed in Java and {@link + * org.apache.spark.sql.catalyst.expressions.Cast$#MODULE$}.canANSIStoreAssign({@link DataType}, + * {@link DataType}) since Spark v3.0.0 was used as reference. + */ + protected static boolean apply(DataType from, DataType to) { + if (from == to) { + return true; + } + + if (from == DataTypes.NullType) { + return true; + } + + if (from instanceof NumericType && to instanceof NumericType) { + return true; + } + + if (from instanceof BinaryType && to == DataTypes.StringType) { + return true; + } + + if (from instanceof BooleanType && to == DataTypes.StringType) { + return true; + } + + if (from instanceof CharType && to == DataTypes.StringType) { + return true; + } + + if (from instanceof StringType && to == DataTypes.StringType) { + return true; + } + + if (from instanceof VarcharType && to == DataTypes.StringType) { + return true; + } + + if (from instanceof CalendarIntervalType && to == DataTypes.StringType) { + return true; + } + + if (from instanceof ArrayType && to instanceof ArrayType) { + ArrayType fromType = (ArrayType) from; + ArrayType toType = (ArrayType) to; + + DataType fromET = fromType.elementType(); + boolean fromContainsNull = fromType.containsNull(); + + DataType toET = toType.elementType(); + boolean toContainsNull = toType.containsNull(); + + return Cast.resolvableNullability(fromContainsNull, toContainsNull) + && CanANSIStoreAssign.apply(fromET, toET); + } + + if (from instanceof MapType && to instanceof MapType) { + MapType fromType = (MapType) from; + MapType toType = (MapType) to; + DataType fromKeyType = fromType.keyType(); + DataType fromValType = fromType.valueType(); + boolean fromValContainsNull = fromType.valueContainsNull(); + + DataType toKeyType = toType.keyType(); + DataType toValType = toType.valueType(); + boolean toValContainsNull = toType.valueContainsNull(); + + return Cast.resolvableNullability(fromValContainsNull, toValContainsNull) + && CanANSIStoreAssign.apply(fromKeyType, toKeyType) + && CanANSIStoreAssign.apply(fromValType, toValType); + } + + if (from instanceof StructType && to instanceof StructType) { + StructType formType = (StructType) from; + StructType toType = (StructType) to; + + if (formType.fields().length != toType.fields().length) { + return false; + } + + StructField[] fromFields = formType.fields(); + StructField[] toFields = toType.fields(); + for (int idx = 0; idx < fromFields.length; idx++) { + if (Cast.resolvableNullability(fromFields[idx].nullable(), toFields[idx].nullable()) + && CanANSIStoreAssign.apply( + fromFields[idx].dataType(), toFields[idx].dataType())) { + continue; + } else { + return false; + } + } + return true; + } + return false; + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java index 7f8fadc4e4d3..190b2f758198 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java @@ -57,11 +57,13 @@ import java.sql.Timestamp; import 
java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; public final class InternalRowConverter extends RowConverter { @@ -87,9 +89,7 @@ private static Object convert(Object field, SeaTunnelDataType dataType) { case DATE: return (int) ((LocalDate) field).toEpochDay(); case TIME: - // TODO: Support TIME Type - throw new RuntimeException( - "time type is not supported now, but will be supported in the future."); + return TimeUnit.NANOSECONDS.toMicros(((LocalTime) field).toNanoOfDay()); case TIMESTAMP: return InstantConverterUtils.toEpochMicro( Timestamp.valueOf((LocalDateTime) field).toInstant()); @@ -202,6 +202,7 @@ private static MutableValue createMutableValue(SeaTunnelDataType dataType) { case DATE: return new MutableInt(); case BIGINT: + case TIME: case TIMESTAMP: return new MutableLong(); case FLOAT: @@ -231,9 +232,10 @@ private static Object reconvert(Object field, SeaTunnelDataType dataType) { } return LocalDate.ofEpochDay((int) field); case TIME: - // TODO: Support TIME Type - throw new RuntimeException( - "SeaTunnel not support time type, it will be supported in the future."); + if (field instanceof Timestamp) { + return LocalTime.ofNanoOfDay(((Timestamp) field).getNanos()); + } + return LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos((long) field)); case TIMESTAMP: if (field instanceof Timestamp) { return ((Timestamp) field).toLocalDateTime(); diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java index 15357204cd3e..000e0baa06f0 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.sql.Date; -import java.sql.Time; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; @@ -51,6 +50,7 @@ public SeaTunnelRowConverter(SeaTunnelDataType dataType) { super(dataType); } + // SeaTunnelRow To GenericRow @Override public SeaTunnelRow convert(SeaTunnelRow seaTunnelRow) throws IOException { validate(seaTunnelRow); @@ -75,7 +75,12 @@ private Object convert(Object field, SeaTunnelDataType dataType) { case TIMESTAMP: return Timestamp.valueOf((LocalDateTime) field); case TIME: - return Time.valueOf((LocalTime) field); + if (field instanceof LocalTime) { + return ((LocalTime) field).toNanoOfDay(); + } + if (field instanceof Long) { + return field; + } case STRING: return field.toString(); case MAP: @@ -145,6 +150,7 @@ private WrappedArray.ofRef convertArray(Object[] arrayData, ArrayType a return new WrappedArray.ofRef<>(arrayData); } + // GenericRow To SeaTunnel @Override public SeaTunnelRow reconvert(SeaTunnelRow engineRow) throws IOException { return (SeaTunnelRow) reconvert(engineRow, dataType); @@ -166,7 +172,10 @@ private Object reconvert(Object field, SeaTunnelDataType 
dataType) { case TIMESTAMP: return ((Timestamp) field).toLocalDateTime(); case TIME: - return ((Time) field).toLocalTime(); + if (field instanceof Timestamp) { + return ((Timestamp) field).toLocalDateTime().toLocalTime(); + } + return LocalTime.ofNanoOfDay((Long) field); case STRING: return field.toString(); case MAP: diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/DatasetUtils.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/DatasetUtils.java new file mode 100644 index 000000000000..231b4dd2b8eb --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/DatasetUtils.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.spark.utils; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.seatunnel.util.ReconcileSchema; +import org.apache.spark.sql.types.StructType; + +public class DatasetUtils { + public static Dataset to(Dataset dataset, StructType schema) { + return ReconcileSchema.reconcile(dataset, schema); + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java index 72fba0238244..a4dbcd0c729e 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java @@ -25,10 +25,12 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -38,9 +40,11 @@ import static com.google.common.base.Preconditions.checkNotNull; public class TypeConverterUtils { + private static final Map> 
TO_SEA_TUNNEL_TYPES = new HashMap<>(16); public static final String ROW_KIND_FIELD = "op"; + public static final String LOGICAL_TIME_TYPE_FLAG = "logical_time_type"; static { TO_SEA_TUNNEL_TYPES.put(DataTypes.NullType, BasicType.VOID_TYPE); @@ -87,8 +91,8 @@ public static DataType convert(SeaTunnelDataType dataType) { return DataTypes.BinaryType; case DATE: return DataTypes.DateType; - // case TIME: - // TODO: not support now, how reconvert? + case TIME: + return DataTypes.LongType; case TIMESTAMP: return DataTypes.TimestampType; case ARRAY: @@ -113,12 +117,14 @@ private static StructType convert(SeaTunnelRowType rowType) { // TODO: row kind StructField[] fields = new StructField[rowType.getFieldNames().length]; for (int i = 0; i < rowType.getFieldNames().length; i++) { + SeaTunnelDataType fieldType = rowType.getFieldTypes()[i]; + Metadata metadata = + fieldType.getSqlType() == SqlType.TIME + ? new MetadataBuilder().putBoolean(LOGICAL_TIME_TYPE_FLAG, true).build() + : Metadata.empty(); + fields[i] = - new StructField( - rowType.getFieldNames()[i], - convert(rowType.getFieldTypes()[i]), - true, - Metadata.empty()); + new StructField(rowType.getFieldNames()[i], convert(fieldType), true, metadata); } return new StructType(fields); } @@ -178,7 +184,14 @@ private static SeaTunnelRowType convert(StructType structType) { SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[structFields.length]; for (int i = 0; i < structFields.length; i++) { fieldNames[i] = structFields[i].name(); - fieldTypes[i] = convert(structFields[i].dataType()); + Metadata metadata = structFields[i].metadata(); + if (metadata != null + && metadata.contains(LOGICAL_TIME_TYPE_FLAG) + && metadata.getBoolean(LOGICAL_TIME_TYPE_FLAG)) { + fieldTypes[i] = LocalTimeType.LOCAL_TIME_TYPE; + } else { + fieldTypes[i] = convert(structFields[i].dataType()); + } } return new SeaTunnelRowType(fieldNames, fieldTypes); } diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/AliasExpressionCreator.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/AliasExpressionCreator.java new file mode 100644 index 000000000000..1ecf9ae4f399 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/AliasExpressionCreator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.seatunnel.util; + +import org.apache.spark.sql.catalyst.expressions.Alias; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.Metadata; + +import scala.Option; +import scala.collection.Seq; +import scala.collection.Seq$; + +public class AliasExpressionCreator { + + /** Compatible to Spark v3.1.1 */ + protected static Alias create(Expression col, String name, Option explicitMetadata) { + return new Alias( + col, + name, + NamedExpressionId.newExprId(), + (Seq) Seq$.MODULE$.empty(), + explicitMetadata, + (Seq) Seq$.MODULE$.empty()); + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/CanANSIStoreAssign.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/CanANSIStoreAssign.java new file mode 100644 index 000000000000..e0b645ece4d7 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/CanANSIStoreAssign.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.seatunnel.util; + +import org.apache.spark.sql.catalyst.expressions.Cast; +import org.apache.spark.sql.types.DataType; + +public class CanANSIStoreAssign { + /** Compatible to Spark v3.0.0 */ + protected static boolean apply(DataType from, DataType to) { + return Cast.canANSIStoreAssign(from, to); + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/NamedExpressionId.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/NamedExpressionId.java new file mode 100644 index 000000000000..c215dfd57ebb --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/NamedExpressionId.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.seatunnel.util; + +import org.apache.spark.sql.catalyst.expressions.ExprId; +import org.apache.spark.sql.catalyst.expressions.NamedExpression$; + +public class NamedExpressionId { + protected static ExprId newExprId() { + return NamedExpression$.MODULE$.newExprId(); + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/ReconcileSchema.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/ReconcileSchema.java new file mode 100644 index 000000000000..2a94bb4749de --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/spark/sql/seatunnel/util/ReconcileSchema.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.seatunnel.util; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.ArrayTransform; +import org.apache.spark.sql.catalyst.expressions.Attribute; +import org.apache.spark.sql.catalyst.expressions.Cast; +import org.apache.spark.sql.catalyst.expressions.CreateNamedStruct; +import org.apache.spark.sql.catalyst.expressions.CreateStruct; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.GetStructField; +import org.apache.spark.sql.catalyst.expressions.If; +import org.apache.spark.sql.catalyst.expressions.IsNull; +import org.apache.spark.sql.catalyst.expressions.KnownNotNull; +import org.apache.spark.sql.catalyst.expressions.LambdaFunction; +import org.apache.spark.sql.catalyst.expressions.Literal; +import org.apache.spark.sql.catalyst.expressions.MapFromArrays; +import org.apache.spark.sql.catalyst.expressions.MapKeys; +import org.apache.spark.sql.catalyst.expressions.MapValues; +import org.apache.spark.sql.catalyst.expressions.NamedExpression; +import org.apache.spark.sql.catalyst.expressions.NamedLambdaVariable; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.catalyst.plans.logical.Project; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.MetadataBuilder; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import org.jetbrains.annotations.NotNull; + +import scala.Function0; +import scala.Function2; +import scala.Option; +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class ReconcileSchema { + + /** + * Returns a new Dataset<Row> where each row is reconciled to match the specified schema. + * + *
<p>
This function will carry over the metadata from the specified schema. + * + *
<p>
This function is developed in Java and {@link Dataset}.to( {@link StructType} ) function + * since Spark v3.4.0 was used as reference. + */ + public static Dataset reconcile(Dataset dataset, StructType schema) { + SparkSession sparkSession = dataset.sparkSession(); + + Supplier> block = + () -> { + LogicalPlan logicalPlan = dataset.logicalPlan(); + Project project = + matchSchema(logicalPlan, schema, sparkSession.sessionState().conf()); + return withPlan(sparkSession, project); + }; + + return ReconcileSchema.>withActive(sparkSession, block); + } + + /** + * This function is developed in Java and {@link + * org.apache.spark.sql.catalyst.plans.logical.Project$#MODULE$}.matchSchema( {@link + * LogicalPlan}, {@link StructType}, {@link SQLConf} ) since Spark v3.4.0 was used as reference. + */ + private static Project matchSchema(LogicalPlan plan, StructType schema, SQLConf conf) { + assert plan.resolved(); + List> fieldExprs = + JavaConverters.seqAsJavaListConverter(plan.output()).asJava().stream() + .map( + (Attribute a) -> { + return new Tuple2(a.name(), (Expression) a); + }) + .collect(Collectors.toList()); + + List schemaFields = Arrays.asList(schema.fields()); + + List projects = + reorderFields(fieldExprs, schemaFields, new ArrayList(), conf); + + return new Project( + JavaConverters.asScalaIteratorConverter(projects.iterator()).asScala().toSeq(), + plan); + } + + /** + * This function is developed in Java and {@link + * org.apache.spark.sql.catalyst.plans.logical.Project$#MODULE$}.reconcileColumnType( {@link + * Expression}, {@link Seq}<{@link String}>, {@link DataType}, {@link Boolean}, {@link + * SQLConf} ) since Spark v3.4.0 was used as reference. + */ + private static Expression reconcileColumnType( + Expression col, List columnPath, DataType dt, boolean nullable, SQLConf conf) + throws IllegalArgumentException { + if (col.nullable() && !nullable) { + String msg = String.format("Column: %s. Nullable column or Field", columnPath); + throw new IllegalArgumentException(msg); + } + + DataType colType = col.dataType(); + if (dt instanceof StructType && colType instanceof StructType) { + return reconcileStructType( + col, (StructType) colType, (StructType) dt, columnPath, conf); + } else if (dt instanceof ArrayType && colType instanceof ArrayType) { + return reconcileArrayType(col, (ArrayType) colType, (ArrayType) dt, columnPath, conf); + } else if (dt instanceof MapType && colType instanceof MapType) { + return reconcileMapType(col, (MapType) colType, (MapType) dt, columnPath, conf); + } else { + DataType other = colType; + DataType target = dt; + + if (other == target) { + return col; + } else if (CanANSIStoreAssign.apply(other, target)) { + return new Cast( + col, + target, + Option.apply(conf.sessionLocalTimeZone()), + SQLConf.get().ansiEnabled()); + } else { + String msg = + String.format( + "Invalid Column or Field DataType. Column Path: %s. %s, %s.", + columnPath, other, target); + throw new IllegalArgumentException(msg); + } + } + } + + /** + * This function is developed in Java and {@link + * org.apache.spark.sql.catalyst.plans.logical.Project$#MODULE$}.reconcileColumnType( {@link + * Expression}, {@link Seq}<{@link String}>, {@link DataType}, {@link Boolean}, {@link + * SQLConf} ) since Spark v3.4.0 was used as reference. 
+ */ + @NotNull private static MapFromArrays reconcileMapType( + Expression col, + MapType colType, + MapType expected, + List columnPath, + SQLConf conf) { + boolean valueContainsNull = colType.valueContainsNull(); + DataType kt = colType.keyType(); + DataType vt = colType.valueType(); + + if (valueContainsNull && !expected.valueContainsNull()) { + String msg = + String.format( + "Not Null Constraint Violation in Map Value. Column Path: %s.", + columnPath); + throw new IllegalArgumentException(msg); + } + + NamedLambdaVariable keyParam = + new NamedLambdaVariable( + "key", kt, false, NamedExpressionId.newExprId(), new AtomicReference<>()); + NamedLambdaVariable valueParam = + new NamedLambdaVariable( + "value", + vt, + valueContainsNull, + NamedExpressionId.newExprId(), + new AtomicReference<>()); + + List keyColumnPath = new ArrayList<>(columnPath); + List valColumnPath = new ArrayList<>(columnPath); + keyColumnPath.add("key"); + valColumnPath.add("value"); + + Expression reconciledKey = + reconcileColumnType(keyParam, keyColumnPath, expected.keyType(), false, conf); + + Expression reconciledVal = + reconcileColumnType( + valueParam, + valColumnPath, + expected.valueType(), + expected.valueContainsNull(), + conf); + + List keyArgList = new ArrayList<>(); + keyArgList.add(keyParam); + Seq keyArgs = + JavaConverters.asScalaIteratorConverter(keyArgList.iterator()).asScala().toSeq(); + + List valArgList = new ArrayList<>(); + valArgList.add(valueParam); + Seq valArgs = + JavaConverters.asScalaIteratorConverter(valArgList.iterator()).asScala().toSeq(); + + LambdaFunction keyFunc = new LambdaFunction(reconciledKey, keyArgs, false); + LambdaFunction valFunc = new LambdaFunction(reconciledVal, valArgs, false); + + ArrayTransform newKeys = new ArrayTransform(new MapKeys(col), keyFunc); + ArrayTransform newVals = new ArrayTransform(new MapValues(col), valFunc); + + return new MapFromArrays(newKeys, newVals); + } + + /** + * This function is developed in Java and {@link + * org.apache.spark.sql.catalyst.plans.logical.Project$#MODULE$}.reconcileColumnType( {@link + * Expression}, {@link Seq}<{@link String}>, {@link DataType}, {@link Boolean}, {@link + * SQLConf} ) since Spark v3.4.0 was used as reference. + */ + @NotNull private static ArrayTransform reconcileArrayType( + Expression col, + ArrayType colType, + ArrayType expected, + List columnPath, + SQLConf conf) { + boolean containsNull = colType.containsNull(); + DataType et = colType.elementType(); + if (containsNull && !expected.containsNull()) { + String msg = + String.format( + "Not Null Constraint Violation in Array Element. 
Column Path: %s", + columnPath); + throw new IllegalArgumentException(msg); + } + + NamedLambdaVariable param = + new NamedLambdaVariable( + "x", + et, + containsNull, + NamedExpressionId.newExprId(), + new AtomicReference<>()); + + List newColumnPath = new ArrayList<>(columnPath); + newColumnPath.add("element"); + Expression reconciledElement = + reconcileColumnType( + param, + newColumnPath, + expected.elementType(), + expected.containsNull(), + conf); + + List argList = new ArrayList<>(); + argList.add(param); + Seq arguments = + JavaConverters.asScalaIteratorConverter(argList.iterator()).asScala().toSeq(); + + LambdaFunction func = new LambdaFunction(reconciledElement, arguments, false); + return new ArrayTransform(col, func); + } + + /** + * This function is developed in Java and {@link + * org.apache.spark.sql.catalyst.plans.logical.Project$#MODULE$}.reconcileColumnType( {@link + * Expression}, {@link Seq}<{@link String}>, {@link DataType}, {@link Boolean}, {@link + * SQLConf} ) since Spark v3.4.0 was used as reference. + */ + private static Expression reconcileStructType( + Expression col, + StructType colType, + StructType expected, + List columnPath, + SQLConf conf) { + StructField[] fields = colType.fields(); + List> fieldExps = new ArrayList<>(fields.length); + for (int index = 0; index < fields.length; index++) { + StructField f = fields[index]; + if (col.nullable()) { + Tuple2 fieldExpr = + new Tuple2<>( + f.name(), + new GetStructField( + new KnownNotNull(col), index, Option.apply(null))); + fieldExps.add(fieldExpr); + } else { + Tuple2 fieldExpr = + new Tuple2<>(f.name(), new GetStructField(col, index, Option.apply(null))); + fieldExps.add(fieldExpr); + } + } + StructField[] expectedFields = expected.fields(); + + List newFields = + reorderFields(fieldExps, Arrays.asList(expectedFields), columnPath, conf); + Seq newFieldSeq = + JavaConverters.asScalaIteratorConverter( + newFields.stream() + .map( + (NamedExpression nf) -> { + return (Expression) nf; + }) + .collect(Collectors.toList()) + .iterator()) + .asScala() + .toSeq(); + + CreateNamedStruct ns = CreateStruct.apply(newFieldSeq); + if (col.nullable()) { + return new If(new IsNull(col), new Literal(null, expected), ns); + } else { + return ns; + } + } + + /** + * This function is developed in Java and {@link + * org.apache.spark.sql.catalyst.plans.logical.Project$#MODULE$}.recordFields( {@link + * Seq}<{@link Tuple2}<{@link String}, {@link Expression}>>, {@link Seq}<{@link + * StructField}>, {@link Seq}<{@link String}>, {@link SQLConf} ) since Spark v3.4.0 was + * used as reference. 
+ */ + private static List reorderFields( + List> fields, + List expected, + List columnPath, + SQLConf conf) { + + Function2 resolver = conf.resolver(); + return expected.stream() + .map( + (StructField f) -> { + List> matched = + fields.stream() + .filter( + (Tuple2 fieldExpr) -> { + return (Boolean) + resolver.apply( + fieldExpr._1(), f.name()); + }) + .collect(Collectors.toList()); + + if (matched.isEmpty()) { + if (f.nullable()) { + Literal columnExpr = Literal.create(null, f.dataType()); + return createNewColumn( + columnExpr, f.name(), f.metadata(), Metadata.empty()); + } else { + if (columnPath.isEmpty()) { + List candidates = + fields.stream() + .map(Tuple2::_1) + .collect(Collectors.toList()); + String msg = + String.format( + "Unresolved Column: %s, Candidates: %s.", + f.name(), candidates); + throw new IllegalArgumentException(msg); + } else { + String msg = + String.format( + "UnresolvedColumns: %s | Column path: %s.", + f.name(), columnPath); + throw new IllegalArgumentException(msg); + } + } + } else if (matched.size() > 1) { + + columnPath.add(f.name()); + String msg = + String.format( + "AmbiguousColumnOrField: ColumnPath: %s| Matched length: %d.", + matched.size()); + throw new IllegalArgumentException(msg); + } else { + Expression columnExpr = matched.get(0)._2(); + Metadata originalMetadata; + if (columnExpr instanceof NamedExpression) { + NamedExpression ne = (NamedExpression) columnExpr; + originalMetadata = ne.metadata(); + } else if (columnExpr instanceof GetStructField) { + GetStructField g = (GetStructField) columnExpr; + StructType childSchema = g.childSchema(); + originalMetadata = childSchema.apply(g.ordinal()).metadata(); + } else { + originalMetadata = Metadata.empty(); + } + + List newColumnPath = new ArrayList<>(columnPath); + newColumnPath.add(matched.get(0)._1); + Expression newColumnExpr = + reconcileColumnType( + columnExpr, + newColumnPath, + f.dataType(), + f.nullable(), + conf); + return createNewColumn( + newColumnExpr, f.name(), f.metadata(), originalMetadata); + } + }) + .collect(Collectors.toList()); + } + + /** + * This function is developed in Java and {@link + * org.apache.spark.sql.catalyst.plans.logical.Project$#MODULE$}.createNewColumn( {@link + * Expression}, {@link String}, {@link Metadata}, {@link Metadata} ) since Spark v3.4.0 was used + * as reference. + */ + private static NamedExpression createNewColumn( + Expression col, String name, Metadata newMetadata, Metadata originalMetadata) { + Metadata metadata = + new MetadataBuilder() + .withMetadata(originalMetadata) + .withMetadata(newMetadata) + .build(); + + if (col instanceof Attribute) { + Attribute a = (Attribute) col; + return a.withName(name).withMetadata(metadata); + } + + if (metadata == Metadata.empty()) { + Option explicitMetadata = Option.apply(null); + return AliasExpressionCreator.create(col, name, explicitMetadata); + } else { + Option explicitMetadata = Option.apply(metadata); + return AliasExpressionCreator.create(col, name, explicitMetadata); + } + } + + private static String toSQLId(Seq parts) { + List jParts = JavaConverters.seqAsJavaListConverter(parts).asJava(); + String sqlId = + jParts.stream() + .map( + (String name) -> { + return "`" + name.replace("`", "``") + "`"; + }) + .collect(Collectors.joining(".")); + return sqlId; + } + + /** + * Execute a block of code with the this session set as the active session, and restore the + * previous session on completion. + * + *
<p>
This function is developed in Java; {@link SparkSession#withActive(Function0)}
+     * was used as reference.
+     */
+    private static <T> T withActive(SparkSession sparkSession, Supplier<T> block) {
+        // getActiveSession() may be empty on this thread; guard before calling get().
+        SparkSession old =
+                SparkSession.getActiveSession().isDefined()
+                        ? SparkSession.getActiveSession().get()
+                        : null;
+        SparkSession.setActiveSession(sparkSession);
+        try {
+            return block.get();
+        } finally {
+            if (old != null) {
+                SparkSession.setActiveSession(old);
+            } else {
+                SparkSession.clearActiveSession();
+            }
+        }
+    }
+
+    /**
+     * A convenience function to wrap a logical plan and produce a {@link Dataset}<{@link
+     * Row}>.
+     *
+     *
<p>
This function is developed in Java; {@link Dataset}.withPlan({@link LogicalPlan})
+     * was used as reference.
+     */
+    private static Dataset<Row> withPlan(SparkSession sparkSession, LogicalPlan logicalPlan) {
+        return Dataset.ofRows(sparkSession, logicalPlan);
+    }
+}
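Note on the TIME encoding introduced above: on the internal-row path a `LocalTime` is flattened to microseconds-of-day and carried in a Spark `LongType` column (the `SeaTunnelRowConverter` GenericRow path keeps plain nanos-of-day), and the `logical_time_type` metadata flag is what lets `TypeConverterUtils` map the long column back to `LocalTimeType.LOCAL_TIME_TYPE`. A minimal, self-contained sketch of that round trip (class name and sample values are illustrative; only the conversion calls and the flag name come from this patch):

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;

import java.time.LocalTime;
import java.util.concurrent.TimeUnit;

public class TimeTypeRoundTripSketch {
    public static void main(String[] args) {
        // Encode: LocalTime -> micros-of-day, as InternalRowConverter.convert() now does.
        LocalTime original = LocalTime.of(12, 34, 56, 789_000_000);
        long micros = TimeUnit.NANOSECONDS.toMicros(original.toNanoOfDay());

        // Decode: micros-of-day -> LocalTime, as InternalRowConverter.reconvert() now does.
        LocalTime decoded = LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(micros));
        System.out.println(original + " -> " + micros + " -> " + decoded); // 12:34:56.789 both ways

        // Schema side: TIME becomes a LongType field tagged with logical_time_type,
        // mirroring TypeConverterUtils.convert(SeaTunnelRowType).
        Metadata metadata =
                new MetadataBuilder().putBoolean("logical_time_type", true).build();
        StructField field = new StructField("c_time", DataTypes.LongType, true, metadata);
        System.out.println(field.metadata().getBoolean("logical_time_type")); // true
    }
}
```

Sub-microsecond precision is truncated by the micros conversion, which matches Spark's own microsecond resolution for temporal types.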
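And a hypothetical driver-side use of the new `DatasetUtils.to(...)` entry point (backed by the `ReconcileSchema` port of `Dataset#to(StructType)` above): reconciling a transform's output against the producer schema re-attaches the field metadata, so the time flag survives the transform stage. Everything here except `DatasetUtils.to` and the flag name is made up for the example:

```java
import org.apache.seatunnel.translation.spark.utils.DatasetUtils;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Collections;

public class ReconcileSchemaSketch {
    public static void main(String[] args) {
        SparkSession spark =
                SparkSession.builder().master("local[1]").appName("reconcile-sketch").getOrCreate();

        // A transform output whose c_time column lost its field metadata.
        StructType bare =
                new StructType(
                        new StructField[] {
                            new StructField("c_time", DataTypes.LongType, true, Metadata.empty())
                        });
        Dataset<Row> transformed =
                spark.createDataFrame(
                        Collections.singletonList(RowFactory.create(45_296_000_000L)), bare);

        // The producer schema still carries the logical_time_type flag; reconciling
        // against it lets TypeConverterUtils.convert(StructType) recover the TIME type.
        Metadata timeMeta = new MetadataBuilder().putBoolean("logical_time_type", true).build();
        StructType target =
                new StructType(
                        new StructField[] {
                            new StructField("c_time", DataTypes.LongType, true, timeMeta)
                        });

        Dataset<Row> reconciled = DatasetUtils.to(transformed, target);
        reconciled.printSchema(); // c_time: long (nullable = true); metadata now carries the flag
        spark.stop();
    }
}
```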