feat: validate snapshot write compatibility #1772


Open · wants to merge 7 commits into base: main

Conversation

@kaushiksrini (Contributor) commented Mar 6, 2025

Description

  • This PR validates snapshot write compatibility, checking that no conflicting concurrent operations have been committed since the base snapshot.
  • Added a Snapshot util file that implements `ancestors_between` and `ancestors_of`, ported from `SnapshotUtil.java`.
  • Commit conflict resolution and retries, as outlined in the spec, will be completed in a subsequent PR.

Solves #1678
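For reference, a minimal sketch of what these two helpers could look like (the `Snapshot` class below is a hypothetical stand-in for illustration; the real implementation works with pyiceberg's `Snapshot` and table-metadata types, so signatures may differ):

```python
from typing import Dict, Iterator, Optional


class Snapshot:
    """Hypothetical stand-in for pyiceberg's Snapshot model."""

    def __init__(self, snapshot_id: int, parent_snapshot_id: Optional[int] = None) -> None:
        self.snapshot_id = snapshot_id
        self.parent_snapshot_id = parent_snapshot_id


def ancestors_of(snapshot: Optional[Snapshot], lookup: Dict[int, Snapshot]) -> Iterator[Snapshot]:
    """Yield a snapshot and all of its ancestors by following parent links."""
    while snapshot is not None:
        yield snapshot
        if snapshot.parent_snapshot_id is None:
            break
        snapshot = lookup.get(snapshot.parent_snapshot_id)


def ancestors_between(
    latest: Optional[Snapshot], oldest_id: Optional[int], lookup: Dict[int, Snapshot]
) -> Iterator[Snapshot]:
    """Yield ancestors of `latest` down to, but not including, `oldest_id`."""
    for snap in ancestors_of(latest, lookup):
        if snap.snapshot_id == oldest_id:
            break
        yield snap
```

During commit validation, `ancestors_between` gives the snapshots that were committed between the expected base snapshot and the current branch head, which is exactly the set that needs to be checked for conflicts.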

@kaushiksrini kaushiksrini marked this pull request as draft March 7, 2025 03:53
@Fokko (Contributor) commented Mar 20, 2025

@kaushiksrini can you check the CI? It looks like mypy has some issues:

pyiceberg/table/update/snapshot.py:303: error: Item "None" of "Snapshot | None" has no attribute "snapshot_id"  [union-attr]
pyiceberg/table/update/snapshot.py:306: error: Item "None" of "Summary | None" has no attribute "operation"  [union-attr]
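For what it's worth, these `union-attr` errors usually mean an `Optional` value is dereferenced without narrowing; a common fix pattern looks like this (an illustrative sketch, not the exact code at those lines):

```python
from typing import Optional


class Snapshot:
    """Hypothetical stand-in for pyiceberg's Snapshot model."""

    def __init__(self, snapshot_id: int) -> None:
        self.snapshot_id = snapshot_id


def require_snapshot_id(snapshot: Optional[Snapshot]) -> int:
    # Narrow `Snapshot | None` to `Snapshot` before attribute access,
    # which satisfies mypy's union-attr check.
    if snapshot is None:
        raise ValueError("Expected a snapshot, but none was found")
    return snapshot.snapshot_id
```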

@Fokko (Contributor) commented Mar 20, 2025

Let's add some tests as well:

@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_delete_delete(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
    tbl2 = session_catalog.load_table(identifier)

    tbl1.delete("string == 'z'")

    with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"):
        # tbl2 isn't aware of the commit by tbl1
        tbl2.delete("string == 'z'")


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_delete_append(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
    tbl2 = session_catalog.load_table(identifier)

    # This is allowed
    tbl1.delete("string == 'z'")
    tbl2.append(arrow_table_with_null)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_append_delete(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
    tbl2 = session_catalog.load_table(identifier)

    tbl1.append(arrow_table_with_null)

    with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"):
        # tbl2 isn't aware of the commit by tbl1
        tbl2.delete("string == 'z'")


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_append_append(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
    tbl2 = session_catalog.load_table(identifier)

    tbl1.append(arrow_table_with_null)
    tbl2.append(arrow_table_with_null)

kaushiksrini and others added 4 commits March 22, 2025 22:23

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
Co-authored-by: Fokko Driesprong <[email protected]>
@kaushiksrini kaushiksrini marked this pull request as ready for review March 27, 2025 18:10
Fokko added a commit to Fokko/iceberg-python that referenced this pull request Apr 9, 2025
Today, we have a copy of the `TableMetadata` on the `Table` and
the `Transaction`. This PR changes that logic to re-use the one
on the table, and add the changes to the one on the `Transaction`.

This also allows us to stack changes, for example, to first change
a schema, and then write data with the new schema right away.

Also a prerequisite for apache#1772
@sungwy (Collaborator) left a comment

Hi @kaushiksrini thanks for working on this PR! This is great progress.

Unfortunately, I think the documentation on commit retries is lacking and would benefit from an update. I've left some comments and links that will hopefully bring the feature closer to the Java implementation.

tbl2 = session_catalog.load_table(identifier)

tbl1.append(arrow_table_with_null)
tbl2.append(arrow_table_with_null)

Could we introduce an assertion here to verify that the content of the table is as we'd expect (three copies of arrow_table_with_null)?
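A sketch of that check with stand-in rows (hypothetical data; the real test would read the Iceberg table back, e.g. through a scan on `tbl2`):

```python
# Stand-in rows for the arrow_table_with_null fixture (illustrative only).
fixture = [{"string": "a"}, {"string": None}, {"string": "z"}]

# The table is created with one copy of the fixture and then appended to
# twice, so it should end up holding three full copies.
table_rows = fixture * 3  # create + two appends

assert len(table_rows) == 3 * len(fixture)
```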


# This is allowed
tbl1.delete("string == 'z'")
tbl2.append(arrow_table_with_null)

We should verify the content of the table here
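Sketching the expected content with stand-in rows (hypothetical data; the real test would scan the table):

```python
# Stand-in rows for the arrow_table_with_null fixture (illustrative only).
fixture = [{"string": "a"}, {"string": None}, {"string": "z"}]

# tbl1.delete("string == 'z'") removes the matching row from the initial
# copy, after which tbl2.append adds a fresh full copy of the fixture.
after_delete = [row for row in fixture if row["string"] != "z"]
expected = after_delete + fixture

assert len(expected) == 2 * len(fixture) - 1  # exactly one 'z' row removed
```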

# Define allowed operations for each type of operation
allowed_operations = {
    Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE},
    Operation.REPLACE: {Operation.APPEND},

Suggested change
-    Operation.REPLACE: {Operation.APPEND},
+    Operation.REPLACE: set(),

I think the spec may need a re-review: it's inaccurate to say that we only need to verify that the files we are trying to delete are still available when executing a REPLACE or DELETE operation.

In Spark, we also validate whether there have been conflicting appends when the SERIALIZABLE isolation level is used:

https://github.com/apache/iceberg/blob/9fc49e187069c7ec2493ac0abf20f73175b3df89/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L368-L374

I think it would be helpful to introduce all three isolation levels (NONE, SERIALIZABLE, and SNAPSHOT) and verify whether conflicting appends or deletes have been introduced in the underlying partitions, to align with the Spark implementation.
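To illustrate, a hedged sketch of what such a check could look like (the isolation-level names follow the Java/Spark implementation; the enums and the function shape here are hypothetical, not pyiceberg's actual API):

```python
from enum import Enum
from typing import Iterable


class Operation(Enum):
    APPEND = "append"
    REPLACE = "replace"
    OVERWRITE = "overwrite"
    DELETE = "delete"


class IsolationLevel(Enum):
    NONE = "none"
    SNAPSHOT = "snapshot"
    SERIALIZABLE = "serializable"


def validate_concurrent_operations(committed_ops: Iterable[Operation], isolation: IsolationLevel) -> None:
    """Raise if operations committed since the base snapshot conflict.

    Under SNAPSHOT isolation, operations that may have removed files we
    depend on conflict; under SERIALIZABLE, concurrent appends do as well.
    """
    if isolation is IsolationLevel.NONE:
        return
    conflicting = {Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE}
    if isolation is IsolationLevel.SERIALIZABLE:
        conflicting.add(Operation.APPEND)
    for op in committed_ops:
        if op in conflicting:
            raise ValueError(f"Conflicting operation committed concurrently: {op.value}")
```

The committed operations would come from walking the snapshots between the expected base and the current branch head, which is what `ancestors_between` provides.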

@Fokko (Contributor) commented Apr 18, 2025

Thanks @sungwy for jumping in here, and creating the issues 🙌

Indeed, depending on whether we do snapshot or serializable isolation, we should allow for new data (or not). Would you be willing to split out the different levels in a separate PR? It would be nice to get this in so we can start working independently on the subtasks that you created.

I think this one was mostly blocked on #1903

@sungwy (Collaborator) commented Apr 18, 2025

I've created some subtasks on #819 that will help us implement the required validation functions that we can invoke to check that no conflicting commits have been made between two snapshots. @kaushiksrini would you be interested in helping out with some of those implementations?

Fokko added a commit that referenced this pull request Apr 18, 2025

# Rationale for this change

Today, we have a copy of the `TableMetadata` on the `Table` and the
`Transaction`. This PR changes that logic to re-use the one on the
table, and add the changes to the one on the `Transaction`.

This also allows us to stack changes, for example, to first change a
schema, and then write data with the new schema right away.

Also a prerequisite for
#1772

# Are these changes tested?

Includes a new test :)

# Are there any user-facing changes?

@kaushiksrini (Contributor, Author) commented

Hey @sungwy, thanks for the review! I'll address the feedback soon. I'll also take a look at the subtasks and would like to work on them!
