Skip to content

[WIP] Manage group values by blocks in aggregation #11932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/aggregates/group_values/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
self.num_groups
}

fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<Vec<ArrayRef>>> {
// Reset the map to default, and convert it into a single array
let map_contents = self.map.take().into_state();

Expand Down Expand Up @@ -117,7 +117,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
}
};

Ok(vec![group_values])
Ok(vec![vec![group_values]])
}

fn clear_shrink(&mut self, _batch: &RecordBatch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl GroupValues for GroupValuesBytesView {
self.num_groups
}

fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<Vec<ArrayRef>>> {
// Reset the map to default, and convert it into a single array
let map_contents = self.map.take().into_state();

Expand Down Expand Up @@ -118,7 +118,7 @@ impl GroupValues for GroupValuesBytesView {
}
};

Ok(vec![group_values])
Ok(vec![vec![group_values]])
}

fn clear_shrink(&mut self, _batch: &RecordBatch) {
Expand Down
38 changes: 35 additions & 3 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,38 @@ mod bytes_view;
use bytes::GroupValuesByes;
use datafusion_physical_expr::binary_map::OutputType;

/// Mask selecting the top 16 bits of a packed [`GroupIdx`] (the block id).
const GROUP_IDX_HIGH_16_BITS_MASK: u64 = 0xffff_0000_0000_0000;
/// Mask selecting the bottom 48 bits of a packed [`GroupIdx`] (the offset inside the block).
const GROUP_IDX_LOW_48_BITS_MASK: u64 = 0x0000_ffff_ffff_ffff;

/// A group index packed into a single `u64`.
///
/// Layout: the upper 16 bits hold the block id and the lower 48 bits hold the
/// offset inside that block.
#[derive(Debug, Clone, Copy)]
pub struct GroupIdx(u64);

impl GroupIdx {
    /// Packs `block_id` and `block_offset` into one value.
    ///
    /// Only the low 48 bits of `block_offset` are kept; any higher bits are
    /// masked away.
    pub fn new(block_id: u16, block_offset: u64) -> Self {
        let high = (u64::from(block_id) << 48) & GROUP_IDX_HIGH_16_BITS_MASK;
        let low = block_offset & GROUP_IDX_LOW_48_BITS_MASK;

        Self(high | low)
    }

    /// Returns the block id stored in the upper 16 bits.
    #[inline]
    pub fn block_id(&self) -> usize {
        let high = self.0 & GROUP_IDX_HIGH_16_BITS_MASK;
        (high >> 48) as usize
    }

    /// Returns the offset stored in the lower 48 bits.
    #[inline]
    pub fn block_offset(&self) -> usize {
        let low = self.0 & GROUP_IDX_LOW_48_BITS_MASK;
        low as usize
    }

    /// Converts the packed index into a flat index computed as
    /// `block_id * max_block_size + block_offset`.
    ///
    /// NOTE(review): presumably every block except the last holds exactly
    /// `max_block_size` groups; confirm against the callers.
    pub fn as_flat_group_idx(&self, max_block_size: usize) -> usize {
        self.block_id() * max_block_size + self.block_offset()
    }
}

/// An interning store for group keys
pub trait GroupValues: Send {
/// Calculates the `groups` for each input row of `cols`
Expand All @@ -48,13 +80,13 @@ pub trait GroupValues: Send {
fn len(&self) -> usize;

/// Emits the group values
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<Vec<ArrayRef>>>;

/// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
fn clear_shrink(&mut self, batch: &RecordBatch);
}

pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
pub fn new_group_values(schema: SchemaRef, batch_size: usize) -> Result<Box<dyn GroupValues>> {
if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();

Expand Down Expand Up @@ -92,5 +124,5 @@ pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
}
}

Ok(Box::new(GroupValuesRows::try_new(schema)?))
Ok(Box::new(GroupValuesRows::try_new(schema, batch_size)?))
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ where
self.values.len()
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<Vec<ArrayRef>>> {
fn build_primitive<T: ArrowPrimitiveType>(
values: Vec<T::Native>,
null_idx: Option<usize>,
Expand Down Expand Up @@ -207,7 +207,7 @@ where
build_primitive(split, null_group)
}
};
Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
Ok(vec![vec![Arc::new(array.with_data_type(self.data_type.clone()))]])
}

fn clear_shrink(&mut self, batch: &RecordBatch) {
Expand Down
Loading
Loading