Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
DaniPopes committed Sep 20, 2024
1 parent 6ffc102 commit d49c784
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
7 changes: 5 additions & 2 deletions crates/provider/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ use alloy_primitives::{BlockNumber, U64};
use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient};
use alloy_transport::{RpcError, Transport, TransportResult};
use async_stream::stream;
use futures::{future::Either, Stream, StreamExt};
use futures::{Stream, StreamExt};
use lru::LruCache;
use std::{marker::PhantomData, num::NonZeroUsize};

#[cfg(feature = "pubsub")]
use futures::future::Either;

/// The size of the block cache.
const BLOCK_CACHE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(10) };

Expand All @@ -27,7 +30,7 @@ pub(crate) struct NewBlocks<T, N: Network = Ethereum> {
impl<T: Transport + Clone, N: Network> NewBlocks<T, N> {
pub(crate) fn new(client: WeakClient<T>) -> Self {
Self {
client: client.clone(),
client,
next_yield: NO_BLOCK_NUMBER,
known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
_phantom: PhantomData,
Expand Down
26 changes: 13 additions & 13 deletions crates/provider/src/heart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,30 +629,30 @@ impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat
#[cfg(target_arch = "wasm32")]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
/// Spawn the heartbeat task, returning a [`HeartbeatHandle`].
pub(crate) fn spawn(self) -> HeartbeatHandle {
let (latest, latest_rx) = watch::channel(None::<N::BlockResponse>);
let (ix_tx, ixns) = mpsc::channel(16);

self.into_future(latest, ixns).spawn_task();

HeartbeatHandle { tx: ix_tx, latest: latest_rx }
pub(crate) fn spawn(self) -> HeartbeatHandle<N> {
let (task, handle) = self.consume();
task.spawn_task();
handle
}
}

#[cfg(not(target_arch = "wasm32"))]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + Send + 'static> Heartbeat<N, S> {
/// Spawn the heartbeat task, returning a [`HeartbeatHandle`].
pub(crate) fn spawn(self) -> HeartbeatHandle<N> {
let (latest, latest_rx) = watch::channel(None::<N::BlockResponse>);
let (ix_tx, ixns) = mpsc::channel(16);

self.into_future(latest, ixns).spawn_task();

HeartbeatHandle { tx: ix_tx, latest: latest_rx }
let (task, handle) = self.consume();
task.spawn_task();
handle
}
}

impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
fn consume(self) -> (impl Future<Output = ()>, HeartbeatHandle<N>) {
let (latest, latest_rx) = watch::channel(None::<N::BlockResponse>);
let (ix_tx, ixns) = mpsc::channel(16);
(self.into_future(latest, ixns), HeartbeatHandle { tx: ix_tx, latest: latest_rx })
}

async fn into_future(
mut self,
latest: watch::Sender<Option<N::BlockResponse>>,
Expand Down

0 comments on commit d49c784

Please sign in to comment.