Skip to content

Commit

Permalink
[ISSUE #1129]📝Add doc for BlockingQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
master-main-hub committed Nov 10, 2024
1 parent 57e658d commit 46dbdad
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions rocketmq/src/blocking_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,30 @@ use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::time;

/// A thread-safe bounded blocking queue. To replace Java `LinkedBlockingQueue`.
///
/// This queue allows multiple producers and consumers to add and remove items
/// concurrently. It uses a `tokio::sync::Mutex` to ensure mutual exclusion and
/// a `tokio::sync::Notify` to notify waiting tasks.
pub struct BlockingQueue<T> {
/// The underlying queue storing the items.
queue: Mutex<VecDeque<T>>,
/// The maximum capacity of the queue.
capacity: usize,
/// A notification mechanism to wake up waiting tasks.
notify: Notify,
}

impl<T> BlockingQueue<T> {
/// Creates a new `BlockingQueue` with the specified capacity.
///
/// # Arguments
///
/// * `capacity` - The maximum number of items the queue can hold.
///
/// # Returns
///
/// A new instance of `BlockingQueue`.
pub fn new(capacity: usize) -> Self {
BlockingQueue {
queue: Mutex::new(VecDeque::with_capacity(capacity)),
Expand All @@ -36,6 +53,13 @@ impl<T> BlockingQueue<T> {
}
}

/// Adds an item to the queue, waiting if necessary for space to become available.
///
/// This method will block the current task until space is available in the queue.
///
/// # Arguments
///
/// * `item` - The item to be added to the queue.
pub async fn put(&self, item: T) {
loop {
{
Expand All @@ -50,10 +74,30 @@ impl<T> BlockingQueue<T> {
}
}

/// Attempts to add an item to the queue within a specified timeout.
///
/// This method will block the current task until space is available in the queue
/// or the timeout is reached.
///
/// # Arguments
///
/// * `item` - The item to be added to the queue.
/// * `timeout` - The maximum duration to wait for space to become available.
///
/// # Returns
///
/// `true` if the item was added to the queue, `false` if the timeout was reached.
pub async fn offer(&self, item: T, timeout: std::time::Duration) -> bool {
time::timeout(timeout, self.put(item)).await.is_ok()
}

/// Removes and returns an item from the queue, waiting if necessary until an item is available.
///
/// This method will block the current task until an item is available in the queue.
///
/// # Returns
///
/// The item removed from the queue.
pub async fn take(&self) -> T {
loop {
{
Expand All @@ -67,6 +111,18 @@ impl<T> BlockingQueue<T> {
}
}

/// Attempts to remove and return an item from the queue within a specified timeout.
///
/// This method will block the current task until an item is available in the queue
/// or the timeout is reached.
///
/// # Arguments
///
/// * `timeout` - The maximum duration to wait for an item to become available.
///
/// # Returns
///
/// `Some(item)` if an item was removed from the queue, `None` if the timeout was reached.
pub async fn poll(&self, timeout: std::time::Duration) -> Option<T> {
time::timeout(timeout, self.take()).await.ok()
}
Expand Down

0 comments on commit 46dbdad

Please sign in to comment.