From 2d122f68d9f2969e76d2da8aa35cee1b6871ce4d Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Sun, 27 Oct 2024 10:19:29 +0100 Subject: [PATCH] feat: Implement nested Parquet writing for High-Precision Decimals This implements the writing of arbitrary nested `FixedSizeBinary`, which in particular allows for the writing of nested high-precision decimals. This removes the silent data loss that was happening before. Fixes #19448. --- .../src/arrow/write/dictionary.rs | 2 +- .../arrow/write/fixed_size_binary/basic.rs | 47 ++++++++++++++++++ .../mod.rs} | 49 +++---------------- .../arrow/write/fixed_size_binary/nested.rs | 39 +++++++++++++++ crates/polars-parquet/src/arrow/write/mod.rs | 38 +++++++------- py-polars/tests/unit/io/test_parquet.py | 46 ++++++++++++++++- 6 files changed, 157 insertions(+), 64 deletions(-) create mode 100644 crates/polars-parquet/src/arrow/write/fixed_size_binary/basic.rs rename crates/polars-parquet/src/arrow/write/{fixed_len_bytes.rs => fixed_size_binary/mod.rs} (79%) create mode 100644 crates/polars-parquet/src/arrow/write/fixed_size_binary/nested.rs diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs index 17527fc488f7..fc97c268c0fd 100644 --- a/crates/polars-parquet/src/arrow/write/dictionary.rs +++ b/crates/polars-parquet/src/arrow/write/dictionary.rs @@ -13,7 +13,7 @@ use polars_error::{polars_bail, PolarsResult}; use super::binary::{ build_statistics as binary_build_statistics, encode_plain as binary_encode_plain, }; -use super::fixed_len_bytes::{ +use super::fixed_size_binary::{ build_statistics as fixed_binary_build_statistics, encode_plain as fixed_binary_encode_plain, }; use super::pages::PrimitiveNested; diff --git a/crates/polars-parquet/src/arrow/write/fixed_size_binary/basic.rs b/crates/polars-parquet/src/arrow/write/fixed_size_binary/basic.rs new file mode 100644 index 000000000000..27151ce51f70 --- /dev/null +++ 
b/crates/polars-parquet/src/arrow/write/fixed_size_binary/basic.rs @@ -0,0 +1,47 @@ +use arrow::array::{Array, FixedSizeBinaryArray}; +use polars_error::PolarsResult; + +use super::encode_plain; +use crate::parquet::page::DataPage; +use crate::parquet::schema::types::PrimitiveType; +use crate::parquet::statistics::FixedLenStatistics; +use crate::read::schema::is_nullable; +use crate::write::{utils, EncodeNullability, Encoding, WriteOptions}; + +pub fn array_to_page( + array: &FixedSizeBinaryArray, + options: WriteOptions, + type_: PrimitiveType, + statistics: Option, +) -> PolarsResult { + let is_optional = is_nullable(&type_.field_info); + let encode_options = EncodeNullability::new(is_optional); + + let validity = array.validity(); + + let mut buffer = vec![]; + utils::write_def_levels( + &mut buffer, + is_optional, + validity, + array.len(), + options.version, + )?; + + let definition_levels_byte_length = buffer.len(); + + encode_plain(array, encode_options, &mut buffer); + + utils::build_plain_page( + buffer, + array.len(), + array.len(), + array.null_count(), + 0, + definition_levels_byte_length, + statistics.map(|x| x.serialize()), + type_, + options, + Encoding::Plain, + ) +} diff --git a/crates/polars-parquet/src/arrow/write/fixed_len_bytes.rs b/crates/polars-parquet/src/arrow/write/fixed_size_binary/mod.rs similarity index 79% rename from crates/polars-parquet/src/arrow/write/fixed_len_bytes.rs rename to crates/polars-parquet/src/arrow/write/fixed_size_binary/mod.rs index 9277b9c78a98..58f11adfa491 100644 --- a/crates/polars-parquet/src/arrow/write/fixed_len_bytes.rs +++ b/crates/polars-parquet/src/arrow/write/fixed_size_binary/mod.rs @@ -1,12 +1,13 @@ +mod basic; +mod nested; + use arrow::array::{Array, FixedSizeBinaryArray, PrimitiveArray}; use arrow::types::i256; -use polars_error::PolarsResult; +pub use basic::array_to_page; +pub use nested::array_to_page as nested_array_to_page; use super::binary::ord_binary; -use super::{utils, EncodeNullability, 
StatisticsOptions, WriteOptions}; -use crate::arrow::read::schema::is_nullable; -use crate::parquet::encoding::Encoding; -use crate::parquet::page::DataPage; +use super::{EncodeNullability, StatisticsOptions}; use crate::parquet::schema::types::PrimitiveType; use crate::parquet::statistics::FixedLenStatistics; @@ -27,44 +28,6 @@ pub(crate) fn encode_plain( } } -pub fn array_to_page( - array: &FixedSizeBinaryArray, - options: WriteOptions, - type_: PrimitiveType, - statistics: Option, -) -> PolarsResult { - let is_optional = is_nullable(&type_.field_info); - let encode_options = EncodeNullability::new(is_optional); - - let validity = array.validity(); - - let mut buffer = vec![]; - utils::write_def_levels( - &mut buffer, - is_optional, - validity, - array.len(), - options.version, - )?; - - let definition_levels_byte_length = buffer.len(); - - encode_plain(array, encode_options, &mut buffer); - - utils::build_plain_page( - buffer, - array.len(), - array.len(), - array.null_count(), - 0, - definition_levels_byte_length, - statistics.map(|x| x.serialize()), - type_, - options, - Encoding::Plain, - ) -} - pub(super) fn build_statistics( array: &FixedSizeBinaryArray, primitive_type: PrimitiveType, diff --git a/crates/polars-parquet/src/arrow/write/fixed_size_binary/nested.rs b/crates/polars-parquet/src/arrow/write/fixed_size_binary/nested.rs new file mode 100644 index 000000000000..81175cf5db18 --- /dev/null +++ b/crates/polars-parquet/src/arrow/write/fixed_size_binary/nested.rs @@ -0,0 +1,39 @@ +use arrow::array::{Array, FixedSizeBinaryArray}; +use polars_error::PolarsResult; + +use super::encode_plain; +use crate::parquet::page::DataPage; +use crate::parquet::schema::types::PrimitiveType; +use crate::parquet::statistics::FixedLenStatistics; +use crate::read::schema::is_nullable; +use crate::write::{nested, utils, EncodeNullability, Encoding, Nested, WriteOptions}; + +pub fn array_to_page( + array: &FixedSizeBinaryArray, + options: WriteOptions, + type_: PrimitiveType, 
+ nested: &[Nested], + statistics: Option, +) -> PolarsResult { + let is_optional = is_nullable(&type_.field_info); + let encode_options = EncodeNullability::new(is_optional); + + let mut buffer = vec![]; + let (repetition_levels_byte_length, definition_levels_byte_length) = + nested::write_rep_and_def(options.version, nested, &mut buffer)?; + + encode_plain(array, encode_options, &mut buffer); + + utils::build_plain_page( + buffer, + nested::num_values(nested), + nested[0].len(), + array.null_count(), + repetition_levels_byte_length, + definition_levels_byte_length, + statistics.map(|x| x.serialize()), + type_, + options, + Encoding::Plain, + ) +} diff --git a/crates/polars-parquet/src/arrow/write/mod.rs b/crates/polars-parquet/src/arrow/write/mod.rs index 02f0165d04c7..17a342ac9d67 100644 --- a/crates/polars-parquet/src/arrow/write/mod.rs +++ b/crates/polars-parquet/src/arrow/write/mod.rs @@ -17,7 +17,7 @@ mod binview; mod boolean; mod dictionary; mod file; -mod fixed_len_bytes; +mod fixed_size_binary; mod nested; mod pages; mod primitive; @@ -528,7 +528,7 @@ pub fn array_to_page_simple( array.validity().cloned(), ); let statistics = if options.has_statistics() { - Some(fixed_len_bytes::build_statistics( + Some(fixed_size_binary::build_statistics( &array, type_.clone(), &options.statistics, @@ -536,7 +536,7 @@ pub fn array_to_page_simple( } else { None }; - fixed_len_bytes::array_to_page(&array, options, type_, statistics) + fixed_size_binary::array_to_page(&array, options, type_, statistics) }, ArrowDataType::Interval(IntervalUnit::DayTime) => { let array = array @@ -555,7 +555,7 @@ pub fn array_to_page_simple( array.validity().cloned(), ); let statistics = if options.has_statistics() { - Some(fixed_len_bytes::build_statistics( + Some(fixed_size_binary::build_statistics( &array, type_.clone(), &options.statistics, @@ -563,12 +563,12 @@ pub fn array_to_page_simple( } else { None }; - fixed_len_bytes::array_to_page(&array, options, type_, statistics) + 
fixed_size_binary::array_to_page(&array, options, type_, statistics) }, ArrowDataType::FixedSizeBinary(_) => { let array = array.as_any().downcast_ref().unwrap(); let statistics = if options.has_statistics() { - Some(fixed_len_bytes::build_statistics( + Some(fixed_size_binary::build_statistics( array, type_.clone(), &options.statistics, @@ -577,7 +577,7 @@ pub fn array_to_page_simple( None }; - fixed_len_bytes::array_to_page(array, options, type_, statistics) + fixed_size_binary::array_to_page(array, options, type_, statistics) }, ArrowDataType::Decimal256(precision, _) => { let precision = *precision; @@ -620,7 +620,7 @@ pub fn array_to_page_simple( } else if precision <= 38 { let size = decimal_length_from_precision(precision); let statistics = if options.has_statistics() { - let stats = fixed_len_bytes::build_statistics_decimal256_with_i128( + let stats = fixed_size_binary::build_statistics_decimal256_with_i128( array, type_.clone(), size, @@ -641,7 +641,7 @@ pub fn array_to_page_simple( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, type_, statistics) + fixed_size_binary::array_to_page(&array, options, type_, statistics) } else { let size = 32; let array = array @@ -649,7 +649,7 @@ pub fn array_to_page_simple( .downcast_ref::>() .unwrap(); let statistics = if options.has_statistics() { - let stats = fixed_len_bytes::build_statistics_decimal256( + let stats = fixed_size_binary::build_statistics_decimal256( array, type_.clone(), size, @@ -670,7 +670,7 @@ pub fn array_to_page_simple( array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, type_, statistics) + fixed_size_binary::array_to_page(&array, options, type_, statistics) } }, ArrowDataType::Decimal(precision, _) => { @@ -715,7 +715,7 @@ pub fn array_to_page_simple( let size = decimal_length_from_precision(precision); let statistics = if options.has_statistics() { - let stats = fixed_len_bytes::build_statistics_decimal( + let stats = 
fixed_size_binary::build_statistics_decimal( array, type_.clone(), size, @@ -736,7 +736,7 @@ pub fn array_to_page_simple( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, type_, statistics) + fixed_size_binary::array_to_page(&array, options, type_, statistics) } }, other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"), @@ -858,7 +858,7 @@ fn array_to_page_nested( let size = decimal_length_from_precision(precision); let statistics = if options.has_statistics() { - let stats = fixed_len_bytes::build_statistics_decimal( + let stats = fixed_size_binary::build_statistics_decimal( array, type_.clone(), size, @@ -879,7 +879,7 @@ fn array_to_page_nested( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, type_, statistics) + fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics) } }, Decimal256(precision, _) => { @@ -919,7 +919,7 @@ fn array_to_page_nested( } else if precision <= 38 { let size = decimal_length_from_precision(precision); let statistics = if options.has_statistics() { - let stats = fixed_len_bytes::build_statistics_decimal256_with_i128( + let stats = fixed_size_binary::build_statistics_decimal256_with_i128( array, type_.clone(), size, @@ -940,7 +940,7 @@ fn array_to_page_nested( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, type_, statistics) + fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics) } else { let size = 32; let array = array @@ -948,7 +948,7 @@ fn array_to_page_nested( .downcast_ref::>() .unwrap(); let statistics = if options.has_statistics() { - let stats = fixed_len_bytes::build_statistics_decimal256( + let stats = fixed_size_binary::build_statistics_decimal256( array, type_.clone(), size, @@ -969,7 +969,7 @@ fn array_to_page_nested( array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, type_, 
statistics) + fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics) } }, other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"), diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 564c20aadde2..fe299c0c6b17 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -1,9 +1,10 @@ from __future__ import annotations +import decimal import io from datetime import datetime, time, timezone from decimal import Decimal -from typing import IO, TYPE_CHECKING, Any, Literal, cast +from typing import IO, TYPE_CHECKING, Any, Callable, Literal, cast import fsspec import numpy as np @@ -1995,6 +1996,49 @@ def test_nested_nonnullable_19158() -> None: assert_frame_equal(pl.read_parquet(f), pl.DataFrame(tbl)) +D = Decimal + + +@pytest.mark.parametrize("precision", range(1, 37, 2)) +@pytest.mark.parametrize( + "nesting", + [ + # Struct + lambda t: ([{"x": None}, None], pl.Struct({"x": t})), + lambda t: ([None, {"x": None}], pl.Struct({"x": t})), + lambda t: ([{"x": D("1.5")}, None], pl.Struct({"x": t})), + lambda t: ([{"x": D("1.5")}, {"x": D("4.8")}], pl.Struct({"x": t})), + # Array + lambda t: ([[None, None, D("8.2")], None], pl.Array(t, 3)), + lambda t: ([None, [None, D("8.9"), None]], pl.Array(t, 3)), + lambda t: ([[D("1.5"), D("3.7"), D("4.1")], None], pl.Array(t, 3)), + lambda t: ( + [[D("1.5"), D("3.7"), D("4.1")], [D("2.8"), D("5.2"), D("8.9")]], + pl.Array(t, 3), + ), + # List + lambda t: ([[None, D("8.2")], None], pl.List(t)), + lambda t: ([None, [D("8.9"), None]], pl.List(t)), + lambda t: ([[D("1.5"), D("4.1")], None], pl.List(t)), + lambda t: ([[D("1.5"), D("3.7"), D("4.1")], [D("2.8"), D("8.9")]], pl.List(t)), + ], +) +def test_decimal_precision_nested_roundtrip( + nesting: Callable[[pl.DataType], tuple[list[Any], pl.DataType]], + precision: int, +) -> None: + # Limit the context as to not disturb any other tests + with 
decimal.localcontext() as ctx: + ctx.prec = precision + + decimal_dtype = pl.Decimal(precision=precision) + values, dtype = nesting(decimal_dtype) + + df = pl.Series("a", values, dtype).to_frame() + + test_round_trip(df) + + @pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"]) def test_conserve_sortedness( monkeypatch: Any, capfd: Any, parallel: pl.ParallelStrategy