Skip to content

Commit 923bfb7

Browse files
authored
Improve performance of first_value by implementing special GroupsAccumulator (#15266)
* Improve speed of first_value by implementing special GroupsAccumulator * rename and other improvements * `append_n` -> `resize` * address comment * use HashMap::entry * remove hashMap in get_filtered_min_of_each_group
1 parent e2e7354 commit 923bfb7

File tree

6 files changed

+897
-12
lines changed

6 files changed

+897
-12
lines changed

datafusion/common/src/utils/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,23 @@ pub fn project_schema(
8282
Ok(schema)
8383
}
8484

85+
/// Extracts a row at the specified index from a set of columns and stores it in the provided buffer.
86+
pub fn extract_row_at_idx_to_buf(
87+
columns: &[ArrayRef],
88+
idx: usize,
89+
buf: &mut Vec<ScalarValue>,
90+
) -> Result<()> {
91+
buf.clear();
92+
93+
let iter = columns
94+
.iter()
95+
.map(|arr| ScalarValue::try_from_array(arr, idx));
96+
for v in iter.into_iter() {
97+
buf.push(v?);
98+
}
99+
100+
Ok(())
101+
}
85102
/// Given column vectors, returns row at `idx`.
86103
pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
87104
columns

datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::str;
1918
use std::sync::Arc;
2019

2120
use crate::fuzz_cases::aggregation_fuzzer::{
@@ -88,6 +87,32 @@ async fn test_min() {
8887
.await;
8988
}
9089

90+
#[tokio::test(flavor = "multi_thread")]
91+
async fn test_first_val() {
92+
let mut data_gen_config: DatasetGeneratorConfig = baseline_config();
93+
94+
for i in 0..data_gen_config.columns.len() {
95+
if data_gen_config.columns[i].get_max_num_distinct().is_none() {
96+
data_gen_config.columns[i] = data_gen_config.columns[i]
97+
.clone()
98+
// Minimize the chance of identical values in the order by columns to make the test more stable
99+
.with_max_num_distinct(usize::MAX);
100+
}
101+
}
102+
103+
let query_builder = QueryBuilder::new()
104+
.with_table_name("fuzz_table")
105+
.with_aggregate_function("first_value")
106+
.with_aggregate_arguments(data_gen_config.all_columns())
107+
.set_group_by_columns(data_gen_config.all_columns());
108+
109+
AggregationFuzzerBuilder::from(data_gen_config)
110+
.add_query_builder(query_builder)
111+
.build()
112+
.run()
113+
.await;
114+
}
115+
91116
#[tokio::test(flavor = "multi_thread")]
92117
async fn test_max() {
93118
let data_gen_config = baseline_config();

datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,10 @@ impl ColumnDescr {
228228
}
229229
}
230230

231+
pub fn get_max_num_distinct(&self) -> Option<usize> {
232+
self.max_num_distinct
233+
}
234+
231235
/// set the maximum number of distinct values in this column
232236
///
233237
/// If `None`, the number of distinct values is randomly selected between 1

datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::collections::HashSet;
1918
use std::sync::Arc;
19+
use std::{collections::HashSet, str::FromStr};
2020

2121
use arrow::array::RecordBatch;
2222
use arrow::util::pretty::pretty_format_batches;
2323
use datafusion_common::{DataFusionError, Result};
2424
use datafusion_common_runtime::JoinSet;
25+
use rand::seq::SliceRandom;
2526
use rand::{thread_rng, Rng};
2627

2728
use crate::fuzz_cases::aggregation_fuzzer::{
@@ -452,7 +453,11 @@ impl QueryBuilder {
452453
pub fn generate_query(&self) -> String {
453454
let group_by = self.random_group_by();
454455
let mut query = String::from("SELECT ");
455-
query.push_str(&self.random_aggregate_functions().join(", "));
456+
query.push_str(&group_by.join(", "));
457+
if !group_by.is_empty() {
458+
query.push_str(", ");
459+
}
460+
query.push_str(&self.random_aggregate_functions(&group_by).join(", "));
456461
query.push_str(" FROM ");
457462
query.push_str(&self.table_name);
458463
if !group_by.is_empty() {
@@ -474,22 +479,42 @@ impl QueryBuilder {
474479
/// * `function_names` are randomly selected from [`Self::aggregate_functions`]
475480
/// * `<DISTINCT> argument` is randomly selected from [`Self::arguments`]
476481
/// * `alias` is a unique alias `colN` for the column (to avoid duplicate column names)
477-
fn random_aggregate_functions(&self) -> Vec<String> {
482+
fn random_aggregate_functions(&self, group_by_cols: &[String]) -> Vec<String> {
478483
const MAX_NUM_FUNCTIONS: usize = 5;
479484
let mut rng = thread_rng();
480485
let num_aggregate_functions = rng.gen_range(1..MAX_NUM_FUNCTIONS);
481486

482487
let mut alias_gen = 1;
483488

484489
let mut aggregate_functions = vec![];
490+
491+
let mut order_by_black_list: HashSet<String> =
492+
group_by_cols.iter().cloned().collect();
493+
// remove one random col
494+
if let Some(first) = order_by_black_list.iter().next().cloned() {
495+
order_by_black_list.remove(&first);
496+
}
497+
485498
while aggregate_functions.len() < num_aggregate_functions {
486499
let idx = rng.gen_range(0..self.aggregate_functions.len());
487500
let (function_name, is_distinct) = &self.aggregate_functions[idx];
488501
let argument = self.random_argument();
489502
let alias = format!("col{}", alias_gen);
490503
let distinct = if *is_distinct { "DISTINCT " } else { "" };
491504
alias_gen += 1;
492-
let function = format!("{function_name}({distinct}{argument}) as {alias}");
505+
506+
let (order_by, null_opt) = if function_name.eq("first_value") {
507+
(
508+
self.order_by(&order_by_black_list), /* Among the order by columns, at most one group by column can be included to avoid all order by column values being identical */
509+
self.null_opt(),
510+
)
511+
} else {
512+
("".to_string(), "".to_string())
513+
};
514+
515+
let function = format!(
516+
"{function_name}({distinct}{argument}{order_by}) {null_opt} as {alias}"
517+
);
493518
aggregate_functions.push(function);
494519
}
495520
aggregate_functions
@@ -502,6 +527,39 @@ impl QueryBuilder {
502527
self.arguments[idx].clone()
503528
}
504529

530+
fn order_by(&self, black_list: &HashSet<String>) -> String {
531+
let mut available_columns: Vec<String> = self
532+
.arguments
533+
.iter()
534+
.filter(|col| !black_list.contains(*col))
535+
.cloned()
536+
.collect();
537+
538+
available_columns.shuffle(&mut thread_rng());
539+
540+
let num_of_order_by_col = 12;
541+
let column_count = std::cmp::min(num_of_order_by_col, available_columns.len());
542+
543+
let selected_columns = &available_columns[0..column_count];
544+
545+
let mut rng = thread_rng();
546+
let mut result = String::from_str(" order by ").unwrap();
547+
for col in selected_columns {
548+
let order = if rng.gen_bool(0.5) { "ASC" } else { "DESC" };
549+
result.push_str(&format!("{} {},", col, order));
550+
}
551+
552+
result.strip_suffix(",").unwrap().to_string()
553+
}
554+
555+
fn null_opt(&self) -> String {
556+
if thread_rng().gen_bool(0.5) {
557+
"RESPECT NULLS".to_string()
558+
} else {
559+
"IGNORE NULLS".to_string()
560+
}
561+
}
562+
505563
/// Pick a random number of fields to group by (non-repeating)
506564
///
507565
/// Limited to 3 group by columns to ensure coverage for large groups. With

0 commit comments

Comments
 (0)