From b6b71fb7d6cba834114c0738eefca7e31adb505e Mon Sep 17 00:00:00 2001 From: fanng Date: Mon, 13 Jan 2025 19:16:03 +0800 Subject: [PATCH 1/5] fix comment --- .../iceberg/IcebergCatalogOperations.java | 16 ++++ .../lakehouse/iceberg/IcebergTable.java | 37 +++++---- .../test/CatalogIcebergBaseIT.java | 72 +++++++++++++++++- docs/lakehouse-iceberg-catalog.md | 75 +------------------ 4 files changed, 112 insertions(+), 88 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 7b27438d2e5..5f7396ec37e 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -58,6 +58,7 @@ import org.apache.gravitino.rel.TableCatalog; import org.apache.gravitino.rel.TableChange; import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.distributions.Distributions; import org.apache.gravitino.rel.expressions.sorts.SortOrder; import org.apache.gravitino.rel.expressions.transforms.Transform; import org.apache.gravitino.rel.indexes.Index; @@ -513,6 +514,11 @@ public Table createTable( .build()) .toArray(IcebergColumn[]::new); + if (Distributions.NONE.equals(distribution)) { + distribution = + getIcebergDefaultDistribution(sortOrders.length > 0, partitioning.length > 0); + } + IcebergTable createdTable = IcebergTable.builder() .withName(tableIdent.name()) @@ -588,6 +594,16 @@ public void testConnection( } } + private static Distribution getIcebergDefaultDistribution( + Boolean isSorted, Boolean isPartitioned) { + if (isSorted) { + return Distributions.RANGE; + } else if (isPartitioned) { + return Distributions.HASH; + } + return Distributions.NONE; + } + private static String currentUser() { return PrincipalUtils.getCurrentUserName(); } diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 27e3c429e7c..67a5290b7f1 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -131,6 +131,7 @@ String transformDistribution(Distribution distribution) { ArrayUtils.isNotEmpty(partitioning) || ArrayUtils.isNotEmpty(sortOrders), "Iceberg's Distribution Mode.RANGE is distributed based on sortOrder or partition, but both are empty."); return DistributionMode.RANGE.modeName(); + // Gravitino NONE distribution means the client side doesn't specify distribution. case NONE: return DistributionMode.NONE.modeName(); default: @@ -152,21 +153,6 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam Schema schema = table.schema(); Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema); SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder()); - Distribution distribution = Distributions.NONE; - String distributionName = properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE); - if (null != distributionName) { - switch (DistributionMode.fromName(distributionName)) { - case HASH: - distribution = Distributions.HASH; - break; - case RANGE: - distribution = Distributions.RANGE; - break; - default: - // do nothing - break; - } - } IcebergColumn[] icebergColumns = schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new); return IcebergTable.builder() @@ -178,7 +164,7 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam .withAuditInfo(AuditInfo.EMPTY) .withPartitioning(partitionSpec) .withSortOrders(sortOrder) - .withDistribution(distribution) + .withDistribution(getDistribution(properties)) .build(); } @@ -236,4 +222,23 @@ protected IcebergTable internalBuild() { public static Builder builder() { return new Builder(); } + + private static Distribution getDistribution(Map properties) { + Distribution distribution = Distributions.NONE; + String distributionName = properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE); + if (null != distributionName) { + switch (DistributionMode.fromName(distributionName)) { + case HASH: + distribution = Distributions.HASH; + break; + case RANGE: + distribution = Distributions.RANGE; + break; + default: + // do nothing + break; + } + } + return distribution; + } } diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java index fd37441b459..2d7090240b4 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java @@ -379,6 +379,76 @@ void testCreateTableWithNullComment() { Assertions.assertNull(loadTable.comment()); } + @Test + void testCreateTableWithNoneDistribution() { + // Create table from Gravitino API + Column[] columns = createColumns(); + + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName); + Distribution distribution = Distributions.NONE; + + final SortOrder[] sortOrders = + new SortOrder[] { + SortOrders.of( + NamedReference.field(ICEBERG_COL_NAME2), + SortDirection.DESCENDING, + NullOrdering.NULLS_FIRST) + }; + + Transform[] partitioning = new Transform[] {Transforms.day(columns[1].name())}; + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table tableWithPartitionAndSortorder = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + Assertions.assertEquals(tableName, tableWithPartitionAndSortorder.name()); + Assertions.assertEquals(Distributions.RANGE, tableWithPartitionAndSortorder.distribution()); + + Table loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(Distributions.RANGE, loadTable.distribution()); + tableCatalog.dropTable(tableIdentifier); + + Table tableWithPartition = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + new SortOrder[0]); + Assertions.assertEquals(tableName, tableWithPartition.name()); + Assertions.assertEquals(Distributions.HASH, tableWithPartition.distribution()); + + loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(Distributions.HASH, loadTable.distribution()); + tableCatalog.dropTable(tableIdentifier); + + Table tableWithoutPartitionAndSortOrder = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + new Transform[0], + distribution, + new SortOrder[0]); + Assertions.assertEquals(tableName, tableWithoutPartitionAndSortOrder.name()); + Assertions.assertEquals(Distributions.NONE, tableWithoutPartitionAndSortOrder.distribution()); + + loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(Distributions.NONE, loadTable.distribution()); + } + @Test void testCreateAndLoadIcebergTable() { // Create table from Gravitino API @@ -1179,7 +1249,7 @@ public void testTableSortOrder() { Column[] columns = createColumns(); NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName); - Distribution distribution = Distributions.NONE; + Distribution distribution = Distributions.HASH; final SortOrder[] sortOrders = new SortOrder[] { diff --git a/docs/lakehouse-iceberg-catalog.md b/docs/lakehouse-iceberg-catalog.md index 6ad011d7160..f8b462b24e5 100644 --- a/docs/lakehouse-iceberg-catalog.md +++ b/docs/lakehouse-iceberg-catalog.md @@ -220,79 +220,12 @@ For `bucket` and `truncate`, the first argument must be integer literal, and the ### Table distributions -- Gravitino used by default `NoneDistribution`. +- Support `HashDistribution`, which distribute data by partition key. +- Support `RangeDistribution`, which distribute data by partition key or sort key for a SortOrder table. +- Doesn't support `EvenDistribution`. - - - -```json -{ - "strategy": "none", - "number": 0, - "expressions": [] -} -``` - - - - -```java -Distributions.NONE; -``` - - - - -- Support `HashDistribution`, Hash distribute by partition key. - - - - -```json -{ - "strategy": "hash", - "number": 0, - "expressions": [] -} -``` - - - -```java -Distributions.HASH; -``` - - - - -- Support `RangeDistribution`, You can pass `range` as values through the API. Range distribute by partition key or sort key if table has an SortOrder. - - - - -```json -{ - "strategy": "range", - "number": 0, - "expressions": [] -} -``` - - - - -```java -Distributions.RANGE; -``` - - - - -:::info -Iceberg automatically distributes the data according to the partition or table sort order. It is forbidden to specify distribution expressions. -::: :::info -Apache Iceberg doesn't support Gravitino `EvenDistribution` type. +If you doesn't specify distribution expressions, the table distribution will be adjusted to `RangeDistribution` for a sort order table, to `HashDistribution` for a partition table. ::: ### Table column types From 834d6ef8151b708e3b2e9fb16fe498e5f32f3657 Mon Sep 17 00:00:00 2001 From: fanng Date: Thu, 16 Jan 2025 11:18:02 +0800 Subject: [PATCH 2/5] fix credential vending document --- .../catalog/lakehouse/iceberg/IcebergCatalogOperations.java | 2 ++ .../gravitino/catalog/lakehouse/iceberg/IcebergTable.java | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 5f7396ec37e..0ff16a1eac8 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -514,6 +514,8 @@ public Table createTable( .build()) .toArray(IcebergColumn[]::new); + // Gravitino NONE distribution means the client side doesn't specify distribution not the same + // as Iceberg none distribution. if (Distributions.NONE.equals(distribution)) { distribution = getIcebergDefaultDistribution(sortOrders.length > 0, partitioning.length > 0); diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 67a5290b7f1..3f2f54c1b3e 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -131,7 +131,6 @@ String transformDistribution(Distribution distribution) { ArrayUtils.isNotEmpty(partitioning) || ArrayUtils.isNotEmpty(sortOrders), "Iceberg's Distribution Mode.RANGE is distributed based on sortOrder or partition, but both are empty."); return DistributionMode.RANGE.modeName(); - // Gravitino NONE distribution means the client side doesn't specify distribution. case NONE: return DistributionMode.NONE.modeName(); default: From 8f0fe5e1aea96113816ed3eb230ba5aa8caa27a5 Mon Sep 17 00:00:00 2001 From: fanng Date: Thu, 16 Jan 2025 13:49:59 +0800 Subject: [PATCH 3/5] fix credential vending document --- .../iceberg/integration/test/CatalogIcebergBaseIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java index 2d7090240b4..a796b0c22f3 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java @@ -1038,9 +1038,9 @@ public void testTableDistribution() { columns, table_comment, properties, - partitioning, + new Transform[0], distribution, - sortOrders); + new SortOrder[0]); Table loadTable = tableCatalog.loadTable(tableIdentifier); From 76af844e4cd1455527efff74a163a5c48b587e72 Mon Sep 17 00:00:00 2001 From: fanng Date: Thu, 16 Jan 2025 13:58:00 +0800 Subject: [PATCH 4/5] fix credential vending document --- .../iceberg/integration/test/CatalogIcebergBaseIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java index a796b0c22f3..f0162a6ec88 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java @@ -1051,8 +1051,8 @@ public void testTableDistribution() { Arrays.asList(columns), properties, distribution, - sortOrders, - partitioning, + new SortOrder[0], + new Transform[0], loadTable); Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier)); From 44b3a94028fc10c61543c5ec4df0f48eaa1ec04c Mon Sep 17 00:00:00 2001 From: fanng Date: Thu, 16 Jan 2025 14:36:40 +0800 Subject: [PATCH 5/5] fix credential vending document --- .../catalog/lakehouse/iceberg/IcebergCatalogOperations.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 0ff16a1eac8..aef42044c0c 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -514,8 +514,8 @@ public Table createTable( .build()) .toArray(IcebergColumn[]::new); - // Gravitino NONE distribution means the client side doesn't specify distribution not the same - // as Iceberg none distribution. + // Gravitino NONE distribution means the client side doesn't specify distribution, which is + // not the same as none distribution in Iceberg. if (Distributions.NONE.equals(distribution)) { distribution = getIcebergDefaultDistribution(sortOrders.length > 0, partitioning.length > 0); @@ -597,7 +597,7 @@ public void testConnection( } private static Distribution getIcebergDefaultDistribution( - Boolean isSorted, Boolean isPartitioned) { + boolean isSorted, boolean isPartitioned) { if (isSorted) { return Distributions.RANGE; } else if (isPartitioned) {