Skip to content

Commit ffcc1a2

Browse files
committed
impl append.
1 parent ca033e0 commit ffcc1a2

File tree

1 file changed

+54
-17
lines changed

1 file changed

+54
-17
lines changed

datafusion/physical-plan/src/aggregates/group_values/group_column.rs

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use arrow::array::make_view;
1819
use arrow::array::BufferBuilder;
1920
use arrow::array::GenericBinaryArray;
2021
use arrow::array::GenericStringArray;
@@ -39,6 +40,7 @@ use datafusion_common::utils::proxy::VecAllocExt;
3940
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
4041
use arrow_array::types::GenericStringType;
4142
use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
43+
use std::mem;
4244
use std::sync::Arc;
4345
use std::vec;
4446

@@ -424,23 +426,58 @@ pub struct ByteGroupValueViewBuilder {
424426
nulls: MaybeNullBufferBuilder,
425427
}
426428

427-
// impl ByteGroupValueViewBuilder {
428-
// fn append_val_inner<B>(&mut self, array: &ArrayRef, row: usize)
429-
// where
430-
// B: ByteViewType,
431-
// {
432-
// let arr = array.as_byte_view::<B>();
433-
// if arr.is_null(row) {
434-
// self.nulls.append(true);
435-
// self.views.push(0);
436-
// } else {
437-
// self.nulls.append(false);
438-
// let value: &[u8] = arr.value(row).as_ref();
439-
// self.buffer.append_slice(value);
440-
// self.offsets.push(O::usize_as(self.buffer.len()));
441-
// }
442-
// }
443-
// }
429+
impl ByteGroupValueViewBuilder {
430+
fn append_val_inner<B>(&mut self, array: &ArrayRef, row: usize)
431+
where
432+
B: ByteViewType,
433+
{
434+
let arr = array.as_byte_view::<B>();
435+
436+
// If a null row, set and return
437+
if arr.is_null(row) {
438+
self.nulls.append(true);
439+
self.views.push(0);
440+
return;
441+
}
442+
443+
// Not null case
444+
self.nulls.append(false);
445+
let value: &[u8] = arr.value(row).as_ref();
446+
447+
let value_len = value.len();
448+
let view = if value_len > 12 {
449+
// Ensure big enough block to hold the value firstly
450+
self.ensure_in_progress_big_enough(value_len);
451+
452+
// Append value
453+
let block_id = self.completed.len();
454+
let offset = self.in_progress.len();
455+
self.in_progress.extend_from_slice(value);
456+
457+
make_view(value, block_id, offset)
458+
} else {
459+
make_view(value, 0, 0)
460+
};
461+
462+
// Append view
463+
self.views.push(view);
464+
}
465+
466+
fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
467+
debug_assert!(value_len > 12);
468+
let require_cap = self.in_progress.len() + value_len;
469+
470+
// If current block isn't big enough, flush it and create a new in progress block
471+
if require_cap > self.max_block_size {
472+
let flushed_block = mem::replace(
473+
&mut self.in_progress,
474+
Vec::with_capacity(self.max_block_size),
475+
);
476+
let buffer = Buffer::from_vec(flushed_block);
477+
self.completed.push(buffer);
478+
}
479+
}
480+
}
444481

445482
/// Determines if the nullability of the existing and new input array can be used
446483
/// to short-circuit the comparison of the two values.

0 commit comments

Comments
 (0)