Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(code): Some WAL entries fail to decode #840

Merged
merged 11 commits into from
Feb 10, 2025
18 changes: 11 additions & 7 deletions code/crates/core-consensus/src/handle/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,29 @@ where
"Timeout elapsed"
);

// Persist the timeout in the Write-ahead Log
perform!(co, Effect::WalAppendTimeout(timeout, Default::default()));
if !matches!(
timeout.kind,
TimeoutKind::PrevoteTimeLimit | TimeoutKind::PrecommitTimeLimit
) {
// Persist the timeout in the Write-ahead Log.
// Time-limit timeouts are not persisted because they only occur when consensus is stuck.
perform!(co, Effect::WalAppendTimeout(timeout, Default::default()));
}

apply_driver_input(co, state, metrics, DriverInput::TimeoutElapsed(timeout)).await?;

match timeout.kind {
TimeoutKind::PrevoteTimeLimit | TimeoutKind::PrecommitTimeLimit => {
on_step_limit_timeout(co, state, metrics, timeout.round).await?;
on_step_limit_timeout(co, state, metrics, timeout.round).await
}
TimeoutKind::Commit => {
let proposal = state
.decision
.remove(&(height, round))
.ok_or_else(|| Error::DecidedValueNotFound(height, round))?;

decide(co, state, metrics, round, proposal).await?;
decide(co, state, metrics, round, proposal).await
}
_ => {}
_ => Ok(()),
}

Ok(())
}
6 changes: 5 additions & 1 deletion code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
Expand Down Expand Up @@ -600,12 +601,15 @@ where

if let Err(e) = self.replay_wal_entries(myself, state, entries).await {
error!(%height, "Failed to replay WAL entries: {e}");
self.tx_event.send(|| Event::WalReplayError(Arc::new(e)));
}

state.phase = Phase::Running;
}
Err(e) => {
error!(%height, "Error when notifying WAL of started height: {e}")
error!(%height, "Error when notifying WAL of started height: {e}");
self.tx_event
.send(|| Event::WalReplayError(Arc::new(e.into())));
}
}

Expand Down
4 changes: 4 additions & 0 deletions code/crates/engine/src/util/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use core::fmt;
use std::sync::Arc;

use derive_where::derive_where;
use ractor::ActorProcessingErr;
use tokio::sync::broadcast;

use malachitebft_core_consensus::{LocallyProposedValue, ProposedValue, SignedConsensusMsg};
Expand Down Expand Up @@ -49,6 +51,7 @@ pub enum Event<Ctx: Context> {
WalReplayConsensus(SignedConsensusMsg<Ctx>),
WalReplayTimeout(Timeout),
WalReplayDone(Ctx::Height),
WalReplayError(Arc<ActorProcessingErr>),
}

impl<Ctx: Context> fmt::Display for Event<Ctx> {
Expand Down Expand Up @@ -82,6 +85,7 @@ impl<Ctx: Context> fmt::Display for Event<Ctx> {
Event::WalReplayConsensus(msg) => write!(f, "WalReplayConsensus(msg: {msg:?})"),
Event::WalReplayTimeout(timeout) => write!(f, "WalReplayTimeout(timeout: {timeout:?})"),
Event::WalReplayDone(height) => write!(f, "WalReplayDone(height: {height})"),
Event::WalReplayError(error) => write!(f, "WalReplayError({error})"),
}
}
}
30 changes: 15 additions & 15 deletions code/crates/engine/src/wal/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ where
{
match self {
WalEntry::ConsensusMsg(msg) => {
// Write tag
buf.write_u8(Self::TAG_CONSENSUS)?;

let bytes = codec.encode(msg).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("failed to encode consensus message: {e}"),
)
})?;

// Write tag
buf.write_u8(Self::TAG_CONSENSUS)?;

// Write encoded length
buf.write_u64::<BE>(bytes.len() as u64)?;

Expand All @@ -80,11 +80,8 @@ where
}

WalEntry::Timeout(timeout) => {
// Write tag
buf.write_u8(Self::TAG_TIMEOUT)?;

// Write timeout
encode_timeout(timeout, &mut buf)?;
// Write tag and timeout if applicable
encode_timeout(Self::TAG_TIMEOUT, timeout, &mut buf)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! Thanks so much for figuring this out 🙌

Copy link
Collaborator Author

@ancazamfir ancazamfir Feb 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we should have consensus decide which entries should be logged and have wal write everything it's told to (by consensus).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Ok(())
}
Expand Down Expand Up @@ -124,7 +121,7 @@ where
}
}

fn encode_timeout(timeout: &Timeout, mut buf: impl Write) -> io::Result<()> {
fn encode_timeout(tag: u8, timeout: &Timeout, mut buf: impl Write) -> io::Result<()> {
use malachitebft_core_types::TimeoutKind;

let step = match timeout.kind {
Expand All @@ -133,14 +130,15 @@ fn encode_timeout(timeout: &Timeout, mut buf: impl Write) -> io::Result<()> {
TimeoutKind::Precommit => 3,
TimeoutKind::Commit => 4,

// We do not store these two timeouts in the WAL
TimeoutKind::PrevoteTimeLimit | TimeoutKind::PrecommitTimeLimit => 0,
// Consensus will typically not want to store these two timeouts in the WAL,
// but we still need to handle them here.
TimeoutKind::PrevoteTimeLimit => 5,
TimeoutKind::PrecommitTimeLimit => 6,
};

if step > 0 {
buf.write_u8(step)?;
buf.write_i64::<BE>(timeout.round.as_i64())?;
}
buf.write_u8(tag)?;
buf.write_u8(step)?;
buf.write_i64::<BE>(timeout.round.as_i64())?;

Ok(())
}
Expand All @@ -153,6 +151,8 @@ fn decode_timeout(mut buf: impl Read) -> io::Result<Timeout> {
2 => TimeoutKind::Prevote,
3 => TimeoutKind::Precommit,
4 => TimeoutKind::Commit,
5 => TimeoutKind::PrevoteTimeLimit,
6 => TimeoutKind::PrecommitTimeLimit,
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
Expand Down
51 changes: 31 additions & 20 deletions code/crates/engine/src/wal/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,21 @@ where
let mut buf = Vec::new();
entry.encode(codec, &mut buf)?;

let result = log.append(&buf).map_err(Into::into);

if let Err(e) = &result {
error!("ATTENTION: Failed to append entry to WAL: {e}");
} else {
debug!(
type = %tpe, entry.size = %buf.len(), log.entries = %log.len(),
"Wrote log entry"
);
}
if !buf.is_empty() {
let result = log.append(&buf).map_err(Into::into);

if let Err(e) = &result {
error!("ATTENTION: Failed to append entry to WAL: {e}");
} else {
debug!(
type = %tpe, entry.size = %buf.len(), log.entries = %log.len(),
"Wrote log entry"
);
}

if reply.send(result).is_err() {
error!("Failed to send WAL append reply");
if reply.send(result).is_err() {
error!("Failed to send WAL append reply");
}
}
}

Expand All @@ -113,8 +115,8 @@ where
error!("ATTENTION: Failed to flush WAL to disk: {e}");
} else {
debug!(
log.entries = %log.len(),
log.size = %log.size_bytes().unwrap_or(0),
wal.entries = %log.len(),
wal.size = %log.size_bytes().unwrap_or(0),
"Flushed WAL to disk"
);
}
Expand Down Expand Up @@ -144,23 +146,32 @@ where

let entries = log
.iter()?
.filter_map(|result| match result {
Ok(entry) => Some(entry),
.enumerate() // Add enumeration to get the index
.filter_map(|(idx, result)| match result {
Ok(entry) => Some((idx, entry)),
Err(e) => {
error!("Failed to retrieve a WAL entry: {e}");
error!("Failed to retrieve WAL entry {idx}: {e}");
None
}
})
.filter_map(
|bytes| match WalEntry::decode(codec, io::Cursor::new(bytes)) {
|(idx, bytes)| match WalEntry::decode(codec, io::Cursor::new(bytes.clone())) {
Ok(entry) => Some(entry),
Err(e) => {
error!("Failed to decode WAL entry: {e}");
error!("Failed to decode WAL entry {idx}: {e} {:?}", bytes);
None
}
},
)
.collect::<Vec<_>>();

Ok(entries)
if log.len() != entries.len() {
Err(eyre::eyre!(
"Failed to fetch and decode all WAL entries: expected {}, got {}",
log.len(),
entries.len()
))
} else {
Ok(entries)
}
}
15 changes: 8 additions & 7 deletions code/crates/starknet/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,9 @@ async fn run_node<S>(
Event::Published(msg) if is_full_node => {
panic!("Full nodes unexpectedly publish a consensus message: {msg:?}");
}
Event::WalReplayError(e) => {
panic!("WAL replay error: {e}");
}
_ => (),
}

Expand All @@ -530,14 +533,12 @@ async fn run_node<S>(
info!("Waiting until node reaches height {target_height}");

'inner: while let Ok(event) = rx_event.recv().await {
let Event::StartedHeight(height) = event else {
continue;
};

info!("Node started height {height}");
if let Event::StartedHeight(height) = &event {
info!("Node started height {height}");

if height.as_u64() == target_height {
break 'inner;
if height.as_u64() == target_height {
break 'inner;
}
}
}
}
Expand Down
35 changes: 35 additions & 0 deletions code/crates/starknet/test/src/tests/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,38 @@ async fn non_proposer_crashes_after_voting(params: TestParams) {
)
.await
}

#[tokio::test]
pub async fn node_crashes_after_vote_set_request() {
init_logging(module_path!());

const HEIGHT: u64 = 3;

let mut test = TestBuilder::<()>::new();

test.add_node().start().wait_until(HEIGHT).success();
test.add_node()
.start()
.wait_until(2)
.crash()
// Restart from the latest height
.restart_after(Duration::from_secs(5))
// Wait for a vote set request for height 2
.expect_vote_set_request(2)
.crash()
// Restart again
.restart_after(Duration::from_secs(5))
.wait_until(HEIGHT)
.success();

test.build()
.run_with_custom_config(
Duration::from_secs(60),
TestParams {
enable_sync: true,
timeout_step: Duration::from_secs(5),
..Default::default()
},
)
.await
}