From 601ed671e4f991bde6c90dbd678bb8129c4235d6 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 9 Apr 2025 13:00:24 -0700 Subject: [PATCH 01/18] format changes --- parquet/src/basic.rs | 39 +++++++- parquet/src/file/metadata/reader.rs | 5 +- parquet/src/format.rs | 150 +++++++++++++++------------- parquet/src/schema/types.rs | 2 +- 4 files changed, 124 insertions(+), 72 deletions(-) diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index 99f122fe4c3e..3fd1fae74dc0 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -529,6 +529,8 @@ pub enum SortOrder { UNSIGNED, /// Comparison is undefined. UNDEFINED, + /// Use IEEE 754 total order. + TOTAL_ORDER, } impl SortOrder { @@ -549,17 +551,43 @@ pub enum ColumnOrder { /// Column uses the order defined by its logical or physical type /// (if there is no logical type), parquet-format 2.4.0+. TYPE_DEFINED_ORDER(SortOrder), + /// Column ordering to use for floating point types. + IEEE_754_TOTAL_ORDER, /// Undefined column order, means legacy behaviour before parquet-format 2.4.0. /// Sort order is always SIGNED. UNDEFINED, } impl ColumnOrder { - /// Returns sort order for a physical/logical type. + /// Returns the sort order for a physical/logical type. + /// + /// If `use_total_order` is `true` then IEEE 754 total order will be used for floating point + /// types. pub fn get_sort_order( logical_type: Option, converted_type: ConvertedType, physical_type: Type, + use_total_order: bool, + ) -> SortOrder { + // check for floating point types, then fall back to type defined order + match logical_type { + Some(LogicalType::Float16) if use_total_order => SortOrder::TOTAL_ORDER, + _ => match physical_type { + Type::FLOAT | Type::DOUBLE if use_total_order => SortOrder::TOTAL_ORDER, + _ => ColumnOrder::get_type_defined_sort_order( + logical_type, + converted_type, + physical_type, + ), + }, + } + } + + /// Returns the type defined sort order for a physical/logical type. 
+ pub fn get_type_defined_sort_order( + logical_type: Option, + converted_type: ConvertedType, + physical_type: Type, ) -> SortOrder { // TODO: Should this take converted and logical type, for compatibility? match logical_type { @@ -647,6 +675,7 @@ impl ColumnOrder { pub fn sort_order(&self) -> SortOrder { match *self { ColumnOrder::TYPE_DEFINED_ORDER(order) => order, + ColumnOrder::IEEE_754_TOTAL_ORDER => SortOrder::TOTAL_ORDER, ColumnOrder::UNDEFINED => SortOrder::SIGNED, } } @@ -2134,7 +2163,11 @@ mod tests { fn check_sort_order(types: Vec, expected_order: SortOrder) { for tpe in types { assert_eq!( - ColumnOrder::get_sort_order(Some(tpe), ConvertedType::NONE, Type::BYTE_ARRAY), + ColumnOrder::get_type_defined_sort_order( + Some(tpe), + ConvertedType::NONE, + Type::BYTE_ARRAY + ), expected_order ); } @@ -2229,7 +2262,7 @@ mod tests { fn check_sort_order(types: Vec, expected_order: SortOrder) { for tpe in types { assert_eq!( - ColumnOrder::get_sort_order(None, tpe, Type::BYTE_ARRAY), + ColumnOrder::get_type_defined_sort_order(None, tpe, Type::BYTE_ARRAY), expected_order ); } diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index aebf1a890621..b22c3bcce3f5 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -954,13 +954,16 @@ impl ParquetMetaDataReader { for (i, column) in schema_descr.columns().iter().enumerate() { match orders[i] { TColumnOrder::TYPEORDER(_) => { - let sort_order = ColumnOrder::get_sort_order( + let sort_order = ColumnOrder::get_type_defined_sort_order( column.logical_type(), column.converted_type(), column.physical_type(), ); res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); } + TColumnOrder::IEEE754TOTALORDER(_) => { + res.push(ColumnOrder::IEEE_754_TOTAL_ORDER) + } } } Ok(Some(res)) diff --git a/parquet/src/format.rs b/parquet/src/format.rs index 287d08b7a95c..f31db1bba737 100644 --- a/parquet/src/format.rs +++ b/parquet/src/format.rs @@ -1,5 +1,5 @@ //! 
See [`crate::file`] for easier to use APIs. -// Autogenerated by Thrift Compiler (0.20.0) +// Autogenerated by Thrift Compiler (0.21.0) // DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING #![allow(dead_code)] @@ -117,12 +117,12 @@ impl ConvertedType { /// a list is converted into an optional field containing a repeated field for its /// values pub const LIST: ConvertedType = ConvertedType(3); - /// an enum is converted into a BYTE_ARRAY field + /// an enum is converted into a binary field pub const ENUM: ConvertedType = ConvertedType(4); /// A decimal value. /// - /// This may be used to annotate BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY primitive - /// types. The underlying byte array stores the unscaled value encoded as two's + /// This may be used to annotate binary or fixed primitive types. The + /// underlying byte array stores the unscaled value encoded as two's /// complement using big-endian byte order (the most significant byte is the /// zeroth element). The value of the decimal is the value * 10^{-scale}. /// @@ -185,7 +185,7 @@ impl ConvertedType { pub const JSON: ConvertedType = ConvertedType(19); /// An embedded BSON document /// - /// A BSON document embedded within a single BYTE_ARRAY column. + /// A BSON document embedded within a single BINARY column. pub const BSON: ConvertedType = ConvertedType(20); /// An interval of time /// @@ -288,9 +288,9 @@ impl From<&ConvertedType> for i32 { pub struct FieldRepetitionType(pub i32); impl FieldRepetitionType { - /// This field is required (can not be null) and each row has exactly 1 value. + /// This field is required (can not be null) and each record has exactly 1 value. pub const REQUIRED: FieldRepetitionType = FieldRepetitionType(0); - /// The field is optional (can be null) and each row has 0 or 1 values. + /// The field is optional (can be null) and each record has 0 or 1 values. 
pub const OPTIONAL: FieldRepetitionType = FieldRepetitionType(1); /// The field is repeated and can contain 0 or more values pub const REPEATED: FieldRepetitionType = FieldRepetitionType(2); @@ -379,15 +379,12 @@ impl Encoding { pub const DELTA_BYTE_ARRAY: Encoding = Encoding(7); /// Dictionary encoding: the ids are encoded using the RLE encoding pub const RLE_DICTIONARY: Encoding = Encoding(8); - /// Encoding for fixed-width data (FLOAT, DOUBLE, INT32, INT64, FIXED_LEN_BYTE_ARRAY). + /// Encoding for floating-point data. /// K byte-streams are created where K is the size in bytes of the data type. - /// The individual bytes of a value are scattered to the corresponding stream and + /// The individual bytes of an FP value are scattered to the corresponding stream and /// the streams are concatenated. /// This itself does not reduce the size of the data but can lead to better compression /// afterwards. - /// - /// Added in 2.8 for FLOAT and DOUBLE. - /// Support for INT32, INT64 and FIXED_LEN_BYTE_ARRAY added in 2.11. pub const BYTE_STREAM_SPLIT: Encoding = Encoding(9); pub const ENUM_VALUES: &'static [Self] = &[ Self::PLAIN, @@ -1263,7 +1260,7 @@ impl crate::thrift::TSerializable for NullType { /// To maintain forward-compatibility in v1, implementations using this logical /// type must also set scale and precision on the annotated SchemaElement. /// -/// Allowed for physical types: INT32, INT64, FIXED_LEN_BYTE_ARRAY, and BYTE_ARRAY. 
+/// Allowed for physical types: INT32, INT64, FIXED, and BINARY #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DecimalType { pub scale: i32, @@ -1760,7 +1757,7 @@ impl crate::thrift::TSerializable for IntType { /// Embedded JSON logical type annotation /// -/// Allowed for physical types: BYTE_ARRAY +/// Allowed for physical types: BINARY #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct JsonType { } @@ -1800,7 +1797,7 @@ impl crate::thrift::TSerializable for JsonType { /// Embedded BSON logical type annotation /// -/// Allowed for physical types: BYTE_ARRAY +/// Allowed for physical types: BINARY #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct BsonType { } @@ -2081,13 +2078,9 @@ impl crate::thrift::TSerializable for LogicalType { // /// Represents a element inside a schema definition. -/// -/// - if it is a group (inner node) then type is undefined and num_children -/// is defined -/// - if it is a primitive type (leaf) then type is defined and -/// num_children is undefined -/// -/// Note the nodes are listed in depth first traversal order. +/// - if it is a group (inner node) then type is undefined and num_children is defined +/// - if it is a primitive type (leaf) then type is defined and num_children is undefined +/// the nodes are listed in depth first traversal order. #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct SchemaElement { /// Data type for this field. Not set if the current element is a non-leaf node @@ -2290,12 +2283,7 @@ impl crate::thrift::TSerializable for SchemaElement { /// Data page header #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DataPageHeader { - /// Number of values, including NULLs, in this data page. - /// - /// If a OffsetIndex is present, a page must begin at a row - /// boundary (repetition_level = 0). Otherwise, pages may begin - /// within a row (repetition_level > 0). 
- /// + /// Number of values, including NULLs, in this data page. * pub num_values: i32, /// Encoding used for this data page * pub encoding: Encoding, @@ -2539,10 +2527,7 @@ pub struct DataPageHeaderV2 { /// Number of NULL values, in this data page. /// Number of non-null = num_values - num_nulls which is also the number of values in the data section * pub num_nulls: i32, - /// Number of rows in this data page. Every page must begin at a - /// row boundary (repetition_level = 0): rows must **not** be - /// split across page boundaries when using V2 data pages. - /// + /// Number of rows in this data page. which means pages change on record boundaries (r = 0) * pub num_rows: i32, /// Encoding used for data in this page * pub encoding: Encoding, @@ -3359,10 +3344,10 @@ impl crate::thrift::TSerializable for KeyValue { // SortingColumn // -/// Sort order within a RowGroup of a leaf column +/// Wrapper struct to specify sort order #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct SortingColumn { - /// The ordinal position of the column (in this row group) * + /// The column index (in this row group) * pub column_idx: i32, /// If true, indicates this column is sorted in descending order. * pub descending: bool, @@ -4050,19 +4035,11 @@ pub struct ColumnChunk { /// metadata. This path is relative to the current file. /// pub file_path: Option, - /// Deprecated: Byte offset in file_path to the ColumnMetaData - /// - /// Past use of this field has been inconsistent, with some implementations - /// using it to point to the ColumnMetaData and some using it to point to - /// the first page in the column chunk. In many cases, the ColumnMetaData at this - /// location is wrong. This field is now deprecated and should not be used. - /// Writers should set this field to 0 if no ColumnMetaData has been written outside - /// the footer. + /// Byte offset in file_path to the ColumnMetaData * pub file_offset: i64, - /// Column metadata for this chunk. 
Some writers may also replicate this at the - /// location pointed to by file_path/file_offset. - /// Note: while marked as optional, this field is in fact required by most major - /// Parquet implementations. As such, writers MUST populate this field. + /// Column metadata for this chunk. This is the same content as what is at + /// file_path/file_offset. Having it here has it replicated in the file + /// metadata. /// pub meta_data: Option, /// File offset of ColumnChunk's OffsetIndex * @@ -4424,6 +4401,44 @@ impl crate::thrift::TSerializable for TypeDefinedOrder { } } +// +// IEEE754TotalOrder +// + +/// Empty struct to signal IEEE 754 total order for floating point types +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct IEEE754TotalOrder { +} + +impl IEEE754TotalOrder { + pub fn new() -> IEEE754TotalOrder { + IEEE754TotalOrder {} + } +} + +impl crate::thrift::TSerializable for IEEE754TotalOrder { + fn read_from_in_protocol(i_prot: &mut T) -> thrift::Result { + i_prot.read_struct_begin()?; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + i_prot.skip(field_ident.field_type)?; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = IEEE754TotalOrder {}; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut T) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("IEEE754TotalOrder"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + // // ColumnOrder // @@ -4431,6 +4446,7 @@ impl crate::thrift::TSerializable for TypeDefinedOrder { #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub enum ColumnOrder { TYPEORDER(TypeDefinedOrder), + IEEE754TOTALORDER(IEEE754TotalOrder), } impl crate::thrift::TSerializable for ColumnOrder { @@ -4452,6 +4468,13 @@ impl crate::thrift::TSerializable for ColumnOrder { } received_field_count += 1; }, + 2 => { + let val = 
IEEE754TotalOrder::read_from_in_protocol(i_prot)?; + if ret.is_none() { + ret = Some(ColumnOrder::IEEE754TOTALORDER(val)); + } + received_field_count += 1; + }, _ => { i_prot.skip(field_ident.field_type)?; received_field_count += 1; @@ -4491,6 +4514,11 @@ impl crate::thrift::TSerializable for ColumnOrder { f.write_to_out_protocol(o_prot)?; o_prot.write_field_end()?; }, + ColumnOrder::IEEE754TOTALORDER(ref f) => { + o_prot.write_field_begin(&TFieldIdentifier::new("IEEE_754_TOTAL_ORDER", TType::Struct, 2))?; + f.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + }, } o_prot.write_field_stop()?; o_prot.write_struct_end() @@ -4508,9 +4536,8 @@ pub struct PageLocation { /// Size of the page, including header. Sum of compressed_page_size and header /// length pub compressed_page_size: i32, - /// Index within the RowGroup of the first row of the page. When an - /// OffsetIndex is present, pages must begin on row boundaries - /// (repetition_level = 0). + /// Index within the RowGroup of the first row of the page; this means pages + /// change on record boundaries (r = 0). pub first_row_index: i64, } @@ -4587,15 +4614,10 @@ impl crate::thrift::TSerializable for PageLocation { // OffsetIndex // -/// Optional offsets for each data page in a ColumnChunk. -/// -/// Forms part of the page index, along with ColumnIndex. -/// -/// OffsetIndex may be present even if ColumnIndex is not. #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct OffsetIndex { /// PageLocations, ordered by increasing PageLocation.offset. It is required - /// that page_locations\[i\].first_row_index < page_locations\[i+1\].first_row_index. + /// that page_locations[i].first_row_index < page_locations[i+1].first_row_index. pub page_locations: Vec, /// Unencoded/uncompressed size for BYTE_ARRAY types. /// @@ -4687,27 +4709,21 @@ impl crate::thrift::TSerializable for OffsetIndex { // ColumnIndex // -/// Optional statistics for each data page in a ColumnChunk. 
-/// -/// Forms part the page index, along with OffsetIndex. -/// -/// If this structure is present, OffsetIndex must also be present. -/// -/// For each field in this structure, ``\[i\] refers to the page at -/// OffsetIndex.page_locations\[i\] +/// Description for ColumnIndex. +/// Each [i] refers to the page at OffsetIndex.page_locations[i] #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct ColumnIndex { /// A list of Boolean values to determine the validity of the corresponding /// min and max values. If true, a page contains only null values, and writers /// have to set the corresponding entries in min_values and max_values to - /// byte\[0\], so that all lists have the same length. If false, the + /// byte[0], so that all lists have the same length. If false, the /// corresponding entries in min_values and max_values must be valid. pub null_pages: Vec, /// Two lists containing lower and upper bounds for the values of each page /// determined by the ColumnOrder of the column. These may be the actual /// minimum and maximum values found on a page, but can also be (more compact) /// values that do not exist on a page. For example, instead of storing ""Blart - /// Versenwald III", a writer may set min_values\[i\]="B", max_values\[i\]="C". + /// Versenwald III", a writer may set min_values[i]="B", max_values[i]="C". /// Such more compact values must still be valid values within the column's /// logical type. Readers must make sure that list entries are populated before /// using them by inspecting null_pages. @@ -4715,7 +4731,7 @@ pub struct ColumnIndex { pub max_values: Vec>, /// Stores whether both min_values and max_values are ordered and if so, in /// which direction. This allows readers to perform binary searches in both - /// lists. Readers cannot assume that max_values\[i\] <= min_values\[i+1\], even + /// lists. Readers cannot assume that max_values[i] <= min_values[i+1], even /// if the lists are ordered. 
pub boundary_order: BoundaryOrder, /// A list containing the number of null values for each page * @@ -5196,7 +5212,7 @@ pub struct FileMetaData { /// Optional key/value metadata * pub key_value_metadata: Option>, /// String for application that wrote this file. This should be in the format - /// `` version `` (build ``). + /// version (build ). /// e.g. impala version 1.0 (build 6cf94d29b2b7115df4de2c06e2ab4326d721eb55) /// pub created_by: Option, diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 68492e19f437..6486239b4490 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -941,7 +941,7 @@ impl ColumnDescriptor { /// Returns the sort order for this column pub fn sort_order(&self) -> SortOrder { - ColumnOrder::get_sort_order( + ColumnOrder::get_type_defined_sort_order( self.logical_type(), self.converted_type(), self.physical_type(), From b91a2a138656d5cba384431a0b1651d148399440 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 9 Apr 2025 13:04:40 -0700 Subject: [PATCH 02/18] change format revision --- parquet/regen.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/regen.sh b/parquet/regen.sh index 39999c7872cd..653e60ce9bcc 100755 --- a/parquet/regen.sh +++ b/parquet/regen.sh @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. 
-REVISION=5b564f3c47679526cf72e54f207013f28f53acc4 +REVISION=74bd03d63350a6a87a0e2facad5292567185d4ff SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-$0}")" && pwd)" From ba4cda39a03ae9d20cb786be7af4e4993631b3c4 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 9 Apr 2025 13:28:34 -0700 Subject: [PATCH 03/18] add option to use total order to WriterProperties --- parquet/src/file/properties.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index a69a854df608..8ac4e65b3cdc 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -63,6 +63,8 @@ pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option = None; pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false; /// Default values for [`WriterProperties::coerce_types`] pub const DEFAULT_COERCE_TYPES: bool = false; +/// Default values for [`WriterProperties::ieee754_total_order`] +pub const DEFAULT_IEEE754_TOTAL_ORDER: bool = false; /// Parquet writer version. /// @@ -171,6 +173,7 @@ pub struct WriterProperties { column_index_truncate_length: Option, statistics_truncate_length: Option, coerce_types: bool, + ieee754_total_order: bool, #[cfg(feature = "encryption")] pub(crate) file_encryption_properties: Option, } @@ -296,6 +299,11 @@ impl WriterProperties { self.coerce_types } + /// Returns `true` if IEEE 754 total order should be used for floating point statistics. + pub fn ieee754_total_order(&self) -> bool { + self.ieee754_total_order + } + /// Returns encoding for a data page, when dictionary encoding is enabled. /// This is not configurable. 
#[inline] @@ -402,6 +410,7 @@ pub struct WriterPropertiesBuilder { column_index_truncate_length: Option, statistics_truncate_length: Option, coerce_types: bool, + ieee754_total_order: bool, #[cfg(feature = "encryption")] file_encryption_properties: Option, } @@ -426,6 +435,7 @@ impl WriterPropertiesBuilder { column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH, coerce_types: DEFAULT_COERCE_TYPES, + ieee754_total_order: DEFAULT_IEEE754_TOTAL_ORDER, #[cfg(feature = "encryption")] file_encryption_properties: None, } @@ -450,6 +460,7 @@ impl WriterPropertiesBuilder { column_index_truncate_length: self.column_index_truncate_length, statistics_truncate_length: self.statistics_truncate_length, coerce_types: self.coerce_types, + ieee754_total_order: self.ieee754_total_order, #[cfg(feature = "encryption")] file_encryption_properties: self.file_encryption_properties, } @@ -825,6 +836,20 @@ impl WriterPropertiesBuilder { self } + /// Should statistics for floating point types use IEEE 754 total order. + /// + /// Setting this to `true` will use the statistics ordering specified by + /// [PARQUET-2249]. This removes much of the ambiguity present in these + /// statistics when using the Parquet type defined sort ordering. In particular, + /// `NaN` (and `-NaN`) may now appear in min/max fields. Readers that have not + /// yet implemented this ordering should simply ignore the statistics. + /// + /// [PARQUET-2249]: https://github.com/apache/parquet-format/pull/221 + pub fn set_ieee754_total_order(mut self, ieee754_total_order: bool) -> Self { + self.ieee754_total_order = ieee754_total_order; + self + } + /// Sets FileEncryptionProperties. 
#[cfg(feature = "encryption")] pub fn with_file_encryption_properties( From d39d89ee13e0d9abc2307e2f85df140d08b17767 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 9 Apr 2025 14:08:54 -0700 Subject: [PATCH 04/18] write correct ColumnOrders --- parquet/src/file/metadata/writer.rs | 45 +++++++++++++++++++++++++---- parquet/src/file/writer.rs | 1 + 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index c1fc41314415..f89e38434881 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::basic::SortOrder; #[cfg(feature = "encryption")] use crate::encryption::encrypt::{encrypt_object, encrypt_object_to_vec, FileEncryptor}; #[cfg(feature = "encryption")] @@ -27,7 +28,10 @@ use crate::file::page_index::index::Index; use crate::file::writer::{get_file_magic, TrackedWrite}; #[cfg(feature = "encryption")] use crate::format::{AesGcmV1, ColumnCryptoMetaData, EncryptionAlgorithm}; -use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex, RowGroup}; +use crate::format::{ + ColumnChunk, ColumnIndex, ColumnOrder, FileMetaData, IEEE754TotalOrder, OffsetIndex, RowGroup, + TypeDefinedOrder, +}; use crate::schema::types; use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr}; use crate::thrift::TSerializable; @@ -49,6 +53,7 @@ pub(crate) struct ThriftMetadataWriter<'a, W: Write> { created_by: Option, object_writer: MetadataObjectWriter, writer_version: i32, + ieee754_total_order: bool, } impl<'a, W: Write> ThriftMetadataWriter<'a, W> { @@ -125,12 +130,23 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { } // We only include ColumnOrder for leaf nodes. - // Currently only supported ColumnOrder is TypeDefinedOrder so we set this - // for all leaf nodes. 
// Even if the column has an undefined sort order, such as INTERVAL, this // is still technically the defined TYPEORDER so it should still be set. - let column_orders = (0..self.schema_descr.num_columns()) - .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {})) + let column_orders: Vec = self + .schema_descr + .columns() + .iter() + .map(|c| { + match crate::basic::ColumnOrder::get_sort_order( + c.logical_type(), + c.converted_type(), + c.physical_type(), + self.ieee754_total_order, + ) { + SortOrder::TOTAL_ORDER => ColumnOrder::IEEE754TOTALORDER(IEEE754TotalOrder {}), + _ => ColumnOrder::TYPEORDER(TypeDefinedOrder {}), + } + }) .collect(); // This field is optional, perhaps in cases where no min/max fields are set // in any Statistics or ColumnIndex object in the whole file. @@ -183,6 +199,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { row_groups: Vec, created_by: Option, writer_version: i32, + ieee754_total_order: bool, ) -> Self { Self { buf, @@ -195,6 +212,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { created_by, object_writer: Default::default(), writer_version, + ieee754_total_order, } } @@ -298,6 +316,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { pub struct ParquetMetaDataWriter<'a, W: Write> { buf: TrackedWrite, metadata: &'a ParquetMetaData, + ieee754_total_order: bool, } impl<'a, W: Write> ParquetMetaDataWriter<'a, W> { @@ -319,7 +338,20 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> { /// /// See example on the struct level documentation pub fn new_with_tracked(buf: TrackedWrite, metadata: &'a ParquetMetaData) -> Self { - Self { buf, metadata } + Self { + buf, + metadata, + ieee754_total_order: false, + } + } + + /// Sets whether to use IEEE 754 total order for statistics. + /// + /// See [`crate::file::properties::WriterPropertiesBuilder::set_ieee754_total_order`] for + /// more information. 
+ pub fn with_ieee754_total_order(mut self, ieee754_total_order: bool) -> Self { + self.ieee754_total_order = ieee754_total_order; + self } /// Write the metadata to the buffer @@ -349,6 +381,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> { row_groups, created_by, file_metadata.version(), + self.ieee754_total_order, ); encoder = encoder.with_column_indexes(&column_indexes); encoder = encoder.with_offset_indexes(&offset_indexes); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 18e357ebc2b9..e22acb2f7f32 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -352,6 +352,7 @@ impl SerializedFileWriter { row_groups, Some(self.props.created_by().to_string()), self.props.writer_version().as_num(), + self.props.ieee754_total_order(), ); #[cfg(feature = "encryption")] From c6f93295010979689f750c17707660454d276bec Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 9 Apr 2025 15:12:14 -0700 Subject: [PATCH 05/18] use new sort order when requested --- parquet/src/column/writer/encoder.rs | 53 +++++++++--- parquet/src/column/writer/mod.rs | 117 ++++++++++++++++++++++----- 2 files changed, 138 insertions(+), 32 deletions(-) diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 7371c72a5896..294c317d3cdb 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -133,13 +133,18 @@ pub struct ColumnValueEncoderImpl { max_value: Option, bloom_filter: Option, variable_length_bytes: Option, + ieee754_total_order: bool, } impl ColumnValueEncoderImpl { fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> { match value_indices { - Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])), - None => get_min_max(&self.descr, values.iter()), + Some(indices) => get_min_max( + &self.descr, + indices.iter().map(|x| &values[*x]), + self.ieee754_total_order, + ), + None => get_min_max(&self.descr, 
values.iter(), self.ieee754_total_order), } } @@ -149,8 +154,18 @@ impl ColumnValueEncoderImpl { && self.descr.converted_type() != ConvertedType::INTERVAL { if let Some((min, max)) = self.min_max(slice, None) { - update_min(&self.descr, &min, &mut self.min_value); - update_max(&self.descr, &max, &mut self.max_value); + update_min( + &self.descr, + &min, + &mut self.min_value, + self.ieee754_total_order, + ); + update_max( + &self.descr, + &max, + &mut self.max_value, + self.ieee754_total_order, + ); } if let Some(var_bytes) = T::T::variable_length_bytes(slice) { @@ -211,6 +226,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { min_value: None, max_value: None, variable_length_bytes: None, + ieee754_total_order: props.ieee754_total_order(), }) } @@ -309,14 +325,18 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { } } -fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)> +fn get_min_max<'a, T, I>( + descr: &ColumnDescriptor, + mut iter: I, + ieee754_total_order: bool, +) -> Option<(T, T)> where T: ParquetValueType + 'a, I: Iterator, { let first = loop { let next = iter.next()?; - if !is_nan(descr, next) { + if ieee754_total_order || !is_nan(descr, next) { break next; } }; @@ -324,13 +344,13 @@ where let mut min = first; let mut max = first; for val in iter { - if is_nan(descr, val) { + if !ieee754_total_order && is_nan(descr, val) { continue; } - if compare_greater(descr, min, val) { + if compare_greater(descr, min, val, ieee754_total_order) { min = val; } - if compare_greater(descr, val, max) { + if compare_greater(descr, val, max, ieee754_total_order) { max = val; } } @@ -343,14 +363,23 @@ where // // For max, it has similar logic but will be written as 0.0 // (positive zero) - let min = replace_zero(min, descr, -0.0); - let max = replace_zero(max, descr, 0.0); + let min = replace_zero(min, descr, -0.0, ieee754_total_order); + let max = replace_zero(max, descr, 0.0, ieee754_total_order); Some((min, max)) } #[inline] -fn 
replace_zero(val: &T, descr: &ColumnDescriptor, replace: f32) -> T { +fn replace_zero( + val: &T, + descr: &ColumnDescriptor, + replace: f32, + ieee754_total_order: bool, +) -> T { + if ieee754_total_order { + return val.clone(); + } + match T::PHYSICAL_TYPE { Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => { T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap() diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 02570d3f3c69..1bc4927493c2 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -22,10 +22,13 @@ use half::f16; use crate::bloom_filter::Sbbf; use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex}; +use std::cmp::Ordering; use std::collections::{BTreeSet, VecDeque}; use std::str; -use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type}; +use crate::basic::{ + ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, PageType, SortOrder, Type, +}; use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues}; use crate::compression::{create_codec, Codec, CodecOptionsBuilder}; @@ -466,10 +469,20 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { }; if let Some(min) = min { - update_min(&self.descr, min, &mut self.column_metrics.min_column_value); + update_min( + &self.descr, + min, + &mut self.column_metrics.min_column_value, + self.props.ieee754_total_order(), + ); } if let Some(max) = max { - update_max(&self.descr, max, &mut self.column_metrics.max_column_value); + update_max( + &self.descr, + max, + &mut self.column_metrics.max_column_value, + self.props.ieee754_total_order(), + ); } // We can only set the distinct count if there are no other writes @@ -774,6 +787,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { Ok(()) } + /// Evaluate `a > b` according to underlying 
logical type. + fn compare_greater(&self, a: &E::T, b: &E::T) -> bool { + compare_greater(&self.descr, a, b, self.props.ieee754_total_order()) + } + /// Update the column index and offset index when adding the data page fn update_column_offset_index( &mut self, @@ -806,8 +824,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max { if self.data_page_boundary_ascending { // If last min/max are greater than new min/max then not ascending anymore - let not_ascending = compare_greater(&self.descr, last_min, new_min) - || compare_greater(&self.descr, last_max, new_max); + let not_ascending = self.compare_greater(last_min, new_min) + || self.compare_greater(last_max, new_max); if not_ascending { self.data_page_boundary_ascending = false; } @@ -815,8 +833,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { if self.data_page_boundary_descending { // If new min/max are greater than last min/max then not descending anymore - let not_descending = compare_greater(&self.descr, new_min, last_min) - || compare_greater(&self.descr, new_max, last_max); + let not_descending = self.compare_greater(new_min, last_min) + || self.compare_greater(new_max, last_max); if not_descending { self.data_page_boundary_descending = false; } @@ -963,8 +981,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { let page_statistics = match (values_data.min_value, values_data.max_value) { (Some(min), Some(max)) => { // Update chunk level statistics - update_min(&self.descr, &min, &mut self.column_metrics.min_column_value); - update_max(&self.descr, &max, &mut self.column_metrics.max_column_value); + update_min( + &self.descr, + &min, + &mut self.column_metrics.min_column_value, + self.props.ieee754_total_order(), + ); + update_max( + &self.descr, + &max, + &mut self.column_metrics.max_column_value, + self.props.ieee754_total_order(), + ); (self.statistics_enabled == 
EnabledStatistics::Page).then_some( ValueStatistics::new( @@ -1350,12 +1378,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } } -fn update_min(descr: &ColumnDescriptor, val: &T, min: &mut Option) { - update_stat::(descr, val, min, |cur| compare_greater(descr, cur, val)) +fn update_min( + descr: &ColumnDescriptor, + val: &T, + min: &mut Option, + ieee754_total_order: bool, +) { + update_stat::(descr, val, min, |cur| { + compare_greater(descr, cur, val, ieee754_total_order) + }) } -fn update_max(descr: &ColumnDescriptor, val: &T, max: &mut Option) { - update_stat::(descr, val, max, |cur| compare_greater(descr, val, cur)) +fn update_max( + descr: &ColumnDescriptor, + val: &T, + max: &mut Option, + ieee754_total_order: bool, +) { + update_stat::(descr, val, max, |cur| { + compare_greater(descr, val, cur, ieee754_total_order) + }) } #[inline] @@ -1394,7 +1436,12 @@ fn update_stat( } /// Evaluate `a > b` according to underlying logical type. -fn compare_greater(descr: &ColumnDescriptor, a: &T, b: &T) -> bool { +fn compare_greater( + descr: &ColumnDescriptor, + a: &T, + b: &T, + ieee754_total_order: bool, +) -> bool { if let Some(LogicalType::Integer { is_signed, .. 
}) = descr.logical_type() { if !is_signed { // need to compare unsigned @@ -1430,12 +1477,42 @@ fn compare_greater(descr: &ColumnDescriptor, a: &T, b: &T) }; }; - if let Some(LogicalType::Float16) = descr.logical_type() { - let a = a.as_bytes(); - let a = f16::from_le_bytes([a[0], a[1]]); - let b = b.as_bytes(); - let b = f16::from_le_bytes([b[0], b[1]]); - return a > b; + if ieee754_total_order { + if ColumnOrder::get_sort_order( + descr.logical_type(), + descr.converted_type(), + descr.physical_type(), + true, + ) == SortOrder::TOTAL_ORDER + { + if let Some(LogicalType::Float16) = descr.logical_type() { + let a = a.as_bytes(); + let a = f16::from_le_bytes([a[0], a[1]]); + let b = b.as_bytes(); + let b = f16::from_le_bytes([b[0], b[1]]); + return a.total_cmp(&b) == Ordering::Greater; + } + + if descr.physical_type() == Type::FLOAT { + let a = f32::from_le_bytes(a.as_bytes().try_into().unwrap()); + let b = f32::from_le_bytes(b.as_bytes().try_into().unwrap()); + return a.total_cmp(&b) == Ordering::Greater; + } + + if descr.physical_type() == Type::DOUBLE { + let a = f64::from_le_bytes(a.as_bytes().try_into().unwrap()); + let b = f64::from_le_bytes(b.as_bytes().try_into().unwrap()); + return a.total_cmp(&b) == Ordering::Greater; + } + } + } else { + if let Some(LogicalType::Float16) = descr.logical_type() { + let a = a.as_bytes(); + let a = f16::from_le_bytes([a[0], a[1]]); + let b = b.as_bytes(); + let b = f16::from_le_bytes([b[0], b[1]]); + return a > b; + } } a > b From 20fb7ab69eabaee4be11735fb1df85d4f97f9268 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 9 Apr 2025 16:05:14 -0700 Subject: [PATCH 06/18] get proper column ordering from ColumnDescriptor --- parquet/src/column/writer/mod.rs | 5 ++++- parquet/src/schema/types.rs | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 1bc4927493c2..8068c1352f48 100644 --- a/parquet/src/column/writer/mod.rs +++ 
b/parquet/src/column/writer/mod.rs @@ -1163,7 +1163,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .set_dictionary_page_offset(dict_page_offset); if self.statistics_enabled != EnabledStatistics::None { - let backwards_compatible_min_max = self.descr.sort_order().is_signed(); + let backwards_compatible_min_max = self + .descr + .sort_order(self.props.ieee754_total_order()) + .is_signed(); let statistics = ValueStatistics::::new( self.column_metrics.min_column_value.clone(), diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 6486239b4490..dcbb6625d5a3 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -940,11 +940,12 @@ impl ColumnDescriptor { } /// Returns the sort order for this column - pub fn sort_order(&self) -> SortOrder { - ColumnOrder::get_type_defined_sort_order( + pub fn sort_order(&self, ieee754_total_order: bool) -> SortOrder { + ColumnOrder::get_sort_order( self.logical_type(), self.converted_type(), self.physical_type(), + ieee754_total_order, ) } } From 631bea0c62a7d7c6c63fade81448659b595ebd48 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 9 Apr 2025 16:46:54 -0700 Subject: [PATCH 07/18] simplify how sort order is communicated --- parquet/src/column/writer/encoder.rs | 64 +++++--------- parquet/src/column/writer/mod.rs | 126 +++++++++++++-------------- 2 files changed, 84 insertions(+), 106 deletions(-) diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 294c317d3cdb..fa10f385f5df 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -18,7 +18,7 @@ use bytes::Bytes; use half::f16; -use crate::basic::{ConvertedType, Encoding, LogicalType, Type}; +use crate::basic::{ConvertedType, Encoding, LogicalType, SortOrder, Type}; use crate::bloom_filter::Sbbf; use crate::column::writer::{ compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min, @@ -28,7 +28,9 @@ use 
crate::data_type::DataType; use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; use crate::errors::{ParquetError, Result}; use crate::file::properties::{EnabledStatistics, WriterProperties}; -use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; +use crate::schema::types::ColumnDescPtr; + +use super::OrderedColumnDescriptor; /// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`] pub trait ColumnValues { @@ -126,25 +128,20 @@ pub trait ColumnValueEncoder { pub struct ColumnValueEncoderImpl { encoder: Box>, dict_encoder: Option>, - descr: ColumnDescPtr, + descr: OrderedColumnDescriptor, num_values: usize, statistics_enabled: EnabledStatistics, min_value: Option, max_value: Option, bloom_filter: Option, variable_length_bytes: Option, - ieee754_total_order: bool, } impl ColumnValueEncoderImpl { fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> { match value_indices { - Some(indices) => get_min_max( - &self.descr, - indices.iter().map(|x| &values[*x]), - self.ieee754_total_order, - ), - None => get_min_max(&self.descr, values.iter(), self.ieee754_total_order), + Some(indices) => get_min_max(&self.descr, indices.iter().map(|x| &values[*x])), + None => get_min_max(&self.descr, values.iter()), } } @@ -154,18 +151,8 @@ impl ColumnValueEncoderImpl { && self.descr.converted_type() != ConvertedType::INTERVAL { if let Some((min, max)) = self.min_max(slice, None) { - update_min( - &self.descr, - &min, - &mut self.min_value, - self.ieee754_total_order, - ); - update_max( - &self.descr, - &max, - &mut self.max_value, - self.ieee754_total_order, - ); + update_min(&self.descr, &min, &mut self.min_value); + update_max(&self.descr, &max, &mut self.max_value); } if let Some(var_bytes) = T::T::variable_length_bytes(slice) { @@ -216,17 +203,18 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp)) .transpose()?; + let descr = 
OrderedColumnDescriptor::new(descr.clone(), props.ieee754_total_order()); + Ok(Self { encoder, dict_encoder, - descr: descr.clone(), + descr, num_values: 0, statistics_enabled, bloom_filter, min_value: None, max_value: None, variable_length_bytes: None, - ieee754_total_order: props.ieee754_total_order(), }) } @@ -325,18 +313,15 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { } } -fn get_min_max<'a, T, I>( - descr: &ColumnDescriptor, - mut iter: I, - ieee754_total_order: bool, -) -> Option<(T, T)> +fn get_min_max<'a, T, I>(descr: &OrderedColumnDescriptor, mut iter: I) -> Option<(T, T)> where T: ParquetValueType + 'a, I: Iterator, { + let ieee754_total_order = descr.sort_order == SortOrder::TOTAL_ORDER; let first = loop { let next = iter.next()?; - if ieee754_total_order || !is_nan(descr, next) { + if ieee754_total_order || !is_nan(&descr.descr, next) { break next; } }; @@ -344,13 +329,13 @@ where let mut min = first; let mut max = first; for val in iter { - if !ieee754_total_order && is_nan(descr, val) { + if !ieee754_total_order && is_nan(&descr.descr, val) { continue; } - if compare_greater(descr, min, val, ieee754_total_order) { + if compare_greater(descr, min, val) { min = val; } - if compare_greater(descr, val, max, ieee754_total_order) { + if compare_greater(descr, val, max) { max = val; } } @@ -363,20 +348,15 @@ where // // For max, it has similar logic but will be written as 0.0 // (positive zero) - let min = replace_zero(min, descr, -0.0, ieee754_total_order); - let max = replace_zero(max, descr, 0.0, ieee754_total_order); + let min = replace_zero(min, descr, -0.0); + let max = replace_zero(max, descr, 0.0); Some((min, max)) } #[inline] -fn replace_zero( - val: &T, - descr: &ColumnDescriptor, - replace: f32, - ieee754_total_order: bool, -) -> T { - if ieee754_total_order { +fn replace_zero(val: &T, descr: &OrderedColumnDescriptor, replace: f32) -> T { + if descr.sort_order == SortOrder::TOTAL_ORDER { return val.clone(); } diff --git 
a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 8068c1352f48..52f8c3d7164e 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -326,13 +326,52 @@ impl ColumnMetrics { } } +// ColumnDescriptor that knows how it should be sorted +struct OrderedColumnDescriptor { + descr: ColumnDescPtr, + sort_order: SortOrder, +} + +impl OrderedColumnDescriptor { + fn new(descr: ColumnDescPtr, ieee754_total_order: bool) -> Self { + let sort_order = descr.sort_order(ieee754_total_order); + Self { descr, sort_order } + } + + // add some pass-through methods for convenience + #[inline] + fn max_def_level(&self) -> i16 { + self.descr.max_def_level() + } + + #[inline] + fn max_rep_level(&self) -> i16 { + self.descr.max_rep_level() + } + + #[inline] + fn converted_type(&self) -> ConvertedType { + self.descr.converted_type() + } + + #[inline] + fn logical_type(&self) -> Option { + self.descr.logical_type() + } + + #[inline] + fn physical_type(&self) -> Type { + self.descr.physical_type() + } +} + /// Typed column writer for a primitive column. pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl>; /// Generic column writer for a primitive column. 
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> { // Column writer properties - descr: ColumnDescPtr, + descr: OrderedColumnDescriptor, props: WriterPropertiesPtr, statistics_enabled: EnabledStatistics, @@ -408,6 +447,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { _ => None, }; + let descr = OrderedColumnDescriptor::new(descr, props.ieee754_total_order()); + Self { descr, props, @@ -469,20 +510,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { }; if let Some(min) = min { - update_min( - &self.descr, - min, - &mut self.column_metrics.min_column_value, - self.props.ieee754_total_order(), - ); + update_min(&self.descr, min, &mut self.column_metrics.min_column_value); } if let Some(max) = max { - update_max( - &self.descr, - max, - &mut self.column_metrics.max_column_value, - self.props.ieee754_total_order(), - ); + update_max(&self.descr, max, &mut self.column_metrics.max_column_value); } // We can only set the distinct count if there are no other writes @@ -610,7 +641,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// Returns a reference to a [`ColumnDescPtr`] pub fn get_descriptor(&self) -> &ColumnDescPtr { - &self.descr + &self.descr.descr } /// Finalizes writes and closes the column writer. @@ -787,11 +818,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { Ok(()) } - /// Evaluate `a > b` according to underlying logical type. 
- fn compare_greater(&self, a: &E::T, b: &E::T) -> bool { - compare_greater(&self.descr, a, b, self.props.ieee754_total_order()) - } - /// Update the column index and offset index when adding the data page fn update_column_offset_index( &mut self, @@ -824,8 +850,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max { if self.data_page_boundary_ascending { // If last min/max are greater than new min/max then not ascending anymore - let not_ascending = self.compare_greater(last_min, new_min) - || self.compare_greater(last_max, new_max); + let not_ascending = compare_greater(&self.descr, last_min, new_min) + || compare_greater(&self.descr, last_max, new_max); if not_ascending { self.data_page_boundary_ascending = false; } @@ -833,8 +859,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { if self.data_page_boundary_descending { // If new min/max are greater than last min/max then not descending anymore - let not_descending = self.compare_greater(new_min, last_min) - || self.compare_greater(new_max, last_max); + let not_descending = compare_greater(&self.descr, new_min, last_min) + || compare_greater(&self.descr, new_max, last_max); if not_descending { self.data_page_boundary_descending = false; } @@ -981,18 +1007,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { let page_statistics = match (values_data.min_value, values_data.max_value) { (Some(min), Some(max)) => { // Update chunk level statistics - update_min( - &self.descr, - &min, - &mut self.column_metrics.min_column_value, - self.props.ieee754_total_order(), - ); - update_max( - &self.descr, - &max, - &mut self.column_metrics.max_column_value, - self.props.ieee754_total_order(), - ); + update_min(&self.descr, &min, &mut self.column_metrics.min_column_value); + update_max(&self.descr, &max, &mut self.column_metrics.max_column_value); (self.statistics_enabled == EnabledStatistics::Page).then_some( 
ValueStatistics::new( @@ -1152,7 +1168,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { // If data page offset is not set, then no pages have been written let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64; - let mut builder = ColumnChunkMetaData::builder(self.descr.clone()) + let mut builder = ColumnChunkMetaData::builder(self.descr.descr.clone()) .set_compression(self.codec) .set_encodings(self.encodings.iter().cloned().collect()) .set_page_encoding_stats(self.encoding_stats.clone()) @@ -1163,10 +1179,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .set_dictionary_page_offset(dict_page_offset); if self.statistics_enabled != EnabledStatistics::None { - let backwards_compatible_min_max = self - .descr - .sort_order(self.props.ieee754_total_order()) - .is_signed(); + let backwards_compatible_min_max = self.descr.sort_order.is_signed(); let statistics = ValueStatistics::::new( self.column_metrics.min_column_value.clone(), @@ -1364,7 +1377,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() { builder.set_column_crypto_metadata(get_column_crypto_metadata( encryption_properties, - &self.descr, + &self.descr.descr, )) } else { builder @@ -1381,25 +1394,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } } -fn update_min( - descr: &ColumnDescriptor, - val: &T, - min: &mut Option, - ieee754_total_order: bool, -) { - update_stat::(descr, val, min, |cur| { - compare_greater(descr, cur, val, ieee754_total_order) +fn update_min(descr: &OrderedColumnDescriptor, val: &T, min: &mut Option) { + update_stat::(&descr.descr, val, min, |cur| { + compare_greater(descr, cur, val) }) } -fn update_max( - descr: &ColumnDescriptor, - val: &T, - max: &mut Option, - ieee754_total_order: bool, -) { - update_stat::(descr, val, max, |cur| { - compare_greater(descr, val, cur, ieee754_total_order) +fn update_max(descr: 
&OrderedColumnDescriptor, val: &T, max: &mut Option) { + update_stat::(&descr.descr, val, max, |cur| { + compare_greater(descr, val, cur) }) } @@ -1439,12 +1442,7 @@ fn update_stat( } /// Evaluate `a > b` according to underlying logical type. -fn compare_greater( - descr: &ColumnDescriptor, - a: &T, - b: &T, - ieee754_total_order: bool, -) -> bool { +fn compare_greater(descr: &OrderedColumnDescriptor, a: &T, b: &T) -> bool { if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() { if !is_signed { // need to compare unsigned @@ -1480,7 +1478,7 @@ fn compare_greater( }; }; - if ieee754_total_order { + if descr.sort_order == SortOrder::TOTAL_ORDER { if ColumnOrder::get_sort_order( descr.logical_type(), descr.converted_type(), From 52dee5089aa174888db0c97b332e153ddabe774e Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 9 Apr 2025 16:54:24 -0700 Subject: [PATCH 08/18] clean up --- parquet/src/basic.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index 3fd1fae74dc0..750c439cf2e4 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -561,19 +561,19 @@ pub enum ColumnOrder { impl ColumnOrder { /// Returns the sort order for a physical/logical type. /// - /// If `use_total_order` is `true` then IEEE 754 total order will be used for floating point + /// If `ieee754_total_order` is `true` then IEEE 754 total order will be used for floating point /// types. 
pub fn get_sort_order( logical_type: Option, converted_type: ConvertedType, physical_type: Type, - use_total_order: bool, + ieee754_total_order: bool, ) -> SortOrder { // check for floating point types, then fall back to type defined order match logical_type { - Some(LogicalType::Float16) if use_total_order => SortOrder::TOTAL_ORDER, + Some(LogicalType::Float16) if ieee754_total_order => SortOrder::TOTAL_ORDER, _ => match physical_type { - Type::FLOAT | Type::DOUBLE if use_total_order => SortOrder::TOTAL_ORDER, + Type::FLOAT | Type::DOUBLE if ieee754_total_order => SortOrder::TOTAL_ORDER, _ => ColumnOrder::get_type_defined_sort_order( logical_type, converted_type, From d25204836033dc5e367f3209690054e932508c32 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 9 Apr 2025 18:55:53 -0700 Subject: [PATCH 09/18] add first test and fix bug in stats --- parquet/src/column/writer/mod.rs | 48 ++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 52f8c3d7164e..de819a13c40b 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -1395,13 +1395,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } fn update_min(descr: &OrderedColumnDescriptor, val: &T, min: &mut Option) { - update_stat::(&descr.descr, val, min, |cur| { + update_stat::(descr, val, min, |cur| { compare_greater(descr, cur, val) }) } fn update_max(descr: &OrderedColumnDescriptor, val: &T, max: &mut Option) { - update_stat::(&descr.descr, val, max, |cur| { + update_stat::(descr, val, max, |cur| { compare_greater(descr, val, cur) }) } @@ -1425,14 +1425,14 @@ fn is_nan(descr: &ColumnDescriptor, val: &T) -> bool { /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true` fn update_stat( - descr: &ColumnDescriptor, + descr: &OrderedColumnDescriptor, 
val: &T, cur: &mut Option, should_update: F, ) where F: Fn(&T) -> bool, { - if is_nan(descr, val) { + if descr.sort_order != SortOrder::TOTAL_ORDER && is_nan(&descr.descr, val) { return; } @@ -2676,7 +2676,7 @@ mod tests { .map(|s| ByteArray::from(s).into()) .collect::>(); - let stats = float16_statistics_roundtrip(&input); + let stats = float16_statistics_roundtrip(&input, false); assert!(stats.is_min_max_backwards_compatible()); assert_eq!( stats.min_opt().unwrap(), @@ -2695,7 +2695,7 @@ mod tests { .map(|s| ByteArray::from(s).into()) .collect::>(); - let stats = float16_statistics_roundtrip(&input); + let stats = float16_statistics_roundtrip(&input, false); assert!(stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); assert_eq!( @@ -2704,6 +2704,19 @@ mod tests { ); } + #[test] + fn test_column_writer_check_float16_nan_middle_total_order() { + let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE] + .into_iter() + .map(|s| ByteArray::from(s).into()) + .collect::>(); + + let stats = float16_statistics_roundtrip(&input, true); + assert!(!stats.is_min_max_backwards_compatible()); + assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); + assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::NAN)); + } + #[test] fn test_float16_statistics_nan_middle() { let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE] @@ -2711,7 +2724,7 @@ mod tests { .map(|s| ByteArray::from(s).into()) .collect::>(); - let stats = float16_statistics_roundtrip(&input); + let stats = float16_statistics_roundtrip(&input, false); assert!(stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); assert_eq!( @@ -2727,7 +2740,7 @@ mod tests { .map(|s| ByteArray::from(s).into()) .collect::>(); - let stats = float16_statistics_roundtrip(&input); + let stats = float16_statistics_roundtrip(&input, false); assert!(stats.is_min_max_backwards_compatible()); 
assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); assert_eq!( @@ -2743,7 +2756,7 @@ mod tests { .map(|s| ByteArray::from(s).into()) .collect::>(); - let stats = float16_statistics_roundtrip(&input); + let stats = float16_statistics_roundtrip(&input, false); assert!(stats.min_bytes_opt().is_none()); assert!(stats.max_bytes_opt().is_none()); assert!(stats.is_min_max_backwards_compatible()); @@ -2756,7 +2769,7 @@ mod tests { .map(|s| ByteArray::from(s).into()) .collect::>(); - let stats = float16_statistics_roundtrip(&input); + let stats = float16_statistics_roundtrip(&input, false); assert!(stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); @@ -2769,7 +2782,7 @@ mod tests { .map(|s| ByteArray::from(s).into()) .collect::>(); - let stats = float16_statistics_roundtrip(&input); + let stats = float16_statistics_roundtrip(&input, false); assert!(stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); @@ -2782,7 +2795,7 @@ mod tests { .map(|s| ByteArray::from(s).into()) .collect::>(); - let stats = float16_statistics_roundtrip(&input); + let stats = float16_statistics_roundtrip(&input, false); assert!(stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO)); assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI)); @@ -2795,7 +2808,7 @@ mod tests { .map(|s| ByteArray::from(s).into()) .collect::>(); - let stats = float16_statistics_roundtrip(&input); + let stats = float16_statistics_roundtrip(&input, false); assert!(stats.is_min_max_backwards_compatible()); assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI)); assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); @@ -4180,9 +4193,16 @@ mod tests { fn 
float16_statistics_roundtrip( values: &[FixedLenByteArray], + total_order: bool, ) -> ValueStatistics { let page_writer = get_test_page_writer(); - let mut writer = get_test_float16_column_writer(page_writer, Default::default()); + + let props = Arc::new( + WriterProperties::builder() + .set_ieee754_total_order(total_order) + .build(), + ); + let mut writer = get_test_float16_column_writer(page_writer, props); writer.write_batch(values, None, None).unwrap(); let metadata = writer.close().unwrap().metadata; From 23fc43f7636c393775de1f4d78b11688fbc4a797 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 9 Apr 2025 21:50:18 -0700 Subject: [PATCH 10/18] more tests --- parquet/src/column/writer/mod.rs | 265 ++++++++++++++++++++++++++++--- 1 file changed, 245 insertions(+), 20 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index de819a13c40b..09ef93268854 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -1395,15 +1395,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } fn update_min(descr: &OrderedColumnDescriptor, val: &T, min: &mut Option) { - update_stat::(descr, val, min, |cur| { - compare_greater(descr, cur, val) - }) + update_stat::(descr, val, min, |cur| compare_greater(descr, cur, val)) } fn update_max(descr: &OrderedColumnDescriptor, val: &T, max: &mut Option) { - update_stat::(descr, val, max, |cur| { - compare_greater(descr, val, cur) - }) + update_stat::(descr, val, max, |cur| compare_greater(descr, val, cur)) } #[inline] @@ -2704,19 +2700,6 @@ mod tests { ); } - #[test] - fn test_column_writer_check_float16_nan_middle_total_order() { - let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE] - .into_iter() - .map(|s| ByteArray::from(s).into()) - .collect::>(); - - let stats = float16_statistics_roundtrip(&input, true); - assert!(!stats.is_min_max_backwards_compatible()); - assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); - 
assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::NAN)); - } - #[test] fn test_float16_statistics_nan_middle() { let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE] @@ -2814,6 +2797,71 @@ mod tests { assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); } + #[test] + fn test_float16_nan_max_total_order() { + let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE] + .into_iter() + .map(|s| ByteArray::from(s).into()) + .collect::>(); + + let stats = float16_statistics_roundtrip(&input, true); + assert!(!stats.is_min_max_backwards_compatible()); + assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE)); + assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::NAN)); + } + + #[test] + fn test_float16_nan_min_total_order() { + let input = [f16::ONE, -f16::NAN, f16::ZERO] + .into_iter() + .map(|s| ByteArray::from(s).into()) + .collect::>(); + + let stats = float16_statistics_roundtrip(&input, true); + assert!(!stats.is_min_max_backwards_compatible()); + assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::NAN)); + assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ONE)); + } + + #[test] + fn test_float16_statistics_nan_only_total_order() { + let input = [f16::NAN, f16::NAN] + .into_iter() + .map(|s| ByteArray::from(s).into()) + .collect::>(); + + let stats = float16_statistics_roundtrip(&input, true); + assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NAN)); + assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::NAN)); + assert!(!stats.is_min_max_backwards_compatible()); + } + + #[test] + fn test_float16_statistics_nan_min_max_total_order() { + let input = [f16::ONE, -f16::NAN, f16::NAN, f16::PI] + .into_iter() + .map(|s| ByteArray::from(s).into()) + .collect::>(); + + let stats = float16_statistics_roundtrip(&input, true); + assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::NAN)); + assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::NAN)); + 
assert!(!stats.is_min_max_backwards_compatible()); + } + + #[test] + fn test_float16_statistics_zero_min_max_total_order() { + let input = [-f16::ZERO, f16::ZERO] + .into_iter() + .map(|s| ByteArray::from(s).into()) + .collect::>(); + + let stats = float16_statistics_roundtrip(&input, true); + assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::ZERO)); + assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO)); + assert!(!stats.is_min_max_backwards_compatible()); + } + #[test] fn test_float_statistics_nan_middle() { let stats = statistics_roundtrip::(&[1.0, f32::NAN, 2.0]); @@ -2901,6 +2949,84 @@ mod tests { } } + #[test] + fn test_float_nan_max_total_order() { + let stats = statistics_roundtrip_total_order::(&[1.0, f32::NAN, 2.0]); + assert!(!stats.is_min_max_backwards_compatible()); + if let Statistics::Float(stats) = stats { + assert_eq!(stats.min_opt().unwrap(), &1.0); + assert_eq!( + stats.max_opt().unwrap().total_cmp(&f32::NAN), + Ordering::Equal + ); + } else { + panic!("expecting Statistics::Float"); + } + } + + #[test] + fn test_float_nan_min_total_order() { + let stats = statistics_roundtrip_total_order::(&[1.0, -f32::NAN, 0.0]); + assert!(!stats.is_min_max_backwards_compatible()); + if let Statistics::Float(stats) = stats { + assert_eq!( + stats.min_opt().unwrap().total_cmp(&-f32::NAN), + Ordering::Equal + ); + assert_eq!(stats.max_opt().unwrap(), &1.0); + } else { + panic!("expecting Statistics::Float"); + } + } + + #[test] + fn test_float_statistics_nan_only_total_order() { + let stats = statistics_roundtrip_total_order::(&[f32::NAN, f32::NAN]); + assert!(!stats.is_min_max_backwards_compatible()); + if let Statistics::Float(stats) = stats { + assert_eq!( + stats.min_opt().unwrap().total_cmp(&f32::NAN), + Ordering::Equal + ); + assert_eq!( + stats.max_opt().unwrap().total_cmp(&f32::NAN), + Ordering::Equal + ); + } else { + panic!("expecting Statistics::Float"); + } + } + + #[test] + fn test_float_statistics_nan_min_max_total_order() { + 
let stats = statistics_roundtrip_total_order::(&[1.0, -f32::NAN, f32::NAN, 2.0]); + assert!(!stats.is_min_max_backwards_compatible()); + if let Statistics::Float(stats) = stats { + assert_eq!( + stats.min_opt().unwrap().total_cmp(&-f32::NAN), + Ordering::Equal + ); + assert_eq!( + stats.max_opt().unwrap().total_cmp(&f32::NAN), + Ordering::Equal + ); + } else { + panic!("expecting Statistics::Float"); + } + } + + #[test] + fn test_float_statistics_zero_min_max_total_order() { + let stats = statistics_roundtrip_total_order::(&[-0.0, 0.0]); + assert!(!stats.is_min_max_backwards_compatible()); + if let Statistics::Float(stats) = stats { + assert_eq!(stats.min_opt().unwrap(), &-0.0); + assert_eq!(stats.max_opt().unwrap(), &0.0); + } else { + panic!("expecting Statistics::Float"); + } + } + #[test] fn test_double_statistics_nan_middle() { let stats = statistics_roundtrip::(&[1.0, f64::NAN, 2.0]); @@ -2988,6 +3114,84 @@ mod tests { } } + #[test] + fn test_double_nan_max_total_order() { + let stats = statistics_roundtrip_total_order::(&[1.0, f64::NAN, 2.0]); + assert!(!stats.is_min_max_backwards_compatible()); + if let Statistics::Double(stats) = stats { + assert_eq!(stats.min_opt().unwrap(), &1.0); + assert_eq!( + stats.max_opt().unwrap().total_cmp(&f64::NAN), + Ordering::Equal + ); + } else { + panic!("expecting Statistics::Float"); + } + } + + #[test] + fn test_double_nan_min_total_order() { + let stats = statistics_roundtrip_total_order::(&[1.0, -f64::NAN, 0.0]); + assert!(!stats.is_min_max_backwards_compatible()); + if let Statistics::Double(stats) = stats { + assert_eq!( + stats.min_opt().unwrap().total_cmp(&-f64::NAN), + Ordering::Equal + ); + assert_eq!(stats.max_opt().unwrap(), &1.0); + } else { + panic!("expecting Statistics::Float"); + } + } + + #[test] + fn test_double_statistics_nan_only_total_order() { + let stats = statistics_roundtrip_total_order::(&[f64::NAN, f64::NAN]); + assert!(!stats.is_min_max_backwards_compatible()); + if let 
Statistics::Double(stats) = stats { + assert_eq!( + stats.min_opt().unwrap().total_cmp(&f64::NAN), + Ordering::Equal + ); + assert_eq!( + stats.max_opt().unwrap().total_cmp(&f64::NAN), + Ordering::Equal + ); + } else { + panic!("expecting Statistics::Float"); + } + } + + #[test] + fn test_double_statistics_nan_min_max_total_order() { + let stats = statistics_roundtrip_total_order::(&[1.0, -f64::NAN, f64::NAN, 2.0]); + assert!(!stats.is_min_max_backwards_compatible()); + if let Statistics::Double(stats) = stats { + assert_eq!( + stats.min_opt().unwrap().total_cmp(&-f64::NAN), + Ordering::Equal + ); + assert_eq!( + stats.max_opt().unwrap().total_cmp(&f64::NAN), + Ordering::Equal + ); + } else { + panic!("expecting Statistics::Float"); + } + } + + #[test] + fn test_double_statistics_zero_min_max_total_order() { + let stats = statistics_roundtrip_total_order::(&[-0.0, 0.0]); + assert!(!stats.is_min_max_backwards_compatible()); + if let Statistics::Double(stats) = stats { + assert_eq!(stats.min_opt().unwrap(), &-0.0); + assert_eq!(stats.max_opt().unwrap(), &0.0); + } else { + panic!("expecting Statistics::Float"); + } + } + #[test] fn test_compare_greater_byte_array_decimals() { assert!(!compare_greater_byte_array_decimals(&[], &[],),); @@ -4142,7 +4346,8 @@ mod tests { } } - /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] and returns generated statistics. + /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] + /// and returns generated statistics. fn statistics_roundtrip(values: &[::T]) -> Statistics { let page_writer = get_test_page_writer(); let props = Default::default(); @@ -4157,6 +4362,26 @@ mod tests { } } + /// Write data into parquet using [`get_test_page_writer`] and [`get_test_column_writer`] + /// and returns generated statistics. Sets `ieee754_total_order` property to `true`. 
+ fn statistics_roundtrip_total_order(values: &[::T]) -> Statistics { + let page_writer = get_test_page_writer(); + let props = Arc::new( + WriterProperties::builder() + .set_ieee754_total_order(true) + .build(), + ); + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + writer.write_batch(values, None, None).unwrap(); + + let metadata = writer.close().unwrap().metadata; + if let Some(stats) = metadata.statistics() { + stats.clone() + } else { + panic!("metadata missing statistics"); + } + } + /// Returns Decimals column writer. fn get_test_decimals_column_writer( page_writer: Box, From 07e2564400da0d943f9078ee5efe44f1a6845335 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 10 Apr 2025 08:12:09 -0700 Subject: [PATCH 11/18] add more tests --- parquet/src/column/writer/mod.rs | 9 ++++++++- parquet/src/file/writer.rs | 30 +++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 09ef93268854..f9aa8955a1aa 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -3164,7 +3164,14 @@ mod tests { #[test] fn test_double_statistics_nan_min_max_total_order() { - let stats = statistics_roundtrip_total_order::(&[1.0, -f64::NAN, f64::NAN, 2.0]); + let stats = statistics_roundtrip_total_order::(&[ + -1.0, + -f64::INFINITY, + -f64::NAN, + f64::NAN, + 2.0, + f64::INFINITY, + ]); assert!(!stats.is_min_max_backwards_compatible()); if let Statistics::Double(stats) = stats { assert_eq!( diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index e22acb2f7f32..2388b431ce29 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1169,6 +1169,34 @@ mod tests { ); let props = Default::default(); + let writer = + SerializedFileWriter::new(file.try_clone().unwrap(), schema.clone(), props).unwrap(); + writer.close().unwrap(); + + let reader = 
SerializedFileReader::new(file.try_clone().unwrap()).unwrap(); + + // only leaves + let expected = vec![ + // INT32 + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), + // INTERVAL + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED), + // Float16 + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), + // String + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED), + ]; + let actual = reader.metadata().file_metadata().column_orders(); + + assert!(actual.is_some()); + let actual = actual.unwrap(); + assert_eq!(*actual, expected); + + let props = Arc::new( + WriterProperties::builder() + .set_ieee754_total_order(true) + .build(), + ); let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(); writer.close().unwrap(); @@ -1181,7 +1209,7 @@ mod tests { // INTERVAL ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNDEFINED), // Float16 - ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), + ColumnOrder::IEEE_754_TOTAL_ORDER, // String ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED), ]; From 84aec341597deea4001756b5a2f1b0c014ce7604 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 10 Apr 2025 08:27:45 -0700 Subject: [PATCH 12/18] more tests --- parquet/tests/arrow_reader/mod.rs | 19 ++++- parquet/tests/arrow_reader/statistics.rs | 88 ++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 3 deletions(-) diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index 0e6783583cd5..ebbb521b22a1 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -84,6 +84,10 @@ enum Scenario { Float16, Float32, Float64, + /// Float tests with Parquet sort order set to IEEE 754 total order + Float16TotalOrder, + Float32TotalOrder, + Float64TotalOrder, Decimal, Decimal256, ByteArray, @@ -686,7 +690,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { Scenario::NumericLimits => { vec![make_numeric_limit_batch()] } - Scenario::Float16 => { + Scenario::Float16 | 
Scenario::Float16TotalOrder => { vec![ make_f16_batch( vec![-5.0, -4.0, -3.0, -2.0, -1.0] @@ -714,7 +718,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { ), ] } - Scenario::Float32 => { + Scenario::Float32 | Scenario::Float32TotalOrder => { vec![ make_f32_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), make_f32_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]), @@ -722,7 +726,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { make_f32_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]), ] } - Scenario::Float64 => { + Scenario::Float64 | Scenario::Float64TotalOrder => { vec![ make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), make_f64_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]), @@ -1027,10 +1031,19 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTem .tempfile() .expect("tempfile creation"); + let total_order = match scenario { + Scenario::Float16TotalOrder | Scenario::Float32TotalOrder | Scenario::Float64TotalOrder => { + true + } + _ => false, + }; + + // TODO(ets): need to get total order option down here let props = WriterProperties::builder() .set_max_row_group_size(row_per_group) .set_bloom_filter_enabled(true) .set_statistics_enabled(EnabledStatistics::Page) + .set_ieee754_total_order(total_order) .build(); let batches = create_data_batch(scenario); diff --git a/parquet/tests/arrow_reader/statistics.rs b/parquet/tests/arrow_reader/statistics.rs index 0eb0fc2b277f..1e5f8f2a03b0 100644 --- a/parquet/tests/arrow_reader/statistics.rs +++ b/parquet/tests/arrow_reader/statistics.rs @@ -705,6 +705,42 @@ async fn test_float_16() { .run(); } +#[tokio::test] +async fn test_float_16_total_order() { + // This creates a parquet files of 1 column named f + let reader = TestReader { + scenario: Scenario::Float16TotalOrder, + row_per_group: 5, + } + .build() + .await; + + Test { + reader: &reader, + // mins are [-5, -4, 0, 5] + expected_min: Arc::new(Float16Array::from(vec![ + f16::from_f32(-5.), + f16::from_f32(-4.), + f16::from_f32(0.), + f16::from_f32(5.), + 
])), + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Float16Array::from(vec![ + f16::from_f32(-1.), + f16::from_f32(0.), + f16::from_f32(4.), + f16::from_f32(9.), + ])), + // nulls are [0, 0, 0, 0] + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), + // row counts are [5, 5, 5, 5] + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), + column_name: "f", + check: Check::Both, + } + .run(); +} + #[tokio::test] async fn test_float_32() { // This creates a parquet files of 1 column named f @@ -731,6 +767,32 @@ async fn test_float_32() { .run(); } +#[tokio::test] +async fn test_float_32_total_order() { + // This creates a parquet files of 1 column named f + let reader = TestReader { + scenario: Scenario::Float32TotalOrder, + row_per_group: 5, + } + .build() + .await; + + Test { + reader: &reader, + // mins are [-5, -4, 0, 5] + expected_min: Arc::new(Float32Array::from(vec![-5., -4., 0., 5.0])), + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Float32Array::from(vec![-1., 0., 4., 9.])), + // nulls are [0, 0, 0, 0] + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), + // row counts are [5, 5, 5, 5] + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), + column_name: "f", + check: Check::Both, + } + .run(); +} + #[tokio::test] async fn test_float_64() { // This creates a parquet files of 1 column named f @@ -757,6 +819,32 @@ async fn test_float_64() { .run(); } +#[tokio::test] +async fn test_float_64_total_order() { + // This creates a parquet files of 1 column named f + let reader = TestReader { + scenario: Scenario::Float64TotalOrder, + row_per_group: 5, + } + .build() + .await; + + Test { + reader: &reader, + // mins are [-5, -4, 0, 5] + expected_min: Arc::new(Float64Array::from(vec![-5., -4., 0., 5.0])), + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Float64Array::from(vec![-1., 0., 4., 9.])), + // nulls are [0, 0, 0, 0] + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), + // row counts are [5, 5, 
5, 5] + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), + column_name: "f", + check: Check::Both, + } + .run(); +} + // timestamp #[tokio::test] async fn test_timestamp() { From c72b6de1cfb692babad9d1ea483003d689e6cc20 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 11 Apr 2025 12:28:15 -0700 Subject: [PATCH 13/18] add total_order option to parquet-rewrite --- parquet/src/bin/parquet-rewrite.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/parquet/src/bin/parquet-rewrite.rs b/parquet/src/bin/parquet-rewrite.rs index 5a1ec94d5502..072098a16387 100644 --- a/parquet/src/bin/parquet-rewrite.rs +++ b/parquet/src/bin/parquet-rewrite.rs @@ -203,6 +203,10 @@ struct Args { /// Sets whether to coerce Arrow types to match Parquet specification #[clap(long)] coerce_types: Option, + + /// Sets whether to use IEEE 754 total order for floating point columns + #[clap(long)] + total_order: Option, } fn main() { @@ -270,6 +274,9 @@ fn main() { if let Some(value) = args.coerce_types { writer_properties_builder = writer_properties_builder.set_coerce_types(value); } + if let Some(value) = args.total_order { + writer_properties_builder = writer_properties_builder.set_ieee754_total_order(value); + } let writer_properties = writer_properties_builder.build(); let mut parquet_writer = ArrowWriter::try_new( File::create(&args.output).expect("Unable to open output file"), From 3d360ec3ac68dd14df2735569fd4f19bf9e0ad98 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 11 Apr 2025 14:36:50 -0700 Subject: [PATCH 14/18] manually regen from merged thrift --- parquet/src/format.rs | 154 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 124 insertions(+), 30 deletions(-) diff --git a/parquet/src/format.rs b/parquet/src/format.rs index f31db1bba737..a197735bd9d3 100644 --- a/parquet/src/format.rs +++ b/parquet/src/format.rs @@ -117,12 +117,12 @@ impl ConvertedType { /// a list is converted into an optional field containing a repeated field for its /// values pub 
const LIST: ConvertedType = ConvertedType(3); - /// an enum is converted into a binary field + /// an enum is converted into a BYTE_ARRAY field pub const ENUM: ConvertedType = ConvertedType(4); /// A decimal value. /// - /// This may be used to annotate binary or fixed primitive types. The - /// underlying byte array stores the unscaled value encoded as two's + /// This may be used to annotate BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY primitive + /// types. The underlying byte array stores the unscaled value encoded as two's /// complement using big-endian byte order (the most significant byte is the /// zeroth element). The value of the decimal is the value * 10^{-scale}. /// @@ -185,7 +185,7 @@ impl ConvertedType { pub const JSON: ConvertedType = ConvertedType(19); /// An embedded BSON document /// - /// A BSON document embedded within a single BINARY column. + /// A BSON document embedded within a single BYTE_ARRAY column. pub const BSON: ConvertedType = ConvertedType(20); /// An interval of time /// @@ -288,9 +288,9 @@ impl From<&ConvertedType> for i32 { pub struct FieldRepetitionType(pub i32); impl FieldRepetitionType { - /// This field is required (can not be null) and each record has exactly 1 value. + /// This field is required (can not be null) and each row has exactly 1 value. pub const REQUIRED: FieldRepetitionType = FieldRepetitionType(0); - /// The field is optional (can be null) and each record has 0 or 1 values. + /// The field is optional (can be null) and each row has 0 or 1 values. pub const OPTIONAL: FieldRepetitionType = FieldRepetitionType(1); /// The field is repeated and can contain 0 or more values pub const REPEATED: FieldRepetitionType = FieldRepetitionType(2); @@ -379,12 +379,15 @@ impl Encoding { pub const DELTA_BYTE_ARRAY: Encoding = Encoding(7); /// Dictionary encoding: the ids are encoded using the RLE encoding pub const RLE_DICTIONARY: Encoding = Encoding(8); - /// Encoding for floating-point data. 
+ /// Encoding for fixed-width data (FLOAT, DOUBLE, INT32, INT64, FIXED_LEN_BYTE_ARRAY). /// K byte-streams are created where K is the size in bytes of the data type. - /// The individual bytes of an FP value are scattered to the corresponding stream and + /// The individual bytes of a value are scattered to the corresponding stream and /// the streams are concatenated. /// This itself does not reduce the size of the data but can lead to better compression /// afterwards. + /// + /// Added in 2.8 for FLOAT and DOUBLE. + /// Support for INT32, INT64 and FIXED_LEN_BYTE_ARRAY added in 2.11. pub const BYTE_STREAM_SPLIT: Encoding = Encoding(9); pub const ENUM_VALUES: &'static [Self] = &[ Self::PLAIN, @@ -792,7 +795,12 @@ pub struct Statistics { /// signed. pub max: Option>, pub min: Option>, - /// count of null value in the column + /// Count of null values in the column. + /// + /// Writers SHOULD always write this field even if it is zero (i.e. no null value) + /// or the column is not nullable. + /// Readers MUST distinguish between null_count not being present and null_count == 0. + /// If null_count is not present, readers MUST NOT assume null_count == 0. pub null_count: Option, /// count of distinct values occurring pub distinct_count: Option, @@ -1260,7 +1268,7 @@ impl crate::thrift::TSerializable for NullType { /// To maintain forward-compatibility in v1, implementations using this logical /// type must also set scale and precision on the annotated SchemaElement. /// -/// Allowed for physical types: INT32, INT64, FIXED, and BINARY +/// Allowed for physical types: INT32, INT64, FIXED_LEN_BYTE_ARRAY, and BYTE_ARRAY. 
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DecimalType { pub scale: i32, @@ -1757,7 +1765,7 @@ impl crate::thrift::TSerializable for IntType { /// Embedded JSON logical type annotation /// -/// Allowed for physical types: BINARY +/// Allowed for physical types: BYTE_ARRAY #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct JsonType { } @@ -1797,7 +1805,7 @@ impl crate::thrift::TSerializable for JsonType { /// Embedded BSON logical type annotation /// -/// Allowed for physical types: BINARY +/// Allowed for physical types: BYTE_ARRAY #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct BsonType { } @@ -1831,6 +1839,44 @@ impl crate::thrift::TSerializable for BsonType { } } +// +// VariantType +// + +/// Embedded Variant logical type annotation +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct VariantType { +} + +impl VariantType { + pub fn new() -> VariantType { + VariantType {} + } +} + +impl crate::thrift::TSerializable for VariantType { + fn read_from_in_protocol(i_prot: &mut T) -> thrift::Result { + i_prot.read_struct_begin()?; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + i_prot.skip(field_ident.field_type)?; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = VariantType {}; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut T) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("VariantType"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + // // LogicalType // @@ -1851,6 +1897,7 @@ pub enum LogicalType { BSON(BsonType), UUID(UUIDType), FLOAT16(Float16Type), + VARIANT(VariantType), } impl crate::thrift::TSerializable for LogicalType { @@ -1963,6 +2010,13 @@ impl crate::thrift::TSerializable for LogicalType { } received_field_count += 1; }, + 16 => { + let val 
= VariantType::read_from_in_protocol(i_prot)?; + if ret.is_none() { + ret = Some(LogicalType::VARIANT(val)); + } + received_field_count += 1; + }, _ => { i_prot.skip(field_ident.field_type)?; received_field_count += 1; @@ -2067,6 +2121,11 @@ impl crate::thrift::TSerializable for LogicalType { f.write_to_out_protocol(o_prot)?; o_prot.write_field_end()?; }, + LogicalType::VARIANT(ref f) => { + o_prot.write_field_begin(&TFieldIdentifier::new("VARIANT", TType::Struct, 16))?; + f.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + }, } o_prot.write_field_stop()?; o_prot.write_struct_end() @@ -2283,7 +2342,12 @@ impl crate::thrift::TSerializable for SchemaElement { /// Data page header #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DataPageHeader { - /// Number of values, including NULLs, in this data page. * + /// Number of values, including NULLs, in this data page. + /// + /// If a OffsetIndex is present, a page must begin at a row + /// boundary (repetition_level = 0). Otherwise, pages may begin + /// within a row (repetition_level > 0). + /// pub num_values: i32, /// Encoding used for this data page * pub encoding: Encoding, @@ -2527,7 +2591,10 @@ pub struct DataPageHeaderV2 { /// Number of NULL values, in this data page. /// Number of non-null = num_values - num_nulls which is also the number of values in the data section * pub num_nulls: i32, - /// Number of rows in this data page. which means pages change on record boundaries (r = 0) * + /// Number of rows in this data page. Every page must begin at a + /// row boundary (repetition_level = 0): rows must **not** be + /// split across page boundaries when using V2 data pages. 
+ /// pub num_rows: i32, /// Encoding used for data in this page * pub encoding: Encoding, @@ -3344,10 +3411,10 @@ impl crate::thrift::TSerializable for KeyValue { // SortingColumn // -/// Wrapper struct to specify sort order +/// Sort order within a RowGroup of a leaf column #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct SortingColumn { - /// The column index (in this row group) * + /// The ordinal position of the column (in this row group) * pub column_idx: i32, /// If true, indicates this column is sorted in descending order. * pub descending: bool, @@ -4035,11 +4102,19 @@ pub struct ColumnChunk { /// metadata. This path is relative to the current file. /// pub file_path: Option, - /// Byte offset in file_path to the ColumnMetaData * + /// Deprecated: Byte offset in file_path to the ColumnMetaData + /// + /// Past use of this field has been inconsistent, with some implementations + /// using it to point to the ColumnMetaData and some using it to point to + /// the first page in the column chunk. In many cases, the ColumnMetaData at this + /// location is wrong. This field is now deprecated and should not be used. + /// Writers should set this field to 0 if no ColumnMetaData has been written outside + /// the footer. pub file_offset: i64, - /// Column metadata for this chunk. This is the same content as what is at - /// file_path/file_offset. Having it here has it replicated in the file - /// metadata. + /// Column metadata for this chunk. Some writers may also replicate this at the + /// location pointed to by file_path/file_offset. + /// Note: while marked as optional, this field is in fact required by most major + /// Parquet implementations. As such, writers MUST populate this field. /// pub meta_data: Option, /// File offset of ColumnChunk's OffsetIndex * @@ -4536,8 +4611,9 @@ pub struct PageLocation { /// Size of the page, including header. 
Sum of compressed_page_size and header /// length pub compressed_page_size: i32, - /// Index within the RowGroup of the first row of the page; this means pages - /// change on record boundaries (r = 0). + /// Index within the RowGroup of the first row of the page. When an + /// OffsetIndex is present, pages must begin on row boundaries + /// (repetition_level = 0). pub first_row_index: i64, } @@ -4614,10 +4690,15 @@ impl crate::thrift::TSerializable for PageLocation { // OffsetIndex // +/// Optional offsets for each data page in a ColumnChunk. +/// +/// Forms part of the page index, along with ColumnIndex. +/// +/// OffsetIndex may be present even if ColumnIndex is not. #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct OffsetIndex { /// PageLocations, ordered by increasing PageLocation.offset. It is required - /// that page_locations[i].first_row_index < page_locations[i+1].first_row_index. + /// that page_locations\[i\].first_row_index < page_locations\[i+1\].first_row_index. pub page_locations: Vec, /// Unencoded/uncompressed size for BYTE_ARRAY types. /// @@ -4709,21 +4790,27 @@ impl crate::thrift::TSerializable for OffsetIndex { // ColumnIndex // -/// Description for ColumnIndex. -/// Each [i] refers to the page at OffsetIndex.page_locations[i] +/// Optional statistics for each data page in a ColumnChunk. +/// +/// Forms part the page index, along with OffsetIndex. +/// +/// If this structure is present, OffsetIndex must also be present. +/// +/// For each field in this structure, ``\[i\] refers to the page at +/// OffsetIndex.page_locations\[i\] #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct ColumnIndex { /// A list of Boolean values to determine the validity of the corresponding /// min and max values. If true, a page contains only null values, and writers /// have to set the corresponding entries in min_values and max_values to - /// byte[0], so that all lists have the same length. 
If false, the + /// byte\[0\], so that all lists have the same length. If false, the /// corresponding entries in min_values and max_values must be valid. pub null_pages: Vec, /// Two lists containing lower and upper bounds for the values of each page /// determined by the ColumnOrder of the column. These may be the actual /// minimum and maximum values found on a page, but can also be (more compact) /// values that do not exist on a page. For example, instead of storing ""Blart - /// Versenwald III", a writer may set min_values[i]="B", max_values[i]="C". + /// Versenwald III", a writer may set min_values\[i\]="B", max_values\[i\]="C". /// Such more compact values must still be valid values within the column's /// logical type. Readers must make sure that list entries are populated before /// using them by inspecting null_pages. @@ -4731,10 +4818,17 @@ pub struct ColumnIndex { pub max_values: Vec>, /// Stores whether both min_values and max_values are ordered and if so, in /// which direction. This allows readers to perform binary searches in both - /// lists. Readers cannot assume that max_values[i] <= min_values[i+1], even + /// lists. Readers cannot assume that max_values\[i\] <= min_values\[i+1\], even /// if the lists are ordered. pub boundary_order: BoundaryOrder, - /// A list containing the number of null values for each page * + /// A list containing the number of null values for each page + /// + /// Writers SHOULD always write this field even if no null values + /// are present or the column is not nullable. + /// Readers MUST distinguish between null_counts not being present + /// and null_count being 0. + /// If null_counts are not present, readers MUST NOT assume all + /// null counts are 0. pub null_counts: Option>, /// Contains repetition level histograms for each page /// concatenated together. 
The repetition_level_histogram field on @@ -5212,7 +5306,7 @@ pub struct FileMetaData { /// Optional key/value metadata * pub key_value_metadata: Option>, /// String for application that wrote this file. This should be in the format - /// version (build ). + /// `` version `` (build ``). /// e.g. impala version 1.0 (build 6cf94d29b2b7115df4de2c06e2ab4326d721eb55) /// pub created_by: Option, From 13ce8a904e5ecaf146151986d8d2b8f35f0f4706 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 11 Apr 2025 15:40:49 -0700 Subject: [PATCH 15/18] stub in variant --- parquet/src/basic.rs | 12 +++++++++--- parquet/src/schema/printer.rs | 1 + 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index 750c439cf2e4..b4844120335b 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -228,6 +228,8 @@ pub enum LogicalType { Uuid, /// A 16-bit floating point number. Float16, + /// A Variant. + Variant, } // ---------------------------------------------------------------------- @@ -607,6 +609,7 @@ impl ColumnOrder { LogicalType::Unknown => SortOrder::UNDEFINED, LogicalType::Uuid => SortOrder::UNSIGNED, LogicalType::Float16 => SortOrder::SIGNED, + LogicalType::Variant => SortOrder::UNDEFINED, }, // Fall back to converted type None => Self::get_converted_sort_order(converted_type, physical_type), @@ -870,6 +873,7 @@ impl From for LogicalType { parquet::LogicalType::BSON(_) => LogicalType::Bson, parquet::LogicalType::UUID(_) => LogicalType::Uuid, parquet::LogicalType::FLOAT16(_) => LogicalType::Float16, + parquet::LogicalType::VARIANT(_) => LogicalType::Variant, } } } @@ -911,6 +915,7 @@ impl From for parquet::LogicalType { LogicalType::Bson => parquet::LogicalType::BSON(Default::default()), LogicalType::Uuid => parquet::LogicalType::UUID(Default::default()), LogicalType::Float16 => parquet::LogicalType::FLOAT16(Default::default()), + LogicalType::Variant => parquet::LogicalType::VARIANT(Default::default()), } } } @@ -960,9 
+965,10 @@ impl From> for ConvertedType { }, LogicalType::Json => ConvertedType::JSON, LogicalType::Bson => ConvertedType::BSON, - LogicalType::Uuid | LogicalType::Float16 | LogicalType::Unknown => { - ConvertedType::NONE - } + LogicalType::Uuid + | LogicalType::Float16 + | LogicalType::Variant + | LogicalType::Unknown => ConvertedType::NONE, }, None => ConvertedType::NONE, } diff --git a/parquet/src/schema/printer.rs b/parquet/src/schema/printer.rs index 44c742fca66e..4b4ffac8569c 100644 --- a/parquet/src/schema/printer.rs +++ b/parquet/src/schema/printer.rs @@ -326,6 +326,7 @@ fn print_logical_and_converted( LogicalType::List => "LIST".to_string(), LogicalType::Map => "MAP".to_string(), LogicalType::Float16 => "FLOAT16".to_string(), + LogicalType::Variant => "VARIANT".to_string(), LogicalType::Unknown => "UNKNOWN".to_string(), }, None => { From b1e65fb97b27681aed153085299af836e98184ab Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 11 Apr 2025 16:02:28 -0700 Subject: [PATCH 16/18] clippy --- parquet/src/column/writer/mod.rs | 4 +--- parquet/tests/arrow_reader/mod.rs | 10 ++++------ 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index f9aa8955a1aa..e2d44d94e094 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -1502,14 +1502,12 @@ fn compare_greater(descr: &OrderedColumnDescriptor, a: &T, return a.total_cmp(&b) == Ordering::Greater; } } - } else { - if let Some(LogicalType::Float16) = descr.logical_type() { + } else if let Some(LogicalType::Float16) = descr.logical_type() { let a = a.as_bytes(); let a = f16::from_le_bytes([a[0], a[1]]); let b = b.as_bytes(); let b = f16::from_le_bytes([b[0], b[1]]); return a > b; - } } a > b diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index ebbb521b22a1..ce1a16ffc02d 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -1031,12 
+1031,10 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTem .tempfile() .expect("tempfile creation"); - let total_order = match scenario { - Scenario::Float16TotalOrder | Scenario::Float32TotalOrder | Scenario::Float64TotalOrder => { - true - } - _ => false, - }; + let total_order = matches!( + scenario, + Scenario::Float16TotalOrder | Scenario::Float32TotalOrder | Scenario::Float64TotalOrder + ); // TODO(ets): need to get total order option down here let props = WriterProperties::builder() From 63b6c6355ef9899c2505d9016250e3f327a716a8 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 11 Apr 2025 16:04:19 -0700 Subject: [PATCH 17/18] correct spoonerism --- parquet/src/file/metadata/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index f89e38434881..9331fad39be4 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -347,7 +347,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> { /// Sets whether to use IEEE 754 total order for statistics. /// - /// See [`crate::file::properties::WriterBuilderProperties::set_ieee754_total_order`] for + /// See [`crate::file::properties::WriterPropertiesBuilder::set_ieee754_total_order`] for /// more information. 
pub fn with_ieee754_total_order(mut self, ieee754_total_order: bool) -> Self { self.ieee754_total_order = ieee754_total_order; From 12830b563fb9183a3d6afb7ec9cf81e7a93f9a2f Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 11 Apr 2025 16:07:29 -0700 Subject: [PATCH 18/18] fix lint --- parquet/src/column/writer/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index e2d44d94e094..6395104599a6 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -1503,11 +1503,11 @@ fn compare_greater(descr: &OrderedColumnDescriptor, a: &T, } } } else if let Some(LogicalType::Float16) = descr.logical_type() { - let a = a.as_bytes(); - let a = f16::from_le_bytes([a[0], a[1]]); - let b = b.as_bytes(); - let b = f16::from_le_bytes([b[0], b[1]]); - return a > b; + let a = a.as_bytes(); + let a = f16::from_le_bytes([a[0], a[1]]); + let b = b.as_bytes(); + let b = f16::from_le_bytes([b[0], b[1]]); + return a > b; } a > b