From 41173357f847f7bc72119b3e0ebd5019f94a4c68 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 28 Sep 2023 16:55:42 +0800 Subject: [PATCH] [Improve] Refactor CatalogTable and add `SeaTunnelSource::getProducedCatalogTables` (#5562) --- .../seatunnel/api/source/SeaTunnelSource.java | 18 +++- .../api/table/catalog/CatalogTableUtil.java | 23 ++++- .../api/table/factory/FactoryUtil.java | 92 ++++++++++--------- .../table/factory/SupportMultipleTable.java | 56 ----------- .../table/factory/TableFactoryContext.java | 33 +------ .../api/table/factory/TableSinkFactory.java | 2 +- .../factory/TableSinkFactoryContext.java | 35 +++++++ .../api/table/factory/TableSourceFactory.java | 2 +- .../factory/TableSourceFactoryContext.java | 30 ++++++ .../table/factory/TableTransformFactory.java | 2 +- .../factory/TableTransformFactoryContext.java | 37 ++++++++ .../api/table/type/SeaTunnelRow.java | 4 +- .../cdc/base/source/IncrementalSource.java | 13 ++- .../cdc/mongodb/MongodbIncrementalSource.java | 8 +- .../MongodbIncrementalSourceFactory.java | 41 +++------ .../mysql/source/MySqlIncrementalSource.java | 7 +- .../source/MySqlIncrementalSourceFactory.java | 39 +++----- .../source/SqlServerIncrementalSource.java | 8 +- .../SqlServerIncrementalSourceFactory.java | 38 +++----- .../console/sink/ConsoleSinkFactory.java | 4 +- .../sqlserver/SqlServerDataTypeConvertor.java | 1 + .../seatunnel/jdbc/sink/JdbcSinkFactory.java | 4 +- .../jdbc/source/JdbcSourceFactory.java | 70 +++++++++++--- .../kafka/sink/KafkaSinkFactory.java | 4 +- .../starrocks/sink/StarRocksSinkFactory.java | 4 +- .../actions/ShuffleMultipleRowStrategy.java | 10 +- .../parse/MultipleTableJobConfigParser.java | 67 +++++--------- .../dag/execution/ExecutionPlanGenerator.java | 14 +-- .../dag/physical/PhysicalPlanGenerator.java | 7 -- .../server/task/SourceSeaTunnelTask.java | 14 ++- .../copy/CopyFieldTransformFactory.java | 7 +- .../FieldMapperTransformFactory.java | 6 +- .../filter/FilterFieldTransformFactory.java | 6 +- .../FilterRowKindTransformFactory.java | 6 +- .../replace/ReplaceTransformFactory.java | 6 +- .../split/SplitTransformFactory.java | 7 +- .../transform/sql/SQLTransformFactory.java | 6 +- 37 files changed, 395 insertions(+), 336 deletions(-) delete mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java index 0535de68b8e..924c2e52443 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java @@ -21,9 +21,11 @@ import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle; import org.apache.seatunnel.api.serialization.DefaultSerializer; import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import java.io.Serializable; +import java.util.List; /** * The interface for Source. It acts like a factory class that helps construct the {@link @@ -49,9 +51,23 @@ public interface SeaTunnelSource getProducedType(); + @Deprecated + default SeaTunnelDataType getProducedType() { + throw new UnsupportedOperationException("getProducedType method has not been implemented."); + } + + /** + * Get the catalog tables output by this source, It is recommended that all connectors implement + * this method instead of {@link #getProducedType}. CatalogTable contains more information to + * help downstream support more accurate and complete synchronization capabilities. + */ + default List getProducedCatalogTables() { + throw new UnsupportedOperationException( + "getProducedCatalogTables method has not been implemented."); + } /** * Create source reader, used to produce data. diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java index a9b921ce5bb..def005eeb6b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java @@ -33,8 +33,10 @@ import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.MultipleRowType; import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.table.type.SqlType; import org.apache.seatunnel.common.utils.JsonUtils; @@ -138,9 +140,14 @@ public static List getCatalogTables(Config config, ClassLoader cla @Deprecated public static List getCatalogTablesFromConfig( ReadonlyConfig readonlyConfig, ClassLoader classLoader) { - // We use plugin_name as factoryId, so MySQL-CDC should be MySQL String factoryId = readonlyConfig.get(CommonOptions.PLUGIN_NAME).replace("-CDC", ""); + return getCatalogTablesFromConfig(factoryId, readonlyConfig, classLoader); + } + + @Deprecated + public static List getCatalogTablesFromConfig( + String factoryId, ReadonlyConfig readonlyConfig, ClassLoader classLoader) { // Highest priority: specified schema Map schemaMap = readonlyConfig.get(CatalogTableUtil.SCHEMA); if (schemaMap != null) { @@ -188,6 +195,20 @@ public static CatalogTable buildWithConfig(Config config) { return buildWithConfig(readonlyConfig); } + public static SeaTunnelDataType convertToDataType( + List catalogTables) { + if (catalogTables.size() == 1) { + return catalogTables.get(0).getTableSchema().toPhysicalRowDataType(); + } else { + Map rowTypeMap = new HashMap<>(); + for (CatalogTable catalogTable : catalogTables) { + String tableId = catalogTable.getTableId().toTablePath().toString(); + rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType()); + } + return new MultipleRowType(rowTypeMap); + } + } + public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) { if (readonlyConfig.get(SCHEMA) == null) { throw new RuntimeException( diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index f3090026991..48ed785c39b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -21,13 +21,20 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.ConfigValidator; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.env.ParsingMode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceOptions; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SupportParallelism; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSource; +import org.apache.seatunnel.api.table.type.MultipleRowType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.slf4j.Logger; @@ -55,10 +62,11 @@ public final class FactoryUtil { private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class); + static final String DEFAULT_ID = "default-identifier"; + public static List, List>> createAndPrepareSource( - List multipleTables, ReadonlyConfig options, ClassLoader classLoader, String factoryIdentifier) { @@ -67,32 +75,44 @@ public final class FactoryUtil { final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier); List, List>> sources = - new ArrayList<>(multipleTables.size()); - if (factory instanceof SupportMultipleTable) { - List remainingTables = multipleTables; - while (!remainingTables.isEmpty()) { - TableFactoryContext context = - new TableFactoryContext(remainingTables, options, classLoader); - SupportMultipleTable.Result result = - ((SupportMultipleTable) factory).applyTables(context); - List acceptedTables = result.getAcceptedTables(); - sources.add( - new Tuple2<>( - createAndPrepareSource( - factory, acceptedTables, options, classLoader), - acceptedTables)); - remainingTables = result.getRemainingTables(); - } - } else { - for (CatalogTable catalogTable : multipleTables) { - List acceptedTables = Collections.singletonList(catalogTable); - sources.add( - new Tuple2<>( - createAndPrepareSource( - factory, acceptedTables, options, classLoader), - acceptedTables)); + new ArrayList<>(); + SeaTunnelSource source = + createAndPrepareSource(factory, options, classLoader); + List catalogTables; + try { + catalogTables = source.getProducedCatalogTables(); + } catch (UnsupportedOperationException e) { + // TODO remove it when all connector use `getProducedCatalogTables` + SeaTunnelDataType seaTunnelDataType = source.getProducedType(); + final String tableId = + options.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID); + if (seaTunnelDataType instanceof MultipleRowType) { + catalogTables = new ArrayList<>(); + for (String id : ((MultipleRowType) seaTunnelDataType).getTableIds()) { + catalogTables.add( + CatalogTableUtil.getCatalogTable( + id, ((MultipleRowType) seaTunnelDataType).getRowType(id))); + } + } else { + catalogTables = + Collections.singletonList( + CatalogTableUtil.getCatalogTable( + tableId, (SeaTunnelRowType) seaTunnelDataType)); } } + LOG.info( + "get the CatalogTable from source {}: {}", + source.getPluginName(), + catalogTables.stream() + .map(CatalogTable::getTableId) + .map(TableIdentifier::toString) + .collect(Collectors.joining(","))); + if (options.get(SourceOptions.DAG_PARSING_MODE) == ParsingMode.SHARDING) { + CatalogTable catalogTable = catalogTables.get(0); + catalogTables.clear(); + catalogTables.add(catalogTable); + } + sources.add(new Tuple2<>(source, catalogTables)); return sources; } catch (Throwable t) { throw new FactoryException( @@ -104,22 +124,13 @@ public final class FactoryUtil { private static SeaTunnelSource createAndPrepareSource( - TableSourceFactory factory, - List acceptedTables, - ReadonlyConfig options, - ClassLoader classLoader) { - TableFactoryContext context = new TableFactoryContext(acceptedTables, options, classLoader); + TableSourceFactory factory, ReadonlyConfig options, ClassLoader classLoader) { + TableSourceFactoryContext context = new TableSourceFactoryContext(options, classLoader); ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); TableSource tableSource = factory.createSource(context); - validateAndApplyMetadata(acceptedTables, tableSource); return tableSource.createSource(); } - private static void validateAndApplyMetadata( - List catalogTables, TableSource tableSource) { - // TODO: handle reading metadata - } - public static SeaTunnelSink createAndPrepareSink( CatalogTable catalogTable, @@ -129,9 +140,8 @@ SeaTunnelSink createAndPrepareSi try { TableSinkFactory factory = discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier); - TableFactoryContext context = - new TableFactoryContext( - Collections.singletonList(catalogTable), options, classLoader); + TableSinkFactoryContext context = + new TableSinkFactoryContext(catalogTable, options, classLoader); ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); return factory.createSink(context).createSink(); } catch (Throwable t) { @@ -293,8 +303,8 @@ public static SeaTunnelTransform createAndPrepareTransform( String factoryIdentifier) { final TableTransformFactory factory = discoverFactory(classLoader, TableTransformFactory.class, factoryIdentifier); - TableFactoryContext context = - new TableFactoryContext( + TableTransformFactoryContext context = + new TableTransformFactoryContext( Collections.singletonList(catalogTable), options, classLoader); ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); return factory.createTransform(context).createTransform(); diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java deleted file mode 100644 index a48fd96f741..00000000000 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.api.table.factory; - -import org.apache.seatunnel.api.table.catalog.CatalogTable; - -import java.util.List; - -/** - * Used to declare that the connector can handle data from multiple tables. - * - *

The expansion of the {@link TableSourceFactory}. - */ -public interface SupportMultipleTable { - - /** A connector can pick tables and return the accepted and remaining tables. */ - Result applyTables(TableFactoryContext context); - - final class Result { - private final List acceptedTables; - private final List remainingTables; - - private Result(List acceptedTables, List remainingTables) { - this.acceptedTables = acceptedTables; - this.remainingTables = remainingTables; - } - - public static Result of( - List acceptedTables, List remainingTables) { - return new Result(acceptedTables, remainingTables); - } - - public List getAcceptedTables() { - return acceptedTables; - } - - public List getRemainingTables() { - return remainingTables; - } - } -} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java index 2fda5fc0641..10436da09b8 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java @@ -18,42 +18,17 @@ package org.apache.seatunnel.api.table.factory; import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import lombok.Getter; -import java.util.List; +@Getter +public abstract class TableFactoryContext { -public class TableFactoryContext { - - private final List catalogTables; - @Getter private final ReadonlyConfig options; + private final ReadonlyConfig options; private final ClassLoader classLoader; - public TableFactoryContext( - List catalogTables, ReadonlyConfig options, ClassLoader classLoader) { - this.catalogTables = catalogTables; + public TableFactoryContext(ReadonlyConfig options, ClassLoader classLoader) { this.options = options; this.classLoader = classLoader; } - - public ClassLoader getClassLoader() { - return this.classLoader; - } - - /** - * Returns a list of tables that need to be processed. - * - *

By default, return only single table. - * - *

If you need multiple tables, implement {@link SupportMultipleTable}. - */ - public List getCatalogTables() { - return catalogTables; - } - - /** @return single table. */ - public CatalogTable getCatalogTable() { - return catalogTables.get(0); - } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java index f0015fa58d2..2fca039e7de 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java @@ -37,7 +37,7 @@ public interface TableSinkFactory createSink( - TableFactoryContext context) { + TableSinkFactoryContext context) { throw new UnsupportedOperationException( "The Factory has not been implemented and the deprecated Plugin will be used."); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java new file mode 100644 index 00000000000..f579adc4165 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.factory; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; + +import lombok.Getter; + +@Getter +public class TableSinkFactoryContext extends TableFactoryContext { + + private final CatalogTable catalogTable; + + public TableSinkFactoryContext( + CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) { + super(options, classLoader); + this.catalogTable = catalogTable; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java index 30f70efdeac..132d9049582 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java @@ -35,7 +35,7 @@ public interface TableSourceFactory extends Factory { * @param context TableFactoryContext */ default - TableSource createSource(TableFactoryContext context) { + TableSource createSource(TableSourceFactoryContext context) { throw new UnsupportedOperationException( "The Factory has not been implemented and the deprecated Plugin will be used."); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java new file mode 100644 index 00000000000..41b2b39c6e6 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.factory; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import lombok.Getter; + +@Getter +public class TableSourceFactoryContext extends TableFactoryContext { + + public TableSourceFactoryContext(ReadonlyConfig options, ClassLoader classLoader) { + super(options, classLoader); + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java index 33caf328d6f..46c6cfa56ff 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java @@ -32,7 +32,7 @@ public interface TableTransformFactory extends Factory { * @param context TableFactoryContext * @return */ - default TableTransform createTransform(TableFactoryContext context) { + default TableTransform createTransform(TableTransformFactoryContext context) { throw new UnsupportedOperationException( "The Factory has not been implemented and the deprecated Plugin will be used."); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java new file mode 100644 index 00000000000..bf8176c7a8d --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.factory; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; + +import lombok.Getter; + +import java.util.List; + +@Getter +public class TableTransformFactoryContext extends TableFactoryContext { + + private final List catalogTables; + + public TableTransformFactoryContext( + List catalogTables, ReadonlyConfig options, ClassLoader classLoader) { + super(options, classLoader); + this.catalogTables = catalogTables; + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index bd05e0808d8..299026c4076 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.api.table.type; -import org.apache.seatunnel.api.table.factory.SupportMultipleTable; - import java.io.Serializable; import java.util.Arrays; import java.util.Map; @@ -27,7 +25,7 @@ /** SeaTunnel row type. */ public final class SeaTunnelRow implements Serializable { private static final long serialVersionUID = -1L; - /** Table identifier, used for the source connector that {@link SupportMultipleTable}. */ + /** Table identifier. */ private String tableId = ""; /** The kind of change that a row describes in a changelog. */ private RowKind kind = RowKind.INSERT; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java index ed04fb0f5d7..ff13fc30b14 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java @@ -28,6 +28,7 @@ import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.source.SupportCoordinate; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; @@ -88,14 +89,19 @@ public abstract class IncrementalSource protected int incrementalParallelism; protected StopConfig stopConfig; + protected List catalogTables; protected StopMode stopMode; protected DebeziumDeserializationSchema deserializationSchema; protected SeaTunnelDataType dataType; - protected IncrementalSource(ReadonlyConfig options, SeaTunnelDataType dataType) { + protected IncrementalSource( + ReadonlyConfig options, + SeaTunnelDataType dataType, + List catalogTables) { this.dataType = dataType; + this.catalogTables = catalogTables; this.readonlyConfig = options; this.startupConfig = getStartupConfig(readonlyConfig); this.stopConfig = getStopConfig(readonlyConfig); @@ -137,6 +143,11 @@ private StopConfig getStopConfig(ReadonlyConfig config) { config.get(SourceOptions.STOP_TIMESTAMP)); } + @Override + public List getProducedCatalogTables() { + return catalogTables; + } + public abstract Option getStartupModeOption(); public abstract Option getStopModeOption(); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java index 41191cfa52b..edcbdea9a9e 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SupportParallelism; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig; @@ -43,6 +44,7 @@ import javax.annotation.Nonnull; +import java.util.List; import java.util.Optional; @NoArgsConstructor @@ -53,8 +55,10 @@ public class MongodbIncrementalSource extends IncrementalSource dataType) { - super(options, dataType); + ReadonlyConfig options, + SeaTunnelDataType dataType, + List catalogTables) { + super(options, dataType, catalogTables); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java index 6215afb74ef..7b816ed3eb0 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java @@ -21,28 +21,22 @@ import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.SupportMultipleTable; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableSourceFactory; -import org.apache.seatunnel.api.table.type.MultipleRowType; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions; import com.google.auto.service.AutoService; -import javax.annotation.Nonnull; - import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.List; @AutoService(Factory.class) -public class MongodbIncrementalSourceFactory implements TableSourceFactory, SupportMultipleTable { +public class MongodbIncrementalSourceFactory implements TableSourceFactory { @Override public String factoryIdentifier() { return MongodbIncrementalSource.IDENTIFIER; @@ -77,28 +71,15 @@ public Class getSourceClass() { @SuppressWarnings("unchecked") @Override public - TableSource createSource(TableFactoryContext context) { + TableSource createSource(TableSourceFactoryContext context) { return () -> { - SeaTunnelDataType dataType; - if (context.getCatalogTables().size() == 1) { - dataType = - context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType(); - } else { - Map rowTypeMap = new HashMap<>(); - for (CatalogTable catalogTable : context.getCatalogTables()) { - rowTypeMap.put( - catalogTable.getTableId().toTablePath().toString(), - catalogTable.getTableSchema().toPhysicalRowDataType()); - } - dataType = new MultipleRowType(rowTypeMap); - } + List catalogTables = + CatalogTableUtil.getCatalogTablesFromConfig( + context.getOptions(), context.getClassLoader()); + SeaTunnelDataType dataType = + CatalogTableUtil.convertToDataType(catalogTables); return (SeaTunnelSource) - new MongodbIncrementalSource<>(context.getOptions(), dataType); + new MongodbIncrementalSource<>(context.getOptions(), dataType, catalogTables); }; } - - @Override - public Result applyTables(@Nonnull TableFactoryContext context) { - return Result.of(context.getCatalogTables(), Collections.emptyList()); - } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java index 43d8e505d6b..270b0d7309f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java @@ -50,6 +50,7 @@ import lombok.NoArgsConstructor; import java.time.ZoneId; +import java.util.List; @NoArgsConstructor @AutoService(SeaTunnelSource.class) @@ -58,8 +59,10 @@ public class MySqlIncrementalSource extends IncrementalSource dataType) { - super(options, dataType); + ReadonlyConfig options, + SeaTunnelDataType dataType, + List catalogTables) { + super(options, dataType, catalogTables); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java index 6429fa4b529..60e1105e306 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java @@ -22,15 +22,13 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.SupportMultipleTable; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableSourceFactory; -import org.apache.seatunnel.api.table.type.MultipleRowType; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; @@ -40,12 +38,10 @@ import com.google.auto.service.AutoService; import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.List; @AutoService(Factory.class) -public class MySqlIncrementalSourceFactory implements TableSourceFactory, SupportMultipleTable { +public class MySqlIncrementalSourceFactory implements TableSourceFactory { @Override public String factoryIdentifier() { return MySqlIncrementalSource.IDENTIFIER; @@ -95,28 +91,15 @@ public Class getSourceClass() { @Override public - TableSource createSource(TableFactoryContext context) { + TableSource createSource(TableSourceFactoryContext context) { return () -> { - SeaTunnelDataType dataType; - if (context.getCatalogTables().size() == 1) { - dataType = - context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType(); - } else { - Map rowTypeMap = new HashMap<>(); - for (CatalogTable catalogTable : context.getCatalogTables()) { - rowTypeMap.put( - catalogTable.getTableId().toTablePath().toString(), - catalogTable.getTableSchema().toPhysicalRowDataType()); - } - dataType = new MultipleRowType(rowTypeMap); - } + List catalogTables = + CatalogTableUtil.getCatalogTablesFromConfig( + context.getOptions(), context.getClassLoader()); + SeaTunnelDataType dataType = + CatalogTableUtil.convertToDataType(catalogTables); return (SeaTunnelSource) - new MySqlIncrementalSource<>(context.getOptions(), dataType); + new MySqlIncrementalSource<>(context.getOptions(), dataType, catalogTables); }; } - - @Override - public Result applyTables(TableFactoryContext context) { - return Result.of(context.getCatalogTables(), Collections.emptyList()); - } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java index cf9cf84b829..e56fb00423c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SupportParallelism; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.utils.JdbcUrlUtil; @@ -50,6 +51,7 @@ import lombok.NoArgsConstructor; import java.time.ZoneId; +import java.util.List; import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection; import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils.convertFromTable; @@ -62,8 +64,10 @@ public class SqlServerIncrementalSource extends IncrementalSource dataType) { - super(options, dataType); + ReadonlyConfig options, + SeaTunnelDataType dataType, + List catalogTables) { + super(options, dataType, catalogTables); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java index 285d4b79232..7127209aefa 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java @@ -22,15 +22,13 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.SupportMultipleTable; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableSourceFactory; -import org.apache.seatunnel.api.table.type.MultipleRowType; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions; import org.apache.seatunnel.connectors.cdc.base.option.StartupMode; @@ -40,12 +38,10 @@ import com.google.auto.service.AutoService; import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.List; @AutoService(Factory.class) -public class SqlServerIncrementalSourceFactory implements TableSourceFactory, SupportMultipleTable { +public class SqlServerIncrementalSourceFactory implements TableSourceFactory { @Override public String factoryIdentifier() { @@ -100,26 +96,14 @@ public Class getSourceClass() { @Override public - TableSource createSource(TableFactoryContext context) { + TableSource createSource(TableSourceFactoryContext context) { return () -> { - SeaTunnelDataType dataType; - if (context.getCatalogTables().size() == 1) { - dataType = - context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType(); - } else { - Map rowTypeMap = new HashMap<>(); - for (CatalogTable catalogTable : context.getCatalogTables()) { - String tableId = catalogTable.getTableId().toTablePath().toString(); - rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType()); - } - dataType = new MultipleRowType(rowTypeMap); - } - return new SqlServerIncrementalSource(context.getOptions(), dataType); + List catalogTables = + CatalogTableUtil.getCatalogTablesFromConfig( + context.getOptions(), context.getClassLoader()); + SeaTunnelDataType dataType = + CatalogTableUtil.convertToDataType(catalogTables); + return new SqlServerIncrementalSource(context.getOptions(), dataType, catalogTables); }; } - - @Override - public SupportMultipleTable.Result applyTables(TableFactoryContext context) { - return SupportMultipleTable.Result.of(context.getCatalogTables(), Collections.emptyList()); - } } diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java index 5a66493aee5..858357d282b 100644 --- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java +++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java @@ -23,8 +23,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import com.google.auto.service.AutoService; @@ -56,7 +56,7 @@ public OptionRule optionRule() { } @Override - public TableSink createSink(TableFactoryContext context) { + public TableSink createSink(TableSinkFactoryContext context) { ReadonlyConfig options = context.getOptions(); return () -> new ConsoleSink( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java index afad20c67c1..7bd3bca6392 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java @@ -80,6 +80,7 @@ public SeaTunnelDataType toSeaTunnelType( case NTEXT: case NVARCHAR: case TEXT: + case XML: return BasicType.STRING_TYPE; case DATE: return LocalTimeType.LOCAL_DATE_TYPE; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index d18ff0d7fdb..d93115a707c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -27,8 +27,8 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; @@ -74,7 +74,7 @@ public String factoryIdentifier() { } @Override - public TableSink createSink(TableFactoryContext context) { + public TableSink createSink(TableSinkFactoryContext context) { ReadonlyConfig config = context.getOptions(); CatalogTable catalogTable = context.getCatalogTable(); Map catalogOptions = config.get(CatalogOptions.CATALOG_OPTIONS); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java index 8c21a842339..264df5eafa0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java @@ -22,12 +22,13 @@ import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -41,6 +42,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; @@ -49,7 +51,9 @@ import java.math.BigDecimal; import java.sql.Connection; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -78,20 +82,35 @@ public String factoryIdentifier() { @Override @SuppressWarnings("unchecked") public - TableSource createSource(TableFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTable(); + TableSource createSource(TableSourceFactoryContext context) { JdbcSourceConfig config = JdbcSourceConfig.of(context.getOptions()); - JdbcConnectionProvider connectionProvider = - new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig()); - final String querySql = config.getQuery(); JdbcDialect dialect = JdbcDialectLoader.load( config.getJdbcConnectionConfig().getUrl(), config.getJdbcConnectionConfig().getCompatibleMode()); - TableSchema tableSchema = catalogTable.getTableSchema(); - SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); - Optional partitionParameter = - createPartitionParameter(config, tableSchema, connectionProvider); + JdbcConnectionProvider connectionProvider = + new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig()); + + SeaTunnelRowType rowType; + Optional partitionParameter = Optional.empty(); + try { + CatalogTable catalogTable = + CatalogTableUtil.getCatalogTablesFromConfig( + dialect.dialectName(), + context.getOptions(), + context.getClassLoader()) + .get(0); + TableSchema tableSchema = catalogTable.getTableSchema(); + rowType = tableSchema.toPhysicalRowDataType(); + partitionParameter = createPartitionParameter(config, tableSchema, connectionProvider); + } catch (Exception e) { + try (Connection connection = connectionProvider.getOrEstablishConnection()) { + rowType = initTableField(connection, config, dialect); + } catch (Exception k) { + throw new PrepareFailException("jdbc", PluginType.SOURCE, k.toString()); + } + } + final String querySql = config.getQuery(); JdbcInputFormat inputFormat = new JdbcInputFormat( connectionProvider, @@ -100,18 +119,20 @@ TableSource createSource(TableFactoryContext context) { querySql, config.getFetchSize(), config.getJdbcConnectionConfig().isAutoCommit()); + Optional finalPartitionParameter = partitionParameter; + SeaTunnelRowType finalRowType = rowType; return () -> (SeaTunnelSource) new JdbcSource( config, - rowType, + finalRowType, dialect, inputFormat, - partitionParameter.orElse(null), + finalPartitionParameter.orElse(null), connectionProvider, - partitionParameter.isPresent() + finalPartitionParameter.isPresent() ? obtainPartitionSql( - dialect, partitionParameter.get(), querySql) + dialect, finalPartitionParameter.get(), querySql) : querySql); } @@ -132,6 +153,27 @@ static String obtainPartitionSql( partitionParameter.getPartitionColumnName()); } + private SeaTunnelRowType initTableField( + Connection conn, JdbcSourceConfig jdbcSourceConfig, JdbcDialect jdbcDialect) { + JdbcDialectTypeMapper jdbcDialectTypeMapper = jdbcDialect.getJdbcDialectTypeMapper(); + ArrayList> seaTunnelDataTypes = new ArrayList<>(); + ArrayList fieldNames = new ArrayList<>(); + try { + ResultSetMetaData resultSetMetaData = + jdbcDialect.getResultSetMetaData(conn, jdbcSourceConfig); + for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { + // Support AS syntax + fieldNames.add(resultSetMetaData.getColumnLabel(i)); + seaTunnelDataTypes.add(jdbcDialectTypeMapper.mapping(resultSetMetaData, i)); + } + } catch (Exception e) { + log.warn("get row type info exception", e); + } + return new SeaTunnelRowType( + fieldNames.toArray(new String[0]), + seaTunnelDataTypes.toArray(new SeaTunnelDataType[0])); + } + public static Optional createPartitionParameter( JdbcSourceConfig config, TableSchema tableSchema, diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java index 450ba3f1cde..c9a365511d5 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java @@ -20,8 +20,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config; import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat; @@ -56,7 +56,7 @@ public OptionRule optionRule() { } @Override - public TableSink createSink(TableFactoryContext context) { + public TableSink createSink(TableSinkFactoryContext context) { return () -> new KafkaSink( context.getOptions(), diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java index c0159c5fd42..7c3592411a4 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java @@ -21,8 +21,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions; @@ -60,7 +60,7 @@ public OptionRule optionRule() { } @Override - public TableSink createSink(TableFactoryContext context) { + public TableSink createSink(TableSinkFactoryContext context) { SinkConfig sinkConfig = SinkConfig.of(context.getOptions()); CatalogTable catalogTable = context.getCatalogTable(); if (StringUtils.isBlank(sinkConfig.getTable())) { diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java index b5dcdf0534c..18fea5fb287 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java @@ -17,11 +17,10 @@ package org.apache.seatunnel.engine.core.dag.actions; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; -import org.apache.seatunnel.api.table.type.MultipleRowType; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import com.hazelcast.collection.IQueue; import com.hazelcast.core.HazelcastInstance; @@ -33,6 +32,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -44,7 +44,7 @@ @Setter @ToString public class ShuffleMultipleRowStrategy extends ShuffleStrategy { - private MultipleRowType inputRowType; + private List catalogTables; private String targetTableId; @Tolerate @@ -54,8 +54,8 @@ public ShuffleMultipleRowStrategy() {} public Map>> createShuffles( HazelcastInstance hazelcast, int pipelineId, int inputIndex) { Map>> shuffleMap = new HashMap<>(); - for (Map.Entry entry : inputRowType) { - String tableId = entry.getKey(); + for (CatalogTable entry : catalogTables) { + String tableId = entry.getTableId().toTablePath().toString(); String queueName = generateQueueName(pipelineId, inputIndex, tableId); IQueue> queue = getIQueue(hazelcast, queueName); // clear old data when job restore diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index c83ceade12c..d38b5aacb7c 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -23,16 +23,12 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.env.EnvCommonOptions; -import org.apache.seatunnel.api.env.ParsingMode; import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SupportDataSaveMode; import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.source.SourceOptions; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; -import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -78,7 +74,6 @@ import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Queue; @@ -303,31 +298,15 @@ public Tuple2>> parseSource( factoryId, (factory) -> factory.createSource(null)); - final List catalogTables = new ArrayList<>(); - if (!fallback) { - List tables = - CatalogTableUtil.getCatalogTables(sourceConfig, classLoader); - if (!tables.isEmpty()) { - catalogTables.addAll(tables); - } - } - - if (fallback || catalogTables.isEmpty()) { + if (fallback) { Tuple2 tuple = fallbackParser.parseSource(sourceConfig, jobConfig, tableId, parallelism); return new Tuple2<>(tableId, Collections.singletonList(tuple)); } - if (readonlyConfig.get(SourceOptions.DAG_PARSING_MODE) == ParsingMode.SHARDING) { - CatalogTable shardingTable = catalogTables.get(0); - catalogTables.clear(); - catalogTables.add(shardingTable); - } - List, List>> sources = - FactoryUtil.createAndPrepareSource( - catalogTables, readonlyConfig, classLoader, factoryId); + FactoryUtil.createAndPrepareSource(readonlyConfig, classLoader, factoryId); Set factoryUrls = getFactoryUrls(readonlyConfig, classLoader, TableSourceFactory.class, factoryId); @@ -465,9 +444,26 @@ private void parseTransform( public static SeaTunnelDataType getProducedType(Action action) { if (action instanceof SourceAction) { - return ((SourceAction) action).getSource().getProducedType(); + try { + return ((SourceAction) action) + .getSource() + .getProducedCatalogTables() + .get(0) + .getSeaTunnelRowType(); + } catch (UnsupportedOperationException e) { + // TODO remove it when all connector use `getProducedCatalogTables` + return ((SourceAction) action).getSource().getProducedType(); + } } else if (action instanceof TransformAction) { - return ((TransformAction) action).getTransform().getProducedType(); + try { + return ((TransformAction) action) + .getTransform() + .getProducedCatalogTable() + .getSeaTunnelRowType(); + } catch (UnsupportedOperationException e) { + // TODO remove it when all connector use `getProducedCatalogTables` + return ((TransformAction) action).getTransform().getProducedType(); + } } throw new UnsupportedOperationException(); } @@ -534,13 +530,6 @@ private static T findLast(LinkedHashMap map) { return fallbackParser.parseSinks(configIndex, inputVertices, sinkConfig, jobConfig); } - Map tableMap = - CatalogTableUtil.getCatalogTables(sinkConfig, classLoader).stream() - .collect( - Collectors.toMap( - catalogTable -> catalogTable.getTableId().toTablePath(), - catalogTable -> catalogTable)); - // get factory urls Set factoryUrls = getFactoryUrls(readonlyConfig, classLoader, TableSinkFactory.class, factoryId); @@ -558,7 +547,6 @@ private static T findLast(LinkedHashMap map) { SinkAction sinkAction = createSinkAction( inputActionSample._1(), - tableMap, inputActions, readonlyConfig, classLoader, @@ -575,7 +563,6 @@ private static T findLast(LinkedHashMap map) { SinkAction sinkAction = createSinkAction( tuple._1(), - tableMap, Collections.singleton(tuple._2()), readonlyConfig, classLoader, @@ -590,7 +577,6 @@ private static T findLast(LinkedHashMap map) { private SinkAction createSinkAction( CatalogTable catalogTable, - Map sinkTableMap, Set inputActions, ReadonlyConfig readonlyConfig, ClassLoader classLoader, @@ -598,17 +584,6 @@ private static T findLast(LinkedHashMap map) { String factoryId, int parallelism, int configIndex) { - Optional insteadTable; - if (sinkTableMap.size() == 1) { - insteadTable = sinkTableMap.values().stream().findFirst(); - } else { - // TODO: another table full name map - insteadTable = - Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath())); - } - if (insteadTable.isPresent()) { - catalogTable = insteadTable.get(); - } SeaTunnelSink sink = FactoryUtil.createAndPrepareSink( catalogTable, readonlyConfig, classLoader, factoryId); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java index d7beaf3a32f..7e23c0b123a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java @@ -17,9 +17,7 @@ package org.apache.seatunnel.engine.server.dag.execution; -import org.apache.seatunnel.api.table.type.MultipleRowType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.engine.common.config.server.CheckpointConfig; import org.apache.seatunnel.engine.common.utils.IdGenerator; @@ -216,8 +214,12 @@ private Set generateShuffleEdges(Set executionEdge } ExecutionVertex sourceExecutionVertex = sourceExecutionVertices.stream().findFirst().get(); SourceAction sourceAction = (SourceAction) sourceExecutionVertex.getAction(); - SeaTunnelDataType sourceProducedType = sourceAction.getSource().getProducedType(); - if (!SqlType.MULTIPLE_ROW.equals(sourceProducedType.getSqlType())) { + List producedCatalogTables = new ArrayList<>(); + try { + producedCatalogTables = sourceAction.getSource().getProducedCatalogTables(); + } catch (UnsupportedOperationException e) { + } + if (producedCatalogTables.size() <= 1) { return executionEdges; } @@ -234,7 +236,7 @@ private Set generateShuffleEdges(Set executionEdge ShuffleMultipleRowStrategy.builder() .jobId(jobImmutableInformation.getJobId()) .inputPartitions(sourceAction.getParallelism()) - .inputRowType(MultipleRowType.class.cast(sourceProducedType)) + .catalogTables(producedCatalogTables) .queueEmptyQueueTtl((int) (checkpointConfig.getCheckpointInterval() * 3)) .build(); ShuffleConfig shuffleConfig = diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java index 5d0f571d497..25f1e850f5e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.engine.server.dag.physical; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; -import org.apache.seatunnel.api.table.type.MultipleRowType; import org.apache.seatunnel.engine.common.config.server.QueueType; import org.apache.seatunnel.engine.common.utils.IdGenerator; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; @@ -67,7 +66,6 @@ import java.io.IOException; import java.net.URL; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -323,11 +321,6 @@ private List getShuffleTask( SinkAction sinkAction = (SinkAction) sinkFlow.getAction(); String sinkTableId = sinkAction.getConfig().getMultipleRowTableId(); - MultipleRowType multipleRowType = - shuffleMultipleRowStrategy.getInputRowType(); - int sinkTableIndex = - Arrays.asList(multipleRowType.getTableIds()) - .indexOf(sinkTableId); long taskIDPrefix = idGenerator.getNextId(); long taskGroupIDPrefix = idGenerator.getNextId(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java index 80a0dff04e4..2a4a129adb9 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java @@ -21,6 +21,9 @@ import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig; @@ -73,13 +76,22 @@ public void init() throws Exception { "SourceSeaTunnelTask only support SourceFlowLifeCycle, but get " + startFlowLifeCycle.getClass().getName()); } else { + SeaTunnelDataType sourceProducedType; + try { + List producedCatalogTables = + sourceFlow.getAction().getSource().getProducedCatalogTables(); + sourceProducedType = CatalogTableUtil.convertToDataType(producedCatalogTables); + } catch (UnsupportedOperationException e) { + // TODO remove it when all connector use `getProducedCatalogTables` + sourceProducedType = sourceFlow.getAction().getSource().getProducedType(); + } this.collector = new SeaTunnelSourceCollector<>( checkpointLock, outputs, this.getMetricsContext(), getFlowControlStrategy(), - sourceFlow.getAction().getSource().getProducedType()); + sourceProducedType); ((SourceFlowLifeCycle) startFlowLifeCycle).setCollector(collector); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java index 2271fa641b8..d8d8a97962e 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java @@ -21,11 +21,10 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; import com.google.auto.service.AutoService; -import lombok.NonNull; @AutoService(Factory.class) public class CopyFieldTransformFactory implements TableTransformFactory { @@ -43,9 +42,9 @@ public OptionRule optionRule() { } @Override - public TableTransform createTransform(@NonNull TableFactoryContext context) { + public TableTransform createTransform(TableTransformFactoryContext context) { CopyTransformConfig copyTransformConfig = CopyTransformConfig.of(context.getOptions()); - CatalogTable catalogTable = context.getCatalogTable(); + CatalogTable catalogTable = context.getCatalogTables().get(0); return () -> new CopyFieldTransform(copyTransformConfig, catalogTable); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java index 9e6334516b8..b7382175ba4 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java @@ -22,8 +22,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; import com.google.auto.service.AutoService; @@ -40,8 +40,8 @@ public OptionRule optionRule() { } @Override - public TableTransform createTransform(TableFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTable(); + public TableTransform createTransform(TableTransformFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTables().get(0); ReadonlyConfig options = context.getOptions(); FieldMapperTransformConfig fieldMapperTransformConfig = FieldMapperTransformConfig.of(options); diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java index 14259a0a85b..f562a7cc28b 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java @@ -21,8 +21,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; import com.google.auto.service.AutoService; @@ -41,8 +41,8 @@ public OptionRule optionRule() { } @Override - public TableTransform createTransform(TableFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTable(); + public TableTransform createTransform(TableTransformFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTables().get(0); return () -> new FilterFieldTransform(context.getOptions(), catalogTable); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java index 9e89ebe5e2c..2191e30bc52 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java @@ -21,8 +21,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; import com.google.auto.service.AutoService; @@ -43,8 +43,8 @@ public OptionRule optionRule() { } @Override - public TableTransform createTransform(TableFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTable(); + public TableTransform createTransform(TableTransformFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTables().get(0); return () -> new FilterRowKindTransform(context.getOptions(), catalogTable); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java index 25696ba6e6d..c0bed8977da 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java @@ -21,8 +21,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; import com.google.auto.service.AutoService; @@ -49,8 +49,8 @@ public OptionRule optionRule() { } @Override - public TableTransform createTransform(TableFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTable(); + public TableTransform createTransform(TableTransformFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTables().get(0); return () -> new ReplaceTransform(context.getOptions(), catalogTable); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java index 91281251e91..32d660860b9 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java @@ -21,11 +21,10 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; import com.google.auto.service.AutoService; -import lombok.NonNull; @AutoService(Factory.class) public class SplitTransformFactory implements TableTransformFactory { @@ -45,9 +44,9 @@ public OptionRule optionRule() { } @Override - public TableTransform createTransform(@NonNull TableFactoryContext context) { + public TableTransform createTransform(TableTransformFactoryContext context) { SplitTransformConfig splitTransformConfig = SplitTransformConfig.of(context.getOptions()); - CatalogTable catalogTable = context.getCatalogTable(); + CatalogTable catalogTable = context.getCatalogTables().get(0); return () -> new SplitTransform(splitTransformConfig, catalogTable); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java index f509af832cb..5c4abf53c07 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java @@ -21,8 +21,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; import com.google.auto.service.AutoService; @@ -41,8 +41,8 @@ public OptionRule optionRule() { } @Override - public TableTransform createTransform(TableFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTable(); + public TableTransform createTransform(TableTransformFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTables().get(0); return () -> new SQLTransform(context.getOptions(), catalogTable); } }