Skip to content

Commit 75df701

Browse files
committed
Improve split_groups_by_statistics method
1 parent 46f4024 commit 75df701

File tree

4 files changed

+255
-0
lines changed

4 files changed

+255
-0
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource/Cargo.toml

+5
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ xz2 = { version = "0.1", optional = true, features = ["static"] }
7272
zstd = { version = "0.13", optional = true, default-features = false }
7373

7474
[dev-dependencies]
75+
criterion = { workspace = true }
7576
tempfile = { workspace = true }
7677

7778
[lints]
@@ -80,3 +81,7 @@ workspace = true
8081
[lib]
8182
name = "datafusion_datasource"
8283
path = "src/mod.rs"
84+
85+
[[bench]]
86+
name = "split_groups_by_statistics"
87+
harness = false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
use arrow::datatypes::{DataType, Field, Schema};
2+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
3+
use datafusion_common::stats::Precision;
4+
use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
5+
use datafusion_datasource::file_groups::FileGroup;
6+
use datafusion_datasource::file_scan_config::FileScanConfig;
7+
use datafusion_datasource::PartitionedFile;
8+
use datafusion_physical_expr::PhysicalSortExpr;
9+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
10+
use object_store::{path::Path, ObjectMeta};
11+
use std::sync::Arc;
12+
use std::time::Duration;
13+
14+
/// Generates test files with min-max statistics in different overlap patterns
15+
fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
16+
let mut files = Vec::with_capacity(num_files);
17+
let range_size = if overlap_factor == 0.0 {
18+
100 / num_files as i64
19+
} else {
20+
(100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
21+
};
22+
23+
for i in 0..num_files {
24+
let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
25+
let min = base as f64;
26+
let max = (base + range_size) as f64;
27+
28+
let file = PartitionedFile {
29+
object_meta: ObjectMeta {
30+
location: Path::from(format!("file_{}.parquet", i)),
31+
last_modified: chrono::Utc::now(),
32+
size: 1000,
33+
e_tag: None,
34+
version: None,
35+
},
36+
partition_values: vec![],
37+
range: None,
38+
statistics: Some(Statistics {
39+
num_rows: Precision::Exact(100),
40+
total_byte_size: Precision::Exact(1000),
41+
column_statistics: vec![ColumnStatistics {
42+
null_count: Precision::Exact(0),
43+
max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
44+
min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
45+
sum_value: Precision::Absent,
46+
distinct_count: Precision::Absent,
47+
}],
48+
}),
49+
extensions: None,
50+
metadata_size_hint: None,
51+
};
52+
files.push(file);
53+
}
54+
55+
vec![FileGroup::new(files)]
56+
}
57+
58+
pub fn compare_split_groups_by_statistics_algorithms(c: &mut Criterion) {
59+
let file_schema = Arc::new(Schema::new(vec![Field::new(
60+
"value",
61+
DataType::Float64,
62+
false,
63+
)]));
64+
65+
let sort_expr = PhysicalSortExpr {
66+
expr: Arc::new(datafusion_physical_expr::expressions::Column::new(
67+
"value", 0,
68+
)),
69+
options: arrow::compute::SortOptions::default(),
70+
};
71+
let sort_ordering = LexOrdering::from(vec![sort_expr]);
72+
73+
let file_counts = [10, 100, 1000]; // Small, medium, large number of files
74+
let overlap_factors = [0.0, 0.2, 0.5, 0.8]; // Low, medium, high overlap
75+
76+
let target_partitions: [usize; 4] = [4, 8, 16, 32];
77+
78+
let mut group = c.benchmark_group("file_distribution_algorithms");
79+
group.measurement_time(Duration::from_secs(10));
80+
81+
for &num_files in &file_counts {
82+
for &overlap in &overlap_factors {
83+
let file_groups = generate_test_files(num_files, overlap);
84+
// Benchmark original algorithm
85+
group.bench_with_input(
86+
BenchmarkId::new(
87+
"original",
88+
format!("files={},overlap={:.1}", num_files, overlap),
89+
),
90+
&(
91+
file_groups.clone(),
92+
file_schema.clone(),
93+
sort_ordering.clone(),
94+
),
95+
|b, (fg, schema, order)| {
96+
b.iter(|| {
97+
let file_groups =
98+
FileScanConfig::split_groups_by_statistics(schema, fg, order)
99+
.unwrap();
100+
assert!(verify_sort_integrity(&file_groups));
101+
});
102+
},
103+
);
104+
105+
// Benchmark new algorithm with different target partitions
106+
for &tp in &target_partitions {
107+
group.bench_with_input(
108+
BenchmarkId::new(
109+
format!("v2_partitions={}", tp),
110+
format!("files={},overlap={:.1}", num_files, overlap),
111+
),
112+
&(
113+
file_groups.clone(),
114+
file_schema.clone(),
115+
sort_ordering.clone(),
116+
tp,
117+
),
118+
|b, (fg, schema, order, target)| {
119+
b.iter(|| {
120+
let file_groups =
121+
FileScanConfig::split_groups_by_statistics_v2(
122+
schema, fg, order, *target,
123+
)
124+
.unwrap();
125+
assert!(verify_sort_integrity(&file_groups));
126+
});
127+
},
128+
);
129+
}
130+
}
131+
}
132+
133+
group.finish();
134+
}
135+
136+
// Helper function to verify that files within each group maintain sort order
137+
fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
138+
for group in file_groups {
139+
let files = group.iter().collect::<Vec<_>>();
140+
for i in 1..files.len() {
141+
let prev_file = files[i - 1];
142+
let curr_file = files[i];
143+
144+
// Check if the min value of current file is greater than max value of previous file
145+
if let (Some(prev_stats), Some(curr_stats)) =
146+
(&prev_file.statistics, &curr_file.statistics)
147+
{
148+
let prev_max = &prev_stats.column_statistics[0].max_value;
149+
let curr_min = &curr_stats.column_statistics[0].min_value;
150+
if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
151+
return false;
152+
}
153+
}
154+
}
155+
}
156+
true
157+
}
158+
159+
criterion_group!(benches, compare_split_groups_by_statistics_algorithms);
160+
criterion_main!(benches);

datafusion/datasource/src/file_scan_config.rs

+89
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,95 @@ impl FileScanConfig {
575575
})
576576
}
577577

578+
/// Splits file groups into new groups based on statistics to enable efficient parallel processing.
579+
///
580+
/// The method distributes files across a target number of partitions while ensuring
581+
/// files within each partition maintain sort order based on their min/max statistics.
582+
///
583+
/// The algorithm works by:
584+
/// 1. Sorting all files by their minimum values
585+
/// 2. Trying to place each file into an existing group where it can maintain sort order
586+
/// 3. Creating new groups when necessary if a file cannot fit into existing groups
587+
/// 4. Prioritizing smaller groups when multiple suitable groups exist (for load balancing)
588+
///
589+
/// # Parameters
590+
/// * `table_schema`: Schema containing information about the columns
591+
/// * `file_groups`: The original file groups to split
592+
/// * `sort_order`: The lexicographical ordering to maintain within each group
593+
/// * `target_partitions`: The desired number of output partitions
594+
///
595+
/// # Returns
596+
/// A new set of file groups, where files within each group are non-overlapping with respect to
597+
/// their min/max statistics and maintain the specified sort order.
598+
pub fn split_groups_by_statistics_v2(
599+
table_schema: &SchemaRef,
600+
file_groups: &[FileGroup],
601+
sort_order: &LexOrdering,
602+
target_partitions: usize,
603+
) -> Result<Vec<FileGroup>> {
604+
let flattened_files = file_groups
605+
.iter()
606+
.flat_map(FileGroup::iter)
607+
.collect::<Vec<_>>();
608+
609+
if flattened_files.is_empty() {
610+
return Ok(vec![]);
611+
}
612+
613+
let statistics = MinMaxStatistics::new_from_files(
614+
sort_order,
615+
table_schema,
616+
None,
617+
flattened_files.iter().copied(),
618+
)?;
619+
620+
let indices_sorted_by_min = statistics.min_values_sorted();
621+
622+
// Initialize with target_partitions empty groups
623+
let mut file_groups_indices: Vec<Vec<usize>> =
624+
vec![vec![]; target_partitions.max(1)];
625+
626+
for (idx, min) in indices_sorted_by_min {
627+
// Find all groups where the file can fit
628+
let mut suitable_groups: Vec<(usize, &mut Vec<usize>)> = file_groups_indices
629+
.iter_mut()
630+
.enumerate()
631+
.filter(|(_, group)| {
632+
group.is_empty()
633+
|| min
634+
> statistics
635+
.max(*group.last().expect("groups should not be empty"))
636+
})
637+
.collect();
638+
639+
// Sort by group size to prioritize smaller groups
640+
suitable_groups.sort_by_key(|(_, group)| group.len());
641+
642+
if let Some((_, group)) = suitable_groups.first_mut() {
643+
group.push(idx);
644+
} else {
645+
// Create a new group if no existing group fits
646+
file_groups_indices.push(vec![idx]);
647+
}
648+
}
649+
650+
// Remove any empty groups
651+
file_groups_indices.retain(|group| !group.is_empty());
652+
653+
// Assemble indices back into groups of PartitionedFiles
654+
Ok(file_groups_indices
655+
.into_iter()
656+
.map(|file_group_indices| {
657+
FileGroup::new(
658+
file_group_indices
659+
.into_iter()
660+
.map(|idx| flattened_files[idx].clone())
661+
.collect(),
662+
)
663+
})
664+
.collect())
665+
}
666+
578667
/// Attempts to do a bin-packing on files into file groups, such that any two files
579668
/// in a file group are ordered and non-overlapping with respect to their statistics.
580669
/// It will produce the smallest number of file groups possible.

0 commit comments

Comments
 (0)