Add Support for Dynamic Overwrite #931


Merged 30 commits into apache:main on Dec 19, 2024

Conversation

Contributor

@jqin61 jqin61 commented Jul 15, 2024

Added support for dynamic overwrite, leveraging delete and fast-append (the counterpart in Iceberg Spark).

Several follow-ups:

  • support the current spec with transformed fields. This should be easy, but due to the number of transforms it will take some time; they will be added bit by bit in follow-up PRs.
  • consider whether to raise a UserWarning when no delete is executed, because from the perspective of dynamic-overwrite users, it should not matter whether the operation is a pure append or a partition replacement.

Closes #1287
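
In short, the new API looks like this (a minimal usage sketch; the catalog and table names here are placeholders, not from this PR):

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.cities")  # partitioned, e.g. identity on "city"

# Partitions present in df are deleted first, then df is appended.
df = pa.Table.from_pylist([{"city": "Paris", "lat": 48.864716, "long": 2.349014}])
tbl.dynamic_partition_overwrite(df)
```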

@sungwy sungwy requested review from Fokko and sungwy July 15, 2024 19:35
Collaborator

@sungwy sungwy left a comment

Hi @jqin61, this PR is looking great. I left a few nit suggestions and a few pointers to incorporate new features like merge_append.

@@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
for data_file in data_files:
    append_files.append_data_file(data_file)

def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:
Collaborator

nit: I found the delete_partitions argument a bit confusing here, because this function just translates a set of partition record values into its corresponding predicate. Could we rename it to something more generic to indicate that? We should also remove spec_id, which isn't used in this function.

Suggested change
def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:
def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
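
For intuition, here is a sketch of what such a translation might look like (illustrative only, not the PR's code; it assumes identity partitions, at least one partition field, and that `Record` supports positional indexing; a real implementation would also need to handle null partition values with `IsNull`):

```python
from functools import reduce
from typing import List

from pyiceberg.expressions import AlwaysFalse, And, BooleanExpression, EqualTo, Or
from pyiceberg.typedef import Record


def build_partition_predicate(field_names: List[str], partition_records: List[Record]) -> BooleanExpression:
    # OR together one match per partition record; each match ANDs one EqualTo per field.
    expr: BooleanExpression = AlwaysFalse()
    for record in partition_records:
        match = reduce(And, [EqualTo(name, record[pos]) for pos, name in enumerate(field_names)])
        expr = Or(expr, match)
    return expr
```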

Contributor Author

good catch, thanks!

Contributor

@Fokko Fokko left a comment

Hey @jqin61 Good seeing you here again! 🙌 I'll do a more in-depth review tomorrow morning. Could you also document this in the docs under mkdocs/? Otherwise folks won't be able to find this awesome feature 👍

@@ -502,6 +503,73 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
for data_file in data_files:
    append_files.append_data_file(data_file)

def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
Contributor

Can you add two tests where:

  • Start with an unpartitioned table, write some data, and then evolve it to a partitioned table.
  • Start with a monthly-partitioned table, insert data for a few days, convert the partition to a daily partition, and dynamically overwrite a single day.

Contributor Author

Thanks for the thorough review! If the table is initially unpartitioned and then evolved into a partitioned table, I think the expected behavior is that dynamic overwrite will also delete (potentially through an overwrite) the data in the unpartitioned files?

Contributor

Fokko commented Jul 17, 2024

Left some more comments @jqin61, thanks for working on this 👍

@sungwy sungwy mentioned this pull request Aug 1, 2024
Contributor

Fokko commented Aug 7, 2024

@jqin61 Sorry for the slow review, I was doing some other stuff as well. Can you fix the merge conflicts? I think this looks good to go 👍

Contributor Author

jqin61 commented Aug 7, 2024

> @jqin61 Sorry for the slow review, I was doing some other stuff as well. Can you fix the merge conflicts? I think this looks good to go 👍

Thank you, Fokko! Sorry for the delay; I have been extremely busy recently. I will get some time next weekend to address the comments, add tests, and fix the documentation. I will also move the transform support out of the scope of this PR due to its complexity; I will send you details about it soon.

@jqin61 jqin61 requested a review from sungwy September 18, 2024 00:18
Collaborator

@sungwy sungwy left a comment

Hi @jqin61 - this looks good to me. I've added some nit suggestions to the documentation.

Thank you again for working on this amazing feature!

@@ -353,6 +353,127 @@ lat: [[52.371807,37.773972,53.11254],[53.21917]]
long: [[4.896029,-122.431297,6.0989],[6.56667]]
```

### Partial overwrites

You can use overwrite with an overwrite filter `tbl.overwrite(df,overwrite_filter)` to delete partial table data which matches the filter before appending new data.
Collaborator

Suggested change
You can use overwrite with an overwrite filter `tbl.overwrite(df,overwrite_filter)` to delete partial table data which matches the filter before appending new data.
When using the `overwrite` API, you can use an `overwrite_filter` to delete data that matches the filter before appending new data into the table.

tbl.overwrite(df, overwrite_filter=EqualTo('city', "Paris"))
```
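
For reference, a self-contained sketch of the partial-overwrite flow (the catalog and table names are placeholders, not from this PR):

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

catalog = load_catalog("default")
tbl = catalog.load_table("default.cities")

# New data for Paris; rows matching the filter are deleted before this is appended.
df = pa.Table.from_pylist([{"city": "Paris", "lat": 48.864716, "long": 2.349014}])
tbl.overwrite(df, overwrite_filter=EqualTo("city", "Paris"))
```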

This results in such data if data is printed by `tbl.scan().to_arrow()`:
Collaborator

Suggested change
This results in such data if data is printed by `tbl.scan().to_arrow()`:
This produces the following result with `tbl.scan().to_arrow()`:

long: [[74.006],[4.896029,6.0989,2.349014]]
```

If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically.
Collaborator

Suggested change
If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically.
If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the existing partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically from the provided arrow table.

```

If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically.
To try out it, you could firstly create a same PyIceberg table with partition specified on `"city"` field:
Collaborator

Suggested change
To try out it, you could firstly create a same PyIceberg table with partition specified on `"city"` field:
For example, with an Iceberg table with a partition specified on the `"city"` field:

)
```
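
The table creation being referenced might look like the following sketch (field IDs are illustrative and must match the schema; `catalog` is assumed to be loaded via `load_catalog`):

```python
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import DoubleType, NestedField, StringType

schema = Schema(
    NestedField(field_id=1, name="city", field_type=StringType(), required=False),
    NestedField(field_id=2, name="lat", field_type=DoubleType(), required=False),
    NestedField(field_id=3, name="long", field_type=DoubleType(), required=False),
)

# Identity-partition the table on "city" so dynamic overwrite can target it.
spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="city")
)

tbl = catalog.create_table("default.cities", schema=schema, partition_spec=spec)
```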

And then suppose the data for the partition of `"paris"` is wrong:
Collaborator

Suggested change
And then suppose the data for the partition of `"paris"` is wrong:
And we want to overwrite the data for the partition of `"Paris"`:

tbl.append(df)
```

Then you could use dynamic overwrite on this partition:
Collaborator

Suggested change
Then you could use dynamic overwrite on this partition:
Then we can call `dynamic_partition_overwrite` with this arrow table:

tbl.dynamic_partition_overwrite(df_corrected)
```
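
Here, `df_corrected` would be an arrow table containing only the corrected partition's rows, along these lines (values are illustrative):

```python
import pyarrow as pa

# Only the "Paris" partition appears in the dataframe, so only that partition is replaced.
df_corrected = pa.Table.from_pylist([{"city": "Paris", "lat": 48.864716, "long": 2.349014}])
```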

This results in such data if data is printed by `tbl.scan().to_arrow()`:
Collaborator

Suggested change
This results in such data if data is printed by `tbl.scan().to_arrow()`:
This produces the following result with `tbl.scan().to_arrow()`:

Comment on lines 487 to 489
The function detects partition values in the provided arrow table that using the current table
partition spec, and deletes existing partitions matching these values. Finally, the
data in the table is appended to the table.
Collaborator

Suggested change
The function detects partition values in the provided arrow table that using the current table
partition spec, and deletes existing partitions matching these values. Finally, the
data in the table is appended to the table.
The function detects partition values in the provided arrow table using the current
partition spec, and deletes existing partitions matching these values. Finally, the
data in the table is appended to the table.

Contributor Author

@jqin61 jqin61 left a comment

@sungwy Thank you for the detailed wording enhancement guidance. I updated the docs. Please re-review when you get a chance.

Collaborator

sungwy commented Sep 20, 2024

Thank you for making this contribution @jqin61 ! I'll leave this PR open for another review, especially given that it introduces a new table commit API

Collaborator

sungwy commented Sep 24, 2024

Hi @Fokko - this PR looks good from my end.

Would you have some time to take a look? Since this is a new API (which comes with another level of caution), I'd love to get your review before we merge in @jqin61 's awesome work

@Fokko Fokko self-requested a review November 4, 2024 18:20
Contributor

Fokko commented Nov 4, 2024

@jqin61 @sungwy Sorry for leaving this hanging, I'll do a review first thing tomorrow 👍

@Fokko Fokko mentioned this pull request Nov 4, 2024
lat: double
long: double
----
city: [["New York"],["Amsterdam","Drachten","Paris"]]
Contributor

I don't think this example is correct. Paris should have been overwritten, right? It looks like we lost San Fran'.

@@ -456,6 +461,89 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
for data_file in data_files:
    append_files.append_data_file(data_file)

def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
Contributor

Can we add a little doc here describing what the function does?
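
For instance, a short docstring along these lines (wording is illustrative, not the PR's final text):

```python
def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
    """Build a filter predicate matching rows whose partition values are in the given set.

    Args:
        partition_records: A set of partition Records, one per partition to match.

    Returns:
        A predicate that matches every row belonging to any of the given partitions.
    """
```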

tbl = session_catalog.create_table(
    identifier=identifier,
    schema=TABLE_SCHEMA,
    properties={"format-version": "2"},
Contributor

Looks like we're testing the same thing twice :)

Suggested change
properties={"format-version": "2"},
properties={"format-version": str(format_version)},

# expecting 3 files:
rows = spark.sql(f"select partition from {identifier}.files").collect()
assert len(rows) == 3

Contributor

@Fokko Fokko Nov 5, 2024

I think this is also a good test to have:

@pytest.mark.integration
@pytest.mark.parametrize(
    "format_version",
    [1, 2],
)
def test_dynamic_partition_overwrite_rename_column(
    spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
    arrow_table = pa.Table.from_pydict(
        {
            "place": ["Amsterdam", "Drachten"],
            "inhabitants": [921402, 44940],
        },
    )

    identifier = f"default.partitioned_{format_version}_dynamic_partition_overwrite_rename_column"
    try:
        session_catalog.drop_table(identifier)
    except Exception:
        pass

    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=arrow_table.schema,
        properties={"format-version": str(format_version)},
    )


    with tbl.transaction() as tx:
        with tx.update_spec() as spec:
            spec.add_identity("place")

    tbl.append(arrow_table)

    with tbl.transaction() as tx:
        with tx.update_schema() as schema:
            schema.rename_column("place", "city")

    arrow_table = pa.Table.from_pydict(
        {
            "city": ["Drachten"],
            "inhabitants": [44941],  # A new baby was born!
        },
    )

    tbl.dynamic_partition_overwrite(arrow_table)
    result = tbl.scan().to_arrow()

    assert result['city'].to_pylist() == ['Drachten', 'Amsterdam']
    assert result['inhabitants'].to_pylist() == [44941, 921402]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pytest.mark.integration
@pytest.mark.parametrize(
    "format_version",
    [1, 2],
)
@pytest.mark.filterwarnings("ignore")
def test_dynamic_partition_overwrite_evolve_partition(
    spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
    arrow_table = pa.Table.from_pydict(
        {
            "place": ["Amsterdam", "Drachten"],
            "inhabitants": [921402, 44940],
        },
    )

    identifier = f"default.partitioned_{format_version}_test_dynamic_partition_overwrite_evolve_partition"
    try:
        session_catalog.drop_table(identifier)
    except Exception:
        pass

    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=arrow_table.schema,
        properties={"format-version": str(format_version)},
    )


    with tbl.transaction() as tx:
        with tx.update_spec() as spec:
            spec.add_identity("place")

    tbl.append(arrow_table)

    with tbl.transaction() as tx:
        with tx.update_schema() as schema:
            schema.add_column("country", StringType())
        with tx.update_spec() as spec:
            spec.add_identity("country")

    arrow_table = pa.Table.from_pydict(
        {
            "place": ["Groningen"],
            "country": ["Netherlands"],
            "inhabitants": [238147],
        },
    )

    tbl.dynamic_partition_overwrite(arrow_table)
    result = tbl.scan().to_arrow()

    assert result['place'].to_pylist() == ['Groningen', 'Amsterdam', 'Drachten']
    assert result['inhabitants'].to_pylist() == [238147, 921402, 44940]

Comment on lines 534 to 540
manifest_merge_enabled = property_as_bool(
    self.table_metadata.properties,
    TableProperties.MANIFEST_MERGE_ENABLED,
    TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
Contributor

This logic is duplicated below as well, maybe move it into a function?
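
A sketch of the extracted helper (the method name and return value are illustrative; it assumes the same class context, where `property_as_bool` and `TableProperties` are already imported):

```python
def _append_snapshot_producer(self, snapshot_properties: Dict[str, str]):
    """Pick merge_append or fast_append based on the table's manifest-merge property."""
    manifest_merge_enabled = property_as_bool(
        self.table_metadata.properties,
        TableProperties.MANIFEST_MERGE_ENABLED,
        TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
    )
    update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
    return update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
```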

Contributor

@Fokko Fokko left a comment

Some small stuff, but apart from that it looks good to me 👍 Thanks for working on this, and sorry for the long wait

Contributor

Fokko commented Nov 19, 2024

@jqin61 Do you have time to follow up on the last few comments? Would be great to get this in 👍

Contributor Author

jqin61 commented Dec 10, 2024

@Fokko @sungwy Thank you for the review and the suggestions! I fixed the latest comments; let's rerun CI and merge if it looks good to you.

Contributor Author

jqin61 commented Dec 11, 2024

Thanks for fixing the CI. Shall we rerun and merge, @Fokko? Thank you!

Collaborator

@sungwy sungwy left a comment

Thanks @jqin61 again for contributing this feature!

@sungwy sungwy merged commit 952d7c0 into apache:main Dec 19, 2024
8 checks passed
sungwy pushed a commit to sungwy/iceberg-python that referenced this pull request Dec 24, 2024