From 70dee036c2c59083857ab4efc9b7dba61e2d6715 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Fri, 20 Sep 2024 23:17:29 +0200 Subject: [PATCH 1/8] feat(provider): subscribe to new blocks if possible in heartbeat --- crates/network-primitives/src/traits.rs | 4 +- crates/provider/src/{chain.rs => blocks.rs} | 113 ++++++++++++-------- crates/provider/src/heart.rs | 58 +++++----- crates/provider/src/lib.rs | 2 +- crates/provider/src/provider/root.rs | 59 +++++----- crates/provider/src/provider/trait.rs | 15 +-- crates/pubsub/src/sub.rs | 5 + crates/rpc-client/src/builtin.rs | 21 ++-- crates/rpc-client/src/client.rs | 13 +++ crates/rpc-client/src/poller.rs | 81 +++++++------- crates/rpc-types-eth/src/block.rs | 6 +- 11 files changed, 212 insertions(+), 165 deletions(-) rename crates/provider/src/{chain.rs => blocks.rs} (63%) diff --git a/crates/network-primitives/src/traits.rs b/crates/network-primitives/src/traits.rs index a8ec7e3f8b2..d3e147e0799 100644 --- a/crates/network-primitives/src/traits.rs +++ b/crates/network-primitives/src/traits.rs @@ -132,9 +132,9 @@ pub trait HeaderResponse { /// Block JSON-RPC response. pub trait BlockResponse { /// Header type - type Header; + type Header: HeaderResponse; /// Transaction type - type Transaction; + type Transaction: TransactionResponse; /// Block header fn header(&self) -> &Self::Header; diff --git a/crates/provider/src/chain.rs b/crates/provider/src/blocks.rs similarity index 63% rename from crates/provider/src/chain.rs rename to crates/provider/src/blocks.rs index b69c0f6b700..a1079d06f4d 100644 --- a/crates/provider/src/chain.rs +++ b/crates/provider/src/blocks.rs @@ -1,10 +1,9 @@ use alloy_network::{Ethereum, Network}; use alloy_primitives::{BlockNumber, U64}; use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient}; -use alloy_rpc_types_eth::Block; -use alloy_transport::{RpcError, Transport}; +use alloy_transport::{RpcError, Transport, TransportResult}; use async_stream::stream; -use futures::{Stream, StreamExt}; +use futures::{future::Either, Stream, StreamExt}; use lru::LruCache; use std::{marker::PhantomData, num::NonZeroUsize}; @@ -17,38 +16,48 @@ const MAX_RETRIES: usize = 3; /// Default block number for when we don't have a block yet. const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX; -pub(crate) struct ChainStreamPoller { +/// Streams new blocks from the client. +pub(crate) struct NewBlocks { client: WeakClient, - poll_task: PollerBuilder, next_yield: BlockNumber, - known_blocks: LruCache, + known_blocks: LruCache, _phantom: PhantomData, } -impl ChainStreamPoller { - pub(crate) fn from_weak_client(w: WeakClient) -> Self { - Self::new(w) - } - +impl NewBlocks { pub(crate) fn new(client: WeakClient) -> Self { - Self::with_next_yield(client, NO_BLOCK_NUMBER) - } - - /// Can be used to force the poller to start at a specific block number. - /// Mostly useful for tests. - fn with_next_yield(client: WeakClient, next_yield: BlockNumber) -> Self { Self { client: client.clone(), - poll_task: PollerBuilder::new(client, "eth_blockNumber", []), - next_yield, + next_yield: NO_BLOCK_NUMBER, known_blocks: LruCache::new(BLOCK_CACHE_SIZE), _phantom: PhantomData, } } - pub(crate) fn into_stream(mut self) -> impl Stream + 'static { + pub(crate) async fn into_stream( + self, + ) -> TransportResult + 'static> { + #[cfg(feature = "pubsub")] + if let Some(client) = self.client.upgrade() { + if let Some(pubsub) = client.pubsub_frontend() { + let id = client.request("eth_subscribe", ("newHeads",)).await?; + let sub = pubsub.get_subscription(id).await?; + return Ok(Either::Left(sub.into_typed::().into_stream())); + } + } + + #[cfg(feature = "pubsub")] + let right = Either::Right; + #[cfg(not(feature = "pubsub"))] + let right = std::convert::identity; + Ok(right(self.into_poll_stream())) + } + + fn into_poll_stream(mut self) -> impl Stream + 'static { + let poll_task_builder: PollerBuilder = + PollerBuilder::new(self.client.clone(), "eth_blockNumber", []); + let mut poll_task = poll_task_builder.spawn().into_stream_raw(); stream! { - let mut poll_task = self.poll_task.spawn().into_stream_raw(); 'task: loop { // Clear any buffered blocks. while let Some(known_block) = self.known_blocks.pop(&self.next_yield) { @@ -62,11 +71,11 @@ impl ChainStreamPoller { Some(Ok(block_number)) => block_number, Some(Err(err)) => { // This is fine. - debug!(%err, "polling stream lagged"); + debug!(%err, "block number stream lagged"); continue 'task; } None => { - debug!("polling stream ended"); + debug!("block number stream ended"); break 'task; } }; @@ -125,14 +134,11 @@ impl ChainStreamPoller { #[cfg(all(test, feature = "anvil-api"))] // Tests rely heavily on ability to mine blocks on demand. mod tests { - use std::{future::Future, time::Duration}; - - use crate::{ext::AnvilApi, ProviderBuilder}; + use super::*; + use crate::{ext::AnvilApi, Provider, ProviderBuilder}; use alloy_node_bindings::Anvil; use alloy_primitives::U256; - use alloy_rpc_client::ReqwestClient; - - use super::*; + use std::{future::Future, time::Duration}; fn init_tracing() { let _ = tracing_subscriber::fmt::try_init(); @@ -140,32 +146,48 @@ mod tests { async fn with_timeout(fut: T) -> T::Output { tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Operation timed out"), + _ = tokio::time::sleep(Duration::from_secs(2)) => panic!("Operation timed out"), out = fut => out, } } #[tokio::test] - async fn yield_block() { + async fn yield_block_http() { + yield_block(false).await; + } + #[tokio::test] + #[cfg(feature = "ws")] + async fn yield_block_ws() { + yield_block(true).await; + } + async fn yield_block(ws: bool) { init_tracing(); let anvil = Anvil::new().spawn(); - let client = ReqwestClient::new_http(anvil.endpoint_url()); - let poller: ChainStreamPoller<_, Ethereum> = - ChainStreamPoller::with_next_yield(client.get_weak(), 1); - let mut stream = Box::pin(poller.into_stream()); + let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() }; + let provider = ProviderBuilder::new().on_builtin(&url).await.unwrap(); + + let poller: NewBlocks<_, Ethereum> = NewBlocks::new(provider.weak_client()); + let mut stream = Box::pin(poller.into_stream().await.unwrap()); // We will also use provider to manipulate anvil instance via RPC. - let provider = ProviderBuilder::new().on_http(anvil.endpoint_url()); provider.anvil_mine(Some(U256::from(1)), None).await.unwrap(); let block = with_timeout(stream.next()).await.expect("Block wasn't fetched"); - assert_eq!(block.header.number, 1); + assert!(block.header.number <= 1); } #[tokio::test] - async fn yield_many_blocks() { + async fn yield_many_blocks_http() { + yield_many_blocks(false).await; + } + #[tokio::test] + #[cfg(feature = "ws")] + async fn yield_many_blocks_ws() { + yield_many_blocks(true).await; + } + async fn yield_many_blocks(ws: bool) { // Make sure that we can process more blocks than fits in the cache. const BLOCKS_TO_MINE: usize = BLOCK_CACHE_SIZE.get() + 1; @@ -173,16 +195,21 @@ mod tests { let anvil = Anvil::new().spawn(); - let client = ReqwestClient::new_http(anvil.endpoint_url()); - let poller: ChainStreamPoller<_, Ethereum> = - ChainStreamPoller::with_next_yield(client.get_weak(), 1); - let stream = Box::pin(poller.into_stream()); + let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() }; + let provider = ProviderBuilder::new().on_builtin(&url).await.unwrap(); + + let poller: NewBlocks<_, Ethereum> = NewBlocks::new(provider.weak_client()); + let stream = Box::pin(poller.into_stream().await.unwrap()); // We will also use provider to manipulate anvil instance via RPC. - let provider = ProviderBuilder::new().on_http(anvil.endpoint_url()); provider.anvil_mine(Some(U256::from(BLOCKS_TO_MINE)), None).await.unwrap(); let blocks = with_timeout(stream.take(BLOCKS_TO_MINE).collect::>()).await; assert_eq!(blocks.len(), BLOCKS_TO_MINE); + let first = blocks[0].header.number; + assert!(first <= 1); + for (i, block) in blocks.iter().enumerate() { + assert_eq!(block.header.number, first + i as u64); + } } } diff --git a/crates/provider/src/heart.rs b/crates/provider/src/heart.rs index 247c7201c4e..421a08f2e9a 100644 --- a/crates/provider/src/heart.rs +++ b/crates/provider/src/heart.rs @@ -2,9 +2,8 @@ use crate::{Provider, RootProvider}; use alloy_json_rpc::RpcError; -use alloy_network::Network; +use alloy_network::{BlockResponse, HeaderResponse, Network}; use alloy_primitives::{TxHash, B256}; -use alloy_rpc_types_eth::Block; use alloy_transport::{utils::Spawnable, Transport, TransportError}; use futures::{stream::StreamExt, FutureExt, Stream}; use std::{ @@ -74,7 +73,7 @@ pub enum PendingTransactionError { #[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"] #[derive(Debug)] #[doc(alias = "PendingTxBuilder")] -pub struct PendingTransactionBuilder<'a, T, N> { +pub struct PendingTransactionBuilder<'a, T, N: Network> { config: PendingTransactionConfig, provider: &'a RootProvider, } @@ -400,12 +399,12 @@ impl Future for PendingTransaction { /// A handle to the heartbeat task. #[derive(Clone, Debug)] -pub(crate) struct HeartbeatHandle { +pub(crate) struct HeartbeatHandle { tx: mpsc::Sender, - latest: watch::Receiver>, + latest: watch::Receiver>, } -impl HeartbeatHandle { +impl HeartbeatHandle { /// Watch for a transaction to be confirmed with the given config. #[doc(alias = "watch_transaction")] pub(crate) async fn watch_tx( @@ -423,14 +422,14 @@ impl HeartbeatHandle { /// Returns a watcher that always sees the latest block. #[allow(dead_code)] - pub(crate) const fn latest(&self) -> &watch::Receiver> { + pub(crate) const fn latest(&self) -> &watch::Receiver> { &self.latest } } // TODO: Parameterize with `Network` /// A heartbeat task that receives blocks and watches for transactions. -pub(crate) struct Heartbeat { +pub(crate) struct Heartbeat { /// The stream of incoming blocks to watch. stream: futures::stream::Fuse, @@ -445,9 +444,11 @@ pub(crate) struct Heartbeat { /// Ordered map of transactions to reap at a certain time. reap_at: BTreeMap, + + _network: std::marker::PhantomData, } -impl + Unpin + 'static> Heartbeat { +impl + Unpin + 'static> Heartbeat { /// Create a new heartbeat task. pub(crate) fn new(stream: S) -> Self { Self { @@ -456,11 +457,10 @@ impl + Unpin + 'static> Heartbeat { unconfirmed: Default::default(), waiting_confs: Default::default(), reap_at: Default::default(), + _network: Default::default(), } } -} -impl Heartbeat { /// Check if any transactions have enough confirmations to notify. fn check_confirmations(&mut self, current_height: u64) { let to_keep = self.waiting_confs.split_off(&(current_height + 1)); @@ -561,9 +561,13 @@ impl Heartbeat { /// Handle a new block by checking if any of the transactions we're /// watching are in it, and if so, notifying the watcher. Also updates /// the latest block. - fn handle_new_block(&mut self, block: Block, latest: &watch::Sender>) { + fn handle_new_block( + &mut self, + block: N::BlockResponse, + latest: &watch::Sender>, + ) { // Blocks without numbers are ignored, as they're not part of the chain. - let block_height = &block.header.number; + let block_height = block.header().number(); // Add the block the lookbehind. // The value is chosen arbitrarily to not have a huge memory footprint but still @@ -577,19 +581,19 @@ impl Heartbeat { } if let Some((last_height, _)) = self.past_blocks.back().as_ref() { // Check that the chain is continuous. - if *last_height + 1 != *block_height { + if *last_height + 1 != block_height { // Move all the transactions that were reset by the reorg to the unconfirmed list. warn!(%block_height, last_height, "reorg detected"); - self.move_reorg_to_unconfirmed(*block_height); + self.move_reorg_to_unconfirmed(block_height); // Remove past blocks that are now invalid. - self.past_blocks.retain(|(h, _)| h < block_height); + self.past_blocks.retain(|(h, _)| *h < block_height); } } - self.past_blocks.push_back((*block_height, block.transactions.hashes().collect())); + self.past_blocks.push_back((block_height, block.transactions().hashes().collect())); // Check if we are watching for any of the transactions in this block. let to_check: Vec<_> = block - .transactions + .transactions() .hashes() .filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash)) .collect(); @@ -607,12 +611,12 @@ impl Heartbeat { warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set"); // We don't override the set value. } else { - watcher.received_at_block = Some(*block_height); + watcher.received_at_block = Some(block_height); } - self.add_to_waiting_list(watcher, *block_height); + self.add_to_waiting_list(watcher, block_height); } - self.check_confirmations(*block_height); + self.check_confirmations(block_height); // Update the latest block. We use `send_replace` here to ensure the // latest block is always up to date, even if no receivers exist. @@ -623,7 +627,7 @@ impl Heartbeat { } #[cfg(target_arch = "wasm32")] -impl + Unpin + 'static> Heartbeat { +impl + Unpin + 'static> Heartbeat { /// Spawn the heartbeat task, returning a [`HeartbeatHandle`]. pub(crate) fn spawn(self) -> HeartbeatHandle { let (latest, latest_rx) = watch::channel(None::); @@ -636,10 +640,10 @@ impl + Unpin + 'static> Heartbeat { } #[cfg(not(target_arch = "wasm32"))] -impl + Unpin + Send + 'static> Heartbeat { +impl + Unpin + Send + 'static> Heartbeat { /// Spawn the heartbeat task, returning a [`HeartbeatHandle`]. - pub(crate) fn spawn(self) -> HeartbeatHandle { - let (latest, latest_rx) = watch::channel(None::); + pub(crate) fn spawn(self) -> HeartbeatHandle { + let (latest, latest_rx) = watch::channel(None::); let (ix_tx, ixns) = mpsc::channel(16); self.into_future(latest, ixns).spawn_task(); @@ -648,10 +652,10 @@ impl + Unpin + Send + 'static> Heartbeat { } } -impl + Unpin + 'static> Heartbeat { +impl + Unpin + 'static> Heartbeat { async fn into_future( mut self, - latest: watch::Sender>, + latest: watch::Sender>, mut ixns: mpsc::Receiver, ) { 'shutdown: loop { diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs index 9105f58e530..dd2c09c3d51 100644 --- a/crates/provider/src/lib.rs +++ b/crates/provider/src/lib.rs @@ -28,7 +28,7 @@ extern crate tracing; mod builder; pub use builder::{Identity, ProviderBuilder, ProviderLayer, Stack}; -mod chain; +mod blocks; pub mod ext; diff --git a/crates/provider/src/provider/root.rs b/crates/provider/src/provider/root.rs index a47c8f28824..be9f6ff7edc 100644 --- a/crates/provider/src/provider/root.rs +++ b/crates/provider/src/provider/root.rs @@ -1,16 +1,14 @@ use crate::{ - chain::ChainStreamPoller, + blocks::NewBlocks, heart::{Heartbeat, HeartbeatHandle}, Identity, ProviderBuilder, }; use alloy_network::{Ethereum, Network}; use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, ClientRef, RpcClient, WeakClient}; -use alloy_transport::{BoxTransport, BoxTransportConnect, Transport, TransportError}; -use std::{ - fmt, - marker::PhantomData, - sync::{Arc, OnceLock}, +use alloy_transport::{ + BoxTransport, BoxTransportConnect, Transport, TransportError, TransportResult, }; +use std::{fmt, marker::PhantomData, sync::Arc}; #[cfg(feature = "reqwest")] use alloy_transport_http::Http; @@ -20,18 +18,18 @@ use alloy_pubsub::{PubSubFrontend, Subscription}; /// The root provider manages the RPC client and the heartbeat. It is at the /// base of every provider stack. -pub struct RootProvider { +pub struct RootProvider { /// The inner state of the root provider. pub(crate) inner: Arc>, } -impl Clone for RootProvider { +impl Clone for RootProvider { fn clone(&self) -> Self { Self { inner: self.inner.clone() } } } -impl fmt::Debug for RootProvider { +impl fmt::Debug for RootProvider { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("RootProvider").field("client", &self.inner.client).finish_non_exhaustive() } @@ -100,48 +98,43 @@ impl RootProvider { #[cfg(feature = "pubsub")] pub(crate) fn pubsub_frontend(&self) -> alloy_transport::TransportResult<&PubSubFrontend> { - let t = self.transport() as &dyn std::any::Any; - t.downcast_ref::() - .or_else(|| { - t.downcast_ref::() - .and_then(|t| t.as_any().downcast_ref::()) - }) + self.inner + .client_ref() + .pubsub_frontend() .ok_or_else(alloy_transport::TransportErrorKind::pubsub_unavailable) } - #[cfg(feature = "pubsub")] - fn transport(&self) -> &T { - self.inner.client.transport() - } - #[inline] - pub(crate) fn get_heart(&self) -> &HeartbeatHandle { - self.inner.heart.get_or_init(|| { - let poller: ChainStreamPoller = - ChainStreamPoller::from_weak_client(self.inner.weak_client()); - // TODO: Can we avoid `Box::pin` here? - Heartbeat::new(Box::pin(poller.into_stream())).spawn() - }) + pub(crate) async fn get_heart(&self) -> TransportResult<&HeartbeatHandle> { + self.inner + .heart + .get_or_try_init(|| async { + let new_blocks = NewBlocks::::new(self.inner.weak_client()); + let stream = new_blocks.into_stream().await?; + // TODO: Can we avoid `Box::pin` here? + Ok(Heartbeat::new(Box::pin(stream)).spawn()) + }) + .await } } /// The root provider manages the RPC client and the heartbeat. It is at the /// base of every provider stack. -pub(crate) struct RootProviderInner { +pub(crate) struct RootProviderInner { client: RpcClient, - heart: OnceLock, + heart: tokio::sync::OnceCell>, _network: PhantomData, } -impl Clone for RootProviderInner { +impl Clone for RootProviderInner { fn clone(&self) -> Self { Self { client: self.client.clone(), heart: self.heart.clone(), _network: PhantomData } } } impl RootProviderInner { - pub(crate) const fn new(client: RpcClient) -> Self { - Self { client, heart: OnceLock::new(), _network: PhantomData } + pub(crate) fn new(client: RpcClient) -> Self { + Self { client, heart: Default::default(), _network: PhantomData } } pub(crate) fn weak_client(&self) -> WeakClient { @@ -153,7 +146,7 @@ impl RootProviderInner { } } -impl RootProviderInner { +impl RootProviderInner { fn boxed(self) -> RootProviderInner { RootProviderInner { client: self.client.boxed(), heart: self.heart, _network: PhantomData } } diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index 003569c9164..a7372b8478a 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -119,7 +119,7 @@ pub trait Provider: fn get_block_number(&self) -> ProviderCall { self.client() .request_noparams("eth_blockNumber") - .map_resp(crate::utils::convert_u64 as fn(U64) -> u64) + .map_resp(utils::convert_u64 as fn(U64) -> u64) .into() } @@ -171,7 +171,7 @@ pub trait Provider: fn get_chain_id(&self) -> ProviderCall { self.client() .request_noparams("eth_chainId") - .map_resp(crate::utils::convert_u64 as fn(U64) -> u64) + .map_resp(utils::convert_u64 as fn(U64) -> u64) .into() } @@ -199,7 +199,7 @@ pub trait Provider: &self, tx: &'req N::TransactionRequest, ) -> EthCall<'req, T, N, U128, u128> { - EthCall::gas_estimate(self.weak_client(), tx).map_resp(crate::utils::convert_u128) + EthCall::gas_estimate(self.weak_client(), tx).map_resp(utils::convert_u128) } /// Estimates the EIP1559 `maxFeePerGas` and `maxPriorityFeePerGas` fields. @@ -257,7 +257,7 @@ pub trait Provider: fn get_gas_price(&self) -> ProviderCall { self.client() .request_noparams("eth_gasPrice") - .map_resp(crate::utils::convert_u128 as fn(U128) -> u128) + .map_resp(utils::convert_u128 as fn(U128) -> u128) .into() } @@ -560,7 +560,7 @@ pub trait Provider: ) -> RpcWithBlock u64> { self.client() .request("eth_getTransactionCount", address) - .map_resp(crate::utils::convert_u64 as fn(U64) -> u64) + .map_resp(utils::convert_u64 as fn(U64) -> u64) .into() } @@ -708,7 +708,7 @@ pub trait Provider: ) -> TransportResult> { // Make sure to initialize heartbeat before we submit transaction, so that // we don't miss it if user will subscriber to it immediately after sending. - let _handle = self.root().get_heart(); + let _handle = self.root().get_heart().await?; match tx { SendableTx::Builder(mut tx) => { @@ -912,7 +912,7 @@ pub trait Provider: fn get_net_version(&self) -> ProviderCall { self.client() .request_noparams("net_version") - .map_resp(crate::utils::convert_u64 as fn(U64) -> u64) + .map_resp(utils::convert_u64 as fn(U64) -> u64) .into() } @@ -1023,6 +1023,7 @@ impl Provider for RootProvider { }; self.get_heart() + .await? .watch_tx(config, block_number) .await .map_err(|_| PendingTransactionError::FailedToRegister) diff --git a/crates/pubsub/src/sub.rs b/crates/pubsub/src/sub.rs index 342bbb5fbd3..475e3ac121d 100644 --- a/crates/pubsub/src/sub.rs +++ b/crates/pubsub/src/sub.rs @@ -80,6 +80,11 @@ impl RawSubscription { pub fn into_stream(self) -> BroadcastStream> { self.rx.into() } + + /// Convert into a typed subscription. + pub fn into_typed(self) -> Subscription { + self.into() + } } /// An item in a typed [`Subscription`]. This is either the expected type, or diff --git a/crates/rpc-client/src/builtin.rs b/crates/rpc-client/src/builtin.rs index 6b95963a090..911e115ef86 100644 --- a/crates/rpc-client/src/builtin.rs +++ b/crates/rpc-client/src/builtin.rs @@ -62,17 +62,15 @@ impl BuiltInConnectionString { // reqwest is enabled, hyper is not #[cfg(all(not(feature = "hyper"), feature = "reqwest"))] Self::Http(url) => { - Ok( - alloy_transport::Transport::boxed( - alloy_transport_http::Http::::new(url.clone()) - ) - ) - }, + Ok(alloy_transport::Transport::boxed( + alloy_transport_http::Http::::new(url.clone()), + )) + } // hyper is enabled, reqwest is not #[cfg(feature = "hyper")] - Self::Http(_) => Err(TransportErrorKind::custom_str( - "hyper not supported by BuiltinConnectionString. Please instantiate a hyper client manually", + Self::Http(url) => Ok(alloy_transport::Transport::boxed( + alloy_transport_http::HyperTransport::new_hyper(url.clone()), )), #[cfg(all(not(target_arch = "wasm32"), feature = "ws"))] @@ -95,7 +93,12 @@ impl BuiltInConnectionString { .await .map(alloy_transport::Transport::boxed), - #[cfg(not(any(feature = "reqwest", feature = "hyper", feature = "ws", feature = "ipc")))] + #[cfg(not(any( + feature = "reqwest", + feature = "hyper", + feature = "ws", + feature = "ipc" + )))] _ => Err(TransportErrorKind::custom_str( "No transports enabled. Enable one of: reqwest, hyper, ws, ipc", )), diff --git a/crates/rpc-client/src/client.rs b/crates/rpc-client/src/client.rs index 370d18ccaa6..f4596574842 100644 --- a/crates/rpc-client/src/client.rs +++ b/crates/rpc-client/src/client.rs @@ -218,6 +218,19 @@ impl RpcClientInner { self.transport } + /// Returns a reference to the pubsub frontend if the transport supports it. + #[cfg(feature = "pubsub")] + pub fn pubsub_frontend(&self) -> Option<&alloy_pubsub::PubSubFrontend> + where + T: std::any::Any, + { + let t = self.transport() as &dyn std::any::Any; + t.downcast_ref::().or_else(|| { + t.downcast_ref::() + .and_then(|t| t.as_any().downcast_ref::()) + }) + } + /// Build a `JsonRpcRequest` with the given method and params. /// /// This function reserves an ID for the request, however the request is not sent. diff --git a/crates/rpc-client/src/poller.rs b/crates/rpc-client/src/poller.rs index ad0775b03bc..0700448bab1 100644 --- a/crates/rpc-client/src/poller.rs +++ b/crates/rpc-client/src/poller.rs @@ -150,52 +150,53 @@ where pub fn spawn(self) -> PollChannel { let (tx, rx) = broadcast::channel(self.channel_size); let span = debug_span!("poller", method = %self.method); - let fut = async move { - let mut params = ParamsOnce::Typed(self.params); - let mut retries = MAX_RETRIES; - 'outer: for _ in 0..self.limit { - let Some(client) = self.client.upgrade() else { - debug!("client dropped"); - break; - }; + self.into_future(tx).instrument(span).spawn_task(); + rx.into() + } - // Avoid serializing the params more than once. - let params = match params.get() { - Ok(p) => p, - Err(err) => { - error!(%err, "failed to serialize params"); - break; - } - }; - - loop { - trace!("polling"); - match client.request(self.method.clone(), params).await { - Ok(resp) => { - if tx.send(resp).is_err() { - debug!("channel closed"); - break 'outer; - } - } - Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => { - debug!(%err, "failed to poll, retrying"); - retries -= 1; - continue; - } - Err(err) => { - error!(%err, "failed to poll"); + async fn into_future(self, tx: broadcast::Sender) { + let mut params = ParamsOnce::Typed(self.params); + let mut retries = MAX_RETRIES; + 'outer: for _ in 0..self.limit { + let Some(client) = self.client.upgrade() else { + debug!("client dropped"); + break; + }; + + // Avoid serializing the params more than once. + let params = match params.get() { + Ok(p) => p, + Err(err) => { + error!(%err, "failed to serialize params"); + break; + } + }; + + loop { + trace!("polling"); + match client.request(self.method.clone(), params).await { + Ok(resp) => { + if tx.send(resp).is_err() { + debug!("channel closed"); break 'outer; } } - break; + Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => { + debug!(%err, "failed to poll, retrying"); + retries -= 1; + continue; + } + Err(err) => { + error!(%err, "failed to poll"); + break 'outer; + } } - - trace!(duration=?self.poll_interval, "sleeping"); - tokio::time::sleep(self.poll_interval).await; + break; } - }; - fut.instrument(span).spawn_task(); - rx.into() + + trace!(duration=?self.poll_interval, "sleeping"); + tokio::time::sleep(self.poll_interval).await; + } } /// Starts the poller and returns the stream of responses. diff --git a/crates/rpc-types-eth/src/block.rs b/crates/rpc-types-eth/src/block.rs index ef47e3f1a42..17fc57def77 100644 --- a/crates/rpc-types-eth/src/block.rs +++ b/crates/rpc-types-eth/src/block.rs @@ -365,13 +365,13 @@ pub struct BlockOverrides { serde(default, skip_serializing_if = "Option::is_none", alias = "baseFeePerGas") )] pub base_fee: Option, - /// A dictionary that maps blockNumber to a user-defined hash. It could be queried from the - /// solidity opcode BLOCKHASH. + /// A dictionary that maps blockNumber to a user-defined hash. It can be queried from the + /// EVM opcode BLOCKHASH. #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))] pub block_hash: Option>, } -impl BlockResponse for Block { +impl BlockResponse for Block { type Transaction = T; type Header = H; From cd27443ffb3d48bbd202d9350fd65504ffc5bec8 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Fri, 20 Sep 2024 23:35:48 +0200 Subject: [PATCH 2/8] msg --- crates/provider/src/blocks.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/provider/src/blocks.rs b/crates/provider/src/blocks.rs index a1079d06f4d..8abab52d0c0 100644 --- a/crates/provider/src/blocks.rs +++ b/crates/provider/src/blocks.rs @@ -71,11 +71,11 @@ impl NewBlocks { Some(Ok(block_number)) => block_number, Some(Err(err)) => { // This is fine. - debug!(%err, "block number stream lagged"); + debug!(%err, "polling stream lagged"); continue 'task; } None => { - debug!("block number stream ended"); + debug!("polling stream ended"); break 'task; } }; From d28028c5d2894a2238d93702cd85441c9372ea64 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Fri, 20 Sep 2024 23:38:16 +0200 Subject: [PATCH 3/8] wasm --- crates/provider/src/heart.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/provider/src/heart.rs b/crates/provider/src/heart.rs index 421a08f2e9a..c75df935778 100644 --- a/crates/provider/src/heart.rs +++ b/crates/provider/src/heart.rs @@ -630,7 +630,7 @@ impl + Unpin + 'static> Heartbeat impl + Unpin + 'static> Heartbeat { /// Spawn the heartbeat task, returning a [`HeartbeatHandle`]. pub(crate) fn spawn(self) -> HeartbeatHandle { - let (latest, latest_rx) = watch::channel(None::); + let (latest, latest_rx) = watch::channel(None::); let (ix_tx, ixns) = mpsc::channel(16); self.into_future(latest, ixns).spawn_task(); From ed107718b2829334d4f3b9d6be3178314b51f141 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Fri, 20 Sep 2024 23:44:07 +0200 Subject: [PATCH 4/8] fix --- crates/provider/src/blocks.rs | 7 +++++-- crates/provider/src/heart.rs | 26 +++++++++++++------------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/crates/provider/src/blocks.rs b/crates/provider/src/blocks.rs index 8abab52d0c0..c35a55e8ef4 100644 --- a/crates/provider/src/blocks.rs +++ b/crates/provider/src/blocks.rs @@ -3,10 +3,13 @@ use alloy_primitives::{BlockNumber, U64}; use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient}; use alloy_transport::{RpcError, Transport, TransportResult}; use async_stream::stream; -use futures::{future::Either, Stream, StreamExt}; +use futures::{Stream, StreamExt}; use lru::LruCache; use std::{marker::PhantomData, num::NonZeroUsize}; +#[cfg(feature = "pubsub")] +use futures::future::Either; + /// The size of the block cache. const BLOCK_CACHE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(10) }; @@ -27,7 +30,7 @@ pub(crate) struct NewBlocks { impl NewBlocks { pub(crate) fn new(client: WeakClient) -> Self { Self { - client: client.clone(), + client, next_yield: NO_BLOCK_NUMBER, known_blocks: LruCache::new(BLOCK_CACHE_SIZE), _phantom: PhantomData, diff --git a/crates/provider/src/heart.rs b/crates/provider/src/heart.rs index c75df935778..181fc2e7e05 100644 --- a/crates/provider/src/heart.rs +++ b/crates/provider/src/heart.rs @@ -629,13 +629,10 @@ impl + Unpin + 'static> Heartbeat #[cfg(target_arch = "wasm32")] impl + Unpin + 'static> Heartbeat { /// Spawn the heartbeat task, returning a [`HeartbeatHandle`]. - pub(crate) fn spawn(self) -> HeartbeatHandle { - let (latest, latest_rx) = watch::channel(None::); - let (ix_tx, ixns) = mpsc::channel(16); - - self.into_future(latest, ixns).spawn_task(); - - HeartbeatHandle { tx: ix_tx, latest: latest_rx } + pub(crate) fn spawn(self) -> HeartbeatHandle { + let (task, handle) = self.consume(); + task.spawn_task(); + handle } } @@ -643,16 +640,19 @@ impl + Unpin + 'static> Heartbeat impl + Unpin + Send + 'static> Heartbeat { /// Spawn the heartbeat task, returning a [`HeartbeatHandle`]. pub(crate) fn spawn(self) -> HeartbeatHandle { - let (latest, latest_rx) = watch::channel(None::); - let (ix_tx, ixns) = mpsc::channel(16); - - self.into_future(latest, ixns).spawn_task(); - - HeartbeatHandle { tx: ix_tx, latest: latest_rx } + let (task, handle) = self.consume(); + task.spawn_task(); + handle } } impl + Unpin + 'static> Heartbeat { + fn consume(self) -> (impl Future, HeartbeatHandle) { + let (latest, latest_rx) = watch::channel(None::); + let (ix_tx, ixns) = mpsc::channel(16); + (self.into_future(latest, ixns), HeartbeatHandle { tx: ix_tx, latest: latest_rx }) + } + async fn into_future( mut self, latest: watch::Sender>, From 06cca3a855ffe565f0e44e4d5f92498853ec7fc6 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Fri, 20 Sep 2024 23:55:54 +0200 Subject: [PATCH 5/8] move into stream --- crates/provider/src/blocks.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/provider/src/blocks.rs b/crates/provider/src/blocks.rs index c35a55e8ef4..743fdd6f47e 100644 --- a/crates/provider/src/blocks.rs +++ b/crates/provider/src/blocks.rs @@ -57,10 +57,10 @@ impl NewBlocks { } fn into_poll_stream(mut self) -> impl Stream + 'static { + stream! { let poll_task_builder: PollerBuilder = PollerBuilder::new(self.client.clone(), "eth_blockNumber", []); let mut poll_task = poll_task_builder.spawn().into_stream_raw(); - stream! { 'task: loop { // Clear any buffered blocks. while let Some(known_block) = self.known_blocks.pop(&self.next_yield) { From bcfdaf18e84f1a0802ee25a69756c65be4037658 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Sat, 21 Sep 2024 20:02:49 +0200 Subject: [PATCH 6/8] feat: lazily subscribe to newHeads --- crates/provider/src/blocks.rs | 76 +++++++++++++++++++++------- crates/provider/src/provider/root.rs | 1 - 2 files changed, 58 insertions(+), 19 deletions(-) diff --git a/crates/provider/src/blocks.rs b/crates/provider/src/blocks.rs index 743fdd6f47e..279d0919424 100644 --- a/crates/provider/src/blocks.rs +++ b/crates/provider/src/blocks.rs @@ -3,7 +3,7 @@ use alloy_primitives::{BlockNumber, U64}; use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient}; use alloy_transport::{RpcError, Transport, TransportResult}; use async_stream::stream; -use futures::{Stream, StreamExt}; +use futures::{FutureExt, Stream, StreamExt}; use lru::LruCache; use std::{marker::PhantomData, num::NonZeroUsize}; @@ -22,7 +22,11 @@ const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX; /// Streams new blocks from the client. pub(crate) struct NewBlocks { client: WeakClient, + /// The next block to yield. + /// [`NO_BLOCK_NUMBER`] indicates that it will be updated on the first poll. + /// Only used by the polling task. next_yield: BlockNumber, + /// LRU cache of known blocks. Only used by the polling task. known_blocks: LruCache, _phantom: PhantomData, } @@ -37,18 +41,27 @@ impl NewBlocks { } } + #[cfg(test)] + fn with_next_yield(mut self, next_yield: u64) -> Self { + self.next_yield = next_yield; + self + } + pub(crate) async fn into_stream( self, ) -> TransportResult + 'static> { + // Return a stream that lazily subscribes to `newHeads` on the first poll. #[cfg(feature = "pubsub")] if let Some(client) = self.client.upgrade() { - if let Some(pubsub) = client.pubsub_frontend() { - let id = client.request("eth_subscribe", ("newHeads",)).await?; - let sub = pubsub.get_subscription(id).await?; - return Ok(Either::Left(sub.into_typed::().into_stream())); + if client.pubsub_frontend().is_some() { + let subscriber = self.into_subscription_stream().map(futures::stream::iter); + let subscriber = futures::stream::once(subscriber); + return Ok(Either::Left(subscriber.flatten().flatten())); } } + // Returns a stream that lazily initializes an `eth_blockNumber` polling task on the first + // poll, mapped with `eth_getBlockByNumber`. #[cfg(feature = "pubsub")] let right = Either::Right; #[cfg(not(feature = "pubsub"))] @@ -56,8 +69,37 @@ impl NewBlocks { Ok(right(self.into_poll_stream())) } + async fn into_subscription_stream( + self, + ) -> Option + 'static> { + let Some(client) = self.client.upgrade() else { + debug!("client dropped"); + return None; + }; + let Some(pubsub) = client.pubsub_frontend() else { + error!("pubsub_frontend returned None after being Some"); + return None; + }; + let id = match client.request("eth_subscribe", ("newHeads",)).await { + Ok(id) => id, + Err(err) => { + error!(%err, "failed to subscribe to newHeads"); + return None; + } + }; + let sub = match pubsub.get_subscription(id).await { + Ok(sub) => sub, + Err(err) => { + error!(%err, "failed to get subscription"); + return None; + } + }; + Some(sub.into_typed::().into_stream()) + } + fn into_poll_stream(mut self) -> impl Stream + 'static { stream! { + // Spawned lazily on the first `poll`. let poll_task_builder: PollerBuilder = PollerBuilder::new(self.client.clone(), "eth_blockNumber", []); let mut poll_task = poll_task_builder.spawn().into_stream_raw(); @@ -83,6 +125,7 @@ impl NewBlocks { } }; let block_number = block_number.to::(); + trace!(%block_number, "got block number"); if self.next_yield == NO_BLOCK_NUMBER { assert!(block_number < NO_BLOCK_NUMBER, "too many blocks"); self.next_yield = block_number; @@ -147,11 +190,8 @@ mod tests { let _ = tracing_subscriber::fmt::try_init(); } - async fn with_timeout(fut: T) -> T::Output { - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(2)) => panic!("Operation timed out"), - out = fut => out, - } + async fn timeout(future: T) -> T::Output { + tokio::time::timeout(Duration::from_secs(1), future).await.expect("timed out") } #[tokio::test] @@ -171,14 +211,14 @@ mod tests { let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() }; let provider = ProviderBuilder::new().on_builtin(&url).await.unwrap(); - let poller: NewBlocks<_, Ethereum> = NewBlocks::new(provider.weak_client()); - let mut stream = Box::pin(poller.into_stream().await.unwrap()); + let new_blocks = NewBlocks::<_, Ethereum>::new(provider.weak_client()).with_next_yield(1); + let mut stream = Box::pin(new_blocks.into_stream().await.unwrap()); // We will also use provider to manipulate anvil instance via RPC. provider.anvil_mine(Some(U256::from(1)), None).await.unwrap(); - let block = with_timeout(stream.next()).await.expect("Block wasn't fetched"); - assert!(block.header.number <= 1); + let block = timeout(stream.next()).await.expect("Block wasn't fetched"); + assert_eq!(block.header.number, 1); } #[tokio::test] @@ -201,16 +241,16 @@ mod tests { let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() }; let provider = ProviderBuilder::new().on_builtin(&url).await.unwrap(); - let poller: NewBlocks<_, Ethereum> = NewBlocks::new(provider.weak_client()); - let stream = Box::pin(poller.into_stream().await.unwrap()); + let new_blocks = NewBlocks::<_, Ethereum>::new(provider.weak_client()).with_next_yield(1); + let stream = Box::pin(new_blocks.into_stream().await.unwrap()); // We will also use provider to manipulate anvil instance via RPC. provider.anvil_mine(Some(U256::from(BLOCKS_TO_MINE)), None).await.unwrap(); - let blocks = with_timeout(stream.take(BLOCKS_TO_MINE).collect::>()).await; + let blocks = timeout(stream.take(BLOCKS_TO_MINE).collect::>()).await; assert_eq!(blocks.len(), BLOCKS_TO_MINE); let first = blocks[0].header.number; - assert!(first <= 1); + assert_eq!(first, 1); for (i, block) in blocks.iter().enumerate() { assert_eq!(block.header.number, first + i as u64); } diff --git a/crates/provider/src/provider/root.rs b/crates/provider/src/provider/root.rs index be9f6ff7edc..f2cccbd20cf 100644 --- a/crates/provider/src/provider/root.rs +++ b/crates/provider/src/provider/root.rs @@ -111,7 +111,6 @@ impl RootProvider { .get_or_try_init(|| async { let new_blocks = NewBlocks::::new(self.inner.weak_client()); let stream = new_blocks.into_stream().await?; - // TODO: Can we avoid `Box::pin` here? Ok(Heartbeat::new(Box::pin(stream)).spawn()) }) .await From c79fc2f02985efa82a911d2e4c6e8c8191d1ce29 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Sat, 21 Sep 2024 20:20:21 +0200 Subject: [PATCH 7/8] chore: remove async from get_heart --- crates/provider/src/blocks.rs | 33 +++++++++++++++++---------- crates/provider/src/provider/root.rs | 25 ++++++++++---------- crates/provider/src/provider/trait.rs | 3 +-- 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/crates/provider/src/blocks.rs b/crates/provider/src/blocks.rs index 279d0919424..a6ddede0e7d 100644 --- a/crates/provider/src/blocks.rs +++ b/crates/provider/src/blocks.rs @@ -1,14 +1,14 @@ use alloy_network::{Ethereum, Network}; use alloy_primitives::{BlockNumber, U64}; use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient}; -use alloy_transport::{RpcError, Transport, TransportResult}; +use alloy_transport::{RpcError, Transport}; use async_stream::stream; -use futures::{FutureExt, Stream, StreamExt}; +use futures::{Stream, StreamExt}; use lru::LruCache; use std::{marker::PhantomData, num::NonZeroUsize}; #[cfg(feature = "pubsub")] -use futures::future::Either; +use futures::{future::Either, FutureExt}; /// The size of the block cache. const BLOCK_CACHE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(10) }; @@ -42,21 +42,19 @@ impl NewBlocks { } #[cfg(test)] - fn with_next_yield(mut self, next_yield: u64) -> Self { + const fn with_next_yield(mut self, next_yield: u64) -> Self { self.next_yield = next_yield; self } - pub(crate) async fn into_stream( - self, - ) -> TransportResult + 'static> { + pub(crate) fn into_stream(self) -> impl Stream + 'static { // Return a stream that lazily subscribes to `newHeads` on the first poll. #[cfg(feature = "pubsub")] if let Some(client) = self.client.upgrade() { if client.pubsub_frontend().is_some() { let subscriber = self.into_subscription_stream().map(futures::stream::iter); let subscriber = futures::stream::once(subscriber); - return Ok(Either::Left(subscriber.flatten().flatten())); + return Either::Left(subscriber.flatten().flatten()); } } @@ -66,9 +64,10 @@ impl NewBlocks { let right = Either::Right; #[cfg(not(feature = "pubsub"))] let right = std::convert::identity; - Ok(right(self.into_poll_stream())) + right(self.into_poll_stream()) } + #[cfg(feature = "pubsub")] async fn into_subscription_stream( self, ) -> Option + 'static> { @@ -191,7 +190,11 @@ mod tests { } async fn timeout(future: T) -> T::Output { - tokio::time::timeout(Duration::from_secs(1), future).await.expect("timed out") + try_timeout(future).await.expect("Timeout") + } + + async fn try_timeout(future: T) -> Option { + tokio::time::timeout(Duration::from_secs(2), future).await.ok() } #[tokio::test] @@ -212,7 +215,10 @@ mod tests { let provider = ProviderBuilder::new().on_builtin(&url).await.unwrap(); let new_blocks = NewBlocks::<_, Ethereum>::new(provider.weak_client()).with_next_yield(1); - let mut stream = Box::pin(new_blocks.into_stream().await.unwrap()); + let mut stream = Box::pin(new_blocks.into_stream()); + if ws { + let _ = try_timeout(stream.next()).await; // Subscribe to newHeads. + } // We will also use provider to manipulate anvil instance via RPC. provider.anvil_mine(Some(U256::from(1)), None).await.unwrap(); @@ -242,7 +248,10 @@ mod tests { let provider = ProviderBuilder::new().on_builtin(&url).await.unwrap(); let new_blocks = NewBlocks::<_, Ethereum>::new(provider.weak_client()).with_next_yield(1); - let stream = Box::pin(new_blocks.into_stream().await.unwrap()); + let mut stream = Box::pin(new_blocks.into_stream()); + if ws { + let _ = try_timeout(stream.next()).await; // Subscribe to newHeads. + } // We will also use provider to manipulate anvil instance via RPC. provider.anvil_mine(Some(U256::from(BLOCKS_TO_MINE)), None).await.unwrap(); diff --git a/crates/provider/src/provider/root.rs b/crates/provider/src/provider/root.rs index f2cccbd20cf..c8573863ca8 100644 --- a/crates/provider/src/provider/root.rs +++ b/crates/provider/src/provider/root.rs @@ -5,10 +5,12 @@ use crate::{ }; use alloy_network::{Ethereum, Network}; use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, ClientRef, RpcClient, WeakClient}; -use alloy_transport::{ - BoxTransport, BoxTransportConnect, Transport, TransportError, TransportResult, +use alloy_transport::{BoxTransport, BoxTransportConnect, Transport, TransportError}; +use std::{ + fmt, + marker::PhantomData, + sync::{Arc, OnceLock}, }; -use std::{fmt, marker::PhantomData, sync::Arc}; #[cfg(feature = "reqwest")] use alloy_transport_http::Http; @@ -105,15 +107,12 @@ impl RootProvider { } #[inline] - pub(crate) async fn get_heart(&self) -> TransportResult<&HeartbeatHandle> { - self.inner - .heart - .get_or_try_init(|| async { - let new_blocks = NewBlocks::::new(self.inner.weak_client()); - let stream = new_blocks.into_stream().await?; - Ok(Heartbeat::new(Box::pin(stream)).spawn()) - }) - .await + pub(crate) fn get_heart(&self) -> &HeartbeatHandle { + self.inner.heart.get_or_init(|| { + let new_blocks = NewBlocks::::new(self.inner.weak_client()); + let stream = new_blocks.into_stream(); + Heartbeat::new(Box::pin(stream)).spawn() + }) } } @@ -121,7 +120,7 @@ impl RootProvider { /// base of every provider stack. pub(crate) struct RootProviderInner { client: RpcClient, - heart: tokio::sync::OnceCell>, + heart: OnceLock>, _network: PhantomData, } diff --git a/crates/provider/src/provider/trait.rs b/crates/provider/src/provider/trait.rs index a7372b8478a..5443ad9c447 100644 --- a/crates/provider/src/provider/trait.rs +++ b/crates/provider/src/provider/trait.rs @@ -708,7 +708,7 @@ pub trait Provider: ) -> TransportResult> { // Make sure to initialize heartbeat before we submit transaction, so that // we don't miss it if user will subscriber to it immediately after sending. - let _handle = self.root().get_heart().await?; + let _handle = self.root().get_heart(); match tx { SendableTx::Builder(mut tx) => { @@ -1023,7 +1023,6 @@ impl Provider for RootProvider { }; self.get_heart() - .await? .watch_tx(config, block_number) .await .map_err(|_| PendingTransactionError::FailedToRegister) From 2ff30e10b6396a23b196c17664378562311b34f9 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Mon, 23 Sep 2024 22:00:51 +0200 Subject: [PATCH 8/8] testname --- crates/provider/src/ext/anvil.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/provider/src/ext/anvil.rs b/crates/provider/src/ext/anvil.rs index 9349e1eb8cd..a32d3439258 100644 --- a/crates/provider/src/ext/anvil.rs +++ b/crates/provider/src/ext/anvil.rs @@ -797,7 +797,7 @@ mod tests { } #[tokio::test] - async fn test_anvil_set_block_timestamp_interval_anvil_remove_block_timestamp_interval() { + async fn test_anvil_block_timestamp_interval() { let provider = ProviderBuilder::new().on_anvil(); provider.anvil_set_block_timestamp_interval(1).await.unwrap();