Skip to content

Commit

Permalink
chore(derive): L1Traversal Doc and Test Cleanup (#79)
Browse files Browse the repository at this point in the history
* fix(derive): clean up the L1Traversal doc comments

* fix: traversal stage test for reorgs

* fix(derive): make block fetch error concrete

* fix(derive): system config update fails
  • Loading branch information
refcell authored Apr 4, 2024
1 parent 477051c commit 869d485
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 65 deletions.
9 changes: 4 additions & 5 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,15 @@ mod tests {
use super::*;
use crate::{
stages::{
frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval,
l1_traversal::tests::new_test_traversal,
frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval, l1_traversal::tests::*,
},
traits::test_utils::TestDAP,
};
use alloc::vec;

#[test]
fn test_ingest_empty_origin() {
let mut traversal = new_test_traversal(false, false);
let mut traversal = new_test_traversal(vec![], vec![]);
traversal.block = None;
let dap = TestDAP::default();
let retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -218,7 +217,7 @@ mod tests {

#[test]
fn test_ingest_and_prune_channel_bank() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x00]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand Down Expand Up @@ -246,7 +245,7 @@ mod tests {

#[tokio::test]
async fn test_read_empty_channel_bank() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x00]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand Down
14 changes: 7 additions & 7 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ where
pub(crate) mod tests {
use super::*;
use crate::{
stages::l1_traversal::tests::new_test_traversal, traits::test_utils::TestDAP,
stages::l1_traversal::tests::new_populated_test_traversal, traits::test_utils::TestDAP,
DERIVATION_VERSION_0,
};
use alloc::{vec, vec::Vec};
Expand Down Expand Up @@ -114,7 +114,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_empty_bytes() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x00]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -125,7 +125,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_no_frames_decoded() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Err(StageError::Eof), Ok(Bytes::default())];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -136,7 +136,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_wrong_derivation_version() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x01]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -147,7 +147,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_frame_too_short() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x00, 0x01]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -159,7 +159,7 @@ pub(crate) mod tests {
#[tokio::test]
async fn test_frame_queue_single_frame() {
let data = new_encoded_test_frames(1);
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![Ok(data)] };
let retrieval = L1Retrieval::new(traversal, dap);
let mut frame_queue = FrameQueue::new(retrieval);
Expand All @@ -173,7 +173,7 @@ pub(crate) mod tests {
#[tokio::test]
async fn test_frame_queue_multiple_frames() {
let data = new_encoded_test_frames(3);
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![Ok(data)] };
let retrieval = L1Retrieval::new(traversal, dap);
let mut frame_queue = FrameQueue::new(retrieval);
Expand Down
10 changes: 5 additions & 5 deletions crates/derive/src/stages/l1_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ where
mod tests {
use super::*;
use crate::{
stages::l1_traversal::tests::new_test_traversal,
stages::l1_traversal::tests::*,
traits::test_utils::{TestDAP, TestIter},
};
use alloc::vec;
use alloy_primitives::Address;

#[tokio::test]
async fn test_l1_retrieval_origin() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![] };
let retrieval = L1Retrieval::new(traversal, dap);
let expected = BlockInfo::default();
Expand All @@ -103,7 +103,7 @@ mod tests {

#[tokio::test]
async fn test_l1_retrieval_next_data() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Err(StageError::Eof), Ok(Bytes::default())];
let dap = TestDAP { results };
let mut retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -130,7 +130,7 @@ mod tests {
// Create a new traversal with no blocks or receipts.
// This would bubble up an error if the prev stage
// (traversal) is called in the retrieval stage.
let traversal = new_test_traversal(false, false);
let traversal = new_test_traversal(vec![], vec![]);
let dap = TestDAP { results: vec![] };
let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) };
let data = retrieval.next_data().await.unwrap();
Expand All @@ -146,7 +146,7 @@ mod tests {
open_data_calls: vec![(BlockInfo::default(), Address::default())],
results: vec![Err(StageError::Eof)],
};
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![] };
let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) };
let data = retrieval.next_data().await.unwrap_err();
Expand Down
149 changes: 101 additions & 48 deletions crates/derive/src/stages/l1_traversal.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
//! Contains the L1 traversal stage of the derivation pipeline.
//! Contains the [L1Traversal] stage of the derivation pipeline.
use crate::{
traits::{ChainProvider, ResettableStage},
types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig},
};
use alloc::{boxed::Box, sync::Arc};
use anyhow::anyhow;
use async_trait::async_trait;

/// The L1 traversal stage of the derivation pipeline.
/// The [L1Traversal] stage of the derivation pipeline.
///
/// This stage sits at the bottom of the pipeline, holding a handle to the data source
/// (a [ChainProvider] implementation) and the current L1 [BlockInfo] in the pipeline,
/// which are used to traverse the L1 chain. When the [L1Traversal] stage is advanced,
/// it fetches the next L1 [BlockInfo] from the data source and updates the [SystemConfig]
/// with the receipts from the block.
#[derive(Debug, Clone)]
pub struct L1Traversal<Provider: ChainProvider> {
/// The current block in the traversal stage.
pub(crate) block: Option<BlockInfo>,
/// The data source for the traversal stage.
data_source: Provider,
/// Signals whether or not the traversal stage has been completed.
/// Signals whether or not the traversal stage is complete.
done: bool,
/// The system config
/// The system config.
pub system_config: SystemConfig,
/// The rollup config
/// A reference to the rollup config.
pub rollup_config: Arc<RollupConfig>,
}

Expand All @@ -40,9 +45,10 @@ impl<F: ChainProvider> L1Traversal<F> {
&self.data_source
}

/// Returns the next L1 block in the traversal stage, if the stage has not been completed.
/// This function can only be called once, and will return `None` on subsequent calls
/// unless the stage is reset.
/// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete.
/// This function can only be called once while the stage is in progress, and will return
/// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is
/// complete and the [BlockInfo] has been consumed, an [StageError::Eof] error is returned.
pub fn next_l1_block(&mut self) -> StageResult<Option<BlockInfo>> {
if !self.done {
self.done = true;
Expand All @@ -52,35 +58,41 @@ impl<F: ChainProvider> L1Traversal<F> {
}
}

/// Returns the current L1 block in the traversal stage, if it exists.
/// Returns the current L1 [BlockInfo] in the [L1Traversal] stage, if it exists.
pub fn origin(&self) -> Option<&BlockInfo> {
self.block.as_ref()
}

/// Advances the internal state of the [L1Traversal] stage to the next L1 block.
/// This function fetches the next L1 [BlockInfo] from the data source and updates the
/// [SystemConfig] with the receipts from the block.
pub async fn advance_l1_block(&mut self) -> StageResult<()> {
// Pull the next block or return EOF which has special
// handling further up the pipeline.
// Pull the next block or return EOF.
// StageError::EOF has special handling further up the pipeline.
let block = self.block.ok_or(StageError::Eof)?;
let next_l1_origin = self.data_source.block_info_by_number(block.number + 1).await?;
let next_l1_origin = match self.data_source.block_info_by_number(block.number + 1).await {
Ok(block) => block,
Err(e) => return Err(StageError::BlockInfoFetch(e)),
};

// Check for reorgs
// Check block hashes for reorgs.
if block.hash != next_l1_origin.parent_hash {
return Err(anyhow!(
"Detected L1 reorg from {} to {} with conflicting parent",
block.hash,
next_l1_origin.hash
)
.into());
return Err(StageError::ReorgDetected(block.hash, next_l1_origin.parent_hash));
}

// Fetch receipts.
let receipts = self.data_source.receipts_by_hash(next_l1_origin.hash).await?;
self.system_config.update_with_receipts(
// Fetch receipts for the next l1 block and update the system config.
let receipts = match self.data_source.receipts_by_hash(next_l1_origin.hash).await {
Ok(receipts) => receipts,
Err(e) => return Err(StageError::ReceiptFetch(e)),
};

if let Err(e) = self.system_config.update_with_receipts(
receipts.as_slice(),
&self.rollup_config,
next_l1_origin.timestamp,
)?;
) {
return Err(StageError::SystemConfigUpdate(e));
}

self.block = Some(next_l1_origin);
self.done = false;
Expand Down Expand Up @@ -126,62 +138,103 @@ pub(crate) mod tests {
}
}

pub(crate) fn new_receipts() -> alloc::vec::Vec<Receipt> {
let mut receipt = Receipt { success: true, ..Receipt::default() };
let bad = Log::new(
Address::from([2; 20]),
vec![CONFIG_UPDATE_TOPIC, B256::default()],
Bytes::default(),
)
.unwrap();
receipt.logs = vec![new_update_batcher_log(), bad, new_update_batcher_log()];
vec![receipt.clone(), Receipt::default(), receipt]
}

pub(crate) fn new_test_traversal(
blocks: bool,
receipts: bool,
blocks: alloc::vec::Vec<BlockInfo>,
receipts: alloc::vec::Vec<Receipt>,
) -> L1Traversal<TestChainProvider> {
let mut provider = TestChainProvider::default();
let rollup_config = RollupConfig {
l1_system_config_address: L1_SYS_CONFIG_ADDR,
..RollupConfig::default()
};
let block = BlockInfo::default();
if blocks {
provider.insert_block(0, block);
provider.insert_block(1, block);
for (i, block) in blocks.iter().enumerate() {
provider.insert_block(i as u64, *block);
}
if receipts {
let mut receipt = Receipt { success: true, ..Receipt::default() };
let bad = Log::new(
Address::from([2; 20]),
vec![CONFIG_UPDATE_TOPIC, B256::default()],
Bytes::default(),
)
.unwrap();
receipt.logs = vec![new_update_batcher_log(), bad, new_update_batcher_log()];
let receipts = vec![receipt.clone(), Receipt::default(), receipt];
provider.insert_receipts(block.hash, receipts);
for (i, receipt) in receipts.iter().enumerate() {
let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default();
provider.insert_receipts(hash, vec![receipt.clone()]);
}
L1Traversal::new(provider, Arc::new(rollup_config))
}

pub(crate) fn new_populated_test_traversal() -> L1Traversal<TestChainProvider> {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
new_test_traversal(blocks, receipts)
}

#[tokio::test]
async fn test_l1_traversal() {
let mut traversal = new_test_traversal(true, true);
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
let mut traversal = new_test_traversal(blocks, receipts);
assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof);
assert!(traversal.advance_l1_block().await.is_ok());
}

#[tokio::test]
async fn test_l1_traversal_missing_receipts() {
let mut traversal = new_test_traversal(true, false);
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let mut traversal = new_test_traversal(blocks, vec![]);
assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof);
matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::Custom(_));
matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::ReceiptFetch(_));
}

#[tokio::test]
async fn test_l1_traversal_reorgs() {
let hash = b256!("3333333333333333333333333333333333333333333333333333333333333333");
let block = BlockInfo { hash, ..BlockInfo::default() };
let blocks = vec![block, block];
let receipts = new_receipts();
let mut traversal = new_test_traversal(blocks, receipts);
assert!(traversal.advance_l1_block().await.is_ok());
let err = traversal.advance_l1_block().await.unwrap_err();
assert_eq!(err, StageError::ReorgDetected(block.hash, block.parent_hash));
}

#[tokio::test]
async fn test_l1_traversal_missing_blocks() {
let mut traversal = new_test_traversal(false, false);
let mut traversal = new_test_traversal(vec![], vec![]);
assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof);
matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::Custom(_));
matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::BlockInfoFetch(_));
}

#[tokio::test]
async fn test_l1_traversal_system_config_update_fails() {
let first = b256!("3333333333333333333333333333333333333333333333333333333333333333");
let second = b256!("4444444444444444444444444444444444444444444444444444444444444444");
let block1 = BlockInfo { hash: first, ..BlockInfo::default() };
let block2 = BlockInfo { hash: second, ..BlockInfo::default() };
let blocks = vec![block1, block2];
let receipts = new_receipts();
let mut traversal = new_test_traversal(blocks, receipts);
assert!(traversal.advance_l1_block().await.is_ok());
// Only the second block should fail since the second receipt
// contains invalid logs that will error for a system config update.
let err = traversal.advance_l1_block().await.unwrap_err();
matches!(err, StageError::SystemConfigUpdate(_));
}

#[tokio::test]
async fn test_system_config_updated() {
let mut traversal = new_test_traversal(true, true);
async fn test_l1_traversal_system_config_updated() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
let mut traversal = new_test_traversal(blocks, receipts);
assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof);
assert!(traversal.advance_l1_block().await.is_ok());
Expand Down
Loading

0 comments on commit 869d485

Please sign in to comment.