Skip to content

Commit c5a8d4b

Browse files
committed
Fix StructArrayReader handling nested lists (#1651)
1 parent dd16ec9 commit c5a8d4b

File tree

4 files changed

+127
-75
lines changed

4 files changed

+127
-75
lines changed

parquet/examples/test.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
use std::fs::File;
2+
use std::sync::Arc;
3+
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
4+
use parquet::file::reader::SerializedFileReader;
5+
6+
fn main() {
7+
let file = File::open("/home/raphael/Downloads/part-00000-f6337bce-7fcd-4021-9f9d-040413ea83f8-c000.snappy.parquet").unwrap();
8+
let reader = SerializedFileReader::new(file).unwrap();
9+
let mut arrow = ParquetFileArrowReader::new(Arc::new(reader));
10+
let reader = arrow.get_record_reader(1024).unwrap();
11+
for batch in reader {
12+
let batch = batch.unwrap();
13+
println!("{:#?}", batch);
14+
}
15+
16+
}

parquet/src/arrow/array_reader.rs

Lines changed: 103 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,17 @@
1616
// under the License.
1717

1818
use std::any::Any;
19-
use std::cmp::{max, min};
19+
use std::cmp::max;
2020
use std::marker::PhantomData;
21-
use std::mem::size_of;
2221
use std::result::Result::Ok;
2322
use std::sync::Arc;
2423
use std::vec::Vec;
2524

2625
use arrow::array::{
2726
Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder,
28-
DecimalArray, Int16BufferBuilder, Int32Array, Int64Array, PrimitiveArray,
29-
StructArray,
27+
DecimalArray, Int32Array, Int64Array, PrimitiveArray, StructArray,
3028
};
31-
use arrow::buffer::{Buffer, MutableBuffer};
29+
use arrow::buffer::Buffer;
3230
use arrow::datatypes::{
3331
ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType,
3432
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
@@ -655,8 +653,7 @@ pub struct StructArrayReader {
655653
data_type: ArrowType,
656654
struct_def_level: i16,
657655
struct_rep_level: i16,
658-
def_level_buffer: Option<Buffer>,
659-
rep_level_buffer: Option<Buffer>,
656+
nullable: bool,
660657
}
661658

662659
impl StructArrayReader {
@@ -666,14 +663,14 @@ impl StructArrayReader {
666663
children: Vec<Box<dyn ArrayReader>>,
667664
def_level: i16,
668665
rep_level: i16,
666+
nullable: bool,
669667
) -> Self {
670668
Self {
671669
data_type,
672670
children,
673671
struct_def_level: def_level,
674672
struct_rep_level: rep_level,
675-
def_level_buffer: None,
676-
rep_level_buffer: None,
673+
nullable,
677674
}
678675
}
679676
}
@@ -708,8 +705,6 @@ impl ArrayReader for StructArrayReader {
708705
/// ```
709706
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
710707
if self.children.is_empty() {
711-
self.def_level_buffer = None;
712-
self.rep_level_buffer = None;
713708
return Ok(Arc::new(StructArray::from(Vec::new())));
714709
}
715710

@@ -742,80 +737,59 @@ impl ArrayReader for StructArrayReader {
742737
.collect::<Vec<ArrayData>>(),
743738
);
744739

745-
if self.struct_def_level != 0 {
740+
if self.nullable {
746741
// calculate struct def level data
747-
let buffer_size = children_array_len * size_of::<i16>();
748-
let mut def_level_data_buffer = MutableBuffer::new(buffer_size);
749-
def_level_data_buffer.resize(buffer_size, 0);
750742

751-
// Safety: the buffer is always treated as `u16` in the code below
752-
let def_level_data = unsafe { def_level_data_buffer.typed_data_mut() };
743+
// children should have consistent view of parent, only need to inspect first child
744+
let def_levels = self.children[0]
745+
.get_def_levels()
746+
.expect("child with nullable parents must have definition level");
753747

754-
def_level_data
755-
.iter_mut()
756-
.for_each(|v| *v = self.struct_def_level);
748+
// calculate bitmap for current array
749+
let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
757750

758-
for child in &self.children {
759-
if let Some(current_child_def_levels) = child.get_def_levels() {
760-
if current_child_def_levels.len() != children_array_len {
761-
return Err(general_err!("Child array length are not equal!"));
762-
} else {
763-
for i in 0..children_array_len {
764-
def_level_data[i] =
765-
min(def_level_data[i], current_child_def_levels[i]);
751+
match self.children[0].get_rep_levels() {
752+
Some(rep_levels) => {
753+
// Sanity check
754+
assert_eq!(rep_levels.len(), def_levels.len());
755+
756+
for (rep_level, def_level) in rep_levels.iter().zip(def_levels) {
757+
if rep_level > &self.struct_rep_level {
758+
// Already handled by inner list - SKIP
759+
continue;
766760
}
761+
bitmap_builder.append(*def_level >= self.struct_def_level)
762+
}
763+
}
764+
None => {
765+
for def_level in def_levels {
766+
bitmap_builder.append(*def_level >= self.struct_def_level)
767767
}
768768
}
769769
}
770770

771-
// calculate bitmap for current array
772-
let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
773-
for def_level in def_level_data {
774-
let not_null = *def_level >= self.struct_def_level;
775-
bitmap_builder.append(not_null);
771+
if bitmap_builder.len() != children_array_len {
772+
return Err(general_err!("Failed to decode level data for struct array"));
776773
}
777774

778775
array_data_builder =
779776
array_data_builder.null_bit_buffer(bitmap_builder.finish());
780-
781-
self.def_level_buffer = Some(def_level_data_buffer.into());
782777
}
783778

784779
let array_data = unsafe { array_data_builder.build_unchecked() };
785-
786-
if self.struct_rep_level != 0 {
787-
// calculate struct rep level data, since struct doesn't add to repetition
788-
// levels, here we just need to keep repetition levels of first array
789-
// TODO: Verify that all children array reader has same repetition levels
790-
let rep_level_data = self
791-
.children
792-
.first()
793-
.ok_or_else(|| {
794-
general_err!("Struct array reader should have at least one child!")
795-
})?
796-
.get_rep_levels()
797-
.map(|data| -> Result<Buffer> {
798-
let mut buffer = Int16BufferBuilder::new(children_array_len);
799-
buffer.append_slice(data);
800-
Ok(buffer.finish())
801-
})
802-
.transpose()?;
803-
804-
self.rep_level_buffer = rep_level_data;
805-
}
806780
Ok(Arc::new(StructArray::from(array_data)))
807781
}
808782

809783
fn get_def_levels(&self) -> Option<&[i16]> {
810-
self.def_level_buffer
811-
.as_ref()
812-
.map(|buf| unsafe { buf.typed_data() })
784+
// Children definition levels should describe the same
785+
// parent structure, so return first child's
786+
self.children.first().and_then(|l| l.get_def_levels())
813787
}
814788

815789
fn get_rep_levels(&self) -> Option<&[i16]> {
816-
self.rep_level_buffer
817-
.as_ref()
818-
.map(|buf| unsafe { buf.typed_data() })
790+
// Children repetition levels should describe the same
791+
// parent structure, so return first child's
792+
self.children.first().and_then(|l| l.get_rep_levels())
819793
}
820794
}
821795

@@ -828,7 +802,9 @@ mod tests {
828802
use rand::{thread_rng, Rng};
829803

830804
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
831-
use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray};
805+
use arrow::array::{
806+
Array, ArrayRef, ListArray, PrimitiveArray, StringArray, StructArray,
807+
};
832808
use arrow::datatypes::{
833809
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
834810
Int32Type as ArrowInt32, Int64Type as ArrowInt64,
@@ -1553,6 +1529,7 @@ mod tests {
15531529
vec![Box::new(array_reader_1), Box::new(array_reader_2)],
15541530
1,
15551531
1,
1532+
true,
15561533
);
15571534

15581535
let struct_array = struct_array_reader.next_batch(5).unwrap();
@@ -1566,12 +1543,74 @@ mod tests {
15661543
.collect::<Vec<bool>>()
15671544
);
15681545
assert_eq!(
1569-
Some(vec![0, 1, 1, 1, 1].as_slice()),
1546+
Some(vec![0, 1, 2, 3, 1].as_slice()),
15701547
struct_array_reader.get_def_levels()
15711548
);
15721549
assert_eq!(
15731550
Some(vec![0, 1, 1, 1, 1].as_slice()),
15741551
struct_array_reader.get_rep_levels()
15751552
);
15761553
}
1554+
1555+
#[test]
1556+
fn test_struct_array_reader_list() {
1557+
use arrow::datatypes::Int32Type;
1558+
// [
1559+
// {foo: [1, 2, null]},
1560+
// {foo: []},
1561+
// {foo: null},
1562+
// null,
1563+
// ]
1564+
1565+
let expected_l =
1566+
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1567+
Some(vec![Some(1), Some(2), None]),
1568+
Some(vec![]),
1569+
None,
1570+
None,
1571+
]));
1572+
1573+
let nulls = Buffer::from([0b00000111]);
1574+
let struct_fields = vec![(
1575+
Field::new("foo", expected_l.data_type().clone(), true),
1576+
expected_l.clone() as ArrayRef,
1577+
)];
1578+
let expected = StructArray::from((struct_fields, nulls));
1579+
1580+
let array = Arc::new(Int32Array::from_iter(vec![
1581+
Some(1),
1582+
Some(2),
1583+
None,
1584+
None,
1585+
None,
1586+
None,
1587+
]));
1588+
let reader = InMemoryArrayReader::new(
1589+
ArrowType::Int32,
1590+
array,
1591+
Some(vec![4, 4, 3, 2, 1, 0]),
1592+
Some(vec![0, 1, 1, 0, 0, 0]),
1593+
);
1594+
1595+
let list_reader = ListArrayReader::<i32>::new(
1596+
Box::new(reader),
1597+
expected_l.data_type().clone(),
1598+
ArrowType::Int32,
1599+
3,
1600+
1,
1601+
true,
1602+
);
1603+
1604+
let mut struct_reader = StructArrayReader::new(
1605+
expected.data_type().clone(),
1606+
vec![Box::new(list_reader)],
1607+
1,
1608+
0,
1609+
true,
1610+
);
1611+
1612+
let actual = struct_reader.next_batch(1024).unwrap();
1613+
let actual = actual.as_any().downcast_ref::<StructArray>().unwrap();
1614+
assert_eq!(actual, &expected)
1615+
}
15771616
}

parquet/src/arrow/array_reader/builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ fn build_struct_reader(
326326
children_reader,
327327
field.def_level,
328328
field.rep_level,
329+
field.nullable,
329330
)) as _)
330331
}
331332

parquet/src/arrow/array_reader/map_array.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::arrow::array_reader::ArrayReader;
1919
use crate::errors::ParquetError::ArrowError;
20-
use crate::errors::{Result, ParquetError};
20+
use crate::errors::{ParquetError, Result};
2121
use arrow::array::{ArrayDataBuilder, ArrayRef, MapArray};
2222
use arrow::buffer::{Buffer, MutableBuffer};
2323
use arrow::datatypes::DataType as ArrowType;
@@ -33,8 +33,6 @@ pub struct MapArrayReader {
3333
data_type: ArrowType,
3434
map_def_level: i16,
3535
map_rep_level: i16,
36-
def_level_buffer: Option<Buffer>,
37-
rep_level_buffer: Option<Buffer>,
3836
}
3937

4038
impl MapArrayReader {
@@ -51,8 +49,6 @@ impl MapArrayReader {
5149
data_type,
5250
map_def_level: rep_level,
5351
map_rep_level: def_level,
54-
def_level_buffer: None,
55-
rep_level_buffer: None,
5652
}
5753
}
5854
}
@@ -154,15 +150,15 @@ impl ArrayReader for MapArrayReader {
154150
}
155151

156152
fn get_def_levels(&self) -> Option<&[i16]> {
157-
self.def_level_buffer
158-
.as_ref()
159-
.map(|buf| unsafe { buf.typed_data() })
153+
// Children definition levels should describe the same parent structure,
154+
// so return key_reader only
155+
self.key_reader.get_def_levels()
160156
}
161157

162158
fn get_rep_levels(&self) -> Option<&[i16]> {
163-
self.rep_level_buffer
164-
.as_ref()
165-
.map(|buf| unsafe { buf.typed_data() })
159+
// Children repetition levels should describe the same parent structure,
160+
// so return key_reader only
161+
self.key_reader.get_rep_levels()
166162
}
167163
}
168164

0 commit comments

Comments
 (0)