Skip to content

Commit 9f93e9b

Browse files
committed
use Builder in row group level stats.
1 parent 88e3957 commit 9f93e9b

File tree

1 file changed

+179
-26
lines changed
  • datafusion/core/src/datasource/physical_plan/parquet

1 file changed

+179
-26
lines changed

datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

+179-26
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ use arrow_array::{
2626
new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array,
2727
Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array,
2828
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
29-
LargeStringArray, StringArray, Time32MillisecondArray, Time32SecondArray,
30-
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
31-
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
32-
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
29+
LargeStringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
30+
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
31+
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
32+
UInt64Array, UInt8Array,
3333
};
3434
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
3535
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
@@ -398,29 +398,61 @@ macro_rules! get_statistics {
398398
DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
399399
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())),
400400
))),
401-
DataType::Utf8 => Ok(Arc::new(StringArray::from_iter(
402-
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
403-
x.and_then(|x| {
404-
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
405-
if res.is_none() {
406-
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
401+
DataType::Utf8 => {
402+
let mode = std::env::var("MODE").unwrap_or_default();
403+
match mode.as_str() {
404+
"use_builder" => {
405+
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
406+
let mut builder = StringBuilder::new();
407+
for x in iterator {
408+
let Some(x) = x else {
409+
builder.append_null(); // no statistics value
410+
continue;
411+
};
412+
413+
let Ok(x) = std::str::from_utf8(x) else {
414+
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
415+
builder.append_null();
416+
continue;
417+
};
418+
419+
builder.append_value(x);
407420
}
408-
res
409-
})
410-
}),
411-
))),
421+
Ok(Arc::new(builder.finish()))
422+
}
423+
_ => {
424+
Ok(Arc::new(LargeStringArray::from_iter(
425+
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
426+
x.and_then(|x| {
427+
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
428+
if res.is_none() {
429+
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
430+
}
431+
res
432+
})
433+
}),
434+
)))
435+
}
436+
}
437+
},
412438
DataType::LargeUtf8 => {
413-
Ok(Arc::new(LargeStringArray::from_iter(
414-
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
415-
x.and_then(|x| {
416-
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
417-
if res.is_none() {
418-
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
419-
}
420-
res
421-
})
422-
}),
423-
)))
439+
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
440+
let mut builder = StringBuilder::new();
441+
for x in iterator {
442+
let Some(x) = x else {
443+
builder.append_null(); // no statistics value
444+
continue;
445+
};
446+
447+
let Ok(x) = std::str::from_utf8(x) else {
448+
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
449+
builder.append_null();
450+
continue;
451+
};
452+
453+
builder.append_value(x);
454+
}
455+
Ok(Arc::new(builder.finish()))
424456
}
425457
DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from(
426458
[<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| {
@@ -992,11 +1024,132 @@ fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
9921024
/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
9931025
///
9941026
/// This is an internal helper -- see [`StatisticsConverter`] for public API
1027+
// fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1028+
// data_type: &DataType,
1029+
// iterator: I,
1030+
// ) -> Result<ArrayRef> {
1031+
// get_statistics!(Max, data_type, iterator)
1032+
// }
9951033
fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
9961034
data_type: &DataType,
9971035
iterator: I,
9981036
) -> Result<ArrayRef> {
999-
get_statistics!(Max, data_type, iterator)
1037+
match data_type {
1038+
DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(MaxBooleanStatsIterator::new(iterator).map(|x|x.copied()),))),
1039+
DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
1040+
x.and_then(|x|i8::try_from(*x).ok())
1041+
}),))),
1042+
DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
1043+
x.and_then(|x|i16::try_from(*x).ok())
1044+
}),))),
1045+
DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))),
1046+
DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),))),
1047+
DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
1048+
x.and_then(|x|u8::try_from(*x).ok())
1049+
}),))),
1050+
DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
1051+
x.and_then(|x|u16::try_from(*x).ok())
1052+
}),))),
1053+
DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x| *x as u32)),))),
1054+
DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.map(|x| *x as u64)),))),
1055+
DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|x.and_then(|x|{
1056+
from_bytes_to_f16(x)
1057+
})),))),
1058+
DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(MaxFloatStatsIterator::new(iterator).map(|x|x.copied()),))),
1059+
DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(MaxDoubleStatsIterator::new(iterator).map(|x|x.copied()),))),
1060+
DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))),
1061+
DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x|i64::from(*x)*24*60*60*1000)),))),
1062+
DataType::Timestamp(unit,timezone) => {
1063+
let iter = MaxInt64StatsIterator::new(iterator).map(|x|x.copied());
1064+
Ok(match unit {
1065+
TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
1066+
TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
1067+
TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
1068+
TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
1069+
})
1070+
},
1071+
DataType::Time32(unit) => {
1072+
Ok(match unit {
1073+
TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)),
1074+
TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)),
1075+
_ => {
1076+
let len = iterator.count();
1077+
new_null_array(data_type,len)
1078+
}
1079+
})
1080+
},
1081+
DataType::Time64(unit) => {
1082+
Ok(match unit {
1083+
TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)),
1084+
TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)),
1085+
_ => {
1086+
let len = iterator.count();
1087+
new_null_array(data_type,len)
1088+
}
1089+
})
1090+
},
1091+
DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))),
1092+
DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))),
1093+
DataType::Utf8 => {
1094+
let iterator = MaxByteArrayStatsIterator::new(iterator);
1095+
let mut builder = StringBuilder::new();
1096+
for x in iterator {
1097+
let Some(x) = x else {
1098+
builder.append_null(); // no statistics value
1099+
continue;
1100+
};
1101+
1102+
let Ok(x) = std::str::from_utf8(x) else {
1103+
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
1104+
builder.append_null();
1105+
continue;
1106+
};
1107+
1108+
builder.append_value(x);
1109+
}
1110+
Ok(Arc::new(builder.finish()))
1111+
},
1112+
DataType::LargeUtf8 => {
1113+
Ok(Arc::new(LargeStringArray::from_iter(MaxByteArrayStatsIterator::new(iterator).map(|x|{
1114+
x.and_then(|x|{
1115+
let res = std::str::from_utf8(x).map(|s|s.to_string()).ok();
1116+
if res.is_none() {
1117+
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
1118+
}
1119+
res
1120+
})
1121+
}),)))
1122+
}
1123+
DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|{
1124+
x.and_then(|x|{
1125+
if x.len().try_into()==Ok(*size){
1126+
Some(x)
1127+
}else {
1128+
log::debug!(
1129+
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
1130+
size,
1131+
x.len(),
1132+
);
1133+
None
1134+
}
1135+
})
1136+
}).collect::<Vec<_>>(),))),
1137+
DataType::Decimal128(precision,scale) => {
1138+
let arr = Decimal128Array::from_iter(MaxDecimal128StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?;
1139+
Ok(Arc::new(arr))
1140+
},
1141+
DataType::Decimal256(precision,scale) => {
1142+
let arr = Decimal256Array::from_iter(MaxDecimal256StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?;
1143+
Ok(Arc::new(arr))
1144+
},
1145+
DataType::Dictionary(_,value_type) => {
1146+
max_statistics(value_type,iterator)
1147+
}
1148+
DataType::Map(_,_)|DataType::Duration(_)|DataType::Interval(_)|DataType::Null|DataType::BinaryView|DataType::Utf8View|DataType::List(_)|DataType::ListView(_)|DataType::FixedSizeList(_,_)|DataType::LargeList(_)|DataType::LargeListView(_)|DataType::Struct(_)|DataType::Union(_,_)|DataType::RunEndEncoded(_,_) => {
1149+
let len = iterator.count();
1150+
Ok(new_null_array(data_type,len))
1151+
}
1152+
}
10001153
}
10011154

10021155
/// Extracts the min statistics from an iterator

0 commit comments

Comments
 (0)