Add Support for Dynamic Overwrite #931
Conversation
Hi @jqin61 this PR is looking great. I left a few nit suggestions and a few pointers to incorporate new features like `merge_append`.
pyiceberg/table/__init__.py
Outdated
@@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
    for data_file in data_files:
        append_files.append_data_file(data_file)

def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:
nit: I found the `delete_partitions` argument a bit confusing here, because this function just translates a set of partition record values to its corresponding predicate. Could we rename it to something more generic to indicate that? We should also remove `spec_id`, which isn't used in this function.
```diff
-def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:
+def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
```
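For context, a rough sketch of what this helper might look like after the rename (illustrative only, not necessarily the PR's final implementation; it assumes identity partitions, so partition field names map directly to column names):

```python
from typing import List

from pyiceberg.expressions import AlwaysFalse, AlwaysTrue, And, BooleanExpression, EqualTo, Or
from pyiceberg.typedef import Record


def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
    # Column names backing the current partition spec (assumes identity transforms).
    partition_fields = [self.schema().find_field(f.source_id).name for f in self.spec().fields]
    expr: BooleanExpression = AlwaysFalse()
    for record in partition_records:
        # AND together the field == value terms within one partition record...
        match: BooleanExpression = AlwaysTrue()
        for pos, field_name in enumerate(partition_fields):
            match = And(match, EqualTo(field_name, record[pos]))
        # ...then OR across all partitions to be replaced.
        expr = Or(expr, match)
    return expr
```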
good catch, thanks!
Hey @jqin61 Good seeing you here again! 🙌 I'll do a more in-depth review tomorrow morning. Could you also document this in the docs under `mkdocs/`? Otherwise folks won't be able to find this awesome feature 👍
pyiceberg/table/__init__.py
Outdated
@@ -502,6 +503,73 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
    for data_file in data_files:
        append_files.append_data_file(data_file)

def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
Can you add two tests where:
- We start with an unpartitioned table, write some data, and then evolve it to a partitioned table.
- We start with a monthly partitioned table, insert data for a few days, convert the partition to a daily partition, and dynamically overwrite a single day (see the sketch below).
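For the second scenario, a rough sketch of the shape it could take (`df_few_days` and `df_single_day` are hypothetical arrow tables; note that a later comment in this thread moves non-identity transform support out of scope for this PR, so this exact flow may not be supported as-is):

```python
from pyiceberg.transforms import DayTransform, MonthTransform

# Start with a monthly partition on a timestamp column "ts".
with tbl.transaction() as tx:
    with tx.update_spec() as spec:
        spec.add_field("ts", MonthTransform(), "ts_month")

tbl.append(df_few_days)  # rows spread over a few days within one month

# Evolve the spec from monthly to daily.
with tbl.transaction() as tx:
    with tx.update_spec() as spec:
        spec.remove_field("ts_month")
        spec.add_field("ts", DayTransform(), "ts_day")

# Dynamically overwrite a single day's partition.
tbl.dynamic_partition_overwrite(df_single_day)
```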
Thanks for the thorough review! So if the table is initially unpartitioned and then evolved to a partitioned table, I think the expected behavior is that dynamic overwrite will also delete (potentially through overwrite) data in the unpartitioned files?
Left some more comments @jqin61, thanks for working on this 👍
@jqin61 Sorry for the slow review, I was doing some other stuff as well. Can you fix the merge conflicts? I think this looks good to go 👍
Thank you Fokko! Sorry for the delay, I was extremely busy recently. I will get some time next weekend to address the comments, add tests, and fix the documentation. I will also move the transform support out of the scope of this PR due to its complexity; will send you details about it soon.
Hi @jqin61 - this looks good to me. I've added some nit suggestions to the documentation.
Thank you again for working on this amazing feature!
mkdocs/docs/api.md
Outdated
@@ -353,6 +353,127 @@ lat: [[52.371807,37.773972,53.11254],[53.21917]]
long: [[4.896029,-122.431297,6.0989],[6.56667]]
```

### Partial overwrites

You can use overwrite with an overwrite filter `tbl.overwrite(df,overwrite_filter)` to delete partial table data which matches the filter before appending new data.
```diff
-You can use overwrite with an overwrite filter `tbl.overwrite(df,overwrite_filter)` to delete partial table data which matches the filter before appending new data.
+When using the `overwrite` API, you can use an `overwrite_filter` to delete data that matches the filter before appending new data into the table.
```
mkdocs/docs/api.md
Outdated
tbl.overwrite(df, overwrite_filter=EqualTo('city', "Paris"))
```

This results in such data if data is printed by `tbl.scan().to_arrow()`:
```diff
-This results in such data if data is printed by `tbl.scan().to_arrow()`:
+This produces the following result with `tbl.scan().to_arrow()`:
```
mkdocs/docs/api.md
Outdated
long: [[74.006],[4.896029,6.0989,2.349014]]
```

If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically.
```diff
-If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically.
+If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the existing partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically from the provided arrow table.
```
mkdocs/docs/api.md
Outdated
```

If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically.
To try out it, you could firstly create a same PyIceberg table with partition specified on `"city"` field:
```diff
-To try out it, you could firstly create a same PyIceberg table with partition specified on `"city"` field:
+For example, with an Iceberg table partitioned on the `"city"` field:
```
mkdocs/docs/api.md
Outdated
)
```

And then suppose the data for the partition of `"paris"` is wrong:
```diff
-And then suppose the data for the partition of `"paris"` is wrong:
+And we want to overwrite the data for the partition of `"Paris"`:
```
mkdocs/docs/api.md
Outdated
tbl.append(df)
```

Then you could use dynamic overwrite on this partition:
```diff
-Then you could use dynamic overwrite on this partition:
+Then we can call `dynamic_partition_overwrite` with this arrow table:
```
mkdocs/docs/api.md
Outdated
tbl.dynamic_partition_overwrite(df_corrected)
```

This results in such data if data is printed by `tbl.scan().to_arrow()`:
```diff
-This results in such data if data is printed by `tbl.scan().to_arrow()`:
+This produces the following result with `tbl.scan().to_arrow()`:
```
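Putting these suggestions together, the documented flow looks roughly like this end to end (a sketch following the docs example; assumes `tbl` is partitioned by identity on `city`, and `df_corrected` is illustrative):

```python
import pyarrow as pa

# Corrected data for the "Paris" partition only.
df_corrected = pa.Table.from_pylist([
    {"city": "Paris", "lat": 48.864716, "long": 2.349014},
])

# Replaces only the "Paris" partition; all other partitions stay untouched.
tbl.dynamic_partition_overwrite(df_corrected)
print(tbl.scan().to_arrow())
```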
pyiceberg/table/__init__.py
Outdated
The function detects partition values in the provided arrow table that using the current table
partition spec, and deletes existing partitions matching these values. Finally, the
data in the table is appended to the table.
```diff
-The function detects partition values in the provided arrow table that using the current table
-partition spec, and deletes existing partitions matching these values. Finally, the
-data in the table is appended to the table.
+The function detects partition values in the provided arrow table using the current
+partition spec, and deletes existing partitions matching these values. Finally, the
+data in the table is appended to the table.
```
@sungwy Thank you for the detailed guidance on wording enhancements. I updated the docs. Please re-review when you get a chance.
Thank you for making this contribution @jqin61! I'll leave this PR open for another review, especially given that it introduces a new table commit API.
mkdocs/docs/api.md
Outdated
lat: double
long: double
----
city: [["New York"],["Amsterdam","Drachten","Paris"]]
I don't think this example is correct. Paris should have been overwritten, right? It looks like we lost San Fran'.
pyiceberg/table/__init__.py
Outdated
@@ -456,6 +461,89 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
    for data_file in data_files:
        append_files.append_data_file(data_file)

def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
Can we add a little doc here describing what the function does?
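One possible wording (a sketch):

```python
def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
    """Build a filter predicate matching any of the given partition records.

    Args:
        partition_records: A set of partition values, one Record per partition
            to be replaced.

    Returns:
        A predicate that matches exactly the rows whose partition values equal
        one of the given records.
    """
```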
```python
tbl = session_catalog.create_table(
    identifier=identifier,
    schema=TABLE_SCHEMA,
    properties={"format-version": "2"},
```
Looks like we're testing the same thing twice :)
```diff
-    properties={"format-version": "2"},
+    properties={"format-version": str(format_version)},
```
```python
# expecting 3 files:
rows = spark.sql(f"select partition from {identifier}.files").collect()
assert len(rows) == 3
```
I think this is also a good test to have:
```python
@pytest.mark.integration
@pytest.mark.parametrize(
    "format_version",
    [1, 2],
)
def test_dynamic_partition_overwrite_rename_column(
    spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
    arrow_table = pa.Table.from_pydict(
        {
            "place": ["Amsterdam", "Drachten"],
            "inhabitants": [921402, 44940],
        },
    )

    identifier = f"default.partitioned_{format_version}_dynamic_partition_overwrite_rename_column"
    try:
        session_catalog.drop_table(identifier)
    except:
        pass

    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=arrow_table.schema,
        properties={"format-version": str(format_version)},
    )

    with tbl.transaction() as tx:
        with tx.update_spec() as schema:
            schema.add_identity("place")

    tbl.append(arrow_table)

    with tbl.transaction() as tx:
        with tx.update_schema() as schema:
            schema.rename_column("place", "city")

    arrow_table = pa.Table.from_pydict(
        {
            "city": ["Drachten"],
            "inhabitants": [44941],  # A new baby was born!
        },
    )

    tbl.dynamic_partition_overwrite(arrow_table)
    result = tbl.scan().to_arrow()

    assert result['city'].to_pylist() == ['Drachten', 'Amsterdam']
    assert result['inhabitants'].to_pylist() == [44941, 921402]
```
```python
@pytest.mark.integration
@pytest.mark.parametrize(
    "format_version",
    [1, 2],
)
@pytest.mark.filterwarnings("ignore")
def test_dynamic_partition_overwrite_evolve_partition(
    spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
    arrow_table = pa.Table.from_pydict(
        {
            "place": ["Amsterdam", "Drachten"],
            "inhabitants": [921402, 44940],
        },
    )

    identifier = f"default.partitioned_{format_version}_test_dynamic_partition_overwrite_evolve_partition"
    try:
        session_catalog.drop_table(identifier)
    except:
        pass

    tbl = session_catalog.create_table(
        identifier=identifier,
        schema=arrow_table.schema,
        properties={"format-version": str(format_version)},
    )

    with tbl.transaction() as tx:
        with tx.update_spec() as schema:
            schema.add_identity("place")

    tbl.append(arrow_table)

    with tbl.transaction() as tx:
        with tx.update_schema() as schema:
            schema.add_column("country", StringType())
        with tx.update_spec() as schema:
            schema.add_identity("country")

    arrow_table = pa.Table.from_pydict(
        {
            "place": ["Groningen"],
            "country": ["Netherlands"],
            "inhabitants": [238147],
        },
    )

    tbl.dynamic_partition_overwrite(arrow_table)
    result = tbl.scan().to_arrow()

    assert result['place'].to_pylist() == ['Groningen', 'Amsterdam', 'Drachten']
    assert result['inhabitants'].to_pylist() == [238147, 921402, 44940]
```
pyiceberg/table/__init__.py
Outdated
```python
manifest_merge_enabled = property_as_bool(
    self.table_metadata.properties,
    TableProperties.MANIFEST_MERGE_ENABLED,
    TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
```
This logic is duplicated below as well, maybe move it into a function?
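One way to factor it out (a sketch; the helper name is illustrative):

```python
def _get_append_method(self, snapshot_properties: Dict[str, str]):
    """Return merge_append or fast_append, depending on the manifest-merge table property."""
    manifest_merge_enabled = property_as_bool(
        self.table_metadata.properties,
        TableProperties.MANIFEST_MERGE_ENABLED,
        TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
    )
    update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
    return update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
```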
Some small stuff, but apart from that it looks good to me 👍 Thanks for working on this, and sorry for the long wait
@jqin61 Do you have time to follow up on the last few comments? Would be great to get this in 👍
Thanks for fixing the CI, shall we rerun and merge? @Fokko Thank you!
Thanks @jqin61 again for contributing this feature!
Added support for dynamic overwrite leveraging delete and fast-append (counterpart in Iceberg Spark).
Several follow-ups:
Closes #1287