Skip to content

Commit 5740774

Browse files
JonasDev1 and alamb authored
Fix AvroReader: Add union resolving for nested struct arrays (#12686)
* Add union resolving for nested struct arrays
* Add test
* Change test
* Reproduce index error
* fmt

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 59130f4 commit 5740774

File tree

1 file changed

+88
-1
lines changed

1 file changed

+88
-1
lines changed

datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs

+88-1
Original file line number | Diff line number | Diff line change
@@ -573,7 +573,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
573573
// extract list values, with non-lists converted to Value::Null
574574
let array_item_count = rows
575575
.iter()
576-
.map(|row| match row {
576+
.map(|row| match maybe_resolve_union(row) {
577577
Value::Array(values) => values.len(),
578578
_ => 1,
579579
})
@@ -1643,6 +1643,93 @@ mod test {
16431643
assert_batches_eq!(expected, &[batch]);
16441644
}
16451645

1646+
/// Round-trip test for a nullable array column whose items are themselves a
/// nullable record: rows are written to an in-memory buffer with
/// `apache_avro::Writer`, read back through `ReaderBuilder`, and the resulting
/// batch is compared against the expected table rendering.
#[test]
1647+
fn test_avro_nullable_struct_array() {
1648+
// Schema: record `r1` with one field `col1`, typed as a union of "null" and
// an array whose items are a union of "null" and record `Item { id: long }`.
let schema = apache_avro::Schema::parse_str(
1649+
r#"
1650+
{
1651+
"type": "record",
1652+
"name": "r1",
1653+
"fields": [
1654+
{
1655+
"name": "col1",
1656+
"type": [
1657+
"null",
1658+
{
1659+
"type": "array",
1660+
"items": {
1661+
"type": [
1662+
"null",
1663+
{
1664+
"type": "record",
1665+
"name": "Item",
1666+
"fields": [
1667+
{
1668+
"name": "id",
1669+
"type": "long"
1670+
}
1671+
]
1672+
}
1673+
]
1674+
}
1675+
}
1676+
],
1677+
"default": null
1678+
}
1679+
]
1680+
}"#,
1681+
)
1682+
.unwrap();
1683+
// `r1` carries two items in `col1`; `r2` (built below) has `col1` set to null.
let jv1 = serde_json::json!({
1684+
"col1": [
1685+
{
1686+
"id": 234
1687+
},
1688+
{
1689+
"id": 345
1690+
}
1691+
]
1692+
});
1693+
// Both values are resolved against the schema before being written.
let r1 = apache_avro::to_value(jv1)
1694+
.unwrap()
1695+
.resolve(&schema)
1696+
.unwrap();
1697+
let r2 = apache_avro::to_value(serde_json::json!({ "col1": null }))
1698+
.unwrap()
1699+
.resolve(&schema)
1700+
.unwrap();
1701+
1702+
// Write five copies of `r1` followed by one `r2` (six rows total) into a Vec.
let mut w = apache_avro::Writer::new(&schema, vec![]);
1703+
for _i in 0..5 {
1704+
w.append(r1.clone()).unwrap();
1705+
}
1706+
w.append(r2).unwrap();
1707+
let bytes = w.into_inner().unwrap();
1708+
1709+
// Read the bytes back; the batch size (20) exceeds the six rows written, and
// the assertions below expect all of them in the first batch.
// NOTE(review): `read_schema()` presumably takes the schema from the data
// itself rather than from a caller-supplied one — confirm against ReaderBuilder.
let mut reader = ReaderBuilder::new()
1710+
.read_schema()
1711+
.with_batch_size(20)
1712+
.build(std::io::Cursor::new(bytes))
1713+
.unwrap();
1714+
let batch = reader.next().unwrap().unwrap();
1715+
assert_eq!(batch.num_rows(), 6);
1716+
assert_eq!(batch.num_columns(), 1);
1717+
1718+
// Five populated rows followed by one empty cell for the null `col1` row.
let expected = [
1719+
"+------------------------+",
1720+
"| col1 |",
1721+
"+------------------------+",
1722+
"| [{id: 234}, {id: 345}] |",
1723+
"| [{id: 234}, {id: 345}] |",
1724+
"| [{id: 234}, {id: 345}] |",
1725+
"| [{id: 234}, {id: 345}] |",
1726+
"| [{id: 234}, {id: 345}] |",
1727+
"| |",
1728+
"+------------------------+",
1729+
];
1730+
assert_batches_eq!(expected, &[batch]);
1731+
}
1732+
16461733
#[test]
16471734
fn test_avro_iterator() {
16481735
let reader = build_reader("alltypes_plain.avro", 5);

0 commit comments

Comments
 (0)