Skip to content

Commit 64b8eea

Browse files
authored
feat: Add support for Binary/LargeBinary/Utf8/LargeUtf8 data types in data page statistics (#11136)
* *: for binary
  Signed-off-by: Chojan Shang <[email protected]>
* *: for large binary
  Signed-off-by: Chojan Shang <[email protected]>
* feat: add utf8 and largeutf8
  Signed-off-by: Chojan Shang <[email protected]>

---------

Signed-off-by: Chojan Shang <[email protected]>
1 parent 4d16655 commit 64b8eea

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

+46 -1
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ use arrow_array::{
3333
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
3434
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
3535
use half::f16;
36-
use parquet::data_type::FixedLenByteArray;
36+
use parquet::data_type::{ByteArray, FixedLenByteArray};
3737
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
3838
use parquet::file::page_index::index::{Index, PageIndex};
3939
use parquet::file::statistics::Statistics as ParquetStatistics;
@@ -600,6 +600,18 @@ make_data_page_stats_iterator!(
600600
Index::DOUBLE,
601601
f64
602602
);
603+
make_data_page_stats_iterator!(
604+
MinByteArrayDataPageStatsIterator,
605+
|x: &PageIndex<ByteArray>| { x.min.clone() },
606+
Index::BYTE_ARRAY,
607+
ByteArray
608+
);
609+
make_data_page_stats_iterator!(
610+
MaxByteArrayDataPageStatsIterator,
611+
|x: &PageIndex<ByteArray>| { x.max.clone() },
612+
Index::BYTE_ARRAY,
613+
ByteArray
614+
);
603615
macro_rules! get_data_page_statistics {
604616
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
605617
paste! {
@@ -692,6 +704,34 @@ macro_rules! get_data_page_statistics {
692704
)),
693705
Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
694706
Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
707+
Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
708+
Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
709+
Some(DataType::Utf8) => Ok(Arc::new(StringArray::from(
710+
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
711+
x.into_iter().filter_map(|x| {
712+
x.and_then(|x| {
713+
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
714+
if res.is_none() {
715+
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
716+
}
717+
res
718+
})
719+
})
720+
}).flatten().collect::<Vec<_>>(),
721+
))),
722+
Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from(
723+
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
724+
x.into_iter().filter_map(|x| {
725+
x.and_then(|x| {
726+
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
727+
if res.is_none() {
728+
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
729+
}
730+
res
731+
})
732+
})
733+
}).flatten().collect::<Vec<_>>(),
734+
))),
695735
Some(DataType::Timestamp(unit, timezone)) => {
696736
let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
697737
Ok(match unit {
@@ -831,6 +871,11 @@ where
831871
.iter()
832872
.map(|x| x.null_count.map(|x| x as u64))
833873
.collect::<Vec<_>>(),
874+
Index::BYTE_ARRAY(native_index) => native_index
875+
.indexes
876+
.iter()
877+
.map(|x| x.null_count.map(|x| x as u64))
878+
.collect::<Vec<_>>(),
834879
_ => unimplemented!(),
835880
});
836881

datafusion/core/tests/parquet/arrow_statistics.rs

+6 -6
Original file line number | Diff line number | Diff line change
@@ -1801,7 +1801,7 @@ async fn test_byte() {
18011801
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
18021802
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
18031803
column_name: "name",
1804-
check: Check::RowGroup,
1804+
check: Check::Both,
18051805
}
18061806
.run();
18071807

@@ -1821,7 +1821,7 @@ async fn test_byte() {
18211821
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
18221822
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
18231823
column_name: "service_string",
1824-
check: Check::RowGroup,
1824+
check: Check::Both,
18251825
}
18261826
.run();
18271827

@@ -1840,7 +1840,7 @@ async fn test_byte() {
18401840
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
18411841
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
18421842
column_name: "service_binary",
1843-
check: Check::RowGroup,
1843+
check: Check::Both,
18441844
}
18451845
.run();
18461846

@@ -1882,7 +1882,7 @@ async fn test_byte() {
18821882
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
18831883
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
18841884
column_name: "service_large_binary",
1885-
check: Check::RowGroup,
1885+
check: Check::Both,
18861886
}
18871887
.run();
18881888
}
@@ -2003,7 +2003,7 @@ async fn test_utf8() {
20032003
expected_null_counts: UInt64Array::from(vec![1, 0]),
20042004
expected_row_counts: Some(UInt64Array::from(vec![5, 5])),
20052005
column_name: "utf8",
2006-
check: Check::RowGroup,
2006+
check: Check::Both,
20072007
}
20082008
.run();
20092009

@@ -2015,7 +2015,7 @@ async fn test_utf8() {
20152015
expected_null_counts: UInt64Array::from(vec![1, 0]),
20162016
expected_row_counts: Some(UInt64Array::from(vec![5, 5])),
20172017
column_name: "large_utf8",
2018-
check: Check::RowGroup,
2018+
check: Check::Both,
20192019
}
20202020
.run();
20212021
}

0 commit comments

Comments
 (0)