Skip to content

Commit

Permalink
fix(derive): decouple stages using provider traits
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Apr 4, 2024
1 parent db7f575 commit e5992af
Show file tree
Hide file tree
Showing 7 changed files with 416 additions and 368 deletions.
49 changes: 28 additions & 21 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
//! This module contains the `BatchQueue` stage implementation.
use crate::{
stages::channel_reader::ChannelReader,
traits::{
ChainProvider, DataAvailabilityProvider, OriginProvider, ResettableStage, SafeBlockFetcher,
TelemetryProvider,
},
traits::{LogLevel, OriginProvider, ResettableStage, SafeBlockFetcher, TelemetryProvider},
types::{
Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig,
SingleBatch, StageError, StageResult, SystemConfig,
},
};
use alloc::{boxed::Box, vec::Vec};
use alloy_primitives::Bytes;
use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;

/// Provides batches for the [BatchQueue] stage.
#[async_trait]
pub trait BatchQueueProvider {
/// Returns the next batch in the [ChannelReader] stage, if the stage is not complete.
/// This function can only be called once while the stage is in progress, and will return
/// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is
/// complete and the batch has been consumed, an [StageError::Eof] error is returned.
async fn next_batch(&mut self) -> StageResult<Batch>;
}

/// [BatchQueue] is responsible for o rdering unordered batches
/// and gnerating empty batches when the sequence window has passed.
///
Expand All @@ -31,20 +38,22 @@ use core::fmt::Debug;
/// It is internally responsible for making sure that batches with L1 inclusions block outside it's
/// working range are not considered or pruned.
#[derive(Debug)]
pub struct BatchQueue<DAP, CP, BF, T>
pub struct BatchQueue<P, BF, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
P: BatchQueueProvider + OriginProvider + Debug,
BF: SafeBlockFetcher + Debug,
T: TelemetryProvider + Debug,
{
/// The rollup config.
cfg: RollupConfig,
/// The previous stage of the derivation pipeline.
prev: ChannelReader<DAP, CP, T>,
prev: P,
/// The l1 block ref
origin: Option<BlockInfo>,

/// Telemetry
telemetry: T,

/// A consecutive, time-centric window of L1 Blocks.
/// Every L1 origin of unsafe L2 Blocks must be included in this list.
/// If every L2 Block corresponding to a single L1 Block becomes safe,
Expand All @@ -63,19 +72,19 @@ where
fetcher: BF,
}

impl<DAP, CP, BF, T> BatchQueue<DAP, CP, BF, T>
impl<P, BF, T> BatchQueue<P, BF, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
P: BatchQueueProvider + OriginProvider + Debug,
BF: SafeBlockFetcher + Debug,
T: TelemetryProvider + Debug,
{
/// Creates a new [BatchQueue] stage.
pub fn new(cfg: RollupConfig, prev: ChannelReader<DAP, CP, T>, fetcher: BF) -> Self {
pub fn new(cfg: RollupConfig, prev: P, telemetry: T, fetcher: BF) -> Self {
Self {
cfg,
prev,
origin: None,
telemetry,
l1_blocks: Vec::new(),
batches: Vec::new(),
next_spans: Vec::new(),
Expand Down Expand Up @@ -128,8 +137,8 @@ where
for (i, block) in self.l1_blocks.iter().enumerate() {
if parent.l1_origin.number == block.number {
self.l1_blocks.drain(0..i);
// TODO: log that the pipelien has advanced the epoch.
// TODO: metrice the internal epoch advancement.
self.telemetry
.write(Bytes::from("Advancing internal L1 blocks"), LogLevel::Info);
break;
}
}
Expand Down Expand Up @@ -348,10 +357,9 @@ where
}
}

impl<DAP, CP, BF, T> OriginProvider for BatchQueue<DAP, CP, BF, T>
impl<P, BF, T> OriginProvider for BatchQueue<P, BF, T>
where
DAP: DataAvailabilityProvider + Debug,
CP: ChainProvider + Debug,
P: BatchQueueProvider + OriginProvider + Debug,
BF: SafeBlockFetcher + Debug,
T: TelemetryProvider + Debug,
{
Expand All @@ -361,10 +369,9 @@ where
}

#[async_trait]
impl<DAP, CP, BF, T> ResettableStage for BatchQueue<DAP, CP, BF, T>
impl<P, BF, T> ResettableStage for BatchQueue<P, BF, T>
where
DAP: DataAvailabilityProvider + Send + Debug,
CP: ChainProvider + Send + Debug,
P: BatchQueueProvider + OriginProvider + Send + Debug,
BF: SafeBlockFetcher + Send + Debug,
T: TelemetryProvider + Send + Debug + Sync,
{
Expand Down
Loading

0 comments on commit e5992af

Please sign in to comment.