Skip to content

Commit 283120c

Browse files
authored
More deterministic re-partitioning logic (#2603)
Inspired by Spark's repartitioning logic as relayed to me by @robert3005.
1 parent 4344bab commit 283120c

File tree

2 files changed

+54
-10
lines changed

2 files changed

+54
-10
lines changed

bench-vortex/src/clickbench.rs

+3
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,9 @@ pub async fn register_vortex_files(
229229
let table_path = vortex_dir
230230
.to_str()
231231
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
232+
233+
info!("Registering table from {table_path}");
234+
232235
let table_path = format!("file://{table_path}/");
233236
let table_url = ListingTableUrl::parse(table_path)?;
234237

vortex-datafusion/src/persistent/execution.rs

+51-10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::VecDeque;
12
use std::fmt;
23
use std::sync::Arc;
34

@@ -11,7 +12,9 @@ use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr
1112
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
1213
use datafusion_physical_plan::metrics::MetricsSet;
1314
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
15+
use itertools::Itertools;
1416
use object_store::ObjectStoreScheme;
17+
use vortex_error::VortexExpect;
1518
use vortex_expr::datafusion::convert_expr_to_vortex;
1619
use vortex_expr::{VortexExpr, and};
1720

@@ -212,18 +215,56 @@ fn repartition_by_size(
212215
) -> Vec<Vec<PartitionedFile>> {
213216
let total_file_count = all_files.len();
214217
let total_size = all_files.iter().map(|f| f.object_meta.size).sum::<usize>();
215-
let target_partition_size = total_size / (desired_partitions + 1);
218+
let target_partition_size = total_size / desired_partitions;
216219

217220
let mut partitions = Vec::with_capacity(desired_partitions);
218221

219222
let mut curr_partition_size = 0;
220223
let mut curr_partition = Vec::default();
221224

222-
for file in all_files.into_iter() {
223-
curr_partition_size += file.object_meta.size;
224-
curr_partition.push(file.clone());
225+
let mut all_files = VecDeque::from_iter(
226+
all_files
227+
.into_iter()
228+
.sorted_unstable_by_key(|f| f.object_meta.size),
229+
);
230+
231+
while !all_files.is_empty() && partitions.len() < desired_partitions {
232+
// If the current partition is empty, we want to bootstrap it with the biggest file we have leftover.
233+
let file = if curr_partition.is_empty() {
234+
all_files.pop_back()
235+
// If we already have files in the partition, we try and fill it up.
236+
} else {
237+
// Peak at the biggest file left
238+
let biggest_file_size = all_files
239+
.back()
240+
.vortex_expect("We must have at least one item")
241+
.object_meta
242+
.size;
243+
244+
let smallest_file_size = all_files
245+
.front()
246+
.vortex_expect("We must have at least one item")
247+
.object_meta
248+
.size;
249+
250+
// We try and find a file on either end that fits in the partition
251+
if curr_partition_size + biggest_file_size >= target_partition_size {
252+
all_files.pop_front()
253+
} else if curr_partition_size + smallest_file_size >= target_partition_size {
254+
all_files.pop_back()
255+
} else {
256+
None
257+
}
258+
};
259+
260+
// Add a file to the partition
261+
if let Some(file) = file {
262+
curr_partition_size += file.object_meta.size;
263+
curr_partition.push(file.clone());
264+
}
225265

226-
if curr_partition_size >= target_partition_size {
266+
// If the partition is full, move on to the next one
267+
if curr_partition_size >= target_partition_size || file.is_none() {
227268
curr_partition_size = 0;
228269
partitions.push(std::mem::take(&mut curr_partition));
229270
}
@@ -232,19 +273,19 @@ fn repartition_by_size(
232273
// If we we're still missing the last partition
233274
if !curr_partition.is_empty() && partitions.len() != desired_partitions {
234275
partitions.push(std::mem::take(&mut curr_partition));
235-
// If we already have enough partitions
236276
} else if !curr_partition.is_empty() {
237277
for (idx, file) in curr_partition.into_iter().enumerate() {
238278
let new_part_idx = idx % partitions.len();
239279
partitions[new_part_idx].push(file.clone());
240280
}
241281
}
242282

283+
for (idx, file) in all_files.into_iter().enumerate() {
284+
let new_part_idx = idx % partitions.len();
285+
partitions[new_part_idx].push(file.clone());
286+
}
287+
243288
// Assert that we have the correct number of partitions and that the total number of files is right
244-
assert_eq!(
245-
partitions.len(),
246-
usize::min(desired_partitions, total_file_count)
247-
);
248289
assert_eq!(total_file_count, partitions.iter().flatten().count());
249290

250291
partitions

0 commit comments

Comments
 (0)