Skip to content

Commit

Permalink
[#6196] feat(iceberg): adjust table distribution if creating table wi…
Browse files Browse the repository at this point in the history
…thout specifying distribution mode (#6214)

### What changes were proposed in this pull request?

Adjust the distribution mode when an Iceberg table is created with none
distribution. The following is Spark's adjustment logic; Flink's is
similar.

```java
  // Picks the default write distribution mode: RANGE when the table has a sort order,
  // HASH when it is not sorted but partitioned, and NONE otherwise.
  private DistributionMode defaultWriteDistributionMode() {
    if (table.sortOrder().isSorted()) {
      return RANGE;
    } else if (table.spec().isPartitioned()) {
      return HASH;
    } else {
      return NONE;
    }
  }
```

### Why are the changes needed?

Fix: #6196 

### Does this PR introduce _any_ user-facing change?

Yes, add document

### How was this patch tested?

add UT and IT
  • Loading branch information
FANNG1 authored and web-flow committed Jan 16, 2025
1 parent 01ec8c4 commit 8014184
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
Expand Down Expand Up @@ -513,6 +514,13 @@ public Table createTable(
.build())
.toArray(IcebergColumn[]::new);

// Gravitino NONE distribution means the client side doesn't specify distribution, which is
// not the same as none distribution in Iceberg.
if (Distributions.NONE.equals(distribution)) {
distribution =
getIcebergDefaultDistribution(sortOrders.length > 0, partitioning.length > 0);
}

IcebergTable createdTable =
IcebergTable.builder()
.withName(tableIdent.name())
Expand Down Expand Up @@ -588,6 +596,16 @@ public void testConnection(
}
}

/**
 * Chooses the distribution to use for a table based on its layout.
 *
 * @param isSorted whether the table has at least one sort order
 * @param isPartitioned whether the table has at least one partition transform
 * @return {@code Distributions.RANGE} for a sorted table, {@code Distributions.HASH} for a table
 *     that is partitioned but not sorted, and {@code Distributions.NONE} otherwise
 */
private static Distribution getIcebergDefaultDistribution(
    boolean isSorted, boolean isPartitioned) {
  // Neither sorted nor partitioned: there is nothing to distribute by.
  if (!isSorted && !isPartitioned) {
    return Distributions.NONE;
  }
  // A sort order takes precedence over partitioning.
  return isSorted ? Distributions.RANGE : Distributions.HASH;
}

/** Returns the user name reported by {@code PrincipalUtils.getCurrentUserName()}. */
private static String currentUser() {
return PrincipalUtils.getCurrentUserName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,6 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam
Schema schema = table.schema();
Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema);
SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder());
Distribution distribution = Distributions.NONE;
String distributionName = properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE);
if (null != distributionName) {
switch (DistributionMode.fromName(distributionName)) {
case HASH:
distribution = Distributions.HASH;
break;
case RANGE:
distribution = Distributions.RANGE;
break;
default:
// do nothing
break;
}
}
IcebergColumn[] icebergColumns =
schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new);
return IcebergTable.builder()
Expand All @@ -178,7 +163,7 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam
.withAuditInfo(AuditInfo.EMPTY)
.withPartitioning(partitionSpec)
.withSortOrders(sortOrder)
.withDistribution(distribution)
.withDistribution(getDistribution(properties))
.build();
}

Expand Down Expand Up @@ -236,4 +221,23 @@ protected IcebergTable internalBuild() {
/**
 * Creates a new {@link Builder} instance.
 *
 * @return a freshly constructed builder
 */
public static Builder builder() {
return new Builder();
}

/**
 * Derives the Gravitino distribution from the {@code DISTRIBUTION_MODE} table property.
 *
 * @param properties the table properties to inspect
 * @return {@code Distributions.HASH} or {@code Distributions.RANGE} when the property names the
 *     corresponding mode; {@code Distributions.NONE} when the property is absent or names any
 *     other mode
 */
private static Distribution getDistribution(Map<String, String> properties) {
  String modeName = properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE);
  if (modeName == null) {
    // No distribution mode recorded on the table.
    return Distributions.NONE;
  }

  // NOTE(review): DistributionMode.fromName presumably rejects unrecognized names - confirm.
  switch (DistributionMode.fromName(modeName)) {
    case HASH:
      return Distributions.HASH;
    case RANGE:
      return Distributions.RANGE;
    default:
      return Distributions.NONE;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,76 @@ void testCreateTableWithNullComment() {
Assertions.assertNull(loadTable.comment());
}

@Test
void testCreateTableWithNoneDistribution() {
  // Every table below is created through the Gravitino API with Distributions.NONE; only the
  // partitioning and sort order differ between the three cases.
  Column[] columns = createColumns();
  NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
  Distribution unspecified = Distributions.NONE;

  SortOrder[] descendingSort = {
    SortOrders.of(
        NamedReference.field(ICEBERG_COL_NAME2),
        SortDirection.DESCENDING,
        NullOrdering.NULLS_FIRST)
  };
  Transform[] dayPartitioning = {Transforms.day(columns[1].name())};
  SortOrder[] noSortOrders = new SortOrder[0];
  Transform[] noPartitioning = new Transform[0];

  Map<String, String> properties = createProperties();
  TableCatalog tableCatalog = catalog.asTableCatalog();

  // Case 1: partitioned and sorted -> RANGE is expected on the created and the loaded table.
  Table sortedAndPartitioned =
      tableCatalog.createTable(
          tableIdentifier,
          columns,
          table_comment,
          properties,
          dayPartitioning,
          unspecified,
          descendingSort);
  Assertions.assertEquals(tableName, sortedAndPartitioned.name());
  Assertions.assertEquals(Distributions.RANGE, sortedAndPartitioned.distribution());

  Table reloaded = tableCatalog.loadTable(tableIdentifier);
  Assertions.assertEquals(tableName, reloaded.name());
  Assertions.assertEquals(Distributions.RANGE, reloaded.distribution());
  tableCatalog.dropTable(tableIdentifier);

  // Case 2: partitioned, not sorted -> HASH is expected.
  Table partitionedOnly =
      tableCatalog.createTable(
          tableIdentifier,
          columns,
          table_comment,
          properties,
          dayPartitioning,
          unspecified,
          noSortOrders);
  Assertions.assertEquals(tableName, partitionedOnly.name());
  Assertions.assertEquals(Distributions.HASH, partitionedOnly.distribution());

  reloaded = tableCatalog.loadTable(tableIdentifier);
  Assertions.assertEquals(tableName, reloaded.name());
  Assertions.assertEquals(Distributions.HASH, reloaded.distribution());
  tableCatalog.dropTable(tableIdentifier);

  // Case 3: neither partitioned nor sorted -> the distribution stays NONE.
  Table plainTable =
      tableCatalog.createTable(
          tableIdentifier,
          columns,
          table_comment,
          properties,
          noPartitioning,
          unspecified,
          noSortOrders);
  Assertions.assertEquals(tableName, plainTable.name());
  Assertions.assertEquals(Distributions.NONE, plainTable.distribution());

  reloaded = tableCatalog.loadTable(tableIdentifier);
  Assertions.assertEquals(tableName, reloaded.name());
  Assertions.assertEquals(Distributions.NONE, reloaded.distribution());
}

@Test
void testCreateAndLoadIcebergTable() {
// Create table from Gravitino API
Expand Down Expand Up @@ -968,9 +1038,9 @@ public void testTableDistribution() {
columns,
table_comment,
properties,
partitioning,
new Transform[0],
distribution,
sortOrders);
new SortOrder[0]);

Table loadTable = tableCatalog.loadTable(tableIdentifier);

Expand All @@ -981,8 +1051,8 @@ public void testTableDistribution() {
Arrays.asList(columns),
properties,
distribution,
sortOrders,
partitioning,
new SortOrder[0],
new Transform[0],
loadTable);

Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier));
Expand Down Expand Up @@ -1179,7 +1249,7 @@ public void testTableSortOrder() {
Column[] columns = createColumns();

NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
Distribution distribution = Distributions.NONE;
Distribution distribution = Distributions.HASH;

final SortOrder[] sortOrders =
new SortOrder[] {
Expand Down
75 changes: 4 additions & 71 deletions docs/lakehouse-iceberg-catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,79 +220,12 @@ For `bucket` and `truncate`, the first argument must be integer literal, and the

### Table distributions

- Gravitino uses `NoneDistribution` by default.
- Support `HashDistribution`, which distributes data by partition key.
- Support `RangeDistribution`, which distributes data by partition key or sort key for a SortOrder table.
- Doesn't support `EvenDistribution`.

<Tabs groupId='language' queryString>
<TabItem value="json" label="JSON">

```json
{
"strategy": "none",
"number": 0,
"expressions": []
}
```

</TabItem>
<TabItem value="java" label="Java">

```java
Distributions.NONE;
```

</TabItem>
</Tabs>

- Support `HashDistribution`, Hash distribute by partition key.

<Tabs groupId='language' queryString>
<TabItem value="json" label="JSON">

```json
{
"strategy": "hash",
"number": 0,
"expressions": []
}
```
</TabItem>
<TabItem value="java" label="Java">

```java
Distributions.HASH;
```

</TabItem>
</Tabs>

- Support `RangeDistribution`. You can pass `range` as the value through the API. Data is range-distributed by partition key, or by sort key if the table has a SortOrder.

<Tabs groupId='language' queryString>
<TabItem value="json" label="JSON">

```json
{
"strategy": "range",
"number": 0,
"expressions": []
}
```

</TabItem>
<TabItem value="java" label="Java">

```java
Distributions.RANGE;
```

</TabItem>
</Tabs>

:::info
Iceberg automatically distributes the data according to the partition or table sort order. It is forbidden to specify distribution expressions.
:::
:::info
Apache Iceberg doesn't support Gravitino `EvenDistribution` type.
If you don't specify a distribution, the table distribution will be adjusted to `RangeDistribution` for a table with a sort order, and to `HashDistribution` for a partitioned table.
:::

### Table column types
Expand Down

0 comments on commit 8014184

Please sign in to comment.