Skip to content

Commit dd5b948

Browse files
committed
draft.
1 parent b9bf6c9 commit dd5b948

File tree

3 files changed

+13
-8
lines changed

3 files changed

+13
-8
lines changed

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ mod bytes_view;
3333
use bytes::GroupValuesByes;
3434
use datafusion_physical_expr::binary_map::OutputType;
3535

36+
pub type GroupBlock = Vec<ArrayRef>;
37+
3638
/// An interning store for group keys
3739
pub trait GroupValues: Send {
3840
/// Calculates the `groups` for each input row of `cols`
@@ -48,13 +50,13 @@ pub trait GroupValues: Send {
4850
fn len(&self) -> usize;
4951

5052
/// Emits the group values
51-
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
53+
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<GroupBlock>>;
5254

5355
/// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
5456
fn clear_shrink(&mut self, batch: &RecordBatch);
5557
}
5658

57-
pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
59+
pub fn new_group_values(schema: SchemaRef, batch_size: usize) -> Result<Box<dyn GroupValues>> {
5860
if schema.fields.len() == 1 {
5961
let d = schema.fields[0].data_type();
6062

@@ -92,5 +94,5 @@ pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
9294
}
9395
}
9496

95-
Ok(Box::new(GroupValuesRows::try_new(schema)?))
97+
Ok(Box::new(GroupValuesRows::try_new(schema, batch_size)?))
9698
}

datafusion/physical-plan/src/aggregates/group_values/row.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::aggregates::group_values::GroupValues;
18+
use crate::aggregates::group_values::{GroupBlock, GroupValues};
1919
use ahash::RandomState;
2020
use arrow::compute::cast;
2121
use arrow::record_batch::RecordBatch;
@@ -57,7 +57,7 @@ pub struct GroupValuesRows {
5757
/// important for multi-column group keys.
5858
///
5959
/// [`Row`]: arrow::row::Row
60-
group_values: Option<Rows>,
60+
group_values: Vec<Rows>,
6161

6262
/// reused buffer to store hashes
6363
hashes_buffer: Vec<u64>,
@@ -67,10 +67,12 @@ pub struct GroupValuesRows {
6767

6868
/// Random state for creating hashes
6969
random_state: RandomState,
70+
71+
page_size: usize,
7072
}
7173

7274
impl GroupValuesRows {
73-
pub fn try_new(schema: SchemaRef) -> Result<Self> {
75+
pub fn try_new(schema: SchemaRef, page_size: usize) -> Result<Self> {
7476
let row_converter = RowConverter::new(
7577
schema
7678
.fields()
@@ -94,6 +96,7 @@ impl GroupValuesRows {
9496
hashes_buffer: Default::default(),
9597
rows_buffer,
9698
random_state: Default::default(),
99+
page_size,
97100
})
98101
}
99102
}
@@ -179,7 +182,7 @@ impl GroupValues for GroupValuesRows {
179182
.unwrap_or(0)
180183
}
181184

182-
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
185+
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<GroupBlock>> {
183186
let mut group_values = self
184187
.group_values
185188
.take()

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ impl GroupedHashAggregateStream {
463463
ordering.as_slice(),
464464
)?;
465465

466-
let group_values = new_group_values(group_schema)?;
466+
let group_values = new_group_values(group_schema, batch_size)?;
467467
timer.done();
468468

469469
let exec_state = ExecutionState::ReadingInput;

0 commit comments

Comments
 (0)