This repository has been archived by the owner on Feb 6, 2025. It is now read-only.

Commit

refactor codes
forcodedancing committed Aug 30, 2024
1 parent d121876 commit 89a453c
Showing 8 changed files with 170 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


145 changes: 134 additions & 11 deletions crates/blockchain-tree/src/blockchain_tree.rs
@@ -20,16 +20,18 @@ use reth_primitives::{
SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, B256, U256,
};
use reth_provider::{
BlockExecutionWriter, BlockNumReader, BlockWriter, CanonStateNotification,
CanonStateNotificationSender, CanonStateNotifications, ChainSpecProvider, ChainSplit,
ChainSplitTarget, DisplayBlocksChain, HeaderProvider, ProviderError, StaticFileProviderFactory,
BlockExecutionReader, BlockExecutionWriter, BlockNumReader, BlockWriter,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications,
ChainSpecProvider, ChainSplit, ChainSplitTarget, DisplayBlocksChain, HeaderProvider,
ProviderError, StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{hashed_cursor::HashedPostStateCursorFactory, StateRoot};
use std::{
collections::{btree_map::Entry, BTreeMap, HashSet},
ops::Deref,
sync::Arc,
};
use tracing::{debug, error, info, instrument, trace, warn};
@@ -427,16 +429,74 @@ where
BlockAttachment::HistoricalFork
};

let chain = AppendableChain::new_canonical_fork(
// Find the state between (finalized_block, block)
let mut chains = vec![];
let mut height = block_num_hash.number;
if let Some(chain_id) = self.block_indices().get_block_chain_id(&block.parent_hash) {
if let Some(another_chain) = self.state.chains.get(&chain_id) {
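// Split the parent side chain at the parent height; the lower part supplies the in-memory parent state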
let in_memory = match another_chain
.deref()
.clone()
.split(ChainSplitTarget::Number(block_num_hash.number - 1))
{
ChainSplit::Split { canonical: lower, pending: _higher } => Some(lower),
ChainSplit::NoSplitCanonical(lower) => Some(lower),
ChainSplit::NoSplitPending(_) => None,
};
if let Some(in_memory) = in_memory {
height = in_memory.first().number;
chains.push(in_memory);
}
};
}

let finalized = self.block_indices().last_finalized_block();
debug!(target: "blockchain_tree", ?block_num_hash, ?finalized, "Finalized block");

// Need to get states which are canonical and only in the database (i.e., after a restart)
if height > self.block_indices().last_finalized_block() + 1 {
let range = (self.block_indices().last_finalized_block() + 1)..=height - 1;
// Read the blocks and execution results from the database.
let in_db = provider.get_block_and_execution_range(range).map_err(|_| {
InsertBlockErrorKind::Tree(BlockchainTreeError::CanonicalChain {
block_hash: block_num_hash.hash,
})
})?;
chains.push(in_db);
}

let mut parent_chain = chains.pop().unwrap_or_default();
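// Append the remaining segments so the parent chain runs from oldest to newest block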
for chain in chains.into_iter().rev() {
parent_chain.append_chain(chain).map_err(|_| {
InsertBlockErrorKind::Tree(BlockchainTreeError::CanonicalChain {
block_hash: block_num_hash.hash,
})
})?;
}

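// Build the new canonical fork on top of the accumulated parent execution outcome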
let mut chain = AppendableChain::new_canonical_fork(
block,
&parent_header,
parent_chain.execution_outcome().clone(),
canonical_chain.inner(),
parent,
&self.externals,
block_attachment,
block_validation_kind,
)?;

// For the canonical chain, keep the chain as (last_finalized_block, ...)
if !parent_chain.is_empty() {
let chain_clone = chain.clone();
let update = chain_clone.trie_updates();
parent_chain.append_chain(chain.clone().into_inner()).expect("failed to append chain");
chain = AppendableChain::new(parent_chain);
// Keep the trie updates of the newly built fork
if let Some(update) = update {
chain.set_trie_updates(update.clone());
}
}

self.insert_chain(chain);
self.try_connect_buffered_blocks(block_num_hash);

@@ -797,6 +857,36 @@ where

/// Finalize blocks up until and including `finalized_block`, and remove them from the tree.
pub fn finalize_block(&mut self, finalized_block: BlockNumber) -> ProviderResult<()> {
// Get the state between (last_finalized_block, finalized_block]
// When creating a new canonical fork, the (last_finalized_block, fork_block) range will be
// pre-appended, and when making the chain canonical (committing to the database) the
// (last_finalized_block, canonical_block) range is also preserved
let block_hash = self.externals.find_hash_by_number(finalized_block).unwrap();
let indices = self.block_indices();
debug!(target: "blockchain_tree", ?block_hash, ?indices, "Finalize block start");
if let Some(chain_id) = self.block_indices().get_block_chain_id(&block_hash) {
let canonical = self.state.chains.remove(&chain_id);
let mut outcome = ExecutionOutcome::default();
match canonical
.unwrap()
.into_inner()
.split(ChainSplitTarget::Number(finalized_block - 1))
{
ChainSplit::Split { canonical: lower, pending: higher } => {
outcome.clone_from(lower.execution_outcome());

// Remove the finalized blocks and insert the remaining part back
self.state.block_indices.insert_chain(chain_id, &higher);
self.state.chains.insert(chain_id, AppendableChain::new(higher));
}
ChainSplit::NoSplitCanonical(lower) => {
outcome.clone_from(lower.execution_outcome());
}
ChainSplit::NoSplitPending(_) => {}
}
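// Push the state of the newly finalized blocks into the global canonical cache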
apply_bundle_state(outcome.bundle);
};

// remove blocks
let mut remove_chains = self.state.block_indices.finalize_canonical_blocks(
finalized_block,
@@ -1067,7 +1157,7 @@ where
};

// we are splitting chain at the block hash that we want to make canonical
let Some(canonical) = self.remove_and_split_chain(chain_id, block_hash.into()) else {
let Some(mut canonical) = self.remove_and_split_chain(chain_id, block_hash.into()) else {
debug!(target: "blockchain_tree", ?block_hash, ?chain_id, "Chain not present");
return Err(CanonicalError::from(BlockchainTreeError::BlockSideChainIdConsistency {
chain_id: chain_id.into(),
@@ -1077,7 +1167,7 @@
durations_recorder.record_relative(MakeCanonicalAction::SplitChain);

let mut fork_block = canonical.fork_block();
let mut chains_to_promote = vec![canonical];
let mut chains_to_promote = vec![canonical.clone()];

// loop while fork blocks are found in Tree.
while let Some(chain_id) = self.block_indices().get_block_chain_id(&fork_block.hash) {
@@ -1118,8 +1208,45 @@ where
if chain_appended {
trace!(target: "blockchain_tree", ?new_canon_chain, "Canonical chain appended");
}

// For the canonical chain, we need to keep the chain from the last_finalized_block onwards
let mut chain_to_canonicalize = new_canon_chain.clone();
// The dropped canonical part contains state that should be preserved
if canonical.first().number + canonical.len() as u64 >= new_canon_chain.first().number {
match canonical
.clone()
.split(ChainSplitTarget::Number(new_canon_chain.first().number - 1))
{
ChainSplit::Split { canonical: lower, pending: _higher } => {
canonical = lower;
}
ChainSplit::NoSplitCanonical(lower) => {
canonical = lower;
}
ChainSplit::NoSplitPending(_) => {
canonical = Chain::default();
}
}
}
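// Prepend the retained canonical segment so its state is kept in the chain being canonicalized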
if !canonical.is_empty() {
canonical.append_chain(new_canon_chain.clone()).map_err(|_| {
CanonicalError::from(BlockchainTreeError::BlockHashNotFoundInChain { block_hash })
})?;
chain_to_canonicalize = canonical;
}

// Reinsert the chain together with the canonical blocks retained since the last finalized block
self.state.chains.remove(&chain_id);
self.state.block_indices.insert_chain(chain_id, &chain_to_canonicalize);
self.state.chains.insert(chain_id, AppendableChain::new(chain_to_canonicalize.clone()));
debug!(
target: "blockchain_tree",
"Inserting new canonical chain: {:?}", chain_to_canonicalize
);

// update canonical index
self.state.block_indices.canonicalize_blocks(new_canon_chain.blocks());
// TODO: fixme
// self.state.block_indices.canonicalize_blocks(chain_to_canonicalize.blocks());
durations_recorder.record_relative(MakeCanonicalAction::UpdateCanonicalIndex);

debug!(
@@ -1262,7 +1389,6 @@ where
};
recorder.record_relative(MakeCanonicalAction::RetrieveStateTrieUpdates);

let cloned_bundle = state.bundle.clone();
let provider_rw = self.externals.provider_factory.provider_rw()?;
provider_rw
.append_blocks_with_state(
@@ -1276,9 +1402,6 @@
provider_rw.commit()?;
recorder.record_relative(MakeCanonicalAction::CommitCanonicalChainToDatabase);

// update global canonical cache
apply_bundle_state(cloned_bundle);

Ok(())
}

23 changes: 9 additions & 14 deletions crates/blockchain-tree/src/chain.rs
@@ -4,10 +4,7 @@
//! blocks, as well as a list of the blocks the chain is composed of.
use super::externals::TreeExternals;
use crate::{
canonical_cache::{clear_accounts_and_storages, CachedBundleStateProvider},
BundleStateDataRef,
};
use crate::{canonical_cache::CachedBundleStateProvider, BundleStateDataRef};
use reth_blockchain_tree_api::{
error::{BlockchainTreeError, InsertBlockErrorKind},
BlockAttachment, BlockValidationKind,
@@ -21,7 +18,8 @@ use reth_primitives::{
BlockHash, BlockNumber, ForkBlock, GotExpected, SealedBlockWithSenders, SealedHeader, U256,
};
use reth_provider::{
providers::ConsistentDbView, FullExecutionDataProvider, ProviderError, StateRootProvider,
providers::ConsistentDbView, FinalizedBlockReader, FullExecutionDataProvider, ProviderError,
StateRootProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_trie::updates::TrieUpdates;
@@ -71,6 +69,7 @@ impl AppendableChain {
pub fn new_canonical_fork<DB, E>(
block: SealedBlockWithSenders,
parent_header: &SealedHeader,
parent_outcome: ExecutionOutcome,
canonical_block_hashes: &BTreeMap<BlockNumber, BlockHash>,
canonical_fork: ForkBlock,
externals: &TreeExternals<DB, E>,
@@ -81,15 +80,9 @@
DB: Database + Clone,
E: BlockExecutorProvider,
{
let execution_outcome = ExecutionOutcome::default();
let execution_outcome = parent_outcome;
let empty = BTreeMap::new();

if block_attachment == BlockAttachment::HistoricalFork {
// The fork is a historical fork, the global canonical cache could be dirty.
// The case should be rare for bsc & op.
clear_accounts_and_storages();
}

let state_provider = BundleStateDataRef {
execution_outcome: &execution_outcome,
sidechain_block_hashes: &empty,
@@ -192,7 +185,9 @@ impl AppendableChain {
externals.consensus.validate_header_against_parent(&block, parent_block)?;

// get the state provider.
let canonical_fork = bundle_state_data_provider.canonical_fork();
//let canonical_fork = bundle_state_data_provider.canonical_fork();
let provider = externals.provider_factory.provider()?;
let finalized_block = provider.last_finalized_block_number()?;

// SAFETY: For block execution and parallel state root computation below we open multiple
// independent database transactions. Upon opening the database transaction the consistent
@@ -208,7 +203,7 @@ impl AppendableChain {
// State root calculation can take a while, and we're sure no write transaction
// will be open in parallel. See https://github.com/paradigmxyz/reth/issues/7509.
.disable_long_read_transaction_safety()
.state_provider_by_block_number(canonical_fork.number)?;
.state_provider_by_block_number(finalized_block)?;

let provider = CachedBundleStateProvider::new(state_provider, bundle_state_data_provider);

12 changes: 9 additions & 3 deletions crates/blockchain-tree/src/externals.rs
@@ -3,10 +3,10 @@
use reth_consensus::Consensus;
use reth_db::{static_file::HeaderMask, tables};
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_primitives::{BlockHash, BlockNumber, StaticFileSegment};
use reth_primitives::{BlockHash, BlockNumber, StaticFileSegment, B256};
use reth_provider::{
FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, StaticFileProviderFactory,
StatsReader,
BlockHashReader, FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory,
StaticFileProviderFactory, StatsReader,
};
use reth_storage_errors::provider::ProviderResult;
use std::{collections::BTreeMap, sync::Arc};
@@ -98,4 +98,10 @@ impl<DB: Database, E> TreeExternals<DB, E> {
provider_rw.commit()?;
Ok(())
}

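/// Looks up the canonical block hash for `block_number`, returning the zero hash if it is unknown.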
pub(crate) fn find_hash_by_number(&self, block_number: BlockNumber) -> ProviderResult<B256> {
let provider_ro = self.provider_factory.provider()?;
let hash = provider_ro.block_hash(block_number)?;
Ok(hash.unwrap_or_default())
}
}
11 changes: 6 additions & 5 deletions crates/consensus/beacon/src/engine/mod.rs
@@ -857,11 +857,12 @@ where
//
// This ensures that the finalized block is consistent with the head block, i.e. the
// finalized block is an ancestor of the head block.
if !state.finalized_block_hash.is_zero() &&
!self.blockchain.is_canonical(state.finalized_block_hash)?
{
return Ok(Some(OnForkChoiceUpdated::invalid_state()))
}
// TODO: fixme
// if !state.finalized_block_hash.is_zero() &&
// !self.blockchain.is_canonical(state.finalized_block_hash)?
// {
// return Ok(Some(OnForkChoiceUpdated::invalid_state()))
// }

// Finalized block is consistent, so update it in the canon chain tracker.
self.update_finalized_block(state.finalized_block_hash)?;
5 changes: 5 additions & 0 deletions crates/evm/execution-types/src/chain.rs
@@ -84,6 +84,11 @@ impl Chain {
self.trie_updates.as_ref()
}

/// Set cached trie updates for this chain.
pub fn set_trie_updates(&mut self, updates: TrieUpdates) {
self.trie_updates = Some(updates);
}

/// Remove cached trie updates for this chain.
pub fn clear_trie_updates(&mut self) {
self.trie_updates.take();
1 change: 1 addition & 0 deletions crates/optimism/node/Cargo.toml
@@ -56,6 +56,7 @@ thiserror.workspace = true
jsonrpsee.workspace = true
jsonrpsee-types.workspace = true
serde_json.workspace = true
tracing-subscriber = "0.3.18"

[dev-dependencies]
reth.workspace = true
7 changes: 5 additions & 2 deletions crates/optimism/node/tests/e2e/p2p.rs
@@ -1,13 +1,16 @@
use crate::utils::{advance_chain, setup};
use reth::blockchain_tree::error::BlockchainTreeError;
use reth_rpc_types::engine::PayloadStatusEnum;
use reth_tracing::tracing_subscriber;
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::test]
async fn can_sync() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

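// Honor `RUST_LOG` when set; otherwise default to debug-level logging for this test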
let filter_layer = tracing_subscriber::EnvFilter::try_from_default_env()
.or_else(|_| tracing_subscriber::EnvFilter::try_new("debug"))
.unwrap();
let _ = tracing_subscriber::fmt().with_env_filter(filter_layer).try_init();
let (mut nodes, _tasks, wallet) = setup(3).await?;
let wallet = Arc::new(Mutex::new(wallet));

