Revert change: select peers from chain for custody by range requests
dapplion committed Jan 28, 2025

1 parent e21b31e · commit 38311f8
Showing 5 changed files with 16 additions and 34 deletions.
5 changes: 5 additions & 0 deletions beacon_node/network/src/network_beacon_processor/mod.rs
@@ -613,6 +613,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         blocks: Vec<RpcBlock<T::EthSpec>>,
     ) -> Result<(), Error<T::EthSpec>> {
         let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. });
+        debug!(self.log, "Batch sending for process";
+            "blocks" => blocks.len(),
+            "id" => ?process_id,
+        );
+
         let processor = self.clone();
         let process_fn = async move {
             let notify_execution_layer = if processor
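
A note on the logging idiom: Lighthouse uses slog-style structured logging, so key => value pairs follow the message and a leading `?` formats the value with its Debug implementation. Below is a minimal, self-contained sketch of the same pattern; it assumes the slog, slog-term, and slog-async crates, and the logger setup plus the stand-in process_id value are illustrative only, not part of this commit.

    use slog::{debug, o, Drain, Logger};

    fn main() {
        // Conventional slog setup: terminal formatter behind an async drain.
        let decorator = slog_term::TermDecorator::new().build();
        let drain = slog_term::FullFormat::new(decorator).build().fuse();
        let drain = slog_async::Async::new(drain).build().fuse();
        let log = Logger::root(drain, o!());

        let blocks: Vec<u8> = vec![0; 3];
        let process_id = "BackSyncBatchId(42)"; // stand-in for the real ChainSegmentProcessId

        // Same "key" => value form as the line added above; `?` logs via Debug.
        debug!(log, "Batch sending for process";
            "blocks" => blocks.len(),
            "id" => ?process_id,
        );
    }
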
6 changes: 0 additions & 6 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -942,17 +942,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
         if let Some(batch) = self.batches.get_mut(&batch_id) {
             let (request, is_blob_batch) = batch.to_blocks_by_range_request();
 
-            let synced_peers = {
-                let peers = self.network_globals.peers.read();
-                peers.synced_peers().copied().collect::<Vec<_>>()
-            };
-
             match network.block_components_by_range_request(
                 peer,
                 is_blob_batch,
                 request,
                 RangeRequestId::BackfillSync { batch_id },
-                synced_peers.into_iter(),
             ) {
                 Ok(request_id) => {
                     // inform the batch about the new request
32 changes: 7 additions & 25 deletions beacon_node/network/src/sync/network_context.rs
@@ -43,8 +43,8 @@ use std::time::Duration;
 use tokio::sync::mpsc;
 use types::blob_sidecar::FixedBlobSidecarList;
 use types::{
-    BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, DataColumnSubnetId,
-    EthSpec, Hash256, SignedBeaconBlock, Slot,
+    BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256,
+    SignedBeaconBlock, Slot,
 };
 
 pub mod custody;
@@ -307,23 +307,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .custody_peers_for_column(column_index)
     }
 
-    /// Chooses a random peer assigned to custody `column_index` from the provided `syncing_peers`.
-    pub fn choose_random_custodial_peer<'a>(
-        &self,
-        column_index: ColumnIndex,
-        syncing_peers: impl Iterator<Item = &'a PeerId>,
-    ) -> Option<PeerId> {
-        let peer_db_read_lock = self.network_globals().peers.read();
-        let subnet_id = DataColumnSubnetId::from_column_index(column_index, &self.chain.spec);
-
-        syncing_peers
-            .filter(|peer_id| {
-                peer_db_read_lock
-                    .peer_info(peer_id)
-                    .is_some_and(|peer_info| peer_info.is_assigned_to_custody_subnet(&subnet_id))
-            })
+    pub fn get_random_custodial_peer(&self, column_index: ColumnIndex) -> Option<PeerId> {
+        self.get_custodial_peers(column_index)
+            .into_iter()
             .choose(&mut thread_rng())
-            .copied()
     }
 
     pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
@@ -371,7 +358,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         batch_type: ByRangeRequestType,
         request: BlocksByRangeRequest,
         sender_id: RangeRequestId,
-        syncing_peers: impl Iterator<Item = PeerId>,
     ) -> Result<Id, RpcRequestSendError> {
         let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch());
         let id = self.next_id();
@@ -440,7 +426,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         let mut num_of_custody_column_req = 0;
 
         for (peer_id, columns_by_range_request) in
-            self.make_columns_by_range_requests(request, &column_indexes, syncing_peers)?
+            self.make_columns_by_range_requests(request, &column_indexes)?
         {
             requested_peers.push(peer_id);
 
@@ -487,18 +473,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         &self,
         request: BlocksByRangeRequest,
         custody_indexes: &HashSet<ColumnIndex>,
-        syncing_peers: impl Iterator<Item = PeerId>,
     ) -> Result<HashMap<PeerId, DataColumnsByRangeRequest>, RpcRequestSendError> {
         let mut peer_id_to_request_map = HashMap::new();
-        let syncing_peers = syncing_peers.collect::<Vec<_>>();
 
         for column_index in custody_indexes {
             // TODO(das): The peer selection logic here needs to be improved - we should probably
             // avoid retrying from failed peers, however `BatchState` currently only tracks the peer
             // serving the blocks.
-            let Some(custody_peer) =
-                self.choose_random_custodial_peer(*column_index, syncing_peers.iter())
-            else {
+            let Some(custody_peer) = self.get_random_custodial_peer(*column_index) else {
                 // TODO(das): this will be pretty bad UX. To improve we should:
                 // - Attempt to fetch custody requests first, before requesting blocks
                 // - Handle the no peers case gracefully, maybe add some timeout and give a few
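
The hunks above carry the substance of the revert: a custody peer for a column is once again drawn at random from all custodial peers the node knows about, rather than from the syncing chain's own peers filtered by custody-subnet assignment. The sketch below contrasts the two strategies using placeholder types and rand's IteratorRandom::choose (the same combinator used in the real code); it is illustrative only, not Lighthouse's actual API.

    use rand::seq::IteratorRandom;
    use rand::thread_rng;

    // Placeholder for libp2p's PeerId; the column index and peer DB are elided.
    type PeerId = u64;

    // After the revert: draw uniformly from every peer known to custody the column,
    // regardless of which syncing chain it belongs to.
    fn get_random_custodial_peer(custodial_peers: &[PeerId]) -> Option<PeerId> {
        custodial_peers.iter().copied().choose(&mut thread_rng())
    }

    // Before the revert: restrict the draw to the chain's syncing peers, keeping only
    // those assigned to the column's custody subnet.
    fn choose_random_custodial_peer<'a>(
        syncing_peers: impl Iterator<Item = &'a PeerId>,
        is_assigned_to_custody_subnet: impl Fn(&PeerId) -> bool,
    ) -> Option<PeerId> {
        syncing_peers
            .filter(|&peer_id| is_assigned_to_custody_subnet(peer_id))
            .choose(&mut thread_rng())
            .copied()
    }

    fn main() {
        let custodial_peers = vec![1, 2, 3];
        let syncing_peers = vec![2, 3];
        println!("{:?}", get_random_custodial_peer(&custodial_peers));
        println!(
            "{:?}",
            choose_random_custodial_peer(syncing_peers.iter(), |p| custodial_peers.contains(p))
        );
    }

Since the global custody pool is a superset of any one chain's filtered peer set, the "no custody peer" fallback flagged in the TODO above should trigger less often, at the cost of occasionally selecting a peer that is not part of the requesting chain.
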
2 changes: 0 additions & 2 deletions beacon_node/network/src/sync/range_sync/chain.rs
@@ -963,7 +963,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
         peer: PeerId,
     ) -> ProcessingResult {
         let batch_state = self.visualize_batch_state();
-        let syncing_peers = self.peers().collect::<Vec<_>>();
 
         if let Some(batch) = self.batches.get_mut(&batch_id) {
             let (request, batch_type) = batch.to_blocks_by_range_request();
@@ -976,7 +975,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
                     chain_id: self.id,
                     batch_id,
                 },
-                syncing_peers.into_iter(),
             ) {
                 Ok(request_id) => {
                     // inform the batch about the new request
5 changes: 4 additions & 1 deletion beacon_node/network/src/sync/tests/range.rs
@@ -358,6 +358,9 @@ fn pause_and_resume_on_ee_offline() {
     // now resume range, we should have two processing requests in the beacon processor.
     rig.update_execution_engine_state(EngineState::Online);
 
-    rig.expect_chain_segment();
+    // When adding a finalized peer, the initial head chain stops syncing. So sync only sends a
+    // pending batch from the finalized chain to the processor.
+    // TODO: Why did this sent the head chain's batch for processing before??
+    // rig.expect_chain_segment();
     rig.expect_chain_segment();
 }
