From 25ab85c5a79ca74532d953a1f476358565cf03f9 Mon Sep 17 00:00:00 2001 From: Roy <42067944+royvardhan@users.noreply.github.com> Date: Sat, 10 Aug 2024 20:05:42 +0530 Subject: [PATCH] feat: ForkChoiceNotifications for BlockchainProvider and NoopProvider (#10231) Co-authored-by: Matthias Seitz --- crates/chain-state/src/chain_info.rs | 10 +++++++ crates/chain-state/src/notifications.rs | 28 +++++++++---------- crates/storage/provider/src/providers/mod.rs | 17 ++++++++++- .../storage/provider/src/test_utils/noop.rs | 19 +++++++++++-- 4 files changed, 57 insertions(+), 17 deletions(-) diff --git a/crates/chain-state/src/chain_info.rs b/crates/chain-state/src/chain_info.rs index 341d85c8c26c..142d19703b3f 100644 --- a/crates/chain-state/src/chain_info.rs +++ b/crates/chain-state/src/chain_info.rs @@ -121,6 +121,16 @@ impl ChainInfoTracker { let _ = h.replace(header); }); } + + /// Subscribe to the finalized block. + pub fn subscribe_to_finalized_block(&self) -> watch::Receiver> { + self.inner.finalized_block.subscribe() + } + + /// Subscribe to the safe block. + pub fn subscribe_to_safe_block(&self) -> watch::Receiver> { + self.inner.safe_block.subscribe() + } } /// Container type for all chain info fields diff --git a/crates/chain-state/src/notifications.rs b/crates/chain-state/src/notifications.rs index 780f036ebfe3..a4ab993d8e73 100644 --- a/crates/chain-state/src/notifications.rs +++ b/crates/chain-state/src/notifications.rs @@ -140,17 +140,20 @@ impl CanonStateNotification { /// Wrapper around a broadcast receiver that receives fork choice notifications. #[derive(Debug, Deref, DerefMut)] -pub struct ForkChoiceNotifications(broadcast::Receiver); +pub struct ForkChoiceNotifications(pub watch::Receiver>); /// A trait that allows to register to fork choice related events /// and get notified when a new fork choice is available. pub trait ForkChoiceSubscriptions: Send + Sync { - /// Get notified when a new head of the chain is selected. - fn subscribe_to_fork_choice(&self) -> ForkChoiceNotifications; + /// Get notified when a new safe block of the chain is selected. + fn subscribe_to_safe_block(&self) -> ForkChoiceNotifications; - /// Convenience method to get a stream of the new head of the chain. + /// Get notified when a new finalized block of the chain is selected. + fn subscribe_to_finalized_block(&self) -> ForkChoiceNotifications; + + /// Convenience method to get a stream of the new safe blocks of the chain. fn fork_choice_stream(&self) -> ForkChoiceStream { - ForkChoiceStream { st: BroadcastStream::new(self.subscribe_to_fork_choice().0) } + ForkChoiceStream { st: WatchStream::new(self.subscribe_to_safe_block().0) } } } @@ -159,7 +162,7 @@ pub trait ForkChoiceSubscriptions: Send + Sync { #[pin_project::pin_project] pub struct ForkChoiceStream { #[pin] - st: BroadcastStream, + st: WatchStream>, } impl Stream for ForkChoiceStream { @@ -167,14 +170,11 @@ impl Stream for ForkChoiceStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - return match ready!(self.as_mut().project().st.poll_next(cx)) { - Some(Ok(notification)) => Poll::Ready(Some(notification)), - Some(Err(err)) => { - debug!(%err, "finalized header notification stream lagging behind"); - continue - } - None => Poll::Ready(None), - }; + match ready!(self.as_mut().project().st.poll_next(cx)) { + Some(Some(notification)) => return Poll::Ready(Some(notification)), + Some(None) => continue, + None => return Poll::Ready(None), + } } } } diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 10da087ee1c1..35fdb3bc9c1a 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -12,7 +12,7 @@ use reth_blockchain_tree_api::{ BlockValidationKind, BlockchainTreeEngine, BlockchainTreeViewer, CanonicalOutcome, InsertPayloadOk, }; -use reth_chain_state::ChainInfoTracker; +use reth_chain_state::{ChainInfoTracker, ForkChoiceNotifications, ForkChoiceSubscriptions}; use reth_chainspec::{ChainInfo, ChainSpec}; use reth_db_api::{ database::Database, @@ -940,6 +940,21 @@ where } } +impl ForkChoiceSubscriptions for BlockchainProvider +where + DB: Send + Sync, +{ + fn subscribe_to_safe_block(&self) -> ForkChoiceNotifications { + let receiver = self.chain_info.subscribe_to_safe_block(); + ForkChoiceNotifications(receiver) + } + + fn subscribe_to_finalized_block(&self) -> ForkChoiceNotifications { + let receiver = self.chain_info.subscribe_to_finalized_block(); + ForkChoiceNotifications(receiver) + } +} + impl ChangeSetReader for BlockchainProvider where DB: Database, diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 1d8eef60fe70..4796a0e02276 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -4,7 +4,10 @@ use std::{ sync::Arc, }; -use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions}; +use reth_chain_state::{ + CanonStateNotifications, CanonStateSubscriptions, ForkChoiceNotifications, + ForkChoiceSubscriptions, +}; use reth_chainspec::{ChainInfo, ChainSpec, MAINNET}; use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices}; use reth_errors::ProviderError; @@ -21,7 +24,7 @@ use reth_storage_api::StateProofProvider; use reth_storage_errors::provider::ProviderResult; use reth_trie::{updates::TrieUpdates, AccountProof, HashedPostState}; use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, watch}; use crate::{ providers::StaticFileProvider, @@ -532,3 +535,15 @@ impl CanonStateSubscriptions for NoopProvider { broadcast::channel(1).1 } } + +impl ForkChoiceSubscriptions for NoopProvider { + fn subscribe_to_safe_block(&self) -> ForkChoiceNotifications { + let (_, rx) = watch::channel(None); + ForkChoiceNotifications(rx) + } + + fn subscribe_to_finalized_block(&self) -> ForkChoiceNotifications { + let (_, rx) = watch::channel(None); + ForkChoiceNotifications(rx) + } +}