From 885e1d1e5157a16e2d36b209d9a7863b7eb5bbe0 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Wed, 26 Jun 2024 11:52:07 +0800 Subject: [PATCH 1/3] *: for binary Signed-off-by: Chojan Shang --- .../physical_plan/parquet/statistics.rs | 20 ++++++++++++++++++- .../core/tests/parquet/arrow_statistics.rs | 2 +- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 17bfe72dbd65..e82dce57eccc 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -33,7 +33,7 @@ use arrow_array::{ use arrow_schema::{Field, FieldRef, Schema, TimeUnit}; use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result}; use half::f16; -use parquet::data_type::FixedLenByteArray; +use parquet::data_type::{ByteArray, FixedLenByteArray}; use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData}; use parquet::file::page_index::index::{Index, PageIndex}; use parquet::file::statistics::Statistics as ParquetStatistics; @@ -621,6 +621,18 @@ make_data_page_stats_iterator!( Index::DOUBLE, f64 ); +make_data_page_stats_iterator!( + MinByteArrayDataPageStatsIterator, + |x: &PageIndex<ByteArray>| { x.min.clone() }, + Index::BYTE_ARRAY, + ByteArray +); +make_data_page_stats_iterator!( + MaxByteArrayDataPageStatsIterator, + |x: &PageIndex<ByteArray>| { x.max.clone() }, + Index::BYTE_ARRAY, + ByteArray +); macro_rules! get_data_page_statistics { ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { paste! { @@ -713,6 +725,7 @@ macro_rules! 
get_data_page_statistics { )), Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), _ => unimplemented!() } } @@ -829,6 +842,11 @@ where .iter() .map(|x| x.null_count.map(|x| x as u64)) .collect::<Vec<_>>(), + Index::BYTE_ARRAY(native_index) => native_index + .indexes + .iter() + .map(|x| x.null_count.map(|x| x as u64)) + .collect::<Vec<_>>(), _ => unimplemented!(), }); diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index cd6985b311c3..5fbaf4f941c2 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -1840,7 +1840,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "service_binary", - check: Check::RowGroup, + check: Check::Both, } .run(); From 8f95ad6c2ff0ad00d8d324c39c7344d4e5612623 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Wed, 26 Jun 2024 12:03:44 +0800 Subject: [PATCH 2/3] *: for large binary Signed-off-by: Chojan Shang --- .../core/src/datasource/physical_plan/parquet/statistics.rs | 1 + datafusion/core/tests/parquet/arrow_statistics.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index e82dce57eccc..9c674c350cc4 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -726,6 +726,7 @@ macro_rules! 
get_data_page_statistics { Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), _ => unimplemented!() } } diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 5fbaf4f941c2..271225236a48 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -1882,7 +1882,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "service_large_binary", - check: Check::RowGroup, + check: Check::Both, } .run(); } From 9cd6c935022d98a3a78650cbd5fc3df6dd62ab68 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Thu, 27 Jun 2024 04:29:11 +0800 Subject: [PATCH 3/3] feat: add utf8 and largeutf8 Signed-off-by: Chojan Shang --- .../physical_plan/parquet/statistics.rs | 26 +++++++++++++++++++ .../core/tests/parquet/arrow_statistics.rs | 8 +++--- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 9c674c350cc4..b55b91f50234 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -727,6 +727,32 @@ macro_rules! 
get_data_page_statistics { Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), + Some(DataType::Utf8) => Ok(Arc::new(StringArray::from( + [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { + x.into_iter().filter_map(|x| { + x.and_then(|x| { + let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); + if res.is_none() { + log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); + } + res + }) + }) + }).flatten().collect::<Vec<_>>(), + ))), + Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from( + [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| { + x.into_iter().filter_map(|x| { + x.and_then(|x| { + let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok(); + if res.is_none() { + log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); + } + res + }) + }) + }).flatten().collect::<Vec<_>>(), + ))), _ => unimplemented!() } } diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 271225236a48..920c54933603 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -1801,7 +1801,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "name", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -1821,7 +1821,7 @@ async fn test_byte() { expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), 
column_name: "service_string", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -2003,7 +2003,7 @@ async fn test_utf8() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5])), column_name: "utf8", - check: Check::RowGroup, + check: Check::Both, } .run(); @@ -2015,7 +2015,7 @@ async fn test_utf8() { expected_null_counts: UInt64Array::from(vec![1, 0]), expected_row_counts: Some(UInt64Array::from(vec![5, 5])), column_name: "large_utf8", - check: Check::RowGroup, + check: Check::Both, } .run(); }