Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(code): Some WAL entries fail to decode #840

Merged
merged 11 commits into from
Feb 10, 2025
2 changes: 2 additions & 0 deletions code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,12 +599,14 @@ where
state.phase = Phase::Recovering;

if let Err(e) = self.replay_wal_entries(myself, state, entries).await {
self.tx_event.send(|| Event::WalReplayError);
error!(%height, "Failed to replay WAL entries: {e}");
}

state.phase = Phase::Running;
}
Err(e) => {
self.tx_event.send(|| Event::WalReplayError);
error!(%height, "Error when notifying WAL of started height: {e}")
}
}
Expand Down
2 changes: 2 additions & 0 deletions code/crates/engine/src/util/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub enum Event<Ctx: Context> {
WalReplayConsensus(SignedConsensusMsg<Ctx>),
WalReplayTimeout(Timeout),
WalReplayDone(Ctx::Height),
WalReplayError,
}

impl<Ctx: Context> fmt::Display for Event<Ctx> {
Expand Down Expand Up @@ -82,6 +83,7 @@ impl<Ctx: Context> fmt::Display for Event<Ctx> {
Event::WalReplayConsensus(msg) => write!(f, "WalReplayConsensus(msg: {msg:?})"),
Event::WalReplayTimeout(timeout) => write!(f, "WalReplayTimeout(timeout: {timeout:?})"),
Event::WalReplayDone(height) => write!(f, "WalReplayDone(height: {height})"),
Event::WalReplayError => write!(f, "WalReplayError"), // TODO: add error message
}
}
}
10 changes: 4 additions & 6 deletions code/crates/engine/src/wal/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,8 @@ where
}

WalEntry::Timeout(timeout) => {
// Write tag
buf.write_u8(Self::TAG_TIMEOUT)?;

// Write timeout
encode_timeout(timeout, &mut buf)?;
// Write tag and timeout if applicable
encode_timeout(Self::TAG_TIMEOUT, timeout, &mut buf)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! Thanks so much for figuring this out 🙌

Copy link
Collaborator Author

@ancazamfir ancazamfir Feb 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether consensus should decide which entries get logged, with the WAL simply writing everything consensus tells it to.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Ok(())
}
Expand Down Expand Up @@ -124,7 +121,7 @@ where
}
}

fn encode_timeout(timeout: &Timeout, mut buf: impl Write) -> io::Result<()> {
fn encode_timeout(tag: u8, timeout: &Timeout, mut buf: impl Write) -> io::Result<()> {
use malachitebft_core_types::TimeoutKind;

let step = match timeout.kind {
Expand All @@ -138,6 +135,7 @@ fn encode_timeout(timeout: &Timeout, mut buf: impl Write) -> io::Result<()> {
};

if step > 0 {
buf.write_u8(tag)?;
buf.write_u8(step)?;
buf.write_i64::<BE>(timeout.round.as_i64())?;
}
Expand Down
45 changes: 27 additions & 18 deletions code/crates/engine/src/wal/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ where
let sequence = height.as_u64();

if sequence == log.sequence() {
info!("Wal has {} entries", log.len());
// WAL is already at that sequence
// Let's check if there are any entries to replay
let entries = fetch_entries(log, codec);
Expand Down Expand Up @@ -90,19 +91,22 @@ where
let mut buf = Vec::new();
entry.encode(codec, &mut buf)?;

let result = log.append(&buf).map_err(Into::into);

if let Err(e) = &result {
error!("ATTENTION: Failed to append entry to WAL: {e}");
} else {
debug!(
type = %tpe, entry.size = %buf.len(), log.entries = %log.len(),
"Wrote log entry"
);
}
if !buf.is_empty() {
let result = log.append(&buf).map_err(Into::into);

if let Err(e) = &result {
error!("ATTENTION: Failed to append entry to WAL: {e}");
} else {
let length = log.len();
debug!(
type = %tpe, entry.size = %buf.len(), length,
"Wrote log entry"
);
}

if reply.send(result).is_err() {
error!("Failed to send WAL append reply");
if reply.send(result).is_err() {
error!("Failed to send WAL append reply");
}
}
}

Expand Down Expand Up @@ -144,23 +148,28 @@ where

let entries = log
.iter()?
.filter_map(|result| match result {
Ok(entry) => Some(entry),
.enumerate() // Add enumeration to get the index
.filter_map(|(idx, result)| match result {
Ok(entry) => Some((idx, entry)),
Err(e) => {
error!("Failed to retrieve a WAL entry: {e}");
error!("Failed to retrieve WAL entry {idx}: {e}");
None
}
})
.filter_map(
|bytes| match WalEntry::decode(codec, io::Cursor::new(bytes)) {
|(idx, bytes)| match WalEntry::decode(codec, io::Cursor::new(bytes.clone())) {
Ok(entry) => Some(entry),
Err(e) => {
error!("Failed to decode WAL entry: {e}");
error!("Failed to decode WAL entry {idx}: {e} {:?}", bytes);
None
}
},
)
.collect::<Vec<_>>();

Ok(entries)
if log.len() != entries.len() {
Err(eyre::eyre!("Failed to fetch and decode all WAL entries"))
} else {
Ok(entries)
}
}
20 changes: 14 additions & 6 deletions code/crates/starknet/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,14 +530,22 @@ async fn run_node<S>(
info!("Waiting until node reaches height {target_height}");

'inner: while let Ok(event) = rx_event.recv().await {
let Event::StartedHeight(height) = event else {
continue;
};
match &event {
Event::StartedHeight(height) => {
info!("Node started height {height}");

info!("Node started height {height}");
if height.as_u64() == target_height {
break 'inner;
}
}
Event::WalReplayError => {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is the best place to catch errors (WAL-related or otherwise) in tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure either, but since we already detect errors in the background task, I moved it there.

actor_ref.stop(Some("WAL replay error".to_string()));
handle.abort();
bg.abort();

if height.as_u64() == target_height {
break 'inner;
return TestResult::Failure("WAL replay error".to_string());
}
_ => (),
}
}
}
Expand Down
38 changes: 38 additions & 0 deletions code/crates/starknet/test/src/tests/vote_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ pub async fn crash_restart_from_latest() {
.restart_after(Duration::from_secs(5))
// Expect a vote set request for height 2
.expect_vote_set_request(2)
.crash()
// We do not reset the database so that the node can restart from the latest height
.restart_after(Duration::from_secs(5))
.wait_until(HEIGHT)
.success();

Expand Down Expand Up @@ -136,3 +139,38 @@ pub async fn start_late() {
)
.await
}

/// Two-node scenario in which the second node is crashed and restarted twice.
///
/// The first node simply runs until it reaches the final height. The second
/// node runs until height 2, crashes, restarts after a delay, is expected to
/// issue a vote set request for height 2, crashes a second time, restarts
/// again, and must then still reach the final height.
///
/// The scenario runs with sync enabled, a 5 second `timeout_step`, and an
/// overall budget of 60 seconds.
#[tokio::test]
pub async fn crash_restart_after_vote_set_request() {
    /// Height both nodes must reach for the test to succeed.
    const FINAL_HEIGHT: u64 = 3;
    /// Pause between a crash and the subsequent restart.
    const RESTART_DELAY: Duration = Duration::from_secs(5);

    init_logging(module_path!());

    let mut builder = TestBuilder::<()>::new();

    // First node: runs without interruption up to the final height.
    builder.add_node().start().wait_until(FINAL_HEIGHT).success();

    // Second node: crashed once at height 2 and once more after the vote set request.
    builder
        .add_node()
        .start()
        .wait_until(2)
        // First crash
        .crash()
        // Restart from the latest height
        .restart_after(RESTART_DELAY)
        // Wait for a vote set request for height 2
        .expect_vote_set_request(2)
        // Second crash
        .crash()
        // Restart again
        .restart_after(RESTART_DELAY)
        .wait_until(FINAL_HEIGHT)
        .success();

    let scenario = builder.build();

    let params = TestParams {
        enable_sync: true,
        timeout_step: Duration::from_secs(5),
        ..Default::default()
    };

    scenario
        .run_with_custom_config(Duration::from_secs(60), params)
        .await
}