Fix Iceberg merge, update, delete, for tables with equality deletes #24062
base: master
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
public RowBlock apply(Block rowPosition)
{
return RowBlock.fromFieldBlocks(rowPosition.getPositionCount(), new Block[] {
RunLengthEncodedBlock.create(filePath, rowPosition.getPositionCount()),
Pre-existing
I am wondering whether the components (_file, _pos, partition_spec_id, partition_data)
can actually be considered the row identifier.
With add_files we can add exactly the same file twice in an Iceberg table.
When doing the sequence of operations:
- add_file('myfile')
- add equality delete affecting the file 'myfile'
- add_file('myfile')
The second addition of 'myfile' is affected by the previous equality delete applied on 'myfile'
cc @ebyhr
That seems like a bug in add_files. IMO, we should disallow that as I would guess there are other things that depend on files being unique.
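To make the concern above concrete, here is a minimal sketch in plain Java. The `RowId` record, its field names, and the placeholder partition values are hypothetical stand-ins for the four components listed in the comment, not Trino's actual classes. It only shows that registering the same file path twice yields row ids that cannot be told apart.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DuplicateFileRowIds
{
    // Hypothetical stand-in for the row id components; not a Trino class.
    record RowId(String filePath, long pos, int partitionSpecId, String partitionData) {}

    // Builds one row id per row position for a single data file entry.
    // The spec id (0) and partition data ("{}") are placeholder constants.
    static List<RowId> rowIdsFor(String filePath, int rowCount)
    {
        List<RowId> ids = new ArrayList<>();
        for (long pos = 0; pos < rowCount; pos++) {
            ids.add(new RowId(filePath, pos, 0, "{}"));
        }
        return ids;
    }

    public static void main(String[] args)
    {
        // The same physical file registered twice, as in the add_files scenario.
        List<RowId> all = new ArrayList<>(rowIdsFor("myfile", 3));
        all.addAll(rowIdsFor("myfile", 3));

        Set<RowId> distinct = new HashSet<>(all);
        System.out.println(all.size() + " rows, " + distinct.size() + " distinct row ids");
    }
}
```

With three rows per file entry, the sketch reports 6 rows but only 3 distinct row ids, which is why anything keyed on these components depends on files being unique.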
@@ -1149,19 +1091,14 @@ else if (column.isFileModifiedTimeColumn()) {
 constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY)));
 }
 // For delete
-else if (column.isRowPositionColumn()) {
+else if (column.isMergeRowIdColumn() || column.isRowPositionColumn()) {
I find this mixing a bit misleading to read.
(Unfortunately, I don't have a better suggestion.)
Maybe use a duplicated if branch, similar to what is done for Parquet and ORC:
else if (column.isMergeRowIdColumn()) {
// The merge $row_id is a composite of the row position and constant file information. The final value is assembled in IcebergPageSource
pageSourceBuilder.addRowIndexColumn();
}
else if (column.isRowPositionColumn()) {
pageSourceBuilder.addRowIndexColumn();
}
Don't worry about it. I am mostly rewriting this class in this PR.
@@ -880,7 +880,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(

 List<org.apache.parquet.schema.Type> parquetFields = readBaseColumns.stream()
 .map(column -> parquetIdToField.get(column.getId()))
-.collect(toList());
+.toList();
-.toList();
+.collect(toImmutableList());
This can't use immutable list because this list contains nulls.
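The point about nulls can be sketched with the standard library alone. Guava's `ImmutableList` rejects null elements, so `toImmutableList()` would fail here; the example below uses `Collectors.toUnmodifiableList()` as a stand-in with the same null-rejecting behavior, to avoid a Guava dependency. The map contents and ids are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class NullTolerantToList
{
    // Looks up each id; ids missing from the map produce null entries.
    // Stream.toList() (Java 16+) returns an unmodifiable list that permits nulls.
    static List<String> lookupAll(List<Integer> ids, Map<Integer, String> idToField)
    {
        return ids.stream()
                .map(idToField::get)
                .toList();
    }

    // Returns true if a null-rejecting collector fails on the same lookup.
    static boolean nullRejectingCollectorFails(List<Integer> ids, Map<Integer, String> idToField)
    {
        try {
            ids.stream()
                    .map(idToField::get)
                    .collect(Collectors.toUnmodifiableList());
            return false;
        }
        catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args)
    {
        Map<Integer, String> idToField = Map.of(1, "a", 3, "c");
        List<Integer> ids = List.of(1, 2, 3);

        System.out.println(lookupAll(ids, idToField));
        System.out.println(nullRejectingCollectorFails(ids, idToField));
    }
}
```

Here id 2 has no matching field, so the list from `toList()` holds a null in the middle, while the null-rejecting collector throws.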
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
writeEqualityDeleteToNationTable(icebergTable);
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "1");
assertUpdate("UPDATE " + tableName + " SET comment = 'test'", 20);
Confirming here that the test reproduces the fixed problem.
Stacktrace of the test without the other changes in this PR:
io.trino.testing.QueryFailedException: Multiple entries with same key: 2147483645=$row_id._pos and 2147483645=_pos
at io.trino.testing.AbstractTestingTrinoClient.execute(AbstractTestingTrinoClient.java:138)
at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:565)
at io.trino.testing.DistributedQueryRunner.executeWithPlan(DistributedQueryRunner.java:554)
at io.trino.testing.QueryAssertions.assertDistributedUpdate(QueryAssertions.java:106)
at io.trino.testing.QueryAssertions.assertUpdate(QueryAssertions.java:60)
at io.trino.testing.AbstractTestQueryFramework.assertUpdate(AbstractTestQueryFramework.java:420)
at io.trino.testing.AbstractTestQueryFramework.assertUpdate(AbstractTestQueryFramework.java:415)
at io.trino.plugin.iceberg.TestIcebergV2.testUpdateAfterEqualityDelete(TestIcebergV2.java:1439)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1458)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2034)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:189)
Suppressed: java.lang.Exception: SQL: UPDATE test_update_after_equality_delete_y36815gpdg SET comment = 'test'
at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:572)
... 11 more
Caused by: io.trino.spi.TrinoException: Multiple entries with same key: 2147483645=$row_id._pos and 2147483645=_pos
at io.trino.plugin.iceberg.IcebergPageSource.getNextPage(IcebergPageSource.java:147)
at io.trino.operator.ScanFilterAndProjectOperator$ConnectorPageSourceToPages.process(ScanFilterAndProjectOperator.java:381)
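The failure mode in the stack trace can be illustrated with a small, self-contained sketch. This is not the code in IcebergPageSource; the `Column` record and `indexByFieldId` helper are hypothetical, and the sketch only assumes that columns are indexed by field id in a map that rejects duplicates. It shows why requesting both `_pos` and `$row_id._pos`, which share one field id, fails.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DuplicateFieldIdCheck
{
    // Hypothetical (field id, column name) pair; not a Trino class.
    record Column(int fieldId, String name) {}

    // Indexes columns by field id and fails on a repeated id,
    // mimicking the message format seen in the stack trace.
    static Map<Integer, String> indexByFieldId(List<Column> columns)
    {
        Map<Integer, String> byId = new LinkedHashMap<>();
        for (Column column : columns) {
            String previous = byId.putIfAbsent(column.fieldId(), column.name());
            if (previous != null) {
                throw new IllegalArgumentException("Multiple entries with same key: "
                        + column.fieldId() + "=" + previous
                        + " and " + column.fieldId() + "=" + column.name());
            }
        }
        return byId;
    }

    public static void main(String[] args)
    {
        // 2147483645, the field id that appears in the stack trace.
        int rowPositionId = Integer.MAX_VALUE - 2;
        try {
            indexByFieldId(List.of(
                    new Column(rowPositionId, "$row_id._pos"),
                    new Column(rowPositionId, "_pos")));
        }
        catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch, deduplicating columns by field id before building the index would avoid the collision. The PR itself takes a different route and rewrites how the merge row id is created.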
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
Update row id was replaced with merge row id
Rewrite the creation of merge row id to avoid duplicate key exception. This also simplifies and consolidates the merge row id code.
-public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 2;
-public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 3;
+public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 1;
+public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 2;

 public static final String DATA_CHANGE_TYPE_NAME = "_change_type";
Pre-existing / nit: it's not clear to me why we had the gap between Integer.MIN_VALUE + 3
and Integer.MIN_VALUE + 5.
Description
Fixes #15952
Supersedes #16216
Release notes
(x) Release notes are required, with the following suggested text: