From 80141841138fd45dfc3dfcd705014ca7731f0d88 Mon Sep 17 00:00:00 2001 From: FANNG Date: Thu, 16 Jan 2025 16:59:47 +0800 Subject: [PATCH] [#6196] feat(iceberg): adjust table distribution if creating table without specifying distribution mode (#6214) ### What changes were proposed in this pull request? Adjust the distribution mode for creating Iceberg table with none distribution. The following is Spark's adjustment logic; Flink's is similar. ```java private DistributionMode defaultWriteDistributionMode() { if (table.sortOrder().isSorted()) { return RANGE; } else if (table.spec().isPartitioned()) { return HASH; } else { return NONE; } } ``` ### Why are the changes needed? Fix: #6196 ### Does this PR introduce _any_ user-facing change? Yes, add document ### How was this patch tested? add UT and IT --- .../iceberg/IcebergCatalogOperations.java | 18 +++++ .../lakehouse/iceberg/IcebergTable.java | 36 +++++---- .../test/CatalogIcebergBaseIT.java | 80 +++++++++++++++++-- docs/lakehouse-iceberg-catalog.md | 75 +---------------- 4 files changed, 117 insertions(+), 92 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 7b27438d2e5..aef42044c0c 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -58,6 +58,7 @@ import org.apache.gravitino.rel.TableCatalog; import org.apache.gravitino.rel.TableChange; import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; import org.apache.gravitino.rel.expressions.sorts.SortOrder; import
org.apache.gravitino.rel.expressions.transforms.Transform; import org.apache.gravitino.rel.indexes.Index; @@ -513,6 +514,13 @@ public Table createTable( .build()) .toArray(IcebergColumn[]::new); + // Gravitino NONE distribution means the client side doesn't specify distribution, which is + // not the same as none distribution in Iceberg. + if (Distributions.NONE.equals(distribution)) { + distribution = + getIcebergDefaultDistribution(sortOrders.length > 0, partitioning.length > 0); + } + IcebergTable createdTable = IcebergTable.builder() .withName(tableIdent.name()) @@ -588,6 +596,16 @@ public void testConnection( } } + private static Distribution getIcebergDefaultDistribution( + boolean isSorted, boolean isPartitioned) { + if (isSorted) { + return Distributions.RANGE; + } else if (isPartitioned) { + return Distributions.HASH; + } + return Distributions.NONE; + } + private static String currentUser() { return PrincipalUtils.getCurrentUserName(); } diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 27e3c429e7c..3f2f54c1b3e 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -152,21 +152,6 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam Schema schema = table.schema(); Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema); SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder()); - Distribution distribution = Distributions.NONE; - String distributionName = properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE); - if (null != distributionName) { - switch 
(DistributionMode.fromName(distributionName)) { - case HASH: - distribution = Distributions.HASH; - break; - case RANGE: - distribution = Distributions.RANGE; - break; - default: - // do nothing - break; - } - } IcebergColumn[] icebergColumns = schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new); return IcebergTable.builder() @@ -178,7 +163,7 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam .withAuditInfo(AuditInfo.EMPTY) .withPartitioning(partitionSpec) .withSortOrders(sortOrder) - .withDistribution(distribution) + .withDistribution(getDistribution(properties)) .build(); } @@ -236,4 +221,23 @@ protected IcebergTable internalBuild() { public static Builder builder() { return new Builder(); } + + private static Distribution getDistribution(Map properties) { + Distribution distribution = Distributions.NONE; + String distributionName = properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE); + if (null != distributionName) { + switch (DistributionMode.fromName(distributionName)) { + case HASH: + distribution = Distributions.HASH; + break; + case RANGE: + distribution = Distributions.RANGE; + break; + default: + // do nothing + break; + } + } + return distribution; + } } diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java index fd37441b459..f0162a6ec88 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java @@ -379,6 +379,76 @@ void testCreateTableWithNullComment() { Assertions.assertNull(loadTable.comment()); } + 
@Test + void testCreateTableWithNoneDistribution() { + // Create table from Gravitino API + Column[] columns = createColumns(); + + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName); + Distribution distribution = Distributions.NONE; + + final SortOrder[] sortOrders = + new SortOrder[] { + SortOrders.of( + NamedReference.field(ICEBERG_COL_NAME2), + SortDirection.DESCENDING, + NullOrdering.NULLS_FIRST) + }; + + Transform[] partitioning = new Transform[] {Transforms.day(columns[1].name())}; + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table tableWithPartitionAndSortorder = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + Assertions.assertEquals(tableName, tableWithPartitionAndSortorder.name()); + Assertions.assertEquals(Distributions.RANGE, tableWithPartitionAndSortorder.distribution()); + + Table loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(Distributions.RANGE, loadTable.distribution()); + tableCatalog.dropTable(tableIdentifier); + + Table tableWithPartition = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + new SortOrder[0]); + Assertions.assertEquals(tableName, tableWithPartition.name()); + Assertions.assertEquals(Distributions.HASH, tableWithPartition.distribution()); + + loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(Distributions.HASH, loadTable.distribution()); + tableCatalog.dropTable(tableIdentifier); + + Table tableWithoutPartitionAndSortOrder = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + new Transform[0], + distribution, + new SortOrder[0]); + Assertions.assertEquals(tableName, 
tableWithoutPartitionAndSortOrder.name()); + Assertions.assertEquals(Distributions.NONE, tableWithoutPartitionAndSortOrder.distribution()); + + loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(Distributions.NONE, loadTable.distribution()); + } + @Test void testCreateAndLoadIcebergTable() { // Create table from Gravitino API @@ -968,9 +1038,9 @@ public void testTableDistribution() { columns, table_comment, properties, - partitioning, + new Transform[0], distribution, - sortOrders); + new SortOrder[0]); Table loadTable = tableCatalog.loadTable(tableIdentifier); @@ -981,8 +1051,8 @@ public void testTableDistribution() { Arrays.asList(columns), properties, distribution, - sortOrders, - partitioning, + new SortOrder[0], + new Transform[0], loadTable); Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier)); @@ -1179,7 +1249,7 @@ public void testTableSortOrder() { Column[] columns = createColumns(); NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName); - Distribution distribution = Distributions.NONE; + Distribution distribution = Distributions.HASH; final SortOrder[] sortOrders = new SortOrder[] { diff --git a/docs/lakehouse-iceberg-catalog.md b/docs/lakehouse-iceberg-catalog.md index 6ad011d7160..f8b462b24e5 100644 --- a/docs/lakehouse-iceberg-catalog.md +++ b/docs/lakehouse-iceberg-catalog.md @@ -220,79 +220,12 @@ For `bucket` and `truncate`, the first argument must be integer literal, and the ### Table distributions -- Gravitino used by default `NoneDistribution`. +- Support `HashDistribution`, which distributes data by partition key. +- Support `RangeDistribution`, which distributes data by partition key or sort key for a SortOrder table. +- Doesn't support `EvenDistribution`.
- - - -```json -{ - "strategy": "none", - "number": 0, - "expressions": [] -} -``` - - - - -```java -Distributions.NONE; -``` - - - - -- Support `HashDistribution`, Hash distribute by partition key. - - - - -```json -{ - "strategy": "hash", - "number": 0, - "expressions": [] -} -``` - - - -```java -Distributions.HASH; -``` - - - - -- Support `RangeDistribution`, You can pass `range` as values through the API. Range distribute by partition key or sort key if table has an SortOrder. - - - - -```json -{ - "strategy": "range", - "number": 0, - "expressions": [] -} -``` - - - - -```java -Distributions.RANGE; -``` - - - - -:::info -Iceberg automatically distributes the data according to the partition or table sort order. It is forbidden to specify distribution expressions. -::: :::info -Apache Iceberg doesn't support Gravitino `EvenDistribution` type. +If you don't specify a distribution, the table distribution will be adjusted to `RangeDistribution` for a sort order table, or to `HashDistribution` for a partitioned table. ::: ### Table column types