From f0208276e8e41da52399c3b5f89be01388c37b74 Mon Sep 17 00:00:00 2001
From: Monthon Klongklaew
Date: Mon, 9 Sep 2024 08:43:37 +0000
Subject: [PATCH] PR comments

Signed-off-by: Monthon Klongklaew
---
 mountpoint-s3/src/prefetch/part_queue.rs | 91 +++++++++----------
 1 file changed, 32 insertions(+), 59 deletions(-)

diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs
index 5bae5bb86..08778666a 100644
--- a/mountpoint-s3/src/prefetch/part_queue.rs
+++ b/mountpoint-s3/src/prefetch/part_queue.rs
@@ -1,4 +1,3 @@
-use std::collections::VecDeque;
 use std::time::Instant;
 
 use tracing::trace;
@@ -6,20 +5,19 @@ use tracing::trace;
 use crate::prefetch::part::Part;
 use crate::prefetch::PrefetchReadError;
 use crate::sync::async_channel::{unbounded, Receiver, RecvError, Sender};
-use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use crate::sync::{Arc, AsyncMutex};
+use crate::sync::atomic::{AtomicUsize, Ordering};
+use crate::sync::Arc;
 
 /// A queue of [Part]s where the first part can be partially read from if the reader doesn't want
 /// the entire part in one shot.
 #[derive(Debug)]
 pub struct PartQueue<E: std::error::Error> {
-    current_part: AsyncMutex<Option<Part>>,
     /// The auxiliary queue that supports pushing parts to the front of the part queue in order to
-    /// allow backward seek.
-    front_queue: VecDeque<Part>,
+    /// allow partial reads and backwards seeks.
+    front_queue: Vec<Part>,
     /// The main queue that receives parts from [super::ObjectPartStream]
-    rear_queue: Receiver<Result<Part, PrefetchReadError<E>>>,
-    failed: AtomicBool,
+    receiver: Receiver<Result<Part, PrefetchReadError<E>>>,
+    failed: bool,
     /// The total number of bytes sent to the underlying queue of `self.receiver`
     bytes_received: Arc<AtomicUsize>,
 }
@@ -37,10 +35,9 @@ pub fn unbounded_part_queue() -> (PartQueue, PartQueueP
     let (sender, receiver) = unbounded();
     let bytes_counter = Arc::new(AtomicUsize::new(0));
     let part_queue = PartQueue {
-        current_part: AsyncMutex::new(None),
-        front_queue: VecDeque::new(),
-        rear_queue: receiver,
-        failed: AtomicBool::new(false),
+        front_queue: Vec::new(),
+        receiver,
+        failed: false,
         bytes_received: Arc::clone(&bytes_counter),
     };
     let part_queue_producer = PartQueueProducer {
@@ -58,48 +55,37 @@ impl PartQueue {
     ///
     /// If this method returns an Err, the PartQueue must never be accessed again.
     pub async fn read(&mut self, length: usize) -> Result<Part, PrefetchReadError<E>> {
-        let mut current_part = self.current_part.lock().await;
-
-        assert!(
-            !self.failed.load(Ordering::SeqCst),
-            "cannot use a PartQueue after failure"
-        );
-
-        let part = if let Some(current_part) = current_part.take() {
-            Ok(current_part)
+        assert!(!self.failed, "cannot use a PartQueue after failure");
+
+        // Read from the auxiliary queue first if it's not empty
+        let part = if let Some(part) = self.front_queue.pop() {
+            Ok(part)
+        // Then do `try_recv` from the main queue so we can track whether the read is starved or not
+        } else if let Ok(part) = self.receiver.try_recv() {
+            part
         } else {
-            // Read from the auxiliary queue first if it's not empty
-            if !self.front_queue.is_empty() {
-                Ok(self
-                    .front_queue
-                    .pop_front()
-                    .expect("checked above that the queue is not empty"))
-            // Then do `try_recv` from the main queue so we can track whether the read is starved or not
-            } else if let Ok(part) = self.rear_queue.try_recv() {
-                part
-            } else {
-                let start = Instant::now();
-                let part = self.rear_queue.recv().await;
-                metrics::histogram!("prefetch.part_queue_starved_us").record(start.elapsed().as_micros() as f64);
-                match part {
-                    Err(RecvError) => Err(PrefetchReadError::GetRequestTerminatedUnexpectedly),
-                    Ok(part) => part,
-                }
+            let start = Instant::now();
+            let part = self.receiver.recv().await;
+            metrics::histogram!("prefetch.part_queue_starved_us").record(start.elapsed().as_micros() as f64);
+            match part {
+                Err(RecvError) => Err(PrefetchReadError::GetRequestTerminatedUnexpectedly),
+                Ok(part) => part,
             }
         };
 
         let mut part = match part {
             Err(e) => {
-                self.failed.store(true, Ordering::SeqCst);
+                self.failed = true;
                 return Err(e);
             }
             Ok(part) => part,
         };
         debug_assert!(!part.is_empty(), "parts must not be empty");
 
+        // Split the part and push the remaining to the front of the queue if it's a partial read
         if length < part.len() {
             let tail = part.split_off(length);
-            *current_part = Some(tail);
+            self.front_queue.push(tail);
         }
         metrics::gauge!("prefetch.bytes_in_queue").decrement(part.len() as f64);
         Ok(part)
@@ -107,17 +93,10 @@ impl PartQueue {
 
     /// Push a new [Part] onto the front of the queue
     pub async fn push_front(&mut self, part: Part) -> Result<(), PrefetchReadError<E>> {
-        let mut current_part = self.current_part.lock().await;
-        if let Some(current_part) = current_part.take() {
-            self.front_queue.push_front(current_part);
-        }
-        assert!(
-            !self.failed.load(Ordering::SeqCst),
-            "cannot use a PartQueue after failure"
-        );
+        assert!(!self.failed, "cannot use a PartQueue after failure");
 
         metrics::gauge!("prefetch.bytes_in_queue").increment(part.len() as f64);
-        self.front_queue.push_front(part);
+        self.front_queue.push(part);
         Ok(())
     }
 
@@ -144,15 +123,10 @@ impl PartQueueProducer {
 
 impl<E: std::error::Error> Drop for PartQueue<E> {
     fn drop(&mut self) {
-        let current_part = self.current_part.lock_blocking();
-        let current_size = match current_part.as_ref() {
-            Some(part) => part.len(),
-            None => 0,
-        };
         // close the channel and drain remaining parts from the main queue
-        self.rear_queue.close();
+        self.receiver.close();
         let mut queue_size = 0;
-        while let Ok(part) = self.rear_queue.try_recv() {
+        while let Ok(part) = self.receiver.try_recv() {
             if let Ok(part) = part {
                 queue_size += part.len();
             }
@@ -161,8 +135,7 @@
         for part in &self.front_queue {
             queue_size += part.len()
         }
-        let remaining = current_size + queue_size;
-        metrics::gauge!("prefetch.bytes_in_queue").decrement(remaining as f64);
+        metrics::gauge!("prefetch.bytes_in_queue").decrement(queue_size as f64);
metrics::gauge!("prefetch.bytes_in_queue").decrement(queue_size as f64); } } @@ -210,7 +183,7 @@ mod tests { current_length -= bytes.len(); } Op::ReadAligned => { - let first_part_length = part_queue.current_part.lock().await.as_ref().map(|p| p.len()); + let first_part_length = part_queue.front_queue.last().map(|p| p.len()); if let Some(n) = first_part_length { let part = part_queue.read(n).await.unwrap(); let checksummed_bytes = part.into_bytes(&part_id, current_offset).unwrap();