Skip to content

Commit c09912f

Browse files
committed
use Builder in row group level stats.
1 parent 88e3957 commit c09912f

File tree

1 file changed

+211
-43
lines changed
  • datafusion/core/src/datasource/physical_plan/parquet

1 file changed

+211
-43
lines changed

datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

+211-43
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,17 @@
1919
2020
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328
2121

22-
use arrow::array::StringBuilder;
22+
use arrow::array::{FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder};
2323
use arrow::datatypes::i256;
2424
use arrow::{array::ArrayRef, datatypes::DataType};
2525
use arrow_array::{
2626
new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array,
2727
Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array,
2828
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
29-
LargeStringArray, StringArray, Time32MillisecondArray, Time32SecondArray,
30-
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
31-
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
32-
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
29+
LargeStringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
30+
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
31+
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
32+
UInt64Array, UInt8Array,
3333
};
3434
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
3535
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
@@ -398,46 +398,67 @@ macro_rules! get_statistics {
398398
DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
399399
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())),
400400
))),
401-
DataType::Utf8 => Ok(Arc::new(StringArray::from_iter(
402-
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
403-
x.and_then(|x| {
404-
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
405-
if res.is_none() {
406-
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
407-
}
408-
res
409-
})
410-
}),
411-
))),
401+
DataType::Utf8 => {
402+
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
403+
let mut builder = StringBuilder::new();
404+
for x in iterator {
405+
let Some(x) = x else {
406+
builder.append_null(); // no statistics value
407+
continue;
408+
};
409+
410+
let Ok(x) = std::str::from_utf8(x) else {
411+
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
412+
builder.append_null();
413+
continue;
414+
};
415+
416+
builder.append_value(x);
417+
}
418+
Ok(Arc::new(builder.finish()))
419+
},
412420
DataType::LargeUtf8 => {
413-
Ok(Arc::new(LargeStringArray::from_iter(
414-
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
415-
x.and_then(|x| {
416-
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
417-
if res.is_none() {
418-
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
419-
}
420-
res
421-
})
422-
}),
423-
)))
421+
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
422+
let mut builder = LargeStringBuilder::new();
423+
for x in iterator {
424+
let Some(x) = x else {
425+
builder.append_null(); // no statistics value
426+
continue;
427+
};
428+
429+
let Ok(x) = std::str::from_utf8(x) else {
430+
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
431+
builder.append_null();
432+
continue;
433+
};
434+
435+
builder.append_value(x);
436+
}
437+
Ok(Arc::new(builder.finish()))
438+
}
439+
DataType::FixedSizeBinary(size) => {
440+
let iterator = MaxFixedLenByteArrayStatsIterator::new($iterator);
441+
let mut builder = FixedSizeBinaryBuilder::new(*size);
442+
for x in iterator {
443+
let Some(x) = x else {
444+
builder.append_null(); // no statistics value
445+
continue;
446+
};
447+
448+
if x.len().try_into() != Ok(*size){
449+
log::debug!(
450+
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
451+
size,
452+
x.len(),
453+
);
454+
builder.append_null(); // no statistics value
455+
continue;
456+
}
457+
458+
let _ = builder.append_value(x);
459+
}
460+
Ok(Arc::new(builder.finish()))
424461
}
425-
DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from(
426-
[<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| {
427-
x.and_then(|x| {
428-
if x.len().try_into() == Ok(*size) {
429-
Some(x)
430-
} else {
431-
log::debug!(
432-
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
433-
size,
434-
x.len(),
435-
);
436-
None
437-
}
438-
})
439-
}).collect::<Vec<_>>(),
440-
))),
441462
DataType::Decimal128(precision, scale) => {
442463
let arr = Decimal128Array::from_iter(
443464
[<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
@@ -999,6 +1020,153 @@ fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
9991020
get_statistics!(Max, data_type, iterator)
10001021
}
10011022

1023+
// fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
1024+
// data_type: &DataType,
1025+
// iterator: I,
1026+
// ) -> Result<ArrayRef> {
1027+
// match data_type {
1028+
// DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(MaxBooleanStatsIterator::new(iterator).map(|x|x.copied()),))),
1029+
// DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
1030+
// x.and_then(|x|i8::try_from(*x).ok())
1031+
// }),))),
1032+
// DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
1033+
// x.and_then(|x|i16::try_from(*x).ok())
1034+
// }),))),
1035+
// DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))),
1036+
// DataType::Int64 => Ok(Arc::new(Int64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),))),
1037+
// DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
1038+
// x.and_then(|x|u8::try_from(*x).ok())
1039+
// }),))),
1040+
// DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|{
1041+
// x.and_then(|x|u16::try_from(*x).ok())
1042+
// }),))),
1043+
// DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x| *x as u32)),))),
1044+
// DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.map(|x| *x as u64)),))),
1045+
// DataType::Float16 => Ok(Arc::new(Float16Array::from_iter(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|x.and_then(|x|{
1046+
// from_bytes_to_f16(x)
1047+
// })),))),
1048+
// DataType::Float32 => Ok(Arc::new(Float32Array::from_iter(MaxFloatStatsIterator::new(iterator).map(|x|x.copied()),))),
1049+
// DataType::Float64 => Ok(Arc::new(Float64Array::from_iter(MaxDoubleStatsIterator::new(iterator).map(|x|x.copied()),))),
1050+
// DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),))),
1051+
// DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.map(|x|i64::from(*x)*24*60*60*1000)),))),
1052+
// DataType::Timestamp(unit,timezone) => {
1053+
// let iter = MaxInt64StatsIterator::new(iterator).map(|x|x.copied());
1054+
// Ok(match unit {
1055+
// TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
1056+
// TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
1057+
// TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
1058+
// TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())),
1059+
// })
1060+
// },
1061+
// DataType::Time32(unit) => {
1062+
// Ok(match unit {
1063+
// TimeUnit::Second => Arc::new(Time32SecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)),
1064+
// TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter(MaxInt32StatsIterator::new(iterator).map(|x|x.copied()),)),
1065+
// _ => {
1066+
// let len = iterator.count();
1067+
// new_null_array(data_type,len)
1068+
// }
1069+
// })
1070+
// },
1071+
// DataType::Time64(unit) => {
1072+
// Ok(match unit {
1073+
// TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)),
1074+
// TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter(MaxInt64StatsIterator::new(iterator).map(|x|x.copied()),)),
1075+
// _ => {
1076+
// let len = iterator.count();
1077+
// new_null_array(data_type,len)
1078+
// }
1079+
// })
1080+
// },
1081+
// DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))),
1082+
// DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(MaxByteArrayStatsIterator::new(iterator)))),
1083+
// DataType::Utf8 => {
1084+
// let iterator = MaxByteArrayStatsIterator::new(iterator);
1085+
// let mut builder = StringBuilder::new();
1086+
// for x in iterator {
1087+
// let Some(x) = x else {
1088+
// builder.append_null(); // no statistics value
1089+
// continue;
1090+
// };
1091+
1092+
// let Ok(x) = std::str::from_utf8(x) else {
1093+
// log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
1094+
// builder.append_null();
1095+
// continue;
1096+
// };
1097+
1098+
// builder.append_value(x);
1099+
// }
1100+
// Ok(Arc::new(builder.finish()))
1101+
// },
1102+
// DataType::LargeUtf8 => {
1103+
// Ok(Arc::new(LargeStringArray::from_iter(MaxByteArrayStatsIterator::new(iterator).map(|x|{
1104+
// x.and_then(|x|{
1105+
// let res = std::str::from_utf8(x).map(|s|s.to_string()).ok();
1106+
// if res.is_none() {
1107+
// log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
1108+
// }
1109+
// res
1110+
// })
1111+
// }),)))
1112+
// }
1113+
// DataType::FixedSizeBinary(size) => {
1114+
// let iterator = MaxFixedLenByteArrayStatsIterator::new(iterator);
1115+
// let mut builder = FixedSizeBinaryBuilder::new(size);
1116+
// for x in iterator {
1117+
// let Some(x) = x else {
1118+
// builder.append_null(); // no statistics value
1119+
// continue;
1120+
// };
1121+
1122+
// if x.len().try_into() != Ok(*size){
1123+
// log::debug!(
1124+
// "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
1125+
// size,
1126+
// x.len(),
1127+
// );
1128+
// builder.append_null(); // no statistics value
1129+
// continue;
1130+
// }
1131+
1132+
// builder.append_value(x);
1133+
// }
1134+
// Ok(Arc::new(builder.finish()))
1135+
// }
1136+
1137+
// // Ok(Arc::new(FixedSizeBinaryArray::from(MaxFixedLenByteArrayStatsIterator::new(iterator).map(|x|{
1138+
// // x.and_then(|x|{
1139+
// // if x.len().try_into()==Ok(*size){
1140+
// // Some(x)
1141+
// // }else {
1142+
// log::debug!(
1143+
// "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
1144+
// size,
1145+
// x.len(),
1146+
// );
1147+
// // None
1148+
// // }
1149+
// // })
1150+
// // }).collect::<Vec<_>>(),))),
1151+
// DataType::Decimal128(precision,scale) => {
1152+
// let arr = Decimal128Array::from_iter(MaxDecimal128StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?;
1153+
// Ok(Arc::new(arr))
1154+
// },
1155+
// DataType::Decimal256(precision,scale) => {
1156+
// let arr = Decimal256Array::from_iter(MaxDecimal256StatsIterator::new(iterator)).with_precision_and_scale(*precision, *scale)?;
1157+
// Ok(Arc::new(arr))
1158+
// },
1159+
// DataType::Dictionary(_,value_type) => {
1160+
// max_statistics(value_type,iterator)
1161+
// }
1162+
// DataType::Map(_,_)|DataType::Duration(_)|DataType::Interval(_)|DataType::Null|DataType::BinaryView|DataType::Utf8View|DataType::List(_)|DataType::ListView(_)|DataType::FixedSizeList(_,_)|DataType::LargeList(_)|DataType::LargeListView(_)|DataType::Struct(_)|DataType::Union(_,_)|DataType::RunEndEncoded(_,_) => {
1163+
// let len = iterator.count();
1164+
// Ok(new_null_array(data_type,len))
1165+
// }
1166+
// }
1167+
// }
1168+
1169+
10021170
/// Extracts the min statistics from an iterator
10031171
/// of parquet page [`Index`]'es to an [`ArrayRef`]
10041172
pub(crate) fn min_page_statistics<'a, I>(

0 commit comments

Comments
 (0)