Skip to content

Commit a5b25d0

Browse files
Rachelint authored and findepi committed
Improve stats convert performance for Binary/String/Boolean arrays (apache#11319)
* add u64 poc. * use env to support the quick bench. * use flatten in builder mode yet. * add new mode. * use Builder in Utf8 and LargeUtf8 page level stats' convert. * use Builder in row group level stats. * eliminate some unnecessary tmp `Vec`s. * remove the quick modify in bench. * process the result returned from append_value of FixedSizeBinaryBuilder. * remove the modification of FixedSizeBinary & Bool, and fix the String case. * fix and re-enable the modification of FixedSizeBinary. * fix comments. * use BooleanBuilder to eliminate the collect in BooleanArray case. * fix compile.
1 parent c527c3e commit a5b25d0

File tree

1 file changed

+130
-85
lines changed
  • datafusion/core/src/datasource/physical_plan/parquet

1 file changed

+130
-85
lines changed

datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

Lines changed: 130 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,19 @@
1919
2020
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328
2121

22-
use arrow::array::builder::FixedSizeBinaryBuilder;
22+
use arrow::array::{
23+
BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
24+
};
2325
use arrow::datatypes::i256;
2426
use arrow::{array::ArrayRef, datatypes::DataType};
2527
use arrow_array::{
2628
new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array,
27-
Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array,
28-
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
29-
LargeStringArray, StringArray, Time32MillisecondArray, Time32SecondArray,
30-
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
31-
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
32-
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
29+
Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array,
30+
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
31+
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
32+
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
33+
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
34+
UInt64Array, UInt8Array,
3335
};
3436
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
3537
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
@@ -393,51 +395,73 @@ macro_rules! get_statistics {
393395
})
394396
},
395397
DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
396-
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x| x.to_vec())),
398+
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
397399
))),
398400
DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
399-
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())),
400-
))),
401-
DataType::Utf8 => Ok(Arc::new(StringArray::from_iter(
402-
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
403-
x.and_then(|x| {
404-
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
405-
if res.is_none() {
406-
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
407-
}
408-
res
409-
})
410-
}),
401+
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
411402
))),
403+
DataType::Utf8 => {
404+
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
405+
let mut builder = StringBuilder::new();
406+
for x in iterator {
407+
let Some(x) = x else {
408+
builder.append_null(); // no statistics value
409+
continue;
410+
};
411+
412+
let Ok(x) = std::str::from_utf8(x) else {
413+
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
414+
builder.append_null();
415+
continue;
416+
};
417+
418+
builder.append_value(x);
419+
}
420+
Ok(Arc::new(builder.finish()))
421+
},
412422
DataType::LargeUtf8 => {
413-
Ok(Arc::new(LargeStringArray::from_iter(
414-
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
415-
x.and_then(|x| {
416-
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
417-
if res.is_none() {
418-
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
419-
}
420-
res
421-
})
422-
}),
423-
)))
424-
}
425-
DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from(
426-
[<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| {
427-
x.and_then(|x| {
428-
if x.len().try_into() == Ok(*size) {
429-
Some(x)
430-
} else {
431-
log::debug!(
432-
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
433-
size,
434-
x.len(),
435-
);
436-
None
437-
}
438-
})
439-
}).collect::<Vec<_>>(),
440-
))),
423+
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
424+
let mut builder = LargeStringBuilder::new();
425+
for x in iterator {
426+
let Some(x) = x else {
427+
builder.append_null(); // no statistics value
428+
continue;
429+
};
430+
431+
let Ok(x) = std::str::from_utf8(x) else {
432+
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
433+
builder.append_null();
434+
continue;
435+
};
436+
437+
builder.append_value(x);
438+
}
439+
Ok(Arc::new(builder.finish()))
440+
},
441+
DataType::FixedSizeBinary(size) => {
442+
let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
443+
let mut builder = FixedSizeBinaryBuilder::new(*size);
444+
for x in iterator {
445+
let Some(x) = x else {
446+
builder.append_null(); // no statistics value
447+
continue;
448+
};
449+
450+
// ignore invalid values
451+
if x.len().try_into() != Ok(*size){
452+
log::debug!(
453+
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
454+
size,
455+
x.len(),
456+
);
457+
builder.append_null();
458+
continue;
459+
}
460+
461+
builder.append_value(x).expect("ensure to append successfully here, because size have been checked before");
462+
}
463+
Ok(Arc::new(builder.finish()))
464+
},
441465
DataType::Decimal128(precision, scale) => {
442466
let arr = Decimal128Array::from_iter(
443467
[<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
@@ -740,15 +764,20 @@ macro_rules! get_data_page_statistics {
740764
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
741765
paste! {
742766
match $data_type {
743-
Some(DataType::Boolean) => Ok(Arc::new(
744-
BooleanArray::from_iter(
745-
[<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator)
746-
.flatten()
747-
// BooleanArray::from_iter required a sized iterator, so collect into Vec first
748-
.collect::<Vec<_>>()
749-
.into_iter()
750-
)
751-
)),
767+
Some(DataType::Boolean) => {
768+
let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator);
769+
let mut builder = BooleanBuilder::new();
770+
for x in iterator {
771+
for x in x.into_iter() {
772+
let Some(x) = x else {
773+
builder.append_null(); // no statistics value
774+
continue;
775+
};
776+
builder.append_value(x);
777+
}
778+
}
779+
Ok(Arc::new(builder.finish()))
780+
},
752781
Some(DataType::UInt8) => Ok(Arc::new(
753782
UInt8Array::from_iter(
754783
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
@@ -830,32 +859,48 @@ macro_rules! get_data_page_statistics {
830859
Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
831860
Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
832861
Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
833-
Some(DataType::Utf8) => Ok(Arc::new(StringArray::from(
834-
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
835-
x.into_iter().map(|x| {
836-
x.and_then(|x| {
837-
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
838-
if res.is_none() {
862+
Some(DataType::Utf8) => {
863+
let mut builder = StringBuilder::new();
864+
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
865+
for x in iterator {
866+
for x in x.into_iter() {
867+
let Some(x) = x else {
868+
builder.append_null(); // no statistics value
869+
continue;
870+
};
871+
872+
let Ok(x) = std::str::from_utf8(x.data()) else {
839873
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
840-
}
841-
res
842-
})
843-
})
844-
}).flatten().collect::<Vec<_>>(),
845-
))),
846-
Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from(
847-
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
848-
x.into_iter().map(|x| {
849-
x.and_then(|x| {
850-
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
851-
if res.is_none() {
852-
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
853-
}
854-
res
855-
})
856-
})
857-
}).flatten().collect::<Vec<_>>(),
858-
))),
874+
builder.append_null();
875+
continue;
876+
};
877+
878+
builder.append_value(x);
879+
}
880+
}
881+
Ok(Arc::new(builder.finish()))
882+
},
883+
Some(DataType::LargeUtf8) => {
884+
let mut builder = LargeStringBuilder::new();
885+
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
886+
for x in iterator {
887+
for x in x.into_iter() {
888+
let Some(x) = x else {
889+
builder.append_null(); // no statistics value
890+
continue;
891+
};
892+
893+
let Ok(x) = std::str::from_utf8(x.data()) else {
894+
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
895+
builder.append_null();
896+
continue;
897+
};
898+
899+
builder.append_value(x);
900+
}
901+
}
902+
Ok(Arc::new(builder.finish()))
903+
},
859904
Some(DataType::Dictionary(_, value_type)) => {
860905
[<$stat_type_prefix:lower _ page_statistics>](Some(value_type), $iterator)
861906
},
@@ -871,14 +916,14 @@ macro_rules! get_data_page_statistics {
871916
Some(DataType::Date32) => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
872917
Some(DataType::Date64) => Ok(
873918
Arc::new(
874-
Date64Array::from([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
919+
Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
875920
.map(|x| {
876921
x.into_iter()
877922
.map(|x| {
878923
x.and_then(|x| i64::try_from(x).ok())
879-
.map(|x| x * 24 * 60 * 60 * 1000)
880924
})
881-
}).flatten().collect::<Vec<_>>()
925+
.map(|x| x.map(|x| x * 24 * 60 * 60 * 1000))
926+
}).flatten()
882927
)
883928
)
884929
),

0 commit comments

Comments
 (0)