
Fix Iceberg merge, update, and delete for tables with equality deletes #24062

Open

wants to merge 3 commits into master from update-after-equality-delete
Conversation

@dain (Member) commented on Nov 7, 2024

Description

  • Rewrite the creation of the merge row id to avoid a duplicate key exception.
  • Simplify by having the data source produce the row position for the row id, and then wrapping this block into a row id in IcebergPageSource.
  • Remove the unused update row id. This should have been removed during the conversion to merge row ids.

Fixes #15952
Supersedes #16216
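The second bullet can be sketched as a simplified, hypothetical model. The real code operates on Trino Block/RowBlock instances, and the real $row_id also carries partition constants; the record and method names below are illustrative only:

```java
import java.util.List;
import java.util.stream.LongStream;

public class MergeRowIdSketch {
    // Hypothetical stand-in for the $row_id composite; illustrative only.
    record RowId(String filePath, long rowPosition) {}

    // The data source produces only row positions; the per-file constants
    // are wrapped around them afterwards (per the PR, in IcebergPageSource).
    static List<RowId> wrapRowPositions(String filePath, long[] rowPositions) {
        return LongStream.of(rowPositions)
                .mapToObj(pos -> new RowId(filePath, pos))
                .toList();
    }
}
```

Keeping the data source responsible only for row positions means the same reader path serves both the plain `_pos` column and the merge row id, which is what removes the duplicate registration.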

Release notes

(x) Release notes are required, with the following suggested text:

## Section
* Fix Iceberg merge, update, and delete for tables with equality deletes. ({issue}`15952`)

@cla-bot (bot) added the cla-signed label on Nov 7, 2024
@github-actions (bot) added the iceberg (Iceberg connector) label on Nov 7, 2024
@dain force-pushed the update-after-equality-delete branch 2 times, most recently from b87430a to a080a7a (November 7, 2024 21:54)
@dain force-pushed the update-after-equality-delete branch from a080a7a to dddde02 (November 7, 2024 22:54)
public RowBlock apply(Block rowPosition)
{
return RowBlock.fromFieldBlocks(rowPosition.getPositionCount(), new Block[] {
RunLengthEncodedBlock.create(filePath, rowPosition.getPositionCount()),
Contributor commented:

Pre-existing

I am wondering whether the components (_file, _pos, partition_spec_id, partition_data) can actually be considered the row identifier.

With add_files we can add exactly the same file twice in an Iceberg table.

When doing the sequence of operations:

  • add_file('myfile')
  • add equality delete affecting the file 'myfile'
  • add_file('myfile')

The second addition of 'myfile' is affected by the previous equality delete applied on 'myfile'.
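The concern can be restated as: if the row identifier is derived only from those components, rows from the two additions of the same file are indistinguishable. A hypothetical sketch (not Trino's actual row id type) makes the collision concrete:

```java
public class RowIdentifierCollision {
    // Hypothetical model of the components named above; illustrative only.
    record RowIdentifier(String file, long pos, int partitionSpecId, String partitionData) {}

    public static void main(String[] args) {
        // add_file('myfile') twice: corresponding rows from both additions
        // carry component-wise identical identifiers, so a delete scoped to
        // the first addition cannot be told apart from the second.
        RowIdentifier firstAdd = new RowIdentifier("myfile", 0, 1, "p=1");
        RowIdentifier secondAdd = new RowIdentifier("myfile", 0, 1, "p=1");
        System.out.println(firstAdd.equals(secondAdd)); // record equality is component-wise
    }
}
```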

cc @ebyhr

Member Author (@dain) replied:

That seems like a bug in add_files. IMO, we should disallow that, as I would guess there are other things that depend on files being unique.

@@ -1149,19 +1091,14 @@ else if (column.isFileModifiedTimeColumn()) {
constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY)));
}
// For delete
-            else if (column.isRowPositionColumn()) {
+            else if (column.isMergeRowIdColumn() || column.isRowPositionColumn()) {
@findinpath (Contributor) commented on Nov 8, 2024:

I find this mixing a bit misleading to read (unfortunately I don't have a better suggestion). Maybe handle it similarly to what is done for Parquet and ORC, with a duplicated if:

else if (column.isMergeRowIdColumn()) {
                    // The merge $row_id is a composite of the row position and constant file information. The final value is assembled in IcebergPageSource
                    pageSourceBuilder.addRowIndexColumn();
                }
                else if (column.isRowPositionColumn()) {
                    pageSourceBuilder.addRowIndexColumn();
                }

Member Author (@dain) replied:

Don't worry about it. I am mostly rewriting this class in this PR.

@@ -880,7 +880,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(

List<org.apache.parquet.schema.Type> parquetFields = readBaseColumns.stream()
.map(column -> parquetIdToField.get(column.getId()))
-        .collect(toList());
+        .toList();
Contributor commented:

Suggested change:
-        .toList();
+        .collect(toImmutableList());

Member Author (@dain) replied:

This can't use an immutable list because the list contains nulls.

assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
writeEqualityDeleteToNationTable(icebergTable);
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "1");
assertUpdate("UPDATE " + tableName + " SET comment = 'test'", 20);
@findinpath (Contributor) commented on Nov 8, 2024:

Confirming here that the test reproduces the fixed problem.

Stacktrace of the test without the other changes in this PR:

io.trino.testing.QueryFailedException: Multiple entries with same key: 2147483645=$row_id._pos and 2147483645=_pos

	at io.trino.testing.AbstractTestingTrinoClient.execute(AbstractTestingTrinoClient.java:138)
	at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:565)
	at io.trino.testing.DistributedQueryRunner.executeWithPlan(DistributedQueryRunner.java:554)
	at io.trino.testing.QueryAssertions.assertDistributedUpdate(QueryAssertions.java:106)
	at io.trino.testing.QueryAssertions.assertUpdate(QueryAssertions.java:60)
	at io.trino.testing.AbstractTestQueryFramework.assertUpdate(AbstractTestQueryFramework.java:420)
	at io.trino.testing.AbstractTestQueryFramework.assertUpdate(AbstractTestQueryFramework.java:415)
	at io.trino.plugin.iceberg.TestIcebergV2.testUpdateAfterEqualityDelete(TestIcebergV2.java:1439)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1458)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2034)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:189)
	Suppressed: java.lang.Exception: SQL: UPDATE test_update_after_equality_delete_y36815gpdg SET comment = 'test'
		at io.trino.testing.DistributedQueryRunner.executeInternal(DistributedQueryRunner.java:572)
		... 11 more
Caused by: io.trino.spi.TrinoException: Multiple entries with same key: 2147483645=$row_id._pos and 2147483645=_pos
	at io.trino.plugin.iceberg.IcebergPageSource.getNextPage(IcebergPageSource.java:147)
	at io.trino.operator.ScanFilterAndProjectOperator$ConnectorPageSourceToPages.process(ScanFilterAndProjectOperator.java:381)
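The failure above arises when columns are indexed by their synthetic field id and both the merge `$row_id._pos` subfield and the standalone `_pos` column claim id 2147483645. A minimal, hypothetical sketch of that collision (the wording of the message matches Guava's ImmutableMap duplicate-key check; the class and method here are illustrative, not Trino code):

```java
import java.util.HashMap;
import java.util.Map;

public class ColumnIdIndex {
    private final Map<Integer, String> columnsById = new HashMap<>();

    // Rejects duplicate ids the way an ImmutableMap builder would.
    void register(int columnId, String name) {
        String previous = columnsById.putIfAbsent(columnId, name);
        if (previous != null) {
            throw new IllegalArgumentException(
                    "Multiple entries with same key: %s=%s and %s=%s"
                            .formatted(columnId, previous, columnId, name));
        }
    }
}
```

Producing the row position once and assembling the composite row id later in IcebergPageSource means only one column is ever registered for that id, which is why the rewrite in this PR avoids the exception.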

Update row id was replaced with merge row id
Rewrite the creation of merge row id to avoid duplicate key exception.
This also simplifies and consolidates the merge row id code.
@dain force-pushed the update-after-equality-delete branch from dddde02 to 89482d6 (November 9, 2024 21:27)
-public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 2;
-public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 3;
+public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 1;
+public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 2;

public static final String DATA_CHANGE_TYPE_NAME = "_change_type";
Contributor commented:

Pre-existing / nit: it's not clear to me why we had the gap between Integer.MIN_VALUE + 3 and Integer.MIN_VALUE + 5.

Labels: cla-signed, iceberg (Iceberg connector)
Development

Successfully merging this pull request may close these issues.

UPDATE failed in Iceberg: Multiple entries with same key: 3=$row_id.file_record_count
2 participants