Skip to content

Commit

Permalink
feat: ForkChoiceNotifications for BlockchainProvider and NoopProvider (
Browse files Browse the repository at this point in the history
…#10231)

Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
royvardhan and mattsse authored Aug 10, 2024
1 parent cdf5d8e commit 25ab85c
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 17 deletions.
10 changes: 10 additions & 0 deletions crates/chain-state/src/chain_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ impl ChainInfoTracker {
let _ = h.replace(header);
});
}

/// Subscribe to the finalized block.
pub fn subscribe_to_finalized_block(&self) -> watch::Receiver<Option<SealedHeader>> {
self.inner.finalized_block.subscribe()
}

/// Subscribe to the safe block.
pub fn subscribe_to_safe_block(&self) -> watch::Receiver<Option<SealedHeader>> {
self.inner.safe_block.subscribe()
}
}

/// Container type for all chain info fields
Expand Down
28 changes: 14 additions & 14 deletions crates/chain-state/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,20 @@ impl CanonStateNotification {

/// Wrapper around a broadcast receiver that receives fork choice notifications.
#[derive(Debug, Deref, DerefMut)]
pub struct ForkChoiceNotifications(broadcast::Receiver<SealedHeader>);
pub struct ForkChoiceNotifications(pub watch::Receiver<Option<SealedHeader>>);

/// A trait that allows to register to fork choice related events
/// and get notified when a new fork choice is available.
pub trait ForkChoiceSubscriptions: Send + Sync {
/// Get notified when a new head of the chain is selected.
fn subscribe_to_fork_choice(&self) -> ForkChoiceNotifications;
/// Get notified when a new safe block of the chain is selected.
fn subscribe_to_safe_block(&self) -> ForkChoiceNotifications;

/// Convenience method to get a stream of the new head of the chain.
/// Get notified when a new finalized block of the chain is selected.
fn subscribe_to_finalized_block(&self) -> ForkChoiceNotifications;

/// Convenience method to get a stream of the new safe blocks of the chain.
fn fork_choice_stream(&self) -> ForkChoiceStream {
ForkChoiceStream { st: BroadcastStream::new(self.subscribe_to_fork_choice().0) }
ForkChoiceStream { st: WatchStream::new(self.subscribe_to_safe_block().0) }
}
}

Expand All @@ -159,22 +162,19 @@ pub trait ForkChoiceSubscriptions: Send + Sync {
#[pin_project::pin_project]
pub struct ForkChoiceStream {
#[pin]
st: BroadcastStream<SealedHeader>,
st: WatchStream<Option<SealedHeader>>,
}

impl Stream for ForkChoiceStream {
type Item = SealedHeader;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(Ok(notification)) => Poll::Ready(Some(notification)),
Some(Err(err)) => {
debug!(%err, "finalized header notification stream lagging behind");
continue
}
None => Poll::Ready(None),
};
match ready!(self.as_mut().project().st.poll_next(cx)) {
Some(Some(notification)) => return Poll::Ready(Some(notification)),
Some(None) => continue,
None => return Poll::Ready(None),
}
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_blockchain_tree_api::{
BlockValidationKind, BlockchainTreeEngine, BlockchainTreeViewer, CanonicalOutcome,
InsertPayloadOk,
};
use reth_chain_state::ChainInfoTracker;
use reth_chain_state::{ChainInfoTracker, ForkChoiceNotifications, ForkChoiceSubscriptions};
use reth_chainspec::{ChainInfo, ChainSpec};
use reth_db_api::{
database::Database,
Expand Down Expand Up @@ -940,6 +940,21 @@ where
}
}

impl<DB> ForkChoiceSubscriptions for BlockchainProvider<DB>
where
DB: Send + Sync,
{
fn subscribe_to_safe_block(&self) -> ForkChoiceNotifications {
let receiver = self.chain_info.subscribe_to_safe_block();
ForkChoiceNotifications(receiver)
}

fn subscribe_to_finalized_block(&self) -> ForkChoiceNotifications {
let receiver = self.chain_info.subscribe_to_finalized_block();
ForkChoiceNotifications(receiver)
}
}

impl<DB> ChangeSetReader for BlockchainProvider<DB>
where
DB: Database,
Expand Down
19 changes: 17 additions & 2 deletions crates/storage/provider/src/test_utils/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::{
sync::Arc,
};

use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions};
use reth_chain_state::{
CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications,
ForkChoiceSubscriptions,
};
use reth_chainspec::{ChainInfo, ChainSpec, MAINNET};
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_errors::ProviderError;
Expand All @@ -21,7 +24,7 @@ use reth_storage_api::StateProofProvider;
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{updates::TrieUpdates, AccountProof, HashedPostState};
use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, watch};

use crate::{
providers::StaticFileProvider,
Expand Down Expand Up @@ -532,3 +535,15 @@ impl CanonStateSubscriptions for NoopProvider {
broadcast::channel(1).1
}
}

impl ForkChoiceSubscriptions for NoopProvider {
fn subscribe_to_safe_block(&self) -> ForkChoiceNotifications {
let (_, rx) = watch::channel(None);
ForkChoiceNotifications(rx)
}

fn subscribe_to_finalized_block(&self) -> ForkChoiceNotifications {
let (_, rx) = watch::channel(None);
ForkChoiceNotifications(rx)
}
}

0 comments on commit 25ab85c

Please sign in to comment.