Commit
Fix range sync to select custody peers from its syncing chain instead of the global peer list.
jimmygchen committed Jan 21, 2025
1 parent eff9a5b commit 614f984
Showing 5 changed files with 127 additions and 29 deletions.
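In short, the change threads the relevant syncing peer set into the by-range request path, so data-column requests are only sent to peers that both belong to that set and are assigned to the custody subnet of the requested column. A minimal sketch of the selection logic, using hypothetical `PeerId`/`SubnetId` aliases and a plain `HashMap` in place of Lighthouse's peer DB (the real implementation is `SyncNetworkContext::choose_random_custodial_peer` below):

use rand::seq::IteratorRandom;
use rand::thread_rng;
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for illustration; the real types are PeerId,
// DataColumnSubnetId and the PeerDB in lighthouse_network.
type PeerId = u64;
type SubnetId = u64;

// Pick a random peer from the syncing peer set that custodies `subnet_id`,
// rather than sampling the global peer list.
fn choose_custody_peer<'a>(
    syncing_peers: impl Iterator<Item = &'a PeerId>,
    custody_subnets: &HashMap<PeerId, HashSet<SubnetId>>,
    subnet_id: SubnetId,
) -> Option<PeerId> {
    syncing_peers
        .filter(|peer| {
            custody_subnets
                .get(*peer)
                .is_some_and(|subnets| subnets.contains(&subnet_id))
        })
        .choose(&mut thread_rng())
        .copied()
}

If no connected peer in the set custodies the column, the sketch returns `None`; the TODO comments in `make_columns_by_range_requests` below discuss how the real code intends to handle that case more gracefully.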
7 changes: 7 additions & 0 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -941,11 +941,18 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, is_blob_batch) = batch.to_blocks_by_range_request();

let synced_peers = {
let peers = self.network_globals.peers.read();
peers.synced_peers().copied().collect::<Vec<_>>()
};

match network.block_components_by_range_request(
peer,
is_blob_batch,
request,
RangeRequestId::BackfillSync { batch_id },
synced_peers.into_iter(),
) {
Ok(request_id) => {
// inform the batch about the new request
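Backfill sync has no per-chain peer set, so the hunk above snapshots the globally synced peers instead. The copy is taken inside its own block, presumably so the peer-DB read guard is dropped before `block_components_by_range_request` runs, since that call takes its own read lock when filtering custody peers. A simplified sketch of the lock-scoping pattern, with `std::sync::RwLock` over a plain set standing in for the peer DB:

use std::collections::HashSet;
use std::sync::RwLock;

// Hypothetical peer DB stand-in: just a set of synced peer IDs.
fn snapshot_synced_peers(peer_db: &RwLock<HashSet<u64>>) -> Vec<u64> {
    let synced_peers = {
        // The read guard lives only inside this block...
        let peers = peer_db.read().expect("peer DB lock poisoned");
        peers.iter().copied().collect::<Vec<_>>()
    }; // ...and is dropped here, before any further locking.
    synced_peers
}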
32 changes: 25 additions & 7 deletions beacon_node/network/src/sync/network_context.rs
@@ -43,8 +43,8 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256,
SignedBeaconBlock, Slot,
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, DataColumnSubnetId,
EthSpec, Hash256, SignedBeaconBlock, Slot,
};

pub mod custody;
@@ -307,10 +307,23 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.custody_peers_for_column(column_index)
}

pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option<PeerId> {
self.get_custodial_peers(column_index)
.into_iter()
/// Chooses a random peer from the provided `syncing_peers` that is assigned to the custody subnet for `column_index`.
pub fn choose_random_custodial_peer<'a>(
&self,
column_index: ColumnIndex,
syncing_peers: impl Iterator<Item = &'a PeerId>,
) -> Option<PeerId> {
let peer_db_read_lock = self.network_globals().peers.read();
let subnet_id = DataColumnSubnetId::from_column_index(column_index, &self.chain.spec);

syncing_peers
.filter(|peer_id| {
peer_db_read_lock
.peer_info(peer_id)
.is_some_and(|peer_info| peer_info.is_assigned_to_custody_subnet(&subnet_id))
})
.choose(&mut thread_rng())
.copied()
}

pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
@@ -358,6 +371,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
batch_type: ByRangeRequestType,
request: BlocksByRangeRequest,
sender_id: RangeRequestId,
syncing_peers: impl Iterator<Item = PeerId>,
) -> Result<Id, RpcRequestSendError> {
let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
let id = self.next_id();
@@ -426,7 +440,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let mut num_of_custody_column_req = 0;

for (peer_id, columns_by_range_request) in
self.make_columns_by_range_requests(request, &column_indexes)?
self.make_columns_by_range_requests(request, &column_indexes, syncing_peers)?
{
requested_peers.push(peer_id);

@@ -473,14 +487,18 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&self,
request: BlocksByRangeRequest,
custody_indexes: &HashSet<ColumnIndex>,
syncing_peers: impl Iterator<Item = PeerId>,
) -> Result<HashMap<PeerId, DataColumnsByRangeRequest>, RpcRequestSendError> {
let mut peer_id_to_request_map = HashMap::new();
let syncing_peers = syncing_peers.collect::<Vec<_>>();

for column_index in custody_indexes {
// TODO(das): The peer selection logic here needs to be improved - we should probably
// avoid retrying from failed peers, however `BatchState` currently only tracks the peer
// serving the blocks.
let Some(custody_peer) = self.get_random_custodial_peer(*column_index) else {
let Some(custody_peer) =
self.choose_random_custodial_peer(*column_index, syncing_peers.iter())
else {
// TODO(das): this will be pretty bad UX. To improve we should:
// - Attempt to fetch custody requests first, before requesting blocks
// - Handle the no peers case gracefully, maybe add some timeout and give a few
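`choose_random_custodial_peer` resolves the subnet a peer must custody via `DataColumnSubnetId::from_column_index`. Assuming it follows the PeerDAS spec's `compute_subnet_for_data_column_sidecar`, the mapping is a modulo over the configured subnet count; the constant below is illustrative only, since the real value comes from `ChainSpec`:

// Illustrative value; in Lighthouse the subnet count is read from ChainSpec.
const DATA_COLUMN_SIDECAR_SUBNET_COUNT: u64 = 128;

fn subnet_for_column(column_index: u64) -> u64 {
    column_index % DATA_COLUMN_SIDECAR_SUBNET_COUNT
}

Only peers advertising custody of that subnet are eligible to serve DataColumnsByRange requests for the column, which is what the `is_assigned_to_custody_subnet` filter checks.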
4 changes: 4 additions & 0 deletions beacon_node/network/src/sync/range_sync/chain.rs
@@ -963,8 +963,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
peer: PeerId,
) -> ProcessingResult {
let batch_state = self.visualize_batch_state();
let syncing_peers = self.peers().collect::<Vec<_>>();

if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, batch_type) = batch.to_blocks_by_range_request();

match network.block_components_by_range_request(
peer,
batch_type,
@@ -973,6 +976,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
chain_id: self.id,
batch_id,
},
syncing_peers.into_iter(),
) {
Ok(request_id) => {
// inform the batch about the new request
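Note that `self.peers()` is collected into a `Vec` before `self.batches.get_mut(&batch_id)`: the peer iterator borrows the chain immutably, so materialising it first is what allows the subsequent mutable borrow of `batches` and the network call that consumes the peers. A reduced sketch of the borrow pattern, with hypothetical field types:

use std::collections::HashMap;

// Hypothetical, heavily reduced stand-in for SyncingChain.
struct Chain {
    peers: HashMap<u64, ()>,    // peer_id -> per-peer sync state
    batches: HashMap<u64, u32>, // batch_id -> number of requests sent
}

impl Chain {
    fn peers(&self) -> impl Iterator<Item = u64> + '_ {
        self.peers.keys().copied()
    }

    fn send_batch(&mut self, batch_id: u64) {
        // Collecting ends the immutable borrow of `self` taken by `peers()`...
        let syncing_peers = self.peers().collect::<Vec<_>>();
        // ...so the mutable borrow of `self.batches` is allowed here.
        if let Some(batch) = self.batches.get_mut(&batch_id) {
            *batch += syncing_peers.len() as u32; // stand-in for issuing the request
        }
    }
}

Passing `syncing_peers.into_iter()` onward also matches the new `impl Iterator<Item = PeerId>` parameter on `block_components_by_range_request`.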
6 changes: 5 additions & 1 deletion beacon_node/network/src/sync/tests/lookups.rs
@@ -164,6 +164,10 @@ impl TestRig {
self.fork_name.deneb_enabled()
}

pub fn after_fulu(&self) -> bool {
self.fork_name.fulu_enabled()
}

fn trigger_unknown_parent_block(&mut self, peer_id: PeerId, block: Arc<SignedBeaconBlock<E>>) {
let block_root = block.canonical_root();
self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root))
@@ -364,7 +368,7 @@ impl TestRig {
.__add_connected_peer_testing_only(false, &self.harness.spec)
}

fn new_connected_supernode_peer(&mut self) -> PeerId {
pub fn new_connected_supernode_peer(&mut self) -> PeerId {
self.network_globals
.peers
.write()
107 changes: 86 additions & 21 deletions beacon_node/network/src/sync/tests/range.rs
@@ -3,6 +3,7 @@ use crate::status::ToStatusMessage;
use crate::sync::manager::SLOT_IMPORT_TOLERANCE;
use crate::sync::range_sync::RangeSyncType;
use crate::sync::SyncMessage;
use beacon_chain::data_column_verification::CustodyDataColumn;
use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy};
use beacon_chain::{block_verification_types::RpcBlock, EngineState, NotifyExecutionLayer};
use lighthouse_network::rpc::{RequestType, StatusMessage};
@@ -16,6 +17,11 @@ use types::{

const D: Duration = Duration::new(0, 0);

pub(crate) enum DataSidecars<E: EthSpec> {
Blobs(BlobSidecarList<E>),
DataColumns(Vec<CustodyDataColumn<E>>),
}

impl TestRig {
/// Produce a head peer with an advanced head
fn add_head_peer(&mut self) -> PeerId {
@@ -67,7 +73,9 @@ impl TestRig {

fn add_peer(&mut self, remote_info: SyncInfo) -> PeerId {
// Create valid peer known to network globals
let peer_id = self.new_connected_peer();
// TODO(fulu): Using supernode peers to ensure we have peers across all column
// subnets for syncing. Should add tests connecting to full node peers.
let peer_id = self.new_connected_supernode_peer();
// Send peer to sync
self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info.clone()));
peer_id
@@ -110,7 +118,19 @@ impl TestRig {
})
.expect("Should have a blocks by range request");

let blob_req_id = if self.after_deneb() {
let blob_req_id = if self.after_fulu() {
Some(
self.pop_received_network_event(|ev| match ev {
NetworkMessage::SendRequest {
peer_id,
request: RequestType::DataColumnsByRange(_),
request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }),
} if peer_id == target_peer_id => Some(*id),
_ => None,
})
.expect("Should have a data columns by range request"),
)
} else if self.after_deneb() {
Some(
self.pop_received_network_event(|ev| match ev {
NetworkMessage::SendRequest {
@@ -144,22 +164,33 @@ impl TestRig {
});

if let Some(blobs_req_id) = blobs_req_id {
// Complete the request with a single stream termination
self.log(&format!(
"Completing BlobsByRange request {blobs_req_id} with empty stream"
));
self.send_sync_message(SyncMessage::RpcBlob {
request_id: SyncRequestId::RangeBlockAndBlobs { id: blobs_req_id },
peer_id: target_peer_id,
blob_sidecar: None,
seen_timestamp: D,
});
if self.after_fulu() {
// Complete the request with a single stream termination
self.log(&format!(
"Completing DataColumnsByRange request {blobs_req_id} with empty stream"
));
self.send_sync_message(SyncMessage::RpcDataColumn {
request_id: SyncRequestId::RangeBlockAndBlobs { id: blobs_req_id },
peer_id: target_peer_id,
data_column: None,
seen_timestamp: D,
});
} else {
// Complete the request with a single stream termination
self.log(&format!(
"Completing BlobsByRange request {blobs_req_id} with empty stream"
));
self.send_sync_message(SyncMessage::RpcBlob {
request_id: SyncRequestId::RangeBlockAndBlobs { id: blobs_req_id },
peer_id: target_peer_id,
blob_sidecar: None,
seen_timestamp: D,
});
}
}
}

async fn create_canonical_block(
&mut self,
) -> (SignedBeaconBlock<E>, Option<BlobSidecarList<E>>) {
async fn create_canonical_block(&mut self) -> (SignedBeaconBlock<E>, Option<DataSidecars<E>>) {
self.harness.advance_slot();

let block_root = self
@@ -170,20 +201,38 @@
AttestationStrategy::AllValidators,
)
.await;
// TODO(das): this does not handle data columns yet

let store = &self.harness.chain.store;
let block = store.get_full_block(&block_root).unwrap().unwrap();
let blobs = if block.fork_name_unchecked().deneb_enabled() {
store.get_blobs(&block_root).unwrap().blobs()
let fork = block.fork_name_unchecked();

let data_sidecars = if fork.fulu_enabled() {
store
.get_data_columns(&block_root)
.unwrap()
.map(|columns| {
columns
.into_iter()
.map(CustodyDataColumn::from_asserted_custody)
.collect()
})
.map(DataSidecars::DataColumns)
} else if fork.deneb_enabled() {
store
.get_blobs(&block_root)
.unwrap()
.blobs()
.map(DataSidecars::Blobs)
} else {
None
};
(block, blobs)

(block, data_sidecars)
}

async fn remember_block(
&mut self,
(block, blob_sidecars): (SignedBeaconBlock<E>, Option<BlobSidecarList<E>>),
(block, data_sidecars): (SignedBeaconBlock<E>, Option<DataSidecars<E>>),
) {
// This code is kind of duplicated from Harness::process_block, but takes sidecars directly.
let block_root = block.canonical_root();
Expand All @@ -193,7 +242,7 @@ impl TestRig {
.chain
.process_block(
block_root,
RpcBlock::new(Some(block_root), block.into(), blob_sidecars).unwrap(),
build_rpc_block(block.into(), &data_sidecars, &self.spec),
NotifyExecutionLayer::Yes,
BlockImportSource::RangeSync,
|| Ok(()),
@@ -206,6 +255,22 @@ impl TestRig {
}
}

fn build_rpc_block(
block: Arc<SignedBeaconBlock<E>>,
data_sidecars: &Option<DataSidecars<E>>,
spec: &ChainSpec,
) -> RpcBlock<E> {
match data_sidecars {
Some(DataSidecars::Blobs(blobs)) => {
RpcBlock::new(None, block, Some(blobs.clone())).unwrap()
}
Some(DataSidecars::DataColumns(columns)) => {
RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap()
}
None => RpcBlock::new_without_blobs(None, block),
}
}

#[test]
fn head_chain_removed_while_finalized_syncing() {
// NOTE: this is a regression test.
