Improve single block/blob logging (#4579)
* remove closure from `check_availability_and_maybe_import`

* improve logging, add wrapper struct to requested ids

* improve logging

* only log blob indices if we're in Deneb; only delay lookup if we're in Deneb

* fix bug in missing components check
realbigsean authored Aug 8, 2023
1 parent efbf906 commit 02c7a2e
Showing 11 changed files with 240 additions and 86 deletions.
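At a glance: the refactor drops the closure-taking `check_availability_and_maybe_import` in favor of purpose-specific entry points (`check_block_availability_and_import`, `check_gossip_blob_availability_and_import`, `check_rpc_blob_availability_and_import`) that all funnel into a new private `process_availability` method. A condensed before/after of the gossip-blob call site, lifted from the diff below with context trimmed:

```rust
// Before: every caller passed a closure that fed the data availability checker.
self.check_availability_and_maybe_import(blob.slot(), |chain| {
    chain.data_availability_checker.put_gossip_blob(blob)
})
.await

// After: one method per component type; the shared tail (import if available,
// otherwise report the missing components) lives in `process_availability`.
self.check_gossip_blob_availability_and_import(blob).await
```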
66 changes: 49 additions & 17 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -33,7 +33,6 @@ use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, Prep
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::head_tracker::HeadTracker;
use crate::historical_blocks::HistoricalBlockError;
use crate::kzg_utils;
use crate::light_client_finality_update_verification::{
Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate,
};
@@ -68,6 +67,7 @@ use crate::validator_monitor::{
HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS,
};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{kzg_utils, AvailabilityPendingExecutedBlock};
use crate::{metrics, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead};
use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
use execution_layer::{
@@ -118,7 +118,7 @@ use task_executor::{ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::BlobSidecarList;
use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList};
use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
use types::*;

@@ -2786,10 +2786,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
self.check_availability_and_maybe_import(blob.slot(), |chain| {
chain.data_availability_checker.put_gossip_blob(blob)
})
.await
self.check_gossip_blob_availability_and_import(blob).await
}

/// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and
@@ -2840,12 +2837,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
match executed_block {
ExecutedBlock::Available(block) => self.import_available_block(Box::new(block)).await,
ExecutedBlock::AvailabilityPending(block) => {
self.check_availability_and_maybe_import(block.block.slot(), |chain| {
chain
.data_availability_checker
.put_pending_executed_block(block)
})
.await
self.check_block_availability_and_import(block).await
}
}
}
@@ -2934,17 +2926,57 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Accepts a fully-verified, available block and imports it into the chain without performing any
/// additional verification.
/* Import methods */

/// Checks if the block is available, and imports immediately if so, otherwise caches the block
/// in the data availability checker.
pub async fn check_block_availability_and_import(
self: &Arc<Self>,
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let slot = block.block.slot();
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
self.process_availability(slot, availability).await
}

/// Checks if the provided blob can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
pub async fn check_gossip_blob_availability_and_import(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let slot = blob.slot();
let availability = self.data_availability_checker.put_gossip_blob(blob)?;

self.process_availability(slot, availability).await
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blobs in the data availability checker.
pub async fn check_rpc_blob_availability_and_import(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let availability = self
.data_availability_checker
.put_rpc_blobs(block_root, blobs)?;

self.process_availability(slot, availability).await
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
///
/// An error is returned if the block was unable to be imported. It may be partially imported
/// (i.e., this function is not atomic).
pub async fn check_availability_and_maybe_import(
async fn process_availability(
self: &Arc<Self>,
slot: Slot,
cache_fn: impl FnOnce(Arc<Self>) -> Result<Availability<T::EthSpec>, AvailabilityCheckError>,
availability: Availability<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let availability = cache_fn(self.clone())?;
match availability {
Availability::Available(block) => {
// This is the time since start of the slot where all the components of the block have become available
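The hunk above cuts off inside `process_availability`. A minimal sketch of how the two `Availability` variants are presumably handled (the committed body also records the availability-delay metric that the truncated comment refers to):

```rust
// Sketch of the truncated match body, not the exact committed code.
match availability {
    Availability::Available(block) => {
        // Every component (block and blobs) is present: import immediately.
        self.import_available_block(block).await
    }
    Availability::MissingComponents(block_root) => {
        // Still waiting on the block or some of its blobs; report which root is
        // incomplete so the caller (gossip, RPC, or sync) keeps the lookup alive.
        Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root))
    }
}
```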
10 changes: 10 additions & 0 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -313,6 +313,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map_or(false, |da_epoch| block_epoch >= da_epoch)
}

/// Returns `true` if the current epoch is greater than or equal to the `Deneb` epoch.
pub fn is_deneb(&self) -> bool {
self.slot_clock.now().map_or(false, |slot| {
self.spec.deneb_fork_epoch.map_or(false, |deneb_epoch| {
let now_epoch = slot.epoch(T::EthSpec::slots_per_epoch());
now_epoch >= deneb_epoch
})
})
}

/// Persist all in memory components to disk
pub fn persist_all(&self) -> Result<(), AvailabilityCheckError> {
self.availability_cache.write_all_to_disk()
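The "only delay lookup if we're in deneb" part of the commit message uses this new `is_deneb` helper from a caller that is not included in this excerpt. A hypothetical sketch of that gating (the `block_lookups` method names are illustrative, not taken from the diff):

```rust
// Hypothetical caller-side gating: before the Deneb fork there are no blob
// sidecars to wait for, so trigger the lookup immediately instead of delaying
// it to give blobs a chance to arrive over gossip first.
if da_checker.is_deneb() {
    block_lookups.search_block_delayed(block_root, peer_source, cx);
} else {
    block_lookups.search_block(block_root, peer_source, cx);
}
```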
@@ -721,7 +721,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_hash)) => {
debug!(
trace!(
self.log,
"Missing block components for gossip verified blob";
"slot" => %blob_slot,
@@ -287,11 +287,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

let result = self
.chain
.check_availability_and_maybe_import(slot, |chain| {
chain
.data_availability_checker
.put_rpc_blobs(block_root, blobs)
})
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;

// Sync handles these results
4 changes: 2 additions & 2 deletions beacon_node/network/src/sync/block_lookups/common.rs
@@ -374,7 +374,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,

fn new_request(&self) -> BlobsByRootRequest {
BlobsByRootRequest {
blob_ids: VariableList::from(self.requested_ids.clone()),
blob_ids: self.requested_ids.clone().into(),
}
}

@@ -402,7 +402,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
Err(LookupVerifyError::UnrequestedBlobId)
} else {
// State should remain downloading until we receive the stream terminator.
self.requested_ids.retain(|id| *id != received_id);
self.requested_ids.remove(&received_id);
let blob_index = blob.index;

if blob_index >= T::EthSpec::max_blobs_per_block() as u64 {
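The "wrapper struct to requested ids" from the commit message is not defined in this excerpt, but the call sites above (`remove`, `clone().into()`) and the `indices()` used for logging in mod.rs imply roughly this shape. Hypothetical sketch only; the committed type may differ in name and representation:

```rust
use ssz_types::typenum::Unsigned;
use ssz_types::VariableList;
use types::BlobIdentifier;

// Hypothetical wrapper over the blob IDs still being requested, consistent
// with the call sites in this diff but not the committed definition.
#[derive(Clone, Default)]
pub struct RequestedBlobIds(Vec<BlobIdentifier>);

impl RequestedBlobIds {
    pub fn contains(&self, id: &BlobIdentifier) -> bool {
        self.0.contains(id)
    }
    pub fn remove(&mut self, id: &BlobIdentifier) {
        self.0.retain(|existing| existing != id);
    }
    /// Blob indices still outstanding, used by the new lookup logging.
    pub fn indices(&self) -> Vec<u64> {
        self.0.iter().map(|id| id.index).collect()
    }
}

// Supports the `.into()` in `new_request`: convert into the SSZ list type
// expected by `BlobsByRootRequest`.
impl<N: Unsigned> From<RequestedBlobIds> for VariableList<BlobIdentifier, N> {
    fn from(ids: RequestedBlobIds) -> Self {
        VariableList::from(ids.0)
    }
}
```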
93 changes: 70 additions & 23 deletions beacon_node/network/src/sync/block_lookups/mod.rs
@@ -96,7 +96,7 @@ pub struct BlockLookups<T: BeaconChainTypes> {

single_block_lookups: FnvHashMap<Id, SingleBlockLookup<Current, T>>,

da_checker: Arc<DataAvailabilityChecker<T>>,
pub(crate) da_checker: Arc<DataAvailabilityChecker<T>>,

/// The logger for the import manager.
log: Logger,
@@ -126,10 +126,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
cx: &mut SyncNetworkContext<T>,
) {
let lookup = self.new_current_lookup(block_root, None, &[peer_source], cx);

if let Some(lookup) = lookup {
let msg = "Searching for block";
lookup_creation_logging(msg, &lookup, peer_source, &self.log);
self.trigger_single_lookup(lookup, cx);
}
}

/// Creates a lookup for the block with the given `block_root`.
///
/// The request is not immediately triggered, and should be triggered by a call to
@@ -142,6 +146,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
) {
let lookup = self.new_current_lookup(block_root, None, &[peer_source], cx);
if let Some(lookup) = lookup {
let msg = "Initialized delayed lookup for block";
lookup_creation_logging(msg, &lookup, peer_source, &self.log);
self.add_single_lookup(lookup)
}
}
@@ -155,13 +161,18 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn search_child_block(
&mut self,
block_root: Hash256,
child_components: Option<CachedChildComponents<T::EthSpec>>,
peer_source: &[PeerShouldHave],
child_components: CachedChildComponents<T::EthSpec>,
peer_source: PeerShouldHave,
cx: &mut SyncNetworkContext<T>,
) {
let lookup = self.new_current_lookup(block_root, child_components, peer_source, cx);
if let Some(lookup) = lookup {
self.trigger_single_lookup(lookup, cx);
if child_components.is_missing_components() {
let lookup =
self.new_current_lookup(block_root, Some(child_components), &[peer_source], cx);
if let Some(lookup) = lookup {
let msg = "Searching for components of a block with unknown parent";
lookup_creation_logging(msg, &lookup, peer_source, &self.log);
self.trigger_single_lookup(lookup, cx);
}
}
}

@@ -175,13 +186,18 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn search_child_delayed(
&mut self,
block_root: Hash256,
child_components: Option<CachedChildComponents<T::EthSpec>>,
peer_source: &[PeerShouldHave],
child_components: CachedChildComponents<T::EthSpec>,
peer_source: PeerShouldHave,
cx: &mut SyncNetworkContext<T>,
) {
let lookup = self.new_current_lookup(block_root, child_components, peer_source, cx);
if let Some(lookup) = lookup {
self.add_single_lookup(lookup)
if child_components.is_missing_components() {
let lookup =
self.new_current_lookup(block_root, Some(child_components), &[peer_source], cx);
if let Some(lookup) = lookup {
let msg = "Initialized delayed lookup for block with unknown parent";
lookup_creation_logging(msg, &lookup, peer_source, &self.log);
self.add_single_lookup(lookup)
}
}
}

@@ -218,6 +234,22 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn trigger_lookup_by_root(&mut self, block_root: Hash256, cx: &SyncNetworkContext<T>) {
self.single_block_lookups.retain(|_id, lookup| {
if lookup.block_root() == block_root {
if lookup.da_checker.is_deneb() {
let blob_indices = lookup.blob_request_state.requested_ids.indices();
debug!(
self.log,
"Triggering delayed single lookup";
"block" => ?block_root,
"blob_indices" => ?blob_indices
);
} else {
debug!(
self.log,
"Triggering delayed single lookup";
"block" => ?block_root,
);
}

if let Err(e) = lookup.request_block_and_blobs(cx) {
debug!(self.log, "Delayed single block lookup failed";
"error" => ?e,
@@ -271,13 +303,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
return None;
}

debug!(
self.log,
"Searching for block";
"peer_id" => ?peers,
"block" => ?block_root
);

Some(SingleBlockLookup::new(
block_root,
child_components,
@@ -583,7 +608,6 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut parent_lookup,
) {
Ok(()) => {
debug!(self.log, "Requesting parent"; &parent_lookup);
self.parent_lookups.push(parent_lookup);
}
Err(e) => {
@@ -1435,10 +1459,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Err(e) => {
self.handle_parent_request_error(&mut parent_lookup, cx, e);
}
Ok(_) => {
debug!(self.log, "Requesting parent"; &parent_lookup);
self.parent_lookups.push(parent_lookup)
}
Ok(_) => self.parent_lookups.push(parent_lookup),
}

// We remove and add back again requests so we want this updated regardless of outcome.
@@ -1460,3 +1481,29 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.parent_lookups.drain(..).len()
}
}

fn lookup_creation_logging<L: Lookup, T: BeaconChainTypes>(
msg: &str,
lookup: &SingleBlockLookup<L, T>,
peer_source: PeerShouldHave,
log: &Logger,
) {
let block_root = lookup.block_root();
if lookup.da_checker.is_deneb() {
let blob_indices = lookup.blob_request_state.requested_ids.indices();
debug!(
log,
"{}", msg;
"peer_id" => ?peer_source,
"block" => ?block_root,
"blob_indices" => ?blob_indices
);
} else {
debug!(
log,
"{}", msg;
"peer_id" => ?peer_source,
"block" => ?block_root,
);
}
}
6 changes: 3 additions & 3 deletions beacon_node/network/src/sync/block_lookups/parent_lookup.rs
@@ -193,11 +193,11 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
) -> Result<Option<R::VerifiedResponseType>, ParentVerifyError> {
let expected_block_root = self.current_parent_request.block_root();
let request_state = R::request_state_mut(&mut self.current_parent_request);
let root_and_block = request_state.verify_response(expected_block_root, block)?;
let root_and_verified = request_state.verify_response(expected_block_root, block)?;

// check if the parent of this block isn't in the failed cache. If it is, this chain should
// be dropped and the peer downscored.
if let Some(parent_root) = root_and_block
if let Some(parent_root) = root_and_verified
.as_ref()
.and_then(|block| R::get_parent_root(block))
{
Expand All @@ -207,7 +207,7 @@ impl<T: BeaconChainTypes> ParentLookup<T> {
}
}

Ok(root_and_block)
Ok(root_and_verified)
}

pub fn add_peers(&mut self, peer_source: &[PeerShouldHave]) {