
Commit 952d7c0

Add Support for Dynamic Overwrite (#931)
1 parent 2529c81 commit 952d7c0

File tree

5 files changed: 633 additions & 29 deletions


mkdocs/docs/api.md

Lines changed: 121 additions & 0 deletions
@@ -353,6 +353,127 @@ lat: [[52.371807,37.773972,53.11254],[53.21917]]
long: [[4.896029,-122.431297,6.0989],[6.56667]]
```

### Partial overwrites

When using the `overwrite` API, you can use an `overwrite_filter` to delete data that matches the filter before appending new data into the table.

For example, with an Iceberg table created as:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table("default.cities", schema=schema)
```

And with initial data populating the table:

```python
import pyarrow as pa
df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)
tbl.append(df)
```

You can overwrite the record of `Paris` with a record of `New York`:

```python
from pyiceberg.expressions import EqualTo
df = pa.Table.from_pylist(
    [
        {"city": "New York", "lat": 40.7128, "long": 74.0060},
    ]
)
tbl.overwrite(df, overwrite_filter=EqualTo("city", "Paris"))
```

This produces the following result with `tbl.scan().to_arrow()`:

```python
pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["New York"],["Amsterdam","San Francisco","Drachten"]]
lat: [[40.7128],[52.371807,37.773972,53.11254]]
long: [[74.006],[4.896029,-122.431297,6.0989]]
```

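The `overwrite_filter` is not limited to a single equality predicate. A minimal sketch (not part of the committed docs), assuming the `tbl` and `pa` objects from above and that compound expressions built from `pyiceberg.expressions` are accepted as filters:

```python
from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual

# Hypothetical: delete only Drachten rows at latitude >= 53, then append the new record.
df_update = pa.Table.from_pylist(
    [
        {"city": "Drachten", "lat": 53.2, "long": 6.1},
    ]
)
tbl.overwrite(
    df_update,
    overwrite_filter=And(EqualTo("city", "Drachten"), GreaterThanOrEqual("lat", 53.0)),
)
```
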
If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the existing partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically from the provided Arrow table.

For example, with an Iceberg table partitioned on the `"city"` field:

```python
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import DoubleType, NestedField, StringType

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table(
    "default.cities",
    schema=schema,
    partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="city_identity"))
)
```

And suppose the table has been populated with initial data in which the coordinates of `"Paris"` are wrong:

```python
import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": -48.864716, "long": -2.349014},
    ],
)
tbl.append(df)
```

We can then call `dynamic_partition_overwrite` with an Arrow table that contains only the corrected record; the `"Paris"` partition is replaced and the other partitions are left untouched:

```python
df_corrected = pa.Table.from_pylist([
    {"city": "Paris", "lat": 48.864716, "long": 2.349014}
])
tbl.dynamic_partition_overwrite(df_corrected)
```

This produces the following result with `tbl.scan().to_arrow()`:

```python
pyarrow.Table
city: large_string
lat: double
long: double
----
city: [["Paris"],["Amsterdam"],["Drachten"],["San Francisco"]]
lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
long: [[2.349014],[4.896029],[6.0989],[-122.431297]]
```

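To confirm that only the `"Paris"` partition was rewritten, you can scan with a row filter. A small sketch (not part of the committed docs), using the string filter syntax accepted by `scan`:

```python
# Only the corrected Paris record should come back; the other partitions
# still contain the files written by the original append.
print(tbl.scan(row_filter="city = 'Paris'").to_arrow())
```
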
## Inspecting tables

To explore the table metadata, tables can be inspected.

pyiceberg/expressions/literals.py

Lines changed: 4 additions & 0 deletions
@@ -311,6 +311,10 @@ def _(self, _: TimeType) -> Literal[int]:
     def _(self, _: TimestampType) -> Literal[int]:
         return TimestampLiteral(self.value)
 
+    @to.register(TimestamptzType)
+    def _(self, _: TimestamptzType) -> Literal[int]:
+        return TimestampLiteral(self.value)
+
     @to.register(DecimalType)
     def _(self, type_var: DecimalType) -> Literal[Decimal]:
         unscaled = Decimal(self.value)
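For context (not part of the diff): this registration lets an integer literal be converted to a `timestamptz` value, presumably so that the new partition predicates can be bound against `timestamptz` columns. A minimal sketch, assuming the `literal` factory in `pyiceberg.expressions.literals`:

```python
from pyiceberg.expressions.literals import literal
from pyiceberg.types import TimestamptzType

# Microseconds since the Unix epoch; with this commit the conversion is
# registered for long literals and yields a TimestampLiteral.
ts_literal = literal(1_700_000_000_000_000).to(TimestamptzType())
print(ts_literal)
```
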

pyiceberg/io/pyarrow.py

Lines changed: 0 additions & 1 deletion
@@ -2519,7 +2519,6 @@ def _check_pyarrow_schema_compatible(
         raise ValueError(
             f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
         ) from e
-
     _check_schema_compatible(requested_schema, provided_schema)

pyiceberg/table/__init__.py

Lines changed: 120 additions & 15 deletions
@@ -44,10 +44,14 @@
 
 import pyiceberg.expressions.parser as parser
 from pyiceberg.expressions import (
+    AlwaysFalse,
     AlwaysTrue,
     And,
     BooleanExpression,
     EqualTo,
+    IsNull,
+    Or,
+    Reference,
 )
 from pyiceberg.expressions.visitors import (
     _InclusiveMetricsEvaluator,
@@ -117,6 +121,7 @@
     _OverwriteFiles,
 )
 from pyiceberg.table.update.spec import UpdateSpec
+from pyiceberg.transforms import IdentityTransform
 from pyiceberg.typedef import (
     EMPTY_DICT,
     IcebergBaseModel,
@@ -344,6 +349,48 @@ def _set_ref_snapshot(
 
         return updates, requirements
 
+    def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanExpression:
+        """Build a filter predicate matching any of the input partition records.
+
+        Args:
+            partition_records: A set of partition records to match
+        Returns:
+            A predicate matching any of the input partition records.
+        """
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in partition_records:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos, partition_field in enumerate(partition_fields):
+                predicate = (
+                    EqualTo(Reference(partition_field), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_field))
+                )
+                match_partition_expression = And(match_partition_expression, predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
+    def _append_snapshot_producer(self, snapshot_properties: Dict[str, str]) -> _FastAppendFiles:
+        """Determine the append type based on table properties.
+
+        Args:
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        Returns:
+            Either a fast-append or a merge-append snapshot producer.
+        """
+        manifest_merge_enabled = property_as_bool(
+            self.table_metadata.properties,
+            TableProperties.MANIFEST_MERGE_ENABLED,
+            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
+        )
+        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
+        return update_snapshot.merge_append() if manifest_merge_enabled else update_snapshot.fast_append()
+
     def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
         """Create a new UpdateSchema to alter the columns of this table.
 
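As a rough illustration of what `_build_partition_predicate` produces (not part of the diff): for a table identity-partitioned on a single `city` field, a set of partition records expands into `Or`-ed per-partition matches, with `IsNull` covering null partition values. The field name and records below are hypothetical:

```python
from pyiceberg.expressions import And, AlwaysFalse, AlwaysTrue, EqualTo, IsNull, Or, Reference

partition_fields = ["city"]                # source column of the identity partition
partition_records = [("Paris",), (None,)]  # partition values to be replaced

expr = AlwaysFalse()
for record in partition_records:
    match = AlwaysTrue()
    for pos, field in enumerate(partition_fields):
        value = record[pos]
        predicate = EqualTo(Reference(field), value) if value is not None else IsNull(Reference(field))
        match = And(match, predicate)
    expr = Or(expr, match)

# expr matches rows whose city is 'Paris' or whose city is null, i.e. exactly
# the partitions that the dynamic overwrite deletes before re-appending.
print(expr)
```
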
@@ -398,15 +445,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
             self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
         )
 
-        manifest_merge_enabled = property_as_bool(
-            self.table_metadata.properties,
-            TableProperties.MANIFEST_MERGE_ENABLED,
-            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
-        )
-        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
-        append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
-
-        with append_method() as append_files:
+        with self._append_snapshot_producer(snapshot_properties) as append_files:
             # skip writing data files if the dataframe is empty
             if df.shape[0] > 0:
                 data_files = _dataframe_to_data_files(
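With this refactor, `append` (and, below, `overwrite` and the new `dynamic_partition_overwrite`) picks between fast-append and merge-append via `_append_snapshot_producer`. A minimal sketch of opting into merge-append at table-creation time, assuming the property key behind `TableProperties.MANIFEST_MERGE_ENABLED` is `commit.manifest-merge.enabled` and reusing `catalog` and `schema` from the docs above:

```python
# Hypothetical: appends on this table will merge manifests instead of fast-appending.
tbl_merged = catalog.create_table(
    "default.cities_merged",
    schema=schema,
    properties={"commit.manifest-merge.enabled": "true"},
)
```
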
@@ -415,6 +454,62 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
                 for data_file in data_files:
                     append_files.append_data_file(data_file)
 
+    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for overwriting existing partitions with a PyArrow table.
+
+        The function detects partition values in the provided arrow table using the current
+        partition spec, and deletes existing partitions matching these values. Finally, the
+        data in the table is appended to the table.
+
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
+        try:
+            import pyarrow as pa
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files
+
+        if not isinstance(df, pa.Table):
+            raise ValueError(f"Expected PyArrow table, got: {df}")
+
+        if self.table_metadata.spec().is_unpartitioned():
+            raise ValueError("Cannot apply dynamic overwrite on an unpartitioned table.")
+
+        for field in self.table_metadata.spec().fields:
+            if not isinstance(field.transform, IdentityTransform):
+                raise ValueError(
+                    f"For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: {field}"
+                )
+
+        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        # If dataframe does not have data, there is no need to overwrite
+        if df.shape[0] == 0:
+            return
+
+        append_snapshot_commit_uuid = uuid.uuid4()
+        data_files: List[DataFile] = list(
+            _dataframe_to_data_files(
+                table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
+            )
+        )
+
+        partitions_to_overwrite = {data_file.partition for data_file in data_files}
+        delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
+        self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
+
+        with self._append_snapshot_producer(snapshot_properties) as append_files:
+            append_files.commit_uuid = append_snapshot_commit_uuid
+            for data_file in data_files:
+                append_files.append_data_file(data_file)
+
     def overwrite(
         self,
         df: pa.Table,
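Note the guard rails above: the method rejects unpartitioned tables and any non-identity transform in the latest partition spec. A small sketch (not part of the diff) of checking those preconditions from user code, reusing `catalog` from the docs above:

```python
from pyiceberg.transforms import IdentityTransform

tbl = catalog.load_table("default.cities")
spec = tbl.spec()

# dynamic_partition_overwrite raises ValueError in these cases, so check up front.
supported = not spec.is_unpartitioned() and all(
    isinstance(field.transform, IdentityTransform) for field in spec.fields
)
print(f"dynamic partition overwrite supported: {supported}")
```
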
@@ -461,14 +556,14 @@ def overwrite(
 
         self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties)
 
-        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
+        with self._append_snapshot_producer(snapshot_properties) as append_files:
             # skip writing data files if the dataframe is empty
             if df.shape[0] > 0:
                 data_files = _dataframe_to_data_files(
-                    table_metadata=self.table_metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
+                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                 )
                 for data_file in data_files:
-                    update_snapshot.append_data_file(data_file)
+                    append_files.append_data_file(data_file)
 
     def delete(
         self,
@@ -552,9 +647,8 @@ def delete(
                     ))
 
             if len(replaced_files) > 0:
-                with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
-                    commit_uuid=commit_uuid
-                ) as overwrite_snapshot:
+                with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot:
+                    overwrite_snapshot.commit_uuid = commit_uuid
                     for original_data_file, replaced_data_files in replaced_files:
                         overwrite_snapshot.delete_data_file(original_data_file)
                         for replaced_data_file in replaced_data_files:
@@ -989,6 +1083,17 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         with self.transaction() as tx:
             tx.append(df=df, snapshot_properties=snapshot_properties)
 
+    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """Shorthand for dynamic overwriting the table with a PyArrow table.
+
+        Old partitions are auto detected and replaced with data files created for input arrow table.
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
+        with self.transaction() as tx:
+            tx.dynamic_partition_overwrite(df=df, snapshot_properties=snapshot_properties)
+
     def overwrite(
         self,
         df: pa.Table,
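For completeness (not part of the commit): because the public method just wraps `Transaction.dynamic_partition_overwrite`, the operation can presumably be combined with other updates in one explicit transaction. A minimal sketch, reusing `tbl` and `df_corrected` from the documentation example; the `set_properties` call is only illustrative:

```python
# Hypothetical: perform the dynamic overwrite and a property update in one commit.
with tbl.transaction() as tx:
    tx.dynamic_partition_overwrite(df_corrected)
    tx.set_properties(last_corrected_by="dynamic-overwrite-example")
```
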

0 commit comments
