diff --git a/Cargo.lock b/Cargo.lock index 5f778f69..026c828f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2270,7 +2270,9 @@ name = "eth1_api" version = "0.0.0" dependencies = [ "anyhow", + "arc-swap", "bls", + "dedicated_executor", "derive_more 1.0.0", "either", "enum-iterator", @@ -2280,9 +2282,11 @@ dependencies = [ "fork_choice_control", "fs-err", "futures", + "helper_functions", "hex", "hex-literal", "httpmock", + "itertools 0.13.0", "jwt-simple", "log", "memoffset", diff --git a/eth1_api/Cargo.toml b/eth1_api/Cargo.toml index 6431d57a..bbd95a3b 100644 --- a/eth1_api/Cargo.toml +++ b/eth1_api/Cargo.toml @@ -8,7 +8,9 @@ workspace = true [dependencies] anyhow = { workspace = true } +arc-swap = { workspace = true } bls = { workspace = true } +dedicated_executor = { workspace = true } derive_more = { workspace = true } either = { workspace = true } enum-iterator = { workspace = true } @@ -18,8 +20,10 @@ features = { workspace = true } fork_choice_control = { workspace = true } fs-err = { workspace = true } futures = { workspace = true } +helper_functions = { workspace = true } hex = { workspace = true } hex-literal = { workspace = true } +itertools = { workspace = true } jwt-simple = { workspace = true } log = { workspace = true } memoffset = { workspace = true } diff --git a/eth1_api/src/endpoints.rs b/eth1_api/src/endpoints.rs index eaf7ab33..c50845e6 100644 --- a/eth1_api/src/endpoints.rs +++ b/eth1_api/src/endpoints.rs @@ -1,25 +1,20 @@ +use core::sync::atomic::{AtomicBool, Ordering}; +use std::{collections::HashSet, sync::Arc}; + +use arc_swap::ArcSwap; use derive_more::Debug; +use itertools::Itertools as _; use types::redacting_url::RedactingUrl; -#[derive(Clone, Copy, Debug)] -#[cfg_attr(test, derive(PartialEq, Eq))] -pub enum EndpointStatus { - Online, - Offline, -} +const ORDERING: Ordering = Ordering::SeqCst; -impl EndpointStatus { - const fn is_offline(self) -> bool { - matches!(self, Self::Offline) - } -} - -#[derive(Clone, Debug)] -#[cfg_attr(test, derive(PartialEq, Eq))] +#[derive(Debug)] +#[expect(clippy::partial_pub_fields)] pub struct Endpoint { - index: usize, - status: EndpointStatus, - url: RedactingUrl, + is_online: AtomicBool, + pub is_fallback: bool, + pub url: RedactingUrl, + capabilities: ArcSwap>, } impl Endpoint { @@ -27,13 +22,20 @@ impl Endpoint { &self.url } - pub const fn is_fallback(&self) -> bool { - self.index > 0 + pub fn is_online(&self) -> bool { + self.is_online.load(ORDERING) + } + + pub fn set_capabilities(&self, capabilities: HashSet) { + self.capabilities.store(Arc::new(capabilities)); + } + + pub fn set_online_status(&self, is_online: bool) { + self.is_online.store(is_online, ORDERING) } } pub struct Endpoints { - current: usize, endpoints: Vec, } @@ -43,64 +45,46 @@ impl Endpoints { .into_iter() .enumerate() .map(|(index, url)| Endpoint { - index, - status: EndpointStatus::Online, + is_online: AtomicBool::new(true), + is_fallback: index > 0, url, + capabilities: ArcSwap::from_pointee(HashSet::default()), }) .collect(); - Self { - current: 0, - endpoints, - } + Self { endpoints } } pub fn el_offline(&self) -> bool { - self.endpoints - .iter() - .all(|endpoint| endpoint.status.is_offline()) - } - - pub fn current(&self) -> Option<&Endpoint> { - self.endpoints.get(self.current) + self.endpoints.iter().all(|endpoint| !endpoint.is_online()) } pub fn is_empty(&self) -> bool { self.endpoints.is_empty() } - pub fn peek_next(&self) -> Option<&Endpoint> { - self.endpoints.get(self.next_index()) - } - - pub fn advance(&mut self) { - self.current = 
self.next_index(); - } - - pub fn set_status(&mut self, status: EndpointStatus) { - if let Some(current) = self.current_mut() { - current.status = status; - } - } - - pub fn reset(&mut self) { - self.current = 0; - } - - const fn next_index(&self) -> usize { - self.current.saturating_add(1) - } - - fn current_mut(&mut self) -> Option<&mut Endpoint> { - self.endpoints.get_mut(self.current) + pub fn endpoints_for_request( + &self, + capability: Option<&str>, + ) -> impl Iterator { + self.endpoints + .iter() + .filter(|endpoint| { + capability + .map(|capability| endpoint.capabilities.load().contains(capability)) + .unwrap_or(true) + }) + .sorted_by_key(|endpoint| !endpoint.is_online()) } } #[cfg(test)] mod tests { + use std::collections::HashSet; + use anyhow::Result; - use crate::endpoints::{Endpoint, EndpointStatus, Endpoints}; + use crate::{endpoints::Endpoints, eth1_api::ENGINE_GET_EL_BLOBS_V1}; #[test] fn test_empty_endpoints() { @@ -109,13 +93,14 @@ mod tests { assert!(endpoints.is_empty()); assert!(endpoints.el_offline()); - assert_eq!(endpoints.current(), None); - assert_eq!(endpoints.peek_next(), None); + let mut endpoints_for_request = endpoints.endpoints_for_request(None); + + assert!(endpoints_for_request.next().is_none()); } #[test] fn test_endpoints() -> Result<()> { - let mut endpoints = Endpoints::new([ + let endpoints = Endpoints::new([ "https://example1.net".parse()?, "https://example2.net".parse()?, ]); @@ -124,74 +109,89 @@ mod tests { assert!(!endpoints.el_offline(), "initially endpoints are online"); assert_eq!( - endpoints.current().cloned(), - Some(Endpoint { - index: 0, - status: EndpointStatus::Online, - url: "https://example1.net".parse()?, - }), - ); - - assert_eq!( - endpoints.peek_next().cloned(), - Some(Endpoint { - index: 1, - status: EndpointStatus::Online, - url: "https://example2.net".parse()?, - }), + endpoints + .endpoints_for_request(None) + .map(|endpoint| (endpoint.url().clone(), endpoint.is_online())) + .collect::>(), + [ + ("https://example1.net".parse()?, true), + ("https://example2.net".parse()?, true), + ] ); - endpoints.set_status(EndpointStatus::Offline); + // set first endpoint to be offline + let current_endpoint = endpoints + .endpoints_for_request(None) + .next() + .expect("current endpoint should be present"); - assert_eq!( - endpoints.current().map(|endpoint| endpoint.status), - Some(EndpointStatus::Offline), - ); + current_endpoint.set_online_status(false); - endpoints.advance(); + assert!(!current_endpoint.is_online()); + assert!(!endpoints.el_offline()); + // check that online endpoint is used for requests assert_eq!( - endpoints.current().cloned(), - Some(Endpoint { - index: 1, - status: EndpointStatus::Online, - url: "https://example2.net".parse()?, - }), + endpoints + .endpoints_for_request(None) + .map(|endpoint| (endpoint.url().clone(), endpoint.is_online())) + .collect::>(), + [ + ("https://example2.net".parse()?, true), + ("https://example1.net".parse()?, false), + ] ); - assert_eq!(endpoints.peek_next(), None); - assert!(!endpoints.el_offline()); + // set the fallback endpoint to be offline + let current_endpoint = endpoints + .endpoints_for_request(None) + .next() + .expect("current endpoint should be present"); - endpoints.set_status(EndpointStatus::Offline); - endpoints.advance(); + current_endpoint.set_online_status(false); - assert!(!endpoints.is_empty()); + assert!(!current_endpoint.is_online()); assert!(endpoints.el_offline()); - assert_eq!(endpoints.current(), None); - assert_eq!(endpoints.peek_next(), None); + 
assert_eq!( + endpoints + .endpoints_for_request(None) + .map(|endpoint| (endpoint.url().clone(), endpoint.is_online())) + .collect::>(), + [ + ("https://example1.net".parse()?, false), + ("https://example2.net".parse()?, false), + ] + ); + + Ok(()) + } + + #[test] + fn test_endpoints_with_capabilities() -> Result<()> { + let endpoints = Endpoints::new([ + "https://example1.net".parse()?, + "https://example2.net".parse()?, + ]); - endpoints.reset(); + assert!(endpoints + .endpoints_for_request(Some(ENGINE_GET_EL_BLOBS_V1)) + .next() + .is_none()); - // offline endpoints are still offline after reset - assert!(endpoints.el_offline()); + let current_endpoint = endpoints + .endpoints_for_request(None) + .next() + .expect("current endpoint should be present"); - assert_eq!( - endpoints.current().cloned(), - Some(Endpoint { - index: 0, - status: EndpointStatus::Offline, - url: "https://example1.net".parse()?, - }), - ); + current_endpoint.set_capabilities(HashSet::from([ENGINE_GET_EL_BLOBS_V1.to_owned()])); assert_eq!( - endpoints.peek_next().cloned(), - Some(Endpoint { - index: 1, - status: EndpointStatus::Offline, - url: "https://example2.net".parse()?, - }), + endpoints + .endpoints_for_request(Some(ENGINE_GET_EL_BLOBS_V1)) + .map(|endpoint| endpoint.url().clone()) + .collect::>(), + ["https://example1.net".parse()?] ); Ok(()) diff --git a/eth1_api/src/eth1_api.rs b/eth1_api/src/eth1_api.rs index dc2b0b04..f99f3458 100644 --- a/eth1_api/src/eth1_api.rs +++ b/eth1_api/src/eth1_api.rs @@ -6,12 +6,12 @@ use either::Either; use enum_iterator::Sequence as _; use ethereum_types::H64; use execution_engine::{ - EngineGetPayloadV1Response, EngineGetPayloadV2Response, EngineGetPayloadV3Response, - EngineGetPayloadV4Response, ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, - ForkChoiceStateV1, ForkChoiceUpdatedResponse, PayloadAttributes, PayloadId, PayloadStatusV1, - RawExecutionRequests, + BlobAndProofV1, EngineGetPayloadV1Response, EngineGetPayloadV2Response, + EngineGetPayloadV3Response, EngineGetPayloadV4Response, ExecutionPayloadV1, ExecutionPayloadV2, + ExecutionPayloadV3, ForkChoiceStateV1, ForkChoiceUpdatedResponse, PayloadAttributes, PayloadId, + PayloadStatusV1, RawExecutionRequests, }; -use futures::{channel::mpsc::UnboundedSender, lock::Mutex, Future}; +use futures::{channel::mpsc::UnboundedSender, Future}; use log::warn; use prometheus_metrics::Metrics; use reqwest::{header::HeaderMap, Client}; @@ -23,6 +23,7 @@ use thiserror::Error; use types::{ combined::{ExecutionPayload, ExecutionPayloadParams}, config::Config, + deneb::primitives::VersionedHash, nonstandard::{Phase, WithBlobsAndMev}, phase0::primitives::{ExecutionBlockHash, ExecutionBlockNumber}, preset::Preset, @@ -39,23 +40,26 @@ use web3::{ use crate::{ auth::Auth, deposit_event::DepositEvent, - endpoints::{Endpoint, EndpointStatus, Endpoints}, + endpoints::{Endpoint, Endpoints}, eth1_block::Eth1Block, Eth1ApiToMetrics, Eth1ConnectionData, }; const ENGINE_FORKCHOICE_UPDATED_TIMEOUT: Duration = Duration::from_secs(8); +const ENGINE_GET_BLOBS_TIMEOUT: Duration = Duration::from_secs(1); const ENGINE_GET_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(1); const ENGINE_NEW_PAYLOAD_TIMEOUT: Duration = Duration::from_secs(8); +pub const ENGINE_GET_EL_BLOBS_V1: &str = "engine_getBlobsV1"; + #[expect(clippy::struct_field_names)] pub struct Eth1Api { config: Arc, client: Client, - auth: Arc, - endpoints: Mutex, + pub(crate) auth: Arc, + pub(crate) endpoints: Endpoints, eth1_api_to_metrics_tx: Option>, - metrics: Option>, + 
pub(crate) metrics: Option>, } impl Eth1Api { @@ -72,7 +76,7 @@ impl Eth1Api { config, client, auth, - endpoints: Mutex::new(Endpoints::new(eth1_rpc_urls)), + endpoints: Endpoints::new(eth1_rpc_urls), eth1_api_to_metrics_tx, metrics, } @@ -80,13 +84,13 @@ impl Eth1Api { pub async fn current_head_number(&self) -> Result { Ok(self - .request_with_fallback(|(api, headers)| Ok(api.block_number(headers))) + .request_with_fallback(|(api, headers)| Ok(api.block_number(headers)), None) .await? .as_u64()) } pub async fn get_block(&self, block_id: BlockId) -> Result> { - self.request_with_fallback(|(api, headers)| Ok(api.block(block_id, headers))) + self.request_with_fallback(|(api, headers)| Ok(api.block(block_id, headers)), None) .await? .map(Eth1Block::try_from) .transpose() @@ -120,7 +124,7 @@ impl Eth1Api { .build(); let logs = self - .request_with_fallback(|(api, headers)| Ok(api.logs(filter.clone(), headers))) + .request_with_fallback(|(api, headers)| Ok(api.logs(filter.clone(), headers)), None) .await?; if let Some(log) = logs.first() { @@ -132,6 +136,21 @@ impl Eth1Api { Ok(None) } + pub(crate) async fn get_blobs( + &self, + versioned_hashes: Vec, + ) -> Result>>> { + let params = vec![serde_json::to_value(versioned_hashes)?]; + + self.execute( + ENGINE_GET_EL_BLOBS_V1, + params, + Some(ENGINE_GET_BLOBS_TIMEOUT), + Some(ENGINE_GET_EL_BLOBS_V1), + ) + .await + } + pub async fn get_blocks( &self, block_number_range: RangeInclusive, @@ -176,7 +195,7 @@ impl Eth1Api { let mut deposit_events = BTreeMap::<_, Vec<_>>::new(); for log in self - .request_with_fallback(|(api, headers)| Ok(api.logs(filter.clone(), headers))) + .request_with_fallback(|(api, headers)| Ok(api.logs(filter.clone(), headers)), None) .await? { let block_number = match log.block_number { @@ -217,6 +236,7 @@ impl Eth1Api { "engine_newPayloadV1", params, Some(ENGINE_NEW_PAYLOAD_TIMEOUT), + None, ) .await } @@ -227,6 +247,7 @@ impl Eth1Api { "engine_newPayloadV2", params, Some(ENGINE_NEW_PAYLOAD_TIMEOUT), + None, ) .await } @@ -247,6 +268,7 @@ impl Eth1Api { "engine_newPayloadV3", params, Some(ENGINE_NEW_PAYLOAD_TIMEOUT), + None, ) .await } @@ -272,6 +294,7 @@ impl Eth1Api { "engine_newPayloadV4", params, Some(ENGINE_NEW_PAYLOAD_TIMEOUT), + None, ) .await } @@ -320,6 +343,7 @@ impl Eth1Api { "engine_forkchoiceUpdatedV1", params, Some(ENGINE_FORKCHOICE_UPDATED_TIMEOUT), + None, ) .await? } @@ -328,6 +352,7 @@ impl Eth1Api { "engine_forkchoiceUpdatedV2", params, Some(ENGINE_FORKCHOICE_UPDATED_TIMEOUT), + None, ) .await? } @@ -336,6 +361,7 @@ impl Eth1Api { "engine_forkchoiceUpdatedV3", params, Some(ENGINE_FORKCHOICE_UPDATED_TIMEOUT), + None, ) .await? } @@ -344,6 +370,7 @@ impl Eth1Api { "engine_forkchoiceUpdatedV3", params, Some(ENGINE_FORKCHOICE_UPDATED_TIMEOUT), + None, ) .await? 
} @@ -397,6 +424,7 @@ impl Eth1Api { "engine_getPayloadV1", params, Some(ENGINE_GET_PAYLOAD_TIMEOUT), + None, ) .await .map(Into::into) @@ -408,6 +436,7 @@ impl Eth1Api { "engine_getPayloadV2", params, Some(ENGINE_GET_PAYLOAD_TIMEOUT), + None, ) .await .map(Into::into) @@ -419,6 +448,7 @@ impl Eth1Api { "engine_getPayloadV3", params, Some(ENGINE_GET_PAYLOAD_TIMEOUT), + None, ) .await .map(Into::into) @@ -430,6 +460,7 @@ impl Eth1Api { "engine_getPayloadV4", params, Some(ENGINE_GET_PAYLOAD_TIMEOUT), + None, ) .await .map(Into::into) @@ -442,82 +473,70 @@ impl Eth1Api { method: &str, params: Vec, timeout: Option, + capability: Option<&str>, ) -> Result { let _timer = self.metrics.as_ref().map(|metrics| { prometheus_metrics::start_timer_vec(&metrics.eth1_api_request_times, method) }); - self.request_with_fallback(|(api, headers)| { - Ok(CallFuture::new(api.transport().execute_with_headers( - method, - params.clone(), - headers, - timeout, - ))) - }) + self.request_with_fallback( + |(api, headers)| { + Ok(CallFuture::new(api.transport().execute_with_headers( + method, + params.clone(), + headers, + timeout, + ))) + }, + capability, + ) .await } - pub async fn el_offline(&self) -> bool { - self.endpoints.lock().await.el_offline() + #[must_use] + pub fn el_offline(&self) -> bool { + self.endpoints.el_offline() } - async fn request_with_fallback(&self, request_from_api: R) -> Result + async fn request_with_fallback( + &self, + request_from_api: R, + capability: Option<&str>, + ) -> Result where R: Fn((Eth, Option)) -> Result> + Sync + Send, O: DeserializeOwned + Send, F: Future> + Send, { - while let Some(endpoint) = self.current_endpoint().await { - let url = endpoint.url(); - let http = Http::with_client(self.client.clone(), url.clone().into_url()); - let api = Web3::new(http).eth(); - let headers = self.auth.headers()?; - let query = request_from_api((api, headers))?.await; + let mut endpoints_for_request = self.endpoints.endpoints_for_request(capability).peekable(); + + while let Some(endpoint) = endpoints_for_request.next() { + let api = self.build_api_for_request(endpoint); + let query = request_from_api((api, self.auth.headers()?))?.await; match query { Ok(result) => { - self.set_endpoint_status(EndpointStatus::Online).await; - - if let Some(metrics_tx) = self.eth1_api_to_metrics_tx.as_ref() { - Eth1ApiToMetrics::Eth1Connection(Eth1ConnectionData { - sync_eth1_connected: true, - sync_eth1_fallback_connected: endpoint.is_fallback(), - }) - .send(metrics_tx); - } - + self.on_ok_response(endpoint); return Ok(result); } Err(error) => { - if let Some(metrics) = self.metrics.as_ref() { - metrics.eth1_api_errors_count.inc(); - } + self.on_error_response(endpoint); - match self.peek_next_endpoint().await { + match endpoints_for_request.peek() { Some(next_endpoint) => warn!( - "Eth1 RPC endpoint {url} returned an error: {error}; \ - switching to {}", + "Eth1 RPC endpoint {} returned an error: {error}; switching to {}", + endpoint.url(), next_endpoint.url(), ), None => warn!( - "last available Eth1 RPC endpoint {url} returned an error: {error}", + "last available Eth1 RPC endpoint {} returned an error: {error}", + endpoint.url(), ), } - - if let Some(metrics_tx) = self.eth1_api_to_metrics_tx.as_ref() { - Eth1ApiToMetrics::Eth1Connection(Eth1ConnectionData::default()) - .send(metrics_tx); - } - - self.set_endpoint_status(EndpointStatus::Offline).await; - self.next_endpoint().await; } } } - self.reset_endpoints().await; - if let Some(metrics) = self.metrics.as_ref() { 
metrics.eth1_api_reset_count.inc(); } @@ -525,32 +544,38 @@ impl Eth1Api { // Checking this in `Eth1Api::new` would be unnecessarily strict. // Syncing a predefined network without proposing blocks does not require an Eth1 RPC // (except during the Merge transition). - ensure!( - !self.endpoints.lock().await.is_empty(), - Error::NoEndpointsProvided - ); + ensure!(!self.endpoints.is_empty(), Error::NoEndpointsProvided); bail!(Error::EndpointsExhausted) } - async fn current_endpoint(&self) -> Option { - (*self.endpoints.lock().await).current().cloned() + pub(crate) fn build_api_for_request(&self, endpoint: &Endpoint) -> Eth { + let http = Http::with_client(self.client.clone(), endpoint.url().clone().into_url()); + Web3::new(http).eth() } - async fn set_endpoint_status(&self, status: EndpointStatus) { - (*self.endpoints.lock().await).set_status(status) - } + pub(crate) fn on_ok_response(&self, endpoint: &Endpoint) { + endpoint.set_online_status(true); - async fn next_endpoint(&self) { - self.endpoints.lock().await.advance(); + if let Some(metrics_tx) = self.eth1_api_to_metrics_tx.as_ref() { + Eth1ApiToMetrics::Eth1Connection(Eth1ConnectionData { + sync_eth1_connected: true, + sync_eth1_fallback_connected: endpoint.is_fallback, + }) + .send(metrics_tx); + } } - async fn peek_next_endpoint(&self) -> Option { - self.endpoints.lock().await.peek_next().cloned() - } + pub(crate) fn on_error_response(&self, endpoint: &Endpoint) { + endpoint.set_online_status(false); - async fn reset_endpoints(&self) { - self.endpoints.lock().await.reset(); + if let Some(metrics) = self.metrics.as_ref() { + metrics.eth1_api_errors_count.inc(); + } + + if let Some(metrics_tx) = self.eth1_api_to_metrics_tx.as_ref() { + Eth1ApiToMetrics::Eth1Connection(Eth1ConnectionData::default()).send(metrics_tx); + } } } @@ -605,8 +630,7 @@ mod tests { None, )); - assert!(eth1_api.el_offline().await); - assert_eq!(eth1_api.current_endpoint().await, None); + assert!(eth1_api.el_offline()); assert_eq!( eth1_api @@ -642,7 +666,7 @@ mod tests { None, )); - assert!(!eth1_api.el_offline().await); + assert!(!eth1_api.el_offline()); assert_eq!( eth1_api .current_head_number() @@ -653,8 +677,15 @@ mod tests { ); // Despite the endpoint returning an error, it remains the only available option - assert!(eth1_api.current_endpoint().await.is_some()); - assert!(eth1_api.el_offline().await); + assert!(eth1_api.el_offline()); + assert_eq!( + eth1_api + .current_head_number() + .await + .expect_err("500 response should be a an error") + .downcast::()?, + Error::EndpointsExhausted, + ); Ok(()) } @@ -693,14 +724,9 @@ mod tests { None, )); - // Set to use the primary endpoint which is not a fallback - assert!(!eth1_api.el_offline().await); - assert!(!eth1_api - .current_endpoint() - .await - .expect("endpoint should be available") - .is_fallback()); + assert!(!eth1_api.el_offline()); + // Expect to use the fallback endpoint when the primary endpoint returns an error assert_eq!( eth1_api .current_head_number() @@ -709,15 +735,8 @@ mod tests { 119_363, ); - // Expect to use the fallback endpoint when the primary endpoint returns an error - assert!(eth1_api - .current_endpoint() - .await - .expect("the fallback endpoint should be available") - .is_fallback()); - // Even though the primary endpoint is offline, eth1_api itself is not offline - assert!(!eth1_api.el_offline().await); + assert!(!eth1_api.el_offline()); Ok(()) } diff --git a/eth1_api/src/eth1_execution_engine.rs b/eth1_api/src/eth1_execution_engine.rs index bed44723..04e7bf4c 100644 --- 
a/eth1_api/src/eth1_execution_engine.rs +++ b/eth1_api/src/eth1_execution_engine.rs @@ -8,8 +8,9 @@ use futures::channel::{mpsc::UnboundedSender, oneshot::Sender}; use log::{info, warn}; use tokio::runtime::{Builder, Handle}; use types::{ - combined::{ExecutionPayload, ExecutionPayloadParams}, + combined::{ExecutionPayload, ExecutionPayloadParams, SignedBeaconBlock}, config::Config, + deneb::primitives::BlobIndex, nonstandard::{Phase, TimedPowBlock, WithBlobsAndMev}, phase0::primitives::{ExecutionBlockHash, H256}, preset::Preset, @@ -32,6 +33,18 @@ impl ExecutionEngine
<P> for Eth1ExecutionEngine<P>
{ true } + fn exchange_capabilities(&self) { + ExecutionServiceMessage::ExchangeCapabilities.send(&self.execution_service_tx); + } + + fn get_blobs(&self, block: Arc>, blob_indices: Vec) { + ExecutionServiceMessage::GetBlobs { + block, + blob_indices, + } + .send(&self.execution_service_tx); + } + fn notify_forkchoice_updated( &self, head_eth1_block_hash: ExecutionBlockHash, diff --git a/eth1_api/src/execution_service.rs b/eth1_api/src/execution_service.rs index aeab1cfe..82fd7fca 100644 --- a/eth1_api/src/execution_service.rs +++ b/eth1_api/src/execution_service.rs @@ -1,12 +1,14 @@ use std::sync::Arc; use anyhow::Result; +use dedicated_executor::DedicatedExecutor; use derive_more::Constructor; use either::Either; use execution_engine::{ForkChoiceUpdatedResponse, PayloadAttributes, PayloadStatusV1}; use fork_choice_control::Wait; use futures::{channel::mpsc::UnboundedReceiver, StreamExt as _}; use log::warn; +use std_ext::ArcExt as _; use types::{ combined::{ExecutionPayload, ExecutionPayloadParams}, nonstandard::Phase, @@ -14,12 +16,16 @@ use types::{ preset::Preset, }; -use crate::{eth1_api::Eth1Api, messages::ExecutionServiceMessage, misc::ApiController}; +use crate::{ + eth1_api::Eth1Api, messages::ExecutionServiceMessage, misc::ApiController, + spawn_blobs_download_task, spawn_exchange_capabilities_task, +}; #[derive(Constructor)] pub struct ExecutionService { api: Arc, controller: ApiController, + dedicated_executor: Arc, rx: UnboundedReceiver>, } @@ -27,6 +33,24 @@ impl ExecutionService { pub async fn run(mut self) -> Result<()> { while let Some(message) = self.rx.next().await { match message { + ExecutionServiceMessage::ExchangeCapabilities => { + spawn_exchange_capabilities_task( + self.api.clone_arc(), + &self.dedicated_executor, + ); + } + ExecutionServiceMessage::GetBlobs { + block, + blob_indices, + } => { + spawn_blobs_download_task( + self.api.clone_arc(), + self.controller.clone_arc(), + &self.dedicated_executor, + block, + blob_indices, + ); + } ExecutionServiceMessage::NotifyForkchoiceUpdated { head_eth1_block_hash, safe_eth1_block_hash, diff --git a/eth1_api/src/lib.rs b/eth1_api/src/lib.rs index 5ed2ea4f..03e0a6f8 100644 --- a/eth1_api/src/lib.rs +++ b/eth1_api/src/lib.rs @@ -7,6 +7,7 @@ pub use crate::{ execution_service::ExecutionService, messages::{Eth1ApiToMetrics, Eth1ConnectionData, Eth1Metrics, ExecutionServiceMessage}, misc::{ApiController, RealController}, + tasks::{spawn_blobs_download_task, spawn_exchange_capabilities_task}, }; mod auth; @@ -18,3 +19,4 @@ mod eth1_execution_engine; mod execution_service; mod messages; mod misc; +mod tasks; diff --git a/eth1_api/src/messages.rs b/eth1_api/src/messages.rs index 6434dc64..2b687782 100644 --- a/eth1_api/src/messages.rs +++ b/eth1_api/src/messages.rs @@ -1,16 +1,24 @@ +use std::sync::Arc; + use anyhow::Result; use either::Either; use execution_engine::{PayloadAttributes, PayloadId, PayloadStatusV1}; use futures::channel::{mpsc::UnboundedSender, oneshot::Sender}; use log::debug; use types::{ - combined::{ExecutionPayload, ExecutionPayloadParams}, + combined::{ExecutionPayload, ExecutionPayloadParams, SignedBeaconBlock}, + deneb::primitives::BlobIndex, nonstandard::Phase, phase0::primitives::{ExecutionBlockHash, H256}, preset::Preset, }; pub enum ExecutionServiceMessage { + ExchangeCapabilities, + GetBlobs { + block: Arc>, + blob_indices: Vec, + }, NotifyForkchoiceUpdated { head_eth1_block_hash: ExecutionBlockHash, safe_eth1_block_hash: ExecutionBlockHash, diff --git a/eth1_api/src/tasks.rs b/eth1_api/src/tasks.rs 
new file mode 100644 index 00000000..212e23eb --- /dev/null +++ b/eth1_api/src/tasks.rs @@ -0,0 +1,140 @@ +use core::time::Duration; +use std::{collections::HashSet, sync::Arc}; + +use anyhow::Result; +use dedicated_executor::DedicatedExecutor; +use execution_engine::BlobAndProofV1; +use fork_choice_control::Wait; +use helper_functions::misc; +use log::{info, warn}; +use types::{ + combined::SignedBeaconBlock, deneb::primitives::BlobIndex, preset::Preset, + traits::SignedBeaconBlock as _, +}; +use web3::{api::Namespace as _, helpers::CallFuture, Error, Transport as _}; + +use crate::{eth1_api::ENGINE_GET_EL_BLOBS_V1, ApiController, Eth1Api}; + +const ENGINE_EXCHANGE_CAPABILITIES_TIMEOUT: Duration = Duration::from_secs(1); + +pub fn spawn_blobs_download_task<P: Preset, W: Wait>( + eth1_api: Arc<Eth1Api>, + controller: ApiController<P, W>, + dedicated_executor: &DedicatedExecutor, + block: Arc<SignedBeaconBlock<P>>, + blob_indices: Vec<BlobIndex>, +) { + dedicated_executor + .spawn(async move { download_blobs(&eth1_api, controller, block, blob_indices).await }) + .detach(); +} + +pub fn spawn_exchange_capabilities_task( + eth1_api: Arc<Eth1Api>, + dedicated_executor: &DedicatedExecutor, +) { + dedicated_executor + .spawn(async move { + if let Err(error) = exchange_capabilities(&eth1_api).await { + warn!("exchange capabilities task failed: {error:?}"); + } + }) + .detach(); +} + +async fn download_blobs<P: Preset, W: Wait>( + eth1_api: &Eth1Api, + controller: ApiController<P, W>, + block: Arc<SignedBeaconBlock<P>>, + blob_indices: Vec<BlobIndex>, +) { + if let Some(body) = block.message().body().post_deneb() { + let kzg_commitments = body + .blob_kzg_commitments() + .iter() + .zip(0..) + .filter(|(_, index)| blob_indices.contains(index)) + .collect::<Vec<_>>(); + + let versioned_hashes = kzg_commitments + .iter() + .copied() + .map(|(commitment, _)| misc::kzg_commitment_to_versioned_hash(*commitment)) + .collect(); + + match eth1_api.get_blobs::<P>
(versioned_hashes).await { + Ok(blobs_and_proofs) => { + let block_header = block.to_header(); + + for (blob_and_proof, kzg_commitment, index) in blobs_and_proofs + .into_iter() + .zip(kzg_commitments.into_iter()) + .filter_map(|(blob_and_proof, (kzg_commitment, index))| { + blob_and_proof.map(|blob_and_proof| (blob_and_proof, kzg_commitment, index)) + }) + { + let BlobAndProofV1 { blob, proof } = blob_and_proof; + + match misc::construct_blob_sidecar( + &block, + block_header, + index, + blob, + *kzg_commitment, + proof, + ) { + Ok(blob_sidecar) => { + controller.on_el_blob_sidecar(Arc::new(blob_sidecar)); + } + Err(error) => warn!( + "failed to construct blob sidecar with blob and proof \ + received from execution layer: {error:?}" + ), + } + } + } + Err(error) => warn!("engine_getBlobsV1 call failed: {error}"), + } + } +} + +async fn exchange_capabilities(eth1_api: &Eth1Api) -> Result<()> { + let params = vec![serde_json::to_value([ENGINE_GET_EL_BLOBS_V1])?]; + let method = "engine_exchangeCapabilities"; + + for endpoint in eth1_api.endpoints.endpoints_for_request(None) { + let _timer = eth1_api.metrics.as_ref().map(|metrics| { + prometheus_metrics::start_timer_vec(&metrics.eth1_api_request_times, method) + }); + + let api = eth1_api.build_api_for_request(endpoint); + + let response: Result, Error> = + CallFuture::new(api.transport().execute_with_headers( + method, + params.clone(), + eth1_api.auth.headers()?, + Some(ENGINE_EXCHANGE_CAPABILITIES_TIMEOUT), + )) + .await; + + match response { + Ok(response) => { + eth1_api.on_ok_response(endpoint); + endpoint.set_capabilities(response); + + info!("updated capabilities for eth1 endpoint: {}", endpoint.url()); + } + Err(error) => { + eth1_api.on_error_response(endpoint); + + warn!( + "unable to update capabilities for eth1 endpoint: {} {error:?}", + endpoint.url(), + ); + } + } + } + + Ok(()) +} diff --git a/execution_engine/src/execution_engine.rs b/execution_engine/src/execution_engine.rs index b81a2a59..e80cc22c 100644 --- a/execution_engine/src/execution_engine.rs +++ b/execution_engine/src/execution_engine.rs @@ -10,7 +10,8 @@ use either::Either; use futures::channel::oneshot::Sender; use thiserror::Error; use types::{ - combined::{ExecutionPayload, ExecutionPayloadParams}, + combined::{ExecutionPayload, ExecutionPayloadParams, SignedBeaconBlock}, + deneb::primitives::BlobIndex, nonstandard::{Phase, TimedPowBlock}, phase0::primitives::{ExecutionBlockHash, H256}, preset::Preset, @@ -23,6 +24,12 @@ pub trait ExecutionEngine { fn allow_optimistic_merge_block_validation(&self) -> bool; + /// [`engine_exchangeCapabilities`](https://github.com/ethereum/execution-apis/blob/9707339bc8222f6d43b3bf0a7a91623f7ce52213/src/engine/common.md#engine_exchangecapabilities) + fn exchange_capabilities(&self); + + /// [`engine_getBlobsV1`](https://github.com/ethereum/execution-apis/blob/9707339bc8222f6d43b3bf0a7a91623f7ce52213/src/engine/cancun.md#engine_getblobsv1) + fn get_blobs(&self, block: Arc>, blob_indices: Vec); + /// [`notify_forkchoice_updated`](https://github.com/ethereum/consensus-specs/blob/1bfefe301da592375e2e02f65849a96aadec1936/specs/bellatrix/fork-choice.md#notify_forkchoice_updated) fn notify_forkchoice_updated( &self, @@ -53,6 +60,14 @@ impl> ExecutionEngine
<P> for &E { (*self).allow_optimistic_merge_block_validation() } + fn exchange_capabilities(&self) { + (*self).exchange_capabilities(); + } + + fn get_blobs(&self, block: Arc<SignedBeaconBlock<P>>, blob_indices: Vec<BlobIndex>) { + (*self).get_blobs(block, blob_indices) + } + fn notify_forkchoice_updated( &self, head_eth1_block_hash: ExecutionBlockHash, @@ -92,6 +107,14 @@ impl<P: Preset, E: ExecutionEngine<P>> ExecutionEngine<P>
for Arc<E> { self.as_ref().allow_optimistic_merge_block_validation() } + fn exchange_capabilities(&self) { + self.as_ref().exchange_capabilities() + } + + fn get_blobs(&self, block: Arc<SignedBeaconBlock<P>>, blob_indices: Vec<BlobIndex>) { + self.as_ref().get_blobs(block, blob_indices) + } + fn notify_forkchoice_updated( &self, head_eth1_block_hash: ExecutionBlockHash, @@ -134,6 +157,18 @@ impl<P: Preset, E: ExecutionEngine<P>> ExecutionEngine<P>
for Mutex<E> { .allow_optimistic_merge_block_validation() } + fn exchange_capabilities(&self) { + self.lock() + .expect("execution engine mutex is poisoned") + .exchange_capabilities() + } + + fn get_blobs(&self, block: Arc<SignedBeaconBlock<P>>, blob_indices: Vec<BlobIndex>) { + self.lock() + .expect("execution engine mutex is poisoned") + .get_blobs(block, blob_indices) + } + fn notify_forkchoice_updated( &self, head_eth1_block_hash: ExecutionBlockHash, @@ -182,6 +217,10 @@ impl<P: Preset> ExecutionEngine<P>
for NullExecutionEngine { false } + fn exchange_capabilities(&self) {} + + fn get_blobs(&self, _block: Arc<SignedBeaconBlock<P>>, _blob_indices: Vec<BlobIndex>) {} + fn notify_forkchoice_updated( &self, _head_eth1_block_hash: ExecutionBlockHash, @@ -220,6 +259,10 @@ impl<P: Preset> ExecutionEngine<P>
for MockExecutionEngine { self.optimistic_merge_block_validation } + fn exchange_capabilities(&self) {} + + fn get_blobs(&self, _block: Arc>, _blob_indices: Vec) {} + fn notify_forkchoice_updated( &self, _head_eth1_block_hash: ExecutionBlockHash, diff --git a/execution_engine/src/lib.rs b/execution_engine/src/lib.rs index 02895d2d..ca41bc88 100644 --- a/execution_engine/src/lib.rs +++ b/execution_engine/src/lib.rs @@ -1,11 +1,12 @@ pub use crate::{ execution_engine::{ExecutionEngine, MockExecutionEngine, NullExecutionEngine}, types::{ - EngineGetPayloadV1Response, EngineGetPayloadV2Response, EngineGetPayloadV3Response, - EngineGetPayloadV4Response, ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3, - ForkChoiceStateV1, ForkChoiceUpdatedResponse, PayloadAttributes, PayloadAttributesV1, - PayloadAttributesV2, PayloadAttributesV3, PayloadId, PayloadStatus, PayloadStatusV1, - PayloadStatusWithBlockHash, PayloadValidationStatus, RawExecutionRequests, WithdrawalV1, + BlobAndProofV1, EngineGetPayloadV1Response, EngineGetPayloadV2Response, + EngineGetPayloadV3Response, EngineGetPayloadV4Response, ExecutionPayloadV1, + ExecutionPayloadV2, ExecutionPayloadV3, ForkChoiceStateV1, ForkChoiceUpdatedResponse, + PayloadAttributes, PayloadAttributesV1, PayloadAttributesV2, PayloadAttributesV3, + PayloadId, PayloadStatus, PayloadStatusV1, PayloadStatusWithBlockHash, + PayloadValidationStatus, RawExecutionRequests, WithdrawalV1, }, }; diff --git a/execution_engine/src/types.rs b/execution_engine/src/types.rs index 0116f8d9..24ba8d6a 100644 --- a/execution_engine/src/types.rs +++ b/execution_engine/src/types.rs @@ -723,6 +723,13 @@ impl From> for ExecutionRequests
<P> { } } +#[derive(Deserialize)] +#[serde(bound = "")] +pub struct BlobAndProofV1<P: Preset> { + pub blob: Blob<P>
, + pub proof: KzgProof, +} + #[cfg(test)] mod tests { use anyhow::Result; diff --git a/fork_choice_control/src/controller.rs b/fork_choice_control/src/controller.rs index d069eda4..12226b38 100644 --- a/fork_choice_control/src/controller.rs +++ b/fork_choice_control/src/controller.rs @@ -399,6 +399,10 @@ where }) } + pub fn on_el_blob_sidecar(&self, blob_sidecar: Arc>) { + self.spawn_blob_sidecar_task(blob_sidecar, true, BlobSidecarOrigin::ExecutionLayer) + } + pub fn on_gossip_blob_sidecar( &self, blob_sidecar: Arc>, diff --git a/fork_choice_control/src/messages.rs b/fork_choice_control/src/messages.rs index 59822e81..f37b7160 100644 --- a/fork_choice_control/src/messages.rs +++ b/fork_choice_control/src/messages.rs @@ -21,7 +21,7 @@ use log::debug; use serde::Serialize; use types::{ combined::{Attestation, BeaconState, SignedAggregateAndProof, SignedBeaconBlock}, - deneb::containers::BlobIdentifier, + deneb::containers::{BlobIdentifier, BlobSidecar}, phase0::{ containers::Checkpoint, primitives::{DepositIndex, ExecutionBlockHash, Slot, ValidatorIndex, H256}, @@ -165,6 +165,7 @@ pub enum P2pMessage { Slot(Slot), Accept(GossipId), Ignore(GossipId), + PublishBlobSidecar(Arc>), Reject(GossipId, MutatorRejectionReason), BlockNeeded(H256, Option), BlobsNeeded(Vec, Slot, Option), diff --git a/fork_choice_control/src/mutator.rs b/fork_choice_control/src/mutator.rs index 6c2ae108..db066f5b 100644 --- a/fork_choice_control/src/mutator.rs +++ b/fork_choice_control/src/mutator.rs @@ -47,7 +47,10 @@ use ssz::SszHash as _; use std_ext::ArcExt as _; use types::{ combined::{BeaconState, ExecutionPayloadParams, SignedBeaconBlock}, - deneb::containers::{BlobIdentifier, BlobSidecar}, + deneb::{ + containers::{BlobIdentifier, BlobSidecar}, + primitives::BlobIndex, + }, nonstandard::{RelativeEpoch, ValidationOutcome}, phase0::{ containers::Checkpoint, @@ -345,6 +348,10 @@ where } } + if tick.is_start_of_epoch::
<P>
() { + self.execution_engine.exchange_capabilities(); + } + // Query the execution engine for the current status of the head // if it is still optimistic 1 second before the next interval. if tick.is_end_of_interval() { @@ -519,6 +526,11 @@ where Ok(ValidationOutcome::Ignore(false)), ); + self.request_blobs_from_execution_engine( + pending_block.block.clone_arc(), + missing_blob_indices.clone(), + ); + let blob_ids = missing_blob_indices .into_iter() .map(|index| BlobIdentifier { block_root, index }) @@ -1046,6 +1058,10 @@ where ) { match result { Ok(BlobSidecarAction::Accept(blob_sidecar)) => { + if origin.is_from_el() { + P2pMessage::PublishBlobSidecar(blob_sidecar.clone_arc()).send(&self.p2p_tx); + } + let (gossip_id, sender) = origin.split(); if let Some(gossip_id) = gossip_id { @@ -1701,6 +1717,14 @@ where self.notify_forkchoice_updated(&new_head); } + fn request_blobs_from_execution_engine( + &self, + block: Arc>, + missing_blob_indices: Vec, + ) { + self.execution_engine.get_blobs(block, missing_blob_indices); + } + fn notify_forkchoice_updated(&self, new_head: &ChainLink
<P>
) { let new_head_state = new_head.state(&self.store); diff --git a/fork_choice_store/src/misc.rs b/fork_choice_store/src/misc.rs index 7d995184..894d6dde 100644 --- a/fork_choice_store/src/misc.rs +++ b/fork_choice_store/src/misc.rs @@ -494,6 +494,7 @@ impl AttesterSlashingOrigin { #[derive(Debug)] pub enum BlobSidecarOrigin { Api(Option>>), + ExecutionLayer, Gossip(SubnetId, GossipId), Requested(PeerId), Own, @@ -510,7 +511,7 @@ impl BlobSidecarOrigin { match self { Self::Gossip(_, gossip_id) => (Some(gossip_id), None), Self::Api(sender) => (None, sender), - Self::Own | Self::Requested(_) => (None, None), + Self::ExecutionLayer | Self::Own | Self::Requested(_) => (None, None), } } @@ -518,7 +519,7 @@ impl BlobSidecarOrigin { pub fn gossip_id(self) -> Option { match self { Self::Gossip(_, gossip_id) => Some(gossip_id), - Self::Api(_) | Self::Own | Self::Requested(_) => None, + Self::Api(_) | Self::ExecutionLayer | Self::Own | Self::Requested(_) => None, } } @@ -527,7 +528,7 @@ impl BlobSidecarOrigin { match self { Self::Gossip(_, gossip_id) => Some(gossip_id.source), Self::Requested(peer_id) => Some(*peer_id), - Self::Api(_) | Self::Own => None, + Self::Api(_) | Self::ExecutionLayer | Self::Own => None, } } @@ -535,9 +536,14 @@ impl BlobSidecarOrigin { pub const fn subnet_id(&self) -> Option { match self { Self::Gossip(subnet_id, _) => Some(*subnet_id), - Self::Api(_) | Self::Own | Self::Requested(_) => None, + Self::Api(_) | Self::ExecutionLayer | Self::Own | Self::Requested(_) => None, } } + + #[must_use] + pub const fn is_from_el(&self) -> bool { + matches!(self, Self::ExecutionLayer) + } } pub enum BlockAction { diff --git a/helper_functions/src/error.rs b/helper_functions/src/error.rs index 01e9bc35..4b8fed13 100644 --- a/helper_functions/src/error.rs +++ b/helper_functions/src/error.rs @@ -1,5 +1,7 @@ use parse_display::Display; +use ssz::H256; use thiserror::Error; +use types::phase0::primitives::Slot; #[derive(Debug, Error)] pub(crate) enum Error { @@ -9,6 +11,10 @@ pub(crate) enum Error { AttestationSourceMismatch, #[error("attesting indices are not sorted and unique")] AttestingIndicesNotSortedAndUnique, + #[error( + "attempted to construct a blob sidecar for pre-Deneb block: slot: {slot}, root: {root:?}" + )] + BlobsForPreDenebBlock { root: H256, slot: Slot }, #[error("committee index is out of bounds")] CommitteeIndexOutOfBounds, #[error("aggregation bitlist length {aggregation_bitlist_length} does not match committee length {committee_length}")] diff --git a/helper_functions/src/misc.rs b/helper_functions/src/misc.rs index da312e45..b9868256 100644 --- a/helper_functions/src/misc.rs +++ b/helper_functions/src/misc.rs @@ -28,7 +28,7 @@ use types::{ AttestationSubnetCount, BLS_WITHDRAWAL_PREFIX, ETH1_ADDRESS_WITHDRAWAL_PREFIX, GENESIS_EPOCH, GENESIS_SLOT, }, - containers::{ForkData, SigningData, Validator}, + containers::{ForkData, SignedBeaconBlockHeader, SigningData, Validator}, primitives::{ CommitteeIndex, Domain, DomainType, Epoch, ExecutionAddress, ForkDigest, Gwei, NodeId, Slot, SubnetId, Uint256, UnixSeconds, ValidatorIndex, Version, H256, @@ -514,6 +514,39 @@ pub fn blob_serve_range_slot(config: &Config, current_slot: Slot) -> compute_start_slot_at_epoch::
<P>(epoch) } +pub fn construct_blob_sidecar<P: Preset>( + block: &SignedBeaconBlock<P>
, + signed_block_header: SignedBeaconBlockHeader, + index: BlobIndex, + blob: Blob
<P>, + kzg_commitment: KzgCommitment, + kzg_proof: KzgProof, +) -> Result<BlobSidecar<P>> { + let message = block.message(); + + let Some(body) = message.body().post_deneb() else { + return Err(Error::BlobsForPreDenebBlock { + root: message.hash_tree_root(), + slot: message.slot(), + } + .into()); + }; + + let kzg_commitment_inclusion_proof = match message.body().post_electra() { + Some(body) => electra_kzg_commitment_inclusion_proof(body, index)?, + None => deneb_kzg_commitment_inclusion_proof(body, index)?, + }; + + Ok(BlobSidecar { + index, + blob, + kzg_commitment, + kzg_proof, + signed_block_header, + kzg_commitment_inclusion_proof, + }) +} + pub fn construct_blob_sidecars<P: Preset>( block: &SignedBeaconBlock<P>
, blobs: impl IntoIterator<Item = Blob<P>>, @@ -528,19 +561,14 @@ pub fn construct_blob_sidecars<P: Preset>( izip!(0.., blobs, proofs, commitments) .map(|(index, blob, kzg_proof, kzg_commitment)| { - let kzg_commitment_inclusion_proof = match block.message().body().post_electra() { - Some(body) => electra_kzg_commitment_inclusion_proof(body, index)?, - None => deneb_kzg_commitment_inclusion_proof(body, index)?, - }; - - Ok(BlobSidecar { + construct_blob_sidecar( + block, + signed_block_header, index, blob, kzg_commitment, kzg_proof, - signed_block_header, - kzg_commitment_inclusion_proof, - }) + ) }) .collect() } diff --git a/http_api/src/context.rs b/http_api/src/context.rs index 0003b67f..a5089420 100644 --- a/http_api/src/context.rs +++ b/http_api/src/context.rs @@ -198,9 +198,17 @@ impl<P: Preset> Context<P>
{ controller.on_requested_block(block, None); } + let dedicated_executor = Arc::new(DedicatedExecutor::new( + "dedicated-executor", + num_cpus::get(), + None, + None, + )); + let execution_service = ExecutionService::new( eth1_api.clone_arc(), controller.clone_arc(), + dedicated_executor.clone_arc(), execution_service_rx, ); @@ -233,13 +241,6 @@ impl Context
<P>
{ H256::default(), )); - let dedicated_executor = Arc::new(DedicatedExecutor::new( - "dedicated-executor", - num_cpus::get(), - None, - None, - )); - let attestation_verifier = AttestationVerifier::new( controller.clone_arc(), dedicated_executor.clone_arc(), diff --git a/http_api/src/standard.rs b/http_api/src/standard.rs index e9679532..cb5286ed 100644 --- a/http_api/src/standard.rs +++ b/http_api/src/standard.rs @@ -1706,7 +1706,7 @@ pub async fn node_syncing_status( let head_slot = snapshot.head_slot(); let is_synced = is_synced.get(); let is_back_synced = is_back_synced.get(); - let el_offline = eth1_api.el_offline().await; + let el_offline = eth1_api.el_offline(); EthResponse::json(NodeSyncingResponse { head_slot, diff --git a/p2p/src/network.rs b/p2p/src/network.rs index 4d2234a7..63e520fd 100644 --- a/p2p/src/network.rs +++ b/p2p/src/network.rs @@ -312,6 +312,9 @@ impl Network
<P>
{ P2pMessage::Ignore(gossip_id) => { self.report_outcome(gossip_id, MessageAcceptance::Ignore); } + P2pMessage::PublishBlobSidecar(blob_sidecar) => { + self.publish_blob_sidecar(blob_sidecar); + }, P2pMessage::Reject(gossip_id, mutator_rejection_reason) => { self.report_outcome(gossip_id.clone(), MessageAcceptance::Reject); self.report_peer( diff --git a/runtime/src/runtime.rs b/runtime/src/runtime.rs index 8f1d145a..d240d533 100644 --- a/runtime/src/runtime.rs +++ b/runtime/src/runtime.rs @@ -144,6 +144,22 @@ pub async fn run_after_genesis( let mut validator_to_slasher_tx = None; let mut validator_to_liveness_tx = None; + let num_of_cpus = num_cpus::get(); + + let dedicated_executor_low_priority = Arc::new(DedicatedExecutor::new( + "de-low", + (num_of_cpus / 4).max(1), + Some(19), + metrics.clone(), + )); + + let dedicated_executor_normal_priority = Arc::new(DedicatedExecutor::new( + "de-normal", + num_of_cpus, + None, + metrics.clone(), + )); + let eth1_api = Arc::new(Eth1Api::new( chain_config.clone_arc(), signer_snapshot.client().clone(), @@ -153,6 +169,11 @@ pub async fn run_after_genesis( metrics.clone(), )); + eth1_api::spawn_exchange_capabilities_task( + eth1_api.clone_arc(), + &dedicated_executor_low_priority, + ); + let execution_engine = Arc::new(Eth1ExecutionEngine::new( chain_config.clone_arc(), eth1_api.clone_arc(), @@ -227,27 +248,12 @@ pub async fn run_after_genesis( let execution_service = ExecutionService::new( eth1_api.clone_arc(), controller.clone_arc(), + dedicated_executor_low_priority.clone_arc(), execution_service_rx, ); let validator_keys = Arc::new(signer_snapshot.keys().copied().collect::>()); - let num_of_cpus = num_cpus::get(); - - let dedicated_executor_low_priority = Arc::new(DedicatedExecutor::new( - "de-low", - (num_of_cpus / 4).max(1), - Some(19), - metrics.clone(), - )); - - let dedicated_executor_normal_priority = Arc::new(DedicatedExecutor::new( - "de-normal", - num_of_cpus, - None, - metrics.clone(), - )); - let attestation_verifier = AttestationVerifier::new( controller.clone_arc(), dedicated_executor_low_priority,