Skip to content

Commit 37f84a5

Browse files
committed
resolve conflict
1 parent e98116b commit 37f84a5

File tree

4 files changed

+89
-87
lines changed

4 files changed

+89
-87
lines changed

datafusion/datasource/benches/split_groups_by_statistics.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
use arrow::datatypes::{DataType, Field, Schema};
1919
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
2020
use datafusion_datasource::file_scan_config::FileScanConfig;
21-
use datafusion_datasource::test_util::generate_test_files;
22-
use datafusion_datasource::test_util::verify_sort_integrity;
21+
use datafusion_datasource::{generate_test_files, verify_sort_integrity};
2322
use datafusion_physical_expr::PhysicalSortExpr;
2423
use datafusion_physical_expr_common::sort_expr::LexOrdering;
2524
use std::sync::Arc;

datafusion/datasource/src/file_scan_config.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1467,7 +1467,10 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
14671467

14681468
#[cfg(test)]
14691469
mod tests {
1470-
use crate::{test_util::MockSource, tests::aggr_test_schema};
1470+
use crate::{
1471+
generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1472+
verify_sort_integrity,
1473+
};
14711474

14721475
use super::*;
14731476
use arrow::{
@@ -2311,9 +2314,10 @@ mod tests {
23112314
);
23122315
assert_eq!(new_config.constraints, Constraints::default());
23132316
assert!(new_config.new_lines_in_values);
2317+
}
2318+
2319+
#[test]
23142320
fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2315-
use crate::test_util::generate_test_files;
2316-
use crate::test_util::verify_sort_integrity;
23172321
use datafusion_common::DFSchema;
23182322
use datafusion_expr::{col, execution_props::ExecutionProps};
23192323

datafusion/datasource/src/mod.rs

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,26 +43,27 @@ pub mod sink;
4343
pub mod source;
4444
mod statistics;
4545

46-
#[cfg(any(test))]
46+
#[cfg(test)]
4747
pub mod test_util;
4848

4949
pub mod url;
5050
pub mod write;
51+
pub use self::url::ListingTableUrl;
52+
use crate::file_groups::FileGroup;
5153
use chrono::TimeZone;
52-
use datafusion_common::Result;
54+
use datafusion_common::stats::Precision;
55+
use datafusion_common::{ColumnStatistics, Result};
5356
use datafusion_common::{ScalarValue, Statistics};
5457
use file_meta::FileMeta;
5558
use futures::{Stream, StreamExt};
5659
use object_store::{path::Path, ObjectMeta};
5760
use object_store::{GetOptions, GetRange, ObjectStore};
61+
pub use statistics::add_row_stats;
62+
pub use statistics::compute_all_files_statistics;
5863
use std::ops::Range;
5964
use std::pin::Pin;
6065
use std::sync::Arc;
6166

62-
pub use self::url::ListingTableUrl;
63-
pub use statistics::add_row_stats;
64-
pub use statistics::compute_all_files_statistics;
65-
6667
/// Stream of files get listed from object store
6768
pub type PartitionedFileStream =
6869
Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
@@ -313,6 +314,78 @@ async fn find_first_newline(
313314
Ok(index)
314315
}
315316

317+
/// Generates test files with min-max statistics in different overlap patterns
318+
/// Used by tests and benchmarks
319+
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
320+
let mut files = Vec::with_capacity(num_files);
321+
if num_files == 0 {
322+
return vec![];
323+
}
324+
let range_size = if overlap_factor == 0.0 {
325+
100 / num_files as i64
326+
} else {
327+
(100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
328+
};
329+
330+
for i in 0..num_files {
331+
let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
332+
let min = base as f64;
333+
let max = (base + range_size) as f64;
334+
335+
let file = PartitionedFile {
336+
object_meta: ObjectMeta {
337+
location: Path::from(format!("file_{}.parquet", i)),
338+
last_modified: chrono::Utc::now(),
339+
size: 1000,
340+
e_tag: None,
341+
version: None,
342+
},
343+
partition_values: vec![],
344+
range: None,
345+
statistics: Some(Arc::new(Statistics {
346+
num_rows: Precision::Exact(100),
347+
total_byte_size: Precision::Exact(1000),
348+
column_statistics: vec![ColumnStatistics {
349+
null_count: Precision::Exact(0),
350+
max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
351+
min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
352+
sum_value: Precision::Absent,
353+
distinct_count: Precision::Absent,
354+
}],
355+
})),
356+
extensions: None,
357+
metadata_size_hint: None,
358+
};
359+
files.push(file);
360+
}
361+
362+
vec![FileGroup::new(files)]
363+
}
364+
365+
// Helper function to verify that files within each group maintain sort order
366+
/// Used by tests and benchmarks
367+
pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
368+
for group in file_groups {
369+
let files = group.iter().collect::<Vec<_>>();
370+
for i in 1..files.len() {
371+
let prev_file = files[i - 1];
372+
let curr_file = files[i];
373+
374+
// Check if the min value of current file is greater than max value of previous file
375+
if let (Some(prev_stats), Some(curr_stats)) =
376+
(&prev_file.statistics, &curr_file.statistics)
377+
{
378+
let prev_max = &prev_stats.column_statistics[0].max_value;
379+
let curr_min = &curr_stats.column_statistics[0].min_value;
380+
if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
381+
return false;
382+
}
383+
}
384+
}
385+
}
386+
true
387+
}
388+
316389
#[cfg(test)]
317390
mod tests {
318391
use super::ListingTableUrl;

datafusion/datasource/src/test_util.rs

Lines changed: 2 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,14 @@
1717

1818
use crate::{
1919
file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener,
20-
PartitionedFile,
2120
};
2221

2322
use std::sync::Arc;
2423

25-
use crate::file_groups::FileGroup;
2624
use arrow::datatypes::SchemaRef;
27-
use datafusion_common::stats::Precision;
28-
use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics};
25+
use datafusion_common::{Result, Statistics};
2926
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
30-
use object_store::path::Path;
31-
use object_store::{ObjectMeta, ObjectStore};
27+
use object_store::ObjectStore;
3228

3329
/// Minimal [`crate::file::FileSource`] implementation for use in tests.
3430
#[derive(Clone, Default)]
@@ -85,73 +81,3 @@ impl FileSource for MockSource {
8581
"mock"
8682
}
8783
}
88-
89-
/// Generates test files with min-max statistics in different overlap patterns
90-
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
91-
let mut files = Vec::with_capacity(num_files);
92-
if num_files == 0 {
93-
return vec![];
94-
}
95-
let range_size = if overlap_factor == 0.0 {
96-
100 / num_files as i64
97-
} else {
98-
(100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
99-
};
100-
101-
for i in 0..num_files {
102-
let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
103-
let min = base as f64;
104-
let max = (base + range_size) as f64;
105-
106-
let file = PartitionedFile {
107-
object_meta: ObjectMeta {
108-
location: Path::from(format!("file_{}.parquet", i)),
109-
last_modified: chrono::Utc::now(),
110-
size: 1000,
111-
e_tag: None,
112-
version: None,
113-
},
114-
partition_values: vec![],
115-
range: None,
116-
statistics: Some(Statistics {
117-
num_rows: Precision::Exact(100),
118-
total_byte_size: Precision::Exact(1000),
119-
column_statistics: vec![ColumnStatistics {
120-
null_count: Precision::Exact(0),
121-
max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
122-
min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
123-
sum_value: Precision::Absent,
124-
distinct_count: Precision::Absent,
125-
}],
126-
}),
127-
extensions: None,
128-
metadata_size_hint: None,
129-
};
130-
files.push(file);
131-
}
132-
133-
vec![FileGroup::new(files)]
134-
}
135-
136-
// Helper function to verify that files within each group maintain sort order
137-
pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
138-
for group in file_groups {
139-
let files = group.iter().collect::<Vec<_>>();
140-
for i in 1..files.len() {
141-
let prev_file = files[i - 1];
142-
let curr_file = files[i];
143-
144-
// Check if the min value of current file is greater than max value of previous file
145-
if let (Some(prev_stats), Some(curr_stats)) =
146-
(&prev_file.statistics, &curr_file.statistics)
147-
{
148-
let prev_max = &prev_stats.column_statistics[0].max_value;
149-
let curr_min = &curr_stats.column_statistics[0].min_value;
150-
if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
151-
return false;
152-
}
153-
}
154-
}
155-
}
156-
true
157-
}

0 commit comments

Comments
 (0)