diff --git a/core/src/states/kickoff.rs b/core/src/states/kickoff.rs index 91f6b83e..d866cade 100644 --- a/core/src/states/kickoff.rs +++ b/core/src/states/kickoff.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use bitcoin::{OutPoint, Txid, Witness}; +use eyre::Report; use statig::prelude::*; use crate::{ @@ -94,6 +95,18 @@ impl KickoffStateMachine { state(derive(Debug, Clone)) )] impl KickoffStateMachine { + pub fn wrap_err( + &'_ self, + method: &'static str, + ) -> impl FnOnce(BridgeError) -> eyre::Report + '_ { + move |e| { + Report::from(e).wrap_err(format!( + "Error in kickoff state machine for kickoff {:?} in {}", + self.kickoff_id, method + )) + } + } + #[action] pub(crate) fn on_transition(&mut self, state_a: &State, state_b: &State) { tracing::debug!(?self.kickoff_id, "Transitioning from {:?} to {:?}", state_a, state_b); @@ -213,7 +226,8 @@ impl KickoffStateMachine { // Add all watchtower challenges and operator asserts to matchers self.add_default_matchers(context).await?; Ok(()) - }) + }.map_err(self.wrap_err("on_kickoff_started_entry")) + ) .await; } @@ -232,7 +246,12 @@ impl KickoffStateMachine { pub(crate) async fn on_challenge_entry(&mut self, context: &mut StateContext) { println!("Watchtower Challenge Stage"); context - .capture_error(async |context| context.dispatch_duty(Duty::WatchtowerChallenge).await) + .capture_error(async |context| { + context + .dispatch_duty(Duty::WatchtowerChallenge) + .await + .map_err(self.wrap_err("on_watchtower_challenge_entry")) + }) .await; } } diff --git a/core/src/states/mod.rs b/core/src/states/mod.rs index 8cc0e284..c1c4969c 100644 --- a/core/src/states/mod.rs +++ b/core/src/states/mod.rs @@ -29,8 +29,11 @@ enum ContextProcessResult< Processing(Fut), } -/// Utility trait to make a generic process_block function +/// Utility trait to make processing generic trait ContextProcessor { + /// Processes the machine with the given state context (which contains the block cache) + /// If the machine is unchanged, it is returned as is. Otherwise, the machine is processed + /// and the result is returned as a future that processes the new events. fn process_with_ctx( self, block: &StateContext, @@ -41,6 +44,7 @@ trait ContextProcessor { >; } +/// Generic implementation for all state machines impl ContextProcessor for InitializedStateMachine where T: Owner, @@ -73,7 +77,7 @@ where mod kickoff; mod round; -// Block cache to optimize lookups +/// Block cache to optimize Txid and UTXO lookups for a block #[derive(Debug, Clone, Default)] pub struct BlockCache { txids: HashMap, @@ -159,12 +163,8 @@ pub trait Owner: Send + Sync + Clone + Default { // Enhanced StateError enum #[derive(Error, Debug)] pub enum StateError { - #[error("State machine error in {state_name} during {action}: {source}")] - StateMachineError { - state_name: String, - action: String, - source: BridgeError, - }, + #[error("{0}")] + StateMachineError(#[from] eyre::Report), #[error("Context error: {0}")] ContextError(String), #[error("Multiple errors: {0:?}")] @@ -178,9 +178,7 @@ pub struct StateContext { pub cache: Arc, pub new_round_machines: Vec>>, pub new_kickoff_machines: Vec>>, - pub errors: Vec>, - pub current_state: Option, - pub current_action: Option, + pub errors: Vec>, pub paramset: &'static ProtocolParamset, } @@ -198,18 +196,10 @@ impl StateContext { new_round_machines: Vec::new(), new_kickoff_machines: Vec::new(), errors: Vec::new(), - current_state: None, - current_action: None, paramset, } } - // Method to set the current state and action context - pub fn set_state_info(&mut self, state: impl Into, action: impl Into) { - self.current_state = Some(state.into()); - self.current_action = Some(action.into()); - } - pub async fn dispatch_duty(&self, duty: Duty) -> Result<(), BridgeError> { self.owner.handle_duty(duty).await } @@ -228,22 +218,28 @@ impl StateContext { self.new_kickoff_machines.push(machine); } - // Enhanced try_run method for better error context + /// Run an async closure and capture any errors in execution. + /// + /// It will store the error report in the context's `errors` field. The + /// errors are later collected by the state manager and reported. This + /// ensures that all errors are collected and reported in a single place. + /// In general, it's expected that the closure attaches context about the + /// state machine to the error report. You may check + /// [`KickoffStateMachine::wrap_err`] and [`RoundStateMachine::wrap_err`] + /// for an example implementation of an error wrapper utility function. + /// + /// # Parameters + /// * `fnc`: An async closure that takes a mutable reference to the state context and returns a result. + /// + /// # Returns + /// * `()` pub async fn capture_error( &mut self, - fnc: impl AsyncFnOnce(&mut Self) -> Result<(), BridgeError>, + fnc: impl AsyncFnOnce(&mut Self) -> Result<(), eyre::Report>, ) { let result = fnc(self).await; if let Err(e) = result { - let state_error = match (&self.current_state, &self.current_action) { - (Some(state), Some(action)) => Arc::new(StateError::StateMachineError { - state_name: state.clone(), - action: action.clone(), - source: e, - }), - _ => Arc::new(StateError::ContextError(e.to_string())), - }; - self.errors.push(state_error); + self.errors.push(e.into()); } } } @@ -326,13 +322,26 @@ impl StateManager { new_kickoff_machines: Vec::new(), new_round_machines: Vec::new(), errors: Vec::new(), - current_state: None, - current_action: None, paramset, } } - pub fn update_machines<'a, M>( + /// Updates the machines using the context and returns machines without + /// events and futures that process new events for machines that changed. + /// Empties the `machines` vector. + /// + /// # Parameters + /// * `machines`: A mutable reference to the vector of state machines to update. + /// * `base_context`: A reference to the base state context. + /// + /// # Returns + /// A tuple of the unchanged machines and the futures that process new + /// events for machines that generated events. + /// + /// # Type Parameters + /// * `M`: The type of the state machine. + /// * `a`: The lifetime of the state context reference (the future captures the context by reference). + fn update_machines<'a, M>( machines: &mut Vec>, base_context: &'a StateContext, ) -> ( @@ -348,7 +357,7 @@ impl StateManager { let mut processing_futures = Vec::new(); for machine in std::mem::take(machines).into_iter() { - match machine.process_with_ctx(&base_context) { + match machine.process_with_ctx(base_context) { ContextProcessResult::Processing(future) => processing_futures.push(future), ContextProcessResult::Unchanged(machine) => new_machines.push(machine), } @@ -357,6 +366,12 @@ impl StateManager { (new_machines, processing_futures) } + /// Processes the block and moves all state machines forward in parallel. + /// The state machines are updated until they stabilize in their state (ie. + /// the block does not generate any new events) + /// + /// # Errors + /// If the state machines do not stabilize after 50 iterations, we return an error. pub async fn process_block_parallel( &mut self, block: &Block, @@ -368,34 +383,36 @@ impl StateManager { let base_context = Self::create_context(self.owner.clone(), self.db.clone(), cache, self.paramset); - // Process all machines, for those unaffected return them, otherwise return + // Process all machines, for those unaffected collect them them, otherwise return // a future that processes the new events. let (mut final_kickoff_machines, mut kickoff_futures) = Self::update_machines(&mut self.kickoff_machines, &base_context); let (mut final_round_machines, mut round_futures) = Self::update_machines(&mut self.round_machines, &base_context); + let mut iterations = 0; // On each iteration, we'll update the changed machines until all machines // stabilize in their state. while !kickoff_futures.is_empty() || !round_futures.is_empty() { + if iterations > 50 { + return Err(BridgeError::Error( + "State machines did not stabilize after 50 iterations".into(), + )); + } + // Execute all futures in parallel let (kickoff_results, round_results) = join(join_all(kickoff_futures), join_all(round_futures)).await; // Unzip the results into updated machines and state contexts - let (mut changed_kickoff_machines, kickoff_contexts): (Vec<_>, Vec<_>) = + let (mut changed_kickoff_machines, mut kickoff_contexts): (Vec<_>, Vec<_>) = kickoff_results.into_iter().unzip(); - let (mut changed_round_machines, round_contexts): (Vec<_>, Vec<_>) = + let (mut changed_round_machines, mut round_contexts): (Vec<_>, Vec<_>) = round_results.into_iter().unzip(); - // Merge contexts - let mut all_contexts: Vec> = Vec::new(); - all_contexts.extend(kickoff_contexts); - all_contexts.extend(round_contexts); - // Merge and handle errors let mut all_errors = Vec::new(); - for ctx in &mut all_contexts { + for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) { all_errors.extend(std::mem::take(&mut ctx.errors)); } @@ -407,7 +424,7 @@ impl StateManager { } // Append the newly generated state machines into the changed machines list - for ctx in &mut all_contexts { + for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) { changed_round_machines.extend(std::mem::take(&mut ctx.new_round_machines)); changed_kickoff_machines.extend(std::mem::take(&mut ctx.new_kickoff_machines)); } @@ -421,14 +438,16 @@ impl StateManager { final_kickoff_machines.extend(finalized_kickoff_machines); final_round_machines.extend(finalized_round_machines); + // Update the futures to be processed kickoff_futures = new_kickoff_futures; round_futures = new_round_futures; + iterations += 1; } // TODO: commit to db // Add back the original machines - self.round_machines.extend(final_round_machines); + self.round_machines = (final_round_machines); self.kickoff_machines.extend(final_kickoff_machines); // Commit new machines - moved out of the borrow region diff --git a/core/src/states/round.rs b/core/src/states/round.rs index ccf3a33c..6cc48a67 100644 --- a/core/src/states/round.rs +++ b/core/src/states/round.rs @@ -1,3 +1,4 @@ +use eyre::Context; use statig::prelude::*; use std::collections::{HashMap, HashSet}; @@ -53,6 +54,7 @@ impl RoundStateMachine { } } } +use eyre::Report; #[state_machine( initial = "State::initial_collateral()", @@ -61,6 +63,18 @@ impl RoundStateMachine { )] // TODO: Add exit conditions too (ex: burn connector spent on smth else) impl RoundStateMachine { + pub fn wrap_err( + &'_ self, + method: &'static str, + ) -> impl FnOnce(BridgeError) -> eyre::Report + '_ { + move |e| { + Report::from(e).wrap_err(format!( + "Error in round state machine for operator {} in {}", + self.operator_idx, method + )) + } + } + #[action] pub(crate) fn on_transition(&mut self, state_a: &State, state_b: &State) { tracing::debug!(?self.operator_data, ?self.operator_idx, "Transitioning from {:?} to {:?}", state_a, state_b); @@ -119,15 +133,18 @@ impl RoundStateMachine { ) { context .capture_error(async |context| { - context - .owner - .handle_duty(Duty::NewReadyToReimburse { - round_idx: *round_idx, - used_kickoffs: used_kickoffs.clone(), - operator_idx: self.operator_idx, - }) - .await?; - Ok(()) + { + context + .owner + .handle_duty(Duty::NewReadyToReimburse { + round_idx: *round_idx, + used_kickoffs: used_kickoffs.clone(), + operator_idx: self.operator_idx, + }) + .await?; + Ok(()) + } + .map_err(self.wrap_err("on_round_tx_exit")) }) .await; } @@ -140,41 +157,44 @@ impl RoundStateMachine { ) { context .capture_error(async |context| { - self.matchers = HashMap::new(); - let contract_context = ContractContext::new_context_for_rounds( - self.operator_idx, - *round_idx, - context.paramset, - ); - let mut txhandlers = context - .owner - .create_txhandlers(TransactionType::Round, contract_context) - .await?; - let round_txhandler = txhandlers - .remove(&TransactionType::Round) - .ok_or(BridgeError::TxHandlerNotFound(TransactionType::Round))?; - let ready_to_reimburse_txhandler = txhandlers - .remove(&TransactionType::ReadyToReimburse) - .ok_or(BridgeError::TxHandlerNotFound( - TransactionType::ReadyToReimburse, - ))?; - self.matchers.insert( - Matcher::SentTx(*ready_to_reimburse_txhandler.get_txid()), - RoundEvent::ReadyToReimburseSent { - round_idx: *round_idx, - }, - ); - for idx in 1..context.paramset.num_kickoffs_per_round + 1 { + { + self.matchers = HashMap::new(); + let contract_context = ContractContext::new_context_for_rounds( + self.operator_idx, + *round_idx, + context.paramset, + ); + let mut txhandlers = context + .owner + .create_txhandlers(TransactionType::Round, contract_context) + .await?; + let round_txhandler = txhandlers + .remove(&TransactionType::Round) + .ok_or(BridgeError::TxHandlerNotFound(TransactionType::Round))?; + let ready_to_reimburse_txhandler = txhandlers + .remove(&TransactionType::ReadyToReimburse) + .ok_or(BridgeError::TxHandlerNotFound( + TransactionType::ReadyToReimburse, + ))?; self.matchers.insert( - Matcher::SpentUtxo( - *round_txhandler - .get_spendable_output(idx)? - .get_prev_outpoint(), - ), - RoundEvent::KickoffUtxoUsed { kickoff_idx: idx }, + Matcher::SentTx(*ready_to_reimburse_txhandler.get_txid()), + RoundEvent::ReadyToReimburseSent { + round_idx: *round_idx, + }, ); + for idx in 1..context.paramset.num_kickoffs_per_round + 1 { + self.matchers.insert( + Matcher::SpentUtxo( + *round_txhandler + .get_spendable_output(idx)? + .get_prev_outpoint(), + ), + RoundEvent::KickoffUtxoUsed { kickoff_idx: idx }, + ); + } + Ok(()) } - Ok(()) + .map_err(self.wrap_err("on_round_tx_entry")) }) .await; } @@ -202,28 +222,31 @@ impl RoundStateMachine { ) { context .capture_error(async |context| { - self.matchers = HashMap::new(); - // get next rounds Round tx - let contract_context = ContractContext::new_context_for_rounds( - self.operator_idx, - *round_idx + 1, - context.paramset, - ); - let next_round_txhandlers = context - .owner - .create_txhandlers(TransactionType::Round, contract_context) - .await?; - let next_round_txid = next_round_txhandlers - .get(&TransactionType::Round) - .ok_or(BridgeError::TxHandlerNotFound(TransactionType::Round))? - .get_txid(); - self.matchers.insert( - Matcher::SentTx(*next_round_txid), - RoundEvent::RoundSent { - round_idx: *round_idx + 1, - }, - ); - Ok(()) + { + self.matchers = HashMap::new(); + // get next rounds Round tx + let contract_context = ContractContext::new_context_for_rounds( + self.operator_idx, + *round_idx + 1, + context.paramset, + ); + let next_round_txhandlers = context + .owner + .create_txhandlers(TransactionType::Round, contract_context) + .await?; + let next_round_txid = next_round_txhandlers + .get(&TransactionType::Round) + .ok_or(BridgeError::TxHandlerNotFound(TransactionType::Round))? + .get_txid(); + self.matchers.insert( + Matcher::SentTx(*next_round_txid), + RoundEvent::RoundSent { + round_idx: *round_idx + 1, + }, + ); + Ok(()) + } + .map_err(self.wrap_err("on_ready_to_reimburse_entry")) }) .await; }