diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 7bb140d37f5..ad205dfea31 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -554,36 +554,33 @@ fn filter_boolean(array: &BooleanArray, predicate: &FilterPredicate) -> BooleanA fn filter_native(values: &[T], predicate: &FilterPredicate) -> Buffer { assert!(values.len() >= predicate.filter.len()); - let buffer = match &predicate.strategy { + match &predicate.strategy { IterationStrategy::SlicesIterator => { - let mut buffer = MutableBuffer::with_capacity(predicate.count * T::get_byte_width()); + let mut buffer = Vec::with_capacity(predicate.count); for (start, end) in SlicesIterator::new(&predicate.filter) { buffer.extend_from_slice(&values[start..end]); } - buffer + buffer.into() } IterationStrategy::Slices(slices) => { - let mut buffer = MutableBuffer::with_capacity(predicate.count * T::get_byte_width()); + let mut buffer = Vec::with_capacity(predicate.count); for (start, end) in slices { buffer.extend_from_slice(&values[*start..*end]); } - buffer + buffer.into() } IterationStrategy::IndexIterator => { let iter = IndexIterator::new(&predicate.filter, predicate.count).map(|x| values[x]); // SAFETY: IndexIterator is trusted length - unsafe { MutableBuffer::from_trusted_len_iter(iter) } + unsafe { MutableBuffer::from_trusted_len_iter(iter) }.into() } IterationStrategy::Indices(indices) => { let iter = indices.iter().map(|x| values[*x]); - // SAFETY: `Vec::iter` is trusted length - unsafe { MutableBuffer::from_trusted_len_iter(iter) } + iter.collect::>().into() } IterationStrategy::All | IterationStrategy::None => unreachable!(), - }; - - buffer.into() + } } /// `filter` implementation for primitive arrays @@ -656,29 +653,46 @@ where (start, end, len) } - /// Extends the in-progress array by the indexes in the provided iterator - fn extend_idx(&mut self, iter: impl Iterator) { + fn extend_offsets_idx(&mut self, iter: impl Iterator) { self.dst_offsets.extend(iter.map(|idx| { let start = self.src_offsets[idx].as_usize(); let end = self.src_offsets[idx + 1].as_usize(); let len = OffsetSize::from_usize(end - start).expect("illegal offset range"); self.cur_offset += len; - self.dst_values - .extend_from_slice(&self.src_values[start..end]); + self.cur_offset })); } - /// Extends the in-progress array by the ranges in the provided iterator - fn extend_slices(&mut self, iter: impl Iterator) { + /// Extends the in-progress array by the indexes in the provided iterator + fn extend_idx(&mut self, iter: impl Iterator) { + self.dst_values.reserve_exact(self.cur_offset.as_usize()); + + for idx in iter { + let start = self.src_offsets[idx].as_usize(); + let end = self.src_offsets[idx + 1].as_usize(); + self.dst_values + .extend_from_slice(&self.src_values[start..end]); + } + } + + fn extend_offsets_slices(&mut self, iter: impl Iterator, count: usize) { + self.dst_offsets.reserve_exact(count); for (start, end) in iter { // These can only fail if `array` contains invalid data for idx in start..end { let (_, _, len) = self.get_value_range(idx); self.cur_offset += len; - self.dst_offsets.push(self.cur_offset); // push_unchecked? + self.dst_offsets.push(self.cur_offset); } + } + } + /// Extends the in-progress array by the ranges in the provided iterator + fn extend_slices(&mut self, iter: impl Iterator) { + self.dst_values.reserve_exact(self.cur_offset.as_usize()); + + for (start, end) in iter { let value_start = self.get_value_offset(start); let value_end = self.get_value_offset(end); self.dst_values @@ -699,13 +713,21 @@ where match &predicate.strategy { IterationStrategy::SlicesIterator => { + filter.extend_offsets_slices(SlicesIterator::new(&predicate.filter), predicate.count); filter.extend_slices(SlicesIterator::new(&predicate.filter)) } - IterationStrategy::Slices(slices) => filter.extend_slices(slices.iter().cloned()), + IterationStrategy::Slices(slices) => { + filter.extend_offsets_slices(slices.iter().cloned(), predicate.count); + filter.extend_slices(slices.iter().cloned()) + } IterationStrategy::IndexIterator => { + filter.extend_offsets_idx(IndexIterator::new(&predicate.filter, predicate.count)); filter.extend_idx(IndexIterator::new(&predicate.filter, predicate.count)) } - IterationStrategy::Indices(indices) => filter.extend_idx(indices.iter().cloned()), + IterationStrategy::Indices(indices) => { + filter.extend_offsets_idx(indices.iter().cloned()); + filter.extend_idx(indices.iter().cloned()) + } IterationStrategy::All | IterationStrategy::None => unreachable!(), }