diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs
index 30a2c264c..08778666a 100644
--- a/mountpoint-s3/src/prefetch/part_queue.rs
+++ b/mountpoint-s3/src/prefetch/part_queue.rs
@@ -5,16 +5,19 @@ use tracing::trace;
 use crate::prefetch::part::Part;
 use crate::prefetch::PrefetchReadError;
 use crate::sync::async_channel::{unbounded, Receiver, RecvError, Sender};
-use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use crate::sync::{Arc, AsyncMutex};
+use crate::sync::atomic::{AtomicUsize, Ordering};
+use crate::sync::Arc;
 
 /// A queue of [Part]s where the first part can be partially read from if the reader doesn't want
 /// the entire part in one shot.
 #[derive(Debug)]
 pub struct PartQueue<E: std::error::Error> {
-    current_part: AsyncMutex<Option<Part>>,
+    /// The auxiliary queue that supports pushing parts to the front of the part queue in order to
+    /// allow partial reads and backwards seeks.
+    front_queue: Vec<Part>,
+    /// The main queue that receives parts from [super::ObjectPartStream]
     receiver: Receiver<Result<Part, PrefetchReadError<E>>>,
-    failed: AtomicBool,
+    failed: bool,
     /// The total number of bytes sent to the underlying queue of `self.receiver`
     bytes_received: Arc<AtomicUsize>,
 }
@@ -32,9 +35,9 @@ pub fn unbounded_part_queue<E: std::error::Error + Send + Sync>() -> (PartQueue<E>, PartQueueProducer<E>) {
     let (sender, receiver) = unbounded();
     let bytes_counter = Arc::new(AtomicUsize::new(0));
     let part_queue = PartQueue {
-        current_part: AsyncMutex::new(None),
+        front_queue: Vec::new(),
         receiver,
-        failed: AtomicBool::new(false),
+        failed: false,
         bytes_received: Arc::clone(&bytes_counter),
     };
     let part_queue_producer = PartQueueProducer {
@@ -52,65 +55,48 @@ impl<E: std::error::Error + Send + Sync> PartQueue<E> {
     ///
     /// If this method returns an Err, the PartQueue must never be accessed again.
     pub async fn read(&mut self, length: usize) -> Result<Part, PrefetchReadError<E>> {
-        let mut current_part = self.current_part.lock().await;
-
-        assert!(
-            !self.failed.load(Ordering::SeqCst),
-            "cannot use a PartQueue after failure"
-        );
-
-        let part = if let Some(current_part) = current_part.take() {
-            Ok(current_part)
+        assert!(!self.failed, "cannot use a PartQueue after failure");
+
+        // Read from the auxiliary queue first if it's not empty
+        let part = if let Some(part) = self.front_queue.pop() {
+            Ok(part)
+        // Then do `try_recv` from the main queue so we can track whether the read is starved or not
+        } else if let Ok(part) = self.receiver.try_recv() {
+            part
         } else {
-            // Do `try_recv` first so we can track whether the read is starved or not
-            if let Ok(part) = self.receiver.try_recv() {
-                part
-            } else {
-                let start = Instant::now();
-                let part = self.receiver.recv().await;
-                metrics::histogram!("prefetch.part_queue_starved_us").record(start.elapsed().as_micros() as f64);
-                match part {
-                    Err(RecvError) => Err(PrefetchReadError::GetRequestTerminatedUnexpectedly),
-                    Ok(part) => part,
-                }
+            let start = Instant::now();
+            let part = self.receiver.recv().await;
+            metrics::histogram!("prefetch.part_queue_starved_us").record(start.elapsed().as_micros() as f64);
+            match part {
+                Err(RecvError) => Err(PrefetchReadError::GetRequestTerminatedUnexpectedly),
+                Ok(part) => part,
             }
         };
         let mut part = match part {
             Err(e) => {
-                self.failed.store(true, Ordering::SeqCst);
+                self.failed = true;
                 return Err(e);
             }
             Ok(part) => part,
        };
         debug_assert!(!part.is_empty(), "parts must not be empty");
+        // Split the part and push the remaining to the front of the queue if it's a partial read
         if length < part.len() {
             let tail = part.split_off(length);
-            *current_part = Some(tail);
+            self.front_queue.push(tail);
         }
         metrics::gauge!("prefetch.bytes_in_queue").decrement(part.len() as f64);
         Ok(part)
     }
 
     /// Push a new [Part] onto the front of the queue
-    /// which actually just concatenate it with the current part
-    pub async fn push_front(&self, mut part: Part) -> Result<(), PrefetchReadError<E>> {
-        let part_len = part.len();
-        let mut current_part = self.current_part.lock().await;
-
-        assert!(
-            !self.failed.load(Ordering::SeqCst),
-            "cannot use a PartQueue after failure"
-        );
-
-        if let Some(current_part) = current_part.as_mut() {
-            part.extend(current_part)?;
-            *current_part = part;
-        } else {
-            *current_part = Some(part);
-        }
-        metrics::gauge!("prefetch.bytes_in_queue").increment(part_len as f64);
+    pub async fn push_front(&mut self, part: Part) -> Result<(), PrefetchReadError<E>> {
+        assert!(!self.failed, "cannot use a PartQueue after failure");
+
+        metrics::gauge!("prefetch.bytes_in_queue").increment(part.len() as f64);
+        self.front_queue.push(part);
         Ok(())
     }
 
@@ -137,12 +123,7 @@ impl<E: std::error::Error + Send + Sync> PartQueueProducer<E> {
 
 impl<E: std::error::Error + Send + Sync> Drop for PartQueue<E> {
     fn drop(&mut self) {
-        let current_part = self.current_part.lock_blocking();
-        let current_size = match current_part.as_ref() {
-            Some(part) => part.len(),
-            None => 0,
-        };
-        // close the channel and drain remaining parts
+        // close the channel and drain remaining parts from the main queue
         self.receiver.close();
         let mut queue_size = 0;
         while let Ok(part) = self.receiver.try_recv() {
@@ -150,8 +131,11 @@ impl<E: std::error::Error + Send + Sync> Drop for PartQueue<E> {
                 queue_size += part.len();
             }
         }
-        let remaining = current_size + queue_size;
-        metrics::gauge!("prefetch.bytes_in_queue").decrement(remaining as f64);
+        // count remaining bytes in the auxiliary queue
+        for part in &self.front_queue {
+            queue_size += part.len()
+        }
+        metrics::gauge!("prefetch.bytes_in_queue").decrement(queue_size as f64);
     }
 }
 
@@ -199,7 +183,7 @@ mod tests {
                     current_length -= bytes.len();
                 }
                 Op::ReadAligned => {
-                    let first_part_length = part_queue.current_part.lock().await.as_ref().map(|p| p.len());
+                    let first_part_length = part_queue.front_queue.last().map(|p| p.len());
                     if let Some(n) = first_part_length {
                         let part = part_queue.read(n).await.unwrap();
                         let checksummed_bytes = part.into_bytes(&part_id, current_offset).unwrap();
diff --git a/mountpoint-s3/src/prefetch/task.rs b/mountpoint-s3/src/prefetch/task.rs
index c4de1e125..cd858d7bc 100644
--- a/mountpoint-s3/src/prefetch/task.rs
+++ b/mountpoint-s3/src/prefetch/task.rs
@@ -38,9 +38,7 @@ impl<E: std::error::Error + Send + Sync> RequestTask<E> {
 
     // Push a given list of parts in front of the part queue
     pub async fn push_front(&mut self, parts: Vec<Part>) -> Result<(), PrefetchReadError<E>> {
-        // Merge all parts into one single part by pushing them to the front of the part queue.
-        // This could result in a really big part, but we normally use this only for backward seek
-        // so its size should not be bigger than the prefetcher's `max_backward_seek_distance`.
+        // Iterate backwards to push each part to the front of the part queue
         for part in parts.into_iter().rev() {
             self.remaining += part.len();
             self.part_queue.push_front(part).await?;