Skip to content

Commit

Permalink
chore(derive): cleanups (#91)
Browse files Browse the repository at this point in the history
ref sys cfg
  • Loading branch information
clabby authored Apr 5, 2024
1 parent d52e6cc commit f9041df
Show file tree
Hide file tree
Showing 21 changed files with 84 additions and 123 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ anyhow.workspace = true
alloy-primitives = { version = "0.7.0", default-features = false, features = ["rlp"] }
alloy-rlp = { version = "0.3.4", default-features = false, features = ["derive"] }
alloy-sol-types = { version = "0.7.0", default-features = false }
alloy-consensus = { git = "https://github.com/clabby/alloy", branch = "cl/alloy-consensus-no-std", default-features = false }
alloy-eips = { git = "https://github.com/clabby/alloy", branch = "cl/alloy-consensus-no-std", default-features = false }
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", default-features = false }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", default-features = false }
async-trait = "0.1.77"
hashbrown = "0.14.3"
unsigned-varint = "0.8.0"
Expand Down
30 changes: 30 additions & 0 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

extern crate alloc;

use alloc::sync::Arc;
use core::fmt::Debug;
use traits::{ChainProvider, TelemetryProvider};
use types::RollupConfig;

mod params;
pub use params::{
ChannelID, CHANNEL_ID_LENGTH, CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC,
Expand All @@ -21,3 +26,28 @@ pub mod types;
/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug, Clone, Copy)]
pub struct DerivationPipeline;

impl DerivationPipeline {
/// Creates a new instance of the [DerivationPipeline].
pub fn new<P, T>(
_rollup_config: Arc<RollupConfig>,
_chain_provider: P,
_telemetry: Arc<T>,
) -> Self
where
P: ChainProvider + Clone + Debug + Send,
T: TelemetryProvider + Clone + Debug + Send + Sync,
{
// let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone(),
// telemetry.clone()); let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source,
// telemetry.clone()); let frame_queue = FrameQueue::new(l1_retrieval,
// telemetry.clone()); let channel_bank = ChannelBank::new(rollup_config.clone(),
// frame_queue, telemetry.clone()); let channel_reader =
// ChannelReader::new(channel_bank, telemetry.clone()); let batch_queue =
// BatchQueue::new(rollup_config.clone(), channel_reader, telemetry.clone(), fetcher);
// let attributes_queue = AttributesQueue::new(rollup_config.clone(), batch_queue,
// telemetry.clone(), builder);

unimplemented!()
}
}
1 change: 1 addition & 0 deletions crates/derive/src/sources/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ where
(tx.to(), tx.input.clone(), Some(tx.blob_versioned_hashes.clone()))
}
},
_ => continue,
};
let TxKind::Call(to) = tx_kind else { continue };

Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/sources/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ where
B: BlobProvider + Clone + Debug,
{
/// Creates a new factory.
pub fn new(provider: C, blobs: B, cfg: RollupConfig) -> Self {
pub fn new(provider: C, blobs: B, cfg: &RollupConfig) -> Self {
Self {
chain_provider: provider,
blob_provider: blobs,
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ where
T: TelemetryProvider + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> {
self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info);
// TODO: metrice the reset using telemetry
// telemetry can provide a method of logging and metricing
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ where
BF: SafeBlockFetcher + Send + Debug,
T: TelemetryProvider + Send + Debug + Sync,
{
async fn reset(&mut self, base: BlockInfo, _: SystemConfig) -> StageResult<()> {
async fn reset(&mut self, base: BlockInfo, _: &SystemConfig) -> StageResult<()> {
// Copy over the Origin from the next stage.
// It is set in the engine queue (two stages away)
// such that the L2 Safe Head origin is the progress.
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ where
P: ChannelBankProvider + OriginProvider + Send + Debug,
T: TelemetryProvider + Send + Sync + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> {
self.channels.clear();
self.channel_queue = VecDeque::with_capacity(10);
Err(StageError::Eof)
Expand Down
27 changes: 13 additions & 14 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
//! This module contains the [FrameQueue] stage of the derivation pipeline.
use core::fmt::Debug;

use crate::{
stages::ChannelBankProvider,
traits::{LogLevel, OriginProvider, ResettableStage, TelemetryProvider},
types::{into_frames, BlockInfo, Frame, StageError, StageResult, SystemConfig},
};
use alloc::{boxed::Box, collections::VecDeque};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use alloy_primitives::Bytes;
use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;

/// Provides data frames for the [FrameQueue] stage.
#[async_trait]
Expand All @@ -35,7 +34,7 @@ where
/// The previous stage in the pipeline.
pub prev: P,
/// Telemetry
pub telemetry: T,
pub telemetry: Arc<T>,
/// The current frame queue.
queue: VecDeque<Frame>,
}
Expand All @@ -46,7 +45,7 @@ where
T: TelemetryProvider + Debug,
{
/// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage.
pub fn new(prev: P, telemetry: T) -> Self {
pub fn new(prev: P, telemetry: Arc<T>) -> Self {
Self { prev, telemetry, queue: VecDeque::new() }
}
}
Expand All @@ -55,7 +54,7 @@ where
impl<P, T> ChannelBankProvider for FrameQueue<P, T>
where
P: FrameQueueProvider + OriginProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
T: TelemetryProvider + Send + Debug + Sync,
{
async fn next_frame(&mut self) -> StageResult<Frame> {
if self.queue.is_empty() {
Expand Down Expand Up @@ -107,9 +106,9 @@ where
impl<P, T> ResettableStage for FrameQueue<P, T>
where
P: FrameQueueProvider + OriginProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
T: TelemetryProvider + Send + Debug + Sync,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> {
self.queue = VecDeque::default();
Err(StageError::Eof)
}
Expand Down Expand Up @@ -147,7 +146,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_empty_bytes() {
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let data = vec![Ok(Bytes::from(vec![0x00]))];
let mock = MockFrameQueueProvider { data };
let mut frame_queue = FrameQueue::new(mock, telemetry);
Expand All @@ -157,7 +156,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_no_frames_decoded() {
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let data = vec![Err(StageError::Eof), Ok(Bytes::default())];
let mock = MockFrameQueueProvider { data };
let mut frame_queue = FrameQueue::new(mock, telemetry);
Expand All @@ -167,7 +166,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_wrong_derivation_version() {
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let data = vec![Ok(Bytes::from(vec![0x01]))];
let mock = MockFrameQueueProvider { data };
let mut frame_queue = FrameQueue::new(mock, telemetry);
Expand All @@ -177,7 +176,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_frame_too_short() {
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let data = vec![Ok(Bytes::from(vec![0x00, 0x01]))];
let mock = MockFrameQueueProvider { data };
let mut frame_queue = FrameQueue::new(mock, telemetry);
Expand All @@ -188,7 +187,7 @@ pub(crate) mod tests {
#[tokio::test]
async fn test_frame_queue_single_frame() {
let data = new_encoded_test_frames(1);
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let mock = MockFrameQueueProvider { data: vec![Ok(data)] };
let mut frame_queue = FrameQueue::new(mock, telemetry);
let frame_decoded = frame_queue.next_frame().await.unwrap();
Expand All @@ -200,7 +199,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_multiple_frames() {
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let data = new_encoded_test_frames(3);
let mock = MockFrameQueueProvider { data: vec![Ok(data)] };
let mut frame_queue = FrameQueue::new(mock, telemetry);
Expand Down
20 changes: 10 additions & 10 deletions crates/derive/src/stages/l1_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
},
types::{BlockInfo, StageError, StageResult, SystemConfig},
};
use alloc::boxed::Box;
use alloc::{boxed::Box, sync::Arc};
use alloy_primitives::Address;
use anyhow::anyhow;
use async_trait::async_trait;
Expand Down Expand Up @@ -40,7 +40,7 @@ where
/// The previous stage in the pipeline.
pub prev: P,
/// Telemetry provider for the L1 retrieval stage.
pub telemetry: T,
pub telemetry: Arc<T>,
/// The data availability provider to use for the L1 retrieval stage.
pub provider: DAP,
/// The current data iterator.
Expand All @@ -55,7 +55,7 @@ where
{
/// Creates a new [L1Retrieval] stage with the previous [L1Traversal]
/// stage and given [DataAvailabilityProvider].
pub fn new(prev: P, provider: DAP, telemetry: T) -> Self {
pub fn new(prev: P, provider: DAP, telemetry: Arc<T>) -> Self {
Self { prev, telemetry, provider, data: None }
}
}
Expand All @@ -65,7 +65,7 @@ impl<DAP, P, T> FrameQueueProvider for L1Retrieval<DAP, P, T>
where
DAP: DataAvailabilityProvider + Send,
P: L1RetrievalProvider + OriginProvider + Send,
T: TelemetryProvider + Send,
T: TelemetryProvider + Send + Sync,
{
type Item = DAP::Item;

Expand Down Expand Up @@ -106,9 +106,9 @@ impl<DAP, P, T> ResettableStage for L1Retrieval<DAP, P, T>
where
DAP: DataAvailabilityProvider + Send,
P: L1RetrievalProvider + OriginProvider + Send,
T: TelemetryProvider + Send,
T: TelemetryProvider + Send + Sync,
{
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> {
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> {
self.data = Some(self.provider.open_data(&base, cfg.batcher_addr).await?);
Ok(())
}
Expand All @@ -129,7 +129,7 @@ mod tests {
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![] };
let telemetry = TestTelemetry::new();
let retrieval = L1Retrieval::new(traversal, dap, telemetry);
let retrieval = L1Retrieval::new(traversal, dap, Arc::new(telemetry));
let expected = BlockInfo::default();
assert_eq!(retrieval.origin(), Some(&expected));
}
Expand All @@ -140,7 +140,7 @@ mod tests {
let results = vec![Err(StageError::Eof), Ok(Bytes::default())];
let dap = TestDAP { results };
let telemetry = TestTelemetry::new();
let mut retrieval = L1Retrieval::new(traversal, dap, telemetry);
let mut retrieval = L1Retrieval::new(traversal, dap, Arc::new(telemetry));
assert_eq!(retrieval.data, None);
let data = retrieval.next_data().await.unwrap();
assert_eq!(data, Bytes::default());
Expand All @@ -164,7 +164,7 @@ mod tests {
// Create a new traversal with no blocks or receipts.
// This would bubble up an error if the prev stage
// (traversal) is called in the retrieval stage.
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let traversal = new_test_traversal(vec![], vec![]);
let dap = TestDAP { results: vec![] };
let mut retrieval =
Expand All @@ -182,7 +182,7 @@ mod tests {
open_data_calls: vec![(BlockInfo::default(), Address::default())],
results: vec![Err(StageError::Eof)],
};
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![] };
let mut retrieval =
Expand Down
14 changes: 8 additions & 6 deletions crates/derive/src/stages/l1_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct L1Traversal<Provider: ChainProvider, Telemetry: TelemetryProvider> {
/// The data source for the traversal stage.
data_source: Provider,
/// The telemetry provider for the traversal stage.
telemetry: Telemetry,
telemetry: Arc<Telemetry>,
/// Signals whether or not the traversal stage is complete.
done: bool,
/// The system config.
Expand All @@ -49,7 +49,7 @@ impl<F: ChainProvider, T: TelemetryProvider> L1RetrievalProvider for L1Traversal

impl<F: ChainProvider, T: TelemetryProvider> L1Traversal<F, T> {
/// Creates a new [L1Traversal] instance.
pub fn new(data_source: F, cfg: Arc<RollupConfig>, telemetry: T) -> Self {
pub fn new(data_source: F, cfg: Arc<RollupConfig>, telemetry: Arc<T>) -> Self {
Self {
block: Some(BlockInfo::default()),
data_source,
Expand Down Expand Up @@ -118,11 +118,13 @@ impl<F: ChainProvider, T: TelemetryProvider> OriginProvider for L1Traversal<F, T
}

#[async_trait]
impl<F: ChainProvider + Send, T: TelemetryProvider + Send> ResettableStage for L1Traversal<F, T> {
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> {
impl<F: ChainProvider + Send, T: TelemetryProvider + Send + Sync> ResettableStage
for L1Traversal<F, T>
{
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> {
self.block = Some(base);
self.done = false;
self.system_config = cfg;
self.system_config = *cfg;
Err(StageError::Eof)
}
}
Expand Down Expand Up @@ -173,7 +175,7 @@ pub(crate) mod tests {
receipts: alloc::vec::Vec<Receipt>,
) -> L1Traversal<TestChainProvider, TestTelemetry> {
let mut provider = TestChainProvider::default();
let telemetry = TestTelemetry::default();
let telemetry = Arc::new(TestTelemetry::default());
let rollup_config = RollupConfig {
l1_system_config_address: L1_SYS_CONFIG_ADDR,
..RollupConfig::default()
Expand Down
1 change: 1 addition & 0 deletions crates/derive/src/traits/ecrecover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl SignedRecoverable for TxEnvelope {
TxEnvelope::Eip4844(signed_tx) => {
recover_public_key(*signed_tx.signature(), &signed_tx.signature_hash())
}
_ => unreachable!("Impossible case"),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/traits/stages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ use async_trait::async_trait;
#[async_trait]
pub trait ResettableStage {
/// Resets the derivation stage to its initial state.
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()>;
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()>;
}
2 changes: 1 addition & 1 deletion crates/derive/src/types/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use validity::BatchValidity;

mod span_batch;
pub use span_batch::{
RawSpanBatch, SpanBatch, SpanBatchBits, SpanBatchBuilder, SpanBatchEip1559TransactionData,
RawSpanBatch, SpanBatch, SpanBatchBits, SpanBatchEip1559TransactionData,
SpanBatchEip2930TransactionData, SpanBatchElement, SpanBatchError,
SpanBatchLegacyTransactionData, SpanBatchPayload, SpanBatchPrefix, SpanBatchTransactionData,
SpanBatchTransactions, SpanDecodingError, MAX_SPAN_BATCH_SIZE,
Expand Down
Loading

0 comments on commit f9041df

Please sign in to comment.