diff --git a/crates/provider/src/blocks.rs b/crates/provider/src/blocks.rs index 8abab52d0c03..c35a55e8ef40 100644 --- a/crates/provider/src/blocks.rs +++ b/crates/provider/src/blocks.rs @@ -3,10 +3,13 @@ use alloy_primitives::{BlockNumber, U64}; use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient}; use alloy_transport::{RpcError, Transport, TransportResult}; use async_stream::stream; -use futures::{future::Either, Stream, StreamExt}; +use futures::{Stream, StreamExt}; use lru::LruCache; use std::{marker::PhantomData, num::NonZeroUsize}; +#[cfg(feature = "pubsub")] +use futures::future::Either; + /// The size of the block cache. const BLOCK_CACHE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(10) }; @@ -27,7 +30,7 @@ pub(crate) struct NewBlocks { impl NewBlocks { pub(crate) fn new(client: WeakClient) -> Self { Self { - client: client.clone(), + client, next_yield: NO_BLOCK_NUMBER, known_blocks: LruCache::new(BLOCK_CACHE_SIZE), _phantom: PhantomData, diff --git a/crates/provider/src/heart.rs b/crates/provider/src/heart.rs index c75df9357787..181fc2e7e059 100644 --- a/crates/provider/src/heart.rs +++ b/crates/provider/src/heart.rs @@ -629,13 +629,10 @@ impl + Unpin + 'static> Heartbeat #[cfg(target_arch = "wasm32")] impl + Unpin + 'static> Heartbeat { /// Spawn the heartbeat task, returning a [`HeartbeatHandle`]. - pub(crate) fn spawn(self) -> HeartbeatHandle { - let (latest, latest_rx) = watch::channel(None::); - let (ix_tx, ixns) = mpsc::channel(16); - - self.into_future(latest, ixns).spawn_task(); - - HeartbeatHandle { tx: ix_tx, latest: latest_rx } + pub(crate) fn spawn(self) -> HeartbeatHandle { + let (task, handle) = self.consume(); + task.spawn_task(); + handle } } @@ -643,16 +640,19 @@ impl + Unpin + 'static> Heartbeat impl + Unpin + Send + 'static> Heartbeat { /// Spawn the heartbeat task, returning a [`HeartbeatHandle`]. pub(crate) fn spawn(self) -> HeartbeatHandle { - let (latest, latest_rx) = watch::channel(None::); - let (ix_tx, ixns) = mpsc::channel(16); - - self.into_future(latest, ixns).spawn_task(); - - HeartbeatHandle { tx: ix_tx, latest: latest_rx } + let (task, handle) = self.consume(); + task.spawn_task(); + handle } } impl + Unpin + 'static> Heartbeat { + fn consume(self) -> (impl Future, HeartbeatHandle) { + let (latest, latest_rx) = watch::channel(None::); + let (ix_tx, ixns) = mpsc::channel(16); + (self.into_future(latest, ixns), HeartbeatHandle { tx: ix_tx, latest: latest_rx }) + } + async fn into_future( mut self, latest: watch::Sender>,