Skip to content

Commit

Permalink
refactor: improve state processing documentation and visibility
Browse files Browse the repository at this point in the history
feat: implement recursion limit, better error reports
  • Loading branch information
mmtftr committed Feb 28, 2025
1 parent a97e1a8 commit b57b529
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 110 deletions.
23 changes: 21 additions & 2 deletions core/src/states/kickoff.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;

use bitcoin::{OutPoint, Txid, Witness};
use eyre::Report;
use statig::prelude::*;

use crate::{
Expand Down Expand Up @@ -94,6 +95,18 @@ impl<T: Owner> KickoffStateMachine<T> {
state(derive(Debug, Clone))
)]
impl<T: Owner> KickoffStateMachine<T> {
pub fn wrap_err(
&'_ self,
method: &'static str,
) -> impl FnOnce(BridgeError) -> eyre::Report + '_ {
move |e| {
Report::from(e).wrap_err(format!(
"Error in kickoff state machine for kickoff {:?} in {}",
self.kickoff_id, method
))
}
}

#[action]
pub(crate) fn on_transition(&mut self, state_a: &State, state_b: &State) {
tracing::debug!(?self.kickoff_id, "Transitioning from {:?} to {:?}", state_a, state_b);
Expand Down Expand Up @@ -213,7 +226,8 @@ impl<T: Owner> KickoffStateMachine<T> {
// Add all watchtower challenges and operator asserts to matchers
self.add_default_matchers(context).await?;
Ok(())
})
}.map_err(self.wrap_err("on_kickoff_started_entry"))
)
.await;
}

Expand All @@ -232,7 +246,12 @@ impl<T: Owner> KickoffStateMachine<T> {
pub(crate) async fn on_challenge_entry(&mut self, context: &mut StateContext<T>) {
println!("Watchtower Challenge Stage");
context
.capture_error(async |context| context.dispatch_duty(Duty::WatchtowerChallenge).await)
.capture_error(async |context| {
context
.dispatch_duty(Duty::WatchtowerChallenge)
.await
.map_err(self.wrap_err("on_watchtower_challenge_entry"))
})
.await;
}
}
109 changes: 64 additions & 45 deletions core/src/states/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ enum ContextProcessResult<
Processing(Fut),
}

/// Utility trait to make a generic process_block function
/// Utility trait to make processing generic
trait ContextProcessor<T: Owner, M: IntoStateMachine> {
/// Processes the machine with the given state context (which contains the block cache)
/// If the machine is unchanged, it is returned as is. Otherwise, the machine is processed
/// and the result is returned as a future that processes the new events.
fn process_with_ctx(
self,
block: &StateContext<T>,
Expand All @@ -41,6 +44,7 @@ trait ContextProcessor<T: Owner, M: IntoStateMachine> {
>;
}

/// Generic implementation for all state machines
impl<T, M> ContextProcessor<T, M> for InitializedStateMachine<M>
where
T: Owner,
Expand Down Expand Up @@ -73,7 +77,7 @@ where
mod kickoff;
mod round;

// Block cache to optimize lookups
/// Block cache to optimize Txid and UTXO lookups for a block
#[derive(Debug, Clone, Default)]
pub struct BlockCache {
txids: HashMap<Txid, Transaction>,
Expand Down Expand Up @@ -159,12 +163,8 @@ pub trait Owner: Send + Sync + Clone + Default {
// Enhanced StateError enum
#[derive(Error, Debug)]
pub enum StateError {
#[error("State machine error in {state_name} during {action}: {source}")]
StateMachineError {
state_name: String,
action: String,
source: BridgeError,
},
#[error("{0}")]
StateMachineError(#[from] eyre::Report),
#[error("Context error: {0}")]
ContextError(String),
#[error("Multiple errors: {0:?}")]
Expand All @@ -178,9 +178,7 @@ pub struct StateContext<T: Owner> {
pub cache: Arc<BlockCache>,
pub new_round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
pub new_kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
pub errors: Vec<Arc<StateError>>,
pub current_state: Option<String>,
pub current_action: Option<String>,
pub errors: Vec<Arc<eyre::Report>>,
pub paramset: &'static ProtocolParamset,
}

Expand All @@ -198,18 +196,10 @@ impl<T: Owner> StateContext<T> {
new_round_machines: Vec::new(),
new_kickoff_machines: Vec::new(),
errors: Vec::new(),
current_state: None,
current_action: None,
paramset,
}
}

// Method to set the current state and action context
pub fn set_state_info(&mut self, state: impl Into<String>, action: impl Into<String>) {
self.current_state = Some(state.into());
self.current_action = Some(action.into());
}

pub async fn dispatch_duty(&self, duty: Duty) -> Result<(), BridgeError> {
self.owner.handle_duty(duty).await
}
Expand All @@ -228,22 +218,28 @@ impl<T: Owner> StateContext<T> {
self.new_kickoff_machines.push(machine);
}

// Enhanced try_run method for better error context
/// Run an async closure and capture any errors in execution.
///
/// It will store the error report in the context's `errors` field. The
/// errors are later collected by the state manager and reported. This
/// ensures that all errors are collected and reported in a single place.
/// In general, it's expected that the closure attaches context about the
/// state machine to the error report. You may check
/// [`KickoffStateMachine::wrap_err`] and [`RoundStateMachine::wrap_err`]
/// for an example implementation of an error wrapper utility function.
///
/// # Parameters
/// * `fnc`: An async closure that takes a mutable reference to the state context and returns a result.
///
/// # Returns
/// * `()`
pub async fn capture_error(
&mut self,
fnc: impl AsyncFnOnce(&mut Self) -> Result<(), BridgeError>,
fnc: impl AsyncFnOnce(&mut Self) -> Result<(), eyre::Report>,
) {
let result = fnc(self).await;
if let Err(e) = result {
let state_error = match (&self.current_state, &self.current_action) {
(Some(state), Some(action)) => Arc::new(StateError::StateMachineError {
state_name: state.clone(),
action: action.clone(),
source: e,
}),
_ => Arc::new(StateError::ContextError(e.to_string())),
};
self.errors.push(state_error);
self.errors.push(e.into());
}
}
}
Expand Down Expand Up @@ -326,13 +322,26 @@ impl<T: Owner + 'static> StateManager<T> {
new_kickoff_machines: Vec::new(),
new_round_machines: Vec::new(),
errors: Vec::new(),
current_state: None,
current_action: None,
paramset,
}
}

pub fn update_machines<'a, M>(
/// Updates the machines using the context and returns machines without
/// events and futures that process new events for machines that changed.
/// Empties the `machines` vector.
///
/// # Parameters
/// * `machines`: A mutable reference to the vector of state machines to update.
/// * `base_context`: A reference to the base state context.
///
/// # Returns
/// A tuple of the unchanged machines and the futures that process new
/// events for machines that generated events.
///
/// # Type Parameters
/// * `M`: The type of the state machine.
/// * `a`: The lifetime of the state context reference (the future captures the context by reference).
fn update_machines<'a, M>(
machines: &mut Vec<InitializedStateMachine<M>>,
base_context: &'a StateContext<T>,
) -> (
Expand All @@ -348,7 +357,7 @@ impl<T: Owner + 'static> StateManager<T> {
let mut processing_futures = Vec::new();

for machine in std::mem::take(machines).into_iter() {
match machine.process_with_ctx(&base_context) {
match machine.process_with_ctx(base_context) {
ContextProcessResult::Processing(future) => processing_futures.push(future),
ContextProcessResult::Unchanged(machine) => new_machines.push(machine),
}
Expand All @@ -357,6 +366,12 @@ impl<T: Owner + 'static> StateManager<T> {
(new_machines, processing_futures)
}

/// Processes the block and moves all state machines forward in parallel.
/// The state machines are updated until they stabilize in their state (ie.
/// the block does not generate any new events)
///
/// # Errors
/// If the state machines do not stabilize after 50 iterations, we return an error.
pub async fn process_block_parallel(
&mut self,
block: &Block,
Expand All @@ -368,34 +383,36 @@ impl<T: Owner + 'static> StateManager<T> {
let base_context =
Self::create_context(self.owner.clone(), self.db.clone(), cache, self.paramset);

// Process all machines, for those unaffected return them, otherwise return
// Process all machines, for those unaffected collect them them, otherwise return
// a future that processes the new events.
let (mut final_kickoff_machines, mut kickoff_futures) =
Self::update_machines(&mut self.kickoff_machines, &base_context);
let (mut final_round_machines, mut round_futures) =
Self::update_machines(&mut self.round_machines, &base_context);

let mut iterations = 0;
// On each iteration, we'll update the changed machines until all machines
// stabilize in their state.
while !kickoff_futures.is_empty() || !round_futures.is_empty() {
if iterations > 50 {
return Err(BridgeError::Error(
"State machines did not stabilize after 50 iterations".into(),
));
}

// Execute all futures in parallel
let (kickoff_results, round_results) =
join(join_all(kickoff_futures), join_all(round_futures)).await;

// Unzip the results into updated machines and state contexts
let (mut changed_kickoff_machines, kickoff_contexts): (Vec<_>, Vec<_>) =
let (mut changed_kickoff_machines, mut kickoff_contexts): (Vec<_>, Vec<_>) =
kickoff_results.into_iter().unzip();
let (mut changed_round_machines, round_contexts): (Vec<_>, Vec<_>) =
let (mut changed_round_machines, mut round_contexts): (Vec<_>, Vec<_>) =
round_results.into_iter().unzip();

// Merge contexts
let mut all_contexts: Vec<StateContext<T>> = Vec::new();
all_contexts.extend(kickoff_contexts);
all_contexts.extend(round_contexts);

// Merge and handle errors
let mut all_errors = Vec::new();
for ctx in &mut all_contexts {
for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
all_errors.extend(std::mem::take(&mut ctx.errors));
}

Expand All @@ -407,7 +424,7 @@ impl<T: Owner + 'static> StateManager<T> {
}

// Append the newly generated state machines into the changed machines list
for ctx in &mut all_contexts {
for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
changed_round_machines.extend(std::mem::take(&mut ctx.new_round_machines));
changed_kickoff_machines.extend(std::mem::take(&mut ctx.new_kickoff_machines));
}
Expand All @@ -421,14 +438,16 @@ impl<T: Owner + 'static> StateManager<T> {
final_kickoff_machines.extend(finalized_kickoff_machines);
final_round_machines.extend(finalized_round_machines);

// Update the futures to be processed
kickoff_futures = new_kickoff_futures;
round_futures = new_round_futures;
iterations += 1;
}

// TODO: commit to db

// Add back the original machines
self.round_machines.extend(final_round_machines);
self.round_machines = (final_round_machines);
self.kickoff_machines.extend(final_kickoff_machines);

// Commit new machines - moved out of the borrow region
Expand Down
Loading

0 comments on commit b57b529

Please sign in to comment.