diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 44bacbdae147..efb848b80eec 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -33,7 +33,7 @@ use arrow_array::{ use arrow_schema::{Field, FieldRef, Schema, TimeUnit}; use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result}; use half::f16; -use parquet::data_type::FixedLenByteArray; +use parquet::data_type::{ByteArray, FixedLenByteArray}; use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData}; use parquet::file::page_index::index::{Index, PageIndex}; use parquet::file::statistics::Statistics as ParquetStatistics; @@ -600,6 +600,18 @@ make_data_page_stats_iterator!( Index::DOUBLE, f64 ); +make_data_page_stats_iterator!( + MinByteArrayDataPageStatsIterator, + |x: &PageIndex| { x.min.clone() }, + Index::BYTE_ARRAY, + ByteArray +); +make_data_page_stats_iterator!( + MaxByteArrayDataPageStatsIterator, + |x: &PageIndex| { x.max.clone() }, + Index::BYTE_ARRAY, + ByteArray +); macro_rules! get_data_page_statistics { ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { paste! { @@ -692,6 +704,34 @@ macro_rules! 
get_data_page_statistics { )), Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Utf8) => Ok(Arc::new(StringArray::from( + [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { + x.into_iter().filter_map(|x| { + x.and_then(|x| { + let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); + if res.is_none() { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + } + res + }) + }) + }).flatten().collect::<Vec<_>>(), + ))), + Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from( + [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { + x.into_iter().filter_map(|x| { + x.and_then(|x| { + let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); + if res.is_none() { + log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); + } + res + }) + }) + }).flatten().collect::<Vec<_>>(), + ))), Some(DataType::Timestamp(unit, timezone)) => { let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(); Ok(match unit { @@ -817,6 +857,11 @@ where .iter() .map(|x| x.null_count.map(|x| x as u64)) .collect::<Vec<_>>(), + Index::BYTE_ARRAY(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.map(|x| x as u64)) + .collect::<Vec<_>>(), _ => unimplemented!(), }); diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 596015d581e2..630ac152299f 
100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -1801,7 +1801,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "name", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1821,7 +1821,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "service_string", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1840,7 +1840,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "service_binary", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1882,7 +1882,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "service_large_binary", - check: Check::RowGroup, + check: Check::Both, } .run(); } @@ -2003,7 +2003,7 @@ async fn test_utf8() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5])), column_name: "utf8", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -2015,7 +2015,7 @@ async fn test_utf8() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5])), column_name: "large_utf8", - check: Check::RowGroup, + check: Check::Both, } .run(); }