Pyarrow data type, default to small type and fix large type override #1859

Open · wants to merge 5 commits into base: main
2 changes: 1 addition & 1 deletion mkdocs/docs/configuration.md
@@ -199,7 +199,7 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya

| Key | Example | Description |
| ------------------------------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| pyarrow.use-large-types-on-read | True | Use large PyArrow types i.e. [large_string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [large_binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [large_list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. The default value is True. |
| pyarrow.use-large-types-on-read | False | Force large PyArrow types i.e. [large_string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [large_binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [large_list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. The default value is False. |

<!-- markdown-link-check-enable-->

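For readers of the doc change above: a minimal sketch of how the (now deprecated, to be removed in 0.11.0) override could be exercised from user code once the default flips to small types. The catalog and table names are hypothetical; the property string mirrors the `PYARROW_USE_LARGE_TYPES_ON_READ` constant used in the tests below.

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog/table names, for illustration only.
catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

# Opt back into large Arrow types on read (deprecated, removed in 0.11.0);
# without this, string/binary/list columns now come back as the small variants.
tbl.io.properties["pyarrow.use-large-types-on-read"] = "True"

result = tbl.scan().to_arrow()
print(result.schema)  # e.g. foo: large_string instead of foo: string
```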
12 changes: 6 additions & 6 deletions pyiceberg/io/pyarrow.py
@@ -626,7 +626,7 @@ def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field:

def list(self, list_type: ListType, element_result: pa.DataType) -> pa.DataType:
element_field = self.field(list_type.element_field, element_result)
return pa.large_list(value_type=element_field)
return pa.list_(value_type=element_field)
Review comment from a contributor:
I'm not convinced that we need to change this. We use schema_to_pyarrow in many places:

  • Schema.as_arrow(): this can be problematic when people have already allocated buffers larger than what fits in the small types.
  • _ConvertToArrowExpression.{visit_in,visit_not_in}: I checked manually, and it looks like we can mix large and normal types here (see the sketch after this list) :)
  • ArrowProjectionVisitor has an issue similar to what you've described in Arrow: Infer the types when reading #1669 (comment). I think the other way around is also an issue: if you promote a large_string, it would now produce a binary and not a large_binary.
  • ArrowScan.to_table() will return the schema when there is no data; both small and large are okay.
  • DataScan.to_arrow_batch_reader(): I think we should always update to the large type. Since this is streaming, we don't know upfront whether the small buffers are big enough, so it is safe to go with the large ones.
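A quick way to reproduce the mixed-type observation from the second bullet, assuming plain pyarrow (only a sketch of the manual check described above; behaviour may vary across pyarrow versions):

```python
import pyarrow as pa
import pyarrow.compute as pc

# Column materialised with the small `string` type.
table = pa.table({"foo": pa.array(["hello", "world", "!"], type=pa.string())})

# Value set built with the large type, as schema_to_pyarrow used to produce.
value_set = pa.array(["hello", "world"], type=pa.large_string())

# is_in matches the value set against the column even though the types differ
# (the set is cast to the column type), so small and large types can be mixed.
mask = pc.is_in(table["foo"], value_set=value_set)
print(table.filter(mask).column("foo").to_pylist())  # ['hello', 'world']
```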


def map(self, map_type: MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
key_field = self.field(map_type.key_field, key_result)
@@ -676,7 +676,7 @@ def visit_timestamptz_ns(self, _: TimestamptzNanoType) -> pa.DataType:
return pa.timestamp(unit="ns", tz="UTC")

def visit_string(self, _: StringType) -> pa.DataType:
return pa.large_string()
return pa.string()

def visit_uuid(self, _: UUIDType) -> pa.DataType:
return pa.binary(16)
@@ -685,7 +685,7 @@ def visit_unknown(self, _: UnknownType) -> pa.DataType:
return pa.null()

def visit_binary(self, _: BinaryType) -> pa.DataType:
return pa.large_binary()
return pa.binary()


def _convert_scalar(value: Any, iceberg_type: IcebergType) -> pa.scalar:
@@ -1609,7 +1609,7 @@ def _table_from_scan_task(task: FileScanTask) -> pa.Table:
removed_in="0.11.0",
help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
)
result = result.cast(arrow_schema)
result = result.cast(_pyarrow_schema_ensure_large_types(arrow_schema))

if self._limit is not None:
return result.slice(0, self._limit)
@@ -1715,8 +1715,8 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
target_schema = schema_to_pyarrow(
promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
)
if self._use_large_types is False:
target_schema = _pyarrow_schema_ensure_small_types(target_schema)
if self._use_large_types is True:
target_schema = _pyarrow_schema_ensure_large_types(target_schema)
return values.cast(target_schema)
elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
if field.field_type == TimestampType():
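To make the override path above concrete: when `pyarrow.use-large-types-on-read` is explicitly enabled, the scan result is cast with `_pyarrow_schema_ensure_large_types`. A rough hand-written sketch of what that widening cast amounts to (not the actual helper, which also walks maps and nested structs):

```python
import pyarrow as pa

# Simplified stand-in for _pyarrow_schema_ensure_large_types: swap string/binary/list
# for their large_* counterparts. The real helper also handles maps and nested structs.
def to_large(dt: pa.DataType) -> pa.DataType:
    if pa.types.is_string(dt):
        return pa.large_string()
    if pa.types.is_binary(dt):
        return pa.large_binary()
    if pa.types.is_list(dt):
        return pa.large_list(to_large(dt.value_type))
    return dt

small_schema = pa.schema([("foo", pa.string()), ("payload", pa.binary())])
large_schema = pa.schema([pa.field(f.name, to_large(f.type), f.nullable) for f in small_schema])

table = pa.table({"foo": ["a", "b"], "payload": [b"x", b"y"]}, schema=small_schema)
print(table.cast(large_schema).schema)  # foo: large_string, payload: large_binary
```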
15 changes: 9 additions & 6 deletions tests/integration/test_reads.py
@@ -872,9 +872,12 @@ def test_table_scan_keep_types(catalog: Catalog) -> None:


@pytest.mark.integration
@pytest.mark.filterwarnings(
"ignore:Deprecated in 0.10.0, will be removed in 0.11.0. Property `pyarrow.use-large-types-on-read` will be removed.:DeprecationWarning"
)
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
identifier = "default.test_table_scan_override_with_small_types"
def test_table_scan_override_with_large_types(catalog: Catalog) -> None:
identifier = "default.test_table_scan_override_with_large_types"
arrow_table = pa.Table.from_arrays(
[
pa.array(["a", "b", "c"]),
@@ -900,15 +903,15 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
with tbl.update_schema() as update_schema:
update_schema.update_column("string-to-binary", BinaryType())

tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "True"
result_table = tbl.scan().to_arrow()

expected_schema = pa.schema(
[
pa.field("string", pa.string()),
pa.field("string", pa.large_string()),
pa.field("string-to-binary", pa.large_binary()),
pa.field("binary", pa.binary()),
pa.field("list", pa.list_(pa.string())),
pa.field("binary", pa.large_binary()),
pa.field("list", pa.large_list(pa.large_string())),
]
)
assert result_table.schema.equals(expected_schema)
90 changes: 45 additions & 45 deletions tests/io/test_pyarrow.py
@@ -406,7 +406,7 @@ def test_pyarrow_unified_session_properties() -> None:

def test_schema_to_pyarrow_schema_include_field_ids(table_schema_nested: Schema) -> None:
actual = schema_to_pyarrow(table_schema_nested)
expected = """foo: large_string
expected = """foo: string
-- field metadata --
PARQUET:field_id: '1'
bar: int32 not null
@@ -415,20 +415,20 @@ def test_schema_to_pyarrow_schema_include_field_ids(table_schema_nested: Schema)
baz: bool
-- field metadata --
PARQUET:field_id: '3'
qux: large_list<element: large_string not null> not null
child 0, element: large_string not null
qux: list<element: string not null> not null
child 0, element: string not null
-- field metadata --
PARQUET:field_id: '5'
-- field metadata --
PARQUET:field_id: '4'
quux: map<large_string, map<large_string, int32>> not null
child 0, entries: struct<key: large_string not null, value: map<large_string, int32> not null> not null
child 0, key: large_string not null
quux: map<string, map<string, int32>> not null
child 0, entries: struct<key: string not null, value: map<string, int32> not null> not null
child 0, key: string not null
-- field metadata --
PARQUET:field_id: '7'
child 1, value: map<large_string, int32> not null
child 0, entries: struct<key: large_string not null, value: int32 not null> not null
child 0, key: large_string not null
child 1, value: map<string, int32> not null
child 0, entries: struct<key: string not null, value: int32 not null> not null
child 0, key: string not null
-- field metadata --
PARQUET:field_id: '9'
child 1, value: int32 not null
@@ -438,7 +438,7 @@ def test_schema_to_pyarrow_schema_include_field_ids(table_schema_nested: Schema)
PARQUET:field_id: '8'
-- field metadata --
PARQUET:field_id: '6'
location: large_list<element: struct<latitude: float, longitude: float> not null> not null
location: list<element: struct<latitude: float, longitude: float> not null> not null
child 0, element: struct<latitude: float, longitude: float> not null
child 0, latitude: float
-- field metadata --
@@ -450,8 +450,8 @@ def test_schema_to_pyarrow_schema_include_field_ids(table_schema_nested: Schema)
PARQUET:field_id: '12'
-- field metadata --
PARQUET:field_id: '11'
person: struct<name: large_string, age: int32 not null>
child 0, name: large_string
person: struct<name: string, age: int32 not null>
child 0, name: string
-- field metadata --
PARQUET:field_id: '16'
child 1, age: int32 not null
@@ -464,24 +464,24 @@ def test_schema_to_pyarrow_schema_include_field_ids(table_schema_nested: Schema)

def test_schema_to_pyarrow_schema_exclude_field_ids(table_schema_nested: Schema) -> None:
actual = schema_to_pyarrow(table_schema_nested, include_field_ids=False)
expected = """foo: large_string
expected = """foo: string
bar: int32 not null
baz: bool
qux: large_list<element: large_string not null> not null
child 0, element: large_string not null
quux: map<large_string, map<large_string, int32>> not null
child 0, entries: struct<key: large_string not null, value: map<large_string, int32> not null> not null
child 0, key: large_string not null
child 1, value: map<large_string, int32> not null
child 0, entries: struct<key: large_string not null, value: int32 not null> not null
child 0, key: large_string not null
qux: list<element: string not null> not null
child 0, element: string not null
quux: map<string, map<string, int32>> not null
child 0, entries: struct<key: string not null, value: map<string, int32> not null> not null
child 0, key: string not null
child 1, value: map<string, int32> not null
child 0, entries: struct<key: string not null, value: int32 not null> not null
child 0, key: string not null
child 1, value: int32 not null
location: large_list<element: struct<latitude: float, longitude: float> not null> not null
location: list<element: struct<latitude: float, longitude: float> not null> not null
child 0, element: struct<latitude: float, longitude: float> not null
child 0, latitude: float
child 1, longitude: float
person: struct<name: large_string, age: int32 not null>
child 0, name: large_string
person: struct<name: string, age: int32 not null>
child 0, name: string
child 1, age: int32 not null"""
assert repr(actual) == expected

@@ -546,18 +546,18 @@ def test_timestamptz_type_to_pyarrow() -> None:

def test_string_type_to_pyarrow() -> None:
iceberg_type = StringType()
assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.large_string()
assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.string()


def test_binary_type_to_pyarrow() -> None:
iceberg_type = BinaryType()
assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.large_binary()
assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.binary()


def test_struct_type_to_pyarrow(table_schema_simple: Schema) -> None:
expected = pa.struct(
[
pa.field("foo", pa.large_string(), nullable=True, metadata={"field_id": "1"}),
pa.field("foo", pa.string(), nullable=True, metadata={"field_id": "1"}),
pa.field("bar", pa.int32(), nullable=False, metadata={"field_id": "2"}),
pa.field("baz", pa.bool_(), nullable=True, metadata={"field_id": "3"}),
]
@@ -575,7 +575,7 @@ def test_map_type_to_pyarrow() -> None:
)
assert visit(iceberg_map, _ConvertToArrowSchema()) == pa.map_(
pa.field("key", pa.int32(), nullable=False, metadata={"field_id": "1"}),
pa.field("value", pa.large_string(), nullable=False, metadata={"field_id": "2"}),
pa.field("value", pa.string(), nullable=False, metadata={"field_id": "2"}),
)


@@ -585,7 +585,7 @@ def test_list_type_to_pyarrow() -> None:
element_type=IntegerType(),
element_required=True,
)
assert visit(iceberg_map, _ConvertToArrowSchema()) == pa.large_list(
assert visit(iceberg_map, _ConvertToArrowSchema()) == pa.list_(
pa.field("element", pa.int32(), nullable=False, metadata={"field_id": "1"})
)

@@ -668,11 +668,11 @@ def test_expr_less_than_or_equal_to_pyarrow(bound_reference: BoundReference[str]

def test_expr_in_to_pyarrow(bound_reference: BoundReference[str]) -> None:
assert repr(expression_to_pyarrow(BoundIn(bound_reference, {literal("hello"), literal("world")}))) in (
"""<pyarrow.compute.Expression is_in(foo, {value_set=large_string:[
"""<pyarrow.compute.Expression is_in(foo, {value_set=string:[
"hello",
"world"
], null_matching_behavior=MATCH})>""",
"""<pyarrow.compute.Expression is_in(foo, {value_set=large_string:[
"""<pyarrow.compute.Expression is_in(foo, {value_set=string:[
"world",
"hello"
], null_matching_behavior=MATCH})>""",
@@ -681,11 +681,11 @@ def test_expr_in_to_pyarrow(bound_reference: BoundReference[str]) -> None:

def test_expr_not_in_to_pyarrow(bound_reference: BoundReference[str]) -> None:
assert repr(expression_to_pyarrow(BoundNotIn(bound_reference, {literal("hello"), literal("world")}))) in (
"""<pyarrow.compute.Expression invert(is_in(foo, {value_set=large_string:[
"""<pyarrow.compute.Expression invert(is_in(foo, {value_set=string:[
"hello",
"world"
], null_matching_behavior=MATCH}))>""",
"""<pyarrow.compute.Expression invert(is_in(foo, {value_set=large_string:[
"""<pyarrow.compute.Expression invert(is_in(foo, {value_set=string:[
"world",
"hello"
], null_matching_behavior=MATCH}))>""",
@@ -1030,12 +1030,12 @@ def test_projection_add_column(file_int: str) -> None:
assert (
repr(result_table.schema)
== """id: int32
list: large_list<element: int32>
list: list<element: int32>
child 0, element: int32
map: map<int32, large_string>
child 0, entries: struct<key: int32 not null, value: large_string> not null
map: map<int32, string>
child 0, entries: struct<key: int32 not null, value: string> not null
child 0, key: int32 not null
child 1, value: large_string
child 1, value: string
location: struct<lat: double, lon: double>
child 0, lat: double
child 1, lon: double"""
@@ -1051,7 +1051,7 @@ def test_read_list(schema_list: Schema, file_list: str) -> None:

assert (
repr(result_table.schema)
== """ids: large_list<element: int32>
== """ids: list<element: int32>
child 0, element: int32"""
)

@@ -1088,10 +1088,10 @@ def test_projection_add_column_struct(schema_int: Schema, file_int: str) -> None
assert r.as_py() is None
assert (
repr(result_table.schema)
== """id: map<int32, large_string>
child 0, entries: struct<key: int32 not null, value: large_string> not null
== """id: map<int32, string>
child 0, entries: struct<key: int32 not null, value: string> not null
child 0, key: int32 not null
child 1, value: large_string"""
child 1, value: string"""
)


@@ -1422,7 +1422,7 @@ def test_projection_list_of_structs(schema_list_of_structs: Schema, file_list_of
]
assert (
repr(result_table.schema)
== """locations: large_list<element: struct<latitude: double not null, longitude: double not null, altitude: double>>
== """locations: list<element: struct<latitude: double not null, longitude: double not null, altitude: double>>
child 0, element: struct<latitude: double not null, longitude: double not null, altitude: double>
child 0, latitude: double not null
child 1, longitude: double not null
@@ -1567,7 +1567,7 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp
assert (
str(with_deletes)
== """pyarrow.Table
foo: large_string
foo: string
bar: int32 not null
baz: bool
----
@@ -1604,7 +1604,7 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
assert (
str(with_deletes)
== """pyarrow.Table
foo: large_string
foo: string
bar: int32 not null
baz: bool
----
@@ -1635,7 +1635,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Sc
assert (
str(projection)
== """pyarrow.Table
foo: large_string
foo: string
bar: int32 not null
baz: bool
----
4 changes: 2 additions & 2 deletions tests/io/test_pyarrow_visitor.py
@@ -229,14 +229,14 @@ def test_pyarrow_timestamp_tz_invalid_tz() -> None:
def test_pyarrow_string_to_iceberg(pyarrow_type: pa.DataType) -> None:
converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg())
assert converted_iceberg_type == StringType()
assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pa.large_string()
assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pa.string()


@pytest.mark.parametrize("pyarrow_type", [pa.binary(), pa.large_binary(), pa.binary_view()])
def test_pyarrow_variable_binary_to_iceberg(pyarrow_type: pa.DataType) -> None:
converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg())
assert converted_iceberg_type == BinaryType()
assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pa.large_binary()
assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pa.binary()


def test_pyarrow_struct_to_iceberg() -> None:
2 changes: 1 addition & 1 deletion tests/test_schema.py
@@ -1648,7 +1648,7 @@ def test_arrow_schema() -> None:

expected_schema = pa.schema(
[
pa.field("foo", pa.large_string(), nullable=False),
pa.field("foo", pa.string(), nullable=False),
pa.field("bar", pa.int32(), nullable=True),
pa.field("baz", pa.bool_(), nullable=True),
]