Skip to content

Commit

Permalink
define VectorizedOperationBuffers to hold buffers used in vectorize…
Browse files Browse the repository at this point in the history
…d operations to make code clearer.
  • Loading branch information
Rachelint committed Nov 2, 2024
1 parent 406acb4 commit 7a1ed90
Showing 1 changed file with 109 additions and 50 deletions.
159 changes: 109 additions & 50 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,8 @@ pub struct VectorizedGroupValuesColumn {
/// a specific list in `group_index_lists`.
emit_group_index_list_buffer: Vec<usize>,

/// Similar as `current_indices`, but `remaining_indices`
/// is used to store the rows will be processed in next round.
scalarized_indices: Vec<usize>,

/// The `vectorized_equal_tod` row indices buffer
vectorized_equal_to_row_indices: Vec<usize>,

/// The `vectorized_equal_tod` group indices buffer
vectorized_equal_to_group_indices: Vec<usize>,

/// The `vectorized_equal_tod` result buffer
vectorized_equal_to_results: Vec<bool>,

/// The `vectorized append` row indices buffer
vectorized_append_row_indices: Vec<usize>,
/// Buffers for `vectorized_append` and `vectorized_equal_to`
vectorized_operation_buffers: VectorizedOperationBuffers,

/// The actual group by values, stored column-wise. Compare from
/// the left to right, each column is stored as [`GroupColumn`].
Expand All @@ -161,6 +148,38 @@ pub struct VectorizedGroupValuesColumn {
random_state: RandomState,
}

/// Scratch buffers shared by `vectorized_append` and `vectorized_equal_to`.
///
/// The buffers are kept in one place and reused, so intermediate results
/// do not need a fresh allocation each time.
#[derive(Default)]
struct VectorizedOperationBuffers {
    /// Row indices collected for `vectorized_append`.
    append_row_indices: Vec<usize>,

    /// Row indices collected for `vectorized_equal_to`.
    equal_to_row_indices: Vec<usize>,

    /// Group indices collected for `vectorized_equal_to`.
    equal_to_group_indices: Vec<usize>,

    /// Comparison results produced by `vectorized_equal_to`.
    equal_to_results: Vec<bool>,

    /// Row indices that `vectorized_equal_to` found not equal to any
    /// existing group in `group_values`. `scalarized_intern` is
    /// performed for these rows afterwards.
    remaining_row_indices: Vec<usize>,
}

impl VectorizedOperationBuffers {
    /// Empties every buffer.
    ///
    /// Only the lengths are reset; `Vec::clear` leaves the allocated
    /// capacity untouched, so the buffers can be refilled without
    /// reallocating.
    fn clear(&mut self) {
        // Exhaustive destructuring: adding a field to the struct without
        // handling it here becomes a compile error.
        let Self {
            append_row_indices,
            equal_to_row_indices,
            equal_to_group_indices,
            equal_to_results,
            remaining_row_indices,
        } = self;

        append_row_indices.clear();
        equal_to_row_indices.clear();
        equal_to_group_indices.clear();
        equal_to_results.clear();
        remaining_row_indices.clear();
    }
}

impl VectorizedGroupValuesColumn {
/// Create a new instance of GroupValuesColumn if supported for the specified schema
pub fn try_new(schema: SchemaRef) -> Result<Self> {
Expand All @@ -170,15 +189,11 @@ impl VectorizedGroupValuesColumn {
map,
group_index_lists: Vec::new(),
emit_group_index_list_buffer: Vec::new(),
vectorized_operation_buffers: VectorizedOperationBuffers::default(),
map_size: 0,
group_values: vec![],
hashes_buffer: Default::default(),
random_state: Default::default(),
scalarized_indices: Default::default(),
vectorized_equal_to_row_indices: Default::default(),
vectorized_equal_to_group_indices: Default::default(),
vectorized_equal_to_results: Default::default(),
vectorized_append_row_indices: Default::default(),
})
}

Expand All @@ -201,9 +216,13 @@ impl VectorizedGroupValuesColumn {
batch_hashes: &[u64],
groups: &mut [usize],
) {
self.vectorized_append_row_indices.clear();
self.vectorized_equal_to_row_indices.clear();
self.vectorized_equal_to_group_indices.clear();
self.vectorized_operation_buffers.append_row_indices.clear();
self.vectorized_operation_buffers
.equal_to_row_indices
.clear();
self.vectorized_operation_buffers
.equal_to_group_indices
.clear();

let mut group_values_len = self.group_values[0].len();
for (row, &target_hash) in batch_hashes.iter().enumerate() {
Expand All @@ -227,7 +246,9 @@ impl VectorizedGroupValuesColumn {
);

// Add row index to `vectorized_append_row_indices`
self.vectorized_append_row_indices.push(row);
self.vectorized_operation_buffers
.append_row_indices
.push(row);

// Set group index to row in `groups`
groups[row] = current_group_idx;
Expand All @@ -245,26 +266,41 @@ impl VectorizedGroupValuesColumn {
let list_offset = group_index_view.value() as usize;
let group_index_list = &self.group_index_lists[list_offset];
for &group_index in group_index_list {
self.vectorized_equal_to_row_indices.push(row);
self.vectorized_equal_to_group_indices.push(group_index);
self.vectorized_operation_buffers
.equal_to_row_indices
.push(row);
self.vectorized_operation_buffers
.equal_to_group_indices
.push(group_index);
}
} else {
let group_index = group_index_view.value() as usize;
self.vectorized_equal_to_row_indices.push(row);
self.vectorized_equal_to_group_indices.push(group_index);
self.vectorized_operation_buffers
.equal_to_row_indices
.push(row);
self.vectorized_operation_buffers
.equal_to_group_indices
.push(group_index);
}
}
}

/// Perform `vectorized_append` for `rows` in `vectorized_append_row_indices`
fn vectorized_append(&mut self, cols: &[ArrayRef]) {
if self.vectorized_append_row_indices.is_empty() {
if self
.vectorized_operation_buffers
.append_row_indices
.is_empty()
{
return;
}

let iter = self.group_values.iter_mut().zip(cols.iter());
for (group_column, col) in iter {
group_column.vectorized_append(col, &self.vectorized_append_row_indices);
group_column.vectorized_append(
col,
&self.vectorized_operation_buffers.append_row_indices,
);
}
}

Expand All @@ -283,63 +319,86 @@ impl VectorizedGroupValuesColumn {
/// are very few.
fn vectorized_equal_to(&mut self, cols: &[ArrayRef], groups: &mut [usize]) {
assert_eq!(
self.vectorized_equal_to_group_indices.len(),
self.vectorized_equal_to_row_indices.len()
self.vectorized_operation_buffers
.equal_to_group_indices
.len(),
self.vectorized_operation_buffers.equal_to_row_indices.len()
);

self.scalarized_indices.clear();
self.vectorized_operation_buffers
.remaining_row_indices
.clear();

if self.vectorized_equal_to_group_indices.is_empty() {
if self
.vectorized_operation_buffers
.equal_to_group_indices
.is_empty()
{
return;
}

// 1. Perform `vectorized_equal_to` for `rows` in `vectorized_equal_to_row_indices`
// and `group_indices` in `vectorized_equal_to_group_indices`
let mut equal_to_results = mem::take(&mut self.vectorized_equal_to_results);
let mut equal_to_results =
mem::take(&mut self.vectorized_operation_buffers.equal_to_results);
equal_to_results.clear();
equal_to_results.resize(self.vectorized_equal_to_group_indices.len(), true);
equal_to_results.resize(
self.vectorized_operation_buffers
.equal_to_group_indices
.len(),
true,
);

for (col_idx, group_col) in self.group_values.iter().enumerate() {
group_col.vectorized_equal_to(
&self.vectorized_equal_to_group_indices,
&self.vectorized_operation_buffers.equal_to_group_indices,
&cols[col_idx],
&self.vectorized_equal_to_row_indices,
&self.vectorized_operation_buffers.equal_to_row_indices,
&mut equal_to_results,
);
}

// 2. Check `equal_to_results`, if found not equal to `row`s, just add them
// to `scalarized_indices`, and perform `scalarized_intern` for them after.
let mut current_row_equal_to_result = false;
for (idx, &row) in self.vectorized_equal_to_row_indices.iter().enumerate() {
for (idx, &row) in self
.vectorized_operation_buffers
.equal_to_row_indices
.iter()
.enumerate()
{
let equal_to_result = equal_to_results[idx];

// Equal to case, set the `group_indices` to `rows` in `groups`
if equal_to_result {
groups[row] = self.vectorized_equal_to_group_indices[idx];
groups[row] =
self.vectorized_operation_buffers.equal_to_group_indices[idx];
}
current_row_equal_to_result |= equal_to_result;

// Look forward next one row to check if have checked all results
// of current row
let next_row = self
.vectorized_equal_to_row_indices
.vectorized_operation_buffers
.equal_to_row_indices
.get(idx + 1)
.unwrap_or(&usize::MAX);

// Have checked all results of current row, check the total result
if row != *next_row {
// Not equal to case, add `row` to `scalarized_indices`
if !current_row_equal_to_result {
self.scalarized_indices.push(row);
self.vectorized_operation_buffers
.remaining_row_indices
.push(row);
}

// Init the total result for checking next row
current_row_equal_to_result = false;
}
}

self.vectorized_equal_to_results = equal_to_results;
self.vectorized_operation_buffers.equal_to_results = equal_to_results;
}

/// It is possible that some `input rows` have the same
Expand Down Expand Up @@ -384,13 +443,17 @@ impl VectorizedGroupValuesColumn {
batch_hashes: &[u64],
groups: &mut [usize],
) {
if self.scalarized_indices.is_empty() {
if self
.vectorized_operation_buffers
.remaining_row_indices
.is_empty()
{
return;
}

let mut map = mem::take(&mut self.map);

for &row in &self.scalarized_indices {
for &row in &self.vectorized_operation_buffers.remaining_row_indices {
let target_hash = batch_hashes[row];
let entry = map.get_mut(target_hash, |(exist_hash, _)| {
// Somewhat surprisingly, this closure can be called even if the
Expand Down Expand Up @@ -781,11 +844,7 @@ impl GroupValues for VectorizedGroupValuesColumn {
self.hashes_buffer.shrink_to(count);
self.group_index_lists.clear();
self.emit_group_index_list_buffer.clear();
self.scalarized_indices.clear();
self.vectorized_append_row_indices.clear();
self.vectorized_equal_to_row_indices.clear();
self.vectorized_equal_to_group_indices.clear();
self.vectorized_equal_to_results.clear();
self.vectorized_operation_buffers.clear();
}
}

Expand Down

0 comments on commit 7a1ed90

Please sign in to comment.