Skip to content

Commit

Permalink
Delay data column sidecars and retry processing them when conditions …
Browse files Browse the repository at this point in the history
…allow
  • Loading branch information
povi committed May 16, 2024
1 parent 6023480 commit 4831e59
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 51 deletions.
13 changes: 1 addition & 12 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,12 @@ where
self.spawn_data_column_sidecar_task_with_wait_group(
wait_group,
data_column_sidecar,
true,
DataColumnSidecarOrigin::Own,
)
}

pub fn on_api_data_column_sidecar(&self, data_column_sidecar: Arc<DataColumnSidecar<P>>) {
self.spawn_data_column_sidecar_task(data_column_sidecar, true, DataColumnSidecarOrigin::Api)
self.spawn_data_column_sidecar_task(data_column_sidecar, DataColumnSidecarOrigin::Api)
}

pub fn on_api_block(
Expand Down Expand Up @@ -410,11 +409,9 @@ where
blob_sidecar: Arc<DataColumnSidecar<P>>,
subnet_id: SubnetId,
gossip_id: GossipId,
block_seen: bool,
) {
self.spawn_data_column_sidecar_task(
blob_sidecar,
block_seen,
DataColumnSidecarOrigin::Gossip(subnet_id, gossip_id),
)
}
Expand All @@ -440,15 +437,13 @@ where
pub fn on_requested_data_column_sidecar(
&self,
data_column_sidecar: Arc<DataColumnSidecar<P>>,
block_seen: bool,
peer_id: PeerId,
) {
self.spawn(DataColumnSidecarTask {
store_snapshot: self.owned_store_snapshot(),
mutator_tx: self.owned_mutator_tx(),
wait_group: self.owned_wait_group(),
data_column_sidecar,
block_seen,
origin: DataColumnSidecarOrigin::Requested(peer_id),
submission_time: Instant::now(),
metrics: self.metrics.clone(),
Expand Down Expand Up @@ -505,18 +500,14 @@ where
})
}

// data_column_sidecar zemiau

fn spawn_data_column_sidecar_task(
&self,
data_column_sidecar: Arc<DataColumnSidecar<P>>,
block_seen: bool,
origin: DataColumnSidecarOrigin,
) {
self.spawn_data_column_sidecar_task_with_wait_group(
self.owned_wait_group(),
data_column_sidecar,
block_seen,
origin,
)
}
Expand All @@ -525,15 +516,13 @@ where
&self,
wait_group: W,
data_column_sidecar: Arc<DataColumnSidecar<P>>,
block_seen: bool,
origin: DataColumnSidecarOrigin,
) {
self.spawn(DataColumnSidecarTask {
store_snapshot: self.owned_store_snapshot(),
mutator_tx: self.owned_mutator_tx(),
wait_group,
data_column_sidecar,
block_seen,
origin,
submission_time: Instant::now(),
metrics: self.metrics.clone(),
Expand Down
1 change: 0 additions & 1 deletion fork_choice_control/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ pub enum MutatorMessage<P: Preset, W> {
wait_group: W,
result: Result<DataColumnSidecarAction<P>>,
origin: DataColumnSidecarOrigin,
block_seen: bool,
submission_time: Instant,
},
FinishedPersistingBlobSidecars {
Expand Down
13 changes: 12 additions & 1 deletion fork_choice_control/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use educe::Educe;
use eth2_libp2p::GossipId;
use fork_choice_store::{
AggregateAndProofAction, AggregateAndProofOrigin, AttestationAction, AttestationOrigin,
BlobSidecarOrigin, BlockOrigin, ChainLink,
BlobSidecarOrigin, BlockOrigin, ChainLink, DataColumnSidecarOrigin,
};
use serde::Serialize;
use strum::IntoStaticStr;
use types::{
combined::SignedBeaconBlock,
deneb::containers::BlobSidecar,
eip7594::DataColumnSidecar,
phase0::{
containers::{Attestation, SignedAggregateAndProof},
primitives::ValidatorIndex,
Expand All @@ -30,6 +31,7 @@ pub struct Delayed<P: Preset> {
pub aggregates: Vec<PendingAggregateAndProof<P>>,
pub attestations: Vec<PendingAttestation<P>>,
pub blob_sidecars: Vec<PendingBlobSidecar<P>>,
pub data_column_sidecars: Vec<PendingDataColumnSidecar<P>>,
}

impl<P: Preset> Delayed<P> {
Expand All @@ -40,12 +42,14 @@ impl<P: Preset> Delayed<P> {
aggregates,
attestations,
blob_sidecars,
data_column_sidecars,
} = self;

blocks.is_empty()
&& aggregates.is_empty()
&& attestations.is_empty()
&& blob_sidecars.is_empty()
&& data_column_sidecars.is_empty()
}
}

Expand Down Expand Up @@ -109,6 +113,13 @@ pub struct PendingBlobSidecar<P: Preset> {
pub submission_time: Instant,
}

#[derive(Debug)]
pub struct PendingDataColumnSidecar<P: Preset> {
pub data_column_sidecar: Arc<DataColumnSidecar<P>>,
pub origin: DataColumnSidecarOrigin,
pub submission_time: Instant,
}

pub struct VerifyAggregateAndProofResult<P: Preset> {
pub result: Result<AggregateAndProofAction<P>>,
pub origin: AggregateAndProofOrigin<GossipId>,
Expand Down
128 changes: 116 additions & 12 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ use crate::{
messages::{MutatorMessage, P2pMessage, SubnetMessage, SyncMessage, ValidatorMessage},
misc::{
Delayed, MutatorRejectionReason, PendingAggregateAndProof, PendingAttestation,
PendingBlobSidecar, PendingBlock, PendingChainLink, VerifyAggregateAndProofResult,
VerifyAttestationResult, WaitingForCheckpointState,
PendingBlobSidecar, PendingBlock, PendingChainLink, PendingDataColumnSidecar,
VerifyAggregateAndProofResult, VerifyAttestationResult, WaitingForCheckpointState,
},
state_cache::StateCache,
storage::Storage,
tasks::{
AggregateAndProofTask, AttestationTask, BlobSidecarTask, BlockAttestationsTask, BlockTask,
CheckpointStateTask, PersistBlobSidecarsTask, PreprocessStateTask,
CheckpointStateTask, DataColumnSidecarTask, PersistBlobSidecarsTask, PreprocessStateTask,
},
thread_pool::{Spawn, ThreadPool},
unbounded_sink::UnboundedSink,
Expand Down Expand Up @@ -234,16 +234,9 @@ where
MutatorMessage::DataColumnSidecar {
wait_group,
result,
block_seen,
origin,
submission_time,
} => self.handle_data_column_sidecar(
wait_group,
result,
block_seen,
origin,
submission_time,
),
} => self.handle_data_column_sidecar(wait_group, result, origin, submission_time),
MutatorMessage::FinishedPersistingBlobSidecars {
wait_group,
persisted_blob_ids,
Expand Down Expand Up @@ -1137,7 +1130,6 @@ where
&mut self,
wait_group: W,
result: Result<DataColumnSidecarAction<P>>,
block_seen: bool,
origin: DataColumnSidecarOrigin,
submission_time: Instant,
) {
Expand All @@ -1154,6 +1146,44 @@ where
P2pMessage::Ignore(gossip_id).send(&self.p2p_tx);
}
}
Ok(DataColumnSidecarAction::DelayUntilParent(data_column_sidecar)) => {
let parent_root = data_column_sidecar.signed_block_header.message.parent_root;

let pending_data_column_sidecar = PendingDataColumnSidecar {
data_column_sidecar,
origin,
submission_time,
};

if self.store.contains_block(parent_root) {
self.retry_data_column_sidecar(wait_group, pending_data_column_sidecar);
} else {
debug!("data column sidecar delayed until block parent: {parent_root:?}");

let peer_id = pending_data_column_sidecar.origin.peer_id();

P2pMessage::BlockNeeded(parent_root, peer_id).send(&self.p2p_tx);

self.delay_data_column_sidecar_until_parent(pending_data_column_sidecar);
}
}
Ok(DataColumnSidecarAction::DelayUntilSlot(data_column_sidecar)) => {
let slot = data_column_sidecar.signed_block_header.message.slot;

let pending_data_column_sidecar = PendingDataColumnSidecar {
data_column_sidecar,
origin,
submission_time,
};

if slot <= self.store.slot() {
self.retry_data_column_sidecar(wait_group, pending_data_column_sidecar);
} else {
debug!("data column sidecar delayed until slot: {slot}");

self.delay_data_column_sidecar_until_slot(pending_data_column_sidecar);
}
}
Err(error) => {
warn!("data column sidecar rejected (error: {error}, origin: {origin:?})");

Expand Down Expand Up @@ -1978,6 +2008,40 @@ where
.push(pending_blob_sidecar);
}

fn delay_data_column_sidecar_until_parent(
&mut self,
pending_data_column_sidecar: PendingDataColumnSidecar<P>,
) {
self.delayed_until_block
.entry(
pending_data_column_sidecar
.data_column_sidecar
.signed_block_header
.message
.parent_root,
)
.or_default()
.data_column_sidecars
.push(pending_data_column_sidecar);
}

fn delay_data_column_sidecar_until_slot(
&mut self,
pending_data_column_sidecar: PendingDataColumnSidecar<P>,
) {
self.delayed_until_slot
.entry(
pending_data_column_sidecar
.data_column_sidecar
.signed_block_header
.message
.slot,
)
.or_default()
.data_column_sidecars
.push(pending_data_column_sidecar);
}

fn take_delayed_until_block(&mut self, block_root: H256) -> Option<Delayed<P>> {
self.delayed_until_block.remove(&block_root)
}
Expand All @@ -2001,6 +2065,7 @@ where
aggregates,
attestations,
blob_sidecars,
data_column_sidecars,
} = delayed;

for pending_block in blocks {
Expand All @@ -2018,6 +2083,10 @@ where
for pending_blob_sidecar in blob_sidecars {
self.retry_blob_sidecar(wait_group.clone(), pending_blob_sidecar);
}

for pending_data_column_sidecar in data_column_sidecars {
self.retry_data_column_sidecar(wait_group.clone(), pending_data_column_sidecar);
}
}

fn retry_block(&self, wait_group: W, pending_block: PendingBlock<P>) {
Expand Down Expand Up @@ -2117,6 +2186,30 @@ where
});
}

fn retry_data_column_sidecar(
&self,
wait_group: W,
pending_data_column_sidecar: PendingDataColumnSidecar<P>,
) {
debug!("retrying delayed data column sidecar: {pending_data_column_sidecar:?}");

let PendingDataColumnSidecar {
data_column_sidecar,
origin,
submission_time,
} = pending_data_column_sidecar;

self.spawn(DataColumnSidecarTask {
store_snapshot: self.owned_store(),
mutator_tx: self.owned_mutator_tx(),
wait_group,
data_column_sidecar,
origin,
submission_time,
metrics: self.metrics.clone(),
});
}

// Some objects may be delayed until a block that is itself delayed.
// If the latter is pruned, objects depending on it could be pruned as well.
// We don't bother doing this. It's tricky to implement and might not even be worth it.
Expand All @@ -2136,6 +2229,7 @@ where
aggregates,
attestations,
blob_sidecars,
data_column_sidecars,
} = delayed;

gossip_ids.extend(
Expand Down Expand Up @@ -2183,6 +2277,16 @@ where
.filter_map(|pending| pending.origin.gossip_id()),
);

gossip_ids.extend(
data_column_sidecars
.drain_filter(|pending| {
// The parent of a delayed block cannot be in a finalized slot.
pending.data_column_sidecar.signed_block_header.message.slot - 1
<= finalized_slot
})
.filter_map(|pending| pending.origin.gossip_id()),
);

!delayed.is_empty()
});

Expand Down
5 changes: 0 additions & 5 deletions fork_choice_control/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,11 @@ impl<P: Preset, W> Run for BlobSidecarTask<P, W> {
}
}

// data column sidecar zemiau

pub struct DataColumnSidecarTask<P: Preset, W> {
pub store_snapshot: Arc<Store<P>>,
pub mutator_tx: Sender<MutatorMessage<P, W>>,
pub wait_group: W,
pub data_column_sidecar: Arc<DataColumnSidecar<P>>,
pub block_seen: bool,
pub origin: DataColumnSidecarOrigin,
pub submission_time: Instant,
pub metrics: Option<Arc<Metrics>>,
Expand All @@ -351,7 +348,6 @@ impl<P: Preset, W> Run for DataColumnSidecarTask<P, W> {
mutator_tx,
wait_group,
data_column_sidecar,
block_seen,
origin,
submission_time,
metrics,
Expand All @@ -373,7 +369,6 @@ impl<P: Preset, W> Run for DataColumnSidecarTask<P, W> {
MutatorMessage::DataColumnSidecar {
wait_group,
result,
block_seen,
origin,
submission_time,
}
Expand Down
2 changes: 2 additions & 0 deletions fork_choice_store/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,8 @@ pub enum BlobSidecarAction<P: Preset> {
pub enum DataColumnSidecarAction<P: Preset> {
Accept(Arc<DataColumnSidecar<P>>),
Ignore,
DelayUntilParent(Arc<DataColumnSidecar<P>>),
DelayUntilSlot(Arc<DataColumnSidecar<P>>),
}

pub enum PartialBlockAction {
Expand Down
4 changes: 3 additions & 1 deletion fork_choice_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1878,7 +1878,9 @@ impl<P: Preset> Store<P> {

// [IGNORE] The sidecar's block's parent (defined by block_header.parent_root) has been seen (via both gossip and non-gossip sources) (a client MAY queue sidecars for processing once the parent block is retrieved).
let Some(parent) = self.chain_link(block_header.parent_root) else {
return Ok(DataColumnSidecarAction::Ignore);
return Ok(DataColumnSidecarAction::DelayUntilParent(
data_column_sidecar,
));
};

// [REJECT] The sidecar's block's parent (defined by block_header.parent_root) passes validation.
Expand Down
Loading

0 comments on commit 4831e59

Please sign in to comment.