From 5d546cf8ab0d4f2e2fb8345eea0a935072c94cd3 Mon Sep 17 00:00:00 2001 From: Xinye Date: Wed, 25 Oct 2023 15:33:54 +0800 Subject: [PATCH] address comment Signed-off-by: Xinye --- src/lib.rs | 55 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d1d497a..cf270fb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -867,7 +867,7 @@ macro_rules! fail_point { #[cfg(feature = "async")] mod async_imp { use super::*; - type BoxFuture<'a, T> = Pin + Send + 'a, Global>>; + type BoxFuture<'a, T> = std::pin::Pin + Send + 'a>>; #[derive(Clone)] pub(crate) struct AsyncCallback( @@ -997,9 +997,16 @@ mod async_imp { match task { Task::Off => {} Task::Return(s) => return Some(s), - Task::Sleep(_) => panic!( - "fail does not support async sleep, please use a async closure to sleep." - ), + Task::Sleep(t) => { + let not = Arc::new(tokio::sync::Notify::new()); + let not_for_thread = not.clone(); + let handle = std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(t)); + not_for_thread.notify_waiters(); + }); + not.notified().await; + handle.join().unwrap(); + } Task::Panic(msg) => match msg { Some(ref msg) => panic!("{}", msg), None => panic!("failpoint {} panic", name), @@ -1010,9 +1017,11 @@ mod async_imp { }, Task::Pause => unreachable!(), Task::Yield => thread::yield_now(), - Task::Delay(_) => panic!( - "fail does not support async delay, please use a async closure to delay." - ), + Task::Delay(t) => { + let timer = Instant::now(); + let timeout = Duration::from_millis(t); + while timer.elapsed() < timeout {} + } Task::Callback(f) => { f.run(); } @@ -1242,19 +1251,17 @@ mod tests { #[cfg(feature = "async")] #[cfg_attr(not(feature = "failpoints"), ignore)] #[tokio::test] - async fn test_async_failpoint() { - use std::time::Duration; - + async fn test_async_failpoints() { let f1 = async { - async_fail_point!("cb"); + async_fail_point!("async_cb"); }; let f2 = async { - async_fail_point!("cb"); + async_fail_point!("async_cb"); }; let counter = Arc::new(AtomicUsize::new(0)); let counter2 = counter.clone(); - cfg_async_callback("cb", move || { + cfg_async_callback("async_cb", move || { counter2.fetch_add(1, Ordering::SeqCst); Box::pin(async move { tokio::time::sleep(Duration::from_millis(10)).await; @@ -1265,19 +1272,35 @@ mod tests { f2.await; assert_eq!(2, counter.load(Ordering::SeqCst)); - cfg("pause", "pause").unwrap(); + cfg("async_pause", "pause").unwrap(); let (tx, mut rx) = tokio::sync::mpsc::channel(1); let handle = tokio::spawn(async move { - async_fail_point!("pause"); + async_fail_point!("async_pause"); tx.send(()).await.unwrap(); }); tokio::time::timeout(Duration::from_millis(500), rx.recv()) .await .unwrap_err(); - remove("pause"); + remove("async_pause"); tokio::time::timeout(Duration::from_millis(500), rx.recv()) .await .unwrap(); handle.await.unwrap(); + + cfg("async_sleep", "sleep(500)").unwrap(); + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let handle = tokio::spawn(async move { + tx.send(()).await.unwrap(); + async_fail_point!("async_sleep"); + tx.send(()).await.unwrap(); + }); + rx.recv().await.unwrap(); + tokio::time::timeout(Duration::from_millis(300), rx.recv()) + .await + .unwrap_err(); + tokio::time::timeout(Duration::from_millis(300), rx.recv()) + .await + .unwrap(); + handle.await.unwrap(); } }