Skip to content

Commit 7f5ccd2

Browse files
committed
impl the new interface functions for all GroupValues impls.
1 parent 6269fb7 commit 7f5ccd2

File tree

5 files changed

+24
-26
lines changed

5 files changed

+24
-26
lines changed

datafusion/physical-plan/src/aggregates/group_values/bytes.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::aggregates::group_values::GroupValues;
18+
use crate::aggregates::group_values::{GroupIdx, GroupValues};
1919
use arrow_array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch};
2020
use datafusion_expr::EmitTo;
2121
use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
@@ -44,7 +44,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
4444
fn intern(
4545
&mut self,
4646
cols: &[ArrayRef],
47-
groups: &mut Vec<usize>,
47+
groups: &mut Vec<GroupIdx>,
4848
) -> datafusion_common::Result<()> {
4949
assert_eq!(cols.len(), 1);
5050

@@ -63,7 +63,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
6363
},
6464
// called for each group
6565
|group_idx| {
66-
groups.push(group_idx);
66+
groups.push(GroupIdx::new(0, group_idx as u64));
6767
},
6868
);
6969

@@ -84,7 +84,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
8484
self.num_groups
8585
}
8686

87-
fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
87+
fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<Vec<ArrayRef>>> {
8888
// Reset the map to default, and convert it into a single array
8989
let map_contents = self.map.take().into_state();
9090

@@ -111,13 +111,13 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
111111
self.intern(&[remaining_group_values], &mut group_indexes)?;
112112

113113
// Verify that the group indexes were assigned in the correct order
114-
assert_eq!(0, group_indexes[0]);
114+
assert_eq!(0, group_indexes[0].block_offset());
115115

116116
emit_group_values
117117
}
118118
};
119119

120-
Ok(vec![group_values])
120+
Ok(vec![vec![group_values]])
121121
}
122122

123123
fn clear_shrink(&mut self, _batch: &RecordBatch) {

datafusion/physical-plan/src/aggregates/group_values/bytes_view.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::aggregates::group_values::GroupValues;
18+
use crate::aggregates::group_values::{GroupIdx, GroupValues};
1919
use arrow_array::{Array, ArrayRef, RecordBatch};
2020
use datafusion_expr::EmitTo;
2121
use datafusion_physical_expr::binary_map::OutputType;
@@ -45,7 +45,7 @@ impl GroupValues for GroupValuesBytesView {
4545
fn intern(
4646
&mut self,
4747
cols: &[ArrayRef],
48-
groups: &mut Vec<usize>,
48+
groups: &mut Vec<GroupIdx>,
4949
) -> datafusion_common::Result<()> {
5050
assert_eq!(cols.len(), 1);
5151

@@ -64,7 +64,7 @@ impl GroupValues for GroupValuesBytesView {
6464
},
6565
// called for each group
6666
|group_idx| {
67-
groups.push(group_idx);
67+
groups.push(GroupIdx::new(0, group_idx as u64));
6868
},
6969
);
7070

@@ -85,7 +85,7 @@ impl GroupValues for GroupValuesBytesView {
8585
self.num_groups
8686
}
8787

88-
fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
88+
fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<Vec<ArrayRef>>> {
8989
// Reset the map to default, and convert it into a single array
9090
let map_contents = self.map.take().into_state();
9191

@@ -112,13 +112,13 @@ impl GroupValues for GroupValuesBytesView {
112112
self.intern(&[remaining_group_values], &mut group_indexes)?;
113113

114114
// Verify that the group indexes were assigned in the correct order
115-
assert_eq!(0, group_indexes[0]);
115+
assert_eq!(0, group_indexes[0].block_offset());
116116

117117
emit_group_values
118118
}
119119
};
120120

121-
Ok(vec![group_values])
121+
Ok(vec![vec![group_values]])
122122
}
123123

124124
fn clear_shrink(&mut self, _batch: &RecordBatch) {

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl GroupIdx {
6161
/// An interning store for group keys
6262
pub trait GroupValues: Send {
6363
/// Calculates the `groups` for each input row of `cols`
64-
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>;
64+
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<GroupIdx>) -> Result<()>;
6565

6666
/// Returns the number of bytes used by this [`GroupValues`]
6767
fn size(&self) -> usize;

datafusion/physical-plan/src/aggregates/group_values/primitive.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::aggregates::group_values::GroupValues;
18+
use crate::aggregates::group_values::{GroupIdx, GroupValues};
1919
use ahash::RandomState;
2020
use arrow::array::BooleanBufferBuilder;
2121
use arrow::buffer::NullBuffer;
@@ -111,7 +111,7 @@ impl<T: ArrowPrimitiveType> GroupValues for GroupValuesPrimitive<T>
111111
where
112112
T::Native: HashValue,
113113
{
114-
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
114+
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<GroupIdx>) -> Result<()> {
115115
assert_eq!(cols.len(), 1);
116116
groups.clear();
117117

@@ -145,7 +145,7 @@ where
145145
}
146146
}
147147
};
148-
groups.push(group_id)
148+
groups.push(GroupIdx::new(0, group_id as u64))
149149
}
150150
Ok(())
151151
}
@@ -162,7 +162,7 @@ where
162162
self.values.len()
163163
}
164164

165-
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
165+
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<Vec<ArrayRef>>> {
166166
fn build_primitive<T: ArrowPrimitiveType>(
167167
values: Vec<T::Native>,
168168
null_idx: Option<usize>,
@@ -207,7 +207,7 @@ where
207207
build_primitive(split, null_group)
208208
}
209209
};
210-
Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
210+
Ok(vec![vec![Arc::new(array.with_data_type(self.data_type.clone()))]])
211211
}
212212

213213
fn clear_shrink(&mut self, batch: &RecordBatch) {

datafusion/physical-plan/src/aggregates/group_values/row.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::collections::VecDeque;
1919
use std::mem;
2020

21-
use crate::aggregates::group_values::{GroupBlock, GroupIdx, GroupValues};
21+
use crate::aggregates::group_values::{GroupIdx, GroupValues};
2222
use ahash::RandomState;
2323
use arrow::compute::cast;
2424
use arrow::record_batch::RecordBatch;
@@ -189,7 +189,7 @@ impl GroupValues for GroupValuesRows {
189189
}
190190

191191
fn size(&self) -> usize {
192-
let group_values_size = self.group_values_blocks.as_ref().map(|v| v.size()).unwrap_or(0);
192+
let group_values_size = self.group_values_blocks.iter().map(|v| v.size()).sum::<usize>();
193193
self.row_converter.size()
194194
+ group_values_size
195195
+ self.map_size
@@ -203,9 +203,9 @@ impl GroupValues for GroupValuesRows {
203203

204204
fn len(&self) -> usize {
205205
self.group_values_blocks
206-
.as_ref()
206+
.iter()
207207
.map(|group_values| group_values.num_rows())
208-
.unwrap_or(0)
208+
.sum::<usize>()
209209
}
210210

211211
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<Vec<ArrayRef>>> {
@@ -293,10 +293,8 @@ impl GroupValues for GroupValuesRows {
293293

294294
fn clear_shrink(&mut self, batch: &RecordBatch) {
295295
let count = batch.num_rows();
296-
self.group_values_blocks = self.group_values_blocks.take().map(|mut rows| {
297-
rows.clear();
298-
rows
299-
});
296+
let mut old_blocks = mem::replace(&mut self.group_values_blocks, VecDeque::new());
297+
old_blocks.iter_mut().for_each(|block| block.clear());
300298
self.map.clear();
301299
self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
302300
self.map_size = self.map.capacity() * std::mem::size_of::<(u64, usize)>();

0 commit comments

Comments
 (0)