Skip to content

Commit e98116b

Browse files
committed
add tests
1 parent 7fe0354 commit e98116b

File tree

4 files changed

+238
-76
lines changed

4 files changed

+238
-76
lines changed

datafusion/datasource/benches/split_groups_by_statistics.rs

Lines changed: 2 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -17,61 +17,14 @@
1717

1818
use arrow::datatypes::{DataType, Field, Schema};
1919
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
20-
use datafusion_common::stats::Precision;
21-
use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
22-
use datafusion_datasource::file_groups::FileGroup;
2320
use datafusion_datasource::file_scan_config::FileScanConfig;
24-
use datafusion_datasource::PartitionedFile;
21+
use datafusion_datasource::test_util::generate_test_files;
22+
use datafusion_datasource::test_util::verify_sort_integrity;
2523
use datafusion_physical_expr::PhysicalSortExpr;
2624
use datafusion_physical_expr_common::sort_expr::LexOrdering;
27-
use object_store::{path::Path, ObjectMeta};
2825
use std::sync::Arc;
2926
use std::time::Duration;
3027

31-
/// Generates test files with min-max statistics in different overlap patterns
32-
fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
33-
let mut files = Vec::with_capacity(num_files);
34-
let range_size = if overlap_factor == 0.0 {
35-
100 / num_files as i64
36-
} else {
37-
(100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
38-
};
39-
40-
for i in 0..num_files {
41-
let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
42-
let min = base as f64;
43-
let max = (base + range_size) as f64;
44-
45-
let file = PartitionedFile {
46-
object_meta: ObjectMeta {
47-
location: Path::from(format!("file_{}.parquet", i)),
48-
last_modified: chrono::Utc::now(),
49-
size: 1000,
50-
e_tag: None,
51-
version: None,
52-
},
53-
partition_values: vec![],
54-
range: None,
55-
statistics: Some(Statistics {
56-
num_rows: Precision::Exact(100),
57-
total_byte_size: Precision::Exact(1000),
58-
column_statistics: vec![ColumnStatistics {
59-
null_count: Precision::Exact(0),
60-
max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
61-
min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
62-
sum_value: Precision::Absent,
63-
distinct_count: Precision::Absent,
64-
}],
65-
}),
66-
extensions: None,
67-
metadata_size_hint: None,
68-
};
69-
files.push(file);
70-
}
71-
72-
vec![FileGroup::new(files)]
73-
}
74-
7528
pub fn compare_split_groups_by_statistics_algorithms(c: &mut Criterion) {
7629
let file_schema = Arc::new(Schema::new(vec![Field::new(
7730
"value",
@@ -180,28 +133,5 @@ pub fn compare_split_groups_by_statistics_algorithms(c: &mut Criterion) {
180133
group.finish();
181134
}
182135

183-
// Helper function to verify that files within each group maintain sort order
184-
fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
185-
for group in file_groups {
186-
let files = group.iter().collect::<Vec<_>>();
187-
for i in 1..files.len() {
188-
let prev_file = files[i - 1];
189-
let curr_file = files[i];
190-
191-
// Check if the min value of current file is greater than max value of previous file
192-
if let (Some(prev_stats), Some(curr_stats)) =
193-
(&prev_file.statistics, &curr_file.statistics)
194-
{
195-
let prev_max = &prev_stats.column_statistics[0].max_value;
196-
let curr_min = &curr_stats.column_statistics[0].min_value;
197-
if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
198-
return false;
199-
}
200-
}
201-
}
202-
}
203-
true
204-
}
205-
206136
criterion_group!(benches, compare_split_groups_by_statistics_algorithms);
207137
criterion_main!(benches);

datafusion/datasource/src/file_scan_config.rs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2311,5 +2311,163 @@ mod tests {
23112311
);
23122312
assert_eq!(new_config.constraints, Constraints::default());
23132313
assert!(new_config.new_lines_in_values);
2314+
fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2315+
use crate::test_util::generate_test_files;
2316+
use crate::test_util::verify_sort_integrity;
2317+
use datafusion_common::DFSchema;
2318+
use datafusion_expr::{col, execution_props::ExecutionProps};
2319+
2320+
let schema = Arc::new(Schema::new(vec![Field::new(
2321+
"value",
2322+
DataType::Float64,
2323+
false,
2324+
)]));
2325+
2326+
// Setup sort expression
2327+
let exec_props = ExecutionProps::new();
2328+
let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2329+
let sort_expr = vec![col("value").sort(true, false)];
2330+
2331+
let physical_sort_exprs: Vec<_> = sort_expr
2332+
.iter()
2333+
.map(|expr| create_physical_sort_expr(expr, &df_schema, &exec_props).unwrap())
2334+
.collect();
2335+
2336+
let sort_ordering = LexOrdering::from(physical_sort_exprs);
2337+
2338+
// Test case parameters
2339+
struct TestCase {
2340+
name: String,
2341+
file_count: usize,
2342+
overlap_factor: f64,
2343+
target_partitions: usize,
2344+
expected_partition_count: usize,
2345+
}
2346+
2347+
let test_cases = vec![
2348+
// Basic cases
2349+
TestCase {
2350+
name: "no_overlap_10_files_4_partitions".to_string(),
2351+
file_count: 10,
2352+
overlap_factor: 0.0,
2353+
target_partitions: 4,
2354+
expected_partition_count: 4,
2355+
},
2356+
TestCase {
2357+
name: "medium_overlap_20_files_5_partitions".to_string(),
2358+
file_count: 20,
2359+
overlap_factor: 0.5,
2360+
target_partitions: 5,
2361+
expected_partition_count: 5,
2362+
},
2363+
TestCase {
2364+
name: "high_overlap_30_files_3_partitions".to_string(),
2365+
file_count: 30,
2366+
overlap_factor: 0.8,
2367+
target_partitions: 3,
2368+
expected_partition_count: 7,
2369+
},
2370+
// Edge cases
2371+
TestCase {
2372+
name: "fewer_files_than_partitions".to_string(),
2373+
file_count: 3,
2374+
overlap_factor: 0.0,
2375+
target_partitions: 10,
2376+
expected_partition_count: 3, // Should only create as many partitions as files
2377+
},
2378+
TestCase {
2379+
name: "single_file".to_string(),
2380+
file_count: 1,
2381+
overlap_factor: 0.0,
2382+
target_partitions: 5,
2383+
expected_partition_count: 1, // Should create only one partition
2384+
},
2385+
TestCase {
2386+
name: "empty_files".to_string(),
2387+
file_count: 0,
2388+
overlap_factor: 0.0,
2389+
target_partitions: 3,
2390+
expected_partition_count: 0, // Empty result for empty input
2391+
},
2392+
];
2393+
2394+
for case in test_cases {
2395+
println!("Running test case: {}", case.name);
2396+
2397+
// Generate files using bench utility function
2398+
let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2399+
2400+
// Call the function under test
2401+
let result =
2402+
FileScanConfig::split_groups_by_statistics_with_target_partitions(
2403+
&schema,
2404+
&file_groups,
2405+
&sort_ordering,
2406+
case.target_partitions,
2407+
)?;
2408+
2409+
// Verify results
2410+
println!(
2411+
"Created {} partitions (target was {})",
2412+
result.len(),
2413+
case.target_partitions
2414+
);
2415+
2416+
// Check partition count
2417+
assert_eq!(
2418+
result.len(),
2419+
case.expected_partition_count,
2420+
"Case '{}': Unexpected partition count",
2421+
case.name
2422+
);
2423+
2424+
// Verify sort integrity
2425+
assert!(
2426+
verify_sort_integrity(&result),
2427+
"Case '{}': Files within partitions are not properly ordered",
2428+
case.name
2429+
);
2430+
2431+
// Distribution check for partitions
2432+
if case.file_count > 1 && case.expected_partition_count > 1 {
2433+
let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2434+
let max_size = *group_sizes.iter().max().unwrap();
2435+
let min_size = *group_sizes.iter().min().unwrap();
2436+
2437+
// Check partition balancing - difference shouldn't be extreme
2438+
let avg_files_per_partition =
2439+
case.file_count as f64 / case.expected_partition_count as f64;
2440+
assert!(
2441+
(max_size as f64) < 2.0 * avg_files_per_partition,
2442+
"Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2443+
case.name,
2444+
max_size,
2445+
avg_files_per_partition
2446+
);
2447+
2448+
println!(
2449+
"Distribution - min files: {}, max files: {}",
2450+
min_size, max_size
2451+
);
2452+
}
2453+
}
2454+
2455+
// Test error case: zero target partitions
2456+
let empty_groups: Vec<FileGroup> = vec![];
2457+
let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2458+
&schema,
2459+
&empty_groups,
2460+
&sort_ordering,
2461+
0,
2462+
)
2463+
.unwrap_err();
2464+
2465+
assert!(
2466+
err.to_string()
2467+
.contains("target_partitions must be greater than 0"),
2468+
"Expected error for zero target partitions"
2469+
);
2470+
2471+
Ok(())
23142472
}
23152473
}

datafusion/datasource/src/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ pub mod sink;
4343
pub mod source;
4444
mod statistics;
4545

46-
#[cfg(test)]
47-
mod test_util;
46+
#[cfg(any(test))]
47+
pub mod test_util;
4848

4949
pub mod url;
5050
pub mod write;

datafusion/datasource/src/test_util.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@
1717

1818
use crate::{
1919
file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener,
20+
PartitionedFile,
2021
};
2122

2223
use std::sync::Arc;
2324

25+
use crate::file_groups::FileGroup;
2426
use arrow::datatypes::SchemaRef;
25-
use datafusion_common::{Result, Statistics};
27+
use datafusion_common::stats::Precision;
28+
use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics};
2629
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
27-
use object_store::ObjectStore;
30+
use object_store::path::Path;
31+
use object_store::{ObjectMeta, ObjectStore};
2832

2933
/// Minimal [`crate::file::FileSource`] implementation for use in tests.
3034
#[derive(Clone, Default)]
@@ -81,3 +85,73 @@ impl FileSource for MockSource {
8185
"mock"
8286
}
8387
}
88+
89+
/// Generates test files with min-max statistics in different overlap patterns
90+
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
91+
let mut files = Vec::with_capacity(num_files);
92+
if num_files == 0 {
93+
return vec![];
94+
}
95+
let range_size = if overlap_factor == 0.0 {
96+
100 / num_files as i64
97+
} else {
98+
(100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
99+
};
100+
101+
for i in 0..num_files {
102+
let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
103+
let min = base as f64;
104+
let max = (base + range_size) as f64;
105+
106+
let file = PartitionedFile {
107+
object_meta: ObjectMeta {
108+
location: Path::from(format!("file_{}.parquet", i)),
109+
last_modified: chrono::Utc::now(),
110+
size: 1000,
111+
e_tag: None,
112+
version: None,
113+
},
114+
partition_values: vec![],
115+
range: None,
116+
statistics: Some(Statistics {
117+
num_rows: Precision::Exact(100),
118+
total_byte_size: Precision::Exact(1000),
119+
column_statistics: vec![ColumnStatistics {
120+
null_count: Precision::Exact(0),
121+
max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
122+
min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
123+
sum_value: Precision::Absent,
124+
distinct_count: Precision::Absent,
125+
}],
126+
}),
127+
extensions: None,
128+
metadata_size_hint: None,
129+
};
130+
files.push(file);
131+
}
132+
133+
vec![FileGroup::new(files)]
134+
}
135+
136+
// Helper function to verify that files within each group maintain sort order
137+
pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
138+
for group in file_groups {
139+
let files = group.iter().collect::<Vec<_>>();
140+
for i in 1..files.len() {
141+
let prev_file = files[i - 1];
142+
let curr_file = files[i];
143+
144+
// Check if the min value of current file is greater than max value of previous file
145+
if let (Some(prev_stats), Some(curr_stats)) =
146+
(&prev_file.statistics, &curr_file.statistics)
147+
{
148+
let prev_max = &prev_stats.column_statistics[0].max_value;
149+
let curr_min = &curr_stats.column_statistics[0].min_value;
150+
if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
151+
return false;
152+
}
153+
}
154+
}
155+
}
156+
true
157+
}

0 commit comments

Comments
 (0)