Implement Streaming Aggregation: Do not break pipeline in aggregation if group by columns are ordered (V2) #6124
Merged
mustafasrepo
merged 44 commits into
apache:main
from
synnada-ai:feature/stream_groupby5
Apr 27, 2023
Commits
6c1dfeb add starting code for experimenting (mustafasrepo)
1d8e6f5 stream group by linear implementation (mustafasrepo)
e35703b sorted implementation (mustafasrepo)
7057106 Merge branch 'main' into feature/stream_groupby (mustafasrepo)
16c52f8 minor changes (mustafasrepo)
f67313d Merge branch 'main' into feature/stream_groupby (mustafasrepo)
48c8085 simplifications (mustafasrepo)
da7b2c6 Simplifications (mustafasrepo)
ab93bf3 convert vec to Option (mustafasrepo)
6134751 minor changes (mustafasrepo)
2cf0180 minor changes (mustafasrepo)
2802685 minor changes (mustafasrepo)
786caef simplifications (mustafasrepo)
f04bd05 Merge branch 'main' into feature/stream_groupby (mustafasrepo)
a9f78cb minor changes (mustafasrepo)
a9f6d93 all tests pass (mustafasrepo)
4f49e55 refactor (mustafasrepo)
0a0b496 Merge branch 'main' into feature/stream_groupby (mustafasrepo)
ae29248 simplifications (mustafasrepo)
8828aec Merge branch 'main' into feature/output_order_vec (mustafasrepo)
45a0aab Merge branch 'feature/output_order_vec' into feature/stream_groupby (mustafasrepo)
c1872f6 remove unnecessary code (mustafasrepo)
b4c25ff simplifications (mustafasrepo)
c6730c0 Merge branch 'main' into feature/stream_groupby (mustafasrepo)
e321082 Merge branch 'main' into feature/stream_groupby (mustafasrepo)
bb55f50 minor changes (mustafasrepo)
2eab0d0 simplifications (mustafasrepo)
cfc86e4 Merge branch 'main' into feature/stream_groupby (mustafasrepo)
2fc47a8 Merge branch 'main' into feature/stream_groupby (mustafasrepo)
0932f52 minor changes (mustafasrepo)
4083422 Simplify the GroupByOrderMode type (ozankabak)
c17186a Address reviews (mustafasrepo)
01dd18b separate fully ordered case and remaining cases (mustafasrepo)
e4f4347 Merge branch 'main' into feature/stream_groupby4 (mustafasrepo)
479bc0c change test data type (mustafasrepo)
19f82da Merge branch 'main' into feature/stream_groupby4 (mustafasrepo)
6e70583 address reviews (mustafasrepo)
e13742c Convert to option (mustafasrepo)
feb9117 Merge branch 'main' into feature/stream_groupby4 (mustafasrepo)
0de426c retract back to old API. (mustafasrepo)
4a07c10 Merge branch 'main' into feature/stream_groupby4 (mustafasrepo)
70a13f4 Code quality: stylistic changes (ozankabak)
905c6e9 Separate bounded stream and hash stream (mustafasrepo)
09944e7 Update comments (mustafasrepo)
1,043 changes: 1,043 additions & 0 deletions
datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
Large diffs are not rendered by default.
@@ -0,0 +1,151 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::physical_plan::aggregates::AccumulatorItem;
use arrow::compute;
use arrow::compute::filter;
use arrow::row::OwnedRow;
use arrow_array::types::UInt32Type;
use arrow_array::{Array, ArrayRef, BooleanArray, PrimitiveArray};
use arrow_schema::{Schema, SchemaRef};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::utils::get_arrayref_at_indices;
use datafusion_common::{Result, ScalarValue};
use datafusion_physical_expr::AggregateExpr;
use datafusion_row::reader::{read_row, RowReader};
use datafusion_row::MutableRecordBatch;
use std::sync::Arc;

/// The state that is built for each output group.
#[derive(Debug)]
pub(crate) struct GroupState {
    /// The actual group by values, stored sequentially
    pub group_by_values: OwnedRow,

    // Accumulator state, stored sequentially
    pub aggregation_buffer: Vec<u8>,

    // Accumulator state, one for each aggregate that doesn't support row accumulation
    pub accumulator_set: Vec<AccumulatorItem>,

    /// scratch space used to collect indices for input rows in a
    /// batch that have values to aggregate. Reset on each batch
    pub indices: Vec<u32>,
}

#[derive(Debug)]
/// tracks what phase the aggregation is in
pub(crate) enum ExecutionState {
    ReadingInput,
    ProducingOutput,
    Done,
}

pub(crate) fn aggr_state_schema(
    aggr_expr: &[Arc<dyn AggregateExpr>],
) -> Result<SchemaRef> {
    let fields = aggr_expr
        .iter()
        .flat_map(|expr| expr.state_fields().unwrap().into_iter())
        .collect::<Vec<_>>();
    Ok(Arc::new(Schema::new(fields)))
}

pub(crate) fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
    let row_num = rows.len();
    let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
    let mut row = RowReader::new(schema);

    for data in rows {
        row.point_to(0, data);
        read_row(&row, &mut output, schema);
    }

    output.output_as_columns()
}

pub(crate) fn get_at_indices(
    input_values: &[Vec<ArrayRef>],
    batch_indices: &PrimitiveArray<UInt32Type>,
) -> Result<Vec<Vec<ArrayRef>>> {
    input_values
        .iter()
        .map(|array| get_arrayref_at_indices(array, batch_indices))
        .collect()
}

pub(crate) fn get_optional_filters(
    original_values: &[Option<Arc<dyn Array>>],
    batch_indices: &PrimitiveArray<UInt32Type>,
) -> Vec<Option<Arc<dyn Array>>> {
    original_values
        .iter()
        .map(|array| {
            array.as_ref().map(|array| {
                compute::take(
                    array.as_ref(),
                    batch_indices,
                    None, // None: no index check
                )
                .unwrap()
            })
        })
        .collect()
}

pub(crate) fn slice_and_maybe_filter(
    aggr_array: &[ArrayRef],
    filter_opt: Option<&Arc<dyn Array>>,
    offsets: &[usize],
) -> Result<Vec<ArrayRef>> {
    let sliced_arrays: Vec<ArrayRef> = aggr_array
        .iter()
        .map(|array| array.slice(offsets[0], offsets[1] - offsets[0]))
        .collect();

    let filtered_arrays = match filter_opt.as_ref() {
        Some(f) => {
            let sliced = f.slice(offsets[0], offsets[1] - offsets[0]);
            let filter_array = as_boolean_array(&sliced)?;

            sliced_arrays
                .iter()
                .map(|array| filter(array, filter_array).unwrap())
                .collect::<Vec<ArrayRef>>()
        }
        None => sliced_arrays,
    };
    Ok(filtered_arrays)
}

/// This method is similar to ScalarValue::try_from_array except for the null handling.
/// This method returns [ScalarValue::Null] instead of [ScalarValue::Type(None)]
pub(crate) fn col_to_scalar(
    array: &ArrayRef,
    filter: &Option<&BooleanArray>,
    row_index: usize,
) -> Result<ScalarValue> {
    if array.is_null(row_index) {
        return Ok(ScalarValue::Null);
    }
    if let Some(filter) = filter {
        if !filter.value(row_index) {
            return Ok(ScalarValue::Null);
        }
    }
    ScalarValue::try_from_array(array, row_index)
}
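To make the semantics of `slice_and_maybe_filter` and `col_to_scalar` easier to follow without the Arrow types, here is a minimal standard-library sketch. It is not part of the PR: `Column` is a hypothetical stand-in for an Arrow array, the filter is a plain `&[bool]` rather than a `BooleanArray`, and `Option<i64>` plays the role of `ScalarValue`. The function names are reused only to show the correspondence.

```rust
/// Hypothetical stand-in for an Arrow array: a column of nullable values.
type Column = Vec<Option<i64>>;

/// Keep rows `[offsets[0], offsets[1])` of every column, then drop the rows
/// whose filter entry is `false`. The filter is indexed like the unsliced
/// columns, so it is sliced with the same offsets before being applied.
fn slice_and_maybe_filter(
    columns: &[Column],
    filter: Option<&[bool]>,
    offsets: [usize; 2],
) -> Vec<Column> {
    let (start, end) = (offsets[0], offsets[1]);
    columns
        .iter()
        .map(|col| {
            let sliced = &col[start..end];
            match filter {
                Some(mask) => sliced
                    .iter()
                    .zip(&mask[start..end])
                    .filter(|(_, keep)| **keep)
                    .map(|(value, _)| *value)
                    .collect::<Column>(),
                None => sliced.to_vec(),
            }
        })
        .collect()
}

/// Read one value, treating both a null entry and a filtered-out row as
/// "no value" (`None` here, `ScalarValue::Null` in the code above).
fn col_to_scalar(column: &[Option<i64>], filter: Option<&[bool]>, row: usize) -> Option<i64> {
    if column[row].is_none() {
        return None;
    }
    if let Some(mask) = filter {
        if !mask[row] {
            return None;
        }
    }
    column[row]
}
```

With a column `[Some(1), None, Some(3), Some(4)]`, a filter `[true, true, false, true]`, and offsets `[1, 4]`, the sketch keeps `[None, Some(4)]`: the slice selects the last three rows and the filter then drops the row holding `Some(3)`.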
I have moved the functions and structs common to both `row_hash.rs` and `bounded_aggregate_stream.rs` into the `util.rs` file. All the changes in `util.rs` are a result of this move.
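As background for why a second stream implementation sits next to the hash-based one, the idea in the PR title can be sketched with the standard library alone. When the input is already ordered by the group-by key, a group is complete as soon as the key changes, so it can be emitted immediately instead of after all input has been read. The diff for `bounded_aggregate_stream.rs` is not rendered on this page, so the code below is only an illustration of that idea under simplifying assumptions (a single `u32` key, a `SUM` aggregate, row-at-a-time input); `StreamingSum` and `streaming_sum` are hypothetical names, not the PR's API.

```rust
use std::iter::Peekable;

/// Sums `value` per `key`, assuming rows arrive sorted by `key`.
/// Only the state of the group currently being read is held in memory.
struct StreamingSum<I: Iterator<Item = (u32, i64)>> {
    input: Peekable<I>,
}

impl<I: Iterator<Item = (u32, i64)>> Iterator for StreamingSum<I> {
    type Item = (u32, i64);

    fn next(&mut self) -> Option<(u32, i64)> {
        // Start a new group from the next row, or finish if input is exhausted.
        let (key, mut sum) = self.input.next()?;
        // Consume rows while they belong to the same group.
        while let Some(&(next_key, value)) = self.input.peek() {
            if next_key != key {
                break; // Key changed: the current group is complete.
            }
            sum += value;
            self.input.next();
        }
        Some((key, sum))
    }
}

/// Wraps a key-ordered row iterator in the streaming aggregator.
fn streaming_sum<I: Iterator<Item = (u32, i64)>>(input: I) -> StreamingSum<I> {
    StreamingSum { input: input.peekable() }
}
```

Because each output row needs only the rows of its own group plus one look-ahead row, calling `next()` once on an unbounded ordered input still returns the first group. A hash-based aggregation cannot do this, since any later row might belong to any group.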