diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index dcf477135a37..f90758c67df2 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -18,15 +18,12 @@ use std::sync::Arc; use crate::fuzz_cases::aggregation_fuzzer::{ - AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig, QueryBuilder, + AggregationFuzzerBuilder, DatasetGeneratorConfig, QueryBuilder, }; use arrow::array::{types::Int64Type, Array, ArrayRef, AsArray, Int64Array, RecordBatch}; use arrow::compute::{concat_batches, SortOptions}; -use arrow::datatypes::{ - DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, - DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, -}; +use arrow::datatypes::DataType; use arrow::util::pretty::pretty_format_batches; use datafusion::common::Result; use datafusion::datasource::memory::MemorySourceConfig; @@ -51,6 +48,8 @@ use test_utils::{add_empty_batches, StringBatchGenerator}; use rand::rngs::StdRng; use rand::{thread_rng, Rng, SeedableRng}; +use super::record_batch_generator::get_supported_types_columns; + // ======================================================================== // The new aggregation fuzz tests based on [`AggregationFuzzer`] // ======================================================================== @@ -201,81 +200,7 @@ async fn test_median() { /// 1. structured types fn baseline_config() -> DatasetGeneratorConfig { let mut rng = thread_rng(); - let columns = vec![ - ColumnDescr::new("i8", DataType::Int8), - ColumnDescr::new("i16", DataType::Int16), - ColumnDescr::new("i32", DataType::Int32), - ColumnDescr::new("i64", DataType::Int64), - ColumnDescr::new("u8", DataType::UInt8), - ColumnDescr::new("u16", DataType::UInt16), - ColumnDescr::new("u32", DataType::UInt32), - ColumnDescr::new("u64", DataType::UInt64), - ColumnDescr::new("date32", DataType::Date32), - ColumnDescr::new("date64", DataType::Date64), - ColumnDescr::new("time32_s", DataType::Time32(TimeUnit::Second)), - ColumnDescr::new("time32_ms", DataType::Time32(TimeUnit::Millisecond)), - ColumnDescr::new("time64_us", DataType::Time64(TimeUnit::Microsecond)), - ColumnDescr::new("time64_ns", DataType::Time64(TimeUnit::Nanosecond)), - // `None` is passed in here however when generating the array, it will generate - // random timezones. - ColumnDescr::new("timestamp_s", DataType::Timestamp(TimeUnit::Second, None)), - ColumnDescr::new( - "timestamp_ms", - DataType::Timestamp(TimeUnit::Millisecond, None), - ), - ColumnDescr::new( - "timestamp_us", - DataType::Timestamp(TimeUnit::Microsecond, None), - ), - ColumnDescr::new( - "timestamp_ns", - DataType::Timestamp(TimeUnit::Nanosecond, None), - ), - ColumnDescr::new("float32", DataType::Float32), - ColumnDescr::new("float64", DataType::Float64), - ColumnDescr::new( - "interval_year_month", - DataType::Interval(IntervalUnit::YearMonth), - ), - ColumnDescr::new( - "interval_day_time", - DataType::Interval(IntervalUnit::DayTime), - ), - ColumnDescr::new( - "interval_month_day_nano", - DataType::Interval(IntervalUnit::MonthDayNano), - ), - // begin decimal columns - ColumnDescr::new("decimal128", { - // Generate valid precision and scale for Decimal128 randomly. - let precision: u8 = rng.gen_range(1..=DECIMAL128_MAX_PRECISION); - // It's safe to cast `precision` to i8 type directly. - let scale: i8 = rng.gen_range( - i8::MIN..=std::cmp::min(precision as i8, DECIMAL128_MAX_SCALE), - ); - DataType::Decimal128(precision, scale) - }), - ColumnDescr::new("decimal256", { - // Generate valid precision and scale for Decimal256 randomly. - let precision: u8 = rng.gen_range(1..=DECIMAL256_MAX_PRECISION); - // It's safe to cast `precision` to i8 type directly. - let scale: i8 = rng.gen_range( - i8::MIN..=std::cmp::min(precision as i8, DECIMAL256_MAX_SCALE), - ); - DataType::Decimal256(precision, scale) - }), - // begin string columns - ColumnDescr::new("utf8", DataType::Utf8), - ColumnDescr::new("largeutf8", DataType::LargeUtf8), - ColumnDescr::new("utf8view", DataType::Utf8View), - // low cardinality columns - ColumnDescr::new("u8_low", DataType::UInt8).with_max_num_distinct(10), - ColumnDescr::new("utf8_low", DataType::Utf8).with_max_num_distinct(10), - ColumnDescr::new("bool", DataType::Boolean), - ColumnDescr::new("binary", DataType::Binary), - ColumnDescr::new("large_binary", DataType::LargeBinary), - ColumnDescr::new("binaryview", DataType::BinaryView), - ]; + let columns = get_supported_types_columns(rng.gen()); let min_num_rows = 512; let max_num_rows = 1024; diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs index d61835a0804e..59f6705e5643 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs @@ -15,34 +15,15 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - -use arrow::array::{ArrayRef, RecordBatch}; -use arrow::datatypes::{ - BinaryType, BinaryViewType, BooleanType, ByteArrayType, ByteViewType, DataType, - Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field, Float32Type, - Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalDayTimeType, - IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, LargeBinaryType, - LargeUtf8Type, Schema, StringViewType, Time32MillisecondType, Time32SecondType, - Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType, - TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, - UInt32Type, UInt64Type, UInt8Type, Utf8Type, -}; -use datafusion_common::{arrow_datafusion_err, DataFusionError, Result}; +use arrow::array::RecordBatch; +use arrow::datatypes::DataType; +use datafusion_common::Result; use datafusion_physical_expr::{expressions::col, PhysicalSortExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::sorts::sort::sort_batch; -use rand::{ - rngs::{StdRng, ThreadRng}, - thread_rng, Rng, SeedableRng, -}; -use test_utils::{ - array_gen::{ - BinaryArrayGenerator, BooleanArrayGenerator, DecimalArrayGenerator, - PrimitiveArrayGenerator, StringArrayGenerator, - }, - stagger_batch, -}; +use test_utils::stagger_batch; + +use crate::fuzz_cases::record_batch_generator::{ColumnDescr, RecordBatchGenerator}; /// Config for Dataset generator /// @@ -154,7 +135,7 @@ impl DatasetGenerator { } } - pub fn generate(&self) -> Result> { + pub fn generate(&mut self) -> Result> { let mut datasets = Vec::with_capacity(self.sort_keys_set.len() + 1); // Generate the base batch (unsorted) @@ -204,553 +185,6 @@ impl Dataset { } } -#[derive(Debug, Clone)] -pub struct ColumnDescr { - /// Column name - name: String, - - /// Data type of this column - column_type: DataType, - - /// The maximum number of distinct values in this column. - /// - /// See [`ColumnDescr::with_max_num_distinct`] for more information - max_num_distinct: Option, -} - -impl ColumnDescr { - #[inline] - pub fn new(name: &str, column_type: DataType) -> Self { - Self { - name: name.to_string(), - column_type, - max_num_distinct: None, - } - } - - pub fn get_max_num_distinct(&self) -> Option { - self.max_num_distinct - } - - /// set the maximum number of distinct values in this column - /// - /// If `None`, the number of distinct values is randomly selected between 1 - /// and the number of rows. - pub fn with_max_num_distinct(mut self, num_distinct: usize) -> Self { - self.max_num_distinct = Some(num_distinct); - self - } -} - -/// Record batch generator -struct RecordBatchGenerator { - min_rows_nun: usize, - - max_rows_num: usize, - - columns: Vec, - - candidate_null_pcts: Vec, -} - -macro_rules! generate_string_array { - ($SELF:ident, $NUM_ROWS:ident, $MAX_NUM_DISTINCT:expr, $BATCH_GEN_RNG:ident, $ARRAY_GEN_RNG:ident, $ARROW_TYPE: ident) => {{ - let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len()); - let null_pct = $SELF.candidate_null_pcts[null_pct_idx]; - let max_len = $BATCH_GEN_RNG.gen_range(1..50); - - let mut generator = StringArrayGenerator { - max_len, - num_strings: $NUM_ROWS, - num_distinct_strings: $MAX_NUM_DISTINCT, - null_pct, - rng: $ARRAY_GEN_RNG, - }; - - match $ARROW_TYPE::DATA_TYPE { - DataType::Utf8 => generator.gen_data::(), - DataType::LargeUtf8 => generator.gen_data::(), - DataType::Utf8View => generator.gen_string_view(), - _ => unreachable!(), - } - }}; -} - -macro_rules! generate_decimal_array { - ($SELF:ident, $NUM_ROWS:ident, $MAX_NUM_DISTINCT: expr, $BATCH_GEN_RNG:ident, $ARRAY_GEN_RNG:ident, $PRECISION: ident, $SCALE: ident, $ARROW_TYPE: ident) => {{ - let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len()); - let null_pct = $SELF.candidate_null_pcts[null_pct_idx]; - - let mut generator = DecimalArrayGenerator { - precision: $PRECISION, - scale: $SCALE, - num_decimals: $NUM_ROWS, - num_distinct_decimals: $MAX_NUM_DISTINCT, - null_pct, - rng: $ARRAY_GEN_RNG, - }; - - generator.gen_data::<$ARROW_TYPE>() - }}; -} - -// Generating `BooleanArray` due to it being a special type in Arrow (bit-packed) -macro_rules! generate_boolean_array { - ($SELF:ident, $NUM_ROWS:ident, $MAX_NUM_DISTINCT:expr, $BATCH_GEN_RNG:ident, $ARRAY_GEN_RNG:ident, $ARROW_TYPE: ident) => {{ - // Select a null percentage from the candidate percentages - let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len()); - let null_pct = $SELF.candidate_null_pcts[null_pct_idx]; - - let num_distinct_booleans = if $MAX_NUM_DISTINCT >= 2 { 2 } else { 1 }; - - let mut generator = BooleanArrayGenerator { - num_booleans: $NUM_ROWS, - num_distinct_booleans, - null_pct, - rng: $ARRAY_GEN_RNG, - }; - - generator.gen_data::<$ARROW_TYPE>() - }}; -} - -macro_rules! generate_primitive_array { - ($SELF:ident, $NUM_ROWS:ident, $MAX_NUM_DISTINCT:expr, $BATCH_GEN_RNG:ident, $ARRAY_GEN_RNG:ident, $ARROW_TYPE:ident) => {{ - let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len()); - let null_pct = $SELF.candidate_null_pcts[null_pct_idx]; - - let mut generator = PrimitiveArrayGenerator { - num_primitives: $NUM_ROWS, - num_distinct_primitives: $MAX_NUM_DISTINCT, - null_pct, - rng: $ARRAY_GEN_RNG, - }; - - generator.gen_data::<$ARROW_TYPE>() - }}; -} - -macro_rules! generate_binary_array { - ( - $SELF:ident, - $NUM_ROWS:ident, - $MAX_NUM_DISTINCT:expr, - $BATCH_GEN_RNG:ident, - $ARRAY_GEN_RNG:ident, - $ARROW_TYPE:ident - ) => {{ - let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len()); - let null_pct = $SELF.candidate_null_pcts[null_pct_idx]; - - let max_len = $BATCH_GEN_RNG.gen_range(1..100); - - let mut generator = BinaryArrayGenerator { - max_len, - num_binaries: $NUM_ROWS, - num_distinct_binaries: $MAX_NUM_DISTINCT, - null_pct, - rng: $ARRAY_GEN_RNG, - }; - - match $ARROW_TYPE::DATA_TYPE { - DataType::Binary => generator.gen_data::(), - DataType::LargeBinary => generator.gen_data::(), - DataType::BinaryView => generator.gen_binary_view(), - _ => unreachable!(), - } - }}; -} - -impl RecordBatchGenerator { - fn new(min_rows_nun: usize, max_rows_num: usize, columns: Vec) -> Self { - let candidate_null_pcts = vec![0.0, 0.01, 0.1, 0.5]; - - Self { - min_rows_nun, - max_rows_num, - columns, - candidate_null_pcts, - } - } - - fn generate(&self) -> Result { - let mut rng = thread_rng(); - let num_rows = rng.gen_range(self.min_rows_nun..=self.max_rows_num); - let array_gen_rng = StdRng::from_seed(rng.gen()); - - // Build arrays - let mut arrays = Vec::with_capacity(self.columns.len()); - for col in self.columns.iter() { - let array = self.generate_array_of_type( - col, - num_rows, - &mut rng, - array_gen_rng.clone(), - ); - arrays.push(array); - } - - // Build schema - let fields = self - .columns - .iter() - .map(|col| Field::new(col.name.clone(), col.column_type.clone(), true)) - .collect::>(); - let schema = Arc::new(Schema::new(fields)); - - RecordBatch::try_new(schema, arrays).map_err(|e| arrow_datafusion_err!(e)) - } - - fn generate_array_of_type( - &self, - col: &ColumnDescr, - num_rows: usize, - batch_gen_rng: &mut ThreadRng, - array_gen_rng: StdRng, - ) -> ArrayRef { - let num_distinct = if num_rows > 1 { - batch_gen_rng.gen_range(1..num_rows) - } else { - num_rows - }; - // cap to at most the num_distinct values - let max_num_distinct = col - .max_num_distinct - .map(|max| num_distinct.min(max)) - .unwrap_or(num_distinct); - - match col.column_type { - DataType::Int8 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Int8Type - ) - } - DataType::Int16 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Int16Type - ) - } - DataType::Int32 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Int32Type - ) - } - DataType::Int64 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Int64Type - ) - } - DataType::UInt8 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - UInt8Type - ) - } - DataType::UInt16 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - UInt16Type - ) - } - DataType::UInt32 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - UInt32Type - ) - } - DataType::UInt64 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - UInt64Type - ) - } - DataType::Float32 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Float32Type - ) - } - DataType::Float64 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Float64Type - ) - } - DataType::Date32 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Date32Type - ) - } - DataType::Date64 => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Date64Type - ) - } - DataType::Time32(TimeUnit::Second) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Time32SecondType - ) - } - DataType::Time32(TimeUnit::Millisecond) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Time32MillisecondType - ) - } - DataType::Time64(TimeUnit::Microsecond) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Time64MicrosecondType - ) - } - DataType::Time64(TimeUnit::Nanosecond) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Time64NanosecondType - ) - } - DataType::Interval(IntervalUnit::YearMonth) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - IntervalYearMonthType - ) - } - DataType::Interval(IntervalUnit::DayTime) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - IntervalDayTimeType - ) - } - DataType::Interval(IntervalUnit::MonthDayNano) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - IntervalMonthDayNanoType - ) - } - DataType::Timestamp(TimeUnit::Second, None) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - TimestampSecondType - ) - } - DataType::Timestamp(TimeUnit::Millisecond, None) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - TimestampMillisecondType - ) - } - DataType::Timestamp(TimeUnit::Microsecond, None) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - TimestampMicrosecondType - ) - } - DataType::Timestamp(TimeUnit::Nanosecond, None) => { - generate_primitive_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - TimestampNanosecondType - ) - } - DataType::Binary => { - generate_binary_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - BinaryType - ) - } - DataType::LargeBinary => { - generate_binary_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - LargeBinaryType - ) - } - DataType::BinaryView => { - generate_binary_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - BinaryViewType - ) - } - DataType::Decimal128(precision, scale) => { - generate_decimal_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - precision, - scale, - Decimal128Type - ) - } - DataType::Decimal256(precision, scale) => { - generate_decimal_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - precision, - scale, - Decimal256Type - ) - } - DataType::Utf8 => { - generate_string_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - Utf8Type - ) - } - DataType::LargeUtf8 => { - generate_string_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - LargeUtf8Type - ) - } - DataType::Utf8View => { - generate_string_array!( - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - StringViewType - ) - } - DataType::Boolean => { - generate_boolean_array! { - self, - num_rows, - max_num_distinct, - batch_gen_rng, - array_gen_rng, - BooleanType - } - } - _ => { - panic!("Unsupported data generator type: {}", col.column_type) - } - } - } -} - #[cfg(test)] mod test { use arrow::array::UInt32Array; @@ -777,7 +211,7 @@ mod test { sort_keys_set: vec![vec!["b".to_string()]], }; - let gen = DatasetGenerator::new(config); + let mut gen = DatasetGenerator::new(config); let datasets = gen.generate().unwrap(); // Should Generate 2 datasets diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs index bb24fb554d65..88f0269eb368 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs @@ -164,7 +164,7 @@ struct QueryGroup { impl AggregationFuzzer { /// Run the fuzzer, printing an error and panicking if any of the tasks fail - pub async fn run(&self) { + pub async fn run(&mut self) { let res = self.run_inner().await; if let Err(e) = res { @@ -176,7 +176,7 @@ impl AggregationFuzzer { } } - async fn run_inner(&self) -> Result<()> { + async fn run_inner(&mut self) -> Result<()> { let mut join_set = JoinSet::new(); let mut rng = thread_rng(); diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/mod.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/mod.rs index 1e42ac1f4b30..bfb3bb096326 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/mod.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/mod.rs @@ -44,7 +44,8 @@ mod context_generator; mod data_generator; mod fuzzer; -pub use data_generator::{ColumnDescr, DatasetGeneratorConfig}; +pub use crate::fuzz_cases::record_batch_generator::ColumnDescr; +pub use data_generator::DatasetGeneratorConfig; pub use fuzzer::*; #[derive(Debug)] diff --git a/datafusion/core/tests/fuzz_cases/mod.rs b/datafusion/core/tests/fuzz_cases/mod.rs index d5511e2970f4..8ccc2a5bc131 100644 --- a/datafusion/core/tests/fuzz_cases/mod.rs +++ b/datafusion/core/tests/fuzz_cases/mod.rs @@ -20,6 +20,7 @@ mod distinct_count_string_fuzz; mod join_fuzz; mod merge_fuzz; mod sort_fuzz; +mod sort_query_fuzz; mod aggregation_fuzzer; mod equivalence; @@ -29,3 +30,6 @@ mod pruning; mod limit_fuzz; mod sort_preserving_repartition_fuzz; mod window_fuzz; + +// Utility modules +mod record_batch_generator; diff --git a/datafusion/core/tests/fuzz_cases/record_batch_generator.rs b/datafusion/core/tests/fuzz_cases/record_batch_generator.rs new file mode 100644 index 000000000000..9a62a6397d82 --- /dev/null +++ b/datafusion/core/tests/fuzz_cases/record_batch_generator.rs @@ -0,0 +1,644 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::array::{ArrayRef, RecordBatch}; +use arrow::datatypes::{ + BooleanType, DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field, + Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, + IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, + Schema, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, + Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, + UInt8Type, +}; +use arrow_schema::{ + DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, + DECIMAL256_MAX_SCALE, +}; +use datafusion_common::{arrow_datafusion_err, DataFusionError, Result}; +use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng}; +use test_utils::array_gen::{ + BinaryArrayGenerator, BooleanArrayGenerator, DecimalArrayGenerator, + PrimitiveArrayGenerator, StringArrayGenerator, +}; + +/// Columns that are supported by the record batch generator +/// The RNG is used to generate the precision and scale for the decimal columns, thread +/// RNG is not used because this is used in fuzzing and deterministic results are preferred +pub fn get_supported_types_columns(rng_seed: u64) -> Vec { + let mut rng = StdRng::seed_from_u64(rng_seed); + vec![ + ColumnDescr::new("i8", DataType::Int8), + ColumnDescr::new("i16", DataType::Int16), + ColumnDescr::new("i32", DataType::Int32), + ColumnDescr::new("i64", DataType::Int64), + ColumnDescr::new("u8", DataType::UInt8), + ColumnDescr::new("u16", DataType::UInt16), + ColumnDescr::new("u32", DataType::UInt32), + ColumnDescr::new("u64", DataType::UInt64), + ColumnDescr::new("date32", DataType::Date32), + ColumnDescr::new("date64", DataType::Date64), + ColumnDescr::new("time32_s", DataType::Time32(TimeUnit::Second)), + ColumnDescr::new("time32_ms", DataType::Time32(TimeUnit::Millisecond)), + ColumnDescr::new("time64_us", DataType::Time64(TimeUnit::Microsecond)), + ColumnDescr::new("time64_ns", DataType::Time64(TimeUnit::Nanosecond)), + ColumnDescr::new("timestamp_s", DataType::Timestamp(TimeUnit::Second, None)), + ColumnDescr::new( + "timestamp_ms", + DataType::Timestamp(TimeUnit::Millisecond, None), + ), + ColumnDescr::new( + "timestamp_us", + DataType::Timestamp(TimeUnit::Microsecond, None), + ), + ColumnDescr::new( + "timestamp_ns", + DataType::Timestamp(TimeUnit::Nanosecond, None), + ), + ColumnDescr::new("float32", DataType::Float32), + ColumnDescr::new("float64", DataType::Float64), + ColumnDescr::new( + "interval_year_month", + DataType::Interval(IntervalUnit::YearMonth), + ), + ColumnDescr::new( + "interval_day_time", + DataType::Interval(IntervalUnit::DayTime), + ), + ColumnDescr::new( + "interval_month_day_nano", + DataType::Interval(IntervalUnit::MonthDayNano), + ), + ColumnDescr::new("decimal128", { + let precision: u8 = rng.gen_range(1..=DECIMAL128_MAX_PRECISION); + let scale: i8 = rng.gen_range( + i8::MIN..=std::cmp::min(precision as i8, DECIMAL128_MAX_SCALE), + ); + DataType::Decimal128(precision, scale) + }), + ColumnDescr::new("decimal256", { + let precision: u8 = rng.gen_range(1..=DECIMAL256_MAX_PRECISION); + let scale: i8 = rng.gen_range( + i8::MIN..=std::cmp::min(precision as i8, DECIMAL256_MAX_SCALE), + ); + DataType::Decimal256(precision, scale) + }), + ColumnDescr::new("utf8", DataType::Utf8), + ColumnDescr::new("largeutf8", DataType::LargeUtf8), + ColumnDescr::new("utf8view", DataType::Utf8View), + ColumnDescr::new("u8_low", DataType::UInt8).with_max_num_distinct(10), + ColumnDescr::new("utf8_low", DataType::Utf8).with_max_num_distinct(10), + ColumnDescr::new("bool", DataType::Boolean), + ColumnDescr::new("binary", DataType::Binary), + ColumnDescr::new("large_binary", DataType::LargeBinary), + ColumnDescr::new("binaryview", DataType::BinaryView), + ] +} + +#[derive(Debug, Clone)] +pub struct ColumnDescr { + /// Column name + pub name: String, + + /// Data type of this column + pub column_type: DataType, + + /// The maximum number of distinct values in this column. + /// + /// See [`ColumnDescr::with_max_num_distinct`] for more information + max_num_distinct: Option, +} + +impl ColumnDescr { + #[inline] + pub fn new(name: &str, column_type: DataType) -> Self { + Self { + name: name.to_string(), + column_type, + max_num_distinct: None, + } + } + + pub fn get_max_num_distinct(&self) -> Option { + self.max_num_distinct + } + + /// set the maximum number of distinct values in this column + /// + /// If `None`, the number of distinct values is randomly selected between 1 + /// and the number of rows. + pub fn with_max_num_distinct(mut self, num_distinct: usize) -> Self { + self.max_num_distinct = Some(num_distinct); + self + } +} + +/// Record batch generator +pub struct RecordBatchGenerator { + pub min_rows_num: usize, + + pub max_rows_num: usize, + + pub columns: Vec, + + pub candidate_null_pcts: Vec, + + /// If a seed is provided when constructing the generator, it will be used to + /// create `rng` and the pseudo-randomly generated batches will be deterministic. + /// Otherwise, `rng` will be initialized using `thread_rng()` and the batches + /// generated will be different each time. + rng: StdRng, +} + +macro_rules! generate_decimal_array { + ($SELF:ident, $NUM_ROWS:ident, $MAX_NUM_DISTINCT: expr, $BATCH_GEN_RNG:ident, $ARRAY_GEN_RNG:ident, $PRECISION: ident, $SCALE: ident, $ARROW_TYPE: ident) => {{ + let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len()); + let null_pct = $SELF.candidate_null_pcts[null_pct_idx]; + + let mut generator = DecimalArrayGenerator { + precision: $PRECISION, + scale: $SCALE, + num_decimals: $NUM_ROWS, + num_distinct_decimals: $MAX_NUM_DISTINCT, + null_pct, + rng: $ARRAY_GEN_RNG, + }; + + generator.gen_data::<$ARROW_TYPE>() + }}; +} + +// Generating `BooleanArray` due to it being a special type in Arrow (bit-packed) +macro_rules! generate_boolean_array { + ($SELF:ident, $NUM_ROWS:ident, $MAX_NUM_DISTINCT:expr, $BATCH_GEN_RNG:ident, $ARRAY_GEN_RNG:ident, $ARROW_TYPE: ident) => {{ + // Select a null percentage from the candidate percentages + let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len()); + let null_pct = $SELF.candidate_null_pcts[null_pct_idx]; + + let num_distinct_booleans = if $MAX_NUM_DISTINCT >= 2 { 2 } else { 1 }; + + let mut generator = BooleanArrayGenerator { + num_booleans: $NUM_ROWS, + num_distinct_booleans, + null_pct, + rng: $ARRAY_GEN_RNG, + }; + + generator.gen_data::<$ARROW_TYPE>() + }}; +} + +macro_rules! generate_primitive_array { + ($SELF:ident, $NUM_ROWS:ident, $MAX_NUM_DISTINCT:expr, $BATCH_GEN_RNG:ident, $ARRAY_GEN_RNG:ident, $ARROW_TYPE:ident) => {{ + let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len()); + let null_pct = $SELF.candidate_null_pcts[null_pct_idx]; + + let mut generator = PrimitiveArrayGenerator { + num_primitives: $NUM_ROWS, + num_distinct_primitives: $MAX_NUM_DISTINCT, + null_pct, + rng: $ARRAY_GEN_RNG, + }; + + generator.gen_data::<$ARROW_TYPE>() + }}; +} + +impl RecordBatchGenerator { + /// Create a new `RecordBatchGenerator` with a random seed. The generated + /// batches will be different each time. + pub fn new( + min_rows_nun: usize, + max_rows_num: usize, + columns: Vec, + ) -> Self { + let candidate_null_pcts = vec![0.0, 0.01, 0.1, 0.5]; + + Self { + min_rows_num: min_rows_nun, + max_rows_num, + columns, + candidate_null_pcts, + rng: StdRng::from_rng(thread_rng()).unwrap(), + } + } + + /// Set a seed for the generator. The pseudo-randomly generated batches will be + /// deterministic for the same seed. + pub fn with_seed(mut self, seed: u64) -> Self { + self.rng = StdRng::seed_from_u64(seed); + self + } + + pub fn generate(&mut self) -> Result { + let num_rows = self.rng.gen_range(self.min_rows_num..=self.max_rows_num); + let array_gen_rng = StdRng::from_seed(self.rng.gen()); + let mut batch_gen_rng = StdRng::from_seed(self.rng.gen()); + let columns = self.columns.clone(); + + // Build arrays + let mut arrays = Vec::with_capacity(columns.len()); + for col in columns.iter() { + let array = self.generate_array_of_type( + col, + num_rows, + &mut batch_gen_rng, + array_gen_rng.clone(), + ); + arrays.push(array); + } + + // Build schema + let fields = self + .columns + .iter() + .map(|col| Field::new(col.name.clone(), col.column_type.clone(), true)) + .collect::>(); + let schema = Arc::new(Schema::new(fields)); + + RecordBatch::try_new(schema, arrays).map_err(|e| arrow_datafusion_err!(e)) + } + + fn generate_array_of_type( + &mut self, + col: &ColumnDescr, + num_rows: usize, + batch_gen_rng: &mut StdRng, + array_gen_rng: StdRng, + ) -> ArrayRef { + let num_distinct = if num_rows > 1 { + batch_gen_rng.gen_range(1..num_rows) + } else { + num_rows + }; + // cap to at most the num_distinct values + let max_num_distinct = col + .max_num_distinct + .map(|max| num_distinct.min(max)) + .unwrap_or(num_distinct); + + match col.column_type { + DataType::Int8 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Int8Type + ) + } + DataType::Int16 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Int16Type + ) + } + DataType::Int32 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Int32Type + ) + } + DataType::Int64 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Int64Type + ) + } + DataType::UInt8 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + UInt8Type + ) + } + DataType::UInt16 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + UInt16Type + ) + } + DataType::UInt32 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + UInt32Type + ) + } + DataType::UInt64 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + UInt64Type + ) + } + DataType::Float32 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Float32Type + ) + } + DataType::Float64 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Float64Type + ) + } + DataType::Date32 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Date32Type + ) + } + DataType::Date64 => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Date64Type + ) + } + DataType::Time32(TimeUnit::Second) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Time32SecondType + ) + } + DataType::Time32(TimeUnit::Millisecond) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Time32MillisecondType + ) + } + DataType::Time64(TimeUnit::Microsecond) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Time64MicrosecondType + ) + } + DataType::Time64(TimeUnit::Nanosecond) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + Time64NanosecondType + ) + } + DataType::Interval(IntervalUnit::YearMonth) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + IntervalYearMonthType + ) + } + DataType::Interval(IntervalUnit::DayTime) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + IntervalDayTimeType + ) + } + DataType::Interval(IntervalUnit::MonthDayNano) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + IntervalMonthDayNanoType + ) + } + DataType::Timestamp(TimeUnit::Second, None) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + TimestampSecondType + ) + } + DataType::Timestamp(TimeUnit::Millisecond, None) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + TimestampMillisecondType + ) + } + DataType::Timestamp(TimeUnit::Microsecond, None) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + TimestampMicrosecondType + ) + } + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + TimestampNanosecondType + ) + } + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { + let null_pct_idx = + batch_gen_rng.gen_range(0..self.candidate_null_pcts.len()); + let null_pct = self.candidate_null_pcts[null_pct_idx]; + let max_len = batch_gen_rng.gen_range(1..50); + + let mut generator = StringArrayGenerator { + max_len, + num_strings: num_rows, + num_distinct_strings: max_num_distinct, + null_pct, + rng: array_gen_rng, + }; + + match col.column_type { + DataType::Utf8 => generator.gen_data::(), + DataType::LargeUtf8 => generator.gen_data::(), + DataType::Utf8View => generator.gen_string_view(), + _ => unreachable!(), + } + } + DataType::Binary | DataType::LargeBinary | DataType::BinaryView => { + let null_pct_idx = + batch_gen_rng.gen_range(0..self.candidate_null_pcts.len()); + let null_pct = self.candidate_null_pcts[null_pct_idx]; + let max_len = batch_gen_rng.gen_range(1..100); + + let mut generator = BinaryArrayGenerator { + max_len, + num_binaries: num_rows, + num_distinct_binaries: max_num_distinct, + null_pct, + rng: array_gen_rng, + }; + + match col.column_type { + DataType::Binary => generator.gen_data::(), + DataType::LargeBinary => generator.gen_data::(), + DataType::BinaryView => generator.gen_binary_view(), + _ => unreachable!(), + } + } + DataType::Decimal128(precision, scale) => { + generate_decimal_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + precision, + scale, + Decimal128Type + ) + } + DataType::Decimal256(precision, scale) => { + generate_decimal_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + precision, + scale, + Decimal256Type + ) + } + DataType::Boolean => { + generate_boolean_array! { + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + BooleanType + } + } + _ => { + panic!("Unsupported data generator type: {}", col.column_type) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_generator_with_fixed_seed_deterministic() { + let mut gen1 = RecordBatchGenerator::new( + 16, + 32, + vec![ + ColumnDescr::new("a", DataType::Utf8), + ColumnDescr::new("b", DataType::UInt32), + ], + ) + .with_seed(310104); + + let mut gen2 = RecordBatchGenerator::new( + 16, + 32, + vec![ + ColumnDescr::new("a", DataType::Utf8), + ColumnDescr::new("b", DataType::UInt32), + ], + ) + .with_seed(310104); + + let batch1 = gen1.generate().unwrap(); + let batch2 = gen2.generate().unwrap(); + + let batch1_formatted = format!("{:?}", batch1); + let batch2_formatted = format!("{:?}", batch2); + + assert_eq!(batch1_formatted, batch2_formatted); + } +} diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs new file mode 100644 index 000000000000..1319d4817326 --- /dev/null +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -0,0 +1,625 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Fuzz Test for various corner cases sorting RecordBatches exceeds available memory and should spill + +use std::cmp::min; +use std::sync::Arc; + +use arrow::array::RecordBatch; +use arrow_schema::SchemaRef; +use datafusion::datasource::MemTable; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_common::{instant::Instant, Result}; +use datafusion_execution::memory_pool::{ + human_readable_size, MemoryPool, UnboundedMemoryPool, +}; +use datafusion_expr::display_schema; +use datafusion_physical_plan::spill::get_record_batch_memory_size; +use rand::seq::SliceRandom; +use std::time::Duration; + +use datafusion_execution::{ + disk_manager::DiskManagerConfig, memory_pool::FairSpillPool, + runtime_env::RuntimeEnvBuilder, +}; +use rand::Rng; +use rand::{rngs::StdRng, SeedableRng}; + +use crate::fuzz_cases::aggregation_fuzzer::check_equality_of_batches; + +use super::aggregation_fuzzer::ColumnDescr; +use super::record_batch_generator::{get_supported_types_columns, RecordBatchGenerator}; + +/// Entry point for executing the sort query fuzzer. +/// +/// Now memory limiting is disabled by default. See TODOs in `SortQueryFuzzer`. +#[tokio::test(flavor = "multi_thread")] +async fn sort_query_fuzzer_runner() { + let random_seed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let test_generator = SortFuzzerTestGenerator::new( + 2000, + 3, + "sort_fuzz_table".to_string(), + get_supported_types_columns(random_seed), + false, + random_seed, + ); + let mut fuzzer = SortQueryFuzzer::new(random_seed) + // Configs for how many random query to test + .with_max_rounds(Some(5)) + .with_queries_per_round(4) + .with_config_variations_per_query(5) + // Will stop early if the time limit is reached + .with_time_limit(Duration::from_secs(5)) + .with_test_generator(test_generator); + + fuzzer.run().await.unwrap(); +} + +/// SortQueryFuzzer holds the runner configuration for executing sort query fuzz tests. The fuzzing details are managed inside `SortFuzzerTestGenerator`. +/// +/// It defines: +/// - `max_rounds`: Maximum number of rounds to run (or None to run until `time_limit`). +/// - `queries_per_round`: Number of different queries to run in each round. +/// - `config_variations_per_query`: Number of different configurations to test per query. +/// - `time_limit`: Time limit for the entire fuzzer execution. +/// +/// TODO: The following improvements are blocked on https://github.com/apache/datafusion/issues/14748: +/// 1. Support generating queries with arbitrary number of ORDER BY clauses +/// Currently limited to be smaller than number of projected columns +/// 2. Enable special type columns like utf8_low to be used in ORDER BY clauses +/// 3. Enable memory limiting functionality in the fuzzer runner +pub struct SortQueryFuzzer { + test_gen: SortFuzzerTestGenerator, + /// Random number generator for the runner, used to generate seeds for inner components. + /// Seeds for each choice (query, config, etc.) are printed out for reproducibility. + runner_rng: StdRng, + + // ======================================================================== + // Runner configurations + // ======================================================================== + /// For each round, a new dataset is generated. If `None`, keep running until + /// the time limit is reached + max_rounds: Option, + /// How many different queries to run in each round + queries_per_round: usize, + /// For each query, how many different configurations to try and make sure their + /// results are consistent + config_variations_per_query: usize, + /// The time limit for the entire sort query fuzzer execution. + time_limit: Option, +} + +impl SortQueryFuzzer { + pub fn new(seed: u64) -> Self { + let max_rounds = Some(2); + let queries_per_round = 3; + let config_variations_per_query = 5; + let time_limit = None; + + // Filtered out one column due to a known bug https://github.com/apache/datafusion/issues/14748 + // TODO: Remove this once the bug is fixed + let candidate_columns = get_supported_types_columns(seed) + .into_iter() + .filter(|col| { + col.name != "utf8_low" + && col.name != "utf8view" + && col.name != "binaryview" + }) + .collect::>(); + + let test_gen = SortFuzzerTestGenerator::new( + 10000, + 4, + "sort_fuzz_table".to_string(), + candidate_columns, + false, + seed, + ); + + Self { + max_rounds, + queries_per_round, + config_variations_per_query, + time_limit, + test_gen, + runner_rng: StdRng::seed_from_u64(seed), + } + } + + pub fn with_test_generator(mut self, test_gen: SortFuzzerTestGenerator) -> Self { + self.test_gen = test_gen; + self + } + + pub fn with_max_rounds(mut self, max_rounds: Option) -> Self { + self.max_rounds = max_rounds; + self + } + + pub fn with_queries_per_round(mut self, queries_per_round: usize) -> Self { + self.queries_per_round = queries_per_round; + self + } + + pub fn with_config_variations_per_query( + mut self, + config_variations_per_query: usize, + ) -> Self { + self.config_variations_per_query = config_variations_per_query; + self + } + + pub fn with_time_limit(mut self, time_limit: Duration) -> Self { + self.time_limit = Some(time_limit); + self + } + + fn should_stop_due_to_time_limit( + &self, + start_time: Instant, + n_round: usize, + n_query: usize, + ) -> bool { + if let Some(time_limit) = self.time_limit { + if Instant::now().duration_since(start_time) > time_limit { + println!( + "[SortQueryFuzzer] Time limit reached: {} queries ({} random configs each) in {} rounds", + n_round * self.queries_per_round + n_query, + self.config_variations_per_query, + n_round + ); + return true; + } + } + false + } + + pub async fn run(&mut self) -> Result<()> { + let start_time = Instant::now(); + + // Execute until either`max_rounds` or `time_limit` is reached + let max_rounds = self.max_rounds.unwrap_or(usize::MAX); + for round in 0..max_rounds { + let init_seed = self.runner_rng.gen(); + for query_i in 0..self.queries_per_round { + let query_seed = self.runner_rng.gen(); + let mut expected_results: Option> = None; // use first config's result as the expected result + for config_i in 0..self.config_variations_per_query { + if self.should_stop_due_to_time_limit(start_time, round, query_i) { + return Ok(()); + } + + let config_seed = self.runner_rng.gen(); + + println!( + "[SortQueryFuzzer] Round {}, Query {} (Config {})", + round, query_i, config_i + ); + println!(" Seeds:"); + println!(" init_seed = {}", init_seed); + println!(" query_seed = {}", query_seed); + println!(" config_seed = {}", config_seed); + + let results = self + .test_gen + .fuzzer_run(init_seed, query_seed, config_seed) + .await?; + println!("\n"); // Seperator between tested runs + + if expected_results.is_none() { + expected_results = Some(results); + } else if let Some(ref expected) = expected_results { + // `fuzzer_run` might append `LIMIT k` to either the + // expected or actual query. The number of results is + // checked inside `fuzzer_run()`. Here we only check + // that the first k rows of each result are consistent. + check_equality_of_batches(expected, &results).unwrap(); + } else { + unreachable!(); + } + } + } + } + Ok(()) + } +} + +/// Struct to generate and manage a random dataset for fuzz testing. +/// It is able to re-run the failed test cases by setting the same seed printed out. +/// See the unit tests for examples. +/// +/// To use this struct: +/// 1. Call `init_partitioned_staggered_batches` to generate a random dataset. +/// 2. Use `generate_random_query` to create a random SQL query. +/// 3. Use `generate_random_config` to create a random configuration. +/// 4. Run the fuzzer check with the generated query and configuration. +pub struct SortFuzzerTestGenerator { + /// The total number of rows for the registered table + num_rows: usize, + /// Max number of partitions for the registered table + max_partitions: usize, + /// The name of the registered table + table_name: String, + /// The selected columns from all available candidate columns to be used for + /// this dataset + selected_columns: Vec, + /// If true, will randomly generate a memory limit for the query. Otherwise + /// the query will run under the context with unlimited memory. + set_memory_limit: bool, + + /// States related to the randomly generated dataset. `None` if not initialized + /// by calling `init_partitioned_staggered_batches()` + dataset_state: Option, +} + +/// Struct to hold states related to the randomly generated dataset +pub struct DatasetState { + /// Dataset to construct the partitioned memory table. Outer vector is the + /// partitions, inner vector is staggered batches within the same partition. + partitioned_staggered_batches: Vec>, + /// Number of rows in the whole dataset + dataset_size: usize, + /// The approximate number of rows of a batch (staggered batches will be generated + /// with random number of rows between 1 and `approx_batch_size`) + approx_batch_num_rows: usize, + /// The schema of the dataset + schema: SchemaRef, + /// The memory size of the whole dataset + mem_size: usize, +} + +impl SortFuzzerTestGenerator { + /// Randomly pick a subset of `candidate_columns` to be used for this dataset + pub fn new( + num_rows: usize, + max_partitions: usize, + table_name: String, + candidate_columns: Vec, + set_memory_limit: bool, + rng_seed: u64, + ) -> Self { + let mut rng = StdRng::seed_from_u64(rng_seed); + let min_ncol = min(candidate_columns.len(), 5); + let max_ncol = min(candidate_columns.len(), 10); + let amount = rng.gen_range(min_ncol..=max_ncol); + let selected_columns = candidate_columns + .choose_multiple(&mut rng, amount) + .cloned() + .collect(); + + Self { + num_rows, + max_partitions, + table_name, + selected_columns, + set_memory_limit, + dataset_state: None, + } + } + + /// The outer vector is the partitions, the inner vector is the chunked batches + /// within each partition. + /// The partition number is determined by `self.max_partitions`. + /// The chunked batch length is a random number between 1 and `self.num_rows` / + /// 100 (make sure a single batch won't exceed memory budget for external sort + /// executions) + /// + /// Hack: If we want the query to run under certain degree of parallelism, the + /// memory table should be generated with more partitions, due to https://github.com/apache/datafusion/issues/15088 + fn init_partitioned_staggered_batches(&mut self, rng_seed: u64) { + let mut rng = StdRng::seed_from_u64(rng_seed); + let num_partitions = rng.gen_range(1..=self.max_partitions); + + let max_batch_size = self.num_rows / num_partitions / 50; + let target_partition_size = self.num_rows / num_partitions; + + let mut partitions = Vec::new(); + let mut schema = None; + for _ in 0..num_partitions { + let mut partition = Vec::new(); + let mut num_rows = 0; + + // For each partition, generate random batches until there is about enough + // rows for the specified total number of rows + while num_rows < target_partition_size { + // Generate a random batch of size between 1 and max_batch_size + + // Let edge case (1-row batch) more common + let (min_nrow, max_nrow) = if rng.gen_bool(0.1) { + (1, 3) + } else { + (1, max_batch_size) + }; + + let mut record_batch_generator = RecordBatchGenerator::new( + min_nrow, + max_nrow, + self.selected_columns.clone(), + ) + .with_seed(rng.gen()); + + let record_batch = record_batch_generator.generate().unwrap(); + num_rows += record_batch.num_rows(); + + if schema.is_none() { + schema = Some(record_batch.schema()); + println!(" Dataset schema:"); + println!(" {}", display_schema(schema.as_ref().unwrap())); + } + + partition.push(record_batch); + } + + partitions.push(partition); + } + + // After all partitions are created, optionally make one partition have 0/1 batch + if num_partitions > 2 && rng.gen_bool(0.1) { + let partition_index = rng.gen_range(0..num_partitions); + if rng.gen_bool(0.5) { + // 0 batch + partitions[partition_index] = Vec::new(); + } else { + // 1 batch, keep the old first batch + let first_batch = partitions[partition_index].first().cloned(); + if let Some(batch) = first_batch { + partitions[partition_index] = vec![batch]; + } + } + } + + // Init self fields + let mem_size: usize = partitions + .iter() + .map(|partition| { + partition + .iter() + .map(get_record_batch_memory_size) + .sum::() + }) + .sum(); + + let dataset_size = partitions + .iter() + .map(|partition| { + partition + .iter() + .map(|batch| batch.num_rows()) + .sum::() + }) + .sum::(); + + let approx_batch_num_rows = max_batch_size; + + self.dataset_state = Some(DatasetState { + partitioned_staggered_batches: partitions, + dataset_size, + approx_batch_num_rows, + schema: schema.unwrap(), + mem_size, + }); + } + + /// Generates a random SQL query string and an optional limit value. + /// Returns a tuple containing the query string and an optional limit. + pub fn generate_random_query(&self, rng_seed: u64) -> (String, Option) { + let mut rng = StdRng::seed_from_u64(rng_seed); + + let num_columns = rng.gen_range(1..=3).min(self.selected_columns.len()); + let selected_columns: Vec<_> = self + .selected_columns + .choose_multiple(&mut rng, num_columns) + .collect(); + + let mut order_by_clauses = Vec::new(); + for col in selected_columns { + let mut clause = col.name.clone(); + if rng.gen_bool(0.5) { + let order = if rng.gen_bool(0.5) { "ASC" } else { "DESC" }; + clause.push_str(&format!(" {}", order)); + } + if rng.gen_bool(0.5) { + let nulls = if rng.gen_bool(0.5) { + "NULLS FIRST" + } else { + "NULLS LAST" + }; + clause.push_str(&format!(" {}", nulls)); + } + order_by_clauses.push(clause); + } + + let dataset_size = self.dataset_state.as_ref().unwrap().dataset_size; + + let limit = if rng.gen_bool(0.2) { + // Prefer edge cases for k like 1, dataset_size, etc. + Some(if rng.gen_bool(0.5) { + let edge_cases = + [1, 2, 3, dataset_size - 1, dataset_size, dataset_size + 1]; + *edge_cases.choose(&mut rng).unwrap() + } else { + rng.gen_range(1..=dataset_size) + }) + } else { + None + }; + + let limit_clause = limit.map_or(String::new(), |l| format!(" LIMIT {}", l)); + + let query = format!( + "SELECT * FROM {} ORDER BY {}{}", + self.table_name, + order_by_clauses.join(", "), + limit_clause + ); + + (query, limit) + } + + pub fn generate_random_config( + &self, + rng_seed: u64, + with_memory_limit: bool, + ) -> Result { + let mut rng = StdRng::seed_from_u64(rng_seed); + let init_state = self.dataset_state.as_ref().unwrap(); + let dataset_size = init_state.mem_size; + let num_partitions = init_state.partitioned_staggered_batches.len(); + + // 30% to 200% of the dataset size (if `with_memory_limit` is false, config + // will use the default unbounded pool to override it later) + let memory_limit = rng.gen_range( + (dataset_size as f64 * 0.5) as usize..=(dataset_size as f64 * 2.0) as usize, + ); + // 10% to 20% of the per-partition memory limit size + let per_partition_mem_limit = memory_limit / num_partitions; + let sort_spill_reservation_bytes = rng.gen_range( + (per_partition_mem_limit as f64 * 0.2) as usize + ..=(per_partition_mem_limit as f64 * 0.3) as usize, + ); + + // 1 to 3 times of the approx batch size. Setting this to a very large nvalue + // will cause external sort to fail. + let sort_in_place_threshold_bytes = if with_memory_limit { + // For memory-limited query, setting `sort_in_place_threshold_bytes` too + // large will cause failure. + 0 + } else { + let dataset_size = self.dataset_state.as_ref().unwrap().dataset_size; + rng.gen_range(0..=dataset_size * 2_usize) + }; + + // Set up strings for printing + let memory_limit_str = if with_memory_limit { + human_readable_size(memory_limit) + } else { + "Unbounded".to_string() + }; + let per_partition_limit_str = if with_memory_limit { + human_readable_size(per_partition_mem_limit) + } else { + "Unbounded".to_string() + }; + + println!(" Config: "); + println!(" Dataset size: {}", human_readable_size(dataset_size)); + println!(" Number of partitions: {}", num_partitions); + println!(" Batch size: {}", init_state.approx_batch_num_rows / 2); + println!(" Memory limit: {}", memory_limit_str); + println!( + " Per partition memory limit: {}", + per_partition_limit_str + ); + println!( + " Sort spill reservation bytes: {}", + human_readable_size(sort_spill_reservation_bytes) + ); + println!( + " Sort in place threshold bytes: {}", + human_readable_size(sort_in_place_threshold_bytes) + ); + + let config = SessionConfig::new() + .with_target_partitions(num_partitions) + .with_batch_size(init_state.approx_batch_num_rows / 2) + .with_sort_spill_reservation_bytes(sort_spill_reservation_bytes) + .with_sort_in_place_threshold_bytes(sort_in_place_threshold_bytes); + + let memory_pool: Arc = if with_memory_limit { + Arc::new(FairSpillPool::new(memory_limit)) + } else { + Arc::new(UnboundedMemoryPool::default()) + }; + + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(memory_pool) + .with_disk_manager(DiskManagerConfig::NewOs) + .build_arc()?; + + let ctx = SessionContext::new_with_config_rt(config, runtime); + + let dataset = &init_state.partitioned_staggered_batches; + let schema = &init_state.schema; + + let provider = MemTable::try_new(schema.clone(), dataset.clone())?; + ctx.register_table("sort_fuzz_table", Arc::new(provider))?; + + Ok(ctx) + } + + async fn fuzzer_run( + &mut self, + dataset_seed: u64, + query_seed: u64, + config_seed: u64, + ) -> Result> { + self.init_partitioned_staggered_batches(dataset_seed); + let (query_str, limit) = self.generate_random_query(query_seed); + println!(" Query:"); + println!(" {}", query_str); + + // ==== Execute the query ==== + + // Only enable memory limits if: + // 1. Query does not contain LIMIT (since topK does not support external execution) + // 2. Memory limiting is enabled in the test generator config + let with_mem_limit = !query_str.contains("LIMIT") && self.set_memory_limit; + + let ctx = self.generate_random_config(config_seed, with_mem_limit)?; + let df = ctx.sql(&query_str).await.unwrap(); + let results = df.collect().await.unwrap(); + + // ==== Check the result size is consistent with the limit ==== + let result_num_rows = results.iter().map(|batch| batch.num_rows()).sum::(); + let dataset_size = self.dataset_state.as_ref().unwrap().dataset_size; + + if let Some(limit) = limit { + let expected_num_rows = min(limit, dataset_size); + assert_eq!(result_num_rows, expected_num_rows); + } + + Ok(results) + } +} + +#[cfg(test)] +mod test { + use super::*; + + /// Given the same seed, the result should be the same + #[tokio::test] + async fn test_sort_query_fuzzer_deterministic() { + let gen_seed = 310104; + let mut test_generator = SortFuzzerTestGenerator::new( + 2000, + 3, + "sort_fuzz_table".to_string(), + get_supported_types_columns(gen_seed), + false, + gen_seed, + ); + + let res1 = test_generator.fuzzer_run(1, 2, 3).await.unwrap(); + let res2 = test_generator.fuzzer_run(1, 2, 3).await.unwrap(); + check_equality_of_batches(&res1, &res2).unwrap(); + } +}