diff --git a/Cargo.lock b/Cargo.lock
index c708c516ab36..c1e5208a51a6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2020,6 +2020,7 @@ dependencies = [
  "bytes",
  "bzip2 0.5.2",
  "chrono",
+ "criterion",
  "datafusion-common",
  "datafusion-common-runtime",
  "datafusion-execution",
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 6049614f37e8..2971bdc1215d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -899,10 +899,11 @@ impl TableProvider for ListingTable {
             .split_file_groups_by_statistics
             .then(|| {
                 output_ordering.first().map(|output_ordering| {
-                    FileScanConfig::split_groups_by_statistics(
+                    FileScanConfig::split_groups_by_statistics_with_target_partitions(
                         &self.table_schema,
                         &partitioned_file_lists,
                         output_ordering,
+                        self.options.target_partitions,
                     )
                 })
             })
diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml
index 2132272b5768..84e5e76e30a3 100644
--- a/datafusion/datasource/Cargo.toml
+++ b/datafusion/datasource/Cargo.toml
@@ -72,6 +72,7 @@ xz2 = { version = "0.1", optional = true, features = ["static"] }
 zstd = { version = "0.13", optional = true, default-features = false }
 
 [dev-dependencies]
+criterion = { workspace = true }
 tempfile = { workspace = true }
 
 [lints]
@@ -80,3 +81,7 @@ workspace = true
 [lib]
 name = "datafusion_datasource"
 path = "src/mod.rs"
+
+[[bench]]
+name = "split_groups_by_statistics"
+harness = false
diff --git a/datafusion/datasource/benches/split_groups_by_statistics.rs b/datafusion/datasource/benches/split_groups_by_statistics.rs
new file mode 100644
index 000000000000..f7c5e1b44ae0
--- /dev/null
+++ b/datafusion/datasource/benches/split_groups_by_statistics.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
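+
+// Run with (package name assumed to mirror this crate's directory):
+//
+//   cargo bench -p datafusion-datasource --bench split_groups_by_statistics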
+
+use arrow::datatypes::{DataType, Field, Schema};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::{generate_test_files, verify_sort_integrity};
+use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use std::sync::Arc;
+use std::time::Duration;
+
+pub fn compare_split_groups_by_statistics_algorithms(c: &mut Criterion) {
+    let file_schema = Arc::new(Schema::new(vec![Field::new(
+        "value",
+        DataType::Float64,
+        false,
+    )]));
+
+    let sort_expr = PhysicalSortExpr {
+        expr: Arc::new(datafusion_physical_expr::expressions::Column::new(
+            "value", 0,
+        )),
+        options: arrow::compute::SortOptions::default(),
+    };
+    let sort_ordering = LexOrdering::from(vec![sort_expr]);
+
+    // Small, medium, and large file counts
+    let file_counts = [10, 100, 1000];
+    let overlap_factors = [0.0, 0.2, 0.5, 0.8]; // No, low, medium, and high overlap
+
+    let target_partitions: [usize; 4] = [4, 8, 16, 32];
+
+    let mut group = c.benchmark_group("split_groups");
+    group.measurement_time(Duration::from_secs(10));
+
+    for &num_files in &file_counts {
+        for &overlap in &overlap_factors {
+            let file_groups = generate_test_files(num_files, overlap);
+
+            // Benchmark the original algorithm
+            group.bench_with_input(
+                BenchmarkId::new(
+                    "original",
+                    format!("files={},overlap={:.1}", num_files, overlap),
+                ),
+                &(
+                    file_groups.clone(),
+                    file_schema.clone(),
+                    sort_ordering.clone(),
+                ),
+                |b, (fg, schema, order)| {
+                    let mut result = Vec::new();
+                    b.iter(|| {
+                        result =
+                            FileScanConfig::split_groups_by_statistics(schema, fg, order)
+                                .unwrap();
+                    });
+                    // Verify correctness outside the timed loop
+                    assert!(verify_sort_integrity(&result));
+                },
+            );
+
+            // Benchmark the new algorithm with different target partition counts
+            for &tp in &target_partitions {
+                group.bench_with_input(
+                    BenchmarkId::new(
+                        format!("v2_partitions={}", tp),
+                        format!("files={},overlap={:.1}", num_files, overlap),
+                    ),
+                    &(
+                        file_groups.clone(),
+                        file_schema.clone(),
+                        sort_ordering.clone(),
+                        tp,
+                    ),
+                    |b, (fg, schema, order, target)| {
+                        let mut result = Vec::new();
+                        b.iter(|| {
+                            result = FileScanConfig::split_groups_by_statistics_with_target_partitions(
+                                schema, fg, order, *target,
+                            )
+                            .unwrap();
+                        });
+                        assert!(verify_sort_integrity(&result));
+                    },
+                );
+            }
+        }
+    }
+
+    group.finish();
+}
+
+criterion_group!(benches, compare_split_groups_by_statistics_algorithms);
+criterion_main!(benches);
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 729283289caf..dc1659ca1cd0 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -858,6 +858,96 @@ impl FileScanConfig {
         })
     }
 
+    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
+    ///
+    /// The method distributes files across a target number of partitions while ensuring
+    /// that files within each partition maintain a sort order based on their min/max statistics.
+    ///
+    /// The algorithm:
+    /// 1. Takes the files sorted by their minimum values
+    /// 2. For each file:
+    ///    - Finds the eligible groups (groups that are empty, or whose last file's
+    ///      max value is strictly less than this file's min value)
+    ///    - Appends the file to the smallest eligible group
+    ///    - Creates a new group if no existing group is eligible
+    ///
+    /// # Parameters
+    /// * `table_schema`: Schema containing information about the columns
+    /// * `file_groups`: The original file groups to split
+    /// * `sort_order`: The lexicographical ordering to maintain within each group
+    /// * `target_partitions`: The desired number of output partitions
+    ///
+    /// # Returns
+    /// A new set of file groups, where files within each group are non-overlapping with respect to
+    /// their min/max statistics and maintain the specified sort order.
+    pub fn split_groups_by_statistics_with_target_partitions(
+        table_schema: &SchemaRef,
+        file_groups: &[FileGroup],
+        sort_order: &LexOrdering,
+        target_partitions: usize,
+    ) -> Result<Vec<FileGroup>> {
+        if target_partitions == 0 {
+            return Err(DataFusionError::Internal(
+                "target_partitions must be greater than 0".to_string(),
+            ));
+        }
+
+        let flattened_files = file_groups
+            .iter()
+            .flat_map(FileGroup::iter)
+            .collect::<Vec<_>>();
+
+        if flattened_files.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let statistics = MinMaxStatistics::new_from_files(
+            sort_order,
+            table_schema,
+            None,
+            flattened_files.iter().copied(),
+        )?;
+
+        let indices_sorted_by_min = statistics.min_values_sorted();
+
+        // Initialize with target_partitions empty groups
+        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
+
+        for (idx, min) in indices_sorted_by_min {
+            // Among the groups this file may extend without overlap, pick the
+            // smallest one to keep the distribution balanced
+            if let Some(group) = file_groups_indices
+                .iter_mut()
+                .filter(|group| {
+                    group.is_empty()
+                        || min
+                            > statistics
+                                .max(*group.last().expect("groups should not be empty"))
+                })
+                .min_by_key(|group| group.len())
+            {
+                group.push(idx);
+            } else {
+                // Create a new group if no existing group fits
+                file_groups_indices.push(vec![idx]);
+            }
+        }
+
+        // Remove any empty groups
+        file_groups_indices.retain(|group| !group.is_empty());
+
+        // Assemble indices back into groups of PartitionedFiles
+        Ok(file_groups_indices
+            .into_iter()
+            .map(|file_group_indices| {
+                FileGroup::new(
+                    file_group_indices
+                        .into_iter()
+                        .map(|idx| flattened_files[idx].clone())
+                        .collect(),
+                )
+            })
+            .collect())
+    }
+
     /// Attempts to do a bin-packing on files into file groups, such that any two files
     /// in a file group are ordered and non-overlapping with respect to their statistics.
     /// It will produce the smallest number of file groups possible.
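A minimal sketch of how a caller might drive the new API, falling back to the
original grouping when statistics are missing or unusable (the helper name
`repartition_sorted_files` is hypothetical; in this patch the real call site is
`ListingTable::scan`, shown at the top):

```rust
use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// Sketch: split `file_groups` into at most `target_partitions` sorted,
/// non-overlapping groups, keeping the original grouping on failure.
fn repartition_sorted_files(
    schema: &SchemaRef,
    file_groups: Vec<FileGroup>,
    ordering: &LexOrdering,
    target_partitions: usize,
) -> Result<Vec<FileGroup>> {
    match FileScanConfig::split_groups_by_statistics_with_target_partitions(
        schema,
        &file_groups,
        ordering,
        target_partitions,
    ) {
        Ok(groups) => Ok(groups),
        // Statistics may be absent or incomparable; keep the original groups
        Err(_) => Ok(file_groups),
    }
}
```

Note that the result can contain more than `target_partitions` groups: heavily
overlapping files force extra groups, since no eligible group exists for them.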
@@ -1377,7 +1467,10 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
 
 #[cfg(test)]
 mod tests {
-    use crate::{test_util::MockSource, tests::aggr_test_schema};
+    use crate::{
+        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
+        verify_sort_integrity,
+    };
 
     use super::*;
     use arrow::{
@@ -2222,4 +2315,163 @@ mod tests {
         assert_eq!(new_config.constraints, Constraints::default());
         assert!(new_config.new_lines_in_values);
     }
+
+    #[test]
+    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
+        use datafusion_common::DFSchema;
+        use datafusion_expr::{col, execution_props::ExecutionProps};
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Float64,
+            false,
+        )]));
+
+        // Set up the sort expression
+        let exec_props = ExecutionProps::new();
+        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
+        let sort_expr = vec![col("value").sort(true, false)];
+
+        let physical_sort_exprs: Vec<_> = sort_expr
+            .iter()
+            .map(|expr| create_physical_sort_expr(expr, &df_schema, &exec_props).unwrap())
+            .collect();
+
+        let sort_ordering = LexOrdering::from(physical_sort_exprs);
+
+        // Test case parameters
+        struct TestCase {
+            name: String,
+            file_count: usize,
+            overlap_factor: f64,
+            target_partitions: usize,
+            expected_partition_count: usize,
+        }
+
+        let test_cases = vec![
+            // Basic cases
+            TestCase {
+                name: "no_overlap_10_files_4_partitions".to_string(),
+                file_count: 10,
+                overlap_factor: 0.0,
+                target_partitions: 4,
+                expected_partition_count: 4,
+            },
+            TestCase {
+                name: "medium_overlap_20_files_5_partitions".to_string(),
+                file_count: 20,
+                overlap_factor: 0.5,
+                target_partitions: 5,
+                expected_partition_count: 5,
+            },
+            TestCase {
+                name: "high_overlap_30_files_3_partitions".to_string(),
+                file_count: 30,
+                overlap_factor: 0.8,
+                target_partitions: 3,
+                // Heavy overlap forces more groups than the target
+                expected_partition_count: 7,
+            },
+            // Edge cases
+            TestCase {
+                name: "fewer_files_than_partitions".to_string(),
+                file_count: 3,
+                overlap_factor: 0.0,
+                target_partitions: 10,
+                expected_partition_count: 3, // Should only create as many partitions as files
+            },
+            TestCase {
+                name: "single_file".to_string(),
+                file_count: 1,
+                overlap_factor: 0.0,
+                target_partitions: 5,
+                expected_partition_count: 1, // Should create only one partition
+            },
+            TestCase {
+                name: "empty_files".to_string(),
+                file_count: 0,
+                overlap_factor: 0.0,
+                target_partitions: 3,
+                expected_partition_count: 0, // Empty result for empty input
+            },
+        ];
+
+        for case in test_cases {
+            println!("Running test case: {}", case.name);
+
+            // Generate files using the shared test utility
+            let file_groups = generate_test_files(case.file_count, case.overlap_factor);
+
+            // Call the function under test
+            let result =
+                FileScanConfig::split_groups_by_statistics_with_target_partitions(
+                    &schema,
+                    &file_groups,
+                    &sort_ordering,
+                    case.target_partitions,
+                )?;
+
+            // Verify results
+            println!(
+                "Created {} partitions (target was {})",
+                result.len(),
+                case.target_partitions
+            );
+
+            // Check partition count
+            assert_eq!(
+                result.len(),
+                case.expected_partition_count,
+                "Case '{}': Unexpected partition count",
+                case.name
+            );
+
+            // Verify sort integrity
+            assert!(
+                verify_sort_integrity(&result),
+                "Case '{}': Files within partitions are not properly ordered",
+                case.name
+            );
+
+            // Distribution check across partitions
+            if case.file_count > 1 && case.expected_partition_count > 1 {
+                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
+                let max_size = *group_sizes.iter().max().unwrap();
+                let min_size = *group_sizes.iter().min().unwrap();
+
+                // Check partition balancing - the spread should not be extreme
+                let avg_files_per_partition =
+                    case.file_count as f64 / case.expected_partition_count as f64;
+                assert!(
+                    (max_size as f64) < 2.0 * avg_files_per_partition,
+                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
+                    case.name,
+                    max_size,
+                    avg_files_per_partition
+                );
+
+                println!(
+                    "Distribution - min files: {}, max files: {}",
+                    min_size, max_size
+                );
+            }
+        }
+
+        // Test error case: zero target partitions
+        let empty_groups: Vec<FileGroup> = vec![];
+        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
+            &schema,
+            &empty_groups,
+            &sort_ordering,
+            0,
+        )
+        .unwrap_err();
+
+        assert!(
+            err.to_string()
+                .contains("target_partitions must be greater than 0"),
+            "Expected error for zero target partitions"
+        );
+
+        Ok(())
+    }
 }
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index e4461c0b90a4..c02f84c74d64 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -44,25 +44,26 @@ pub mod source;
 mod statistics;
 
 #[cfg(test)]
-mod test_util;
+pub mod test_util;
 pub mod url;
 pub mod write;
 
+pub use self::url::ListingTableUrl;
+use crate::file_groups::FileGroup;
 use chrono::TimeZone;
-use datafusion_common::Result;
+use datafusion_common::stats::Precision;
+use datafusion_common::{ColumnStatistics, Result};
 use datafusion_common::{ScalarValue, Statistics};
 use file_meta::FileMeta;
 use futures::{Stream, StreamExt};
 use object_store::{path::Path, ObjectMeta};
 use object_store::{GetOptions, GetRange, ObjectStore};
+pub use statistics::add_row_stats;
+pub use statistics::compute_all_files_statistics;
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
 
-pub use self::url::ListingTableUrl;
-pub use statistics::add_row_stats;
-pub use statistics::compute_all_files_statistics;
-
 /// Stream of files get listed from object store
 pub type PartitionedFileStream =
     Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
@@ -313,6 +314,115 @@ async fn find_first_newline(
     Ok(index)
 }
 
+/// Generates test files with min-max statistics in different overlap patterns.
+///
+/// Used by tests and benchmarks.
+///
+/// # Overlap Factors
+///
+/// The `overlap_factor` parameter controls how much the value ranges in generated test files overlap:
+/// - `0.0`: No overlap between files (completely disjoint ranges)
+/// - `0.2`: Low overlap (20% of the range size overlaps with adjacent files)
+/// - `0.5`: Medium overlap (50% of ranges overlap)
+/// - `0.8`: High overlap (80% of ranges overlap between files)
+///
+/// # Examples
+///
+/// With 5 files and different overlap factors showing `[min, max]` ranges:
+///
+/// overlap_factor = 0.0 (no overlap):
+///
+///   File 0: [0, 20]
+///   File 1: [20, 40]
+///   File 2: [40, 60]
+///   File 3: [60, 80]
+///   File 4: [80, 100]
+///
+/// overlap_factor = 0.5 (50% overlap):
+///
+///   File 0: [0, 40]
+///   File 1: [20, 60]
+///   File 2: [40, 80]
+///   File 3: [60, 100]
+///   File 4: [80, 120]
+///
+/// overlap_factor = 0.8 (80% overlap):
+///
+///   File 0: [0, 25]
+///   File 1: [5, 30]
+///   File 2: [10, 35]
+///   File 3: [15, 40]
+///   File 4: [20, 45]
+pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
+    if num_files == 0 {
+        return vec![];
+    }
+    let mut files = Vec::with_capacity(num_files);
+    let range_size = if overlap_factor == 0.0 {
+        // Guard against zero-width ranges when num_files > 100
+        (100 / num_files as i64).max(1)
+    } else {
+        (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
+    };
+
+    for i in 0..num_files {
+        let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
+        let min = base as f64;
+        let max = (base + range_size) as f64;
+
+        let file = PartitionedFile {
+            object_meta: ObjectMeta {
+                location: Path::from(format!("file_{}.parquet", i)),
+                last_modified: chrono::Utc::now(),
+                size: 1000,
+                e_tag: None,
+                version: None,
+            },
+            partition_values: vec![],
+            range: None,
+            statistics: Some(Arc::new(Statistics {
+                num_rows: Precision::Exact(100),
+                total_byte_size: Precision::Exact(1000),
+                column_statistics: vec![ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
+                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                }],
+            })),
+            extensions: None,
+            metadata_size_hint: None,
+        };
+        files.push(file);
+    }
+
+    vec![FileGroup::new(files)]
+}
+
+/// Helper function that verifies that files within each group maintain sort order,
+/// i.e. each file's min value is strictly greater than the previous file's max value.
+///
+/// Used by tests and benchmarks.
+pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
+    for group in file_groups {
+        let files = group.iter().collect::<Vec<_>>();
+        for i in 1..files.len() {
+            let prev_file = files[i - 1];
+            let curr_file = files[i];
+
+            // Check that the current file's min value exceeds the previous file's max value
+            if let (Some(prev_stats), Some(curr_stats)) =
+                (&prev_file.statistics, &curr_file.statistics)
+            {
+                let prev_max = &prev_stats.column_statistics[0].max_value;
+                let curr_min = &curr_stats.column_statistics[0].min_value;
+                if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
+                    return false;
+                }
+            }
+        }
+    }
+    true
+}
+
 #[cfg(test)]
 mod tests {
     use super::ListingTableUrl;
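A quick illustration of how these helpers compose with the new splitting API
(a sketch only; `schema` and `sort_ordering` are assumed to be built as in the
benchmark above):

```rust
// Four disjoint files in one initial group: [0,25] [25,50] [50,75] [75,100]
let groups = generate_test_files(4, 0.0);

// Split into (at most) two ordered, non-overlapping groups
let split = FileScanConfig::split_groups_by_statistics_with_target_partitions(
    &schema,
    &groups,
    &sort_ordering,
    2,
)?;

// Because eligibility requires min > previous max, touching boundaries such
// as [0,25] and [25,50] cannot share a group; expect two interleaved groups,
// e.g. [[0,25], [50,75]] and [[25,50], [75,100]]
assert_eq!(split.len(), 2);
assert!(verify_sort_integrity(&split));
```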
diff --git a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
index d325ca423dac..a10243f62720 100644
--- a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
+++ b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
@@ -109,7 +109,9 @@ ORDER BY int_col, bigint_col;
 logical_plan
 01)Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
 02)--TableScan: test_table projection=[int_col, bigint_col]
-physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[int_col, bigint_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet
+physical_plan
+01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST]
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet
 
 # Another planning test, but project on a column with unsupported statistics
 # We should be able to ignore this and look at only the relevant statistics
@@ -123,7 +125,10 @@ logical_plan
 02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
 03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col
 04)------TableScan: test_table projection=[int_col, string_col, bigint_col]
-physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[string_col], file_type=parquet
+physical_plan
+01)ProjectionExec: expr=[string_col@0 as string_col]
+02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST]
+03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[string_col, int_col, bigint_col], output_ordering=[int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST], file_type=parquet
 
 # Clean up & recreate but sort on descending column
 statement ok
@@ -155,7 +160,9 @@ ORDER BY descending_col DESC NULLS LAST, bigint_col ASC NULLS LAST;
 logical_plan
 01)Sort: test_table.descending_col DESC NULLS LAST, test_table.bigint_col ASC NULLS LAST
 02)--TableScan: test_table projection=[descending_col, bigint_col]
-physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet]]}, projection=[descending_col, bigint_col], output_ordering=[descending_col@0 DESC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet
+physical_plan
+01)SortPreservingMergeExec: [descending_col@0 DESC NULLS LAST, bigint_col@1 ASC NULLS LAST]
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[descending_col, bigint_col], output_ordering=[descending_col@0 DESC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet
 
 # Clean up & re-create with partition columns in sort order
 statement ok
@@ -189,7 +196,9 @@ ORDER BY partition_col, int_col, bigint_col;
 logical_plan
 01)Sort: test_table.partition_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
 02)--TableScan: test_table projection=[int_col, bigint_col, partition_col]
-physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[int_col, bigint_col, partition_col], output_ordering=[partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet
+physical_plan
+01)SortPreservingMergeExec: [partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST]
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, partition_col], output_ordering=[partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet
 
 # Clean up & re-create with overlapping column in sort order
 # This will test the ability to sort files with overlapping statistics
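The regrouped plans above consistently pair files A and C in one group and
leave B in another. A sketch of the boundary arithmetic behind that shape,
with hypothetical min/max values (the real values come from each file's
parquet column statistics):

```rust
// Hypothetical (min, max) ranges per file; eligibility for joining a group
// requires the file's min to be STRICTLY greater than the group's current max.
let a = (0, 10);
let b = (10, 20); // b.0 == a.1, so B cannot follow A in a group
let c = (11, 30); // c.0 > a.1, so C can follow A

assert!(!(b.0 > a.1)); // B starts its own group
assert!(c.0 > a.1); // C joins A's group => groups [[A, C], [B]]
```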