From a18a111e8bdc7c2fb7efbde53bfa93b2a35082a0 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Fri, 20 Sep 2024 23:17:29 +0200 Subject: [PATCH] feat(provider): subscribe to new blocks if possible in heartbeat --- crates/network-primitives/src/traits.rs | 4 +- crates/provider/src/{chain.rs => blocks.rs} | 113 ++++++++++++-------- crates/provider/src/heart.rs | 58 +++++----- crates/provider/src/lib.rs | 2 +- crates/provider/src/provider/root.rs | 59 +++++----- crates/provider/src/provider/trait.rs | 15 +-- crates/pubsub/src/sub.rs | 5 + crates/rpc-client/src/builtin.rs | 21 ++-- crates/rpc-client/src/client.rs | 13 +++ crates/rpc-client/src/poller.rs | 81 +++++++------- crates/rpc-types-eth/src/block.rs | 6 +- 11 files changed, 212 insertions(+), 165 deletions(-) rename crates/provider/src/{chain.rs => blocks.rs} (63%) diff --git a/crates/network-primitives/src/traits.rs b/crates/network-primitives/src/traits.rs index a8ec7e3f8b2f..d3e147e0799b 100644 --- a/crates/network-primitives/src/traits.rs +++ b/crates/network-primitives/src/traits.rs @@ -132,9 +132,9 @@ pub trait HeaderResponse { /// Block JSON-RPC response. pub trait BlockResponse { /// Header type - type Header; + type Header: HeaderResponse; /// Transaction type - type Transaction; + type Transaction: TransactionResponse; /// Block header fn header(&self) -> &Self::Header; diff --git a/crates/provider/src/chain.rs b/crates/provider/src/blocks.rs similarity index 63% rename from crates/provider/src/chain.rs rename to crates/provider/src/blocks.rs index b69c0f6b700b..a1079d06f4df 100644 --- a/crates/provider/src/chain.rs +++ b/crates/provider/src/blocks.rs @@ -1,10 +1,9 @@ use alloy_network::{Ethereum, Network}; use alloy_primitives::{BlockNumber, U64}; use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient}; -use alloy_rpc_types_eth::Block; -use alloy_transport::{RpcError, Transport}; +use alloy_transport::{RpcError, Transport, TransportResult}; use async_stream::stream; -use futures::{Stream, StreamExt}; +use futures::{future::Either, Stream, StreamExt}; use lru::LruCache; use std::{marker::PhantomData, num::NonZeroUsize}; @@ -17,38 +16,48 @@ const MAX_RETRIES: usize = 3; /// Default block number for when we don't have a block yet. const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX; -pub(crate) struct ChainStreamPoller { +/// Streams new blocks from the client. +pub(crate) struct NewBlocks { client: WeakClient, - poll_task: PollerBuilder, next_yield: BlockNumber, - known_blocks: LruCache, + known_blocks: LruCache, _phantom: PhantomData, } -impl ChainStreamPoller { - pub(crate) fn from_weak_client(w: WeakClient) -> Self { - Self::new(w) - } - +impl NewBlocks { pub(crate) fn new(client: WeakClient) -> Self { - Self::with_next_yield(client, NO_BLOCK_NUMBER) - } - - /// Can be used to force the poller to start at a specific block number. - /// Mostly useful for tests. - fn with_next_yield(client: WeakClient, next_yield: BlockNumber) -> Self { Self { client: client.clone(), - poll_task: PollerBuilder::new(client, "eth_blockNumber", []), - next_yield, + next_yield: NO_BLOCK_NUMBER, known_blocks: LruCache::new(BLOCK_CACHE_SIZE), _phantom: PhantomData, } } - pub(crate) fn into_stream(mut self) -> impl Stream + 'static { + pub(crate) async fn into_stream( + self, + ) -> TransportResult + 'static> { + #[cfg(feature = "pubsub")] + if let Some(client) = self.client.upgrade() { + if let Some(pubsub) = client.pubsub_frontend() { + let id = client.request("eth_subscribe", ("newHeads",)).await?; + let sub = pubsub.get_subscription(id).await?; + return Ok(Either::Left(sub.into_typed::().into_stream())); + } + } + + #[cfg(feature = "pubsub")] + let right = Either::Right; + #[cfg(not(feature = "pubsub"))] + let right = std::convert::identity; + Ok(right(self.into_poll_stream())) + } + + fn into_poll_stream(mut self) -> impl Stream + 'static { + let poll_task_builder: PollerBuilder = + PollerBuilder::new(self.client.clone(), "eth_blockNumber", []); + let mut poll_task = poll_task_builder.spawn().into_stream_raw(); stream! { - let mut poll_task = self.poll_task.spawn().into_stream_raw(); 'task: loop { // Clear any buffered blocks. while let Some(known_block) = self.known_blocks.pop(&self.next_yield) { @@ -62,11 +71,11 @@ impl ChainStreamPoller { Some(Ok(block_number)) => block_number, Some(Err(err)) => { // This is fine. - debug!(%err, "polling stream lagged"); + debug!(%err, "block number stream lagged"); continue 'task; } None => { - debug!("polling stream ended"); + debug!("block number stream ended"); break 'task; } }; @@ -125,14 +134,11 @@ impl ChainStreamPoller { #[cfg(all(test, feature = "anvil-api"))] // Tests rely heavily on ability to mine blocks on demand. mod tests { - use std::{future::Future, time::Duration}; - - use crate::{ext::AnvilApi, ProviderBuilder}; + use super::*; + use crate::{ext::AnvilApi, Provider, ProviderBuilder}; use alloy_node_bindings::Anvil; use alloy_primitives::U256; - use alloy_rpc_client::ReqwestClient; - - use super::*; + use std::{future::Future, time::Duration}; fn init_tracing() { let _ = tracing_subscriber::fmt::try_init(); @@ -140,32 +146,48 @@ mod tests { async fn with_timeout(fut: T) -> T::Output { tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Operation timed out"), + _ = tokio::time::sleep(Duration::from_secs(2)) => panic!("Operation timed out"), out = fut => out, } } #[tokio::test] - async fn yield_block() { + async fn yield_block_http() { + yield_block(false).await; + } + #[tokio::test] + #[cfg(feature = "ws")] + async fn yield_block_ws() { + yield_block(true).await; + } + async fn yield_block(ws: bool) { init_tracing(); let anvil = Anvil::new().spawn(); - let client = ReqwestClient::new_http(anvil.endpoint_url()); - let poller: ChainStreamPoller<_, Ethereum> = - ChainStreamPoller::with_next_yield(client.get_weak(), 1); - let mut stream = Box::pin(poller.into_stream()); + let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() }; + let provider = ProviderBuilder::new().on_builtin(&url).await.unwrap(); + + let poller: NewBlocks<_, Ethereum> = NewBlocks::new(provider.weak_client()); + let mut stream = Box::pin(poller.into_stream().await.unwrap()); // We will also use provider to manipulate anvil instance via RPC. - let provider = ProviderBuilder::new().on_http(anvil.endpoint_url()); provider.anvil_mine(Some(U256::from(1)), None).await.unwrap(); let block = with_timeout(stream.next()).await.expect("Block wasn't fetched"); - assert_eq!(block.header.number, 1); + assert!(block.header.number <= 1); } #[tokio::test] - async fn yield_many_blocks() { + async fn yield_many_blocks_http() { + yield_many_blocks(false).await; + } + #[tokio::test] + #[cfg(feature = "ws")] + async fn yield_many_blocks_ws() { + yield_many_blocks(true).await; + } + async fn yield_many_blocks(ws: bool) { // Make sure that we can process more blocks than fits in the cache. const BLOCKS_TO_MINE: usize = BLOCK_CACHE_SIZE.get() + 1; @@ -173,16 +195,21 @@ mod tests { let anvil = Anvil::new().spawn(); - let client = ReqwestClient::new_http(anvil.endpoint_url()); - let poller: ChainStreamPoller<_, Ethereum> = - ChainStreamPoller::with_next_yield(client.get_weak(), 1); - let stream = Box::pin(poller.into_stream()); + let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() }; + let provider = ProviderBuilder::new().on_builtin(&url).await.unwrap(); + + let poller: NewBlocks<_, Ethereum> = NewBlocks::new(provider.weak_client()); + let stream = Box::pin(poller.into_stream().await.unwrap()); // We will also use provider to manipulate anvil instance via RPC. - let provider = ProviderBuilder::new().on_http(anvil.endpoint_url()); provider.anvil_mine(Some(U256::from(BLOCKS_TO_MINE)), None).await.unwrap(); let blocks = with_timeout(stream.take(BLOCKS_TO_MINE).collect::>()).await; assert_eq!(blocks.len(), BLOCKS_TO_MINE); + let first = blocks[0].header.number; + assert!(first <= 1); + for (i, block) in blocks.iter().enumerate() { + assert_eq!(block.header.number, first + i as u64); + } } } diff --git a/crates/provider/src/heart.rs b/crates/provider/src/heart.rs index 247c7201c4e8..421a08f2e9a8 100644 --- a/crates/provider/src/heart.rs +++ b/crates/provider/src/heart.rs @@ -2,9 +2,8 @@ use crate::{Provider, RootProvider}; use alloy_json_rpc::RpcError; -use alloy_network::Network; +use alloy_network::{BlockResponse, HeaderResponse, Network}; use alloy_primitives::{TxHash, B256}; -use alloy_rpc_types_eth::Block; use alloy_transport::{utils::Spawnable, Transport, TransportError}; use futures::{stream::StreamExt, FutureExt, Stream}; use std::{ @@ -74,7 +73,7 @@ pub enum PendingTransactionError { #[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"] #[derive(Debug)] #[doc(alias = "PendingTxBuilder")] -pub struct PendingTransactionBuilder<'a, T, N> { +pub struct PendingTransactionBuilder<'a, T, N: Network> { config: PendingTransactionConfig, provider: &'a RootProvider, } @@ -400,12 +399,12 @@ impl Future for PendingTransaction { /// A handle to the heartbeat task. #[derive(Clone, Debug)] -pub(crate) struct HeartbeatHandle { +pub(crate) struct HeartbeatHandle { tx: mpsc::Sender, - latest: watch::Receiver>, + latest: watch::Receiver>, } -impl HeartbeatHandle { +impl HeartbeatHandle { /// Watch for a transaction to be confirmed with the given config. #[doc(alias = "watch_transaction")] pub(crate) async fn watch_tx( @@ -423,14 +422,14 @@ impl HeartbeatHandle { /// Returns a watcher that always sees the latest block. #[allow(dead_code)] - pub(crate) const fn latest(&self) -> &watch::Receiver> { + pub(crate) const fn latest(&self) -> &watch::Receiver> { &self.latest } } // TODO: Parameterize with `Network` /// A heartbeat task that receives blocks and watches for transactions. -pub(crate) struct Heartbeat { +pub(crate) struct Heartbeat { /// The stream of incoming blocks to watch. stream: futures::stream::Fuse, @@ -445,9 +444,11 @@ pub(crate) struct Heartbeat { /// Ordered map of transactions to reap at a certain time. reap_at: BTreeMap, + + _network: std::marker::PhantomData, } -impl + Unpin + 'static> Heartbeat { +impl + Unpin + 'static> Heartbeat { /// Create a new heartbeat task. pub(crate) fn new(stream: S) -> Self { Self { @@ -456,11 +457,10 @@ impl + Unpin + 'static> Heartbeat { unconfirmed: Default::default(), waiting_confs: Default::default(), reap_at: Default::default(), + _network: Default::default(), } } -} -impl Heartbeat { /// Check if any transactions have enough confirmations to notify. fn check_confirmations(&mut self, current_height: u64) { let to_keep = self.waiting_confs.split_off(&(current_height + 1)); @@ -561,9 +561,13 @@ impl Heartbeat { /// Handle a new block by checking if any of the transactions we're /// watching are in it, and if so, notifying the watcher. Also updates /// the latest block. - fn handle_new_block(&mut self, block: Block, latest: &watch::Sender>) { + fn handle_new_block( + &mut self, + block: N::BlockResponse, + latest: &watch::Sender>, + ) { // Blocks without numbers are ignored, as they're not part of the chain. - let block_height = &block.header.number; + let block_height = block.header().number(); // Add the block the lookbehind. // The value is chosen arbitrarily to not have a huge memory footprint but still @@ -577,19 +581,19 @@ impl Heartbeat { } if let Some((last_height, _)) = self.past_blocks.back().as_ref() { // Check that the chain is continuous. - if *last_height + 1 != *block_height { + if *last_height + 1 != block_height { // Move all the transactions that were reset by the reorg to the unconfirmed list. warn!(%block_height, last_height, "reorg detected"); - self.move_reorg_to_unconfirmed(*block_height); + self.move_reorg_to_unconfirmed(block_height); // Remove past blocks that are now invalid. - self.past_blocks.retain(|(h, _)| h < block_height); + self.past_blocks.retain(|(h, _)| *h < block_height); } } - self.past_blocks.push_back((*block_height, block.transactions.hashes().collect())); + self.past_blocks.push_back((block_height, block.transactions().hashes().collect())); // Check if we are watching for any of the transactions in this block. let to_check: Vec<_> = block - .transactions + .transactions() .hashes() .filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash)) .collect(); @@ -607,12 +611,12 @@ impl Heartbeat { warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set"); // We don't override the set value. } else { - watcher.received_at_block = Some(*block_height); + watcher.received_at_block = Some(block_height); } - self.add_to_waiting_list(watcher, *block_height); + self.add_to_waiting_list(watcher, block_height); } - self.check_confirmations(*block_height); + self.check_confirmations(block_height); // Update the latest block. We use `send_replace` here to ensure the // latest block is always up to date, even if no receivers exist. @@ -623,7 +627,7 @@ impl Heartbeat { } #[cfg(target_arch = "wasm32")] -impl + Unpin + 'static> Heartbeat { +impl + Unpin + 'static> Heartbeat { /// Spawn the heartbeat task, returning a [`HeartbeatHandle`]. pub(crate) fn spawn(self) -> HeartbeatHandle { let (latest, latest_rx) = watch::channel(None::); @@ -636,10 +640,10 @@ impl + Unpin + 'static> Heartbeat { } #[cfg(not(target_arch = "wasm32"))] -impl + Unpin + Send + 'static> Heartbeat { +impl + Unpin + Send + 'static> Heartbeat { /// Spawn the heartbeat task, returning a [`HeartbeatHandle`]. - pub(crate) fn spawn(self) -> HeartbeatHandle { - let (latest, latest_rx) = watch::channel(None::); + pub(crate) fn spawn(self) -> HeartbeatHandle { + let (latest, latest_rx) = watch::channel(None::); let (ix_tx, ixns) = mpsc::channel(16); self.into_future(latest, ixns).spawn_task(); @@ -648,10 +652,10 @@ impl + Unpin + Send + 'static> Heartbeat { } } -impl + Unpin + 'static> Heartbeat { +impl + Unpin + 'static> Heartbeat { async fn into_future( mut self, - latest: watch::Sender>, + latest: watch::Sender>, mut ixns: mpsc::Receiver, ) { 'shutdown: loop { diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index 9105f58e530b..dd2c09c3d511 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -28,7 +28,7 @@ extern crate tracing; mod builder; pub use builder::{Identity, ProviderBuilder, ProviderLayer, Stack}; -mod chain; +mod blocks; pub mod ext; diff --git a/crates/provider/src/provider/root.rs b/crates/provider/src/provider/root.rs index a47c8f288245..be9f6ff7edc7 100644 --- a/crates/provider/src/provider/root.rs +++ b/crates/provider/src/provider/root.rs @@ -1,16 +1,14 @@ use crate::{ - chain::ChainStreamPoller, + blocks::NewBlocks, heart::{Heartbeat, HeartbeatHandle}, Identity, ProviderBuilder, }; use alloy_network::{Ethereum, Network}; use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, ClientRef, RpcClient, WeakClient}; -use alloy_transport::{BoxTransport, BoxTransportConnect, Transport, TransportError}; -use std::{ - fmt, - marker::PhantomData, - sync::{Arc, OnceLock}, +use alloy_transport::{ + BoxTransport, BoxTransportConnect, Transport, TransportError, TransportResult, }; +use std::{fmt, marker::PhantomData, sync::Arc}; #[cfg(feature = "reqwest")] use alloy_transport_http::Http; @@ -20,18 +18,18 @@ use alloy_pubsub::{PubSubFrontend, Subscription}; /// The root provider manages the RPC client and the heartbeat. It is at the /// base of every provider stack. -pub struct RootProvider { +pub struct RootProvider { /// The inner state of the root provider. pub(crate) inner: Arc>, } -impl Clone for RootProvider { +impl Clone for RootProvider { fn clone(&self) -> Self { Self { inner: self.inner.clone() } } } -impl fmt::Debug for RootProvider { +impl fmt::Debug for RootProvider { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("RootProvider").field("client", &self.inner.client).finish_non_exhaustive() } @@ -100,48 +98,43 @@ impl RootProvider { #[cfg(feature = "pubsub")] pub(crate) fn pubsub_frontend(&self) -> alloy_transport::TransportResult<&PubSubFrontend> { - let t = self.transport() as &dyn std::any::Any; - t.downcast_ref::() - .or_else(|| { - t.downcast_ref::() - .and_then(|t| t.as_any().downcast_ref::()) - }) + self.inner + .client_ref() + .pubsub_frontend() .ok_or_else(alloy_transport::TransportErrorKind::pubsub_unavailable) } - #[cfg(feature = "pubsub")] - fn transport(&self) -> &T { - self.inner.client.transport() - } - #[inline] - pub(crate) fn get_heart(&self) -> &HeartbeatHandle { - self.inner.heart.get_or_init(|| { - let poller: ChainStreamPoller = - ChainStreamPoller::from_weak_client(self.inner.weak_client()); - // TODO: Can we avoid `Box::pin` here? - Heartbeat::new(Box::pin(poller.into_stream())).spawn() - }) + pub(crate) async fn get_heart(&self) -> TransportResult<&HeartbeatHandle> { + self.inner + .heart + .get_or_try_init(|| async { + let new_blocks = NewBlocks::::new(self.inner.weak_client()); + let stream = new_blocks.into_stream().await?; + // TODO: Can we avoid `Box::pin` here? + Ok(Heartbeat::new(Box::pin(stream)).spawn()) + }) + .await } } /// The root provider manages the RPC client and the heartbeat. It is at the /// base of every provider stack. -pub(crate) struct RootProviderInner { +pub(crate) struct RootProviderInner { client: RpcClient, - heart: OnceLock, + heart: tokio::sync::OnceCell>, _network: PhantomData, } -impl Clone for RootProviderInner { +impl Clone for RootProviderInner { fn clone(&self) -> Self { Self { client: self.client.clone(), heart: self.heart.clone(), _network: PhantomData } } } impl RootProviderInner { - pub(crate) const fn new(client: RpcClient) -> Self { - Self { client, heart: OnceLock::new(), _network: PhantomData } + pub(crate) fn new(client: RpcClient) -> Self { + Self { client, heart: Default::default(), _network: PhantomData } } pub(crate) fn weak_client(&self) -> WeakClient { @@ -153,7 +146,7 @@ impl RootProviderInner { } } -impl RootProviderInner { +impl RootProviderInner { fn boxed(self) -> RootProviderInner { RootProviderInner { client: self.client.boxed(), heart: self.heart, _network: PhantomData } } diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index d04fa821786e..532eed03f590 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -115,7 +115,7 @@ pub trait Provider: fn get_block_number(&self) -> ProviderCall { self.client() .request_noparams("eth_blockNumber") - .map_resp(crate::utils::convert_u64 as fn(U64) -> u64) + .map_resp(utils::convert_u64 as fn(U64) -> u64) .into() } @@ -156,7 +156,7 @@ pub trait Provider: fn get_chain_id(&self) -> ProviderCall { self.client() .request_noparams("eth_chainId") - .map_resp(crate::utils::convert_u64 as fn(U64) -> u64) + .map_resp(utils::convert_u64 as fn(U64) -> u64) .into() } @@ -184,7 +184,7 @@ pub trait Provider: &self, tx: &'req N::TransactionRequest, ) -> EthCall<'req, T, N, U128, u128> { - EthCall::gas_estimate(self.weak_client(), tx).map_resp(crate::utils::convert_u128) + EthCall::gas_estimate(self.weak_client(), tx).map_resp(utils::convert_u128) } /// Estimates the EIP1559 `maxFeePerGas` and `maxPriorityFeePerGas` fields. @@ -242,7 +242,7 @@ pub trait Provider: fn get_gas_price(&self) -> ProviderCall { self.client() .request_noparams("eth_gasPrice") - .map_resp(crate::utils::convert_u128 as fn(U128) -> u128) + .map_resp(utils::convert_u128 as fn(U128) -> u128) .into() } @@ -545,7 +545,7 @@ pub trait Provider: ) -> RpcWithBlock u64> { self.client() .request("eth_getTransactionCount", address) - .map_resp(crate::utils::convert_u64 as fn(U64) -> u64) + .map_resp(utils::convert_u64 as fn(U64) -> u64) .into() } @@ -693,7 +693,7 @@ pub trait Provider: ) -> TransportResult> { // Make sure to initialize heartbeat before we submit transaction, so that // we don't miss it if user will subscriber to it immediately after sending. - let _handle = self.root().get_heart(); + let _handle = self.root().get_heart().await?; match tx { SendableTx::Builder(mut tx) => { @@ -897,7 +897,7 @@ pub trait Provider: fn get_net_version(&self) -> ProviderCall { self.client() .request_noparams("net_version") - .map_resp(crate::utils::convert_u64 as fn(U64) -> u64) + .map_resp(utils::convert_u64 as fn(U64) -> u64) .into() } @@ -1008,6 +1008,7 @@ impl Provider for RootProvider { }; self.get_heart() + .await? .watch_tx(config, block_number) .await .map_err(|_| PendingTransactionError::FailedToRegister) diff --git a/crates/pubsub/src/sub.rs b/crates/pubsub/src/sub.rs index 342bbb5fbd31..475e3ac121d8 100644 --- a/crates/pubsub/src/sub.rs +++ b/crates/pubsub/src/sub.rs @@ -80,6 +80,11 @@ impl RawSubscription { pub fn into_stream(self) -> BroadcastStream> { self.rx.into() } + + /// Convert into a typed subscription. + pub fn into_typed(self) -> Subscription { + self.into() + } } /// An item in a typed [`Subscription`]. This is either the expected type, or diff --git a/crates/rpc-client/src/builtin.rs b/crates/rpc-client/src/builtin.rs index 6b95963a090c..911e115ef863 100644 --- a/crates/rpc-client/src/builtin.rs +++ b/crates/rpc-client/src/builtin.rs @@ -62,17 +62,15 @@ impl BuiltInConnectionString { // reqwest is enabled, hyper is not #[cfg(all(not(feature = "hyper"), feature = "reqwest"))] Self::Http(url) => { - Ok( - alloy_transport::Transport::boxed( - alloy_transport_http::Http::::new(url.clone()) - ) - ) - }, + Ok(alloy_transport::Transport::boxed( + alloy_transport_http::Http::::new(url.clone()), + )) + } // hyper is enabled, reqwest is not #[cfg(feature = "hyper")] - Self::Http(_) => Err(TransportErrorKind::custom_str( - "hyper not supported by BuiltinConnectionString. Please instantiate a hyper client manually", + Self::Http(url) => Ok(alloy_transport::Transport::boxed( + alloy_transport_http::HyperTransport::new_hyper(url.clone()), )), #[cfg(all(not(target_arch = "wasm32"), feature = "ws"))] @@ -95,7 +93,12 @@ impl BuiltInConnectionString { .await .map(alloy_transport::Transport::boxed), - #[cfg(not(any(feature = "reqwest", feature = "hyper", feature = "ws", feature = "ipc")))] + #[cfg(not(any( + feature = "reqwest", + feature = "hyper", + feature = "ws", + feature = "ipc" + )))] _ => Err(TransportErrorKind::custom_str( "No transports enabled. Enable one of: reqwest, hyper, ws, ipc", )), diff --git a/crates/rpc-client/src/client.rs b/crates/rpc-client/src/client.rs index 370d18ccaa6a..f4596574842d 100644 --- a/crates/rpc-client/src/client.rs +++ b/crates/rpc-client/src/client.rs @@ -218,6 +218,19 @@ impl RpcClientInner { self.transport } + /// Returns a reference to the pubsub frontend if the transport supports it. + #[cfg(feature = "pubsub")] + pub fn pubsub_frontend(&self) -> Option<&alloy_pubsub::PubSubFrontend> + where + T: std::any::Any, + { + let t = self.transport() as &dyn std::any::Any; + t.downcast_ref::().or_else(|| { + t.downcast_ref::() + .and_then(|t| t.as_any().downcast_ref::()) + }) + } + /// Build a `JsonRpcRequest` with the given method and params. /// /// This function reserves an ID for the request, however the request is not sent. diff --git a/crates/rpc-client/src/poller.rs b/crates/rpc-client/src/poller.rs index ad0775b03bc1..0700448bab15 100644 --- a/crates/rpc-client/src/poller.rs +++ b/crates/rpc-client/src/poller.rs @@ -150,52 +150,53 @@ where pub fn spawn(self) -> PollChannel { let (tx, rx) = broadcast::channel(self.channel_size); let span = debug_span!("poller", method = %self.method); - let fut = async move { - let mut params = ParamsOnce::Typed(self.params); - let mut retries = MAX_RETRIES; - 'outer: for _ in 0..self.limit { - let Some(client) = self.client.upgrade() else { - debug!("client dropped"); - break; - }; + self.into_future(tx).instrument(span).spawn_task(); + rx.into() + } - // Avoid serializing the params more than once. - let params = match params.get() { - Ok(p) => p, - Err(err) => { - error!(%err, "failed to serialize params"); - break; - } - }; - - loop { - trace!("polling"); - match client.request(self.method.clone(), params).await { - Ok(resp) => { - if tx.send(resp).is_err() { - debug!("channel closed"); - break 'outer; - } - } - Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => { - debug!(%err, "failed to poll, retrying"); - retries -= 1; - continue; - } - Err(err) => { - error!(%err, "failed to poll"); + async fn into_future(self, tx: broadcast::Sender) { + let mut params = ParamsOnce::Typed(self.params); + let mut retries = MAX_RETRIES; + 'outer: for _ in 0..self.limit { + let Some(client) = self.client.upgrade() else { + debug!("client dropped"); + break; + }; + + // Avoid serializing the params more than once. + let params = match params.get() { + Ok(p) => p, + Err(err) => { + error!(%err, "failed to serialize params"); + break; + } + }; + + loop { + trace!("polling"); + match client.request(self.method.clone(), params).await { + Ok(resp) => { + if tx.send(resp).is_err() { + debug!("channel closed"); break 'outer; } } - break; + Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => { + debug!(%err, "failed to poll, retrying"); + retries -= 1; + continue; + } + Err(err) => { + error!(%err, "failed to poll"); + break 'outer; + } } - - trace!(duration=?self.poll_interval, "sleeping"); - tokio::time::sleep(self.poll_interval).await; + break; } - }; - fut.instrument(span).spawn_task(); - rx.into() + + trace!(duration=?self.poll_interval, "sleeping"); + tokio::time::sleep(self.poll_interval).await; + } } /// Starts the poller and returns the stream of responses. diff --git a/crates/rpc-types-eth/src/block.rs b/crates/rpc-types-eth/src/block.rs index ef47e3f1a42f..17fc57def77b 100644 --- a/crates/rpc-types-eth/src/block.rs +++ b/crates/rpc-types-eth/src/block.rs @@ -365,13 +365,13 @@ pub struct BlockOverrides { serde(default, skip_serializing_if = "Option::is_none", alias = "baseFeePerGas") )] pub base_fee: Option, - /// A dictionary that maps blockNumber to a user-defined hash. It could be queried from the - /// solidity opcode BLOCKHASH. + /// A dictionary that maps blockNumber to a user-defined hash. It can be queried from the + /// EVM opcode BLOCKHASH. #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))] pub block_hash: Option>, } -impl BlockResponse for Block { +impl BlockResponse for Block { type Transaction = T; type Header = H;