feat: validate snapshot write compatibility #1772
Conversation
@kaushiksrini can you check the CI? It looks like…
Let's add some tests as well:

```python
import pyarrow as pa
import pytest
from pyspark.sql import SparkSession

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import CommitFailedException

# `_create_table` and the fixtures used below come from the existing integration test suite


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_delete_delete(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
    tbl2 = session_catalog.load_table(identifier)

    tbl1.delete("string == 'z'")

    with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"):
        # tbl2 isn't aware of the commit by tbl1
        tbl2.delete("string == 'z'")


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_delete_append(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
    tbl2 = session_catalog.load_table(identifier)

    # This is allowed
    tbl1.delete("string == 'z'")
    tbl2.append(arrow_table_with_null)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_append_delete(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
    tbl2 = session_catalog.load_table(identifier)

    tbl1.append(arrow_table_with_null)

    with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"):
        # tbl2 isn't aware of the commit by tbl1
        tbl2.delete("string == 'z'")


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_conflict_append_append(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    identifier = "default.test_conflict"
    tbl1 = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, [arrow_table_with_null])
    tbl2 = session_catalog.load_table(identifier)

    tbl1.append(arrow_table_with_null)
    tbl2.append(arrow_table_with_null)
```
Today, we have a copy of the `TableMetadata` on both the `Table` and the `Transaction`. This PR changes that logic to re-use the one on the table and adds the changes to the one on the `Transaction`. This also allows us to stack changes, for example, to first change a schema and then write data with the new schema right away. Also a prerequisite for apache#1772.
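A minimal sketch of what that stacking enables, assuming a loaded table `tbl` and a pyarrow table `df` that matches the evolved schema (the column name is made up for illustration):

```python
from pyiceberg.types import StringType

# Stage a schema change and an append in the same transaction; the append
# is written against the schema change staged just above it.
with tbl.transaction() as txn:
    with txn.update_schema() as update:
        update.add_column("comment", StringType())
    txn.append(df)
```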
Hi @kaushiksrini, thanks for working on this PR! This is great progress.
Unfortunately, I think the documentation on Commit Retries is lacking and would benefit from an update. I've left some comments and links that'll hopefully bring the feature closer to the Java implementation.
```python
    tbl2 = session_catalog.load_table(identifier)

    tbl1.append(arrow_table_with_null)
    tbl2.append(arrow_table_with_null)
```
Could we introduce an assertion here to verify the content of the table is as we'd expect (with 3 × `arrow_table_with_null` data)?
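Something along these lines could work, assuming the fixture data round-trips through a scan (a sketch, not the PR's code):

```python
# After the initial load plus one append from each table handle, the table
# should contain three copies of the fixture data.
tbl2.refresh()
assert len(tbl2.scan().to_arrow()) == 3 * len(arrow_table_with_null)
```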
```python
    # This is allowed
    tbl1.delete("string == 'z'")
    tbl2.append(arrow_table_with_null)
```
We should verify the content of the table here
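A sketch of such a check, assuming exactly one row of the fixture matches `string == 'z'` (that count depends on the fixture contents):

```python
# One fixture copy with the matching row deleted, plus one full appended copy.
tbl2.refresh()
expected = 2 * len(arrow_table_with_null) - 1  # assumes a single row matches string == 'z'
assert len(tbl2.scan().to_arrow()) == expected
```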
```python
    # Define allowed operations for each type of operation
    allowed_operations = {
        Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE},
        Operation.REPLACE: {Operation.APPEND},
```
Suggested change:

```diff
-        Operation.REPLACE: {Operation.APPEND},
+        Operation.REPLACE: {},
```
I think the spec may need a re-review, because I think it's inaccurate to say that we only need to verify that the files we are trying to delete are still available when we are executing a `REPLACE` or `DELETE` operation. In Spark, we also validate whether there have been conflicting appends when we use the `SERIALIZABLE` isolation level.

I think it would be helpful to introduce all three isolation levels (`NONE`, `SERIALIZABLE`, and `SNAPSHOT`) and verify whether conflicting appends or deletes have been introduced in the underlying partitions, to be aligned with the implementation in Spark.
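For illustration only (the names and shapes here are hypothetical, not the PR's API), the three levels could gate validation roughly like this:

```python
from enum import Enum

from pyiceberg.exceptions import CommitFailedException


class IsolationLevel(Enum):
    NONE = "none"
    SNAPSHOT = "snapshot"
    SERIALIZABLE = "serializable"


def validate_concurrent_changes(level: IsolationLevel, conflicting_appends: bool, conflicting_deletes: bool) -> None:
    """Reject the commit if conflicting changes landed since our base snapshot."""
    if level is IsolationLevel.NONE:
        return  # no validation at all
    if conflicting_deletes:
        # Both SNAPSHOT and SERIALIZABLE require that the files we read or
        # delete still exist in the affected partitions.
        raise CommitFailedException("Found conflicting deletes since the base snapshot")
    if level is IsolationLevel.SERIALIZABLE and conflicting_appends:
        # SERIALIZABLE additionally rejects data appended to the affected partitions.
        raise CommitFailedException("Found conflicting appends since the base snapshot")
```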
Thanks @sungwy for jumping in here, and creating the issues 🙌
Indeed, depending on whether we do snapshot or serializable isolation, we should allow for new data (or not). Would you be willing to split out the different levels in a separate PR? It would be nice to get this in so we can start working independently on the subtasks that you created.
I think this one was mostly blocked on #1903
I've created some subtasks on #819 that will help us implement the required validation functions that we can invoke to check that no conflicting commits have been made between two snapshots. @kaushiksrini would you be interested in helping out with some of those implementations?
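As a rough illustration of the kind of validation function meant here (the name and signature are hypothetical; a real implementation would inspect manifests, not just snapshot summaries):

```python
from pyiceberg.exceptions import CommitFailedException
from pyiceberg.table.snapshots import Operation, Snapshot


def validate_added_snapshots(snapshots_since_base: list[Snapshot], allowed_ops: set[Operation]) -> None:
    """Fail if any snapshot committed since our base performed a disallowed operation."""
    for snapshot in snapshots_since_base:
        op = snapshot.summary.operation if snapshot.summary else None
        if op not in allowed_ops:
            raise CommitFailedException(f"Conflicting operation since the base snapshot: {op}")
```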
# Rationale for this change

Today, we have a copy of the `TableMetadata` on both the `Table` and the `Transaction`. This PR changes that logic to re-use the one on the table and adds the changes to the one on the `Transaction`. This also allows us to stack changes, for example, to first change a schema and then write data with the new schema right away. Also a prerequisite for #1772.

# Are these changes tested?

Includes a new test :)

# Are there any user-facing changes?
Hey @sungwy, thanks for the review! Will address the feedback soon. I will also take a look at the subtasks and would like to work on them!
Description

`ancestors_between` and `ancestors_of` from SnapshotUtil.java

Solves #1678
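A rough sketch of what those two helpers do (the signatures are illustrative; the Java originals operate on snapshots and a parent-lookup function):

```python
from typing import Callable, Iterator, Optional


def ancestors_of(snapshot_id: int, parent_of: Callable[[int], Optional[int]]) -> Iterator[int]:
    """Yield snapshot ids starting at `snapshot_id` and walking parent links."""
    current: Optional[int] = snapshot_id
    while current is not None:
        yield current
        current = parent_of(current)


def ancestors_between(
    latest_id: int, oldest_id: Optional[int], parent_of: Callable[[int], Optional[int]]
) -> Iterator[int]:
    """Yield ancestors of `latest_id`, stopping before `oldest_id` is reached."""
    for ancestor_id in ancestors_of(latest_id, parent_of):
        if ancestor_id == oldest_id:
            return
        yield ancestor_id
```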