PR comments
Signed-off-by: Monthon Klongklaew <[email protected]>
monthonk committed Sep 9, 2024
1 parent b2385d8 commit f020827
Showing 1 changed file with 32 additions and 59 deletions.
91 changes: 32 additions & 59 deletions mountpoint-s3/src/prefetch/part_queue.rs
@@ -1,25 +1,23 @@
use std::collections::VecDeque;
use std::time::Instant;

use tracing::trace;

use crate::prefetch::part::Part;
use crate::prefetch::PrefetchReadError;
use crate::sync::async_channel::{unbounded, Receiver, RecvError, Sender};
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use crate::sync::{Arc, AsyncMutex};
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::Arc;

/// A queue of [Part]s where the first part can be partially read from if the reader doesn't want
/// the entire part in one shot.
#[derive(Debug)]
pub struct PartQueue<E: std::error::Error> {
current_part: AsyncMutex<Option<Part>>,
/// The auxiliary queue that supports pushing parts to the front of the part queue in order to
/// allow backward seek.
front_queue: VecDeque<Part>,
/// allow partial reads and backwards seeks.
front_queue: Vec<Part>,
/// The main queue that receives parts from [super::ObjectPartStream]
rear_queue: Receiver<Result<Part, PrefetchReadError<E>>>,
failed: AtomicBool,
receiver: Receiver<Result<Part, PrefetchReadError<E>>>,
failed: bool,
/// The total number of bytes sent to the underlying queue of `self.receiver`
bytes_received: Arc<AtomicUsize>,
}
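
With `current_part` gone, a partially read remainder and an explicitly pushed-back part both live on `front_queue`, now a plain `Vec<Part>` used as a stack, which works because `read` and `push_front` take `&mut self` and `failed` can be a plain `bool`. A minimal sketch of the ordering this relies on (not the crate's code; string literals stand in for `Part` values): push and pop both happen at the back of the `Vec`, so the most recently pushed-back part is the next one the reader sees.

```rust
fn main() {
    let mut front_queue: Vec<&str> = Vec::new();

    // A partial read pushes the unread tail back ...
    front_queue.push("tail of part 1");
    // ... and an explicit push_front (e.g. for a backwards seek) goes on top of it.
    front_queue.push("seeked-back part");

    // The next read pops the most recently pushed part first (LIFO).
    assert_eq!(front_queue.pop(), Some("seeked-back part"));
    assert_eq!(front_queue.pop(), Some("tail of part 1"));
    assert_eq!(front_queue.pop(), None);
    println!("front queue drained");
}
```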
@@ -37,10 +35,9 @@ pub fn unbounded_part_queue<E: std::error::Error>() -> (PartQueue<E>, PartQueueP
let (sender, receiver) = unbounded();
let bytes_counter = Arc::new(AtomicUsize::new(0));
let part_queue = PartQueue {
current_part: AsyncMutex::new(None),
front_queue: VecDeque::new(),
rear_queue: receiver,
failed: AtomicBool::new(false),
front_queue: Vec::new(),
receiver,
failed: false,
bytes_received: Arc::clone(&bytes_counter),
};
let part_queue_producer = PartQueueProducer {
@@ -58,66 +55,48 @@ impl<E: std::error::Error + Send + Sync> PartQueue<E> {
///
/// If this method returns an Err, the PartQueue must never be accessed again.
pub async fn read(&mut self, length: usize) -> Result<Part, PrefetchReadError<E>> {
let mut current_part = self.current_part.lock().await;

assert!(
!self.failed.load(Ordering::SeqCst),
"cannot use a PartQueue after failure"
);

let part = if let Some(current_part) = current_part.take() {
Ok(current_part)
assert!(!self.failed, "cannot use a PartQueue after failure");

// Read from the auxiliary queue first if it's not empty
let part = if let Some(part) = self.front_queue.pop() {
Ok(part)
// Then do `try_recv` from the main queue so we can track whether the read is starved or not
} else if let Ok(part) = self.receiver.try_recv() {
part
} else {
// Read from the auxiliary queue first if it's not empty
if !self.front_queue.is_empty() {
Ok(self
.front_queue
.pop_front()
.expect("checked above that the queue is not empty"))
// Then do `try_recv` from the main queue so we can track whether the read is starved or not
} else if let Ok(part) = self.rear_queue.try_recv() {
part
} else {
let start = Instant::now();
let part = self.rear_queue.recv().await;
metrics::histogram!("prefetch.part_queue_starved_us").record(start.elapsed().as_micros() as f64);
match part {
Err(RecvError) => Err(PrefetchReadError::GetRequestTerminatedUnexpectedly),
Ok(part) => part,
}
let start = Instant::now();
let part = self.receiver.recv().await;
metrics::histogram!("prefetch.part_queue_starved_us").record(start.elapsed().as_micros() as f64);
match part {
Err(RecvError) => Err(PrefetchReadError::GetRequestTerminatedUnexpectedly),
Ok(part) => part,
}
};

let mut part = match part {
Err(e) => {
self.failed.store(true, Ordering::SeqCst);
self.failed = true;
return Err(e);
}
Ok(part) => part,
};
debug_assert!(!part.is_empty(), "parts must not be empty");

// Split the part and push the remaining to the front of the queue if it's a partial read
if length < part.len() {
let tail = part.split_off(length);
*current_part = Some(tail);
self.front_queue.push(tail);
}
metrics::gauge!("prefetch.bytes_in_queue").decrement(part.len() as f64);
Ok(part)
}

/// Push a new [Part] onto the front of the queue
pub async fn push_front(&mut self, part: Part) -> Result<(), PrefetchReadError<E>> {
let mut current_part = self.current_part.lock().await;
if let Some(current_part) = current_part.take() {
self.front_queue.push_front(current_part);
}
assert!(
!self.failed.load(Ordering::SeqCst),
"cannot use a PartQueue after failure"
);
assert!(!self.failed, "cannot use a PartQueue after failure");

metrics::gauge!("prefetch.bytes_in_queue").increment(part.len() as f64);
self.front_queue.push_front(part);
self.front_queue.push(part);
Ok(())
}
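
The new `read` path tries three sources in order: a part previously pushed back onto `front_queue`, then a non-blocking `try_recv` from the channel, and only then an awaited `recv` whose wait time feeds the `prefetch.part_queue_starved_us` histogram; a partial read splits the part and pushes the tail back for the next call. A simplified, synchronous analogue of that flow (a sketch only: `Vec<u8>` stands in for `Part`, `std::sync::mpsc` stands in for the async channel, and the failure flag and metrics are omitted):

```rust
use std::sync::mpsc::{Receiver, RecvError};

/// Simplified stand-in for PartQueue: `Vec<u8>` plays the role of `Part`.
struct SketchQueue {
    front_queue: Vec<Vec<u8>>,
    receiver: Receiver<Vec<u8>>,
}

impl SketchQueue {
    /// Read up to `length` bytes: pushed-back parts first, then anything already
    /// buffered in the channel, then block until the producer sends more.
    fn read(&mut self, length: usize) -> Result<Vec<u8>, RecvError> {
        let mut part = if let Some(part) = self.front_queue.pop() {
            part
        } else if let Ok(part) = self.receiver.try_recv() {
            part
        } else {
            self.receiver.recv()?
        };
        // Partial read: keep the tail for the next call.
        if length < part.len() {
            let tail = part.split_off(length);
            self.front_queue.push(tail);
        }
        Ok(part)
    }
}

fn main() {
    let (sender, receiver) = std::sync::mpsc::channel();
    let mut queue = SketchQueue { front_queue: Vec::new(), receiver };
    sender.send(b"hello world".to_vec()).unwrap();

    let first = queue.read(5).unwrap();
    assert_eq!(first, b"hello".to_vec());
    // The unread tail was pushed back and is returned by the next read.
    let rest = queue.read(usize::MAX).unwrap();
    assert_eq!(rest, b" world".to_vec());
    println!("read {:?} then {:?}", first, rest);
}
```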

@@ -144,15 +123,10 @@ impl<E: std::error::Error + Send + Sync> PartQueueProducer<E> {

impl<E: std::error::Error> Drop for PartQueue<E> {
fn drop(&mut self) {
let current_part = self.current_part.lock_blocking();
let current_size = match current_part.as_ref() {
Some(part) => part.len(),
None => 0,
};
// close the channel and drain remaining parts from the main queue
self.rear_queue.close();
self.receiver.close();
let mut queue_size = 0;
while let Ok(part) = self.rear_queue.try_recv() {
while let Ok(part) = self.receiver.try_recv() {
if let Ok(part) = part {
queue_size += part.len();
}
Expand All @@ -161,8 +135,7 @@ impl<E: std::error::Error> Drop for PartQueue<E> {
for part in &self.front_queue {
queue_size += part.len()
}
let remaining = current_size + queue_size;
metrics::gauge!("prefetch.bytes_in_queue").decrement(remaining as f64);
metrics::gauge!("prefetch.bytes_in_queue").decrement(queue_size as f64);
}
}
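
The drop-time accounting keeps the same shape, minus the `current_part` slot: close the channel, drain whatever the producer had already queued, add anything left on the front queue, and decrement the gauge once. A rough stand-alone illustration of that drain (again a sketch using `std::sync::mpsc`, where dropping the sender plays the role of `close()`):

```rust
use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel::<Vec<u8>>();
    sender.send(vec![0u8; 8]).unwrap();
    sender.send(vec![0u8; 4]).unwrap();
    drop(sender); // stand-in for closing the channel

    // A part that had been pushed back and never re-read.
    let front_queue: Vec<Vec<u8>> = vec![vec![0u8; 2]];

    // Drain whatever the producer had already queued ...
    let mut queue_size = 0;
    while let Ok(part) = receiver.try_recv() {
        queue_size += part.len();
    }
    // ... plus anything still on the front queue.
    for part in &front_queue {
        queue_size += part.len();
    }
    assert_eq!(queue_size, 14);
    println!("would decrement prefetch.bytes_in_queue by {queue_size}");
}
```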

@@ -210,7 +183,7 @@ mod tests {
current_length -= bytes.len();
}
Op::ReadAligned => {
let first_part_length = part_queue.current_part.lock().await.as_ref().map(|p| p.len());
let first_part_length = part_queue.front_queue.last().map(|p| p.len());
if let Some(n) = first_part_length {
let part = part_queue.read(n).await.unwrap();
let checksummed_bytes = part.into_bytes(&part_id, current_offset).unwrap();
