diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java index fc28001b2ca..60591d9893c 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.InfoPreviewResult; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.PreviewResult; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; @@ -36,6 +37,7 @@ import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils; import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; @@ -48,8 +50,11 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; @@ -222,7 +227,8 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) } public CatalogTable toCatalogTable(Table icebergTable, TablePath tablePath) { - List columns = icebergTable.schema().columns(); + Schema schema = icebergTable.schema(); + List columns = schema.columns(); TableSchema.Builder builder = TableSchema.builder(); columns.forEach( nestedField -> { @@ -234,12 +240,19 @@ public CatalogTable toCatalogTable(Table icebergTable, TablePath tablePath) { name, seaTunnelType, (Long) null, - true, + nestedField.isOptional(), null, nestedField.doc()); builder.column(physicalColumn); }); - + Optional.ofNullable(schema.identifierFieldNames()) + .map( + (Function, Object>) + names -> + builder.primaryKey( + PrimaryKey.of( + tablePath.getTableName() + "_pk", + new ArrayList<>(names)))); List partitionKeys = icebergTable.spec().fields().stream() .map(PartitionField::name) diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java index 54ea5721ac3..5047746e9e4 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java @@ -36,7 +36,6 @@ import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaDeleteColumn; import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaModifyColumn; -import org.apache.commons.collections.CollectionUtils; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -54,17 +53,17 @@ import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; @@ -161,27 +160,17 @@ private static Table createTable( @NotNull protected static Schema toIcebergSchema( TableSchema tableSchema, ReadonlyConfig readonlyConfig) { Types.StructType structType = SchemaUtils.toIcebergType(tableSchema); - Set identifierFieldIds = new HashSet<>(); - if (Objects.nonNull(readonlyConfig)) { - List pks = - SinkConfig.stringToList(readonlyConfig.get(SinkConfig.TABLE_PRIMARY_KEYS), ","); - if (CollectionUtils.isNotEmpty(pks)) { - for (String pk : pks) { - Optional pkId = - structType.fields().stream() - .filter(nestedField -> nestedField.name().equals(pk)) - .map(Types.NestedField::fieldId) - .findFirst(); - if (!pkId.isPresent()) { - throw new IllegalArgumentException( - String.format( - "iceberg table pk:%s not present in the incoming struct", - pk)); - } - identifierFieldIds.add(pkId.get()); - } - } - } + Set identifierFieldIds = + readonlyConfig.getOptional(SinkConfig.TABLE_PRIMARY_KEYS) + .map(e -> SinkConfig.stringToList(e, ",")) + .orElseGet( + () -> + Optional.ofNullable(tableSchema.getPrimaryKey()) + .map(e -> e.getColumnNames()) + .orElse(Collections.emptyList())) + .stream() + .map(f -> structType.field(f).fieldId()) + .collect(Collectors.toSet()); List fields = new ArrayList<>(); structType .fields() @@ -281,7 +270,7 @@ public static Types.StructType toIcebergType(TableSchema tableSchema) { Types.NestedField icebergField = Types.NestedField.of( idIncrementer.getAndIncrement(), - true, + column.isNullable(), column.getName(), IcebergTypeMapper.toIcebergType(column.getDataType(), idIncrementer), column.getComment()); diff --git a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java index c7c240228bd..6ec5ae5783f 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; @@ -152,7 +153,8 @@ void dropTable() { CatalogTable buildAllTypesTable(TableIdentifier tableIdentifier) { TableSchema.Builder builder = TableSchema.builder(); builder.column( - PhysicalColumn.of("id", BasicType.INT_TYPE, (Long) null, true, null, "id comment")); + PhysicalColumn.of( + "id", BasicType.INT_TYPE, (Long) null, false, null, "id comment")); builder.column( PhysicalColumn.of( "boolean_col", BasicType.BOOLEAN_TYPE, (Long) null, true, null, null)); @@ -185,6 +187,9 @@ CatalogTable buildAllTypesTable(TableIdentifier tableIdentifier) { PhysicalColumn.of( "decimal_col", new DecimalType(38, 18), (Long) null, true, null, null)); builder.column(PhysicalColumn.of("dt_col", STRING_TYPE, (Long) null, true, null, null)); + builder.primaryKey( + PrimaryKey.of( + tableIdentifier.getTableName() + "_pk", Collections.singletonList("id"))); TableSchema schema = builder.build(); HashMap options = new HashMap<>();