From 576ad1745603c0d97ee20e1833092845f197afde Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 3 Apr 2025 17:13:01 +0300 Subject: [PATCH 1/3] perf: replace `merge` `uninitiated_partitions` `VecDeque` with custom fixed size queue this is done to avoid having to clone or change the actual values --- datafusion/physical-plan/src/sorts/merge.rs | 154 ++++++++++++++++++-- 1 file changed, 143 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 1c2b8cd0c91b..d8e079f0c48e 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -18,7 +18,6 @@ //! Merge that deals with an arbitrary size of streaming inputs. //! This is an order-preserving merge. -use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -147,7 +146,7 @@ pub(crate) struct SortPreservingMergeStream { /// it is removed from the vector. If a partition returns `Poll::Pending`, it is moved to the end of the /// vector to ensure the next iteration starts with a different partition, preventing the same partition /// from being continuously polled. - uninitiated_partitions: VecDeque, + uninitiated_partitions: FixedSizeQueue, } impl SortPreservingMergeStream { @@ -178,7 +177,9 @@ impl SortPreservingMergeStream { batch_size, fetch, produced: 0, - uninitiated_partitions: (0..stream_count).collect(), + uninitiated_partitions: FixedSizeQueue::from( + (0..stream_count).collect::>(), + ), enable_round_robin_tie_breaker, } } @@ -217,9 +218,8 @@ impl SortPreservingMergeStream { // we skip the following block. Until then, this function may be called multiple // times and can return Poll::Pending if any partition returns Poll::Pending. if self.loser_tree.is_empty() { - let remaining_partitions = self.uninitiated_partitions.clone(); - for i in remaining_partitions { - match self.maybe_poll_stream(cx, i) { + while let Some(partition_idx) = self.uninitiated_partitions.peek() { + match self.maybe_poll_stream(cx, partition_idx) { Poll::Ready(Err(e)) => { self.aborted = true; return Poll::Ready(Some(Err(e))); @@ -228,10 +228,8 @@ impl SortPreservingMergeStream { // If a partition returns Poll::Pending, to avoid continuously polling it // and potentially increasing upstream buffer sizes, we move it to the // back of the polling queue. - if let Some(front) = self.uninitiated_partitions.pop_front() { - // This pop_front can never return `None`. - self.uninitiated_partitions.push_back(front); - } + self.uninitiated_partitions.rotate(); + // This function could remain in a pending state, so we manually wake it here. // However, this approach can be investigated further to find a more natural way // to avoid disrupting the runtime scheduler. @@ -241,10 +239,13 @@ impl SortPreservingMergeStream { _ => { // If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None), // we remove this partition from the queue so it is not polled again. - self.uninitiated_partitions.retain(|idx| *idx != i); + self.uninitiated_partitions.pop_front(); } } } + + // Claim the memory for the uninitiated partitions + std::mem::take(&mut self.uninitiated_partitions); self.init_loser_tree(); } @@ -533,3 +534,134 @@ impl RecordBatchStream for SortPreservingMergeStream Arc::clone(self.in_progress.schema()) } } + +/// Fixed size queue implemented as a circular buffer +/// The underlying `values` are immutable, so removing elements will not drop them and not change the +/// capacity/length of the actual vector +#[derive(Debug, Default)] +struct FixedSizeQueue { + values: Vec, + start: usize, + end: usize, + len: usize, +} + +impl From> for FixedSizeQueue { + fn from(values: Vec) -> Self { + let len = values.len(); + + Self { + values, + start: 0, + end: len.saturating_sub(1), + len, + } + } +} + +impl FixedSizeQueue { + /// Get the value at the top of the queue + /// + /// # Implementation + /// return the value at [`Self::start`] + /// + /// ## Example + /// + /// ```plain + /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + /// | -------------------------------------------------------- | + /// value: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + /// | -------------------------------------------------------- | + /// ^ ^ + /// end start + /// ``` + /// calling `peek()` will return `Some(2)` + /// + fn peek(&self) -> Option { + if self.len == 0 { + None + } else { + Some(self.values[self.start]) + } + } + + /// Move the current value to last + /// + /// # Implementation + /// + /// Swap the `self.values[start]` with `self.values[end + 1]` value and advance both by 1 + /// (wrapping at the `end + 1` around the underlying vector length) + /// + /// ## Example + /// + /// **Current state:** + /// ```plain + /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + /// | -------------------------------------------------------- | + /// value: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + /// | -------------------------------------------------------- | + /// ^ ^ + /// end start + /// ``` + /// + /// So calling `move_to_last` on the example above will produce this: + /// ```plain + /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + /// | -------------------------------------------------------- | + /// value: | 0 | 2 | 1 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + /// | -------------------------------------------------------- | + /// ^ ^ + /// end start + /// ``` + /// + /// # Panics + /// Must not be empty (if `self.len() == 0`) + /// + fn rotate(&mut self) { + assert_ne!(self.len, 0, "No current value"); + + // Wrap around the vector + self.end = (self.end + 1) % self.values.len(); + + // Swap the current with the last + self.values.swap(self.start, self.end); + self.start = (self.start + 1) % self.values.len(); + } + + /// Remove the value at the top of the queue + /// + /// # Implementation + /// + /// Advance [`Self::start`] by 1 (wrapping around the underlying vector length) + /// + /// # Example + /// + /// **Current state:** + /// ```plain + /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + /// | -------------------------------------------------------- | + /// value: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + /// | -------------------------------------------------------- | + /// ^ ^ + /// end start + /// ``` + /// + /// So calling `pop_front` on the example above will produce this: + /// ```plain + /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + /// | -------------------------------------------------------- | + /// value: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | + /// | -------------------------------------------------------- | + /// ^ ^ + /// end start + /// ``` + /// + /// # Panics + /// Must not be empty (if `self.len() == 0`) + /// + fn pop_front(&mut self) { + assert_ne!(self.len, 0, "No current value"); + self.start = (self.start + 1) % self.values.len(); + self.len -= 1; + } +} From bcbed133ee6a8469b026cdbcbe555ddff094501d Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 3 Apr 2025 23:37:18 +0300 Subject: [PATCH 2/3] revert use of VecDeque but use better api --- datafusion/physical-plan/src/sorts/merge.rs | 144 +------------------- 1 file changed, 6 insertions(+), 138 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index d8e079f0c48e..402f337743c7 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -18,6 +18,7 @@ //! Merge that deals with an arbitrary size of streaming inputs. //! This is an order-preserving merge. +use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -146,7 +147,7 @@ pub(crate) struct SortPreservingMergeStream { /// it is removed from the vector. If a partition returns `Poll::Pending`, it is moved to the end of the /// vector to ensure the next iteration starts with a different partition, preventing the same partition /// from being continuously polled. - uninitiated_partitions: FixedSizeQueue, + uninitiated_partitions: VecDeque, } impl SortPreservingMergeStream { @@ -177,9 +178,7 @@ impl SortPreservingMergeStream { batch_size, fetch, produced: 0, - uninitiated_partitions: FixedSizeQueue::from( - (0..stream_count).collect::>(), - ), + uninitiated_partitions: (0..stream_count).collect(), enable_round_robin_tie_breaker, } } @@ -218,7 +217,7 @@ impl SortPreservingMergeStream { // we skip the following block. Until then, this function may be called multiple // times and can return Poll::Pending if any partition returns Poll::Pending. if self.loser_tree.is_empty() { - while let Some(partition_idx) = self.uninitiated_partitions.peek() { + while let Some(&partition_idx) = self.uninitiated_partitions.front() { match self.maybe_poll_stream(cx, partition_idx) { Poll::Ready(Err(e)) => { self.aborted = true; @@ -228,7 +227,7 @@ impl SortPreservingMergeStream { // If a partition returns Poll::Pending, to avoid continuously polling it // and potentially increasing upstream buffer sizes, we move it to the // back of the polling queue. - self.uninitiated_partitions.rotate(); + self.uninitiated_partitions.rotate_left(1); // This function could remain in a pending state, so we manually wake it here. // However, this approach can be investigated further to find a more natural way @@ -245,7 +244,7 @@ impl SortPreservingMergeStream { } // Claim the memory for the uninitiated partitions - std::mem::take(&mut self.uninitiated_partitions); + self.uninitiated_partitions = VecDeque::new(); self.init_loser_tree(); } @@ -534,134 +533,3 @@ impl RecordBatchStream for SortPreservingMergeStream Arc::clone(self.in_progress.schema()) } } - -/// Fixed size queue implemented as a circular buffer -/// The underlying `values` are immutable, so removing elements will not drop them and not change the -/// capacity/length of the actual vector -#[derive(Debug, Default)] -struct FixedSizeQueue { - values: Vec, - start: usize, - end: usize, - len: usize, -} - -impl From> for FixedSizeQueue { - fn from(values: Vec) -> Self { - let len = values.len(); - - Self { - values, - start: 0, - end: len.saturating_sub(1), - len, - } - } -} - -impl FixedSizeQueue { - /// Get the value at the top of the queue - /// - /// # Implementation - /// return the value at [`Self::start`] - /// - /// ## Example - /// - /// ```plain - /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | - /// | -------------------------------------------------------- | - /// value: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | - /// | -------------------------------------------------------- | - /// ^ ^ - /// end start - /// ``` - /// calling `peek()` will return `Some(2)` - /// - fn peek(&self) -> Option { - if self.len == 0 { - None - } else { - Some(self.values[self.start]) - } - } - - /// Move the current value to last - /// - /// # Implementation - /// - /// Swap the `self.values[start]` with `self.values[end + 1]` value and advance both by 1 - /// (wrapping at the `end + 1` around the underlying vector length) - /// - /// ## Example - /// - /// **Current state:** - /// ```plain - /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | - /// | -------------------------------------------------------- | - /// value: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | - /// | -------------------------------------------------------- | - /// ^ ^ - /// end start - /// ``` - /// - /// So calling `move_to_last` on the example above will produce this: - /// ```plain - /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | - /// | -------------------------------------------------------- | - /// value: | 0 | 2 | 1 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | - /// | -------------------------------------------------------- | - /// ^ ^ - /// end start - /// ``` - /// - /// # Panics - /// Must not be empty (if `self.len() == 0`) - /// - fn rotate(&mut self) { - assert_ne!(self.len, 0, "No current value"); - - // Wrap around the vector - self.end = (self.end + 1) % self.values.len(); - - // Swap the current with the last - self.values.swap(self.start, self.end); - self.start = (self.start + 1) % self.values.len(); - } - - /// Remove the value at the top of the queue - /// - /// # Implementation - /// - /// Advance [`Self::start`] by 1 (wrapping around the underlying vector length) - /// - /// # Example - /// - /// **Current state:** - /// ```plain - /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | - /// | -------------------------------------------------------- | - /// value: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | - /// | -------------------------------------------------------- | - /// ^ ^ - /// end start - /// ``` - /// - /// So calling `pop_front` on the example above will produce this: - /// ```plain - /// index: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | - /// | -------------------------------------------------------- | - /// value: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | - /// | -------------------------------------------------------- | - /// ^ ^ - /// end start - /// ``` - /// - /// # Panics - /// Must not be empty (if `self.len() == 0`) - /// - fn pop_front(&mut self) { - assert_ne!(self.len, 0, "No current value"); - self.start = (self.start + 1) % self.values.len(); - self.len -= 1; - } -} From 586d7fb37938796a2ac9ea49f17f6c6816d57f6f Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 3 Apr 2025 23:39:27 +0300 Subject: [PATCH 3/3] change to shrink to fit --- datafusion/physical-plan/src/sorts/merge.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 402f337743c7..2b42457635f7 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -244,7 +244,7 @@ impl SortPreservingMergeStream { } // Claim the memory for the uninitiated partitions - self.uninitiated_partitions = VecDeque::new(); + self.uninitiated_partitions.shrink_to_fit(); self.init_loser_tree(); }