Skip to content

Commit

Permalink
Skipping partial aggregation when it is not helping for high cardinal…
Browse files Browse the repository at this point in the history
…ity aggregates (#11627)

* rfc: optional skipping partial aggregation

* benchmarks for convert_to_state

* speeding up conversion to state

* Fix MSRV error on 1.76.0

* Improve aggregatation documentation

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
korowa and alamb authored Aug 5, 2024
1 parent 336c15e commit c340b6a
Show file tree
Hide file tree
Showing 14 changed files with 1,261 additions and 15 deletions.
9 changes: 9 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ config_namespace! {

/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false

/// Aggregation ratio (number of distinct groups / number of input rows)
/// threshold for skipping partial aggregation. If the value is greater
/// then partial aggregation will skip aggregation for further input
pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

/// Number of input rows partial aggregation partition should process, before
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
}
}

Expand Down
105 changes: 103 additions & 2 deletions datafusion/expr/src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub trait Accumulator: Send + Sync + Debug {
///
/// Intermediate state is used for "multi-phase" grouping in
/// DataFusion, where an aggregate is computed in parallel with
/// multiple `Accumulator` instances, as illustrated below:
/// multiple `Accumulator` instances, as described below:
///
/// # MultiPhase Grouping
///
Expand Down Expand Up @@ -130,7 +130,7 @@ pub trait Accumulator: Send + Sync + Debug {
/// `───────' `───────'
/// ```
///
/// The partial state is serialied as `Arrays` and then combined
/// The partial state is serialized as `Arrays` and then combined
/// with other partial states from different instances of this
/// Accumulator (that ran on different partitions, for example).
///
Expand All @@ -147,6 +147,107 @@ pub trait Accumulator: Send + Sync + Debug {
/// Note that [`ScalarValue::List`] can be used to pass multiple
/// values if the number of intermediate values is not known at
/// planning time (e.g. for `MEDIAN`)
///
/// # Multi-phase repartitioned Grouping
///
/// Many multi-phase grouping plans contain a Repartition operation
/// as well as shown below:
///
/// ```text
/// ▲ ▲
/// │ │
/// │ │
/// │ │
/// │ │
/// │ │
/// ┌───────────────────────┐ ┌───────────────────────┐ 4. Each AggregateMode::Final
/// │GroupBy │ │GroupBy │ GroupBy has an entry for its
/// │(AggregateMode::Final) │ │(AggregateMode::Final) │ subset of groups (in this case
/// │ │ │ │ that means half the entries)
/// └───────────────────────┘ └───────────────────────┘
/// ▲ ▲
/// │ │
/// └─────────────┬────────────┘
/// │
/// │
/// │
/// ┌─────────────────────────┐ 3. Repartitioning by hash(group
/// │ Repartition │ keys) ensures that each distinct
/// │ HASH(x) │ group key now appears in exactly
/// └─────────────────────────┘ one partition
/// ▲
/// │
/// ┌───────────────┴─────────────┐
/// │ │
/// │ │
/// ┌─────────────────────────┐ ┌──────────────────────────┐ 2. Each AggregateMode::Partial
/// │ GroubyBy │ │ GroubyBy │ GroupBy has an entry for *all*
/// │(AggregateMode::Partial) │ │ (AggregateMode::Partial) │ the groups
/// └─────────────────────────┘ └──────────────────────────┘
/// ▲ ▲
/// │ ┌┘
/// │ │
/// .─────────. .─────────.
/// ,─' '─. ,─' '─.
/// ; Input : ; Input : 1. Since input data is
/// : Partition 0 ; : Partition 1 ; arbitrarily or RoundRobin
/// ╲ ╱ ╲ ╱ distributed, each partition
/// '─. ,─' '─. ,─' likely has all distinct
/// `───────' `───────'
/// ```
///
/// This structure is used so that the `AggregateMode::Partial` accumulators
/// reduces the cardinality of the input as soon as possible. Typically,
/// each partial accumulator sees all groups in the input as the group keys
/// are evenly distributed across the input.
///
/// The final output is computed by repartitioning the result of
/// [`Self::state`] from each Partial aggregate and `hash(group keys)` so
/// that each distinct group key appears in exactly one of the
/// `AggregateMode::Final` GroupBy nodes. The output of the final nodes are
/// then unioned together to produce the overall final output.
///
/// Here is an example that shows the distribution of groups in the
/// different phases
///
/// ```text
/// ┌─────┐ ┌─────┐
/// │ 1 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 2 │ │ 4 │ After repartitioning by
/// └─────┘ └─────┘ hash(group keys), each distinct
/// ┌─────┐ ┌─────┐ group key now appears in exactly
/// │ 1 │ │ 3 │ one partition
/// ├─────┤ ├─────┤
/// │ 2 │ │ 4 │
/// └─────┘ └─────┘
///
///
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///
/// ┌─────┐ ┌─────┐
/// │ 2 │ │ 2 │
/// ├─────┤ ├─────┤
/// │ 1 │ │ 2 │
/// ├─────┤ ├─────┤
/// │ 3 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 4 │ │ 1 │
/// └─────┘ └─────┘ Input data is arbitrarily or
/// ... ... RoundRobin distributed, each
/// ┌─────┐ ┌─────┐ partition likely has all
/// │ 1 │ │ 4 │ distinct group keys
/// ├─────┤ ├─────┤
/// │ 4 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 1 │ │ 1 │
/// ├─────┤ ├─────┤
/// │ 4 │ │ 3 │
/// └─────┘ └─────┘
///
/// group values group values
/// in partition 0 in partition 1
/// ```
fn state(&mut self) -> Result<Vec<ScalarValue>>;

/// Updates the accumulator's state from an `Array` containing one
Expand Down
66 changes: 61 additions & 5 deletions datafusion/expr/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Vectorized [`GroupsAccumulator`]

use arrow_array::{ArrayRef, BooleanArray};
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Result};

/// Describes how many rows should be emitted during grouping.
#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -128,18 +128,23 @@ pub trait GroupsAccumulator: Send {
/// Returns the intermediate aggregate state for this accumulator,
/// used for multi-phase grouping, resetting its internal state.
///
/// See [`Accumulator::state`] for more information on multi-phase
/// aggregation.
///
/// For example, `AVG` might return two arrays: `SUM` and `COUNT`
/// but the `MIN` aggregate would just return a single array.
///
/// Note more sophisticated internal state can be passed as
/// single `StructArray` rather than multiple arrays.
///
/// See [`Self::evaluate`] for details on the required output
/// order and `emit_to`.
/// order and `emit_to`.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Merges intermediate state (the output from [`Self::state`])
/// into this accumulator's values.
/// into this accumulator's current state.
///
/// For some aggregates (such as `SUM`), `merge_batch` is the same
/// as `update_batch`, but for some aggregates (such as `COUNT`,
Expand All @@ -158,8 +163,59 @@ pub trait GroupsAccumulator: Send {
total_num_groups: usize,
) -> Result<()>;

/// Converts an input batch directly the intermediate aggregate state.
///
/// This is the equivalent of treating each input row as its own group. It
/// is invoked when the Partial phase of a multi-phase aggregation is not
/// reducing the cardinality enough to warrant spending more effort on
/// pre-aggregation (see `Background` section below), and switches to
/// passing intermediate state directly on to the next aggregation phase.
///
/// Examples:
/// * `COUNT`: an array of 1s for each row in the input batch.
/// * `SUM/MIN/MAX`: the input values themselves.
///
/// # Arguments
/// * `values`: the input arguments to the accumulator
/// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
///
/// # Background
///
/// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
/// Partial phase reduces the cardinality of the input data as soon as
/// possible in the plan.
///
/// This strategy is very effective for queries with a small number of
/// groups, as most of the data is aggregated immediately and only a small
/// amount of data must be repartitioned (see [`Accumulator::state`] for
/// background)
///
/// However, for queries with a large number of groups, the Partial phase
/// often does not reduce the cardinality enough to warrant the memory and
/// CPU cost of actually performing the aggregation. For such cases, the
/// HashAggregate operator will dynamically switch to passing intermediate
/// state directly to the next aggregation phase with minimal processing
/// using this method.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn convert_to_state(
&self,
_values: &[ArrayRef],
_opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
not_impl_err!("Input batch conversion to state not implemented")
}

/// Returns `true` if [`Self::convert_to_state`] is implemented to support
/// intermediate aggregate state conversion.
fn supports_convert_to_state(&self) -> bool {
false
}

/// Amount of memory used to store the state of this accumulator,
/// in bytes. This function is called once per batch, so it should
/// be `O(n)` to compute, not `O(num_groups)`
/// in bytes.
///
/// This function is called once per batch, so it should be `O(n)` to
/// compute, not `O(num_groups)`
fn size(&self) -> usize;
}
4 changes: 3 additions & 1 deletion datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {

/// Return the fields used to store the intermediate state of this accumulator.
///
/// See [`Accumulator::state`] for background information.
///
/// args: [`StateFieldsArgs`] contains arguments passed to the
/// aggregate function's accumulator.
///
Expand Down Expand Up @@ -388,7 +390,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// # Notes
///
/// Even if this function returns true, DataFusion will still use
/// `Self::accumulator` for certain queries, such as when this aggregate is
/// [`Self::accumulator`] for certain queries, such as when this aggregate is
/// used as a window function or when there no GROUP BY columns in the
/// query.
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
Expand Down
10 changes: 10 additions & 0 deletions datafusion/functions-aggregate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,14 @@ paste = "1.0.14"
sqlparser = { workspace = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
criterion = "0.5"
rand = { workspace = true }

[[bench]]
name = "count"
harness = false

[[bench]]
name = "sum"
harness = false
98 changes: 98 additions & 0 deletions datafusion/functions-aggregate/benches/count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, BooleanArray};
use arrow::datatypes::Int32Type;
use arrow::util::bench_util::{create_boolean_array, create_primitive_array};
use arrow_schema::{DataType, Field, Schema};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_common::DFSchema;
use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator};
use datafusion_functions_aggregate::count::Count;
use std::sync::Arc;

fn prepare_accumulator() -> Box<dyn GroupsAccumulator> {
let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Int32, true)]));
let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
let accumulator_args = AccumulatorArgs {
data_type: &DataType::Int64,
schema: &schema,
dfschema: &df_schema,
ignore_nulls: false,
sort_exprs: &[],
is_reversed: false,
name: "COUNT(f)",
is_distinct: false,
input_types: &[DataType::Int32],
input_exprs: &[datafusion_expr::col("f")],
};
let count_fn = Count::new();

count_fn
.create_groups_accumulator(accumulator_args)
.unwrap()
}

fn convert_to_state_bench(
c: &mut Criterion,
name: &str,
values: ArrayRef,
opt_filter: Option<&BooleanArray>,
) {
let accumulator = prepare_accumulator();
c.bench_function(name, |b| {
b.iter(|| {
black_box(
accumulator
.convert_to_state(&[values.clone()], opt_filter)
.unwrap(),
)
})
});
}

fn count_benchmark(c: &mut Criterion) {
let values = Arc::new(create_primitive_array::<Int32Type>(8192, 0.0)) as ArrayRef;
convert_to_state_bench(c, "count convert state no nulls, no filter", values, None);

let values = Arc::new(create_primitive_array::<Int32Type>(8192, 0.3)) as ArrayRef;
convert_to_state_bench(c, "count convert state 30% nulls, no filter", values, None);

let values = Arc::new(create_primitive_array::<Int32Type>(8192, 0.3)) as ArrayRef;
convert_to_state_bench(c, "count convert state 70% nulls, no filter", values, None);

let values = Arc::new(create_primitive_array::<Int32Type>(8192, 0.0)) as ArrayRef;
let filter = create_boolean_array(8192, 0.0, 0.5);
convert_to_state_bench(
c,
"count convert state no nulls, filter",
values,
Some(&filter),
);

let values = Arc::new(create_primitive_array::<Int32Type>(8192, 0.3)) as ArrayRef;
let filter = create_boolean_array(8192, 0.0, 0.5);
convert_to_state_bench(
c,
"count convert state nulls, filter",
values,
Some(&filter),
);
}

criterion_group!(benches, count_benchmark);
criterion_main!(benches);
Loading

0 comments on commit c340b6a

Please sign in to comment.