Skip to content

Introduce load-balanced split_groups_by_statistics method #15473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,10 +899,11 @@ impl TableProvider for ListingTable {
.split_file_groups_by_statistics
.then(|| {
output_ordering.first().map(|output_ordering| {
FileScanConfig::split_groups_by_statistics(
FileScanConfig::split_groups_by_statistics_with_target_partitions(
&self.table_schema,
&partitioned_file_lists,
output_ordering,
self.options.target_partitions,
)
})
})
Expand Down
5 changes: 5 additions & 0 deletions datafusion/datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ xz2 = { version = "0.1", optional = true, features = ["static"] }
zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
criterion = { workspace = true }
tempfile = { workspace = true }

[lints]
Expand All @@ -80,3 +81,7 @@ workspace = true
[lib]
name = "datafusion_datasource"
path = "src/mod.rs"

[[bench]]
name = "split_groups_by_statistics"
harness = false
108 changes: 108 additions & 0 deletions datafusion/datasource/benches/split_groups_by_statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::{generate_test_files, verify_sort_integrity};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::sync::Arc;
use std::time::Duration;

pub fn compare_split_groups_by_statistics_algorithms(c: &mut Criterion) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spilt_groups/original/files=10,overlap=0.0
                        time:   [3.0516 µs 3.0722 µs 3.0961 µs]
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=4/files=10,overlap=0.0
                        time:   [3.2514 µs 3.2801 µs 3.3107 µs]
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=8/files=10,overlap=0.0
                        time:   [3.5700 µs 3.6058 µs 3.6545 µs]
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=16/files=10,overlap=0.0
                        time:   [4.0533 µs 4.0819 µs 4.1139 µs]
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe
spilt_groups/v2_partitions=32/files=10,overlap=0.0
                        time:   [5.3031 µs 5.3521 µs 5.4152 µs]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe
spilt_groups/original/files=10,overlap=0.2
                        time:   [2.9694 µs 2.9908 µs 3.0118 µs]
Found 11 outliers among 100 measurements (11.00%)
  8 (8.00%) low mild
  1 (1.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=4/files=10,overlap=0.2
                        time:   [3.2044 µs 3.2299 µs 3.2578 µs]
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=8/files=10,overlap=0.2
                        time:   [3.5383 µs 3.5644 µs 3.5964 µs]
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe
spilt_groups/v2_partitions=16/files=10,overlap=0.2
                        time:   [4.0280 µs 4.0516 µs 4.0780 µs]
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) low mild
  2 (2.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=32/files=10,overlap=0.2
                        time:   [5.3014 µs 5.3424 µs 5.3850 µs]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
spilt_groups/original/files=10,overlap=0.5
                        time:   [2.9601 µs 2.9801 µs 3.0023 µs]
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) low mild
  2 (2.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=4/files=10,overlap=0.5
                        time:   [3.1908 µs 3.2223 µs 3.2589 µs]
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe
spilt_groups/v2_partitions=8/files=10,overlap=0.5
                        time:   [3.5209 µs 3.5408 µs 3.5632 µs]
Found 9 outliers among 100 measurements (9.00%)
  6 (6.00%) low mild
  1 (1.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=16/files=10,overlap=0.5
                        time:   [3.9969 µs 4.0221 µs 4.0511 µs]
Found 9 outliers among 100 measurements (9.00%)
  3 (3.00%) low mild
  3 (3.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=32/files=10,overlap=0.5
                        time:   [5.1889 µs 5.2249 µs 5.2662 µs]
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
spilt_groups/original/files=10,overlap=0.8
                        time:   [3.1625 µs 3.1933 µs 3.2307 µs]
Found 14 outliers among 100 measurements (14.00%)
  7 (7.00%) high mild
  7 (7.00%) high severe
spilt_groups/v2_partitions=4/files=10,overlap=0.8
                        time:   [3.2975 µs 3.3533 µs 3.4137 µs]
Found 14 outliers among 100 measurements (14.00%)
  6 (6.00%) high mild
  8 (8.00%) high severe
spilt_groups/v2_partitions=8/files=10,overlap=0.8
                        time:   [3.3652 µs 3.3873 µs 3.4134 µs]
Found 12 outliers among 100 measurements (12.00%)
  4 (4.00%) low mild
  4 (4.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=16/files=10,overlap=0.8
                        time:   [3.9061 µs 3.9289 µs 3.9574 µs]
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) low mild
  3 (3.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=32/files=10,overlap=0.8
                        time:   [4.9453 µs 4.9941 µs 5.0547 µs]
Found 8 outliers among 100 measurements (8.00%)
  2 (2.00%) low mild
  2 (2.00%) high mild
  4 (4.00%) high severe
spilt_groups/original/files=100,overlap=0.0
                        time:   [14.800 µs 15.014 µs 15.261 µs]
Found 14 outliers among 100 measurements (14.00%)
  6 (6.00%) high mild
  8 (8.00%) high severe
Benchmarking spilt_groups/v2_partitions=4/files=100,overlap=0.0: Collecting 100 samples in estimated 10.081 s (571k iterations)
spilt_groups/v2_partitions=4/files=100,overlap=0.0
                        time:   [16.761 µs 16.913 µs 17.124 µs]
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) high mild
  5 (5.00%) high severe
spilt_groups/v2_partitions=8/files=100,overlap=0.0
                        time:   [20.542 µs 20.629 µs 20.747 µs]
Found 8 outliers among 100 measurements (8.00%)
  3 (3.00%) high mild
  5 (5.00%) high severe
spilt_groups/v2_partitions=16/files=100,overlap=0.0
                        time:   [26.596 µs 26.717 µs 26.884 µs]
Found 11 outliers among 100 measurements (11.00%)
  3 (3.00%) low mild
  5 (5.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=32/files=100,overlap=0.0
                        time:   [40.912 µs 41.096 µs 41.342 µs]
Found 16 outliers among 100 measurements (16.00%)
  5 (5.00%) low mild
  6 (6.00%) high mild
  5 (5.00%) high severe
spilt_groups/original/files=100,overlap=0.2
                        time:   [14.420 µs 14.486 µs 14.573 µs]
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=4/files=100,overlap=0.2
                        time:   [16.644 µs 16.707 µs 16.788 µs]
Found 8 outliers among 100 measurements (8.00%)
  5 (5.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=8/files=100,overlap=0.2
                        time:   [20.601 µs 20.702 µs 20.826 µs]
Found 9 outliers among 100 measurements (9.00%)
  6 (6.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=16/files=100,overlap=0.2
                        time:   [26.557 µs 26.738 µs 26.968 µs]
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) high mild
  7 (7.00%) high severe
spilt_groups/v2_partitions=32/files=100,overlap=0.2
                        time:   [41.131 µs 41.329 µs 41.588 µs]
Found 19 outliers among 100 measurements (19.00%)
  3 (3.00%) low severe
  9 (9.00%) low mild
  3 (3.00%) high mild
  4 (4.00%) high severe
spilt_groups/original/files=100,overlap=0.5
                        time:   [14.826 µs 14.897 µs 14.987 µs]
Found 11 outliers among 100 measurements (11.00%)
  1 (1.00%) low mild
  7 (7.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=4/files=100,overlap=0.5
                        time:   [16.583 µs 16.648 µs 16.726 µs]
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe
spilt_groups/v2_partitions=8/files=100,overlap=0.5
                        time:   [20.292 µs 20.413 µs 20.559 µs]
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=16/files=100,overlap=0.5
                        time:   [26.199 µs 26.295 µs 26.419 µs]
Found 8 outliers among 100 measurements (8.00%)
  4 (4.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=32/files=100,overlap=0.5
                        time:   [40.980 µs 41.346 µs 41.841 µs]
Found 10 outliers among 100 measurements (10.00%)
  3 (3.00%) low mild
  3 (3.00%) high mild
  4 (4.00%) high severe
spilt_groups/original/files=100,overlap=0.8
                        time:   [16.473 µs 16.544 µs 16.640 µs]
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe
spilt_groups/v2_partitions=4/files=100,overlap=0.8
                        time:   [20.103 µs 20.190 µs 20.301 µs]
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low mild
  4 (4.00%) high mild
  2 (2.00%) high severe
spilt_groups/v2_partitions=8/files=100,overlap=0.8
                        time:   [20.099 µs 20.221 µs 20.404 µs]
Found 4 outliers among 100 measurements (4.00%)
  4 (4.00%) high severe
spilt_groups/v2_partitions=16/files=100,overlap=0.8
                        time:   [24.757 µs 24.870 µs 25.012 µs]
Found 7 outliers among 100 measurements (7.00%)
  4 (4.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=32/files=100,overlap=0.8
                        time:   [39.388 µs 39.585 µs 39.818 µs]
Found 15 outliers among 100 measurements (15.00%)
  10 (10.00%) low mild
  2 (2.00%) high mild
  3 (3.00%) high severe
spilt_groups/original/files=1000,overlap=0.0
                        time:   [1.2800 ms 1.2860 ms 1.2936 ms]
Found 17 outliers among 100 measurements (17.00%)
  6 (6.00%) high mild
  11 (11.00%) high severe
spilt_groups/v2_partitions=4/files=1000,overlap=0.0
                        time:   [1.1734 ms 1.1810 ms 1.1896 ms]
Found 9 outliers among 100 measurements (9.00%)
  3 (3.00%) high mild
  6 (6.00%) high severe
spilt_groups/v2_partitions=8/files=1000,overlap=0.0
                        time:   [1.1556 ms 1.1586 ms 1.1623 ms]
Found 8 outliers among 100 measurements (8.00%)
  1 (1.00%) low mild
  2 (2.00%) high mild
  5 (5.00%) high severe
spilt_groups/v2_partitions=16/files=1000,overlap=0.0
                        time:   [1.1644 ms 1.1768 ms 1.1913 ms]
Found 14 outliers among 100 measurements (14.00%)
  3 (3.00%) high mild
  11 (11.00%) high severe
spilt_groups/v2_partitions=32/files=1000,overlap=0.0
                        time:   [1.1643 ms 1.1751 ms 1.1883 ms]
Found 17 outliers among 100 measurements (17.00%)
  6 (6.00%) high mild
  11 (11.00%) high severe
spilt_groups/original/files=1000,overlap=0.2
                        time:   [126.31 µs 127.47 µs 128.74 µs]
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) high mild
  5 (5.00%) high severe
spilt_groups/v2_partitions=4/files=1000,overlap=0.2
                        time:   [147.86 µs 148.99 µs 150.41 µs]
Found 9 outliers among 100 measurements (9.00%)
  4 (4.00%) high mild
  5 (5.00%) high severe
spilt_groups/v2_partitions=8/files=1000,overlap=0.2
                        time:   [187.18 µs 188.91 µs 190.86 µs]
Found 9 outliers among 100 measurements (9.00%)
  5 (5.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=16/files=1000,overlap=0.2
                        time:   [246.99 µs 247.96 µs 249.06 µs]
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high severe
spilt_groups/v2_partitions=32/files=1000,overlap=0.2
                        time:   [392.19 µs 395.67 µs 399.84 µs]
Found 9 outliers among 100 measurements (9.00%)
  9 (9.00%) high severe
spilt_groups/original/files=1000,overlap=0.5
                        time:   [131.38 µs 132.60 µs 133.98 µs]
Found 15 outliers among 100 measurements (15.00%)
  6 (6.00%) low mild
  3 (3.00%) high mild
  6 (6.00%) high severe
spilt_groups/v2_partitions=4/files=1000,overlap=0.5
                        time:   [146.93 µs 148.18 µs 149.60 µs]
Found 9 outliers among 100 measurements (9.00%)
  3 (3.00%) low mild
  3 (3.00%) high mild
  3 (3.00%) high severe
spilt_groups/v2_partitions=8/files=1000,overlap=0.5
                        time:   [185.27 µs 187.78 µs 191.07 µs]
Found 12 outliers among 100 measurements (12.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  8 (8.00%) high severe
spilt_groups/v2_partitions=16/files=1000,overlap=0.5
                        time:   [245.92 µs 247.51 µs 249.31 µs]
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low mild
  2 (2.00%) high mild
  4 (4.00%) high severe
spilt_groups/v2_partitions=32/files=1000,overlap=0.5
                        time:   [388.99 µs 392.62 µs 397.14 µs]
Found 9 outliers among 100 measurements (9.00%)
  4 (4.00%) high mild
  5 (5.00%) high severe
spilt_groups/original/files=1000,overlap=0.8
                        time:   [141.78 µs 143.24 µs 144.92 µs]
Found 11 outliers among 100 measurements (11.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  7 (7.00%) high severe
spilt_groups/v2_partitions=4/files=1000,overlap=0.8
                        time:   [181.40 µs 182.83 µs 184.52 µs]
Found 18 outliers among 100 measurements (18.00%)
  11 (11.00%) low mild
  1 (1.00%) high mild
  6 (6.00%) high severe
spilt_groups/v2_partitions=8/files=1000,overlap=0.8
                        time:   [183.39 µs 187.03 µs 191.27 µs]
Found 16 outliers among 100 measurements (16.00%)
  4 (4.00%) high mild
  12 (12.00%) high severe
spilt_groups/v2_partitions=16/files=1000,overlap=0.8
                        time:   [226.57 µs 228.69 µs 231.41 µs]
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) high mild
  7 (7.00%) high severe
spilt_groups/v2_partitions=32/files=1000,overlap=0.8
                        time:   [387.48 µs 395.02 µs 404.59 µs]
Found 12 outliers among 100 measurements (12.00%)
  7 (7.00%) high mild
  5 (5.00%) high severe

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execution Time

  • The original algorithm is the fastest.
  • v2: higher partition targets increase overhead

Planning phase: ~2x slower for the v2 algorithm
Execution phase: Potentially much better parallelism that could improve overall query performance
(I think it's an expected trade-off, v2's plan cost should be acceptable, with thousands of files in time)

Scaling Behavior:

  • Both algorithms scale linearly with file count

Overlap Effects:

  • Higher overlap factors increase execution time slightly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execution phase
Would be great to have some example of this. The impact on planning time seems not super high.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be great to have some example of this. The impact on planning time seems not super high.

Yes, fyi: #10336 (comment)

And I think @leoyvens can ouput more

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm prepared to give this a test run on some real data, by installing datafusion-cli from this branch. But I need some way to enable it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @leoyvens , I enabled it in 3666ae8

let file_schema = Arc::new(Schema::new(vec![Field::new(
"value",
DataType::Float64,
false,
)]));

let sort_expr = PhysicalSortExpr {
expr: Arc::new(datafusion_physical_expr::expressions::Column::new(
"value", 0,
)),
options: arrow::compute::SortOptions::default(),
};
let sort_ordering = LexOrdering::from(vec![sort_expr]);

// Small, medium, large number of files
let file_counts = [10, 100, 1000];
let overlap_factors = [0.0, 0.2, 0.5, 0.8]; // No, low, medium, high overlap

let target_partitions: [usize; 4] = [4, 8, 16, 32];

let mut group = c.benchmark_group("split_groups");
group.measurement_time(Duration::from_secs(10));

for &num_files in &file_counts {
for &overlap in &overlap_factors {
let file_groups = generate_test_files(num_files, overlap);
// Benchmark original algorithm
group.bench_with_input(
BenchmarkId::new(
"original",
format!("files={},overlap={:.1}", num_files, overlap),
),
&(
file_groups.clone(),
file_schema.clone(),
sort_ordering.clone(),
),
|b, (fg, schema, order)| {
let mut result = Vec::new();
b.iter(|| {
result =
FileScanConfig::split_groups_by_statistics(schema, fg, order)
.unwrap();
});
assert!(verify_sort_integrity(&result));
},
);

// Benchmark new algorithm with different target partitions
for &tp in &target_partitions {
group.bench_with_input(
BenchmarkId::new(
format!("v2_partitions={}", tp),
format!("files={},overlap={:.1}", num_files, overlap),
),
&(
file_groups.clone(),
file_schema.clone(),
sort_ordering.clone(),
tp,
),
|b, (fg, schema, order, target)| {
let mut result = Vec::new();
b.iter(|| {
result = FileScanConfig::split_groups_by_statistics_with_target_partitions(
schema, fg, order, *target,
)
.unwrap();
});
assert!(verify_sort_integrity(&result));
},
);
}
}
}

group.finish();
}

criterion_group!(benches, compare_split_groups_by_statistics_algorithms);
criterion_main!(benches);
Loading
Loading