Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
Signed-off-by: Xinye <[email protected]>
  • Loading branch information
Xinye committed Oct 25, 2023
1 parent 92aaa34 commit e6b4645
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 160 deletions.
7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@ edition = "2021"
exclude = ["/.github/*", "/.travis.yml", "/appveyor.yml"]

[dependencies]
futures = { version = "0.3", optional = true }
log = { version = "0.4", features = ["std"] }
once_cell = "1.9.0"
rand = "0.8"
tokio = { version = "1.32", features = [ "sync" ] }
tokio = { version = "1.32", features = ["sync"], optional = true }

[dev-dependencies]
tokio = { version = "1.32", features = [ "sync", "rt-multi-thread", "time", "macros" ] }
tokio = { version = "1.32", features = ["sync", "rt-multi-thread", "time", "macros"] }

[features]
failpoints = []
async = [ "futures" ]
async = ["tokio"]

[package.metadata.docs.rs]
all-features = true
320 changes: 164 additions & 156 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,162 +234,6 @@ use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, TryLockError};
use std::time::{Duration, Instant};
use std::{env, thread};

#[cfg(feature = "async")]
mod async_imp {
use super::*;
use futures::future::BoxFuture;

#[derive(Clone)]
pub(crate) struct AsyncCallback(
Arc<dyn Fn() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);

impl Debug for AsyncCallback {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("AsyncCallback()")
}
}

impl PartialEq for AsyncCallback {
#[allow(clippy::vtable_address_comparisons)]
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}

impl AsyncCallback {
fn new(f: impl Fn() -> BoxFuture<'static, ()> + Send + Sync + 'static) -> AsyncCallback {
AsyncCallback(Arc::new(f))
}

async fn run(&self) {
let callback = &self.0;
callback().await;
}
}

/// `fail_point` but with support for async callback and pause.
#[macro_export]
#[cfg(all(feature = "failpoints", feature = "async"))]
macro_rules! async_fail_point {
($name:expr) => {{
$crate::async_eval($name, |_| {
panic!("Return is not supported for the fail point \"{}\"", $name);
})
.await;
}};
($name:expr, $e:expr) => {{
if let Some(res) = $crate::async_eval($name, $e).await {
return res;
}
}};
($name:expr, $cond:expr, $e:expr) => {{
if $cond {
$crate::async_fail_point!($name, $e);
}
}};
}

/// Configures an async callback to be triggered at the specified
/// failpoint. If the failpoint is not implemented using
/// `async_fail_point`, the execution will raise an exception.
pub fn cfg_async_callback<S, F>(name: S, f: F) -> Result<(), String>
where
S: Into<String>,
F: Fn() -> BoxFuture<'static, ()> + Send + Sync + 'static,
{
let mut registry = REGISTRY.registry.write().unwrap();
let p = registry
.entry(name.into())
.or_insert_with(|| Arc::new(FailPoint::new()));
let action = Action::from_async_callback(f);
let actions = vec![action];
p.set_actions("callback", actions);
Ok(())
}

#[doc(hidden)]
pub async fn async_eval<R, F: FnOnce(Option<String>) -> R>(name: &str, f: F) -> Option<R> {
let p = {
let registry = REGISTRY.registry.read().unwrap();
match registry.get(name) {
None => return None,
Some(p) => p.clone(),
}
};
p.async_eval(name).await.map(f)
}

impl Action {
#[cfg(feature = "async")]
fn from_async_callback(
f: impl Fn() -> BoxFuture<'static, ()> + Send + Sync + 'static,
) -> Action {
let task = Task::CallbackAsync(AsyncCallback::new(f));
Action {
task,
freq: 1.0,
count: None,
}
}
}

impl FailPoint {
#[cfg_attr(feature = "cargo-clippy", allow(clippy::option_option))]
async fn async_eval(&self, name: &str) -> Option<Option<String>> {
let task = {
let task = self
.actions
.read()
.unwrap()
.iter()
.filter_map(Action::get_task)
.next();
match task {
Some(Task::Pause) => {
// let n = self.async_pause_notify.clone();
self.async_pause_notify.notified().await;
return None;
}
Some(t) => t,
None => return None,
}
};

match task {
Task::Off => {}
Task::Return(s) => return Some(s),
Task::Sleep(_) => panic!(
"fail does not support async sleep, please use a async closure to sleep."
),
Task::Panic(msg) => match msg {
Some(ref msg) => panic!("{}", msg),
None => panic!("failpoint {} panic", name),
},
Task::Print(msg) => match msg {
Some(ref msg) => log::info!("{}", msg),
None => log::info!("failpoint {} executed.", name),
},
Task::Pause => unreachable!(),
Task::Yield => thread::yield_now(),
Task::Delay(_) => panic!(
"fail does not support async delay, please use a async closure to delay."
),
Task::Callback(f) => {
f.run();
}
Task::CallbackAsync(f) => {
f.run().await;
}
}
None
}
}
}

#[cfg(feature = "async")]
pub use async_imp::*;

#[derive(Clone)]
struct SyncCallback(Arc<dyn Fn() + Send + Sync>);

Expand Down Expand Up @@ -1020,6 +864,170 @@ macro_rules! fail_point {
($name:expr, $cond:expr, $e:expr) => {{}};
}

#[cfg(feature = "async")]
mod async_imp {
use super::*;
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a, Global>>;

#[derive(Clone)]
pub(crate) struct AsyncCallback(
Arc<dyn Fn() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);

impl Debug for AsyncCallback {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("AsyncCallback()")
}
}

impl PartialEq for AsyncCallback {
#[allow(clippy::vtable_address_comparisons)]
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}

impl AsyncCallback {
fn new(f: impl Fn() -> BoxFuture<'static, ()> + Send + Sync + 'static) -> AsyncCallback {
AsyncCallback(Arc::new(f))
}

async fn run(&self) {
let callback = &self.0;
callback().await;
}
}

/// `fail_point` but with support for async callback and pause.
#[macro_export]
#[cfg(feature = "failpoints")]
macro_rules! async_fail_point {
($name:expr) => {{
$crate::async_eval($name, |_| {
panic!("Return is not supported for the fail point \"{}\"", $name);
})
.await;
}};
($name:expr, $e:expr) => {{
if let Some(res) = $crate::async_eval($name, $e).await {
return res;
}
}};
($name:expr, $cond:expr, $e:expr) => {{
if $cond {
$crate::async_fail_point!($name, $e);
}
}};
}

/// Define an async fail point (disabled, see `failpoints` feature).
#[macro_export]
#[cfg(not(feature = "failpoints"))]
macro_rules! async_fail_point {
($name:expr, $e:expr) => {{}};
($name:expr) => {{}};
($name:expr, $cond:expr, $e:expr) => {{}};
}

/// Configures an async callback to be triggered at the specified
/// failpoint. If the failpoint is not implemented using
/// `async_fail_point`, the execution will raise an exception.
pub fn cfg_async_callback<S, F>(name: S, f: F) -> Result<(), String>
where
S: Into<String>,
F: Fn() -> BoxFuture<'static, ()> + Send + Sync + 'static,
{
let mut registry = REGISTRY.registry.write().unwrap();
let p = registry
.entry(name.into())
.or_insert_with(|| Arc::new(FailPoint::new()));
let action = Action::from_async_callback(f);
let actions = vec![action];
p.set_actions("callback", actions);
Ok(())
}

#[doc(hidden)]
pub async fn async_eval<R, F: FnOnce(Option<String>) -> R>(name: &str, f: F) -> Option<R> {
let p = {
let registry = REGISTRY.registry.read().unwrap();
match registry.get(name) {
None => return None,
Some(p) => p.clone(),
}
};
p.async_eval(name).await.map(f)
}

impl Action {
fn from_async_callback(
f: impl Fn() -> BoxFuture<'static, ()> + Send + Sync + 'static,
) -> Action {
let task = Task::CallbackAsync(AsyncCallback::new(f));
Action {
task,
freq: 1.0,
count: None,
}
}
}

impl FailPoint {
#[cfg_attr(feature = "cargo-clippy", allow(clippy::option_option))]
async fn async_eval(&self, name: &str) -> Option<Option<String>> {
let task = {
let task = self
.actions
.read()
.unwrap()
.iter()
.filter_map(Action::get_task)
.next();
match task {
Some(Task::Pause) => {
// let n = self.async_pause_notify.clone();
self.async_pause_notify.notified().await;
return None;
}
Some(t) => t,
None => return None,
}
};

match task {
Task::Off => {}
Task::Return(s) => return Some(s),
Task::Sleep(_) => panic!(
"fail does not support async sleep, please use a async closure to sleep."
),
Task::Panic(msg) => match msg {
Some(ref msg) => panic!("{}", msg),
None => panic!("failpoint {} panic", name),
},
Task::Print(msg) => match msg {
Some(ref msg) => log::info!("{}", msg),
None => log::info!("failpoint {} executed.", name),
},
Task::Pause => unreachable!(),
Task::Yield => thread::yield_now(),
Task::Delay(_) => panic!(
"fail does not support async delay, please use a async closure to delay."
),
Task::Callback(f) => {
f.run();
}
Task::CallbackAsync(f) => {
f.run().await;
}
}
None
}
}
}

#[cfg(feature = "async")]
pub use async_imp::*;

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit e6b4645

Please sign in to comment.