Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered (V2) #6124

Merged
merged 44 commits into from
Apr 27, 2023
6c1dfeb
add starting code for experimenting
mustafasrepo Mar 22, 2023
1d8e6f5
stream group by linear implementation
mustafasrepo Mar 22, 2023
e35703b
sorted implementation
mustafasrepo Mar 29, 2023
7057106
Merge branch 'main' into feature/stream_groupby
mustafasrepo Mar 29, 2023
16c52f8
minor changes
mustafasrepo Mar 29, 2023
f67313d
Merge branch 'main' into feature/stream_groupby
mustafasrepo Mar 31, 2023
48c8085
simplifications
mustafasrepo Mar 31, 2023
da7b2c6
Simplifications
mustafasrepo Mar 31, 2023
ab93bf3
convert vec to Option
mustafasrepo Apr 3, 2023
6134751
minor changes
mustafasrepo Apr 4, 2023
2cf0180
minor changes
mustafasrepo Apr 4, 2023
2802685
minor changes
mustafasrepo Apr 4, 2023
786caef
simplifications
mustafasrepo Apr 4, 2023
f04bd05
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 5, 2023
a9f78cb
minor changes
mustafasrepo Apr 5, 2023
a9f6d93
all tests pass
mustafasrepo Apr 6, 2023
4f49e55
refactor
mustafasrepo Apr 5, 2023
0a0b496
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 7, 2023
ae29248
simplifications
mustafasrepo Apr 7, 2023
8828aec
Merge branch 'main' into feature/output_order_vec
mustafasrepo Apr 7, 2023
45a0aab
Merge branch 'feature/output_order_vec' into feature/stream_groupby
mustafasrepo Apr 10, 2023
c1872f6
remove unnecessary code
mustafasrepo Apr 10, 2023
b4c25ff
simplifications
mustafasrepo Apr 10, 2023
c6730c0
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 11, 2023
e321082
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 12, 2023
bb55f50
minor changes
mustafasrepo Apr 12, 2023
2eab0d0
simplifications
mustafasrepo Apr 12, 2023
cfc86e4
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 12, 2023
2fc47a8
Merge branch 'main' into feature/stream_groupby
mustafasrepo Apr 14, 2023
0932f52
minor changes
mustafasrepo Apr 14, 2023
4083422
Simplify the GroupByOrderMode type
ozankabak Apr 13, 2023
c17186a
Address reviews
mustafasrepo Apr 17, 2023
01dd18b
separate fully ordered case and remaining cases
mustafasrepo Apr 17, 2023
e4f4347
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 20, 2023
479bc0c
change test data type
mustafasrepo Apr 21, 2023
19f82da
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 21, 2023
6e70583
address reviews
mustafasrepo Apr 24, 2023
e13742c
Convert to option
mustafasrepo Apr 24, 2023
feb9117
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 25, 2023
0de426c
retract back to old API.
mustafasrepo Apr 25, 2023
4a07c10
Merge branch 'main' into feature/stream_groupby4
mustafasrepo Apr 25, 2023
70a13f4
Code quality: stylistic changes
ozankabak Apr 25, 2023
905c6e9
Separate bounded stream and hash stream
mustafasrepo Apr 26, 2023
09944e7
Update comments
mustafasrepo Apr 27, 2023
1,043 changes: 1,043 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs

Large diffs are not rendered by default.
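
Since the diff for bounded_aggregate_stream.rs is not rendered here, the following is only a minimal sketch of the idea named in the PR title, not the code from this PR. It assumes the input is fully sorted by a single group key and computes a running sum per group; `streaming_sum` and its tuple rows are hypothetical names chosen for illustration, and only the Rust standard library is used.

```rust
/// Streaming SUM over rows that are already sorted by group key.
///
/// Because equal keys are adjacent in sorted input, a group is complete as
/// soon as the key changes. Only one group's state is kept at a time, so the
/// operator never needs to see the whole input before producing output.
/// (Hypothetical helper; results are collected into a Vec only to keep the
/// sketch short. A real operator would hand each finished group downstream.)
fn streaming_sum<K, I>(sorted_rows: I) -> Vec<(K, i64)>
where
    K: PartialEq,
    I: IntoIterator<Item = (K, i64)>,
{
    let mut output = Vec::new();
    // State of the single group that is currently open.
    let mut current: Option<(K, i64)> = None;

    for (key, value) in sorted_rows {
        if let Some((open_key, sum)) = current.as_mut() {
            if *open_key == key {
                // Same key as the open group: fold the value into its state.
                *sum += value;
                continue;
            }
        }
        // Key changed (or this is the first row): the open group is final.
        if let Some(finished) = current.take() {
            output.push(finished);
        }
        current = Some((key, value));
    }

    // End of input closes the last open group.
    if let Some(finished) = current.take() {
        output.push(finished);
    }
    output
}
```

Calling `streaming_sum(vec![("a", 1), ("a", 2), ("b", 5)])` yields one entry per key in input order. The sketch does not cover partially ordered group-by columns or unsorted input, which a hash table would still be needed for.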

291 changes: 257 additions & 34 deletions datafusion/core/src/physical_plan/aggregates/mod.rs

Large diffs are not rendered by default.

140 changes: 11 additions & 129 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -24,33 +24,34 @@ use std::task::{Context, Poll};
use std::vec;

use ahash::RandomState;
use arrow::row::{OwnedRow, RowConverter, SortField};
use arrow::row::{RowConverter, SortField};
use datafusion_physical_expr::hash_utils::create_hashes;
use futures::ready;
use futures::stream::{Stream, StreamExt};

use crate::execution::context::TaskContext;
use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use crate::physical_plan::aggregates::utils::{
aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
};
use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AccumulatorItem,
AggregateMode, PhysicalGroupBy, RowAccumulatorItem,
evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
PhysicalGroupBy, RowAccumulatorItem,
};
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use arrow::array::*;
use arrow::compute::{cast, filter};
use arrow::datatypes::{DataType, Schema, UInt32Type};
use arrow::{compute, datatypes::SchemaRef, record_batch::RecordBatch};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::utils::get_arrayref_at_indices;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;
use datafusion_row::accessor::RowAccessor;
use datafusion_row::layout::RowLayout;
use datafusion_row::reader::{read_row, RowReader};
use datafusion_row::MutableRecordBatch;
use hashbrown::raw::RawTable;
use itertools::izip;

@@ -68,7 +69,6 @@ use itertools::izip;
/// 4. The state's RecordBatch is `merge`d to a new state
/// 5. The state is mapped to the final value
///
/// [Arrow-row]: OwnedRow
/// [WordAligned]: datafusion_row::layout
pub(crate) struct GroupedHashAggregateStream {
schema: SchemaRef,
@@ -107,22 +107,6 @@ pub(crate) struct GroupedHashAggregateStream {
indices: [Vec<Range<usize>>; 2],
}

#[derive(Debug)]
Review comment from @mustafasrepo (Contributor, Author), Apr 26, 2023:

I have moved the functions and structs shared by row_hash.rs and bounded_aggregate_stream.rs into utils.rs. All the changes in this file and in utils.rs result from that move.

/// tracks what phase the aggregation is in
enum ExecutionState {
ReadingInput,
ProducingOutput,
Done,
}

fn aggr_state_schema(aggr_expr: &[Arc<dyn AggregateExpr>]) -> Result<SchemaRef> {
let fields = aggr_expr
.iter()
.flat_map(|expr| expr.state_fields().unwrap().into_iter())
.collect::<Vec<_>>();
Ok(Arc::new(Schema::new(fields)))
}

impl GroupedHashAggregateStream {
/// Create a new GroupedHashAggregateStream
#[allow(clippy::too_many_arguments)]
@@ -617,25 +601,8 @@ impl GroupedHashAggregateStream {
}
}

/// The state that is built for each output group.
#[derive(Debug)]
pub struct GroupState {
/// The actual group by values, stored sequentially
group_by_values: OwnedRow,

// Accumulator state, stored sequentially
pub aggregation_buffer: Vec<u8>,

// Accumulator state, one for each aggregate that doesn't support row accumulation
pub accumulator_set: Vec<AccumulatorItem>,

/// scratch space used to collect indices for input rows in a
/// bach that have values to aggregate. Reset on each batch
pub indices: Vec<u32>,
}

/// The state of all the groups
pub struct AggregationState {
pub(crate) struct AggregationState {
pub reservation: MemoryReservation,

/// Logically maps group values to an index in `group_states`
@@ -788,88 +755,3 @@ impl GroupedHashAggregateStream {
Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
}
}

fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
let row_num = rows.len();
let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
let mut row = RowReader::new(schema);

for data in rows {
row.point_to(0, data);
read_row(&row, &mut output, schema);
}

output.output_as_columns()
}

fn get_at_indices(
input_values: &[Vec<ArrayRef>],
batch_indices: &PrimitiveArray<UInt32Type>,
) -> Result<Vec<Vec<ArrayRef>>> {
input_values
.iter()
.map(|array| get_arrayref_at_indices(array, batch_indices))
.collect()
}

fn get_optional_filters(
original_values: &[Option<Arc<dyn Array>>],
batch_indices: &PrimitiveArray<UInt32Type>,
) -> Vec<Option<Arc<dyn Array>>> {
original_values
.iter()
.map(|array| {
array.as_ref().map(|array| {
compute::take(
array.as_ref(),
batch_indices,
None, // None: no index check
)
.unwrap()
})
})
.collect()
}

fn slice_and_maybe_filter(
aggr_array: &[ArrayRef],
filter_opt: Option<&Arc<dyn Array>>,
offsets: &[usize],
) -> Result<Vec<ArrayRef>> {
let sliced_arrays: Vec<ArrayRef> = aggr_array
.iter()
.map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
.collect();

let filtered_arrays = match filter_opt.as_ref() {
Some(f) => {
let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
let filter_array = as_boolean_array(&sliced)?;

sliced_arrays
.iter()
.map(|array| filter(array, filter_array).unwrap())
.collect::<Vec<ArrayRef>>()
}
None => sliced_arrays,
};
Ok(filtered_arrays)
}

/// This method is similar to Scalar::try_from_array except for the Null handling.
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
fn col_to_scalar(
array: &ArrayRef,
filter: &Option<&BooleanArray>,
row_index: usize,
) -> Result<ScalarValue> {
if array.is_null(row_index) {
return Ok(ScalarValue::Null);
}
if let Some(filter) = filter {
if !filter.value(row_index) {
return Ok(ScalarValue::Null);
}
}
ScalarValue::try_from_array(array, row_index)
}
151 changes: 151 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/utils.rs
@@ -0,0 +1,151 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::physical_plan::aggregates::AccumulatorItem;
use arrow::compute;
use arrow::compute::filter;
use arrow::row::OwnedRow;
use arrow_array::types::UInt32Type;
use arrow_array::{Array, ArrayRef, BooleanArray, PrimitiveArray};
use arrow_schema::{Schema, SchemaRef};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::utils::get_arrayref_at_indices;
use datafusion_common::{Result, ScalarValue};
use datafusion_physical_expr::AggregateExpr;
use datafusion_row::reader::{read_row, RowReader};
use datafusion_row::MutableRecordBatch;
use std::sync::Arc;

/// The state that is built for each output group.
#[derive(Debug)]
pub(crate) struct GroupState {
/// The actual group by values, stored sequentially
pub group_by_values: OwnedRow,

// Accumulator state, stored sequentially
pub aggregation_buffer: Vec<u8>,

// Accumulator state, one for each aggregate that doesn't support row accumulation
pub accumulator_set: Vec<AccumulatorItem>,

/// scratch space used to collect indices for input rows in a
/// batch that have values to aggregate. Reset on each batch
pub indices: Vec<u32>,
}

#[derive(Debug)]
/// tracks what phase the aggregation is in
pub(crate) enum ExecutionState {
ReadingInput,
ProducingOutput,
Done,
}

pub(crate) fn aggr_state_schema(
aggr_expr: &[Arc<dyn AggregateExpr>],
) -> Result<SchemaRef> {
let fields = aggr_expr
.iter()
.flat_map(|expr| expr.state_fields().unwrap().into_iter())
.collect::<Vec<_>>();
Ok(Arc::new(Schema::new(fields)))
}

pub(crate) fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
let row_num = rows.len();
let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
let mut row = RowReader::new(schema);

for data in rows {
row.point_to(0, data);
read_row(&row, &mut output, schema);
}

output.output_as_columns()
}

pub(crate) fn get_at_indices(
input_values: &[Vec<ArrayRef>],
batch_indices: &PrimitiveArray<UInt32Type>,
) -> Result<Vec<Vec<ArrayRef>>> {
input_values
.iter()
.map(|array| get_arrayref_at_indices(array, batch_indices))
.collect()
}

pub(crate) fn get_optional_filters(
original_values: &[Option<Arc<dyn Array>>],
batch_indices: &PrimitiveArray<UInt32Type>,
) -> Vec<Option<Arc<dyn Array>>> {
original_values
.iter()
.map(|array| {
array.as_ref().map(|array| {
compute::take(
array.as_ref(),
batch_indices,
None, // None: no index check
)
.unwrap()
})
})
.collect()
}

pub(crate) fn slice_and_maybe_filter(
aggr_array: &[ArrayRef],
filter_opt: Option<&Arc<dyn Array>>,
offsets: &[usize],
) -> Result<Vec<ArrayRef>> {
let sliced_arrays: Vec<ArrayRef> = aggr_array
.iter()
.map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
.collect();

let filtered_arrays = match filter_opt.as_ref() {
Some(f) => {
let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
let filter_array = as_boolean_array(&sliced)?;

sliced_arrays
.iter()
.map(|array| filter(array, filter_array).unwrap())
.collect::<Vec<ArrayRef>>()
}
None => sliced_arrays,
};
Ok(filtered_arrays)
}

/// This method is similar to ScalarValue::try_from_array except for the Null handling.
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
pub(crate) fn col_to_scalar(
array: &ArrayRef,
filter: &Option<&BooleanArray>,
row_index: usize,
) -> Result<ScalarValue> {
if array.is_null(row_index) {
return Ok(ScalarValue::Null);
}
if let Some(filter) = filter {
if !filter.value(row_index) {
return Ok(ScalarValue::Null);
}
}
ScalarValue::try_from_array(array, row_index)
}
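
To make the behaviour of `slice_and_maybe_filter` above easier to follow, here is a std-only analogue. It is a sketch under assumptions, not the Arrow-based code: plain `Vec<i64>` columns stand in for `ArrayRef`, a `&[bool]` mask stands in for the boolean filter array, and the name `slice_and_filter_vec` is hypothetical. As in the original, `offsets` is read as a half-open row range `offsets[0]..offsets[1]`, and the filter is sliced with the same range before it is applied.

```rust
/// Std-only analogue of `slice_and_maybe_filter` (hypothetical helper).
///
/// Step 1: restrict every column to the rows in `offsets[0]..offsets[1]`.
/// Step 2: if a filter mask is given, slice it the same way and keep only
///         the rows whose mask entry is `true`.
fn slice_and_filter_vec(
    columns: &[Vec<i64>],
    filter_opt: Option<&[bool]>,
    offsets: &[usize],
) -> Vec<Vec<i64>> {
    let (start, end) = (offsets[0], offsets[1]);

    columns
        .iter()
        .map(|column| {
            let sliced = &column[start..end];
            match filter_opt {
                // Mask and data are sliced with the same range, so they
                // stay aligned row by row.
                Some(mask) => sliced
                    .iter()
                    .zip(&mask[start..end])
                    .filter(|(_, keep)| **keep)
                    .map(|(value, _)| *value)
                    .collect(),
                // No filter: the slice is returned unchanged.
                None => sliced.to_vec(),
            }
        })
        .collect()
}
```

With columns `[10, 20, 30, 40, 50]` and `[1, 2, 3, 4, 5]`, mask `[true, false, true, false, true]` and offsets `[1, 4]`, rows 1 to 3 are selected first and only row 2 survives the mask. Unlike this sketch, the Arrow arrays in the original can also carry nulls.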
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/analyze.rs
@@ -77,7 +77,7 @@ impl ExecutionPlan for AnalyzeExec {
}

/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but it its input(s) are
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] {
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -96,7 +96,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
}

/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but it its input(s) are
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -81,7 +81,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
}

/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but it its input(s) are
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/filter.rs
@@ -107,7 +107,7 @@ impl ExecutionPlan for FilterExec {
}

/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but it its input(s) are
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -160,7 +160,7 @@ impl ExecutionPlan for CrossJoinExec {
}

/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but it its input(s) are
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] || children[1] {
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -247,7 +247,7 @@ impl ExecutionPlan for HashJoinExec {
}

/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but it its input(s) are
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
let (left, right) = (children[0], children[1]);
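
The `unbounded_output` doc comment repeated in the hunks above describes a two-way contract: an operator that can pipeline simply reports whether its input is infinite, while one that cannot pipeline returns an error when given an infinite input. The sketch below illustrates that contract with free functions. The function names, the `String` error type and the error message are assumptions made for illustration and are not DataFusion's actual types.

```rust
/// Hypothetical result type; DataFusion has its own `Result` and error types.
type PlanResult<T> = Result<T, String>;

/// Operator that supports pipelining (for example a filter-like operator):
/// its output is unbounded exactly when its single input is unbounded.
fn passthrough_unbounded_output(children: &[bool]) -> PlanResult<bool> {
    Ok(children[0])
}

/// Operator that breaks the pipeline: it must consume its whole input before
/// emitting anything, so an unbounded input is reported as an error.
fn blocking_unbounded_output(children: &[bool]) -> PlanResult<bool> {
    if children.iter().any(|&unbounded| unbounded) {
        Err("operator does not support unbounded inputs".to_string())
    } else {
        // All inputs are finite, so the output is finite as well.
        Ok(false)
    }
}
```

Each entry of `children` states whether the corresponding input is unbounded, matching the `children: &[bool]` parameter in the signatures shown above.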