Skip to content

[Not for Merge] PoC implementation of PARQUET-2249: Introduce IEEE 754 total order #7408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
2 changes: 1 addition & 1 deletion parquet/regen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.

REVISION=5b564f3c47679526cf72e54f207013f28f53acc4
REVISION=74bd03d63350a6a87a0e2facad5292567185d4ff

SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-$0}")" && pwd)"

Expand Down
51 changes: 45 additions & 6 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ pub enum LogicalType {
Uuid,
/// A 16-bit floating point number.
Float16,
/// A Variant.
Variant,
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -529,6 +531,8 @@ pub enum SortOrder {
UNSIGNED,
/// Comparison is undefined.
UNDEFINED,
/// Use IEEE 754 total order.
TOTAL_ORDER,
}

impl SortOrder {
Expand All @@ -549,17 +553,43 @@ pub enum ColumnOrder {
/// Column uses the order defined by its logical or physical type
/// (if there is no logical type), parquet-format 2.4.0+.
TYPE_DEFINED_ORDER(SortOrder),
/// Column ordering to use for floating point types.
IEEE_754_TOTAL_ORDER,
/// Undefined column order, means legacy behaviour before parquet-format 2.4.0.
/// Sort order is always SIGNED.
UNDEFINED,
}

impl ColumnOrder {
/// Returns sort order for a physical/logical type.
/// Returns the sort order for a physical/logical type.
///
/// If `ieee754_total_order` is `true` then IEEE 754 total order will be used for floating point
/// types.
pub fn get_sort_order(
logical_type: Option<LogicalType>,
converted_type: ConvertedType,
physical_type: Type,
ieee754_total_order: bool,
) -> SortOrder {
// check for floating point types, then fall back to type defined order
match logical_type {
Some(LogicalType::Float16) if ieee754_total_order => SortOrder::TOTAL_ORDER,
_ => match physical_type {
Type::FLOAT | Type::DOUBLE if ieee754_total_order => SortOrder::TOTAL_ORDER,
_ => ColumnOrder::get_type_defined_sort_order(
logical_type,
converted_type,
physical_type,
),
},
}
}

/// Returns the type defined sort order for a physical/logical type.
pub fn get_type_defined_sort_order(
logical_type: Option<LogicalType>,
converted_type: ConvertedType,
physical_type: Type,
) -> SortOrder {
// TODO: Should this take converted and logical type, for compatibility?
match logical_type {
Expand All @@ -579,6 +609,7 @@ impl ColumnOrder {
LogicalType::Unknown => SortOrder::UNDEFINED,
LogicalType::Uuid => SortOrder::UNSIGNED,
LogicalType::Float16 => SortOrder::SIGNED,
LogicalType::Variant => SortOrder::UNDEFINED,
},
// Fall back to converted type
None => Self::get_converted_sort_order(converted_type, physical_type),
Expand Down Expand Up @@ -647,6 +678,7 @@ impl ColumnOrder {
pub fn sort_order(&self) -> SortOrder {
match *self {
ColumnOrder::TYPE_DEFINED_ORDER(order) => order,
ColumnOrder::IEEE_754_TOTAL_ORDER => SortOrder::TOTAL_ORDER,
ColumnOrder::UNDEFINED => SortOrder::SIGNED,
}
}
Expand Down Expand Up @@ -841,6 +873,7 @@ impl From<parquet::LogicalType> for LogicalType {
parquet::LogicalType::BSON(_) => LogicalType::Bson,
parquet::LogicalType::UUID(_) => LogicalType::Uuid,
parquet::LogicalType::FLOAT16(_) => LogicalType::Float16,
parquet::LogicalType::VARIANT(_) => LogicalType::Variant,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is probably valuable in itself to get a PR ready to upgrade the format spec (to get support for the Variant logical type, among other things)

}
}
}
Expand Down Expand Up @@ -882,6 +915,7 @@ impl From<LogicalType> for parquet::LogicalType {
LogicalType::Bson => parquet::LogicalType::BSON(Default::default()),
LogicalType::Uuid => parquet::LogicalType::UUID(Default::default()),
LogicalType::Float16 => parquet::LogicalType::FLOAT16(Default::default()),
LogicalType::Variant => parquet::LogicalType::VARIANT(Default::default()),
}
}
}
Expand Down Expand Up @@ -931,9 +965,10 @@ impl From<Option<LogicalType>> for ConvertedType {
},
LogicalType::Json => ConvertedType::JSON,
LogicalType::Bson => ConvertedType::BSON,
LogicalType::Uuid | LogicalType::Float16 | LogicalType::Unknown => {
ConvertedType::NONE
}
LogicalType::Uuid
| LogicalType::Float16
| LogicalType::Variant
| LogicalType::Unknown => ConvertedType::NONE,
},
None => ConvertedType::NONE,
}
Expand Down Expand Up @@ -2134,7 +2169,11 @@ mod tests {
fn check_sort_order(types: Vec<LogicalType>, expected_order: SortOrder) {
for tpe in types {
assert_eq!(
ColumnOrder::get_sort_order(Some(tpe), ConvertedType::NONE, Type::BYTE_ARRAY),
ColumnOrder::get_type_defined_sort_order(
Some(tpe),
ConvertedType::NONE,
Type::BYTE_ARRAY
),
expected_order
);
}
Expand Down Expand Up @@ -2229,7 +2268,7 @@ mod tests {
fn check_sort_order(types: Vec<ConvertedType>, expected_order: SortOrder) {
for tpe in types {
assert_eq!(
ColumnOrder::get_sort_order(None, tpe, Type::BYTE_ARRAY),
ColumnOrder::get_type_defined_sort_order(None, tpe, Type::BYTE_ARRAY),
expected_order
);
}
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/bin/parquet-rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ struct Args {
/// Sets whether to coerce Arrow types to match Parquet specification
#[clap(long)]
coerce_types: Option<bool>,

/// Sets whether to use IEEE 754 total order for floating point columns
#[clap(long)]
total_order: Option<bool>,
}

fn main() {
Expand Down Expand Up @@ -270,6 +274,9 @@ fn main() {
if let Some(value) = args.coerce_types {
writer_properties_builder = writer_properties_builder.set_coerce_types(value);
}
if let Some(value) = args.total_order {
writer_properties_builder = writer_properties_builder.set_ieee754_total_order(value);
}
let writer_properties = writer_properties_builder.build();
let mut parquet_writer = ArrowWriter::try_new(
File::create(&args.output).expect("Unable to open output file"),
Expand Down
25 changes: 17 additions & 8 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use bytes::Bytes;
use half::f16;

use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
use crate::basic::{ConvertedType, Encoding, LogicalType, SortOrder, Type};
use crate::bloom_filter::Sbbf;
use crate::column::writer::{
compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
Expand All @@ -28,7 +28,9 @@ use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
use crate::schema::types::ColumnDescPtr;

use super::OrderedColumnDescriptor;

/// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
pub trait ColumnValues {
Expand Down Expand Up @@ -126,7 +128,7 @@ pub trait ColumnValueEncoder {
pub struct ColumnValueEncoderImpl<T: DataType> {
encoder: Box<dyn Encoder<T>>,
dict_encoder: Option<DictEncoder<T>>,
descr: ColumnDescPtr,
descr: OrderedColumnDescriptor,
num_values: usize,
statistics_enabled: EnabledStatistics,
min_value: Option<T::T>,
Expand Down Expand Up @@ -201,10 +203,12 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
.map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
.transpose()?;

let descr = OrderedColumnDescriptor::new(descr.clone(), props.ieee754_total_order());

Ok(Self {
encoder,
dict_encoder,
descr: descr.clone(),
descr,
num_values: 0,
statistics_enabled,
bloom_filter,
Expand Down Expand Up @@ -309,22 +313,23 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
}
}

fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
fn get_min_max<'a, T, I>(descr: &OrderedColumnDescriptor, mut iter: I) -> Option<(T, T)>
where
T: ParquetValueType + 'a,
I: Iterator<Item = &'a T>,
{
let ieee754_total_order = descr.sort_order == SortOrder::TOTAL_ORDER;
let first = loop {
let next = iter.next()?;
if !is_nan(descr, next) {
if ieee754_total_order || !is_nan(&descr.descr, next) {
break next;
}
};

let mut min = first;
let mut max = first;
for val in iter {
if is_nan(descr, val) {
if !ieee754_total_order && is_nan(&descr.descr, val) {
continue;
}
if compare_greater(descr, min, val) {
Expand All @@ -350,7 +355,11 @@ where
}

#[inline]
fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace: f32) -> T {
fn replace_zero<T: ParquetValueType>(val: &T, descr: &OrderedColumnDescriptor, replace: f32) -> T {
if descr.sort_order == SortOrder::TOTAL_ORDER {
return val.clone();
}

match T::PHYSICAL_TYPE {
Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap()
Expand Down
Loading
Loading