Skip to content

Commit

Permalink
Merge pull request #50 from ethereum-optimism/refcell/data-sources
Browse files Browse the repository at this point in the history
feat(derive): Data Sources
  • Loading branch information
refcell authored Apr 4, 2024
2 parents e7a92f0 + da6a4ab commit 341bd27
Show file tree
Hide file tree
Showing 17 changed files with 826 additions and 29 deletions.
1 change: 1 addition & 0 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use params::{
MAX_RLP_BYTES_PER_CHANNEL, MAX_SPAN_BATCH_BYTES,
};

pub mod sources;
pub mod stages;
pub mod traits;
pub mod types;
Expand Down
176 changes: 176 additions & 0 deletions crates/derive/src/sources/blobs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
//! Blob Data Source
use crate::{
traits::{AsyncIterator, BlobProvider, ChainProvider},
types::{BlobData, BlockInfo, IndexedBlobHash, StageError, StageResult, TxEnvelope, TxType},
};
use alloc::{boxed::Box, vec::Vec};
use alloy_primitives::{Address, Bytes};
use anyhow::Result;
use async_trait::async_trait;

/// A data iterator that reads from a blob.
#[derive(Debug, Clone)]
pub struct BlobSource<F, B>
where
F: ChainProvider + Send,
B: BlobProvider + Send,
{
/// Chain provider.
chain_provider: F,
/// Fetches blobs.
blob_fetcher: B,
/// The address of the batcher contract.
batcher_address: Address,
/// Block Ref
block_ref: BlockInfo,
/// The L1 Signer.
signer: Address,
/// Data.
data: Vec<BlobData>,
/// Whether the source is open.
open: bool,
}

impl<F, B> BlobSource<F, B>
where
F: ChainProvider + Send,
B: BlobProvider + Send,
{
/// Creates a new blob source.
pub fn new(
chain_provider: F,
blob_fetcher: B,
batcher_address: Address,
block_ref: BlockInfo,
signer: Address,
) -> Self {
Self {
chain_provider,
blob_fetcher,
batcher_address,
block_ref,
signer,
data: Vec::new(),
open: false,
}
}

fn extract_blob_data(&self, txs: Vec<TxEnvelope>) -> (Vec<BlobData>, Vec<IndexedBlobHash>) {
let mut index = 0;
let mut data = Vec::new();
let mut hashes = Vec::new();
for tx in txs {
if tx.to() != Some(self.batcher_address) {
index += tx.blob_hashes().map_or(0, |h| h.len());
continue;
}
if tx.from() != Some(self.signer) {
index += tx.blob_hashes().map_or(0, |h| h.len());
continue;
}
if tx.tx_type() != TxType::Eip4844 {
let calldata = tx.data().clone();
let blob_data = BlobData { data: None, calldata: Some(calldata) };
data.push(blob_data);
continue;
}
if !tx.data().is_empty() {
// TODO(refcell): Add a warning log here if the blob data is not empty
// https://github.com/ethereum-optimism/optimism/blob/develop/op-node/rollup/derive/blob_data_source.go#L136
}
let blob_hashes = if let Some(b) = tx.blob_hashes() {
b
} else {
continue;
};
for blob in blob_hashes {
let indexed = IndexedBlobHash { hash: blob, index };
hashes.push(indexed);
data.push(BlobData::default());
index += 1;
}
}
(data, hashes)
}

/// Loads blob data into the source if it is not open.
async fn load_blobs(&mut self) -> Result<()> {
if self.open {
return Ok(());
}

let info =
self.chain_provider.block_info_and_transactions_by_hash(self.block_ref.hash).await?;

let (mut data, blob_hashes) = self.extract_blob_data(info.1);

// If there are no hashes, set the calldata and return.
if blob_hashes.is_empty() {
self.open = true;
self.data = data;
return Ok(());
}

let blobs = self.blob_fetcher.get_blobs(&self.block_ref, blob_hashes).await?;

// Fill the blob pointers.
let mut blob_index = 0;
for blob in data.iter_mut() {
match blob.fill(&blobs, blob_index) {
Ok(_) => {
blob_index += 1;
}
Err(e) => {
return Err(e);
}
}
}

self.open = true;
self.data = data;
Ok(())
}

/// Extracts the next data from the source.
fn next_data(&mut self) -> Result<BlobData, Option<Result<Bytes, StageError>>> {
if self.data.is_empty() {
return Err(Some(Err(StageError::Eof)));
}

Ok(self.data.remove(0))
}
}

#[async_trait]
impl<F, B> AsyncIterator for BlobSource<F, B>
where
F: ChainProvider + Send,
B: BlobProvider + Send,
{
type Item = Bytes;

async fn next(&mut self) -> Option<StageResult<Self::Item>> {
if self.load_blobs().await.is_err() {
return Some(Err(StageError::BlockFetch(self.block_ref.hash)));
}

let next_data = match self.next_data() {
Ok(d) => d,
Err(e) => return e,
};
if next_data.calldata.is_some() {
return Some(Ok(next_data.calldata.unwrap()));
}

// Decode the blob data to raw bytes.
// Otherwise, ignore blob and recurse next.
match next_data.decode() {
Ok(d) => Some(Ok(d)),
Err(_) => {
// TODO(refcell): Add a warning log here if decoding fails
self.next().await
}
}
}
}
87 changes: 87 additions & 0 deletions crates/derive/src/sources/calldata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//! CallData Source
use crate::{
traits::{AsyncIterator, ChainProvider},
types::{BlockInfo, StageError, StageResult},
};
use alloc::{boxed::Box, collections::VecDeque};
use alloy_primitives::{Address, Bytes};
use async_trait::async_trait;

/// A data iterator that reads from calldata.
#[derive(Debug, Clone)]
pub struct CalldataSource<CP>
where
CP: ChainProvider + Send,
{
/// The chain provider to use for the calldata source.
chain_provider: CP,
/// The address of the batcher contract.
batcher_address: Address,
/// Block Ref
block_ref: BlockInfo,
/// The L1 Signer.
signer: Address,
/// Current calldata.
calldata: VecDeque<Bytes>,
/// Whether the calldata source is open.
open: bool,
}

impl<CP: ChainProvider + Send> CalldataSource<CP> {
/// Creates a new calldata source.
pub fn new(
chain_provider: CP,
batcher_address: Address,
block_ref: BlockInfo,
signer: Address,
) -> Self {
Self {
chain_provider,
batcher_address,
block_ref,
signer,
calldata: VecDeque::new(),
open: false,
}
}

/// Loads the calldata into the source if it is not open.
async fn load_calldata(&mut self) -> anyhow::Result<()> {
if self.open {
return Ok(());
}

let (_, txs) =
self.chain_provider.block_info_and_transactions_by_hash(self.block_ref.hash).await?;

self.calldata = txs
.iter()
.filter_map(|tx| {
if tx.to() != Some(self.batcher_address) {
return None;
}
if tx.from() != Some(self.signer) {
return None;
}
Some(tx.data())
})
.collect::<VecDeque<_>>();

self.open = true;

Ok(())
}
}

#[async_trait]
impl<CP: ChainProvider + Send> AsyncIterator for CalldataSource<CP> {
type Item = Bytes;

async fn next(&mut self) -> Option<StageResult<Self::Item>> {
if self.load_calldata().await.is_err() {
return Some(Err(StageError::BlockFetch(self.block_ref.hash)));
}
Some(self.calldata.pop_front().ok_or(StageError::Eof))
}
}
89 changes: 89 additions & 0 deletions crates/derive/src/sources/factory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! Contains a Factory for creating a calldata and blob provider.
use crate::{
sources::{BlobSource, CalldataSource, DataSource, PlasmaSource},
traits::{BlobProvider, ChainProvider, DataAvailabilityProvider},
types::{BlockInfo, RollupConfig},
};
use alloc::{boxed::Box, fmt::Debug};
use alloy_primitives::{Address, Bytes};
use anyhow::{anyhow, Result};
use async_trait::async_trait;

/// A factory for creating a calldata and blob provider.
#[derive(Debug, Clone, Copy)]
pub struct DataSourceFactory<C, B>
where
C: ChainProvider + Clone,
B: BlobProvider + Clone,
{
/// The chain provider to use for the factory.
pub chain_provider: C,
/// The blob provider
pub blob_provider: B,
/// The ecotone timestamp.
pub ecotone_timestamp: Option<u64>,
/// Whether or not plasma is enabled.
pub plasma_enabled: bool,
/// The L1 Signer.
pub signer: Address,
}

impl<C, B> DataSourceFactory<C, B>
where
C: ChainProvider + Clone + Debug,
B: BlobProvider + Clone + Debug,
{
/// Creates a new factory.
pub fn new(provider: C, blobs: B, cfg: RollupConfig) -> Self {
Self {
chain_provider: provider,
blob_provider: blobs,
ecotone_timestamp: cfg.ecotone_time,
plasma_enabled: cfg.is_plasma_enabled(),
signer: cfg.l1_signer_address(),
}
}
}

#[async_trait]
impl<C, B> DataAvailabilityProvider for DataSourceFactory<C, B>
where
C: ChainProvider + Send + Sync + Clone + Debug,
B: BlobProvider + Send + Sync + Clone + Debug,
{
type Item = Bytes;
type DataIter = DataSource<C, B>;

async fn open_data(
&self,
block_ref: &BlockInfo,
batcher_address: Address,
) -> Result<Self::DataIter> {
if let Some(ecotone) = self.ecotone_timestamp {
let source = (block_ref.timestamp >= ecotone)
.then(|| {
DataSource::Blob(BlobSource::new(
self.chain_provider.clone(),
self.blob_provider.clone(),
batcher_address,
*block_ref,
self.signer,
))
})
.unwrap_or_else(|| {
DataSource::Calldata(CalldataSource::new(
self.chain_provider.clone(),
batcher_address,
*block_ref,
self.signer,
))
});
Ok(source)
} else if self.plasma_enabled {
Ok(DataSource::Plasma(PlasmaSource::new(self.chain_provider.clone())))
} else {
Err(anyhow!("No data source available"))
}
}
}
16 changes: 16 additions & 0 deletions crates/derive/src/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//! This module contains data source impelmentations.
mod factory;
pub use factory::DataSourceFactory;

mod blobs;
pub use blobs::BlobSource;

mod calldata;
pub use calldata::CalldataSource;

mod plasma;
pub use plasma::PlasmaSource;

mod source;
pub use source::DataSource;
Loading

0 comments on commit 341bd27

Please sign in to comment.