-
Notifications
You must be signed in to change notification settings - Fork 892
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
filter for run end array #5573
filter for run end array #5573
Changes from all commits
d5f4a47
0751ad9
34cf37f
540e548
9dd3eda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,16 @@ | |
|
||
//! Defines filter kernels | ||
|
||
use std::ops::AddAssign; | ||
use std::sync::Arc; | ||
|
||
use arrow_array::builder::BooleanBufferBuilder; | ||
use arrow_array::cast::AsArray; | ||
use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType}; | ||
use arrow_array::types::{ | ||
ArrowDictionaryKeyType, ArrowPrimitiveType, ByteArrayType, RunEndIndexType, | ||
}; | ||
use arrow_array::*; | ||
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer}; | ||
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, RunEndBuffer}; | ||
use arrow_buffer::{Buffer, MutableBuffer}; | ||
use arrow_data::bit_iterator::{BitIndexIterator, BitSliceIterator}; | ||
use arrow_data::transform::MutableArrayData; | ||
|
@@ -336,6 +339,12 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result<Array | |
DataType::LargeBinary => { | ||
Ok(Arc::new(filter_bytes(values.as_binary::<i64>(), predicate))) | ||
} | ||
DataType::RunEndEncoded(_, _) => { | ||
downcast_run_array!{ | ||
values => Ok(Arc::new(filter_run_end_array(values, predicate)?)), | ||
t => unimplemented!("Filter not supported for RunEndEncoded type {:?}", t) | ||
} | ||
} | ||
DataType::Dictionary(_, _) => downcast_dictionary_array! { | ||
values => Ok(Arc::new(filter_dict(values, predicate))), | ||
t => unimplemented!("Filter not supported for dictionary type {:?}", t) | ||
|
@@ -368,6 +377,55 @@ fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result<Array | |
} | ||
} | ||
|
||
/// Filter any supported [`RunArray`] based on a [`FilterPredicate`] | ||
fn filter_run_end_array<R: RunEndIndexType>( | ||
re_arr: &RunArray<R>, | ||
pred: &FilterPredicate, | ||
) -> Result<RunArray<R>, ArrowError> | ||
where | ||
R::Native: Into<i64> + From<bool>, | ||
R::Native: AddAssign, | ||
{ | ||
let run_ends: &RunEndBuffer<R::Native> = re_arr.run_ends(); | ||
let mut values_filter = BooleanBufferBuilder::new(run_ends.len()); | ||
let mut new_run_ends = vec![R::default_value(); run_ends.len()]; | ||
|
||
let mut start = 0i64; | ||
let mut i = 0; | ||
let filter_values = pred.filter.values(); | ||
let mut count = R::default_value(); | ||
|
||
for end in run_ends.inner().into_iter().map(|i| (*i).into()) { | ||
let mut keep = false; | ||
// in filter_array the predicate array is checked to have the same len as the run end array | ||
// this means the largest value in the run_ends is == to pred.len() | ||
// so we're always within bounds when calling value_unchecked | ||
for pred in (start..end).map(|i| unsafe { filter_values.value_unchecked(i as usize) }) { | ||
count += R::Native::from(pred); | ||
keep |= pred | ||
} | ||
// this is to avoid branching | ||
new_run_ends[i] = count; | ||
i += keep as usize; | ||
|
||
values_filter.append(keep); | ||
start = end; | ||
} | ||
|
||
new_run_ends.truncate(i); | ||
|
||
if values_filter.is_empty() { | ||
new_run_ends.clear(); | ||
} | ||
Comment on lines
+415
to
+419
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you allocate the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm using this trick new_run_ends[i] = count;
i += keep as usize; to make it branchless, I can't do the same thing with push |
||
|
||
let values = re_arr.values(); | ||
let pred = BooleanArray::new(values_filter.finish(), None); | ||
let values = filter(&values, &pred)?; | ||
|
||
let run_ends = PrimitiveArray::<R>::new(new_run_ends.into(), None); | ||
RunArray::try_new(&run_ends, &values) | ||
} | ||
|
||
/// Computes a new null mask for `data` based on `predicate` | ||
/// | ||
/// If the predicate selected no null-rows, returns `None`, otherwise returns | ||
|
@@ -635,6 +693,7 @@ where | |
#[cfg(test)] | ||
mod tests { | ||
use arrow_array::builder::*; | ||
use arrow_array::cast::as_run_array; | ||
use arrow_array::types::*; | ||
use rand::distributions::{Alphanumeric, Standard}; | ||
use rand::prelude::*; | ||
|
@@ -844,6 +903,78 @@ mod tests { | |
assert_eq!(9, d.value(1)); | ||
} | ||
|
||
/// Every run keeps at least one element, so the values are unchanged and only
/// the run ends shrink.
#[test]
fn test_filter_run_end_encoding_array() {
    let input_run_ends = Int64Array::from(vec![2, 3, 8]);
    let input_values = Int64Array::from(vec![7, -2, 9]);
    let array =
        RunArray::try_new(&input_run_ends, &input_values).expect("Failed to create RunArray");
    let predicate = BooleanArray::from(vec![true, false, true, false, true, false, true, false]);

    let filtered = filter(&array, &predicate).unwrap();
    let actual: &RunArray<Int64Type> = as_run_array(&filtered);
    assert_eq!(4, actual.len());

    let expected_run_ends = Int64Array::from(vec![1, 2, 4]);
    let expected_values = Int64Array::from(vec![7, -2, 9]);
    let expected = RunArray::try_new(&expected_run_ends, &expected_values)
        .expect("Failed to make expected RunArray test is broken");

    assert_eq!(&actual.run_ends().values(), &expected.run_ends().values());
    assert_eq!(actual.values(), expected.values())
}
|
||
/// Runs with no selected element must disappear from both the run ends and
/// the values of the result.
#[test]
fn test_filter_run_end_encoding_array_remove_value() {
    let input_run_ends = Int32Array::from(vec![2, 3, 8, 10]);
    let input_values = Int32Array::from(vec![7, -2, 9, -8]);
    let array =
        RunArray::try_new(&input_run_ends, &input_values).expect("Failed to create RunArray");
    let predicate = BooleanArray::from(vec![
        false, true, false, false, true, false, true, false, false, false,
    ]);

    let filtered = filter(&array, &predicate).unwrap();
    let actual: &RunArray<Int32Type> = as_run_array(&filtered);
    assert_eq!(3, actual.len());

    let expected_run_ends = Int32Array::from(vec![1, 3]);
    let expected_values = Int32Array::from(vec![7, 9]);
    let expected = RunArray::try_new(&expected_run_ends, &expected_values)
        .expect("Failed to make expected RunArray test is broken");

    assert_eq!(&actual.run_ends().values(), &expected.run_ends().values());
    assert_eq!(actual.values(), expected.values())
}
|
||
/// A predicate selecting a single element leaves exactly one run of length one.
#[test]
fn test_filter_run_end_encoding_array_remove_all_but_one() {
    let input_run_ends = Int16Array::from(vec![2, 3, 8, 10]);
    let input_values = Int16Array::from(vec![7, -2, 9, -8]);
    let array =
        RunArray::try_new(&input_run_ends, &input_values).expect("Failed to create RunArray");
    let predicate = BooleanArray::from(vec![
        false, false, false, false, false, false, true, false, false, false,
    ]);

    let filtered = filter(&array, &predicate).unwrap();
    let actual: &RunArray<Int16Type> = as_run_array(&filtered);
    assert_eq!(1, actual.len());

    let expected_run_ends = Int16Array::from(vec![1]);
    let expected_values = Int16Array::from(vec![9]);
    let expected = RunArray::try_new(&expected_run_ends, &expected_values)
        .expect("Failed to make expected RunArray test is broken");

    assert_eq!(&actual.run_ends().values(), &expected.run_ends().values());
    assert_eq!(actual.values(), expected.values())
}
|
||
/// An all-false predicate yields an empty run array.
#[test]
fn test_filter_run_end_encoding_array_empty() {
    let input_run_ends = Int64Array::from(vec![2, 3, 8, 10]);
    let input_values = Int64Array::from(vec![7, -2, 9, -8]);
    let array =
        RunArray::try_new(&input_run_ends, &input_values).expect("Failed to create RunArray");
    let predicate = BooleanArray::from(vec![
        false, false, false, false, false, false, false, false, false, false,
    ]);

    let filtered = filter(&array, &predicate).unwrap();
    let actual: &RunArray<Int64Type> = as_run_array(&filtered);
    assert_eq!(0, actual.len());
}
|
||
#[test] | ||
fn test_filter_dictionary_array() { | ||
let values = [Some("hello"), None, Some("world"), Some("!")]; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we can leave a TODO item to utilize the `IterationStrategy` within `FilterPredicate` for a potential performance benefit, keeping this PR as more of an initial version of filter for run end arrays?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could handle the `None` case in this PR; I suspect the index-based ones are a poor fit for REE unless the selectivity of the filter is high. I'd need a benchmark, but in short I would prefer to leave it as a TODO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the `None` case is already handled by the parent:
arrow-rs/arrow-select/src/filter.rs
Line 318 in 77a3132