From b9791285a09fa247ce542a59423479282b23f1f9 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Sun, 10 Dec 2023 16:09:35 +0800 Subject: [PATCH] [Feature] Add unsupported datatype check for all catalog (#5890) * [Feature] Add unsupported datatype check for all catalog * update * update --- .../seatunnel/api/table/catalog/Catalog.java | 76 ++++++++++++++++--- .../api/table/catalog/CatalogTableTest.java | 54 +++++++++++++ .../api/table/catalog/InMemoryCatalog.java | 48 +++++++++++- .../catalog/ElasticSearchCatalog.java | 30 +++++--- .../jdbc/catalog/AbstractJdbcCatalog.java | 46 ++++++----- .../jdbc/catalog/dm/DamengCatalog.java | 29 +++++++ .../jdbc/catalog/mysql/MySqlCatalog.java | 25 ------ .../jdbc/utils/JdbcCatalogUtils.java | 2 +- .../seatunnel/kudu/catalog/KuduCatalog.java | 21 +++-- .../kudu/kuduclient/KuduTypeMapper.java | 4 +- .../maxcompute/catalog/MaxComputeCatalog.java | 5 ++ .../starrocks/catalog/StarRocksCatalog.java | 37 ++++++--- 12 files changed, 287 insertions(+), 90 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java index 93c17c10fb4..560fa98d3bf 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java @@ -24,13 +24,20 @@ import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.regex.Pattern; /** @@ -58,6 +65,9 @@ default Optional getFactory() { */ void close() throws CatalogException; + /** Get the name of the catalog. */ + String name(); + // -------------------------------------------------------------------------------------------- // database // -------------------------------------------------------------------------------------------- @@ -124,15 +134,10 @@ default Optional getFactory() { default List getTables(ReadonlyConfig config) throws CatalogException { // Get the list of specified tables List tableNames = config.get(CatalogOptions.TABLE_NAMES); - List catalogTables = new ArrayList<>(); if (tableNames != null && !tableNames.isEmpty()) { - for (String tableName : tableNames) { - TablePath tablePath = TablePath.of(tableName); - if (this.tableExists(tablePath)) { - catalogTables.add(this.getTable(tablePath)); - } - } - return catalogTables; + Iterator tablePaths = + tableNames.stream().map(TablePath::of).filter(this::tableExists).iterator(); + return buildCatalogTablesWithErrorCheck(tablePaths); } // Get the list of table pattern @@ -144,17 +149,66 @@ default List getTables(ReadonlyConfig config) throws CatalogExcept Pattern tablePattern = Pattern.compile(config.get(CatalogOptions.TABLE_PATTERN)); List allDatabase = this.listDatabases(); allDatabase.removeIf(s -> !databasePattern.matcher(s).matches()); + List tablePaths = new ArrayList<>(); for (String databaseName : allDatabase) { tableNames = this.listTables(databaseName); - for (String tableName : tableNames) { - if (tablePattern.matcher(databaseName + "." + tableName).matches()) { - catalogTables.add(this.getTable(TablePath.of(databaseName, tableName))); + tableNames.forEach( + tableName -> { + if (tablePattern.matcher(databaseName + "." + tableName).matches()) { + tablePaths.add(TablePath.of(databaseName, tableName)); + } + }); + } + return buildCatalogTablesWithErrorCheck(tablePaths.iterator()); + } + + default List buildCatalogTablesWithErrorCheck(Iterator tablePaths) { + Map> unsupportedTable = new LinkedHashMap<>(); + List catalogTables = new ArrayList<>(); + while (tablePaths.hasNext()) { + try { + catalogTables.add(getTable(tablePaths.next())); + } catch (SeaTunnelRuntimeException e) { + if (e.getSeaTunnelErrorCode() + .equals(CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR)) { + unsupportedTable.put( + e.getParams().get("tableName"), + e.getParamsValueAsMap("fieldWithDataTypes")); + } else { + throw e; } } } + if (!unsupportedTable.isEmpty()) { + throw CommonError.getCatalogTablesWithUnsupportedType(name(), unsupportedTable); + } return catalogTables; } + default void buildColumnsWithErrorCheck( + TablePath tablePath, + TableSchema.Builder builder, + Iterator keys, + Function getColumn) { + Map unsupported = new LinkedHashMap<>(); + while (keys.hasNext()) { + try { + builder.column(getColumn.apply(keys.next())); + } catch (SeaTunnelRuntimeException e) { + if (e.getSeaTunnelErrorCode() + .equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) { + unsupported.put(e.getParams().get("field"), e.getParams().get("dataType")); + } else { + throw e; + } + } + } + if (!unsupported.isEmpty()) { + throw CommonError.getCatalogTableWithUnsupportedType( + name(), tablePath.getFullName(), unsupported); + } + } + /** * Create a new table in this catalog. * diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java index 6bb0890f212..d3c7692b606 100644 --- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableTest.java @@ -17,9 +17,17 @@ package org.apache.seatunnel.api.table.catalog; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; public class CatalogTableTest { @@ -35,4 +43,50 @@ public void testCatalogTableModifyOptionsOrPartitionKeys() { catalogTable.getOptions().put("test", "value"); catalogTable.getPartitionKeys().add("test"); } + + @Test + public void testReadCatalogTableWithUnsupportedType() { + Catalog catalog = + new InMemoryCatalogFactory() + .createCatalog("InMemory", ReadonlyConfig.fromMap(new HashMap<>())); + SeaTunnelRuntimeException exception = + Assertions.assertThrows( + SeaTunnelRuntimeException.class, + () -> + catalog.getTables( + ReadonlyConfig.fromMap( + new HashMap() { + { + put( + CatalogOptions.TABLE_NAMES.key(), + Arrays.asList( + "unsupported.public.table1", + "unsupported.public.table2")); + } + }))); + Assertions.assertEquals( + "ErrorCode:[COMMON-21], ErrorDescription:['InMemory' tables unsupported get catalog tableļ¼Œ" + + "the corresponding field types in the following tables are not supported: '{\"unsupported.public.table1\"" + + ":{\"field1\":\"interval\",\"field2\":\"interval2\"},\"unsupported.public.table2\":{\"field1\":\"interval\"," + + "\"field2\":\"interval2\"}}']", + exception.getMessage()); + Map> result = new LinkedHashMap<>(); + result.put( + "unsupported.public.table1", + new HashMap() { + { + put("field1", "interval"); + put("field2", "interval2"); + } + }); + result.put( + "unsupported.public.table2", + new HashMap() { + { + put("field1", "interval"); + put("field2", "interval2"); + } + }); + Assertions.assertEquals(result, exception.getParamsValueAs("tableUnsupportedTypes")); + } } diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java index 572955bda50..c745835abd3 100644 --- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/InMemoryCatalog.java @@ -25,11 +25,15 @@ import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.common.exception.CommonError; + +import org.apache.commons.lang3.tuple.Pair; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,6 +46,7 @@ public class InMemoryCatalog implements Catalog { // database -> tables private final Map> catalogTables; private static final String DEFAULT_DATABASE = "default"; + private static final String UNSUPPORTED_DATABASE = "unsupported"; InMemoryCatalog(String catalogName, ReadonlyConfig options) { this.name = catalogName; @@ -53,6 +58,7 @@ public class InMemoryCatalog implements Catalog { // Add some default table for testing private void addDefaultTable() { this.catalogTables.put(DEFAULT_DATABASE, new ArrayList<>()); + this.catalogTables.put(UNSUPPORTED_DATABASE, new ArrayList<>()); List tables = new ArrayList<>(); this.catalogTables.put("st", tables); TableSchema tableSchema = @@ -92,20 +98,40 @@ private void addDefaultTable() { CatalogTable catalogTable1 = CatalogTable.of( TableIdentifier.of(name, TablePath.of("st", "public", "table1")), - tableSchema, + TableSchema.builder().build(), new HashMap<>(), new ArrayList<>(), "In Memory Table"); CatalogTable catalogTable2 = CatalogTable.of( TableIdentifier.of(name, TablePath.of("st", "public", "table2")), - tableSchema, + TableSchema.builder().build(), new HashMap<>(), new ArrayList<>(), "In Memory Table", name); tables.add(catalogTable1); tables.add(catalogTable2); + + CatalogTable unsupportedTable1 = + CatalogTable.of( + TableIdentifier.of( + name, TablePath.of(UNSUPPORTED_DATABASE, "public", "table1")), + tableSchema, + new HashMap<>(), + new ArrayList<>(), + "In Memory Table"); + CatalogTable unsupportedTable2 = + CatalogTable.of( + TableIdentifier.of( + name, TablePath.of(UNSUPPORTED_DATABASE, "public", "table2")), + tableSchema, + new HashMap<>(), + new ArrayList<>(), + "In Memory Table", + name); + this.catalogTables.get(UNSUPPORTED_DATABASE).add(unsupportedTable1); + this.catalogTables.get(UNSUPPORTED_DATABASE).add(unsupportedTable2); } @Override @@ -125,6 +151,11 @@ public void close() throws CatalogException { log.trace(String.format("InMemoryCatalog %s closing", name)); } + @Override + public String name() { + return "InMemory"; + } + @Override public String getDefaultDatabase() throws CatalogException { return DEFAULT_DATABASE; @@ -165,6 +196,19 @@ public boolean tableExists(TablePath tablePath) throws CatalogException { public CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException { if (catalogTables.containsKey(tablePath.getDatabaseName())) { + if (tablePath.getDatabaseName().equals(UNSUPPORTED_DATABASE)) { + List> unsupportedFields = + Arrays.asList( + Pair.of("field1", "interval"), Pair.of("field2", "interval2")); + buildColumnsWithErrorCheck( + tablePath, + new TableSchema.Builder(), + unsupportedFields.iterator(), + field -> { + throw CommonError.convertToSeaTunnelTypeError( + name(), field.getValue(), field.getKey()); + }); + } List tables = catalogTables.get(tablePath.getDatabaseName()); return tables.stream() .filter(t -> t.getTableId().toTablePath().equals(tablePath)) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java index 8ca60925b08..1d5ba105f05 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java @@ -91,6 +91,11 @@ public void close() throws CatalogException { esRestClient.close(); } + @Override + public String name() { + return catalogName; + } + @Override public String getDefaultDatabase() throws CatalogException { return defaultDatabase; @@ -142,19 +147,20 @@ public CatalogTable getTable(TablePath tablePath) TableSchema.Builder builder = TableSchema.builder(); Map fieldTypeMapping = esRestClient.getFieldTypeMapping(tablePath.getTableName(), Collections.emptyList()); - fieldTypeMapping.forEach( - (fieldName, fieldType) -> { + buildColumnsWithErrorCheck( + tablePath, + builder, + fieldTypeMapping.entrySet().iterator(), + nameAndType -> { // todo: we need to add a new type TEXT or add length in STRING type - PhysicalColumn physicalColumn = - PhysicalColumn.of( - fieldName, - elasticSearchDataTypeConvertor.toSeaTunnelType( - fieldName, fieldType), - null, - true, - null, - null); - builder.column(physicalColumn); + return PhysicalColumn.of( + nameAndType.getKey(), + elasticSearchDataTypeConvertor.toSeaTunnelType( + nameAndType.getKey(), nameAndType.getValue()), + null, + true, + null, + null); }); return CatalogTable.of( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java index 291333459ef..f695cc30cb1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java @@ -99,6 +99,11 @@ public AbstractJdbcCatalog( this.connectionMap = new ConcurrentHashMap<>(); } + @Override + public String name() { + return catalogName; + } + @Override public String getDefaultDatabase() { return defaultDatabase; @@ -174,24 +179,7 @@ public CatalogTable getTable(TablePath tablePath) ResultSet resultSet = ps.executeQuery()) { TableSchema.Builder builder = TableSchema.builder(); - Map unsupported = new LinkedHashMap<>(); - while (resultSet.next()) { - try { - builder.column(buildColumn(resultSet)); - } catch (SeaTunnelRuntimeException e) { - if (e.getSeaTunnelErrorCode() - .equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) { - unsupported.put( - e.getParams().get("field"), e.getParams().get("dataType")); - } else { - throw e; - } - } - } - if (!unsupported.isEmpty()) { - throw CommonError.getCatalogTableWithUnsupportedType( - catalogName, tablePath.getFullName(), unsupported); - } + buildColumnsWithErrorCheck(tablePath, resultSet, builder); // add primary key primaryKey.ifPresent(builder::primaryKey); // add constraint key @@ -213,6 +201,28 @@ public CatalogTable getTable(TablePath tablePath) } } + protected void buildColumnsWithErrorCheck( + TablePath tablePath, ResultSet resultSet, TableSchema.Builder builder) + throws SQLException { + Map unsupported = new LinkedHashMap<>(); + while (resultSet.next()) { + try { + builder.column(buildColumn(resultSet)); + } catch (SeaTunnelRuntimeException e) { + if (e.getSeaTunnelErrorCode() + .equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) { + unsupported.put(e.getParams().get("field"), e.getParams().get("dataType")); + } else { + throw e; + } + } + } + if (!unsupported.isEmpty()) { + throw CommonError.getCatalogTableWithUnsupportedType( + catalogName, tablePath.getFullName(), unsupported); + } + } + protected Optional getPrimaryKey(DatabaseMetaData metaData, TablePath tablePath) throws SQLException { return getPrimaryKey( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java index edf3e1b3804..219067b68fa 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java @@ -35,8 +35,10 @@ import lombok.extern.slf4j.Slf4j; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -177,6 +179,33 @@ private List listTables() { return listTables(databases.get(0)); } + @Override + public List listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + + try (PreparedStatement ps = + getConnection(defaultUrl) + .prepareStatement("SELECT OWNER, TABLE_NAME FROM ALL_TABLES"); + ResultSet rs = ps.executeQuery()) { + + List tables = new ArrayList<>(); + while (rs.next()) { + if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) { + continue; + } + tables.add(rs.getString(1) + "." + rs.getString(2)); + } + + return tables; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing table in catalog %s", catalogName), e); + } + } + @Override public CatalogTable getTable(String sqlQuery) throws SQLException { Connection defaultConnection = getConnection(defaultUrl); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java index 2ff7b399f31..e21ec7d3d75 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java @@ -20,9 +20,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; -import org.apache.seatunnel.api.table.catalog.ConstraintKey; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; -import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; @@ -36,14 +34,11 @@ import lombok.extern.slf4j.Slf4j; import java.sql.Connection; -import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; -import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; @Slf4j public class MySqlCatalog extends AbstractJdbcCatalog { @@ -97,26 +92,6 @@ protected TableIdentifier getTableIdentifier(TablePath tablePath) { catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); } - @Override - protected Optional getPrimaryKey(DatabaseMetaData metaData, TablePath tablePath) - throws SQLException { - return getPrimaryKey( - metaData, - tablePath.getDatabaseName(), - tablePath.getTableName(), - tablePath.getTableName()); - } - - @Override - protected List getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath) - throws SQLException { - return getConstraintKeys( - metaData, - tablePath.getDatabaseName(), - tablePath.getTableName(), - tablePath.getTableName()); - } - @Override protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("COLUMN_NAME"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java index a45f7ce7c5c..4d70b369c3a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java @@ -76,7 +76,7 @@ public static Map getTables( log.info("Loading catalog tables for catalog : {}", jdbcCatalog.getClass()); jdbcCatalog.open(); - Map> unsupportedTable = new HashMap<>(); + Map> unsupportedTable = new LinkedHashMap<>(); for (JdbcSourceTableConfig tableConfig : tablesConfig) { try { CatalogTable catalogTable = diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java index 0fa40d8b0e0..b5ba121fb51 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ADMIN_OPERATION_TIMEOUT; import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ENABLE_KERBEROS; @@ -89,6 +90,11 @@ public void close() throws CatalogException { } } + @Override + public String name() { + return catalogName; + } + @Override public String getDefaultDatabase() throws CatalogException { return defaultDatabase; @@ -143,17 +149,20 @@ public CatalogTable getTable(TablePath tablePath) kuduTable.getPartitionSchema(); List columnSchemaList = schema.getColumns(); Optional primaryKey = getPrimaryKey(schema.getPrimaryKeyColumns()); - for (int i = 0; i < columnSchemaList.size(); i++) { - SeaTunnelDataType type = KuduTypeMapper.mapping(columnSchemaList, i); - builder.column( - PhysicalColumn.of( + buildColumnsWithErrorCheck( + tablePath, + builder, + IntStream.range(0, columnSchemaList.size()).iterator(), + i -> { + SeaTunnelDataType type = KuduTypeMapper.mapping(columnSchemaList, i); + return PhysicalColumn.of( columnSchemaList.get(i).getName(), type, columnSchemaList.get(i).getTypeSize(), columnSchemaList.get(i).isNullable(), columnSchemaList.get(i).getDefaultValue(), - columnSchemaList.get(i).getComment())); - } + columnSchemaList.get(i).getComment()); + }); primaryKey.ifPresent(builder::primaryKey); diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java index a088ed21077..7c8383c8ffd 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java @@ -32,15 +32,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.SQLException; import java.util.List; public class KuduTypeMapper { private static final Logger log = LoggerFactory.getLogger(KuduTypeMapper.class); - public static SeaTunnelDataType mapping(List columnSchemaList, int colIndex) - throws SQLException { + public static SeaTunnelDataType mapping(List columnSchemaList, int colIndex) { Type kuduType = columnSchemaList.get(colIndex).getType(); switch (kuduType) { case BOOL: diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java index b131277bd78..6477eb2e365 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java @@ -66,6 +66,11 @@ public void open() throws CatalogException { @Override public void close() throws CatalogException {} + @Override + public String name() { + return catalogName; + } + @Override public String getDefaultDatabase() throws CatalogException { return readonlyConfig.get(PROJECT); diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java index 5caf0358896..0c37322199c 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java @@ -59,6 +59,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.IntStream; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; @@ -167,18 +168,25 @@ public CatalogTable getTable(TablePath tablePath) ResultSetMetaData tableMetaData = ps.getMetaData(); TableSchema.Builder builder = TableSchema.builder(); - for (int i = 1; i <= tableMetaData.getColumnCount(); i++) { - SeaTunnelDataType type = fromJdbcType(tableMetaData, i); - // TODO add default value and test it - builder.column( - PhysicalColumn.of( - tableMetaData.getColumnName(i), - type, - tableMetaData.getColumnDisplaySize(i), - tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable, - null, - tableMetaData.getColumnLabel(i))); - } + buildColumnsWithErrorCheck( + tablePath, + builder, + IntStream.range(1, tableMetaData.getColumnCount() + 1).iterator(), + i -> { + try { + SeaTunnelDataType type = fromJdbcType(tableMetaData, i); + // TODO add default value and test it + return PhysicalColumn.of( + tableMetaData.getColumnName(i), + type, + tableMetaData.getColumnDisplaySize(i), + tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable, + null, + tableMetaData.getColumnLabel(i)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }); primaryKey.ifPresent(builder::primaryKey); @@ -392,6 +400,11 @@ public void close() throws CatalogException { LOG.info("Catalog {} closing", catalogName); } + @Override + public String name() { + return catalogName; + } + protected Optional getPrimaryKey(String schema, String table) throws SQLException { List pkFields = new ArrayList<>();