Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement engine_getBlobsV1 #1

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/node/builder/src/launch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ where
ctx.chain_spec(),
beacon_engine_handle,
ctx.components().payload_builder().clone().into(),
ctx.components().pool().clone(),
Box::new(ctx.task_executor().clone()),
client,
EngineCapabilities::default(),
Expand Down
9 changes: 8 additions & 1 deletion crates/rpc/rpc-api/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use reth_rpc_types::{
ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, TransitionConfiguration,
},
state::StateOverride,
BlockOverrides, Filter, Log, RichBlock, SyncStatus, TransactionRequest,
BlobAndProofV1, BlockOverrides, Filter, Log, RichBlock, SyncStatus, TransactionRequest,
};
// NOTE: We can't use associated types in the `EngineApi` trait because of jsonrpsee, so we use a
// generic here. It would be nice if the rpc macro would understand which types need to have serde.
Expand Down Expand Up @@ -211,6 +211,13 @@ pub trait EngineApi<Engine: EngineTypes> {
/// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
#[method(name = "exchangeCapabilities")]
async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>>;

/// Fetch blobs for the consensus layer from the in-memory blob cache.
#[method(name = "getBlobsV1")]
async fn get_blobs_v1(
&self,
transaction_ids: Vec<B256>,
) -> RpcResult<Vec<Option<BlobAndProofV1>>>;
}

/// A subset of the ETH rpc interface: <https://ethereum.github.io/execution-apis/api-documentation/>
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-engine-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ reth-tasks.workspace = true
reth-rpc-types-compat.workspace = true
reth-engine-primitives.workspace = true
reth-evm.workspace = true
reth-transaction-pool.workspace = true

# async
tokio = { workspace = true, features = ["sync"] }
Expand Down
44 changes: 33 additions & 11 deletions crates/rpc/rpc-engine-api/src/engine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ use reth_primitives::{
Block, BlockHash, BlockHashOrNumber, BlockNumber, EthereumHardfork, B256, U64,
};
use reth_rpc_api::EngineApiServer;
use reth_rpc_types::engine::{
CancunPayloadFields, ClientVersionV1, ExecutionPayload, ExecutionPayloadBodiesV1,
ExecutionPayloadBodiesV2, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3,
ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
TransitionConfiguration,
use reth_rpc_types::{
engine::{
CancunPayloadFields, ClientVersionV1, ExecutionPayload, ExecutionPayloadBodiesV1,
ExecutionPayloadBodiesV2, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3,
ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
TransitionConfiguration,
},
BlobAndProofV1,
};
use reth_rpc_types_compat::engine::payload::{
convert_payload_input_v2_to_payload, convert_to_payload_body_v1, convert_to_payload_body_v2,
};
use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use std::{sync::Arc, time::Instant};
use tokio::sync::oneshot;
use tracing::{trace, warn};
Expand All @@ -39,11 +43,11 @@ const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;

/// The Engine API implementation that grants the Consensus layer access to data and
/// functions in the Execution layer that are crucial for the consensus process.
pub struct EngineApi<Provider, EngineT: EngineTypes> {
inner: Arc<EngineApiInner<Provider, EngineT>>,
pub struct EngineApi<Provider, EngineT: EngineTypes, Pool> {
inner: Arc<EngineApiInner<Provider, EngineT, Pool>>,
}

struct EngineApiInner<Provider, EngineT: EngineTypes> {
struct EngineApiInner<Provider, EngineT: EngineTypes, Pool> {
/// The provider to interact with the chain.
provider: Provider,
/// Consensus configuration
Expand All @@ -60,19 +64,23 @@ struct EngineApiInner<Provider, EngineT: EngineTypes> {
client: ClientVersionV1,
/// The list of all supported Engine capabilities available over the engine endpoint.
capabilities: EngineCapabilities,
/// Transaction pool.
tx_pool: Pool,
}

impl<Provider, EngineT> EngineApi<Provider, EngineT>
impl<Provider, EngineT, Pool> EngineApi<Provider, EngineT, Pool>
where
Provider: HeaderProvider + BlockReader + StateProviderFactory + EvmEnvProvider + 'static,
EngineT: EngineTypes + 'static,
Pool: TransactionPool + 'static,
{
/// Create new instance of [`EngineApi`].
pub fn new(
provider: Provider,
chain_spec: Arc<ChainSpec>,
beacon_consensus: BeaconConsensusEngineHandle<EngineT>,
payload_store: PayloadStore<EngineT>,
tx_pool: Pool,
task_spawner: Box<dyn TaskSpawner>,
client: ClientVersionV1,
capabilities: EngineCapabilities,
Expand All @@ -86,6 +94,7 @@ where
metrics: EngineApiMetrics::default(),
client,
capabilities,
tx_pool,
});
Self { inner }
}
Expand Down Expand Up @@ -609,10 +618,11 @@ where
}

#[async_trait]
impl<Provider, EngineT> EngineApiServer<EngineT> for EngineApi<Provider, EngineT>
impl<Provider, EngineT, Pool> EngineApiServer<EngineT> for EngineApi<Provider, EngineT, Pool>
where
Provider: HeaderProvider + BlockReader + StateProviderFactory + EvmEnvProvider + 'static,
EngineT: EngineTypes + 'static,
Pool: TransactionPool + 'static,
{
/// Handler for `engine_newPayloadV1`
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
Expand Down Expand Up @@ -904,9 +914,21 @@ where
async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
Ok(self.inner.capabilities.list())
}

async fn get_blobs_v1(
&self,
versioned_hashes: Vec<B256>,
) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
// FIXME(sproul): work out error wrapping or make infallible
Ok(self
.inner
.tx_pool
.get_blobs_for_versioned_hashes(&versioned_hashes)
.expect("get_blobs_for_versioned_hashes is infallible"))
}
}

impl<Provider, EngineT> std::fmt::Debug for EngineApi<Provider, EngineT>
impl<Provider, EngineT, Pool> std::fmt::Debug for EngineApi<Provider, EngineT, Pool>
where
EngineT: EngineTypes,
{
Expand Down
3 changes: 2 additions & 1 deletion crates/rpc/rpc-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true
[dependencies]

# ethereum
alloy-eips = { workspace = true }
alloy-primitives = { workspace = true, features = ["rand", "rlp", "serde"] }
alloy-rpc-types = { workspace = true, features = ["jsonrpsee-types"] }
alloy-rpc-types-admin.workspace = true
Expand Down Expand Up @@ -42,4 +43,4 @@ serde_json.workspace = true

[features]
default = ["jsonrpsee-types"]
arbitrary = ["alloy-primitives/arbitrary", "alloy-rpc-types/arbitrary"]
arbitrary = ["alloy-primitives/arbitrary", "alloy-rpc-types/arbitrary"]
14 changes: 14 additions & 0 deletions crates/rpc/rpc-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,17 @@ pub use eth::{

pub use peer::*;
pub use rpc::*;

use alloy_eips::eip4844::BYTES_PER_BLOB;
use alloy_primitives::FixedBytes;
use serde::{Deserialize, Serialize};

/// Blob type returned in responses to `engine_getBlobsV1`.
// FIXME(sproul): move to alloy?
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BlobAndProofV1 {
/// The blob data.
pub blob: FixedBytes<BYTES_PER_BLOB>,
/// The KZG proof for the blob.
pub proof: FixedBytes<48>,
}
1 change: 1 addition & 0 deletions crates/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ reth-chainspec.workspace = true
reth-eth-wire-types.workspace = true
reth-primitives.workspace = true
reth-execution-types.workspace = true
reth-rpc-types.workspace = true
reth-fs-util.workspace = true
reth-provider.workspace = true
reth-tasks.workspace = true
Expand Down
27 changes: 27 additions & 0 deletions crates/transaction-pool/src/blobstore/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStor
use alloy_rlp::{Decodable, Encodable};
use parking_lot::{Mutex, RwLock};
use reth_primitives::{BlobTransactionSidecar, TxHash, B256};
use reth_rpc_types::BlobAndProofV1;
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};
Expand Down Expand Up @@ -124,6 +125,32 @@ impl BlobStore for DiskFileBlobStore {
self.inner.get_exact(txs)
}

fn get_by_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
let mut result = vec![None; versioned_hashes.len()];
for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
// FIXME(sproul): these versioned hashes could be cached
for (i, blob_versioned_hash) in blob_sidecar.versioned_hashes().enumerate() {
for (j, target_versioned_hash) in versioned_hashes.iter().enumerate() {
if blob_versioned_hash == *target_versioned_hash {
result[j].get_or_insert_with(|| BlobAndProofV1 {
blob: blob_sidecar.blobs[i].clone(),
proof: blob_sidecar.proofs[i].clone(),
});
}
}
}

// Return early if all blobs are found.
if result.iter().all(|blob| blob.is_some()) {
break;
}
}
Ok(result)
}

fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size_tracker.data_size())
}
Expand Down
27 changes: 27 additions & 0 deletions crates/transaction-pool/src/blobstore/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::blobstore::{
};
use parking_lot::RwLock;
use reth_primitives::B256;
use reth_rpc_types::BlobAndProofV1;
use std::{collections::HashMap, sync::Arc};

/// An in-memory blob store.
Expand Down Expand Up @@ -113,6 +114,32 @@ impl BlobStore for InMemoryBlobStore {
Ok(items)
}

fn get_by_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
let mut result = vec![None; versioned_hashes.len()];
for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
// FIXME(sproul): these versioned hashes could be cached
for (i, blob_versioned_hash) in blob_sidecar.versioned_hashes().enumerate() {
for (j, target_versioned_hash) in versioned_hashes.iter().enumerate() {
if blob_versioned_hash == *target_versioned_hash {
result[j].get_or_insert_with(|| BlobAndProofV1 {
blob: blob_sidecar.blobs[i].clone(),
proof: blob_sidecar.proofs[i].clone(),
});
}
}
}

// Return early if all blobs are found.
if result.iter().all(|blob| blob.is_some()) {
break;
}
}
Ok(result)
}

fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size_tracker.data_size())
}
Expand Down
7 changes: 7 additions & 0 deletions crates/transaction-pool/src/blobstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub use disk::{DiskFileBlobStore, DiskFileBlobStoreConfig, OpenDiskFileBlobStore
pub use mem::InMemoryBlobStore;
pub use noop::NoopBlobStore;
use reth_primitives::{BlobTransactionSidecar, B256};
use reth_rpc_types::BlobAndProofV1;
use std::{
fmt,
sync::atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -64,6 +65,12 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
/// Returns an error if any of the blobs are not found in the blob store.
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError>;

/// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
fn get_by_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;

/// Data size of all transactions in the blob store.
fn data_size_hint(&self) -> Option<usize>;

Expand Down
8 changes: 8 additions & 0 deletions crates/transaction-pool/src/blobstore/noop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobTransactionSidecar};
use reth_primitives::B256;
use reth_rpc_types::BlobAndProofV1;

/// A blobstore implementation that does nothing
#[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Default)]
Expand Down Expand Up @@ -49,6 +50,13 @@ impl BlobStore for NoopBlobStore {
Err(BlobStoreError::MissingSidecar(txs[0]))
}

fn get_by_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
Ok(vec![None; versioned_hashes.len()])
}

fn data_size_hint(&self) -> Option<usize> {
Some(0)
}
Expand Down
12 changes: 11 additions & 1 deletion crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,11 @@
use crate::{identifier::TransactionId, pool::PoolInner};
use aquamarine as _;
use reth_eth_wire_types::HandleMempoolData;
use reth_primitives::{Address, BlobTransactionSidecar, PooledTransactionsElement, TxHash, U256};
use reth_primitives::{
Address, BlobTransactionSidecar, PooledTransactionsElement, TxHash, B256, U256,
};
use reth_provider::StateProviderFactory;
use reth_rpc_types::BlobAndProofV1;
use std::{collections::HashSet, sync::Arc};
use tokio::sync::mpsc::Receiver;
use tracing::{instrument, trace};
Expand Down Expand Up @@ -521,6 +524,13 @@ where
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
self.pool.blob_store().get_exact(tx_hashes)
}

fn get_blobs_for_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
self.pool.blob_store().get_by_versioned_hashes(versioned_hashes)
}
}

impl<V, T, S> TransactionPoolExt for Pool<V, T, S>
Expand Down
10 changes: 9 additions & 1 deletion crates/transaction-pool/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use crate::{
TransactionPool, TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction,
};
use reth_eth_wire_types::HandleMempoolData;
use reth_primitives::{Address, BlobTransactionSidecar, TxHash, U256};
use reth_primitives::{Address, BlobTransactionSidecar, TxHash, B256, U256};
use reth_rpc_types::BlobAndProofV1;
use std::{collections::HashSet, marker::PhantomData, sync::Arc};
use tokio::sync::{mpsc, mpsc::Receiver};

Expand Down Expand Up @@ -242,6 +243,13 @@ impl TransactionPool for NoopTransactionPool {
}
Err(BlobStoreError::MissingSidecar(tx_hashes[0]))
}

fn get_blobs_for_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
Ok(vec![None; versioned_hashes.len()])
}
}

/// A [`TransactionValidator`] that does nothing.
Expand Down
Loading