Skip to content

Commit

Permalink
Implement engine_getBlobsV1 (#9723)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
michaelsproul and mattsse committed Sep 9, 2024
1 parent c258c15 commit d8b12ac
Show file tree
Hide file tree
Showing 19 changed files with 184 additions and 21 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ where
ctx.chain_spec(),
beacon_engine_handle,
ctx.components().payload_builder().clone().into(),
ctx.components().pool().clone(),
Box::new(ctx.task_executor().clone()),
client,
EngineCapabilities::default(),
Expand Down
1 change: 1 addition & 0 deletions crates/node/builder/src/launch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ where
ctx.chain_spec(),
beacon_engine_handle,
ctx.components().payload_builder().clone().into(),
ctx.components().pool().clone(),
Box::new(ctx.task_executor().clone()),
client,
EngineCapabilities::default(),
Expand Down
11 changes: 9 additions & 2 deletions crates/rpc/rpc-api/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use reth_rpc_types::{
ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, TransitionConfiguration,
},
state::StateOverride,
BlockOverrides, EIP1186AccountProofResponse, Filter, JsonStorageKey, Log, SyncStatus,
TransactionRequest,
BlobAndProofV1, BlockOverrides, EIP1186AccountProofResponse, Filter, JsonStorageKey, Log,
SyncStatus, TransactionRequest,
};
// NOTE: We can't use associated types in the `EngineApi` trait because of jsonrpsee, so we use a
// generic here. It would be nice if the rpc macro would understand which types need to have serde.
Expand Down Expand Up @@ -213,6 +213,13 @@ pub trait EngineApi<Engine: EngineTypes> {
/// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
#[method(name = "exchangeCapabilities")]
async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>>;

/// Fetch blobs for the consensus layer from the in-memory blob cache.
#[method(name = "getBlobsV1")]
async fn get_blobs_v1(
&self,
transaction_ids: Vec<B256>,
) -> RpcResult<Vec<Option<BlobAndProofV1>>>;
}

/// A subset of the ETH rpc interface: <https://ethereum.github.io/execution-apis/api-documentation/>
Expand Down
6 changes: 5 additions & 1 deletion crates/rpc/rpc-builder/tests/it/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use reth_rpc_layer::JwtSecret;
use reth_rpc_server_types::RpcModuleSelection;
use reth_rpc_types::engine::{ClientCode, ClientVersionV1};
use reth_tasks::TokioTaskExecutor;
use reth_transaction_pool::test_utils::{TestPool, TestPoolBuilder};
use reth_transaction_pool::{
noop::NoopTransactionPool,
test_utils::{TestPool, TestPoolBuilder},
};
use tokio::sync::mpsc::unbounded_channel;

/// Localhost with port 0 so a free port is used.
Expand All @@ -43,6 +46,7 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle {
MAINNET.clone(),
beacon_engine_handle,
spawn_test_payload_service().into(),
NoopTransactionPool::default(),
Box::<TokioTaskExecutor>::default(),
client,
EngineCapabilities::default(),
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-engine-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ reth-tasks.workspace = true
reth-rpc-types-compat.workspace = true
reth-engine-primitives.workspace = true
reth-evm.workspace = true
reth-transaction-pool.workspace = true

# async
tokio = { workspace = true, features = ["sync"] }
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-engine-api/src/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub const CAPABILITIES: &[&str] = &[
"engine_getPayloadBodiesByRangeV1",
"engine_getPayloadBodiesByHashV2",
"engine_getPayloadBodiesByRangeV2",
"engine_getBlobsV1",
];

// The list of all supported Engine capabilities available over the engine endpoint.
Expand Down
62 changes: 47 additions & 15 deletions crates/rpc/rpc-engine-api/src/engine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ use reth_primitives::{
Block, BlockHash, BlockHashOrNumber, BlockNumber, EthereumHardfork, B256, U64,
};
use reth_rpc_api::EngineApiServer;
use reth_rpc_types::engine::{
CancunPayloadFields, ClientVersionV1, ExecutionPayload, ExecutionPayloadBodiesV1,
ExecutionPayloadBodiesV2, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3,
ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
TransitionConfiguration,
use reth_rpc_types::{
engine::{
CancunPayloadFields, ClientVersionV1, ExecutionPayload, ExecutionPayloadBodiesV1,
ExecutionPayloadBodiesV2, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3,
ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
TransitionConfiguration,
},
BlobAndProofV1,
};
use reth_rpc_types_compat::engine::payload::{
convert_payload_input_v2_to_payload, convert_to_payload_body_v1, convert_to_payload_body_v2,
};
use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use std::{sync::Arc, time::Instant};
use tokio::sync::oneshot;
use tracing::{trace, warn};
Expand All @@ -37,13 +41,16 @@ pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
/// The upper limit for payload bodies request.
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;

/// The upper limit blobs `eth_getBlobs`.
const MAX_BLOB_LIMIT: usize = 128;

/// The Engine API implementation that grants the Consensus layer access to data and
/// functions in the Execution layer that are crucial for the consensus process.
pub struct EngineApi<Provider, EngineT: EngineTypes> {
inner: Arc<EngineApiInner<Provider, EngineT>>,
pub struct EngineApi<Provider, EngineT: EngineTypes, Pool> {
inner: Arc<EngineApiInner<Provider, EngineT, Pool>>,
}

struct EngineApiInner<Provider, EngineT: EngineTypes> {
struct EngineApiInner<Provider, EngineT: EngineTypes, Pool> {
/// The provider to interact with the chain.
provider: Provider,
/// Consensus configuration
Expand All @@ -60,19 +67,24 @@ struct EngineApiInner<Provider, EngineT: EngineTypes> {
client: ClientVersionV1,
/// The list of all supported Engine capabilities available over the engine endpoint.
capabilities: EngineCapabilities,
/// Transaction pool.
tx_pool: Pool,
}

impl<Provider, EngineT> EngineApi<Provider, EngineT>
impl<Provider, EngineT, Pool> EngineApi<Provider, EngineT, Pool>
where
Provider: HeaderProvider + BlockReader + StateProviderFactory + EvmEnvProvider + 'static,
EngineT: EngineTypes,
Pool: TransactionPool + 'static,
{
/// Create new instance of [`EngineApi`].
#[allow(clippy::too_many_arguments)]
pub fn new(
provider: Provider,
chain_spec: Arc<ChainSpec>,
beacon_consensus: BeaconConsensusEngineHandle<EngineT>,
payload_store: PayloadStore<EngineT>,
tx_pool: Pool,
task_spawner: Box<dyn TaskSpawner>,
client: ClientVersionV1,
capabilities: EngineCapabilities,
Expand All @@ -86,6 +98,7 @@ where
metrics: EngineApiMetrics::default(),
client,
capabilities,
tx_pool,
});
Self { inner }
}
Expand Down Expand Up @@ -609,10 +622,11 @@ where
}

#[async_trait]
impl<Provider, EngineT> EngineApiServer<EngineT> for EngineApi<Provider, EngineT>
impl<Provider, EngineT, Pool> EngineApiServer<EngineT> for EngineApi<Provider, EngineT, Pool>
where
Provider: HeaderProvider + BlockReader + StateProviderFactory + EvmEnvProvider + 'static,
EngineT: EngineTypes,
Pool: TransactionPool + 'static,
{
/// Handler for `engine_newPayloadV1`
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
Expand Down Expand Up @@ -904,9 +918,25 @@ where
async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
Ok(self.inner.capabilities.list())
}

async fn get_blobs_v1(
&self,
versioned_hashes: Vec<B256>,
) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
if versioned_hashes.len() > MAX_BLOB_LIMIT {
return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() }.into())
}

Ok(self
.inner
.tx_pool
.get_blobs_for_versioned_hashes(&versioned_hashes)
.map_err(|err| EngineApiError::Internal(Box::new(err)))?)
}
}

impl<Provider, EngineT> std::fmt::Debug for EngineApi<Provider, EngineT>
impl<Provider, EngineT, Pool> std::fmt::Debug for EngineApi<Provider, EngineT, Pool>
where
EngineT: EngineTypes,
{
Expand All @@ -920,20 +950,21 @@ mod tests {
use super::*;
use assert_matches::assert_matches;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage};
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_testing_utils::generators::random_block;

use reth_chainspec::MAINNET;
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::SealedBlock;
use reth_provider::test_utils::MockEthProvider;
use reth_rpc_types::engine::{ClientCode, ClientVersionV1};
use reth_rpc_types_compat::engine::payload::execution_payload_from_sealed_block;
use reth_tasks::TokioTaskExecutor;
use reth_testing_utils::generators::random_block;
use reth_tokio_util::EventSender;
use reth_transaction_pool::noop::NoopTransactionPool;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

fn setup_engine_api() -> (EngineApiTestHandle, EngineApi<Arc<MockEthProvider>, EthEngineTypes>)
fn setup_engine_api(
) -> (EngineApiTestHandle, EngineApi<Arc<MockEthProvider>, EthEngineTypes, NoopTransactionPool>)
{
let client = ClientVersionV1 {
code: ClientCode::RH,
Expand All @@ -953,6 +984,7 @@ mod tests {
chain_spec.clone(),
BeaconConsensusEngineHandle::new(to_engine, event_sender),
payload_store.into(),
NoopTransactionPool::default(),
task_executor,
client,
EngineCapabilities::default(),
Expand Down
9 changes: 8 additions & 1 deletion crates/rpc/rpc-engine-api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ pub enum EngineApiError {
/// The length that was requested.
len: u64,
},
/// Too many requested versioned hashes for blobs request
#[error("requested blob count too large: {len}")]
BlobRequestTooLarge {
/// The length that was requested.
len: usize,
},
/// Thrown if `engine_getPayloadBodiesByRangeV1` contains an invalid range
#[error("invalid start ({start}) or count ({count})")]
InvalidBodiesRange {
Expand Down Expand Up @@ -145,7 +151,8 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
error.to_string(),
None::<()>,
),
EngineApiError::PayloadRequestTooLarge { .. } => {
EngineApiError::PayloadRequestTooLarge { .. } |
EngineApiError::BlobRequestTooLarge { .. } => {
jsonrpsee_types::error::ErrorObject::owned(
REQUEST_TOO_LARGE_CODE,
REQUEST_TOO_LARGE_MESSAGE,
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/rpc-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true
[dependencies]

# ethereum
alloy-eips.workspace = true
alloy-primitives = { workspace = true, features = ["rand", "rlp", "serde"] }
alloy-rpc-types.workspace = true
alloy-rpc-types-admin.workspace = true
Expand All @@ -32,6 +33,7 @@ op-alloy-rpc-types-engine.workspace = true

# misc
jsonrpsee-types = { workspace = true, optional = true }
serde.workspace = true

[dev-dependencies]
# misc
Expand Down
12 changes: 12 additions & 0 deletions crates/rpc/rpc-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ pub use eth::{
},
};

use alloy_eips::eip4844::{Blob, Bytes48};
use serde::{Deserialize, Serialize};

/// Blob type returned in responses to `engine_getBlobsV1`: <https://github.com/ethereum/execution-apis/pull/559>
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BlobAndProofV1 {
/// The blob data.
pub blob: Box<Blob>,
/// The KZG proof for the blob.
pub proof: Bytes48,
}

/// Optimism specific rpc types.
pub mod optimism {
pub use op_alloy_rpc_types::*;
Expand Down
1 change: 1 addition & 0 deletions crates/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ reth-chainspec.workspace = true
reth-eth-wire-types.workspace = true
reth-primitives = { workspace = true, features = ["c-kzg", "secp256k1"] }
reth-execution-types.workspace = true
reth-rpc-types.workspace = true
reth-fs-util.workspace = true
reth-storage-api.workspace = true
reth-tasks.workspace = true
Expand Down
26 changes: 26 additions & 0 deletions crates/transaction-pool/src/blobstore/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use alloy_primitives::{TxHash, B256};
use alloy_rlp::{Decodable, Encodable};
use parking_lot::{Mutex, RwLock};
use reth_primitives::BlobTransactionSidecar;
use reth_rpc_types::BlobAndProofV1;
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};
Expand Down Expand Up @@ -128,6 +129,31 @@ impl BlobStore for DiskFileBlobStore {
self.inner.get_exact(txs)
}

fn get_by_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
let mut result = vec![None; versioned_hashes.len()];
for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
for (i, blob_versioned_hash) in blob_sidecar.versioned_hashes().enumerate() {
for (j, target_versioned_hash) in versioned_hashes.iter().enumerate() {
if blob_versioned_hash == *target_versioned_hash {
result[j].get_or_insert_with(|| BlobAndProofV1 {
blob: Box::new(blob_sidecar.blobs[i]),
proof: blob_sidecar.proofs[i],
});
}
}
}

// Return early if all blobs are found.
if result.iter().all(|blob| blob.is_some()) {
break;
}
}
Ok(result)
}

fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size_tracker.data_size())
}
Expand Down
Loading

0 comments on commit d8b12ac

Please sign in to comment.