diff --git a/fork_choice_control/src/controller.rs b/fork_choice_control/src/controller.rs index 0e72aa1c..d1599403 100644 --- a/fork_choice_control/src/controller.rs +++ b/fork_choice_control/src/controller.rs @@ -226,13 +226,12 @@ where self.spawn_data_column_sidecar_task_with_wait_group( wait_group, data_column_sidecar, - true, DataColumnSidecarOrigin::Own, ) } pub fn on_api_data_column_sidecar(&self, data_column_sidecar: Arc<DataColumnSidecar<P>>) { - self.spawn_data_column_sidecar_task(data_column_sidecar, true, DataColumnSidecarOrigin::Api) + self.spawn_data_column_sidecar_task(data_column_sidecar, DataColumnSidecarOrigin::Api) } pub fn on_api_block( @@ -410,11 +409,9 @@ where blob_sidecar: Arc<DataColumnSidecar<P>>, subnet_id: SubnetId, gossip_id: GossipId, - block_seen: bool, ) { self.spawn_data_column_sidecar_task( blob_sidecar, - block_seen, DataColumnSidecarOrigin::Gossip(subnet_id, gossip_id), ) } @@ -440,7 +437,6 @@ where pub fn on_requested_data_column_sidecar( &self, data_column_sidecar: Arc<DataColumnSidecar<P>>, - block_seen: bool, peer_id: PeerId, ) { self.spawn(DataColumnSidecarTask { @@ -448,7 +444,6 @@ where mutator_tx: self.owned_mutator_tx(), wait_group: self.owned_wait_group(), data_column_sidecar, - block_seen, origin: DataColumnSidecarOrigin::Requested(peer_id), submission_time: Instant::now(), metrics: self.metrics.clone(), @@ -505,18 +500,14 @@ where }) } - // data_column_sidecar zemiau - fn spawn_data_column_sidecar_task( &self, data_column_sidecar: Arc<DataColumnSidecar<P>>, - block_seen: bool, origin: DataColumnSidecarOrigin, ) { self.spawn_data_column_sidecar_task_with_wait_group( self.owned_wait_group(), data_column_sidecar, - block_seen, origin, ) } @@ -525,7 +516,6 @@ where &self, wait_group: W, data_column_sidecar: Arc<DataColumnSidecar<P>>, - block_seen: bool, origin: DataColumnSidecarOrigin, ) { self.spawn(DataColumnSidecarTask { @@ -533,7 +523,6 @@ where mutator_tx: self.owned_mutator_tx(), wait_group, data_column_sidecar, - block_seen, origin, submission_time: Instant::now(), metrics: self.metrics.clone(), diff --git a/fork_choice_control/src/messages.rs b/fork_choice_control/src/messages.rs index 5c0751ef..eecf7df9 100644 --- a/fork_choice_control/src/messages.rs +++ b/fork_choice_control/src/messages.rs @@ -100,7 +100,6 @@ pub enum MutatorMessage<P: Preset, W> { wait_group: W, result: Result<DataColumnSidecarAction<P>>, origin: DataColumnSidecarOrigin, - block_seen: bool, submission_time: Instant, }, FinishedPersistingBlobSidecars { diff --git a/fork_choice_control/src/misc.rs b/fork_choice_control/src/misc.rs index 673746d2..4ca813fd 100644 --- a/fork_choice_control/src/misc.rs +++ b/fork_choice_control/src/misc.rs @@ -6,13 +6,14 @@ use educe::Educe; use eth2_libp2p::GossipId; use fork_choice_store::{ AggregateAndProofAction, AggregateAndProofOrigin, AttestationAction, AttestationOrigin, - BlobSidecarOrigin, BlockOrigin, ChainLink, + BlobSidecarOrigin, BlockOrigin, ChainLink, DataColumnSidecarOrigin, }; use serde::Serialize; use strum::IntoStaticStr; use types::{ combined::SignedBeaconBlock, deneb::containers::BlobSidecar, + eip7594::DataColumnSidecar, phase0::{ containers::{Attestation, SignedAggregateAndProof}, primitives::ValidatorIndex, @@ -30,6 +31,7 @@ pub struct Delayed<P: Preset> { pub aggregates: Vec<PendingAggregateAndProof<P>>, pub attestations: Vec<PendingAttestation<P>>, pub blob_sidecars: Vec<PendingBlobSidecar<P>>, + pub data_column_sidecars: Vec<PendingDataColumnSidecar<P>>, } impl<P: Preset> Delayed<P> { @@ -40,12 +42,14 @@ impl<P: Preset> Delayed<P> { aggregates, attestations, blob_sidecars, + data_column_sidecars, } = self; blocks.is_empty() && aggregates.is_empty() && attestations.is_empty() && blob_sidecars.is_empty() + && data_column_sidecars.is_empty() } } @@ -109,6 +113,13 @@ pub struct PendingBlobSidecar<P: Preset> { pub submission_time: Instant, } +#[derive(Debug)] +pub struct PendingDataColumnSidecar<P: Preset> { + pub data_column_sidecar: Arc<DataColumnSidecar<P>>, + pub origin: DataColumnSidecarOrigin, + pub submission_time: Instant, +} + pub struct VerifyAggregateAndProofResult<P: Preset> { pub result: Result<AggregateAndProofAction<P>>, pub origin: AggregateAndProofOrigin<GossipId>, diff --git a/fork_choice_control/src/mutator.rs b/fork_choice_control/src/mutator.rs index afe5e327..e38cbc74 100644 --- a/fork_choice_control/src/mutator.rs +++ b/fork_choice_control/src/mutator.rs @@ -63,14 +63,14 @@ use crate::{ messages::{MutatorMessage, P2pMessage, SubnetMessage, SyncMessage, ValidatorMessage}, misc::{ Delayed, MutatorRejectionReason, PendingAggregateAndProof, PendingAttestation, - PendingBlobSidecar, PendingBlock, PendingChainLink, VerifyAggregateAndProofResult, - VerifyAttestationResult, WaitingForCheckpointState, + PendingBlobSidecar, PendingBlock, PendingChainLink, PendingDataColumnSidecar, + VerifyAggregateAndProofResult, VerifyAttestationResult, WaitingForCheckpointState, }, state_cache::StateCache, storage::Storage, tasks::{ AggregateAndProofTask, AttestationTask, BlobSidecarTask, BlockAttestationsTask, BlockTask, - CheckpointStateTask, PersistBlobSidecarsTask, PreprocessStateTask, + CheckpointStateTask, DataColumnSidecarTask, PersistBlobSidecarsTask, PreprocessStateTask, }, thread_pool::{Spawn, ThreadPool}, unbounded_sink::UnboundedSink, @@ -234,16 +234,9 @@ where MutatorMessage::DataColumnSidecar { wait_group, result, - block_seen, - origin, - submission_time, - } => self.handle_data_column_sidecar( - wait_group, - result, - block_seen, origin, submission_time, - ), + } => self.handle_data_column_sidecar(wait_group, result, origin, submission_time), MutatorMessage::FinishedPersistingBlobSidecars { wait_group, persisted_blob_ids, @@ -1137,7 +1130,6 @@ where &mut self, wait_group: W, result: Result<DataColumnSidecarAction<P>>, - block_seen: bool, origin: DataColumnSidecarOrigin, submission_time: Instant, ) { @@ -1154,6 +1146,44 @@ where P2pMessage::Ignore(gossip_id).send(&self.p2p_tx); } } + Ok(DataColumnSidecarAction::DelayUntilParent(data_column_sidecar)) => { + let parent_root = data_column_sidecar.signed_block_header.message.parent_root; + + let pending_data_column_sidecar = PendingDataColumnSidecar { + data_column_sidecar, + origin, + submission_time, + }; + + if self.store.contains_block(parent_root) { + self.retry_data_column_sidecar(wait_group, pending_data_column_sidecar); + } else { + debug!("data column sidecar delayed until block parent: {parent_root:?}"); + + let peer_id = pending_data_column_sidecar.origin.peer_id(); + + P2pMessage::BlockNeeded(parent_root, peer_id).send(&self.p2p_tx); + + self.delay_data_column_sidecar_until_parent(pending_data_column_sidecar); + } + } + Ok(DataColumnSidecarAction::DelayUntilSlot(data_column_sidecar)) => { + let slot = data_column_sidecar.signed_block_header.message.slot; + + let pending_data_column_sidecar = PendingDataColumnSidecar { + data_column_sidecar, + origin, + submission_time, + }; + + if slot <= self.store.slot() { + self.retry_data_column_sidecar(wait_group, pending_data_column_sidecar); + } else { + debug!("data column sidecar delayed until slot: {slot}"); + + self.delay_data_column_sidecar_until_slot(pending_data_column_sidecar); + } + } Err(error) => { warn!("data column sidecar rejected (error: {error}, origin: {origin:?})"); @@ -1978,6 +2008,40 @@ where .push(pending_blob_sidecar); } + fn delay_data_column_sidecar_until_parent( + &mut self, + pending_data_column_sidecar: PendingDataColumnSidecar<P>, + ) { + self.delayed_until_block + .entry( + pending_data_column_sidecar + .data_column_sidecar + .signed_block_header + .message + .parent_root, + ) + .or_default() + .data_column_sidecars + .push(pending_data_column_sidecar); + } + + fn delay_data_column_sidecar_until_slot( + &mut self, + pending_data_column_sidecar: PendingDataColumnSidecar<P>, + ) { + self.delayed_until_slot + .entry( + pending_data_column_sidecar + .data_column_sidecar + .signed_block_header + .message + .slot, + ) + .or_default() + .data_column_sidecars + .push(pending_data_column_sidecar); + } + fn take_delayed_until_block(&mut self, block_root: H256) -> Option<Delayed<P>> { self.delayed_until_block.remove(&block_root) } @@ -2001,6 +2065,7 @@ where aggregates, attestations, blob_sidecars, + data_column_sidecars, } = delayed; for pending_block in blocks { @@ -2018,6 +2083,10 @@ where for pending_blob_sidecar in blob_sidecars { self.retry_blob_sidecar(wait_group.clone(), pending_blob_sidecar); } + + for pending_data_column_sidecar in data_column_sidecars { + self.retry_data_column_sidecar(wait_group.clone(), pending_data_column_sidecar); + } } fn retry_block(&self, wait_group: W, pending_block: PendingBlock<P>) { @@ -2117,6 +2186,30 @@ where }); } + fn retry_data_column_sidecar( + &self, + wait_group: W, + pending_data_column_sidecar: PendingDataColumnSidecar<P>, + ) { + debug!("retrying delayed data column sidecar: {pending_data_column_sidecar:?}"); + + let PendingDataColumnSidecar { + data_column_sidecar, + origin, + submission_time, + } = pending_data_column_sidecar; + + self.spawn(DataColumnSidecarTask { + store_snapshot: self.owned_store(), + mutator_tx: self.owned_mutator_tx(), + wait_group, + data_column_sidecar, + origin, + submission_time, + metrics: self.metrics.clone(), + }); + } + // Some objects may be delayed until a block that is itself delayed. // If the latter is pruned, objects depending on it could be pruned as well. // We don't bother doing this. It's tricky to implement and might not even be worth it. @@ -2136,6 +2229,7 @@ where aggregates, attestations, blob_sidecars, + data_column_sidecars, } = delayed; gossip_ids.extend( @@ -2183,6 +2277,16 @@ where .filter_map(|pending| pending.origin.gossip_id()), ); + gossip_ids.extend( + data_column_sidecars + .drain_filter(|pending| { + // The parent of a delayed block cannot be in a finalized slot. + pending.data_column_sidecar.signed_block_header.message.slot - 1 + <= finalized_slot + }) + .filter_map(|pending| pending.origin.gossip_id()), + ); + !delayed.is_empty() }); diff --git a/fork_choice_control/src/tasks.rs b/fork_choice_control/src/tasks.rs index 36b2c547..465390b5 100644 --- a/fork_choice_control/src/tasks.rs +++ b/fork_choice_control/src/tasks.rs @@ -331,14 +331,11 @@ impl<P: Preset, W> Run for BlobSidecarTask<P, W> { } } -// data column sidecar zemiau - pub struct DataColumnSidecarTask<P: Preset, W> { pub store_snapshot: Arc<Store<P>>, pub mutator_tx: Sender<MutatorMessage<P, W>>, pub wait_group: W, pub data_column_sidecar: Arc<DataColumnSidecar<P>>, - pub block_seen: bool, pub origin: DataColumnSidecarOrigin, pub submission_time: Instant, pub metrics: Option<Arc<Metrics>>, @@ -351,7 +348,6 @@ impl<P: Preset, W> Run for DataColumnSidecarTask<P, W> { mutator_tx, wait_group, data_column_sidecar, - block_seen, origin, submission_time, metrics, @@ -373,7 +369,6 @@ impl<P: Preset, W> Run for DataColumnSidecarTask<P, W> { MutatorMessage::DataColumnSidecar { wait_group, result, - block_seen, origin, submission_time, } diff --git a/fork_choice_store/src/misc.rs b/fork_choice_store/src/misc.rs index bd7528f2..ca5b9f5d 100644 --- a/fork_choice_store/src/misc.rs +++ b/fork_choice_store/src/misc.rs @@ -563,6 +563,8 @@ pub enum BlobSidecarAction<P: Preset> { pub enum DataColumnSidecarAction<P: Preset> { Accept(Arc<DataColumnSidecar<P>>), Ignore, + DelayUntilParent(Arc<DataColumnSidecar<P>>), + DelayUntilSlot(Arc<DataColumnSidecar<P>>), } pub enum PartialBlockAction { diff --git a/fork_choice_store/src/store.rs b/fork_choice_store/src/store.rs index f1615ec2..899575a5 100644 --- a/fork_choice_store/src/store.rs +++ b/fork_choice_store/src/store.rs @@ -1878,7 +1878,9 @@ impl<P: Preset> Store<P> { // [IGNORE] The sidecar's block's parent (defined by block_header.parent_root) has been seen (via both gossip and non-gossip sources) (a client MAY queue sidecars for processing once the parent block is retrieved). let Some(parent) = self.chain_link(block_header.parent_root) else { - return Ok(DataColumnSidecarAction::Ignore); + return Ok(DataColumnSidecarAction::DelayUntilParent( + data_column_sidecar, + )); }; // [REJECT] The sidecar's block's parent (defined by block_header.parent_root) passes validation. diff --git a/p2p/src/block_sync_service.rs b/p2p/src/block_sync_service.rs index 86e7e105..4043ea10 100644 --- a/p2p/src/block_sync_service.rs +++ b/p2p/src/block_sync_service.rs @@ -309,8 +309,8 @@ impl<P: Preset> BlockSyncService<P> { } } } - P2pToSync::RequestedDataColumnSidecar(data_column_sidecar, block_seen, peer_id) => { - self.controller.on_requested_data_column_sidecar(data_column_sidecar, block_seen, peer_id); + P2pToSync::RequestedDataColumnSidecar(data_column_sidecar, peer_id) => { + self.controller.on_requested_data_column_sidecar(data_column_sidecar, peer_id); } P2pToSync::BlobsByRangeRequestFinished(request_id) => { self.sync_manager.blobs_by_range_request_finished(request_id); @@ -359,7 +359,7 @@ impl<P: Preset> BlockSyncService<P> { self.sync_manager.block_by_root_request_finished(block_root); self.request_blobs_and_blocks_if_ready()?; } - //TODO(feature/eip7549) + //TODO(feature/eip-7594) P2pToSync::DataColumnsByRangeRequestFinished(request_id) => { // self.sync_manager.blobs_by_range_request_finished(request_id); // self.request_blobs_and_blocks_if_ready()?; diff --git a/p2p/src/messages.rs b/p2p/src/messages.rs index 8af9f297..b3594d15 100644 --- a/p2p/src/messages.rs +++ b/p2p/src/messages.rs @@ -61,7 +61,7 @@ pub enum P2pToSync<P: Preset> { DataColumnsNeeded(Vec<DataColumnIdentifier>, Slot, Option<PeerId>), RequestedBlobSidecar(Arc<BlobSidecar<P>>, bool, PeerId), RequestedBlock((Arc<SignedBeaconBlock<P>>, PeerId, RequestId)), - RequestedDataColumnSidecar(Arc<DataColumnSidecar<P>>, bool, PeerId), + RequestedDataColumnSidecar(Arc<DataColumnSidecar<P>>, PeerId), BlobsByRangeRequestFinished(RequestId), BlobsByRootChunkReceived(BlobIdentifier, PeerId, RequestId), BlocksByRangeRequestFinished(RequestId), diff --git a/p2p/src/network.rs b/p2p/src/network.rs index c5935072..82aaffcc 100644 --- a/p2p/src/network.rs +++ b/p2p/src/network.rs @@ -1266,7 +1266,7 @@ impl<P: Preset> Network<P> { let controller = self.controller.clone_arc(); - // TODO(feature/eip7594): MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS + // TODO(feature/eip-7594): MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS // Let data_column_serve_range be // [max(current_epoch - MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS, EIP7594_FORK_EPOCH), current_epoch]. let start_slot = start_slot.max(misc::compute_start_slot_at_epoch::<P>( @@ -1642,7 +1642,7 @@ impl<P: Preset> Network<P> { ), ); } - // TODO(feature/eip7594): This appears to be unfinished. + // TODO(feature/eip-7594): This appears to be unfinished. // > Before consuming the next response chunk, the response reader SHOULD verify the // > data column sidecar is well-formatted, has valid inclusion proof, and is correct w.r.t. the expected KZG commitments Response::DataColumnsByRange(Some(data_column_sidecar)) => { @@ -1662,11 +1662,7 @@ impl<P: Preset> Network<P> { data_column_identifier, data_column_sidecar_slot, ) { - let block_seen = self - .received_block_roots - .contains_key(&data_column_identifier.block_root); - - P2pToSync::RequestedDataColumnSidecar(data_column_sidecar, block_seen, peer_id) + P2pToSync::RequestedDataColumnSidecar(data_column_sidecar, peer_id) .send(&self.channels.p2p_to_sync_tx); } } @@ -1805,18 +1801,10 @@ impl<P: Preset> Network<P> { let epoch = misc::compute_epoch_at_slot::<P>(data_column_sidecar.slot()); if chain_config.is_eip7594_fork(epoch) { - let block_seen = self.received_block_roots.contains_key( - &data_column_sidecar - .signed_block_header - .message - .hash_tree_root(), - ); - self.controller.on_gossip_data_column_sidecar( data_column_sidecar, subnet_id, GossipId { source, message_id }, - block_seen, ); } else { self.log_with_feature(format_args!(