Skip to content

Commit f81b292

Browse files
committed
introduce PartitionedOutput.
1 parent 553c6a3 commit f81b292

File tree

1 file changed

+57
-9
lines changed

1 file changed

+57
-9
lines changed

datafusion/physical-plan/src/aggregates/row_hash.rs

+57-9
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use std::sync::Arc;
2121
use std::task::{Context, Poll};
22-
use std::vec;
22+
use std::{mem, vec};
2323

2424
use crate::aggregates::group_values::{new_group_values, GroupValuesLike};
2525
use crate::aggregates::order::GroupOrderingFull;
@@ -79,28 +79,76 @@ use super::order::GroupOrdering;
7979
use super::AggregateExec;
8080

8181
struct PartitionedOutput {
82-
batches: Vec<Option<RecordBatch>>,
83-
current_idx: usize,
84-
exhausted: bool
82+
partitions: Vec<Option<RecordBatch>>,
83+
start_idx: usize,
84+
batch_size: usize,
85+
num_partitions: usize,
86+
exhausted: bool,
8587
}
8688

8789
impl PartitionedOutput {
88-
pub fn new(batches: Vec<RecordBatch>) -> Self {
89-
let batches = batches.into_iter().map(|batch| Some(batch)).collect();
90+
pub fn new(
91+
src_batches: Vec<RecordBatch>,
92+
batch_size: usize,
93+
num_partitions: usize,
94+
) -> Self {
95+
let partitions = src_batches.into_iter().map(|batch| Some(batch)).collect();
9096

9197
Self {
92-
batches,
93-
current_idx: 0,
98+
partitions,
99+
start_idx: 0,
100+
batch_size,
101+
num_partitions,
94102
exhausted: false,
95103
}
96104
}
97105

98106
pub fn next_batch(&mut self) -> Option<RecordBatch> {
107+
let mut current_idx = self.start_idx;
108+
loop {
109+
// If found a partition having data,
110+
let batch_opt = if self.partitions[current_idx].is_some() {
111+
Some(self.extract_batch_from_partition(current_idx))
112+
} else {
113+
None
114+
};
115+
116+
// Advance the `current_idx`
117+
current_idx = (current_idx + 1) % self.num_partitions;
118+
119+
if batch_opt.is_some() {
120+
// If found batch, we update the `start_idx` and return it
121+
self.start_idx = current_idx;
122+
return batch_opt;
123+
} else if self.start_idx == current_idx {
124+
// If not found, and has loop to end, we return None
125+
return batch_opt;
126+
}
127+
// Otherwise, we loop to check next partition
128+
}
129+
}
99130

131+
pub fn extract_batch_from_partition(&mut self, part_idx: usize) -> RecordBatch {
132+
let partition_batch = mem::take(&mut self.partitions[part_idx]).unwrap();
133+
if partition_batch.num_rows() > self.batch_size {
134+
// If still the exist rows num > `batch_size`,
135+
// cut off `batch_size` rows as `output``,
136+
// and set back `remaining`.
137+
let size = self.batch_size;
138+
let num_remaining = batch.num_rows() - size;
139+
let remaining = partition_batch.slice(size, num_remaining);
140+
let output = partition_batch.slice(0, size);
141+
self.partitions[part_idx] = Some(remaining);
142+
143+
output
144+
} else {
145+
// If they are the last rows in `partition_batch`, just return,
146+
// because `partition_batch` has been set to `None`.
147+
partition_batch
148+
}
100149
}
101150
}
102151

103-
104152
/// This encapsulates the spilling state
105153
struct SpillState {
106154
// ========================================================================

0 commit comments

Comments
 (0)