Skip to content

Commit 680bf3b

Browse files
committed
draft.
1 parent b9bf6c9 commit 680bf3b

File tree

3 files changed

+80
-27
lines changed

3 files changed

+80
-27
lines changed

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,32 @@ mod bytes_view;
3333
use bytes::GroupValuesByes;
3434
use datafusion_physical_expr::binary_map::OutputType;
3535

36+
pub type GroupBlock = Vec<ArrayRef>;
37+
38+
const GROUP_IDX_HIGH_16_BITS_MASK: u64 = 0xffff000000000000;
39+
const GROUP_IDX_LOW_48_BITS_MASK: u64 = 0x0000ffffffffffff;
40+
41+
pub struct GroupIdx(u64);
42+
43+
impl GroupIdx {
44+
pub fn new(block_id: u16, block_offset: u64) -> Self {
45+
let group_idx_high_part = ((block_id as u64) << 48) & GROUP_IDX_HIGH_16_BITS_MASK;
46+
let group_idx_low_part = block_offset & GROUP_IDX_LOW_48_BITS_MASK;
47+
48+
Self(group_idx_high_part | group_idx_low_part)
49+
}
50+
51+
#[inline]
52+
pub fn block_id(&self) -> usize {
53+
((self.0 & GROUP_IDX_HIGH_16_BITS_MASK) >> 48) as usize
54+
}
55+
56+
#[inline]
57+
pub fn block_offset(&self) -> usize {
58+
(self.0 & GROUP_IDX_LOW_48_BITS_MASK) as usize
59+
}
60+
}
61+
3662
/// An interning store for group keys
3763
pub trait GroupValues: Send {
3864
/// Calculates the `groups` for each input row of `cols`
@@ -48,13 +74,13 @@ pub trait GroupValues: Send {
4874
fn len(&self) -> usize;
4975

5076
/// Emits the group values
51-
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
77+
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<GroupBlock>>;
5278

5379
/// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
5480
fn clear_shrink(&mut self, batch: &RecordBatch);
5581
}
5682

57-
pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
83+
pub fn new_group_values(schema: SchemaRef, batch_size: usize) -> Result<Box<dyn GroupValues>> {
5884
if schema.fields.len() == 1 {
5985
let d = schema.fields[0].data_type();
6086

@@ -92,5 +118,5 @@ pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
92118
}
93119
}
94120

95-
Ok(Box::new(GroupValuesRows::try_new(schema)?))
121+
Ok(Box::new(GroupValuesRows::try_new(schema, batch_size)?))
96122
}

datafusion/physical-plan/src/aggregates/group_values/row.rs

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::aggregates::group_values::GroupValues;
18+
use std::mem;
19+
20+
use crate::aggregates::group_values::{GroupBlock, GroupIdx, GroupValues};
1921
use ahash::RandomState;
2022
use arrow::compute::cast;
2123
use arrow::record_batch::RecordBatch;
@@ -44,7 +46,7 @@ pub struct GroupValuesRows {
4446
///
4547
/// keys: u64 hashes of the GroupValue
4648
/// values: (hash, group_index)
47-
map: RawTable<(u64, usize)>,
49+
map: RawTable<(u64, GroupIdx)>,
4850

4951
/// The size of `map` in bytes
5052
map_size: usize,
@@ -57,7 +59,7 @@ pub struct GroupValuesRows {
5759
/// important for multi-column group keys.
5860
///
5961
/// [`Row`]: arrow::row::Row
60-
group_values: Option<Rows>,
62+
group_values_blocks: Vec<Rows>,
6163

6264
/// reused buffer to store hashes
6365
hashes_buffer: Vec<u64>,
@@ -67,10 +69,14 @@ pub struct GroupValuesRows {
6769

6870
/// Random state for creating hashes
6971
random_state: RandomState,
72+
73+
max_block_size: usize,
74+
75+
cur_block_id: u16,
7076
}
7177

7278
impl GroupValuesRows {
73-
pub fn try_new(schema: SchemaRef) -> Result<Self> {
79+
pub fn try_new(schema: SchemaRef, page_size: usize) -> Result<Self> {
7480
let row_converter = RowConverter::new(
7581
schema
7682
.fields()
@@ -90,27 +96,31 @@ impl GroupValuesRows {
9096
row_converter,
9197
map,
9298
map_size: 0,
93-
group_values: None,
99+
group_values_blocks: Vec::new(),
94100
hashes_buffer: Default::default(),
95101
rows_buffer,
96102
random_state: Default::default(),
103+
max_block_size: page_size,
97104
})
98105
}
99106
}
100107

101108
impl GroupValues for GroupValuesRows {
102-
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
109+
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<GroupIdx>) -> Result<()> {
103110
// Convert the group keys into the row format
104111
let group_rows = &mut self.rows_buffer;
105112
group_rows.clear();
106113
self.row_converter.append(group_rows, cols)?;
107114
let n_rows = group_rows.num_rows();
108115

109-
let mut group_values = match self.group_values.take() {
110-
Some(group_values) => group_values,
111-
None => self.row_converter.empty_rows(0, 0),
116+
if self.group_values_blocks.is_empty() {
117+
// TODO: calc and use the capacity to init.
118+
let block = self.row_converter.empty_rows(0, 0);
119+
self.group_values_blocks.push(block);
112120
};
113121

122+
let mut group_values_blocks = mem::take(&mut self.group_values_blocks);
123+
114124
// tracks to which group each of the input rows belongs
115125
groups.clear();
116126

@@ -126,21 +136,38 @@ impl GroupValues for GroupValuesRows {
126136
// hash doesn't match, so check the hash first with an integer
127137
// comparison first avoid the more expensive comparison with
128138
// group value. https://github.com/apache/datafusion/pull/11718
129-
target_hash == *exist_hash
130-
// verify that the group that we are inserting with hash is
131-
// actually the same key value as the group in
132-
// existing_idx (aka group_values @ row)
133-
&& group_rows.row(row) == group_values.row(*group_idx)
139+
if target_hash != *exist_hash {
140+
return false;
141+
}
142+
143+
// verify that the group that we are inserting with hash is
144+
// actually the same key value as the group in
145+
// existing_idx (aka group_values @ row)
146+
let block_id = group_idx.block_id();
147+
let block_offset = group_idx.block_offset();
148+
let group_value = group_values_blocks[block_id].row(block_offset);
149+
group_rows.row(row) == group_value
134150
});
135151

136152
let group_idx = match entry {
137153
// Existing group_index for this group value
138154
Some((_hash, group_idx)) => *group_idx,
139155
// 1.2 Need to create new entry for the group
140156
None => {
157+
// Check if the block size has reached the limit, if so we switch to next block.
158+
let block_size = group_values_blocks.last().unwrap().num_rows();
159+
if block_size == self.max_block_size {
160+
self.cur_block_id += 1;
161+
// TODO: calc and use the capacity to init.
162+
let block = self.row_converter.empty_rows(0, 0);
163+
self.group_values_blocks.push(block);
164+
}
165+
141166
// Add new entry to aggr_state and save newly created index
142-
let group_idx = group_values.num_rows();
143-
group_values.push(group_rows.row(row));
167+
let cur_group_values = self.group_values_blocks.last_mut().unwrap();
168+
let block_offset = group_values.num_rows();
169+
let group_idx = GroupIdx::new(self.cur_block_id, block_offset);
170+
cur_group_values.push(group_rows.row(row));
144171

145172
// for hasher function, use precomputed hash value
146173
self.map.insert_accounted(
@@ -154,13 +181,13 @@ impl GroupValues for GroupValuesRows {
154181
groups.push(group_idx);
155182
}
156183

157-
self.group_values = Some(group_values);
184+
self.group_values_blocks = group_values_blocks;
158185

159186
Ok(())
160187
}
161188

162189
fn size(&self) -> usize {
163-
let group_values_size = self.group_values.as_ref().map(|v| v.size()).unwrap_or(0);
190+
let group_values_size = self.group_values_blocks.as_ref().map(|v| v.size()).unwrap_or(0);
164191
self.row_converter.size()
165192
+ group_values_size
166193
+ self.map_size
@@ -173,15 +200,15 @@ impl GroupValues for GroupValuesRows {
173200
}
174201

175202
fn len(&self) -> usize {
176-
self.group_values
203+
self.group_values_blocks
177204
.as_ref()
178205
.map(|group_values| group_values.num_rows())
179206
.unwrap_or(0)
180207
}
181208

182-
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
209+
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<GroupBlock>> {
183210
let mut group_values = self
184-
.group_values
211+
.group_values_blocks
185212
.take()
186213
.expect("Can not emit from empty rows");
187214

@@ -232,13 +259,13 @@ impl GroupValues for GroupValuesRows {
232259
}
233260
}
234261

235-
self.group_values = Some(group_values);
262+
self.group_values_blocks = Some(group_values);
236263
Ok(output)
237264
}
238265

239266
fn clear_shrink(&mut self, batch: &RecordBatch) {
240267
let count = batch.num_rows();
241-
self.group_values = self.group_values.take().map(|mut rows| {
268+
self.group_values_blocks = self.group_values_blocks.take().map(|mut rows| {
242269
rows.clear();
243270
rows
244271
});

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ impl GroupedHashAggregateStream {
463463
ordering.as_slice(),
464464
)?;
465465

466-
let group_values = new_group_values(group_schema)?;
466+
let group_values = new_group_values(group_schema, batch_size)?;
467467
timer.done();
468468

469469
let exec_state = ExecutionState::ReadingInput;

0 commit comments

Comments
 (0)