Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement engine_getBlobsV1 #9723

Merged
merged 9 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ where
ctx.chain_spec(),
beacon_engine_handle,
ctx.components().payload_builder().clone().into(),
ctx.components().pool().clone(),
Box::new(ctx.task_executor().clone()),
client,
EngineCapabilities::default(),
Expand Down
1 change: 1 addition & 0 deletions crates/node/builder/src/launch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ where
ctx.chain_spec(),
beacon_engine_handle,
ctx.components().payload_builder().clone().into(),
ctx.components().pool().clone(),
Box::new(ctx.task_executor().clone()),
client,
EngineCapabilities::default(),
Expand Down
11 changes: 9 additions & 2 deletions crates/rpc/rpc-api/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use reth_rpc_types::{
ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, TransitionConfiguration,
},
state::StateOverride,
BlockOverrides, EIP1186AccountProofResponse, Filter, JsonStorageKey, Log, SyncStatus,
TransactionRequest,
BlobAndProofV1, BlockOverrides, EIP1186AccountProofResponse, Filter, JsonStorageKey, Log,
SyncStatus, TransactionRequest,
};
// NOTE: We can't use associated types in the `EngineApi` trait because of jsonrpsee, so we use a
// generic here. It would be nice if the rpc macro would understand which types need to have serde.
Expand Down Expand Up @@ -213,6 +213,13 @@ pub trait EngineApi<Engine: EngineTypes> {
/// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
#[method(name = "exchangeCapabilities")]
async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>>;

/// Fetch blobs for the consensus layer from the in-memory blob cache.
#[method(name = "getBlobsV1")]
async fn get_blobs_v1(
&self,
transaction_ids: Vec<B256>,
) -> RpcResult<Vec<Option<BlobAndProofV1>>>;
}

/// A subset of the ETH rpc interface: <https://ethereum.github.io/execution-apis/api-documentation/>
Expand Down
6 changes: 5 additions & 1 deletion crates/rpc/rpc-builder/tests/it/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use reth_rpc_layer::JwtSecret;
use reth_rpc_server_types::RpcModuleSelection;
use reth_rpc_types::engine::{ClientCode, ClientVersionV1};
use reth_tasks::TokioTaskExecutor;
use reth_transaction_pool::test_utils::{TestPool, TestPoolBuilder};
use reth_transaction_pool::{
noop::NoopTransactionPool,
test_utils::{TestPool, TestPoolBuilder},
};
use tokio::sync::mpsc::unbounded_channel;

/// Localhost with port 0 so a free port is used.
Expand All @@ -43,6 +46,7 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle {
MAINNET.clone(),
beacon_engine_handle,
spawn_test_payload_service().into(),
NoopTransactionPool::default(),
Box::<TokioTaskExecutor>::default(),
client,
EngineCapabilities::default(),
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-engine-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ reth-tasks.workspace = true
reth-rpc-types-compat.workspace = true
reth-engine-primitives.workspace = true
reth-evm.workspace = true
reth-transaction-pool.workspace = true

# async
tokio = { workspace = true, features = ["sync"] }
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-engine-api/src/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub const CAPABILITIES: &[&str] = &[
"engine_getPayloadBodiesByRangeV1",
"engine_getPayloadBodiesByHashV2",
"engine_getPayloadBodiesByRangeV2",
"engine_getBlobsV1",
];

// The list of all supported Engine capabilities available over the engine endpoint.
Expand Down
62 changes: 47 additions & 15 deletions crates/rpc/rpc-engine-api/src/engine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ use reth_primitives::{
Block, BlockHash, BlockHashOrNumber, BlockNumber, EthereumHardfork, B256, U64,
};
use reth_rpc_api::EngineApiServer;
use reth_rpc_types::engine::{
CancunPayloadFields, ClientVersionV1, ExecutionPayload, ExecutionPayloadBodiesV1,
ExecutionPayloadBodiesV2, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3,
ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
TransitionConfiguration,
use reth_rpc_types::{
engine::{
CancunPayloadFields, ClientVersionV1, ExecutionPayload, ExecutionPayloadBodiesV1,
ExecutionPayloadBodiesV2, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3,
ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
TransitionConfiguration,
},
BlobAndProofV1,
};
use reth_rpc_types_compat::engine::payload::{
convert_payload_input_v2_to_payload, convert_to_payload_body_v1, convert_to_payload_body_v2,
};
use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use std::{sync::Arc, time::Instant};
use tokio::sync::oneshot;
use tracing::{trace, warn};
Expand All @@ -37,13 +41,16 @@ pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
/// The upper limit for payload bodies request.
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;

/// The upper limit blobs `eth_getBlobs`.
const MAX_BLOB_LIMIT: usize = 128;

/// The Engine API implementation that grants the Consensus layer access to data and
/// functions in the Execution layer that are crucial for the consensus process.
pub struct EngineApi<Provider, EngineT: EngineTypes> {
inner: Arc<EngineApiInner<Provider, EngineT>>,
pub struct EngineApi<Provider, EngineT: EngineTypes, Pool> {
inner: Arc<EngineApiInner<Provider, EngineT, Pool>>,
}

struct EngineApiInner<Provider, EngineT: EngineTypes> {
struct EngineApiInner<Provider, EngineT: EngineTypes, Pool> {
/// The provider to interact with the chain.
provider: Provider,
/// Consensus configuration
Expand All @@ -60,19 +67,24 @@ struct EngineApiInner<Provider, EngineT: EngineTypes> {
client: ClientVersionV1,
/// The list of all supported Engine capabilities available over the engine endpoint.
capabilities: EngineCapabilities,
/// Transaction pool.
tx_pool: Pool,
}

impl<Provider, EngineT> EngineApi<Provider, EngineT>
impl<Provider, EngineT, Pool> EngineApi<Provider, EngineT, Pool>
where
Provider: HeaderProvider + BlockReader + StateProviderFactory + EvmEnvProvider + 'static,
EngineT: EngineTypes,
Pool: TransactionPool + 'static,
{
/// Create new instance of [`EngineApi`].
#[allow(clippy::too_many_arguments)]
pub fn new(
provider: Provider,
chain_spec: Arc<ChainSpec>,
beacon_consensus: BeaconConsensusEngineHandle<EngineT>,
payload_store: PayloadStore<EngineT>,
tx_pool: Pool,
task_spawner: Box<dyn TaskSpawner>,
client: ClientVersionV1,
capabilities: EngineCapabilities,
Expand All @@ -86,6 +98,7 @@ where
metrics: EngineApiMetrics::default(),
client,
capabilities,
tx_pool,
});
Self { inner }
}
Expand Down Expand Up @@ -609,10 +622,11 @@ where
}

#[async_trait]
impl<Provider, EngineT> EngineApiServer<EngineT> for EngineApi<Provider, EngineT>
impl<Provider, EngineT, Pool> EngineApiServer<EngineT> for EngineApi<Provider, EngineT, Pool>
where
Provider: HeaderProvider + BlockReader + StateProviderFactory + EvmEnvProvider + 'static,
EngineT: EngineTypes,
Pool: TransactionPool + 'static,
{
/// Handler for `engine_newPayloadV1`
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
Expand Down Expand Up @@ -904,9 +918,25 @@ where
async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
Ok(self.inner.capabilities.list())
}

async fn get_blobs_v1(
&self,
versioned_hashes: Vec<B256>,
) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
if versioned_hashes.len() > MAX_BLOB_LIMIT {
return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() }.into())
}

Ok(self
.inner
.tx_pool
.get_blobs_for_versioned_hashes(&versioned_hashes)
.map_err(|err| EngineApiError::Internal(Box::new(err)))?)
}
}

impl<Provider, EngineT> std::fmt::Debug for EngineApi<Provider, EngineT>
impl<Provider, EngineT, Pool> std::fmt::Debug for EngineApi<Provider, EngineT, Pool>
where
EngineT: EngineTypes,
{
Expand All @@ -920,20 +950,21 @@ mod tests {
use super::*;
use assert_matches::assert_matches;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, BeaconEngineMessage};
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_testing_utils::generators::random_block;

use reth_chainspec::MAINNET;
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::SealedBlock;
use reth_provider::test_utils::MockEthProvider;
use reth_rpc_types::engine::{ClientCode, ClientVersionV1};
use reth_rpc_types_compat::engine::payload::execution_payload_from_sealed_block;
use reth_tasks::TokioTaskExecutor;
use reth_testing_utils::generators::random_block;
use reth_tokio_util::EventSender;
use reth_transaction_pool::noop::NoopTransactionPool;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

fn setup_engine_api() -> (EngineApiTestHandle, EngineApi<Arc<MockEthProvider>, EthEngineTypes>)
fn setup_engine_api(
) -> (EngineApiTestHandle, EngineApi<Arc<MockEthProvider>, EthEngineTypes, NoopTransactionPool>)
{
let client = ClientVersionV1 {
code: ClientCode::RH,
Expand All @@ -953,6 +984,7 @@ mod tests {
chain_spec.clone(),
BeaconConsensusEngineHandle::new(to_engine, event_sender),
payload_store.into(),
NoopTransactionPool::default(),
task_executor,
client,
EngineCapabilities::default(),
Expand Down
9 changes: 8 additions & 1 deletion crates/rpc/rpc-engine-api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ pub enum EngineApiError {
/// The length that was requested.
len: u64,
},
/// Too many requested versioned hashes for blobs request
#[error("requested blob count too large: {len}")]
BlobRequestTooLarge {
/// The length that was requested.
len: usize,
},
/// Thrown if `engine_getPayloadBodiesByRangeV1` contains an invalid range
#[error("invalid start ({start}) or count ({count})")]
InvalidBodiesRange {
Expand Down Expand Up @@ -145,7 +151,8 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
error.to_string(),
None::<()>,
),
EngineApiError::PayloadRequestTooLarge { .. } => {
EngineApiError::PayloadRequestTooLarge { .. } |
EngineApiError::BlobRequestTooLarge { .. } => {
jsonrpsee_types::error::ErrorObject::owned(
REQUEST_TOO_LARGE_CODE,
REQUEST_TOO_LARGE_MESSAGE,
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/rpc-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true
[dependencies]

# ethereum
alloy-eips.workspace = true
alloy-primitives = { workspace = true, features = ["rand", "rlp", "serde"] }
alloy-rpc-types.workspace = true
alloy-rpc-types-admin.workspace = true
Expand All @@ -32,6 +33,7 @@ op-alloy-rpc-types-engine.workspace = true

# misc
jsonrpsee-types = { workspace = true, optional = true }
serde.workspace = true

[dev-dependencies]
# misc
Expand Down
12 changes: 12 additions & 0 deletions crates/rpc/rpc-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ pub use eth::{
},
};

use alloy_eips::eip4844::{Blob, Bytes48};
use serde::{Deserialize, Serialize};

/// Blob type returned in responses to `engine_getBlobsV1`: <https://github.com/ethereum/execution-apis/pull/559>
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BlobAndProofV1 {
/// The blob data.
pub blob: Box<Blob>,
/// The KZG proof for the blob.
pub proof: Bytes48,
}

/// Optimism specific rpc types.
pub mod optimism {
pub use op_alloy_rpc_types::*;
Expand Down
1 change: 1 addition & 0 deletions crates/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ reth-chainspec.workspace = true
reth-eth-wire-types.workspace = true
reth-primitives = { workspace = true, features = ["c-kzg", "secp256k1"] }
reth-execution-types.workspace = true
reth-rpc-types.workspace = true
reth-fs-util.workspace = true
reth-storage-api.workspace = true
reth-tasks.workspace = true
Expand Down
26 changes: 26 additions & 0 deletions crates/transaction-pool/src/blobstore/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use alloy_primitives::{TxHash, B256};
use alloy_rlp::{Decodable, Encodable};
use parking_lot::{Mutex, RwLock};
use reth_primitives::BlobTransactionSidecar;
use reth_rpc_types::BlobAndProofV1;
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};
Expand Down Expand Up @@ -128,6 +129,31 @@ impl BlobStore for DiskFileBlobStore {
self.inner.get_exact(txs)
}

fn get_by_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
let mut result = vec![None; versioned_hashes.len()];
for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
for (i, blob_versioned_hash) in blob_sidecar.versioned_hashes().enumerate() {
for (j, target_versioned_hash) in versioned_hashes.iter().enumerate() {
if blob_versioned_hash == *target_versioned_hash {
result[j].get_or_insert_with(|| BlobAndProofV1 {
blob: Box::new(blob_sidecar.blobs[i]),
proof: blob_sidecar.proofs[i],
});
}
}
}

// Return early if all blobs are found.
if result.iter().all(|blob| blob.is_some()) {
break;
}
}
Ok(result)
}

fn data_size_hint(&self) -> Option<usize> {
Some(self.inner.size_tracker.data_size())
}
Expand Down
Loading
Loading