Skip to content

Commit

Permalink
feat: improve connecting downloaded blocks (#10368)
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez authored Aug 17, 2024
1 parent f67608d commit 9bed8cf
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 26 deletions.
13 changes: 9 additions & 4 deletions crates/chain-state/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use crate::{
use alloy_signer::SignerSync;
use alloy_signer_local::PrivateKeySigner;
use rand::{thread_rng, Rng};
use reth_chainspec::ChainSpec;
use reth_chainspec::{ChainSpec, EthereumHardfork};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives::{
constants::{EIP1559_INITIAL_BASE_FEE, EMPTY_ROOT_HASH},
proofs::{calculate_receipt_root, calculate_transaction_root},
proofs::{calculate_receipt_root, calculate_transaction_root, calculate_withdrawals_root},
Address, BlockNumber, Header, Receipt, Receipts, Requests, SealedBlock, SealedBlockWithSenders,
Signature, Transaction, TransactionSigned, TransactionSignedEcRecovered, TxEip1559, B256, U256,
};
Expand Down Expand Up @@ -156,15 +156,20 @@ impl TestBlockBuilder {
),
)])),
// use the number as the timestamp so it is monotonically increasing
timestamp: number,
timestamp: number +
EthereumHardfork::Cancun.activation_timestamp(self.chain_spec.chain).unwrap(),
withdrawals_root: Some(calculate_withdrawals_root(&[])),
blob_gas_used: Some(0),
excess_blob_gas: Some(0),
parent_beacon_block_root: Some(B256::random()),
..Default::default()
};

let block = SealedBlock {
header: header.seal_slow(),
body: transactions.into_iter().map(|tx| tx.into_signed()).collect(),
ommers: Vec::new(),
withdrawals: None,
withdrawals: Some(vec![].into()),
requests: None,
};

Expand Down
117 changes: 95 additions & 22 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,10 +646,15 @@ where
}
} else {
let mut latest_valid_hash = None;
let num_hash = block.num_hash();
match self.insert_block_without_senders(block) {
Ok(status) => {
let status = match status {
InsertPayloadOk::Inserted(BlockStatus::Valid(_)) |
InsertPayloadOk::Inserted(BlockStatus::Valid(_)) => {
latest_valid_hash = Some(block_hash);
self.try_connect_buffered_blocks(num_hash);
PayloadStatusEnum::Valid
}
InsertPayloadOk::AlreadySeen(BlockStatus::Valid(_)) => {
latest_valid_hash = Some(block_hash);
PayloadStatusEnum::Valid
Expand Down Expand Up @@ -2028,7 +2033,7 @@ mod tests {
use reth_evm::test_utils::MockExecutorProvider;
use reth_primitives::Bytes;
use reth_provider::test_utils::MockEthProvider;
use reth_rpc_types_compat::engine::block_to_payload_v1;
use reth_rpc_types_compat::engine::{block_to_payload_v1, payload::block_to_payload_v3};
use std::{
str::FromStr,
sync::mpsc::{channel, Sender},
Expand Down Expand Up @@ -2216,6 +2221,19 @@ mod tests {
}
}

async fn send_new_payload(&mut self, block: SealedBlockWithSenders) {
let payload = block_to_payload_v3(block.block.clone());
self.tree
.on_new_payload(
payload.into(),
Some(CancunPayloadFields {
parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
versioned_hashes: vec![],
}),
)
.unwrap();
}

async fn insert_chain(
&mut self,
chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
Expand Down Expand Up @@ -2259,16 +2277,20 @@ mod tests {
&mut self,
chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
) {
for _ in chain.clone() {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
) => {
assert!(chain.clone().into_iter().any(|b| b.hash() == block.hash()));
}
_ => panic!("Unexpected event: {:#?}", event),
for block in chain.clone() {
self.check_canon_block_added(block.hash()).await;
}
}

async fn check_canon_block_added(&mut self, expected_hash: B256) {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
) => {
assert!(block.hash() == expected_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
}

Expand All @@ -2286,16 +2308,29 @@ mod tests {
self.provider.extend_headers(headers_data);
}

fn setup_range_insertion_for_chain(&mut self, chain: Vec<SealedBlockWithSenders>) {
fn setup_range_insertion_for_valid_chain(&mut self, chain: Vec<SealedBlockWithSenders>) {
self.setup_range_insertion_for_chain(chain, None)
}

fn setup_range_insertion_for_chain(
&mut self,
chain: Vec<SealedBlockWithSenders>,
invalid_index: Option<usize>,
) {
// setting up execution outcomes for the chain, the blocks will be
// executed starting from the oldest, so we need to reverse.
let mut chain_rev = chain;
chain_rev.reverse();

let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
for block in &chain_rev {
for (index, block) in chain_rev.iter().enumerate() {
let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
self.tree.provider.add_state_root(block.state_root);
let state_root = if invalid_index.is_some() && invalid_index.unwrap() == index {
B256::random()
} else {
block.state_root
};
self.tree.provider.add_state_root(state_root);
execution_outcomes.push(execution_outcome);
}
self.extend_execution_outcome(execution_outcomes);
Expand Down Expand Up @@ -2890,7 +2925,7 @@ mod tests {
main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
test_harness.persist_blocks(backfilled_chain.clone());

test_harness.setup_range_insertion_for_chain(backfilled_chain);
test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);

// send message to mark backfill finished
test_harness.tree.on_engine_message(FromEngine::Event(
Expand Down Expand Up @@ -2933,7 +2968,7 @@ mod tests {
.drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
.collect();

test_harness.setup_range_insertion_for_chain(remaining.clone());
test_harness.setup_range_insertion_for_valid_chain(remaining.clone());

// tell engine block range downloaded
test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()));
Expand Down Expand Up @@ -3004,6 +3039,8 @@ mod tests {
// extend base chain
let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
let fork_block = extension_chain.last().unwrap().block.clone();

test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone());
test_harness.insert_chain(extension_chain).await;

// fcu to old_head
Expand All @@ -3014,19 +3051,17 @@ mod tests {
let chain_b = test_harness.block_builder.create_fork(&fork_block, 10);

// insert chain A blocks using newPayload
test_harness.setup_range_insertion_for_chain(chain_a.clone());
test_harness.setup_range_insertion_for_valid_chain(chain_a.clone());
for block in &chain_a {
let payload = block_to_payload_v1(block.block.clone());
test_harness.tree.on_new_payload(payload.into(), None).unwrap();
test_harness.send_new_payload(block.clone()).await;
}

test_harness.check_canon_chain_insertion(chain_a.clone()).await;

// insert chain B blocks using newPayload
test_harness.setup_range_insertion_for_chain(chain_b.clone());
test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
for block in &chain_b {
let payload = block_to_payload_v1(block.block.clone());
test_harness.tree.on_new_payload(payload.into(), None).unwrap();
test_harness.send_new_payload(block.clone()).await;
}

test_harness.check_canon_chain_insertion(chain_b.clone()).await;
Expand All @@ -3047,4 +3082,42 @@ mod tests {
// verify that chain A is now considered a fork
assert!(test_harness.tree.state.tree_state.is_fork(chain_a.last().unwrap().hash()));
}

#[tokio::test]
async fn test_engine_tree_buffered_blocks_are_eventually_connected() {
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec.clone());

let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
test_harness = test_harness.with_blocks(base_chain.clone());

// side chain consisting of two blocks, the last will be inserted first
// so that we force it to be buffered
let side_chain =
test_harness.block_builder.create_fork(base_chain.last().unwrap().block(), 2);

// buffer last block of side chain
let buffered_block = side_chain.last().unwrap();
let buffered_block_hash = buffered_block.hash();

test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]);
test_harness.send_new_payload(buffered_block.clone()).await;

assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some());

let non_buffered_block = side_chain.first().unwrap();
let non_buffered_block_hash = non_buffered_block.hash();

// insert block that continues the canon chain, should not be buffered
test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]);
test_harness.send_new_payload(non_buffered_block.clone()).await;
assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none());

// the previously buffered block should be connected now
assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none());

// both blocks are added to the canon chain in order
test_harness.check_canon_block_added(non_buffered_block_hash).await;
test_harness.check_canon_block_added(buffered_block_hash).await;
}
}

0 comments on commit 9bed8cf

Please sign in to comment.