Skip to content

Commit

Permalink
feat(derive): Add caching to providers
Browse files Browse the repository at this point in the history
  • Loading branch information
clabby committed Apr 6, 2024
1 parent 7b0b27b commit af478b5
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 50 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async-trait = "0.1.77"
hashbrown = "0.14.3"
unsigned-varint = "0.8.0"
miniz_oxide = { version = "0.7.2" }
lru = "0.12.3"

# `serde` feature dependencies
serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }
Expand All @@ -37,7 +38,7 @@ proptest = "1.4.0"
spin = { version = "0.9.8", features = ["mutex"] } # Spin is used for testing synchronization primitives

[features]
default = ["serde", "k256", "alloy-providers"]
default = ["serde", "k256"]
serde = ["dep:serde", "alloy-primitives/serde"]
k256 = ["alloy-primitives/k256", "alloy-consensus/k256"]
alloy-providers = [
Expand Down
86 changes: 73 additions & 13 deletions crates/derive/src/alloy_providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ use alloy_rlp::{Buf, Decodable};
use alloy_transport_http::Http;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use core::num::NonZeroUsize;
use lru::LruCache;

const CACHE_SIZE: usize = 16;

/// The [AlloyChainProvider] is a concrete implementation of the [ChainProvider] trait, providing
/// data over Ethereum JSON-RPC using an alloy provider as the backend.
Expand All @@ -22,19 +26,37 @@ use async_trait::async_trait;
/// `debug_getRawBlock` methods. The RPC must support this namespace.
#[derive(Debug)]
pub struct AlloyChainProvider<T: Provider<Http<reqwest::Client>>> {
/// The inner Ethereum JSON-RPC provider.
inner: T,
/// `block_info_by_number` LRU cache.
block_info_by_number_cache: LruCache<u64, BlockInfo>,
/// `block_info_by_number` LRU cache.
receipts_by_hash_cache: LruCache<B256, Vec<Receipt>>,
/// `block_info_and_transactions_by_hash` LRU cache.
block_info_and_transactions_by_hash_cache: LruCache<B256, (BlockInfo, Vec<TxEnvelope>)>,
}

impl<T: Provider<Http<reqwest::Client>>> AlloyChainProvider<T> {
/// Creates a new [AlloyChainProvider] with the given alloy provider.
pub fn new(inner: T) -> Self {
Self { inner }
Self {
inner,
block_info_by_number_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
receipts_by_hash_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
block_info_and_transactions_by_hash_cache: LruCache::new(
NonZeroUsize::new(CACHE_SIZE).unwrap(),
),
}
}
}

#[async_trait]
impl<T: Provider<Http<reqwest::Client>>> ChainProvider for AlloyChainProvider<T> {
async fn block_info_by_number(&self, number: u64) -> Result<BlockInfo> {
async fn block_info_by_number(&mut self, number: u64) -> Result<BlockInfo> {
if let Some(block_info) = self.block_info_by_number_cache.get(&number) {
return Ok(*block_info);
}

let raw_header: Bytes = self
.inner
.client()
Expand All @@ -43,23 +65,29 @@ impl<T: Provider<Http<reqwest::Client>>> ChainProvider for AlloyChainProvider<T>
.map_err(|e| anyhow!(e))?;
let header = Header::decode(&mut raw_header.as_ref()).map_err(|e| anyhow!(e))?;

Ok(BlockInfo {
let block_info = BlockInfo {
hash: header.hash_slow(),
number,
parent_hash: header.parent_hash,
timestamp: header.timestamp,
})
};
self.block_info_by_number_cache.put(number, block_info);
Ok(block_info)
}

async fn receipts_by_hash(&self, hash: B256) -> Result<Vec<Receipt>> {
async fn receipts_by_hash(&mut self, hash: B256) -> Result<Vec<Receipt>> {
if let Some(receipts) = self.receipts_by_hash_cache.get(&hash) {
return Ok(receipts.clone());
}

let raw_receipts: Vec<Bytes> = self
.inner
.client()
.request("debug_getRawReceipts", [hash])
.await
.map_err(|e| anyhow!(e))?;

raw_receipts
let receipts = raw_receipts
.iter()
.map(|r| {
let r = &mut r.as_ref();
Expand All @@ -71,13 +99,20 @@ impl<T: Provider<Http<reqwest::Client>>> ChainProvider for AlloyChainProvider<T>

Ok(ReceiptWithBloom::decode(r).map_err(|e| anyhow!(e))?.receipt)
})
.collect::<Result<Vec<_>>>()
.collect::<Result<Vec<_>>>()?;
self.receipts_by_hash_cache.put(hash, receipts.clone());
Ok(receipts)
}

async fn block_info_and_transactions_by_hash(
&self,
&mut self,
hash: B256,
) -> Result<(BlockInfo, Vec<TxEnvelope>)> {
if let Some(block_info_and_txs) = self.block_info_and_transactions_by_hash_cache.get(&hash)
{
return Ok(block_info_and_txs.clone());
}

let raw_block: Bytes = self
.inner
.client()
Expand All @@ -92,6 +127,7 @@ impl<T: Provider<Http<reqwest::Client>>> ChainProvider for AlloyChainProvider<T>
parent_hash: block.header.parent_hash,
timestamp: block.header.timestamp,
};
self.block_info_and_transactions_by_hash_cache.put(hash, (block_info, block.body.clone()));
Ok((block_info, block.body))
}
}
Expand All @@ -104,32 +140,56 @@ impl<T: Provider<Http<reqwest::Client>>> ChainProvider for AlloyChainProvider<T>
/// namespace.
#[derive(Debug)]
pub struct AlloyL2SafeHeadProvider<T: Provider<Http<reqwest::Client>>> {
/// The inner Ethereum JSON-RPC provider.
inner: T,
/// The rollup configuration.
rollup_config: Arc<RollupConfig>,
/// `payload_by_number` LRU cache.
payload_by_number_cache: LruCache<u64, ExecutionPayloadEnvelope>,
/// `l2_block_info_by_number` LRU cache.
l2_block_info_by_number_cache: LruCache<u64, L2BlockInfo>,
}

impl<T: Provider<Http<reqwest::Client>>> AlloyL2SafeHeadProvider<T> {
/// Creates a new [AlloyL2SafeHeadProvider] with the given alloy provider and [RollupConfig].
pub fn new(inner: T, rollup_config: Arc<RollupConfig>) -> Self {
Self { inner, rollup_config }
Self {
inner,
rollup_config,
payload_by_number_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
l2_block_info_by_number_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
}
}
}

#[async_trait]
impl<T: Provider<Http<reqwest::Client>>> L2SafeBlockFetcher for AlloyL2SafeHeadProvider<T> {
async fn l2_block_info_by_number(&self, number: u64) -> Result<L2BlockInfo> {
async fn l2_block_info_by_number(&mut self, number: u64) -> Result<L2BlockInfo> {
if let Some(l2_block_info) = self.l2_block_info_by_number_cache.get(&number) {
return Ok(*l2_block_info);
}

let payload = self.payload_by_number(number).await?;
payload.to_l2_block_ref(self.rollup_config.as_ref())
let l2_block_info = payload.to_l2_block_ref(self.rollup_config.as_ref())?;
self.l2_block_info_by_number_cache.put(number, l2_block_info);
Ok(l2_block_info)
}

async fn payload_by_number(&self, number: u64) -> Result<ExecutionPayloadEnvelope> {
async fn payload_by_number(&mut self, number: u64) -> Result<ExecutionPayloadEnvelope> {
if let Some(payload) = self.payload_by_number_cache.get(&number) {
return Ok(payload.clone());
}

let raw_block: Bytes = self
.inner
.client()
.request("debug_getRawBlock", [U64::from(number)])
.await
.map_err(|e| anyhow!(e))?;
let block = Block::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e))?;
Ok(block.into())
let payload_envelope: ExecutionPayloadEnvelope = block.into();

self.payload_by_number_cache.put(number, payload_envelope.clone());
Ok(payload_envelope)
}
}
12 changes: 6 additions & 6 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::{
stages::attributes_queue::AttributesProvider,
traits::{L2SafeBlockFetcher, LogLevel, OriginProvider, ResettableStage, TelemetryProvider},
traits::{L2ChainProvider, LogLevel, OriginProvider, ResettableStage, TelemetryProvider},
types::{
Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig,
SingleBatch, StageError, StageResult, SystemConfig,
Expand Down Expand Up @@ -42,7 +42,7 @@ pub trait BatchQueueProvider {
pub struct BatchQueue<P, BF, T>
where
P: BatchQueueProvider + OriginProvider + Debug,
BF: L2SafeBlockFetcher + Debug,
BF: L2ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
/// The rollup config.
Expand Down Expand Up @@ -76,7 +76,7 @@ where
impl<P, BF, T> BatchQueue<P, BF, T>
where
P: BatchQueueProvider + OriginProvider + Debug,
BF: L2SafeBlockFetcher + Debug,
BF: L2ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
/// Creates a new [BatchQueue] stage.
Expand Down Expand Up @@ -269,7 +269,7 @@ where
impl<P, BF, T> AttributesProvider for BatchQueue<P, BF, T>
where
P: BatchQueueProvider + OriginProvider + Send + Debug,
BF: L2SafeBlockFetcher + Send + Debug,
BF: L2ChainProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
{
/// Returns the next valid batch upon the given safe head.
Expand Down Expand Up @@ -402,7 +402,7 @@ where
impl<P, BF, T> OriginProvider for BatchQueue<P, BF, T>
where
P: BatchQueueProvider + OriginProvider + Debug,
BF: L2SafeBlockFetcher + Debug,
BF: L2ChainProvider + Debug,
T: TelemetryProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
Expand All @@ -414,7 +414,7 @@ where
impl<P, BF, T> ResettableStage for BatchQueue<P, BF, T>
where
P: BatchQueueProvider + OriginProvider + Send + Debug,
BF: L2SafeBlockFetcher + Send + Debug,
BF: L2ChainProvider + Send + Debug,
T: TelemetryProvider + Send + Debug + Sync,
{
async fn reset(&mut self, base: BlockInfo, _: &SystemConfig) -> StageResult<()> {
Expand Down
12 changes: 3 additions & 9 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,9 @@ impl BatchReader {
}
}

impl From<&[u8]> for BatchReader {
fn from(data: &[u8]) -> Self {
Self { data: Some(data.to_vec()), decompressed: Vec::new(), cursor: 0 }
}
}

impl From<Vec<u8>> for BatchReader {
fn from(data: Vec<u8>) -> Self {
Self { data: Some(data), decompressed: Vec::new(), cursor: 0 }
impl<T: Into<Vec<u8>>> From<T> for BatchReader {
fn from(data: T) -> Self {
Self { data: Some(data.into()), decompressed: Vec::new(), cursor: 0 }
}
}

Expand Down
16 changes: 8 additions & 8 deletions crates/derive/src/traits/data_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,34 @@ use alloy_primitives::{Address, Bytes, B256};
use anyhow::Result;
use async_trait::async_trait;

/// Describes the functionality of a data source that can provide information from the blockchain.
/// Describes the functionality of a data source that can provide information from an Ethereum node.
#[async_trait]
pub trait ChainProvider {
/// Returns the block at the given number, or an error if the block does not exist in the data
/// source.
async fn block_info_by_number(&self, number: u64) -> Result<BlockInfo>;
async fn block_info_by_number(&mut self, number: u64) -> Result<BlockInfo>;

/// Returns all receipts in the block with the given hash, or an error if the block does not
/// exist in the data source.
async fn receipts_by_hash(&self, hash: B256) -> Result<Vec<Receipt>>;
async fn receipts_by_hash(&mut self, hash: B256) -> Result<Vec<Receipt>>;

/// Returns the [BlockInfo] and list of [TxEnvelope]s from the given block hash.
async fn block_info_and_transactions_by_hash(
&self,
&mut self,
hash: B256,
) -> Result<(BlockInfo, Vec<TxEnvelope>)>;
}

/// Describes the functionality of a data source that fetches safe blocks.
/// Describes the functionality of a data source that can provide information from an L2 Ethereum node.
#[async_trait]
pub trait L2SafeBlockFetcher {
pub trait L2ChainProvider {
/// Returns the L2 block info given a block number.
/// Errors if the block does not exist.
async fn l2_block_info_by_number(&self, number: u64) -> Result<L2BlockInfo>;
async fn l2_block_info_by_number(&mut self, number: u64) -> Result<L2BlockInfo>;

/// Returns an execution payload for a given number.
/// Errors if the execution payload does not exist.
async fn payload_by_number(&self, number: u64) -> Result<ExecutionPayloadEnvelope>;
async fn payload_by_number(&mut self, number: u64) -> Result<ExecutionPayloadEnvelope>;
}

/// The BlobProvider trait specifies the functionality of a data source that can provide blobs.
Expand Down
14 changes: 7 additions & 7 deletions crates/derive/src/traits/test_utils/data_sources.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Data Sources Test Utilities
use crate::{
traits::{ChainProvider, L2SafeBlockFetcher},
traits::{ChainProvider, L2ChainProvider},
types::{BlockInfo, ExecutionPayloadEnvelope, L2BlockInfo},
};
use alloc::{boxed::Box, vec::Vec};
Expand All @@ -27,16 +27,16 @@ impl MockBlockFetcher {
}

#[async_trait]
impl L2SafeBlockFetcher for MockBlockFetcher {
async fn l2_block_info_by_number(&self, number: u64) -> Result<L2BlockInfo> {
impl L2ChainProvider for MockBlockFetcher {
async fn l2_block_info_by_number(&mut self, number: u64) -> Result<L2BlockInfo> {
self.blocks
.iter()
.find(|b| b.block_info.number == number)
.cloned()
.ok_or_else(|| anyhow::anyhow!("Block not found"))
}

async fn payload_by_number(&self, number: u64) -> Result<ExecutionPayloadEnvelope> {
async fn payload_by_number(&mut self, number: u64) -> Result<ExecutionPayloadEnvelope> {
self.payloads
.iter()
.find(|p| p.execution_payload.block_number == number)
Expand Down Expand Up @@ -84,15 +84,15 @@ impl TestChainProvider {

#[async_trait]
impl ChainProvider for TestChainProvider {
async fn block_info_by_number(&self, _number: u64) -> Result<BlockInfo> {
async fn block_info_by_number(&mut self, _number: u64) -> Result<BlockInfo> {
if let Some((_, block)) = self.blocks.iter().find(|(n, _)| *n == _number) {
Ok(*block)
} else {
Err(anyhow::anyhow!("Block not found"))
}
}

async fn receipts_by_hash(&self, _hash: B256) -> Result<Vec<Receipt>> {
async fn receipts_by_hash(&mut self, _hash: B256) -> Result<Vec<Receipt>> {
if let Some((_, receipts)) = self.receipts.iter().find(|(h, _)| *h == _hash) {
Ok(receipts.clone())
} else {
Expand All @@ -101,7 +101,7 @@ impl ChainProvider for TestChainProvider {
}

async fn block_info_and_transactions_by_hash(
&self,
&mut self,
hash: B256,
) -> Result<(BlockInfo, Vec<TxEnvelope>)> {
let block = self
Expand Down
4 changes: 2 additions & 2 deletions crates/derive/src/types/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::DecodeError;
use crate::{
traits::L2SafeBlockFetcher,
traits::L2ChainProvider,
types::{BlockInfo, L2BlockInfo, RollupConfig},
};
use alloc::vec::Vec;
Expand Down Expand Up @@ -41,7 +41,7 @@ impl BatchWithInclusionBlock {
/// One or more consecutive l1_blocks should be provided.
/// In case of only a single L1 block, the decision whether a batch is valid may have to stay
/// undecided.
pub fn check_batch<BF: L2SafeBlockFetcher>(
pub fn check_batch<BF: L2ChainProvider>(
&self,
cfg: &RollupConfig,
l1_blocks: &[BlockInfo],
Expand Down
Loading

0 comments on commit af478b5

Please sign in to comment.