Skip to content

Commit 4842965

Browse files
committed
impl equal to.
1 parent ffcc1a2 commit 4842965

File tree

1 file changed

+81
-3
lines changed

1 file changed

+81
-3
lines changed

datafusion/physical-plan/src/aggregates/group_values/group_column.rs

+81-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use arrow::array::make_view;
1919
use arrow::array::BufferBuilder;
20+
use arrow::array::ByteView;
2021
use arrow::array::GenericBinaryArray;
2122
use arrow::array::GenericStringArray;
2223
use arrow::array::OffsetSizeTrait;
@@ -445,7 +446,9 @@ impl ByteGroupValueViewBuilder {
445446
let value: &[u8] = arr.value(row).as_ref();
446447

447448
let value_len = value.len();
448-
let view = if value_len > 12 {
449+
let view = if value_len <= 12 {
450+
make_view(value, 0, 0)
451+
} else {
449452
// Ensure big enough block to hold the value firstly
450453
self.ensure_in_progress_big_enough(value_len);
451454

@@ -455,8 +458,6 @@ impl ByteGroupValueViewBuilder {
455458
self.in_progress.extend_from_slice(value);
456459

457460
make_view(value, block_id, offset)
458-
} else {
459-
make_view(value, 0, 0)
460461
};
461462

462463
// Append view
@@ -477,6 +478,83 @@ impl ByteGroupValueViewBuilder {
477478
self.completed.push(buffer);
478479
}
479480
}
481+
482+
fn equal_to_inner<B>(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool
483+
where
484+
B: ByteViewType,
485+
{
486+
let array = array.as_byte_view::<B>();
487+
488+
// Check if nulls equal firstly
489+
let exist_null = self.nulls.is_null(lhs_row);
490+
let input_null = array.is_null(rhs_row);
491+
if let Some(result) = nulls_equal_to(exist_null, input_null) {
492+
return result;
493+
}
494+
495+
// Otherwise, we need to check their values
496+
let exist_view = self.views[lhs_row];
497+
let exist_view_len = exist_view as u32;
498+
499+
let input_view = array.views()[rhs_row];
500+
let input_view_len = input_view as u32;
501+
502+
// The check logic
503+
// - Check len equality
504+
// - If non-inlined, check prefix and then check value in buffer
505+
// when needed
506+
// - If inlined, check inlined value
507+
if exist_view_len != input_view_len {
508+
return false;
509+
}
510+
511+
if exist_view_len <= 12 {
512+
let exist_inline = unsafe {
513+
GenericByteViewArray::<T>::inline_value(
514+
&exist_view,
515+
exist_view_len as usize,
516+
)
517+
};
518+
let input_inline = unsafe {
519+
GenericByteViewArray::<T>::inline_value(
520+
&input_view,
521+
input_view_len as usize,
522+
)
523+
};
524+
exist_inline == input_inline
525+
} else {
526+
let exist_prefix =
527+
unsafe { GenericByteViewArray::<B>::inline_value(&exist_view, 4) };
528+
let input_prefix =
529+
unsafe { GenericByteViewArray::<B>::inline_value(&input_view, 4) };
530+
531+
if exist_prefix != input_prefix {
532+
return false;
533+
}
534+
535+
let exist_full = {
536+
let byte_view = ByteView::from(exist_view);
537+
self.value(
538+
byte_view.buffer_index as usize,
539+
byte_view.offset as usize,
540+
byte_view.length as usize,
541+
)
542+
};
543+
let input_full: &[u8] = unsafe { array.value_unchecked(rhs_row).as_ref() };
544+
exist_full == input_full
545+
}
546+
}
547+
548+
fn value(&self, buffer_index: usize, offset: usize, length: usize) -> &[u8] {
549+
debug_assert!(buffer_index <= self.completed.len());
550+
551+
if buffer_index < self.completed.len() {
552+
let block = &self.completed[buffer_index];
553+
&block[offset..offset + length]
554+
} else {
555+
&self.in_progress[offset..offset + length]
556+
}
557+
}
480558
}
481559

482560
/// Determines if the nullability of the existing and new input array can be used

0 commit comments

Comments
 (0)