feat: update libp2p, discv5 & fix sync with lighthouse full-node issue #45

Closed · wants to merge 6 commits

797 changes: 355 additions & 442 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -285,7 +285,7 @@ dedicated_executor = { path = 'dedicated_executor' }
delay_map = '0.3.0'
derive_more = '0.99.17'
dirs = '5.0.1'
-discv5 = { version = '0.4.1', features = ['libp2p'] }
+discv5 = { version = '0.7.0', features = ['libp2p'] }
drain_filter_polyfill = '0.1.3'
duplicate = '1.0.0'
easy-ext = '1.0.1'
@@ -331,8 +331,8 @@ jwt-simple = { version = '0.12.6', default-features = false, features = ['pure-r
kzg = { git = 'https://github.com/grandinetech/rust-kzg.git', branch = 'integration-raw-2' }
lazy_static = '1.4.0'
libmdbx = { git = 'https://github.com/paradigmxyz/reth.git', package = 'reth-libmdbx', rev = '2d01f3608697eed05357fb847e25ad33ab59d702' }
-libp2p = { version = '0.53', default-features = false, features = ['metrics', 'dns', 'ecdsa', 'identify', 'macros', 'noise', 'plaintext', 'secp256k1', 'serde', 'tcp', 'tokio', 'yamux', 'quic', 'upnp'] }
-libp2p-mplex = '0.41.0'
+libp2p = { version = '0.54', default-features = false, features = ['metrics', 'dns', 'ecdsa', 'identify', 'macros', 'noise', 'plaintext', 'secp256k1', 'serde', 'tcp', 'tokio', 'yamux', 'quic', 'upnp'] }
+libp2p-mplex = '0.42.0'
log = '0.4.20'
lru = '0.12.2'
memoffset = '0.9.0'
56 changes: 41 additions & 15 deletions eip_7594/src/lib.rs
@@ -56,6 +56,17 @@ pub enum ExtendedSampleError {
    AllowedFailtureOutOfRange { allowed_failures: u64 },
}

+#[derive(Debug, Error)]
+pub enum GetDataColumnSidecarsError {
+    #[error(
+        "Cells and proofs length {cells_length} does not match commitment length {commitments_length}"
+    )]
+    CellsCommitmentsLengthError {
+        cells_length: usize,
+        commitments_length: usize,
+    },
+}
+
pub fn verify_kzg_proofs<P: Preset>(data_column_sidecar: &DataColumnSidecar<P>) -> Result<bool> {
    let DataColumnSidecar {
        index,
@@ -277,9 +288,12 @@ pub fn get_data_column_sidecars<P: Preset>(
) -> Result<Vec<DataColumnSidecar<P>>> {
    let mut sidecars: Vec<DataColumnSidecar<P>> = Vec::new();
    if let Some(post_deneb_beacon_block_body) = signed_block.message().body().post_deneb() {
-        let kzg_commitments_inclusion_proof =
-            kzg_commitment_inclusion_proof(post_deneb_beacon_block_body);
        let signed_block_header = signed_block.to_header();
+        let kzg_commitments = post_deneb_beacon_block_body.blob_kzg_commitments();
+
+        if kzg_commitments.is_empty() {
+            return Ok(vec![]);
+        }

        let kzg_settings = settings();
        let cells_and_proofs = blobs
@@ -291,6 +305,15 @@
            .collect::<Result<Vec<_>>>()?;

        let blob_count = cells_and_proofs.len();
+        ensure!(
+            kzg_commitments.len() == blob_count,
+            GetDataColumnSidecarsError::CellsCommitmentsLengthError {
+                cells_length: blob_count,
+                commitments_length: kzg_commitments.len(),
+            }
+        );
+        let kzg_commitments_inclusion_proof =
+            kzg_commitment_inclusion_proof(post_deneb_beacon_block_body);

        for column_index in 0..NumberOfColumns::U64 {
            let column_cells: Vec<CKzgCell> = (0..blob_count)
@@ -300,30 +323,33 @@
            let column_proofs: Vec<CKzgProof> = (0..blob_count)
                .map(|row_index| cells_and_proofs[row_index].1[column_index as usize].clone())
                .collect();

-            let cells = column_cells.iter().map(|cell| cell.to_bytes());
-
-            let mut cont_cells = Vec::new();
-
-            for cell in cells {
-                let bytes = cell.into_iter();
-                let v = ByteVector::from(ContiguousVector::try_from_iter(bytes)?);
-                let v = Box::new(v);
-
-                cont_cells.push(v);
-            }
+            // let mut cont_cells = Vec::new();
+
+            // for cell in cells {
+            //     let bytes = cell.into_iter();
+            //     let v = ByteVector::from(ContiguousVector::try_from_iter(bytes)?);
+            //     let v = Box::new(v);
+
+            //     cont_cells.push(v);
+            // }
+            let cells = column_cells
+                .iter()
+                .map(|cell| try_convert_ckzg_cell_to_cell(cell))
+                .collect::<Result<Vec<Cell>>>()?;

            let proofs = column_proofs
                .iter()
-                .map(|data| KzgProof::try_from(data.to_bytes().into_inner()).map_err(Into::into))
+                .map(|proof| KzgProof::try_from(proof.to_bytes().into_inner()).map_err(Into::into))
                .collect::<Result<Vec<KzgProof>>>()?;

+            let column = ContiguousList::try_from_iter(cells.into_iter())?;
            let kzg_proofs = ContiguousList::try_from_iter(proofs.into_iter())?;

            sidecars.push(DataColumnSidecar {
                index: column_index,
-                column: ContiguousList::try_from(cont_cells)?,
-                kzg_commitments: post_deneb_beacon_block_body.blob_kzg_commitments().clone(),
+                column,
+                kzg_commitments: kzg_commitments.clone(),
                kzg_proofs,
                signed_block_header,
                kzg_commitments_inclusion_proof,
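Taken together, the eip_7594/src/lib.rs changes reorder get_data_column_sidecars so the cheap checks run before any KZG work: a block without blob commitments returns early with no sidecars, and a mismatch between the computed cells-and-proofs count and the commitment count now fails with the new GetDataColumnSidecarsError instead of producing inconsistent sidecars. A minimal standalone sketch of that validation order, using simplified stand-in types rather than the crate's actual API:

```rust
use thiserror::Error;

// Mirrors the error variant added in this diff.
#[derive(Debug, Error)]
pub enum GetDataColumnSidecarsError {
    #[error(
        "Cells and proofs length {cells_length} does not match commitment length {commitments_length}"
    )]
    CellsCommitmentsLengthError {
        cells_length: usize,
        commitments_length: usize,
    },
}

/// Hypothetical helper: `commitments` stands in for the block's KZG commitment
/// list and `blob_count` for the number of cells-and-proofs computed from the
/// blobs. Returns Ok(false) when there is nothing to build, Ok(true) when
/// sidecar construction may proceed.
pub fn sidecar_preconditions(
    commitments: &[[u8; 48]],
    blob_count: usize,
) -> Result<bool, GetDataColumnSidecarsError> {
    // Early return: a block with no blob commitments yields no sidecars.
    if commitments.is_empty() {
        return Ok(false);
    }

    // Length check before the expensive inclusion-proof computation.
    if blob_count != commitments.len() {
        return Err(GetDataColumnSidecarsError::CellsCommitmentsLengthError {
            cells_length: blob_count,
            commitments_length: commitments.len(),
        });
    }

    Ok(true)
}
```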
1 change: 0 additions & 1 deletion fork_choice_control/Cargo.toml
@@ -16,7 +16,6 @@ database = { workspace = true }
derive_more = { workspace = true }
drain_filter_polyfill = { workspace = true }
educe = { workspace = true }
-eip_7594 = { workspace = true }
eth2_libp2p = { workspace = true }
execution_engine = { workspace = true }
features = { workspace = true }
1 change: 0 additions & 1 deletion fork_choice_control/src/controller.rs
@@ -55,7 +55,6 @@ use crate::{
    storage::Storage,
    tasks::{
        AggregateAndProofTask, AttestationTask, AttesterSlashingTask, BlobSidecarTask, BlockTask,
-        StoreCustodyColumnsTask,
    },
    thread_pool::{Spawn, ThreadPool},
    unbounded_sink::UnboundedSink,
5 changes: 4 additions & 1 deletion fork_choice_control/src/mutator.rs
@@ -1466,7 +1466,10 @@ where
    }

    fn handle_store_custody_columns(&mut self, custody_columns: HashSet<ColumnIndex>) {
-        info!("storing custody columns: {custody_columns:?} for further data availability check");
+        info!(
+            "storing custody columns: [{}] for further data availability check",
+            custody_columns.iter().join(", "),
+        );

        self.store_mut()
            .store_custody_columns(custody_columns.into());
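The mutator.rs change is cosmetic but worth noting: Debug-formatting a HashSet prints a brace-delimited set, while itertools' join yields a plain comma-separated list. A small sketch of the difference, assuming the itertools crate is available (and remembering that HashSet iteration order is unspecified either way):

```rust
use std::collections::HashSet;

use itertools::Itertools as _;

fn main() {
    let custody_columns: HashSet<u64> = [7, 1, 42].into_iter().collect();

    // Old log format: Debug output, e.g. `{42, 1, 7}`.
    println!("storing custody columns: {custody_columns:?} for further data availability check");

    // New log format: bracketed comma-separated list, e.g. `[42, 1, 7]`.
    println!(
        "storing custody columns: [{}] for further data availability check",
        custody_columns.iter().join(", "),
    );
}
```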
1 change: 0 additions & 1 deletion fork_choice_store/Cargo.toml
@@ -25,7 +25,6 @@ itertools = { workspace = true }
kzg_utils = { workspace = true }
log = { workspace = true }
prometheus_metrics = { workspace = true }
-primitive-types = { workspace = true }
ssz = { workspace = true }
static_assertions = { workspace = true }
std_ext = { workspace = true }
2 changes: 1 addition & 1 deletion p2p/src/network.rs
@@ -39,7 +39,7 @@ use rand::seq::SliceRandom;
use rand::thread_rng;
use slog::{o, Drain as _, Logger};
use slog_stdlog::StdLog;
-use ssz::{ContiguousList, SszHash as _};
+use ssz::ContiguousList;
use std_ext::ArcExt as _;
use thiserror::Error;
use typenum::Unsigned as _;
2 changes: 0 additions & 2 deletions p2p/src/network_api.rs
@@ -66,7 +66,6 @@ struct NodeMetadata {
    seq_number: u64,
    attnets: EnrAttestationBitfield,
    syncnets: Option<EnrSyncCommitteeBitfield>,
-    custody_subnet_count: Option<u64>,
}

#[derive(PartialEq, Eq, Deserialize, Serialize)]
@@ -118,7 +117,6 @@ impl<P: Preset> Network<P> {
            seq_number: metadata.seq_number(),
            attnets: metadata.attnets(),
            syncnets: metadata.syncnets(),
-            custody_subnet_count: metadata.custody_subnet_count(),
        };

NodeIdentity {
3 changes: 0 additions & 3 deletions p2p/src/sync_manager.rs
@@ -316,7 +316,6 @@ impl SyncManager {
        max_slot = start_slot + count - 1;

        if config.is_eip7594_fork(misc::compute_epoch_at_slot::<P>(start_slot)) {
-            // TODO(feature/eip7594): figure out slot range and data columns
            if data_column_serve_range_slot < max_slot {
                sync_batches.push(SyncBatch {
                    target: SyncTarget::DataColumnSidecar,
@@ -338,7 +337,6 @@
            }
        }

-        // TODO(feature/eip7594): refactor SyncBatch to Enum instead of struct with options
        sync_batches.push(SyncBatch {
            target: SyncTarget::Block,
            direction: SyncDirection::Forward,
@@ -497,7 +495,6 @@
"request data columns by range finished (request_id: {request_id})",
));

// TODO(feature/das): need to double check, this doesn't has in prev-version
self.data_column_requests
.request_by_range_finished(request_id)
}
60 changes: 32 additions & 28 deletions validator/src/validator.rs
@@ -1491,7 +1491,7 @@ impl<P: Preset, W: Wait + Sync> Validator<P, W> {
        let control_flow = self
            .validate_and_store_block(
                &beacon_block,
-                &slot_head.beacon_state,
+                &slot_head.beacon_state,
                public_key.to_bytes(),
                slot_head.current_epoch(),
            )
@@ -1512,38 +1512,42 @@

        let block = Arc::new(beacon_block.clone());

-        if self.chain_config.is_eip7594_fork(epoch) {
-            let data_column_sidecars = eip_7594::get_data_column_sidecars(
-                &block,
-                block_blobs.unwrap_or_default().into_iter(),
-            )?;
-
-            let messages = data_column_sidecars
-                .into_iter()
-                .map(|dcs| {
-                    let data_column_sidecar = Arc::new(dcs);
-
-                    self.controller.on_own_data_column_sidecar(
-                        wait_group.clone(),
-                        data_column_sidecar.clone_arc(),
-                    );
-                    data_column_sidecar
-                })
-                .collect::<Vec<_>>();
-
-            ValidatorToP2p::PublishDataColumnSidecars(messages).send(&self.p2p_tx);
-        } else {
-            for blob_sidecar in misc::construct_blob_sidecars(
-                &block,
-                block_blobs.unwrap_or_default().into_iter(),
-                block_proofs.unwrap_or_default().into_iter(),
-            )? {
-                let blob_sidecar = Arc::new(blob_sidecar);
-
-                self.controller
-                    .on_own_blob_sidecar(wait_group.clone(), blob_sidecar.clone_arc());
-
-                ValidatorToP2p::PublishBlobSidecar(blob_sidecar).send(&self.p2p_tx);
-            }
-        }
+        if let Some(blobs) = block_blobs {
+            info!("there are {} blobs in slot: {}", blobs.len(), slot_head.slot());
+
+            if self.chain_config.is_eip7594_fork(epoch) {
+                let data_column_sidecars = eip_7594::get_data_column_sidecars(
+                    &block,
+                    blobs.clone().into_iter(),
+                )?;
+
+                let messages = data_column_sidecars
+                    .into_iter()
+                    .map(|dcs| {
+                        let data_column_sidecar = Arc::new(dcs);
+
+                        self.controller.on_own_data_column_sidecar(
+                            wait_group.clone(),
+                            data_column_sidecar.clone_arc(),
+                        );
+                        data_column_sidecar
+                    })
+                    .collect::<Vec<_>>();
+
+                ValidatorToP2p::PublishDataColumnSidecars(messages).send(&self.p2p_tx);
+            } else {
+                for blob_sidecar in misc::construct_blob_sidecars(
+                    &block,
+                    blobs.clone().into_iter(),
+                    block_proofs.unwrap_or_default().into_iter(),
+                )? {
+                    let blob_sidecar = Arc::new(blob_sidecar);
+
+                    self.controller
+                        .on_own_blob_sidecar(wait_group.clone(), blob_sidecar.clone_arc());
+
+                    ValidatorToP2p::PublishBlobSidecar(blob_sidecar).send(&self.p2p_tx);
+                }
+            }
+        }

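The validator.rs refactor wraps both publishing paths in a single `if let Some(blobs) = block_blobs` guard, so a block that carries no blobs skips sidecar construction entirely instead of running one of the branches against an empty `unwrap_or_default()` list. A simplified sketch of the resulting control flow; `Blob` and the publish helpers are stand-ins, not the repository's actual types:

```rust
type Blob = Vec<u8>;

fn publish_block_data(block_blobs: Option<Vec<Blob>>, is_eip7594_fork: bool) {
    // Both branches now run only when the block actually carries blobs.
    if let Some(blobs) = block_blobs {
        println!("there are {} blobs in slot", blobs.len());

        if is_eip7594_fork {
            // EIP-7594 path: build DataColumnSidecars from the blobs and
            // publish them in one batch.
            publish_data_column_sidecars(&blobs);
        } else {
            // Pre-EIP-7594 path: build BlobSidecars and publish each one.
            publish_blob_sidecars(&blobs);
        }
    }
    // No blobs: nothing to construct or publish.
}

fn publish_data_column_sidecars(blobs: &[Blob]) {
    // Stand-in for eip_7594::get_data_column_sidecars + PublishDataColumnSidecars.
    let _ = blobs;
}

fn publish_blob_sidecars(blobs: &[Blob]) {
    // Stand-in for misc::construct_blob_sidecars + PublishBlobSidecar per sidecar.
    let _ = blobs;
}
```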