diff --git a/rocketmq/src/blocking_queue.rs b/rocketmq/src/blocking_queue.rs index 7a38cee4..920de974 100644 --- a/rocketmq/src/blocking_queue.rs +++ b/rocketmq/src/blocking_queue.rs @@ -71,3 +71,48 @@ impl BlockingQueue { time::timeout(timeout, self.take()).await.ok() } } + +#[cfg(test)] +mod tests { + use tokio::time::Duration; + + use super::*; + + #[tokio::test] + async fn put_item_in_queue() { + let queue = BlockingQueue::new(2); + queue.put(1).await; + let item = queue.take().await; + assert_eq!(item, 1); + } + + #[tokio::test] + async fn offer_item_within_timeout() { + let queue = BlockingQueue::new(1); + let result = queue.offer(1, Duration::from_millis(100)).await; + assert!(result); + } + + #[tokio::test] + async fn offer_item_exceeds_timeout() { + let queue = BlockingQueue::new(1); + queue.put(1).await; + let result = queue.offer(2, Duration::from_millis(100)).await; + assert!(!result); + } + + #[tokio::test] + async fn poll_item_within_timeout() { + let queue = BlockingQueue::new(1); + queue.put(1).await; + let item = queue.poll(Duration::from_millis(100)).await; + assert_eq!(item, Some(1)); + } + + #[tokio::test] + async fn poll_item_exceeds_timeout() { + let queue = BlockingQueue::<()>::new(1); + let item = queue.poll(Duration::from_millis(100)).await; + assert_eq!(item, None); + } +}