
[BUG] [PySpark] Delta Table Schema Evolves Even When MERGE Condition Fails #3706

caldempsey opened this issue Sep 22, 2024 · 0 comments
Labels
bug Something isn't working

caldempsey commented Sep 22, 2024

Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (PySpark)

Describe the problem

When the condition to MERGE into a Delta table fails, the schema of that table is still updated.

Steps to reproduce

Create a new Delta Table. Perform an upsert over an evolving schema as follows:

# Perform the merge operation
delta_table.alias("current").merge(
    labels.alias("new"), "current.file_id = new.file_id"
).whenMatchedUpdate(
    condition="current.ingest_id < new.ingest_id",
    set={
        "ingest_id": "new.ingest_id",
        # Add new column updates here, including any evolving schema columns
        **{
            col: f"new.{col}"
            for col in labels.columns
            if col not in ["file_id", "ingest_id"]  # dynamic upsert
        },
    },
).whenNotMatchedInsert(
    values={
        "file_id": "new.file_id",
        "ingest_id": "new.ingest_id",
        **{
            col: f"new.{col}"
            for col in labels.columns
            if col not in ["file_id", "ingest_id"]  # dynamic upsert
        },
    }
).execute()
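For context, merge-time schema evolution in Delta Lake is gated by a session configuration; the repro above assumes it is enabled. A minimal sketch (assuming a live `spark` session) of the relevant setting:

```python
# Delta Lake only evolves the target schema during MERGE when this session
# conf is enabled. The repro assumes it is on; with it off, the extra column
# in `labels` should instead cause an analysis error rather than widen the
# table silently.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```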

Run a simple test like this:

from pyspark.sql import Row

def test_no_update_when_ingest_id_is_not_greater(spark_session, delta_table, valid_data_with_column_a):
    """
    Given a file_id initially inserted with column_a
    When upserting the same file_id with a lower or equal ingest_id and new column_b (so failing the Delta Table condition)
    Then the row should not be updated, and column_b should not be added.
    """
    # Insert row with column_a
    df_column_a = spark_session.createDataFrame(valid_data_with_column_a)
    delta_table.upsert_labels(df_column_a)

    # Create a new row with the same file_id and lower/equal ingest_id, but with column_b
    updated_data = [
        Row(
            file_id=valid_data_with_column_a[0].file_id,
            ingest_id=valid_data_with_column_a[0].ingest_id,  # Same ingest_id
            column_b="value_b"
        )
    ]
    df_updated = spark_session.createDataFrame(updated_data)

    # Upsert the updated data (should not update due to same ingest_id)
    delta_table.upsert_labels(df_updated)

    # Validate the row was not updated, and column_b was not added
    result_df = delta_table.toDF()
    row = result_df.filter(result_df.file_id == updated_data[0].file_id).first()
    result_df.show(truncate=False)
    assert row.column_a == "value_a"  # Column_a should remain the same
    assert "column_b" not in row.asDict()  # Column_b should not be added

Observed results

Delta Lake extends the schema even though the condition to update the table fails. (The `ingest_id` is intentionally sortable so the condition can be made to fail deterministically; I used KSUIDs.)

[Screenshot: table output showing column_b added despite the failed MERGE condition]

Expected results

When neither whenMatchedUpdate nor whenNotMatchedInsert actually fires, I would expect column_b not to be added to the table. I believe the schema is being evolved during the condition-analysis step, before the clause conditions are evaluated.
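To make the expected semantics concrete, here is a minimal pure-Python sketch (no Spark, toy `merge_expected` function of my own) of the behavior this report expects: the target schema should only gain columns when a clause actually writes a row.

```python
def merge_expected(table, schema, source, condition):
    """Toy MERGE keyed on file_id: update matched rows only when `condition`
    passes, insert unmatched rows, and evolve `schema` only on a real write."""
    by_key = {row["file_id"]: row for row in table}
    for new in source:
        current = by_key.get(new["file_id"])
        if current is not None:
            if condition(current, new):   # whenMatchedUpdate condition
                current.update(new)
                schema |= set(new)        # evolve schema only on a real write
        else:                             # whenNotMatchedInsert
            by_key[new["file_id"]] = dict(new)
            schema |= set(new)
    return list(by_key.values()), schema

# Matched row, but the ingest_id condition fails -> no write, no new column.
table = [{"file_id": "f1", "ingest_id": 1, "column_a": "value_a"}]
schema = {"file_id", "ingest_id", "column_a"}
source = [{"file_id": "f1", "ingest_id": 1, "column_b": "value_b"}]
rows, schema = merge_expected(
    table, schema, source,
    lambda cur, new: cur["ingest_id"] < new["ingest_id"],
)
assert "column_b" not in schema          # expected: schema unchanged
assert rows[0]["column_a"] == "value_a"  # expected: row unchanged
```

The observed Delta behavior corresponds to evolving `schema` unconditionally at analysis time, i.e. before `condition` is ever evaluated.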

Environment information

  • Delta Lake version: 3.1.0
  • Spark version: 3.5.1
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
@caldempsey caldempsey added the bug Something isn't working label Sep 22, 2024