-
Notifications
You must be signed in to change notification settings - Fork 267
Fallback for upsert when arrow cannot compare source rows with target rows #1878
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this @koenvo It looks like a lot of folks are waiting for this.
Could you run a poormans benchmark, similar to what I did here: #1685 (comment) Just to see how the two methods compare in terms of performance?
pyiceberg/table/upsert_util.py
Outdated
|
||
MARKER_COLUMN_NAME = "__from_target" | ||
|
||
assert MARKER_COLUMN_NAME not in join_cols_set |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We try to avoid assert
outside of the tests. Could you raise a ValueError
instead?
Poor man's benchmarkThis compares the performance of the original vs fallback
No significant performance regression observed. Fallback behaves as expected. |
First of all, sorry for the late reply, I was busy with the Iceberg summit :) @koenvo The reason I was asking for a benchmark is to see if we can replace the existing logic with your logic that also works with |
pyiceberg/table/upsert_util.py
Outdated
|
||
# Step 2: Prepare target index with join keys and a marker | ||
target_index = target_table.select(join_cols_set).append_column( | ||
MARKER_COLUMN_NAME, pa.array([True] * len(target_table), pa.bool_()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can optimize this allocation by avoiding creating a Python array, but that can be done in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed :-)
Rationale for this change
Upsert operations in PyIceberg rely on Arrow joins between source and target rows. However, Arrow Acero cannot compare certain complex types — like
struct
,list
, andmap
— unless they’re part of the join key. When such types exist in non-join columns, the upsert fails with an error like:ArrowInvalid: Data type struct<...> is not supported in join non-key field venue_geo
This PR introduces a fallback mechanism: if Arrow fails to join due to unsupported types, we fall back to comparing only the key columns. Non-key complex fields are ignored in the join condition, but still retained in the final upserted data.
Before
After
✅ Are these changes tested?
Yes:
Are there any user-facing changes?
Yes — upserts involving complex non-key columns (like
struct
,list
, ormap
) no longer fail. They now succeed by skipping unsupported comparisons during the join phase.