diff --git a/code/crates/core-consensus/src/handle/timeout.rs b/code/crates/core-consensus/src/handle/timeout.rs index af28f8537..9eb805f50 100644 --- a/code/crates/core-consensus/src/handle/timeout.rs +++ b/code/crates/core-consensus/src/handle/timeout.rs @@ -34,14 +34,20 @@ where "Timeout elapsed" ); - // Persist the timeout in the Write-ahead Log - perform!(co, Effect::WalAppendTimeout(timeout, Default::default())); + if !matches!( + timeout.kind, + TimeoutKind::PrevoteTimeLimit | TimeoutKind::PrecommitTimeLimit + ) { + // Persist the timeout in the Write-ahead Log. + // Time-limit timeouts are not persisted because they only occur when consensus is stuck. + perform!(co, Effect::WalAppendTimeout(timeout, Default::default())); + } apply_driver_input(co, state, metrics, DriverInput::TimeoutElapsed(timeout)).await?; match timeout.kind { TimeoutKind::PrevoteTimeLimit | TimeoutKind::PrecommitTimeLimit => { - on_step_limit_timeout(co, state, metrics, timeout.round).await?; + on_step_limit_timeout(co, state, metrics, timeout.round).await } TimeoutKind::Commit => { let proposal = state @@ -49,10 +55,8 @@ where .remove(&(height, round)) .ok_or_else(|| Error::DecidedValueNotFound(height, round))?; - decide(co, state, metrics, round, proposal).await?; + decide(co, state, metrics, round, proposal).await } - _ => {} + _ => Ok(()), } - - Ok(()) } diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index b8a6813c5..74087b6cf 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -1,4 +1,5 @@ use std::collections::BTreeSet; +use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; @@ -600,12 +601,15 @@ where if let Err(e) = self.replay_wal_entries(myself, state, entries).await { error!(%height, "Failed to replay WAL entries: {e}"); + self.tx_event.send(|| Event::WalReplayError(Arc::new(e))); } state.phase = Phase::Running; } Err(e) => { - error!(%height, "Error when notifying WAL of started height: {e}") + error!(%height, "Error when notifying WAL of started height: {e}"); + self.tx_event + .send(|| Event::WalReplayError(Arc::new(e.into()))); } } diff --git a/code/crates/engine/src/util/events.rs b/code/crates/engine/src/util/events.rs index cde03d24e..a7ecb4430 100644 --- a/code/crates/engine/src/util/events.rs +++ b/code/crates/engine/src/util/events.rs @@ -1,6 +1,8 @@ use core::fmt; +use std::sync::Arc; use derive_where::derive_where; +use ractor::ActorProcessingErr; use tokio::sync::broadcast; use malachitebft_core_consensus::{LocallyProposedValue, ProposedValue, SignedConsensusMsg}; @@ -49,6 +51,7 @@ pub enum Event { WalReplayConsensus(SignedConsensusMsg), WalReplayTimeout(Timeout), WalReplayDone(Ctx::Height), + WalReplayError(Arc), } impl fmt::Display for Event { @@ -82,6 +85,7 @@ impl fmt::Display for Event { Event::WalReplayConsensus(msg) => write!(f, "WalReplayConsensus(msg: {msg:?})"), Event::WalReplayTimeout(timeout) => write!(f, "WalReplayTimeout(timeout: {timeout:?})"), Event::WalReplayDone(height) => write!(f, "WalReplayDone(height: {height})"), + Event::WalReplayError(error) => write!(f, "WalReplayError({error})"), } } } diff --git a/code/crates/engine/src/wal/entry.rs b/code/crates/engine/src/wal/entry.rs index fd7b2de07..80508fb2c 100644 --- a/code/crates/engine/src/wal/entry.rs +++ b/code/crates/engine/src/wal/entry.rs @@ -60,9 +60,6 @@ where { match self { WalEntry::ConsensusMsg(msg) => { - // Write tag - buf.write_u8(Self::TAG_CONSENSUS)?; - let bytes = codec.encode(msg).map_err(|e| { io::Error::new( io::ErrorKind::InvalidData, @@ -70,6 +67,9 @@ where ) })?; + // Write tag + buf.write_u8(Self::TAG_CONSENSUS)?; + // Write encoded length buf.write_u64::(bytes.len() as u64)?; @@ -80,11 +80,8 @@ where } WalEntry::Timeout(timeout) => { - // Write tag - buf.write_u8(Self::TAG_TIMEOUT)?; - - // Write timeout - encode_timeout(timeout, &mut buf)?; + // Write tag and timeout if applicable + encode_timeout(Self::TAG_TIMEOUT, timeout, &mut buf)?; Ok(()) } @@ -124,7 +121,7 @@ where } } -fn encode_timeout(timeout: &Timeout, mut buf: impl Write) -> io::Result<()> { +fn encode_timeout(tag: u8, timeout: &Timeout, mut buf: impl Write) -> io::Result<()> { use malachitebft_core_types::TimeoutKind; let step = match timeout.kind { @@ -133,14 +130,15 @@ fn encode_timeout(timeout: &Timeout, mut buf: impl Write) -> io::Result<()> { TimeoutKind::Precommit => 3, TimeoutKind::Commit => 4, - // We do not store these two timeouts in the WAL - TimeoutKind::PrevoteTimeLimit | TimeoutKind::PrecommitTimeLimit => 0, + // Consensus will typically not want to store these two timeouts in the WAL, + // but we still need to handle them here. + TimeoutKind::PrevoteTimeLimit => 5, + TimeoutKind::PrecommitTimeLimit => 6, }; - if step > 0 { - buf.write_u8(step)?; - buf.write_i64::(timeout.round.as_i64())?; - } + buf.write_u8(tag)?; + buf.write_u8(step)?; + buf.write_i64::(timeout.round.as_i64())?; Ok(()) } @@ -153,6 +151,8 @@ fn decode_timeout(mut buf: impl Read) -> io::Result { 2 => TimeoutKind::Prevote, 3 => TimeoutKind::Precommit, 4 => TimeoutKind::Commit, + 5 => TimeoutKind::PrevoteTimeLimit, + 6 => TimeoutKind::PrecommitTimeLimit, _ => { return Err(io::Error::new( io::ErrorKind::InvalidData, diff --git a/code/crates/engine/src/wal/thread.rs b/code/crates/engine/src/wal/thread.rs index 25a39815b..32986aad4 100644 --- a/code/crates/engine/src/wal/thread.rs +++ b/code/crates/engine/src/wal/thread.rs @@ -90,19 +90,21 @@ where let mut buf = Vec::new(); entry.encode(codec, &mut buf)?; - let result = log.append(&buf).map_err(Into::into); - - if let Err(e) = &result { - error!("ATTENTION: Failed to append entry to WAL: {e}"); - } else { - debug!( - type = %tpe, entry.size = %buf.len(), log.entries = %log.len(), - "Wrote log entry" - ); - } + if !buf.is_empty() { + let result = log.append(&buf).map_err(Into::into); + + if let Err(e) = &result { + error!("ATTENTION: Failed to append entry to WAL: {e}"); + } else { + debug!( + type = %tpe, entry.size = %buf.len(), log.entries = %log.len(), + "Wrote log entry" + ); + } - if reply.send(result).is_err() { - error!("Failed to send WAL append reply"); + if reply.send(result).is_err() { + error!("Failed to send WAL append reply"); + } } } @@ -113,8 +115,8 @@ where error!("ATTENTION: Failed to flush WAL to disk: {e}"); } else { debug!( - log.entries = %log.len(), - log.size = %log.size_bytes().unwrap_or(0), + wal.entries = %log.len(), + wal.size = %log.size_bytes().unwrap_or(0), "Flushed WAL to disk" ); } @@ -144,23 +146,32 @@ where let entries = log .iter()? - .filter_map(|result| match result { - Ok(entry) => Some(entry), + .enumerate() // Add enumeration to get the index + .filter_map(|(idx, result)| match result { + Ok(entry) => Some((idx, entry)), Err(e) => { - error!("Failed to retrieve a WAL entry: {e}"); + error!("Failed to retrieve WAL entry {idx}: {e}"); None } }) .filter_map( - |bytes| match WalEntry::decode(codec, io::Cursor::new(bytes)) { + |(idx, bytes)| match WalEntry::decode(codec, io::Cursor::new(bytes.clone())) { Ok(entry) => Some(entry), Err(e) => { - error!("Failed to decode WAL entry: {e}"); + error!("Failed to decode WAL entry {idx}: {e} {:?}", bytes); None } }, ) .collect::>(); - Ok(entries) + if log.len() != entries.len() { + Err(eyre::eyre!( + "Failed to fetch and decode all WAL entries: expected {}, got {}", + log.len(), + entries.len() + )) + } else { + Ok(entries) + } } diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index 12a2969ac..39df3606a 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -512,6 +512,9 @@ async fn run_node( Event::Published(msg) if is_full_node => { panic!("Full nodes unexpectedly publish a consensus message: {msg:?}"); } + Event::WalReplayError(e) => { + panic!("WAL replay error: {e}"); + } _ => (), } @@ -530,14 +533,12 @@ async fn run_node( info!("Waiting until node reaches height {target_height}"); 'inner: while let Ok(event) = rx_event.recv().await { - let Event::StartedHeight(height) = event else { - continue; - }; - - info!("Node started height {height}"); + if let Event::StartedHeight(height) = &event { + info!("Node started height {height}"); - if height.as_u64() == target_height { - break 'inner; + if height.as_u64() == target_height { + break 'inner; + } } } } diff --git a/code/crates/starknet/test/src/tests/wal.rs b/code/crates/starknet/test/src/tests/wal.rs index 88b14a0db..3cdd6cf5f 100644 --- a/code/crates/starknet/test/src/tests/wal.rs +++ b/code/crates/starknet/test/src/tests/wal.rs @@ -192,3 +192,38 @@ async fn non_proposer_crashes_after_voting(params: TestParams) { ) .await } + +#[tokio::test] +pub async fn node_crashes_after_vote_set_request() { + init_logging(module_path!()); + + const HEIGHT: u64 = 3; + + let mut test = TestBuilder::<()>::new(); + + test.add_node().start().wait_until(HEIGHT).success(); + test.add_node() + .start() + .wait_until(2) + .crash() + // Restart from the latest height + .restart_after(Duration::from_secs(5)) + // Wait for a vote set request for height 2 + .expect_vote_set_request(2) + .crash() + // Restart again + .restart_after(Duration::from_secs(5)) + .wait_until(HEIGHT) + .success(); + + test.build() + .run_with_custom_config( + Duration::from_secs(60), + TestParams { + enable_sync: true, + timeout_step: Duration::from_secs(5), + ..Default::default() + }, + ) + .await +}