Skip to content

Commit f97a208

Browse files
authored
Use concat to simplify Nested Scalar creation (apache#9174)
* replace with concat
  Signed-off-by: jayzhan211 <[email protected]>
* rewrite
  Signed-off-by: jayzhan211 <[email protected]>
* remove map_err
  Signed-off-by: jayzhan211 <[email protected]>
---------
Signed-off-by: jayzhan211 <[email protected]>
1 parent ae88235 commit f97a208

File tree

1 file changed

+65
-120
lines changed

1 file changed

+65
-120
lines changed

datafusion/common/src/scalar.rs

+65-120
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use arrow::{
5353
};
5454
use arrow_array::cast::as_list_array;
5555
use arrow_array::{ArrowNativeTypeOp, Scalar};
56-
use arrow_buffer::{Buffer, NullBuffer};
56+
use arrow_buffer::NullBuffer;
5757

5858
/// A dynamically typed, nullable single value, (the single-valued counter-part
5959
/// to arrow's [`Array`])
@@ -1402,121 +1402,6 @@ impl ScalarValue {
14021402
}};
14031403
}
14041404

1405-
fn build_struct_array(
1406-
scalars: impl IntoIterator<Item = ScalarValue>,
1407-
) -> Result<ArrayRef> {
1408-
let arrays = scalars
1409-
.into_iter()
1410-
.map(|s| s.to_array())
1411-
.collect::<Result<Vec<_>>>()?;
1412-
1413-
let first_struct = arrays[0].as_struct_opt();
1414-
if first_struct.is_none() {
1415-
return _internal_err!(
1416-
"Inconsistent types in ScalarValue::iter_to_array. \
1417-
Expected ScalarValue::Struct, got {:?}",
1418-
arrays[0].clone()
1419-
);
1420-
}
1421-
1422-
let mut valid = BooleanBufferBuilder::new(arrays.len());
1423-
1424-
let first_struct = first_struct.unwrap();
1425-
valid.append(first_struct.is_valid(0));
1426-
1427-
let mut column_values: Vec<Vec<ScalarValue>> =
1428-
vec![Vec::with_capacity(arrays.len()); first_struct.num_columns()];
1429-
1430-
for (i, v) in first_struct.columns().iter().enumerate() {
1431-
// ScalarValue::Struct contains a single element in each column.
1432-
let sv = ScalarValue::try_from_array(v, 0)?;
1433-
column_values[i].push(sv);
1434-
}
1435-
1436-
for arr in arrays.iter().skip(1) {
1437-
if let Some(struct_array) = arr.as_struct_opt() {
1438-
valid.append(struct_array.is_valid(0));
1439-
1440-
for (i, v) in struct_array.columns().iter().enumerate() {
1441-
// ScalarValue::Struct contains a single element in each column.
1442-
let sv = ScalarValue::try_from_array(v, 0)?;
1443-
column_values[i].push(sv);
1444-
}
1445-
} else {
1446-
return _internal_err!(
1447-
"Inconsistent types in ScalarValue::iter_to_array. \
1448-
Expected ScalarValue::Struct, got {arr:?}"
1449-
);
1450-
}
1451-
}
1452-
1453-
let column_fields = first_struct.fields().to_vec();
1454-
1455-
let mut data = vec![];
1456-
for (field, values) in
1457-
column_fields.into_iter().zip(column_values.into_iter())
1458-
{
1459-
let field = field.to_owned();
1460-
let array = ScalarValue::iter_to_array(values.into_iter())?;
1461-
data.push((field, array));
1462-
}
1463-
1464-
let bool_buffer = valid.finish();
1465-
let buffer: Buffer = bool_buffer.values().into();
1466-
Ok(Arc::new(StructArray::from((data, buffer))))
1467-
}
1468-
1469-
fn build_list_array(
1470-
scalars: impl IntoIterator<Item = ScalarValue>,
1471-
) -> Result<ArrayRef> {
1472-
let arrays = scalars
1473-
.into_iter()
1474-
.map(|s| s.to_array())
1475-
.collect::<Result<Vec<_>>>()?;
1476-
1477-
let capacity = Capacities::Array(
1478-
arrays
1479-
.iter()
1480-
.filter_map(|arr| {
1481-
if !arr.is_null(0) {
1482-
Some(arr.len())
1483-
} else {
1484-
None
1485-
}
1486-
})
1487-
.sum(),
1488-
);
1489-
1490-
// ScalarValue::List contains a single element ListArray.
1491-
let nulls = arrays
1492-
.iter()
1493-
.map(|arr| arr.is_null(0))
1494-
.collect::<Vec<bool>>();
1495-
let arrays_data = arrays
1496-
.iter()
1497-
.filter(|arr| !arr.is_null(0))
1498-
.map(|arr| arr.to_data())
1499-
.collect::<Vec<_>>();
1500-
1501-
let arrays_ref = arrays_data.iter().collect::<Vec<_>>();
1502-
let mut mutable =
1503-
MutableArrayData::with_capacities(arrays_ref, true, capacity);
1504-
1505-
// ScalarValue::List contains a single element ListArray.
1506-
let mut index = 0;
1507-
for is_null in nulls.into_iter() {
1508-
if is_null {
1509-
mutable.extend_nulls(1);
1510-
} else {
1511-
// mutable array contains non-null elements
1512-
mutable.extend(index, 0, 1);
1513-
index += 1;
1514-
}
1515-
}
1516-
let data = mutable.freeze();
1517-
Ok(arrow_array::make_array(data))
1518-
}
1519-
15201405
let array: ArrayRef = match &data_type {
15211406
DataType::Decimal128(precision, scale) => {
15221407
let decimal_array =
@@ -1591,10 +1476,32 @@ impl ScalarValue {
15911476
DataType::Interval(IntervalUnit::MonthDayNano) => {
15921477
build_array_primitive!(IntervalMonthDayNanoArray, IntervalMonthDayNano)
15931478
}
1594-
DataType::Struct(_) => build_struct_array(scalars)?,
1595-
DataType::List(_)
1596-
| DataType::LargeList(_)
1597-
| DataType::FixedSizeList(_, _) => build_list_array(scalars)?,
1479+
DataType::FixedSizeList(_, _) => {
1480+
// arrow::compute::concat requires every input to have the same data type; for FixedSizeList that includes the list size.
1481+
// A null scalar may arrive with a different list size (e.g. 1), so replace each null entry with a
1482+
// null FixedSizeListArray built from the field and list size of the first non-null array.
1483+
let mut arrays =
1484+
scalars.map(|s| s.to_array()).collect::<Result<Vec<_>>>()?;
1485+
let first_non_null_data_type = arrays
1486+
.iter()
1487+
.find(|sv| !sv.is_null(0))
1488+
.map(|sv| sv.data_type().to_owned());
1489+
if let Some(DataType::FixedSizeList(f, l)) = first_non_null_data_type {
1490+
for array in arrays.iter_mut() {
1491+
if array.is_null(0) {
1492+
*array =
1493+
Arc::new(FixedSizeListArray::new_null(f.clone(), l, 1));
1494+
}
1495+
}
1496+
}
1497+
let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
1498+
arrow::compute::concat(arrays.as_slice())?
1499+
}
1500+
DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
1501+
let arrays = scalars.map(|s| s.to_array()).collect::<Result<Vec<_>>>()?;
1502+
let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
1503+
arrow::compute::concat(arrays.as_slice())?
1504+
}
15981505
DataType::Dictionary(key_type, value_type) => {
15991506
// create the values array
16001507
let value_scalars = scalars
@@ -3529,6 +3436,44 @@ mod tests {
35293436
.collect()
35303437
}
35313438

3439+
#[test]
3440+
fn test_iter_to_array_fixed_size_list() {
3441+
let field = Arc::new(Field::new("item", DataType::Int32, true));
3442+
let f1 = Arc::new(FixedSizeListArray::new(
3443+
field.clone(),
3444+
3,
3445+
Arc::new(Int32Array::from(vec![1, 2, 3])),
3446+
None,
3447+
));
3448+
let f2 = Arc::new(FixedSizeListArray::new(
3449+
field.clone(),
3450+
3,
3451+
Arc::new(Int32Array::from(vec![4, 5, 6])),
3452+
None,
3453+
));
3454+
let f_nulls = Arc::new(FixedSizeListArray::new_null(field, 1, 1));
3455+
3456+
let scalars = vec![
3457+
ScalarValue::FixedSizeList(f_nulls.clone()),
3458+
ScalarValue::FixedSizeList(f1),
3459+
ScalarValue::FixedSizeList(f2),
3460+
ScalarValue::FixedSizeList(f_nulls),
3461+
];
3462+
3463+
let array = ScalarValue::iter_to_array(scalars).unwrap();
3464+
3465+
let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
3466+
vec![
3467+
None,
3468+
Some(vec![Some(1), Some(2), Some(3)]),
3469+
Some(vec![Some(4), Some(5), Some(6)]),
3470+
None,
3471+
],
3472+
3,
3473+
);
3474+
assert_eq!(array.as_ref(), &expected);
3475+
}
3476+
35323477
#[test]
35333478
fn test_iter_to_array_struct() {
35343479
let s1 = StructArray::from(vec![

0 commit comments

Comments
 (0)