diff --git a/openraft/src/base/mod.rs b/openraft/src/base/mod.rs new file mode 100644 index 000000000..5fffeeda5 --- /dev/null +++ b/openraft/src/base/mod.rs @@ -0,0 +1,59 @@ +//! Basic types used in the Raft implementation. + +pub use serde_able::OptionalSerde; +pub use threaded::BoxAny; +pub use threaded::BoxAsyncOnceMut; +pub use threaded::BoxFuture; +pub use threaded::BoxOnce; +pub use threaded::OptionalSend; +pub use threaded::OptionalSync; + +#[cfg(not(feature = "singlethreaded"))] +mod threaded { + use std::any::Any; + use std::future::Future; + use std::pin::Pin; + + pub trait OptionalSend: Send {} + impl OptionalSend for T {} + + pub trait OptionalSync: Sync {} + impl OptionalSync for T {} + + pub type BoxFuture<'a, T = ()> = Pin + Send + 'a>>; + pub type BoxAsyncOnceMut<'a, A, T = ()> = Box BoxFuture + Send + 'a>; + pub type BoxOnce<'a, A, T = ()> = Box T + Send + 'a>; + pub type BoxAny = Box; +} + +#[cfg(feature = "singlethreaded")] +mod threaded { + use std::any::Any; + use std::future::Future; + use std::pin::Pin; + + pub trait OptionalSend {} + impl OptionalSend for T {} + + pub trait OptionalSync {} + impl OptionalSync for T {} + + pub type BoxFuture<'a, T = ()> = Pin + 'a>>; + pub type BoxAsyncOnceMut<'a, A, T = ()> = Box BoxFuture + 'a>; + pub type BoxOnce<'a, A, T = ()> = Box T + 'a>; + pub type BoxAny = Box; +} + +#[cfg(not(feature = "serde"))] +mod serde_able { + #[doc(hidden)] + pub trait OptionalSerde {} + impl OptionalSerde for T {} +} + +#[cfg(feature = "serde")] +mod serde_able { + #[doc(hidden)] + pub trait OptionalSerde: serde::Serialize + for<'a> serde::Deserialize<'a> {} + impl OptionalSerde for T where T: serde::Serialize + for<'a> serde::Deserialize<'a> {} +} diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 37b030f12..ebfab3eae 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1214,6 +1214,12 @@ where ExternalCommand::PurgeLog { upto } => { self.engine.trigger_purge_log(upto); } + ExternalCommand::StateMachineCommand { sm_cmd } => { + let res = self.sm_handle.send(sm_cmd); + if let Err(e) = res { + tracing::error!(error = display(e), "error sending sm::Command to sm::Worker"); + } + } } } }; diff --git a/openraft/src/core/raft_msg/external_command.rs b/openraft/src/core/raft_msg/external_command.rs index 5714df39c..e80aaa263 100644 --- a/openraft/src/core/raft_msg/external_command.rs +++ b/openraft/src/core/raft_msg/external_command.rs @@ -3,6 +3,7 @@ use std::fmt; use crate::core::raft_msg::ResultSender; +use crate::core::sm; use crate::RaftTypeConfig; use crate::Snapshot; @@ -31,6 +32,10 @@ pub(crate) enum ExternalCommand { /// /// [`max_in_snapshot_log_to_keep`]: `crate::Config::max_in_snapshot_log_to_keep` PurgeLog { upto: u64 }, + + /// Send a [`sm::Command`] to [`sm::worker::Worker`]. + /// This command is run in the sm task. + StateMachineCommand { sm_cmd: sm::Command }, } impl fmt::Debug for ExternalCommand @@ -61,6 +66,9 @@ where C: RaftTypeConfig ExternalCommand::PurgeLog { upto } => { write!(f, "PurgeLog[..={}]", upto) } + ExternalCommand::StateMachineCommand { sm_cmd } => { + write!(f, "StateMachineCommand: {}", sm_cmd) + } } } } diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index 64cd0b2ea..d56a65dd9 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -1,13 +1,13 @@ use std::collections::BTreeMap; use std::fmt; +use crate::base::BoxOnce; use crate::core::raft_msg::external_command::ExternalCommand; use crate::error::CheckIsLeaderError; use crate::error::Infallible; use crate::error::InitializeError; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; -use crate::raft::BoxCoreFn; use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; @@ -16,6 +16,7 @@ use crate::type_config::alias::OneshotSenderOf; use crate::type_config::alias::ResponderOf; use crate::type_config::alias::SnapshotDataOf; use crate::ChangeMembers; +use crate::RaftState; use crate::RaftTypeConfig; use crate::Snapshot; use crate::Vote; @@ -91,7 +92,7 @@ where C: RaftTypeConfig }, ExternalCoreRequest { - req: BoxCoreFn, + req: BoxOnce<'static, RaftState>, }, ExternalCommand { diff --git a/openraft/src/core/sm/command.rs b/openraft/src/core/sm/command.rs index d9c90d55c..a18d65392 100644 --- a/openraft/src/core/sm/command.rs +++ b/openraft/src/core/sm/command.rs @@ -2,6 +2,7 @@ use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; +use crate::base::BoxAny; use crate::core::raft_msg::ResultSender; use crate::error::Infallible; use crate::raft_state::IOId; @@ -41,6 +42,17 @@ where C: RaftTypeConfig /// The last log id to apply, inclusive. last: LogIdOf, }, + + /// Apply a custom function to the state machine. + /// + /// To erase the type parameter `SM`, it is a + /// `Box Box> + Send + 'static>` + /// wrapped in a `Box` + Func { + func: BoxAny, + /// The SM type user specified, for debug purpose. + input_sm_type: &'static str, + }, } impl Command @@ -75,6 +87,7 @@ where C: RaftTypeConfig Command::BeginReceivingSnapshot { .. } => None, Command::InstallFullSnapshot { io_id, .. } => Some(*io_id), Command::Apply { .. } => None, + Command::Func { .. } => None, } } } @@ -93,6 +106,7 @@ where C: RaftTypeConfig write!(f, "BeginReceivingSnapshot") } Command::Apply { first, last } => write!(f, "Apply: [{},{}]", first, last), + Command::Func { .. } => write!(f, "Func"), } } } @@ -111,6 +125,7 @@ where C: RaftTypeConfig write!(f, "BeginReceivingSnapshot") } Command::Apply { first, last } => write!(f, "Apply: [{},{}]", first, last), + Command::Func { .. } => write!(f, "Func"), } } } @@ -141,6 +156,7 @@ where C: RaftTypeConfig last: last2, }, ) => first == first2 && last == last2, + (Command::Func { .. }, Command::Func { .. }) => false, _ => false, } } diff --git a/openraft/src/core/sm/worker.rs b/openraft/src/core/sm/worker.rs index 71b452021..d99a78fc5 100644 --- a/openraft/src/core/sm/worker.rs +++ b/openraft/src/core/sm/worker.rs @@ -4,6 +4,7 @@ use tracing_futures::Instrument; use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::OneshotSender; +use crate::base::BoxAsyncOnceMut; use crate::core::notification::Notification; use crate::core::raft_msg::ResultSender; use crate::core::sm::handle::Handle; @@ -142,6 +143,20 @@ where let res = CommandResult::new(Ok(Response::Apply(resp))); let _ = self.resp_tx.send(Notification::sm(res)); } + Command::Func { func, input_sm_type } => { + tracing::debug!("{}: run user defined Func", func_name!()); + + let res: Result>, _> = func.downcast(); + if let Ok(f) = res { + f(&mut self.state_machine).await; + } else { + tracing::warn!( + "User-defined SM function uses incorrect state machine type, expected: {}, got: {}", + std::any::type_name::(), + input_sm_type + ); + }; + } }; } } diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 4ce978b9e..34553e2b6 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -2,6 +2,7 @@ pub mod decompose; pub mod into_ok; +mod invalid_sm; mod replication_closed; mod streaming_error; @@ -13,6 +14,7 @@ use std::time::Duration; use anyerror::AnyError; +pub use self::invalid_sm::InvalidStateMachineType; pub use self::replication_closed::ReplicationClosed; pub use self::streaming_error::StreamingError; use crate::network::RPCTypes; diff --git a/openraft/src/error/invalid_sm.rs b/openraft/src/error/invalid_sm.rs new file mode 100644 index 000000000..9beca1682 --- /dev/null +++ b/openraft/src/error/invalid_sm.rs @@ -0,0 +1,30 @@ +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +#[error( + "User-defined function on the state machine failed to run; \ + It may have used a different type \ + of state machine from the one in RaftCore (`{actual_type}`)" +)] + +pub struct InvalidStateMachineType { + pub actual_type: &'static str, +} + +impl InvalidStateMachineType { + pub(crate) fn new() -> Self { + Self { + actual_type: std::any::type_name::(), + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn test_invalid_state_machine_type_to_string() { + let err = super::InvalidStateMachineType::new::(); + assert_eq!( + err.to_string(), + "User-defined function on the state machine failed to run; It may have used a different type of state machine from the one in RaftCore (`u32`)" + ); + } +} diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index df41dda9e..ba80f0d2d 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -49,6 +49,7 @@ pub(crate) mod proposer; pub(crate) mod raft_state; pub(crate) mod utime; +pub mod base; #[cfg(feature = "compat")] pub mod compat; pub mod docs; @@ -74,6 +75,9 @@ pub use type_config::async_runtime; pub use type_config::async_runtime::impls::TokioRuntime; pub use type_config::AsyncRuntime; +pub use crate::base::OptionalSend; +pub use crate::base::OptionalSerde; +pub use crate::base::OptionalSync; pub use crate::change_members::ChangeMembers; pub use crate::config::Config; pub use crate::config::ConfigError; @@ -124,44 +128,6 @@ pub use crate::vote::CommittedLeaderId; pub use crate::vote::LeaderId; pub use crate::vote::Vote; -#[cfg(feature = "serde")] -#[doc(hidden)] -pub trait OptionalSerde: serde::Serialize + for<'a> serde::Deserialize<'a> {} - -#[cfg(feature = "serde")] -impl OptionalSerde for T where T: serde::Serialize + for<'a> serde::Deserialize<'a> {} - -#[cfg(not(feature = "serde"))] -#[doc(hidden)] -pub trait OptionalSerde {} - -#[cfg(not(feature = "serde"))] -impl OptionalSerde for T {} - -#[cfg(feature = "singlethreaded")] -pub trait OptionalSend {} - -#[cfg(feature = "singlethreaded")] -pub trait OptionalSync {} - -#[cfg(feature = "singlethreaded")] -impl OptionalSend for T {} - -#[cfg(feature = "singlethreaded")] -impl OptionalSync for T {} - -#[cfg(not(feature = "singlethreaded"))] -pub trait OptionalSend: Send {} - -#[cfg(not(feature = "singlethreaded"))] -pub trait OptionalSync: Sync {} - -#[cfg(not(feature = "singlethreaded"))] -impl OptionalSend for T {} - -#[cfg(not(feature = "singlethreaded"))] -impl OptionalSync for T {} - /// A trait defining application specific data. /// /// The intention of this trait is that applications which are using this crate will be able to diff --git a/openraft/src/raft/core_state.rs b/openraft/src/raft/core_state.rs index acefa3c42..b6892af26 100644 --- a/openraft/src/raft/core_state.rs +++ b/openraft/src/raft/core_state.rs @@ -13,3 +13,12 @@ where C: RaftTypeConfig /// The RaftCore task has finished. The return value of the task is stored. Done(Result>), } + +impl CoreState +where C: RaftTypeConfig +{ + /// Returns `true` if the RaftCore task is still running. + pub(in crate::raft) fn is_running(&self) -> bool { + matches!(self, CoreState::Running(_)) + } +} diff --git a/openraft/src/raft/external_request.rs b/openraft/src/raft/external_request.rs deleted file mode 100644 index 267e71e95..000000000 --- a/openraft/src/raft/external_request.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! Defines API for application to send request to access Raft core. - -use crate::OptionalSend; -use crate::RaftState; -use crate::RaftTypeConfig; - -pub trait BoxCoreFnInternal: FnOnce(&RaftState) + OptionalSend -where C: RaftTypeConfig -{ -} - -impl) + OptionalSend> BoxCoreFnInternal for T {} - -/// Boxed trait object for external request function run in `RaftCore` task. -pub(crate) type BoxCoreFn = Box + 'static>; diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index af238eee3..3d5aa6acd 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -10,7 +10,6 @@ #[cfg(test)] mod declare_raft_types_test; -mod external_request; mod impl_raft_blocking_write; pub(crate) mod message; mod raft_inner; @@ -21,8 +20,6 @@ pub mod trigger; use std::collections::BTreeMap; use std::error::Error; -pub(crate) use self::external_request::BoxCoreFn; - pub(in crate::raft) mod core_state; use std::fmt::Debug; @@ -48,11 +45,15 @@ use tracing::Level; use crate::async_runtime::watch::WatchReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::OneshotSender; +use crate::base::BoxAsyncOnceMut; +use crate::base::BoxFuture; +use crate::base::BoxOnce; use crate::config::Config; use crate::config::RuntimeConfig; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::RaftMsg; use crate::core::replication_lag; +use crate::core::sm; use crate::core::sm::worker; use crate::core::RaftCore; use crate::core::Tick; @@ -63,6 +64,7 @@ use crate::error::ClientWriteError; use crate::error::Fatal; use crate::error::Infallible; use crate::error::InitializeError; +use crate::error::InvalidStateMachineType; use crate::error::RaftError; use crate::membership::IntoNodes; use crate::metrics::RaftDataMetrics; @@ -224,8 +226,6 @@ where C: RaftTypeConfig /// ### `storage` /// An implementation of the [`RaftLogStorage`] and [`RaftStateMachine`] trait which will be /// used by Raft for data storage. - /// - /// [`RaftNetworkFactory`]: crate::network::RaftNetworkFactory #[tracing::instrument(level="debug", skip_all, fields(cluster=%config.cluster_name))] pub async fn new( id: C::NodeId, @@ -407,7 +407,7 @@ where C: RaftTypeConfig /// Install a completely received snapshot to the state machine. /// - /// This method is used to implement a totally application defined snapshot transmission. + /// This method is used to implement an application defined snapshot transmission. /// The application receives a snapshot from the leader, in chunks or a stream, and /// then rebuild a snapshot, then pass the snapshot to Raft to install. #[tracing::instrument(level = "debug", skip_all)] @@ -780,10 +780,95 @@ where C: RaftTypeConfig /// destroyed right away and not called at all. pub fn external_request(&self, req: F) where F: FnOnce(&RaftState) + OptionalSend + 'static { - let req: BoxCoreFn = Box::new(req); + let req: BoxOnce<'static, RaftState> = Box::new(req); let _ignore_error = self.inner.tx_api.send(RaftMsg::ExternalCoreRequest { req }); } + /// Provides mutable access to [`RaftStateMachine`] through a user-provided function. + /// + /// The function `func` is applied to the current [`RaftStateMachine`]. The result of this + /// function, of type `V`, is returned wrapped in + /// `Result, Fatal>`. + /// `Fatal` error will be returned if failed to receive a reply from `RaftCore`. + /// + /// A `Fatal` error is returned if: + /// - Raft core task is stopped normally. + /// - Raft core task is panicked due to programming error. + /// - Raft core task is encountered a storage error. + /// + /// If the user function fail to run, e.g., the input `SM` is different one from the one in + /// `RaftCore`, it returns an [`InvalidStateMachineType`] error. + /// + /// Example for getting the last applied log id from SM(assume there is `last_applied()` method + /// provided): + /// + /// ```rust,ignore + /// let last_applied_log_id = my_raft.with_state_machine(|sm| { + /// async move { sm.last_applied().await } + /// }).await?; + /// ``` + pub async fn with_state_machine(&self, func: F) -> Result, Fatal> + where + SM: RaftStateMachine, + F: FnOnce(&mut SM) -> BoxFuture + OptionalSend + 'static, + V: OptionalSend + 'static, + { + let (tx, rx) = C::oneshot(); + + self.external_state_machine_request(|sm| { + Box::pin(async move { + let resp = func(sm).await; + if let Err(_err) = tx.send(resp) { + tracing::error!("{}: fail to send response to user communicating tx", func_name!()); + } + }) + }); + + let recv_res = rx.await; + tracing::debug!("{} receives result is error: {:?}", func_name!(), recv_res.is_err()); + + let Ok(v) = recv_res else { + if self.inner.is_core_running().await { + return Ok(Err(InvalidStateMachineType::new::())); + } else { + let fatal = self.inner.get_core_stopped_error("receiving rx from RaftCore", None::<&'static str>).await; + tracing::error!(error = debug(&fatal), "error when {}", func_name!()); + return Err(fatal); + } + }; + + Ok(Ok(v)) + } + + /// Send a request to the [`RaftStateMachine`] worker in a fire-and-forget manner. + /// + /// The request functor will be called with a mutable reference to the state machine. + /// The functor returns a [`Future`] because state machine methods are `async`. + /// + /// If the API channel is already closed (Raft is in shutdown), then the request functor is + /// destroyed right away and not called at all. + /// + /// If the input `SM` is different from the one in `RaftCore`, it just silently ignores it. + pub fn external_state_machine_request(&self, req: F) + where + SM: 'static, + F: FnOnce(&mut SM) -> BoxFuture<()> + OptionalSend + 'static, + { + let input_sm_type = std::any::type_name::(); + + let func: BoxAsyncOnceMut<'static, SM> = Box::new(req); + + // Erase the type so that to send through a channel without `SM` type parameter. + // `sm::Worker` will downcast it back to BoxAsyncOnce. + let func = Box::new(func); + + let sm_cmd = sm::Command::Func { func, input_sm_type }; + let raft_msg = RaftMsg::ExternalCommand { + cmd: ExternalCommand::StateMachineCommand { sm_cmd }, + }; + let _ignore_error = self.inner.tx_api.send(raft_msg); + } + /// Get a handle to the metrics channel. pub fn metrics(&self) -> WatchReceiverOf> { self.inner.rx_metrics.clone() diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index 6d3241932..9fb2caa25 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -131,6 +131,11 @@ where C: RaftTypeConfig Ok(()) } + pub(in crate::raft) async fn is_core_running(&self) -> bool { + let state = self.core_state.lock().await; + state.is_running() + } + /// Get the error that caused RaftCore to stop. pub(in crate::raft) async fn get_core_stopped_error( &self, diff --git a/tests/tests/client_api/main.rs b/tests/tests/client_api/main.rs index 94711e97d..e6453099b 100644 --- a/tests/tests/client_api/main.rs +++ b/tests/tests/client_api/main.rs @@ -15,5 +15,6 @@ mod t13_get_snapshot; mod t13_install_full_snapshot; mod t13_trigger_snapshot; mod t16_with_raft_state; +mod t16_with_state_machine; mod t50_lagging_network_write; mod t51_write_when_leader_quit; diff --git a/tests/tests/client_api/t16_with_raft_state.rs b/tests/tests/client_api/t16_with_raft_state.rs index c95621f9e..ac877dfc5 100644 --- a/tests/tests/client_api/t16_with_raft_state.rs +++ b/tests/tests/client_api/t16_with_raft_state.rs @@ -9,7 +9,7 @@ use openraft::Config; use crate::fixtures::ut_harness; use crate::fixtures::RaftRouter; -/// Access Raft state via `Raft::with_raft_state()` +/// Access Raft state via [`Raft::with_raft_state()`](openraft::Raft::with_raft_state) #[tracing::instrument] #[test_harness::test(harness = ut_harness)] async fn with_raft_state() -> Result<()> { diff --git a/tests/tests/client_api/t16_with_state_machine.rs b/tests/tests/client_api/t16_with_state_machine.rs new file mode 100644 index 000000000..9b39efe2d --- /dev/null +++ b/tests/tests/client_api/t16_with_state_machine.rs @@ -0,0 +1,140 @@ +use std::sync::Arc; + +use anyhow::Result; +use maplit::btreeset; +use openraft::error::Fatal; +use openraft::storage::RaftStateMachine; +use openraft::testing::log_id; +use openraft::Config; +use openraft::Entry; +use openraft::LogId; +use openraft::OptionalSend; +use openraft::RaftSnapshotBuilder; +use openraft::RaftTypeConfig; +use openraft::Snapshot; +use openraft::SnapshotMeta; +use openraft::StorageError; +use openraft::StoredMembership; +use openraft_memstore::ClientResponse; +use openraft_memstore::MemNodeId; +use openraft_memstore::TypeConfig; + +use crate::fixtures::ut_harness; +use crate::fixtures::MemStateMachine; +use crate::fixtures::RaftRouter; + +/// Access [`RaftStateMachine`] via +/// [`Raft::with_state_machine()`](openraft::Raft::with_state_machine) +#[tracing::instrument] +#[test_harness::test(harness = ut_harness)] +async fn with_state_machine() -> Result<()> { + let config = Arc::new( + Config { + enable_heartbeat: false, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?; + + let n0 = router.get_raft_handle(&0)?; + + tracing::info!("--- get last applied from SM"); + { + let applied = n0 + .with_state_machine(|sm: &mut MemStateMachine| { + Box::pin(async move { + let d = sm.get_state_machine().await; + d.last_applied_log + }) + }) + .await? + .unwrap(); + assert_eq!(applied, Some(log_id(1, 0, log_index))); + } + + tracing::info!("--- shutting down node 0"); + n0.shutdown().await?; + + let res = n0.with_state_machine(|_sm: &mut MemStateMachine| Box::pin(async move {})).await; + assert_eq!(Err(Fatal::Stopped), res); + + Ok(()) +} + +/// Call [`Raft::with_state_machine()`](openraft::Raft::with_state_machine) with wrong type +/// [`RaftStateMachine`] +#[tracing::instrument] +#[test_harness::test(harness = ut_harness)] +async fn with_state_machine_wrong_sm_type() -> Result<()> { + let config = Arc::new( + Config { + enable_heartbeat: false, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + router.new_cluster(btreeset! {0}, btreeset! {}).await?; + + let n0 = router.get_raft_handle(&0)?; + + tracing::info!("--- use wrong type SM"); + { + type TC = TypeConfig; + type Err = StorageError; + struct FooSM; + impl RaftSnapshotBuilder for FooSM { + async fn build_snapshot(&mut self) -> Result, Err> { + todo!() + } + } + impl RaftStateMachine for FooSM { + type SnapshotBuilder = Self; + + async fn applied_state(&mut self) -> Result<(Option>, StoredMembership), Err> { + todo!() + } + + async fn apply(&mut self, _entries: I) -> Result, Err> + where + I: IntoIterator> + OptionalSend, + I::IntoIter: OptionalSend, + { + todo!() + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + todo!() + } + + async fn begin_receiving_snapshot(&mut self) -> Result::SnapshotData>, Err> { + todo!() + } + + async fn install_snapshot( + &mut self, + _meta: &SnapshotMeta, + _snapshot: Box<::SnapshotData>, + ) -> Result<(), Err> { + todo!() + } + + async fn get_current_snapshot(&mut self) -> Result>, Err> { + todo!() + } + } + + let applied = n0.with_state_machine::<_, FooSM, _>(|_sm: &mut FooSM| Box::pin(async move {})).await?; + assert!(applied.is_err()); + } + + Ok(()) +}