Skip to content

Commit dbee260

Browse files
committed
Use upstream is_null and is_not_null kernels
1 parent d6b667e commit dbee260

File tree

2 files changed

+7
-72
lines changed

2 files changed

+7
-72
lines changed

datafusion/physical-expr/src/expressions/is_not_null.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl PhysicalExpr for IsNotNullExpr {
7373
let arg = self.arg.evaluate(batch)?;
7474
match arg {
7575
ColumnarValue::Array(array) => {
76-
let is_not_null = super::is_null::compute_is_not_null(array)?;
76+
let is_not_null = arrow::compute::is_not_null(&array)?;
7777
Ok(ColumnarValue::Array(Arc::new(is_not_null)))
7878
}
7979
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(

datafusion/physical-expr/src/expressions/is_null.rs

+6-71
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,10 @@
2020
use std::hash::{Hash, Hasher};
2121
use std::{any::Any, sync::Arc};
2222

23-
use arrow::compute;
2423
use arrow::{
2524
datatypes::{DataType, Schema},
2625
record_batch::RecordBatch,
2726
};
28-
use arrow_array::{Array, ArrayRef, BooleanArray, Int8Array, UnionArray};
29-
use arrow_buffer::{BooleanBuffer, ScalarBuffer};
30-
use arrow_ord::cmp;
3127

3228
use crate::physical_expr::down_cast_any_ref;
3329
use crate::PhysicalExpr;
@@ -77,9 +73,9 @@ impl PhysicalExpr for IsNullExpr {
7773
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
7874
let arg = self.arg.evaluate(batch)?;
7975
match arg {
80-
ColumnarValue::Array(array) => {
81-
Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?)))
82-
}
76+
ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new(
77+
arrow::compute::is_null(&array)?,
78+
))),
8379
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
8480
ScalarValue::Boolean(Some(scalar.is_null())),
8581
)),
@@ -103,65 +99,6 @@ impl PhysicalExpr for IsNullExpr {
10399
}
104100
}
105101

106-
/// workaround <https://github.com/apache/arrow-rs/issues/6017>,
107-
/// this can be replaced with a direct call to `arrow::compute::is_null` once it's fixed.
108-
pub(crate) fn compute_is_null(array: ArrayRef) -> Result<BooleanArray> {
109-
if let Some(union_array) = array.as_any().downcast_ref::<UnionArray>() {
110-
if let Some(offsets) = union_array.offsets() {
111-
dense_union_is_null(union_array, offsets)
112-
} else {
113-
sparse_union_is_null(union_array)
114-
}
115-
} else {
116-
compute::is_null(array.as_ref()).map_err(Into::into)
117-
}
118-
}
119-
120-
/// workaround <https://github.com/apache/arrow-rs/issues/6017>,
121-
/// this can be replaced with a direct call to `arrow::compute::is_not_null` once it's fixed.
122-
pub(crate) fn compute_is_not_null(array: ArrayRef) -> Result<BooleanArray> {
123-
if array.as_any().is::<UnionArray>() {
124-
compute::not(&compute_is_null(array)?).map_err(Into::into)
125-
} else {
126-
compute::is_not_null(array.as_ref()).map_err(Into::into)
127-
}
128-
}
129-
130-
fn dense_union_is_null(
131-
union_array: &UnionArray,
132-
offsets: &ScalarBuffer<i32>,
133-
) -> Result<BooleanArray> {
134-
let child_arrays = (0..union_array.type_names().len())
135-
.map(|type_id| {
136-
compute::is_null(&union_array.child(type_id as i8)).map_err(Into::into)
137-
})
138-
.collect::<Result<Vec<BooleanArray>>>()?;
139-
140-
let buffer: BooleanBuffer = offsets
141-
.iter()
142-
.zip(union_array.type_ids())
143-
.map(|(offset, type_id)| child_arrays[*type_id as usize].value(*offset as usize))
144-
.collect();
145-
146-
Ok(BooleanArray::new(buffer, None))
147-
}
148-
149-
fn sparse_union_is_null(union_array: &UnionArray) -> Result<BooleanArray> {
150-
let type_ids = Int8Array::new(union_array.type_ids().clone(), None);
151-
152-
let mut union_is_null =
153-
BooleanArray::new(BooleanBuffer::new_unset(union_array.len()), None);
154-
for type_id in 0..union_array.type_names().len() {
155-
let type_id = type_id as i8;
156-
let union_is_child = cmp::eq(&type_ids, &Int8Array::new_scalar(type_id))?;
157-
let child = union_array.child(type_id);
158-
let child_array_is_null = compute::is_null(&child)?;
159-
let child_is_null = compute::and(&union_is_child, &child_array_is_null)?;
160-
union_is_null = compute::or(&union_is_null, &child_is_null)?;
161-
}
162-
Ok(union_is_null)
163-
}
164-
165102
impl PartialEq<dyn Any> for IsNullExpr {
166103
fn eq(&self, other: &dyn Any) -> bool {
167104
down_cast_any_ref(other)
@@ -184,7 +121,7 @@ mod tests {
184121
array::{BooleanArray, StringArray},
185122
datatypes::*,
186123
};
187-
use arrow_array::{Float64Array, Int32Array};
124+
use arrow_array::{Array, Float64Array, Int32Array, UnionArray};
188125
use arrow_buffer::ScalarBuffer;
189126
use datafusion_common::cast::as_boolean_array;
190127

@@ -243,8 +180,7 @@ mod tests {
243180
let array =
244181
UnionArray::try_new(union_fields(), type_ids, None, children).unwrap();
245182

246-
let array_ref = Arc::new(array) as ArrayRef;
247-
let result = compute_is_null(array_ref).unwrap();
183+
let result = arrow::compute::is_null(&array).unwrap();
248184

249185
let expected =
250186
&BooleanArray::from(vec![false, true, false, false, true, true, false]);
@@ -272,8 +208,7 @@ mod tests {
272208
UnionArray::try_new(union_fields(), type_ids, Some(offsets), children)
273209
.unwrap();
274210

275-
let array_ref = Arc::new(array) as ArrayRef;
276-
let result = compute_is_null(array_ref).unwrap();
211+
let result = arrow::compute::is_null(&array).unwrap();
277212

278213
let expected = &BooleanArray::from(vec![false, true, false, true, false, true]);
279214
assert_eq!(expected, &result);

0 commit comments

Comments
 (0)