Skip to content

feat: Add support for Binary/LargeBinary/Utf8/LargeUtf8 data types in data page statistics #11136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use arrow_array::{
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
use half::f16;
use parquet::data_type::FixedLenByteArray;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
use parquet::file::page_index::index::{Index, PageIndex};
use parquet::file::statistics::Statistics as ParquetStatistics;
Expand Down Expand Up @@ -600,6 +600,18 @@ make_data_page_stats_iterator!(
Index::DOUBLE,
f64
);
make_data_page_stats_iterator!(
MinByteArrayDataPageStatsIterator,
|x: &PageIndex<ByteArray>| { x.min.clone() },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some time digging in the code and I found that this clone is cloning a Bytes (aka it is ref counted, and is not actually copying the bytes around)

Index::BYTE_ARRAY,
ByteArray
);
make_data_page_stats_iterator!(
MaxByteArrayDataPageStatsIterator,
|x: &PageIndex<ByteArray>| { x.max.clone() },
Index::BYTE_ARRAY,
ByteArray
);
macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
paste! {
Expand Down Expand Up @@ -692,6 +704,34 @@ macro_rules! get_data_page_statistics {
)),
Some(DataType::Float32) => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Utf8) => Ok(Arc::new(StringArray::from(
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
x.into_iter().filter_map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around with this code and was able to avoid the to_string() here but the code was more involved:

                Some(DataType::Utf8) => {
                    let mut builder = StringBuilder::new();
                    let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
                    for x in iterator {
                        for x in x.into_iter() {
                            let Some(x) = x else {
                                builder.append_null(); // no statistics value
                                continue;
                            };

                            let Ok(x) = std::str::from_utf8(x.data()) else {
                                log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
                                builder.append_null();
                                continue;
                            };

                            builder.append_value(x);
                        }
                    }
                    Ok(Arc::new(builder.finish()))
                },

I think this is a good one for now until we have a benchmark to optimize (aka #10934)

if res.is_none() {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
})
}).flatten().collect::<Vec<_>>(),
))),
Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from(
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
x.into_iter().filter_map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
if res.is_none() {
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
})
}).flatten().collect::<Vec<_>>(),
))),
Some(DataType::Timestamp(unit, timezone)) => {
let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten();
Ok(match unit {
Expand Down Expand Up @@ -817,6 +857,11 @@ where
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::BYTE_ARRAY(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
_ => unimplemented!(),
});

Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1801,7 +1801,7 @@ async fn test_byte() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
column_name: "name",
check: Check::RowGroup,
check: Check::Both,
}
.run();

Expand All @@ -1821,7 +1821,7 @@ async fn test_byte() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
column_name: "service_string",
check: Check::RowGroup,
check: Check::Both,
}
.run();

Expand All @@ -1840,7 +1840,7 @@ async fn test_byte() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
column_name: "service_binary",
check: Check::RowGroup,
check: Check::Both,
}
.run();

Expand Down Expand Up @@ -1882,7 +1882,7 @@ async fn test_byte() {
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
column_name: "service_large_binary",
check: Check::RowGroup,
check: Check::Both,
}
.run();
}
Expand Down Expand Up @@ -2003,7 +2003,7 @@ async fn test_utf8() {
expected_null_counts: UInt64Array::from(vec![1, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5])),
column_name: "utf8",
check: Check::RowGroup,
check: Check::Both,
}
.run();

Expand All @@ -2015,7 +2015,7 @@ async fn test_utf8() {
expected_null_counts: UInt64Array::from(vec![1, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5])),
column_name: "large_utf8",
check: Check::RowGroup,
check: Check::Both,
}
.run();
}
Expand Down