Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement nested Parquet writing for High-Precision Decimals #19476

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use polars_error::{polars_bail, PolarsResult};
use super::binary::{
build_statistics as binary_build_statistics, encode_plain as binary_encode_plain,
};
use super::fixed_len_bytes::{
use super::fixed_size_binary::{
build_statistics as fixed_binary_build_statistics, encode_plain as fixed_binary_encode_plain,
};
use super::pages::PrimitiveNested;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use arrow::array::{Array, FixedSizeBinaryArray};
use polars_error::PolarsResult;

use super::encode_plain;
use crate::parquet::page::DataPage;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::FixedLenStatistics;
use crate::read::schema::is_nullable;
use crate::write::{utils, EncodeNullability, Encoding, WriteOptions};

/// Encodes a flat (non-nested) [`FixedSizeBinaryArray`] into a plain-encoded [`DataPage`].
///
/// The page buffer is laid out as the definition levels followed by the
/// plain-encoded values. When `statistics` is `Some`, it is serialized and
/// handed to the page builder.
///
/// # Errors
///
/// Propagates any error returned by `utils::write_def_levels` or
/// `utils::build_plain_page`.
pub fn array_to_page(
    array: &FixedSizeBinaryArray,
    options: WriteOptions,
    type_: PrimitiveType,
    statistics: Option<FixedLenStatistics>,
) -> PolarsResult<DataPage> {
    let nullable = is_nullable(&type_.field_info);
    let length = array.len();

    // Definition levels come first; remember how many bytes they occupy so the
    // page builder can tell them apart from the values that follow.
    let mut page_bytes = Vec::new();
    utils::write_def_levels(
        &mut page_bytes,
        nullable,
        array.validity(),
        length,
        options.version,
    )?;
    let def_levels_len = page_bytes.len();

    // Values are appended directly after the levels.
    encode_plain(array, EncodeNullability::new(nullable), &mut page_bytes);

    utils::build_plain_page(
        page_bytes,
        length,
        length,
        array.null_count(),
        // No repetition levels are written by this function.
        0,
        def_levels_len,
        statistics.map(|stats| stats.serialize()),
        type_,
        options,
        Encoding::Plain,
    )
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
mod basic;
mod nested;

use arrow::array::{Array, FixedSizeBinaryArray, PrimitiveArray};
use arrow::types::i256;
use polars_error::PolarsResult;
pub use basic::array_to_page;
pub use nested::array_to_page as nested_array_to_page;

use super::binary::ord_binary;
use super::{utils, EncodeNullability, StatisticsOptions, WriteOptions};
use crate::arrow::read::schema::is_nullable;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::DataPage;
use super::{EncodeNullability, StatisticsOptions};
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::FixedLenStatistics;

Expand All @@ -27,44 +28,6 @@ pub(crate) fn encode_plain(
}
}

/// Encodes a flat (non-nested) [`FixedSizeBinaryArray`] into a plain-encoded [`DataPage`].
///
/// The page buffer is laid out as the definition levels followed by the
/// plain-encoded values. When `statistics` is `Some`, it is serialized and
/// handed to the page builder.
///
/// # Errors
///
/// Propagates any error returned by `utils::write_def_levels` or
/// `utils::build_plain_page`.
pub fn array_to_page(
    array: &FixedSizeBinaryArray,
    options: WriteOptions,
    type_: PrimitiveType,
    statistics: Option<FixedLenStatistics>,
) -> PolarsResult<DataPage> {
    let nullable = is_nullable(&type_.field_info);
    let length = array.len();

    // Definition levels come first; remember how many bytes they occupy so the
    // page builder can tell them apart from the values that follow.
    let mut page_bytes = Vec::new();
    utils::write_def_levels(
        &mut page_bytes,
        nullable,
        array.validity(),
        length,
        options.version,
    )?;
    let def_levels_len = page_bytes.len();

    // Values are appended directly after the levels.
    encode_plain(array, EncodeNullability::new(nullable), &mut page_bytes);

    utils::build_plain_page(
        page_bytes,
        length,
        length,
        array.null_count(),
        // No repetition levels are written by this function.
        0,
        def_levels_len,
        statistics.map(|stats| stats.serialize()),
        type_,
        options,
        Encoding::Plain,
    )
}

pub(super) fn build_statistics(
array: &FixedSizeBinaryArray,
primitive_type: PrimitiveType,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use arrow::array::{Array, FixedSizeBinaryArray};
use polars_error::PolarsResult;

use super::encode_plain;
use crate::parquet::page::DataPage;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::FixedLenStatistics;
use crate::read::schema::is_nullable;
use crate::write::{nested, utils, EncodeNullability, Encoding, Nested, WriteOptions};

/// Encodes a [`FixedSizeBinaryArray`] that lives inside a nested type into a
/// plain-encoded [`DataPage`].
///
/// The page buffer is laid out as the repetition and definition levels
/// produced from `nested`, followed by the plain-encoded values of `array`.
/// When `statistics` is `Some`, it is serialized and handed to the page
/// builder.
///
/// # Errors
///
/// Propagates any error returned by `nested::write_rep_and_def` or
/// `utils::build_plain_page`.
///
/// # Panics
///
/// Panics if `nested` is empty, because the first entry is indexed directly.
pub fn array_to_page(
    array: &FixedSizeBinaryArray,
    options: WriteOptions,
    type_: PrimitiveType,
    nested: &[Nested],
    statistics: Option<FixedLenStatistics>,
) -> PolarsResult<DataPage> {
    let nullable = is_nullable(&type_.field_info);

    // Levels first: the writer reports how many bytes each kind of level took.
    let mut page_bytes = Vec::new();
    let (rep_levels_len, def_levels_len) =
        nested::write_rep_and_def(options.version, nested, &mut page_bytes)?;

    // Values are appended directly after the levels.
    encode_plain(array, EncodeNullability::new(nullable), &mut page_bytes);

    utils::build_plain_page(
        page_bytes,
        nested::num_values(nested),
        // Length of the first `nested` entry.
        nested[0].len(),
        array.null_count(),
        rep_levels_len,
        def_levels_len,
        statistics.map(|stats| stats.serialize()),
        type_,
        options,
        Encoding::Plain,
    )
}
38 changes: 19 additions & 19 deletions crates/polars-parquet/src/arrow/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod binview;
mod boolean;
mod dictionary;
mod file;
mod fixed_len_bytes;
mod fixed_size_binary;
mod nested;
mod pages;
mod primitive;
Expand Down Expand Up @@ -528,15 +528,15 @@ pub fn array_to_page_simple(
array.validity().cloned(),
);
let statistics = if options.has_statistics() {
Some(fixed_len_bytes::build_statistics(
Some(fixed_size_binary::build_statistics(
&array,
type_.clone(),
&options.statistics,
))
} else {
None
};
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::array_to_page(&array, options, type_, statistics)
},
ArrowDataType::Interval(IntervalUnit::DayTime) => {
let array = array
Expand All @@ -555,20 +555,20 @@ pub fn array_to_page_simple(
array.validity().cloned(),
);
let statistics = if options.has_statistics() {
Some(fixed_len_bytes::build_statistics(
Some(fixed_size_binary::build_statistics(
&array,
type_.clone(),
&options.statistics,
))
} else {
None
};
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::array_to_page(&array, options, type_, statistics)
},
ArrowDataType::FixedSizeBinary(_) => {
let array = array.as_any().downcast_ref().unwrap();
let statistics = if options.has_statistics() {
Some(fixed_len_bytes::build_statistics(
Some(fixed_size_binary::build_statistics(
array,
type_.clone(),
&options.statistics,
Expand All @@ -577,7 +577,7 @@ pub fn array_to_page_simple(
None
};

fixed_len_bytes::array_to_page(array, options, type_, statistics)
fixed_size_binary::array_to_page(array, options, type_, statistics)
},
ArrowDataType::Decimal256(precision, _) => {
let precision = *precision;
Expand Down Expand Up @@ -620,7 +620,7 @@ pub fn array_to_page_simple(
} else if precision <= 38 {
let size = decimal_length_from_precision(precision);
let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal256_with_i128(
let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
array,
type_.clone(),
size,
Expand All @@ -641,15 +641,15 @@ pub fn array_to_page_simple(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::array_to_page(&array, options, type_, statistics)
} else {
let size = 32;
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i256>>()
.unwrap();
let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal256(
let stats = fixed_size_binary::build_statistics_decimal256(
array,
type_.clone(),
size,
Expand All @@ -670,7 +670,7 @@ pub fn array_to_page_simple(
array.validity().cloned(),
);

fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::array_to_page(&array, options, type_, statistics)
}
},
ArrowDataType::Decimal(precision, _) => {
Expand Down Expand Up @@ -715,7 +715,7 @@ pub fn array_to_page_simple(
let size = decimal_length_from_precision(precision);

let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal(
let stats = fixed_size_binary::build_statistics_decimal(
array,
type_.clone(),
size,
Expand All @@ -736,7 +736,7 @@ pub fn array_to_page_simple(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::array_to_page(&array, options, type_, statistics)
}
},
other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
Expand Down Expand Up @@ -858,7 +858,7 @@ fn array_to_page_nested(
let size = decimal_length_from_precision(precision);

let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal(
let stats = fixed_size_binary::build_statistics_decimal(
array,
type_.clone(),
size,
Expand All @@ -879,7 +879,7 @@ fn array_to_page_nested(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
}
},
Decimal256(precision, _) => {
Expand Down Expand Up @@ -919,7 +919,7 @@ fn array_to_page_nested(
} else if precision <= 38 {
let size = decimal_length_from_precision(precision);
let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal256_with_i128(
let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
array,
type_.clone(),
size,
Expand All @@ -940,15 +940,15 @@ fn array_to_page_nested(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
} else {
let size = 32;
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i256>>()
.unwrap();
let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal256(
let stats = fixed_size_binary::build_statistics_decimal256(
array,
type_.clone(),
size,
Expand All @@ -969,7 +969,7 @@ fn array_to_page_nested(
array.validity().cloned(),
);

fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
}
},
other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
Expand Down
46 changes: 45 additions & 1 deletion py-polars/tests/unit/io/test_parquet.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import decimal
import io
from datetime import datetime, time, timezone
from decimal import Decimal
from typing import IO, TYPE_CHECKING, Any, Literal, cast
from typing import IO, TYPE_CHECKING, Any, Callable, Literal, cast

import fsspec
import numpy as np
Expand Down Expand Up @@ -1995,6 +1996,49 @@ def test_nested_nonnullable_19158() -> None:
assert_frame_equal(pl.read_parquet(f), pl.DataFrame(tbl))


# Short alias for `Decimal`; keeps the parametrized literals below compact.
D = Decimal


# Runs once per combination of `precision` (odd values 1, 3, ..., 35) and
# `nesting` factory. Each factory takes the decimal dtype `t` and returns a
# `(values, dtype)` pair in which `t` is wrapped in a Struct, a fixed-width
# Array of 3, or a List. The cases mix null outer rows, null inner values and
# fully populated rows.
@pytest.mark.parametrize("precision", range(1, 37, 2))
@pytest.mark.parametrize(
"nesting",
[
# Struct
lambda t: ([{"x": None}, None], pl.Struct({"x": t})),
lambda t: ([None, {"x": None}], pl.Struct({"x": t})),
lambda t: ([{"x": D("1.5")}, None], pl.Struct({"x": t})),
lambda t: ([{"x": D("1.5")}, {"x": D("4.8")}], pl.Struct({"x": t})),
# Array
lambda t: ([[None, None, D("8.2")], None], pl.Array(t, 3)),
lambda t: ([None, [None, D("8.9"), None]], pl.Array(t, 3)),
lambda t: ([[D("1.5"), D("3.7"), D("4.1")], None], pl.Array(t, 3)),
lambda t: (
[[D("1.5"), D("3.7"), D("4.1")], [D("2.8"), D("5.2"), D("8.9")]],
pl.Array(t, 3),
),
# List
lambda t: ([[None, D("8.2")], None], pl.List(t)),
lambda t: ([None, [D("8.9"), None]], pl.List(t)),
lambda t: ([[D("1.5"), D("4.1")], None], pl.List(t)),
lambda t: ([[D("1.5"), D("3.7"), D("4.1")], [D("2.8"), D("8.9")]], pl.List(t)),
],
)
def test_decimal_precision_nested_roundtrip(
nesting: Callable[[pl.DataType], tuple[list[Any], pl.DataType]],
precision: int,
) -> None:
# Limit the context so as not to disturb any other tests
with decimal.localcontext() as ctx:
ctx.prec = precision

# Build the nested values and dtype around a decimal of the requested precision.
decimal_dtype = pl.Decimal(precision=precision)
values, dtype = nesting(decimal_dtype)

# Single-column frame named "a" holding the nested decimal data.
df = pl.Series("a", values, dtype).to_frame()

# NOTE(review): `test_round_trip` is defined elsewhere in this test module;
# presumably it writes the frame to Parquet and checks the re-read result.
test_round_trip(df)


@pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"])
def test_conserve_sortedness(
monkeypatch: Any, capfd: Any, parallel: pl.ParallelStrategy
Expand Down