Commit 16d2557

fix: Ensure properly nested null masks for parquet reads (#692)

## What changes are proposed in this pull request?

Starting in arrow-53.3, the parquet reader no longer computes NULL masks for non-nullable leaf columns -- even if they have nullable ancestors. This breaks row visitors, which rely on every leaf column having a fully accurate NULL mask. The quick fix is to manually fix up the null masks of every `RecordBatch` that comes from the parquet reader.

Fixes #691

## How was this change tested?

New unit test that checks whether parquet reads produce properly nested NULL masks. The test also exercises (and verifies) the JSON parser, so we can reliably detect any unwelcome behavior changes to JSON parsing that might land in the future.

1 parent eedfd47 commit 16d2557
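
The repair relies on `arrow_buffer::NullBuffer::union`, which marks a row NULL if either input does and treats a missing mask as all-valid. A minimal standalone sketch of those semantics (illustrative, not part of the diff):

    use arrow_buffer::NullBuffer;

    // Parent struct is NULL at row 0; a non-nullable leaf carries no mask of its own.
    let parent = NullBuffer::from(&[false, true, true][..]);
    let leaf: Option<NullBuffer> = None;

    // A missing mask counts as all-valid, so the union simply inherits the
    // parent's NULLs -- exactly the fixup applied to each leaf column below.
    let fixed = NullBuffer::union(Some(&parent), leaf.as_ref());
    assert_eq!(fixed, Some(parent));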

File tree

4 files changed: +187 -21 lines changed

kernel/src/engine/arrow_data.rs
kernel/src/engine/arrow_utils.rs
kernel/src/engine/default/parquet.rs
kernel/src/engine/sync/parquet.rs

kernel/src/engine/arrow_data.rs

Lines changed: 13 additions & 1 deletion

@@ -12,7 +12,13 @@ use tracing::debug;
 
use std::collections::{HashMap, HashSet};
 
-/// ArrowEngineData holds an Arrow RecordBatch, implements `EngineData` so the kernel can extract from it.
+pub use crate::engine::arrow_utils::fix_nested_null_masks;
+
+/// ArrowEngineData holds an Arrow `RecordBatch`, implements `EngineData` so the kernel can extract from it.
+///
+/// WARNING: Row visitors require that all leaf columns of the record batch have correctly computed
+/// NULL masks. The arrow parquet reader is known to produce incomplete NULL masks, for
+/// example. When in doubt, call [`fix_nested_null_masks`] first.
pub struct ArrowEngineData {
    data: RecordBatch,
}
@@ -43,6 +49,12 @@ impl From<RecordBatch> for ArrowEngineData {
    }
}
 
+impl From<StructArray> for ArrowEngineData {
+    fn from(value: StructArray) -> Self {
+        ArrowEngineData::new(value.into())
+    }
+}
+
impl From<ArrowEngineData> for RecordBatch {
    fn from(value: ArrowEngineData) -> Self {
        value.data
kernel/src/engine/arrow_utils.rs

Lines changed: 168 additions & 2 deletions

@@ -13,9 +13,10 @@ use crate::{
};
 
use arrow_array::{
-    cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
-    RecordBatch, StringArray, StructArray,
+    cast::AsArray, make_array, new_null_array, Array as ArrowArray, GenericListArray,
+    OffsetSizeTrait, RecordBatch, StringArray, StructArray,
};
+use arrow_buffer::NullBuffer;
use arrow_json::{LineDelimitedWriter, ReaderBuilder};
use arrow_schema::{
    DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
@@ -62,6 +63,21 @@ pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
    Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s.into())).with_backtrace()
}
 
+/// Applies post-processing to data read from parquet files. This includes `reorder_struct_array`
+/// to ensure schema compatibility, as well as `fix_nested_null_masks` to ensure that leaf columns
+/// have accurate null masks that row visitors rely on for correctness.
+pub(crate) fn fixup_parquet_read<T>(
+    batch: RecordBatch,
+    requested_ordering: &[ReorderIndex],
+) -> DeltaResult<T>
+where
+    StructArray: Into<T>,
+{
+    let data = reorder_struct_array(batch.into(), requested_ordering)?;
+    let data = fix_nested_null_masks(data);
+    Ok(data.into())
+}
+
/*
 * The code below implements proper pruning of columns when reading parquet, reordering of columns to
 * match the specified schema, and insertion of null columns if the requested schema includes a
@@ -609,6 +625,53 @@ fn reorder_list<O: OffsetSizeTrait>(
    }
}
 
+/// Use this function to recursively compute properly unioned null masks for all nested columns of
+/// a record batch, making it safe to project out and consume nested columns.
+///
+/// Arrow does not guarantee that the null masks associated with nested columns are accurate --
+/// instead, the reader must consult the union of logical null masks of the column and all its
+/// ancestors. The parquet reader stopped doing this automatically as of arrow-53.3, for example.
+pub fn fix_nested_null_masks(batch: StructArray) -> StructArray {
+    compute_nested_null_masks(batch, None)
+}
+
+/// Splits a StructArray into its parts, unions in the parent null mask, and uses the result to
+/// recursively update the children as well before putting everything back together.
+fn compute_nested_null_masks(sa: StructArray, parent_nulls: Option<&NullBuffer>) -> StructArray {
+    let (fields, columns, nulls) = sa.into_parts();
+    let nulls = NullBuffer::union(parent_nulls, nulls.as_ref());
+    let columns = columns
+        .into_iter()
+        .map(|column| match column.as_struct_opt() {
+            Some(sa) => Arc::new(compute_nested_null_masks(sa.clone(), nulls.as_ref())) as _,
+            None => {
+                let data = column.to_data();
+                let nulls = NullBuffer::union(nulls.as_ref(), data.nulls());
+                let builder = data.into_builder().nulls(nulls);
+                // Use an unchecked build to avoid paying a redundant O(k) validation cost for a
+                // `RecordBatch` with k leaf columns.
+                //
+                // SAFETY: The builder was constructed from an `ArrayData` we extracted from the
+                // column. The only change we make is the null buffer, via `NullBuffer::union` with
+                // input null buffers that were _also_ extracted from the column and its parent. A
+                // union can only _grow_ the set of NULL rows, so data validity is preserved. Even
+                // if `parent_nulls` somehow had a length mismatch --- which it never should,
+                // having also been extracted from our grandparent --- the mismatch would have
+                // already caused `NullBuffer::union` to panic.
+                let data = unsafe { builder.build_unchecked() };
+                make_array(data)
+            }
+        })
+        .collect();
+
+    // Use an unchecked constructor to avoid paying a redundant O(n*k) null buffer validation cost
+    // for a `RecordBatch` with n rows and k leaf columns.
+    //
+    // SAFETY: We are simply reassembling the input `StructArray` we previously broke apart, with
+    // updated null buffers. See above for details about null buffer safety.
+    unsafe { StructArray::new_unchecked(fields, columns, nulls) }
+}
+
/// Arrow lacks the functionality to json-parse a string column into a struct column -- even tho the
/// JSON file reader does exactly the same thing. This function is a hack to work around that gap.
pub(crate) fn parse_json(
@@ -1432,4 +1495,107 @@ mod tests {
        );
        Ok(())
    }
+
+    #[test]
+    fn test_arrow_broken_nested_null_masks() {
+        use crate::engine::arrow_utils::fix_nested_null_masks;
+        use arrow::datatypes::{DataType, Field, Fields, Schema};
+        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+
+        // Parse some JSON into a nested schema
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "outer",
+            DataType::Struct(Fields::from(vec![
+                Field::new(
+                    "inner_nullable",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("leaf_non_null", DataType::Int32, false),
+                        Field::new("leaf_nullable", DataType::Int32, true),
+                    ])),
+                    true,
+                ),
+                Field::new(
+                    "inner_non_null",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("leaf_non_null", DataType::Int32, false),
+                        Field::new("leaf_nullable", DataType::Int32, true),
+                    ])),
+                    false,
+                ),
+            ])),
+            true,
+        )]));
+        let json_string = r#"
+{ }
+{ "outer" : { "inner_non_null" : { "leaf_non_null" : 1 } } }
+{ "outer" : { "inner_non_null" : { "leaf_non_null" : 2, "leaf_nullable" : 3 } } }
+{ "outer" : { "inner_non_null" : { "leaf_non_null" : 4 }, "inner_nullable" : { "leaf_non_null" : 5 } } }
+{ "outer" : { "inner_non_null" : { "leaf_non_null" : 6 }, "inner_nullable" : { "leaf_non_null" : 7, "leaf_nullable": 8 } } }
+"#;
+        let batch1 = arrow::json::ReaderBuilder::new(schema.clone())
+            .build(json_string.as_bytes())
+            .unwrap()
+            .next()
+            .unwrap()
+            .unwrap();
+        println!("Batch 1: {batch1:?}");
+
+        macro_rules! assert_nulls {
+            ( $column: expr, $nulls: expr ) => {
+                assert_eq!($column.nulls().unwrap(), &NullBuffer::from(&$nulls[..]));
+            };
+        }
+
+        // If any of these assertions ever fails, it means the arrow JSON reader started producing
+        // incomplete nested NULL masks. If that happens, we need to update all JSON reads to call
+        // `fix_nested_null_masks`.
+        let outer_1 = batch1.column(0).as_struct();
+        assert_nulls!(outer_1, [false, true, true, true, true]);
+        let inner_nullable_1 = outer_1.column(0).as_struct();
+        assert_nulls!(inner_nullable_1, [false, false, false, true, true]);
+        let nullable_leaf_non_null_1 = inner_nullable_1.column(0);
+        assert_nulls!(nullable_leaf_non_null_1, [false, false, false, true, true]);
+        let nullable_leaf_nullable_1 = inner_nullable_1.column(1);
+        assert_nulls!(nullable_leaf_nullable_1, [false, false, false, false, true]);
+        let inner_non_null_1 = outer_1.column(1).as_struct();
+        assert_nulls!(inner_non_null_1, [false, true, true, true, true]);
+        let non_null_leaf_non_null_1 = inner_non_null_1.column(0);
+        assert_nulls!(non_null_leaf_non_null_1, [false, true, true, true, true]);
+        let non_null_leaf_nullable_1 = inner_non_null_1.column(1);
+        assert_nulls!(non_null_leaf_nullable_1, [false, false, true, false, false]);
+
+        // Write the batch to a parquet file and read it back
+        let mut buffer = vec![];
+        let mut writer =
+            parquet::arrow::ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
+        writer.write(&batch1).unwrap();
+        writer.close().unwrap(); // writer must be closed to write footer
+        let batch2 = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(buffer))
+            .unwrap()
+            .build()
+            .unwrap()
+            .next()
+            .unwrap()
+            .unwrap();
+        println!("Batch 2 before: {batch2:?}");
+
+        // As of arrow-53.3, the parquet reader returns broken nested NULL masks; fix them up.
+        let batch2 = RecordBatch::from(fix_nested_null_masks(batch2.into()));
+
+        // Verify the data survived the round trip
+        let outer_2 = batch2.column(0).as_struct();
+        assert_eq!(outer_2, outer_1);
+        let inner_nullable_2 = outer_2.column(0).as_struct();
+        assert_eq!(inner_nullable_2, inner_nullable_1);
+        let nullable_leaf_non_null_2 = inner_nullable_2.column(0);
+        assert_eq!(nullable_leaf_non_null_2, nullable_leaf_non_null_1);
+        let nullable_leaf_nullable_2 = inner_nullable_2.column(1);
+        assert_eq!(nullable_leaf_nullable_2, nullable_leaf_nullable_1);
+        let inner_non_null_2 = outer_2.column(1).as_struct();
+        assert_eq!(inner_non_null_2, inner_non_null_1);
+        let non_null_leaf_non_null_2 = inner_non_null_2.column(0);
+        assert_eq!(non_null_leaf_non_null_2, non_null_leaf_non_null_1);
+        let non_null_leaf_nullable_2 = inner_non_null_2.column(1);
+        assert_eq!(non_null_leaf_nullable_2, non_null_leaf_nullable_1);
+    }
}
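
Callers that read parquet outside the shared helper can apply the same repair directly; this usage sketch mirrors the round trip in the new test (`batch` is assumed to come straight from an arrow reader):

    use crate::engine::arrow_utils::fix_nested_null_masks;

    // RecordBatch -> StructArray, fix the nested NULL masks, then back again.
    let batch = RecordBatch::from(fix_nested_null_masks(batch.into()));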

kernel/src/engine/default/parquet.rs

Lines changed: 3 additions & 13 deletions

@@ -18,7 +18,7 @@ use uuid::Uuid;
 
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_data::ArrowEngineData;
-use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
+use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
@@ -281,12 +281,7 @@ impl FileOpener for ParquetOpener {
 
            let stream = builder.with_batch_size(batch_size).build()?;
 
-            let stream = stream.map(move |rbr| {
-                // re-order each batch if needed
-                rbr.map_err(Error::Parquet).and_then(|rb| {
-                    reorder_struct_array(rb.into(), &requested_ordering).map(Into::into)
-                })
-            });
+            let stream = stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering));
            Ok(stream.boxed())
        }))
    }
@@ -355,12 +350,7 @@ impl FileOpener for PresignedUrlOpener {
            let reader = builder.with_batch_size(batch_size).build()?;
 
            let stream = futures::stream::iter(reader);
-            let stream = stream.map(move |rbr| {
-                // re-order each batch if needed
-                rbr.map_err(Error::Arrow).and_then(|rb| {
-                    reorder_struct_array(rb.into(), &requested_ordering).map(Into::into)
-                })
-            });
+            let stream = stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering));
            Ok(stream.boxed())
        }))
    }
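
Both openers now share the same one-liner. The `rbr?` compiles because the closure returns a `DeltaResult`, so reader errors convert via `From` instead of the explicit `map_err` calls; `fixup_parquet_read` then infers its return type from context, since `StructArray: Into<T>` holds for both `RecordBatch` and, via the new impl above, `ArrowEngineData`. A sketch of that inference on a single batch (`batch` and `ordering` are illustrative names):

    // Same helper, two concrete return types chosen by the annotations.
    let as_batch: DeltaResult<RecordBatch> = fixup_parquet_read(batch.clone(), &ordering);
    let as_data: DeltaResult<ArrowEngineData> = fixup_parquet_read(batch, &ordering);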

kernel/src/engine/sync/parquet.rs

Lines changed: 3 additions & 5 deletions

@@ -5,7 +5,7 @@ use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReade
 
use super::read_files;
use crate::engine::arrow_data::ArrowEngineData;
-use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
+use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{DeltaResult, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler};
@@ -28,10 +28,8 @@ fn try_create_from_parquet(
    if let Some(predicate) = predicate {
        builder = builder.with_row_group_filter(predicate.as_ref());
    }
-    Ok(builder.build()?.map(move |data| {
-        let reordered = reorder_struct_array(data?.into(), &requested_ordering)?;
-        Ok(ArrowEngineData::new(reordered.into()))
-    }))
+    let stream = builder.build()?;
+    Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering)))
}
 
impl ParquetHandler for SyncParquetHandler {
