Skip to content

Commit 80f0489

Browse files
authored
Perf: remove clone on uninitiated_partitions in SortPreservingMergeStream (#15562)
* perf: replace `merge` `uninitiated_partitions` `VecDeque<usize>` with a custom fixed-size queue; this is done to avoid having to clone or change the actual values
* revert use of VecDeque but use a better API
* change to shrink to fit
1 parent 5c31692 commit 80f0489

File tree

1 file changed

+8
-8
lines changed
  • datafusion/physical-plan/src/sorts

1 file changed

+8
-8
lines changed

datafusion/physical-plan/src/sorts/merge.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,8 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
217217
// we skip the following block. Until then, this function may be called multiple
218218
// times and can return Poll::Pending if any partition returns Poll::Pending.
219219
if self.loser_tree.is_empty() {
220-
let remaining_partitions = self.uninitiated_partitions.clone();
221-
for i in remaining_partitions {
222-
match self.maybe_poll_stream(cx, i) {
220+
while let Some(&partition_idx) = self.uninitiated_partitions.front() {
221+
match self.maybe_poll_stream(cx, partition_idx) {
223222
Poll::Ready(Err(e)) => {
224223
self.aborted = true;
225224
return Poll::Ready(Some(Err(e)));
@@ -228,10 +227,8 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
228227
// If a partition returns Poll::Pending, to avoid continuously polling it
229228
// and potentially increasing upstream buffer sizes, we move it to the
230229
// back of the polling queue.
231-
if let Some(front) = self.uninitiated_partitions.pop_front() {
232-
// This pop_front can never return `None`.
233-
self.uninitiated_partitions.push_back(front);
234-
}
230+
self.uninitiated_partitions.rotate_left(1);
231+
235232
// This function could remain in a pending state, so we manually wake it here.
236233
// However, this approach can be investigated further to find a more natural way
237234
// to avoid disrupting the runtime scheduler.
@@ -241,10 +238,13 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
241238
_ => {
242239
// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
243240
// we remove this partition from the queue so it is not polled again.
244-
self.uninitiated_partitions.retain(|idx| *idx != i);
241+
self.uninitiated_partitions.pop_front();
245242
}
246243
}
247244
}
245+
246+
// Claim the memory for the uninitiated partitions
247+
self.uninitiated_partitions.shrink_to_fit();
248248
self.init_loser_tree();
249249
}
250250

0 commit comments

Comments
 (0)