
Persist data columns to store #6255


Merged · 1 commit · Aug 14, 2024
19 changes: 9 additions & 10 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3661,16 +3661,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
            }
        }

-        if let Some(_data_columns) = data_columns {
-            // TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073
-            // if !data_columns.is_empty() {
-            //     debug!(
-            //         self.log, "Writing data_columns to store";
-            //         "block_root" => %block_root,
-            //         "count" => data_columns.len(),
-            //     );
-            //     ops.push(StoreOp::PutDataColumns(block_root, data_columns));
-            // }
+        if let Some(data_columns) = data_columns {
+            if !data_columns.is_empty() {
+                debug!(
+                    self.log, "Writing data_columns to store";
+                    "block_root" => %block_root,
+                    "count" => data_columns.len(),
+                );
+                ops.push(StoreOp::PutDataColumns(block_root, data_columns));
+            }
        }

        let txn_lock = self.store.hot_db.begin_rw_transaction();
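The TODO-gated branch is now live (its prerequisite, PR #6073, is in): non-empty data columns are pushed into the same `ops` batch as the block's other store writes. A rough sketch of the gating pattern with simplified stand-in types (not Lighthouse's actual `StoreOp` definition; only the `PutDataColumns` variant name is taken from the diff):

```rust
// Simplified stand-ins for illustration only.
enum StoreOp {
    PutBlock(u64),
    PutDataColumns(u64, Vec<Vec<u8>>),
}

fn build_ops(block_root: u64, data_columns: Option<Vec<Vec<u8>>>) -> Vec<StoreOp> {
    let mut ops = vec![StoreOp::PutBlock(block_root)];
    // Only persist columns the node actually holds; an empty list would
    // add a useless entry to the atomic batch.
    if let Some(columns) = data_columns {
        if !columns.is_empty() {
            ops.push(StoreOp::PutDataColumns(block_root, columns));
        }
    }
    ops
}
```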
10 changes: 8 additions & 2 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -108,21 +108,27 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
        let custody_column_count =
            custody_subnet_count.saturating_mul(spec.data_columns_per_subnet());

-        let overflow_cache = DataAvailabilityCheckerInner::new(
+        let inner = DataAvailabilityCheckerInner::new(
            OVERFLOW_LRU_CAPACITY,
            store,
            custody_column_count,
            spec.clone(),
        )?;
        Ok(Self {
-            availability_cache: Arc::new(overflow_cache),
+            availability_cache: Arc::new(inner),
            slot_clock,
            kzg,
            log: log.clone(),
            spec,
        })
    }

+    pub fn get_custody_columns_count(&self) -> usize {
+        self.availability_cache
+            .custody_subnet_count()
+            .saturating_mul(self.spec.data_columns_per_subnet())
+    }

    /// Checks if the block root is currently in the availability cache awaiting import because
    /// of missing components.
    pub fn get_execution_valid_block(
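For intuition, `get_custody_columns_count` mirrors the arithmetic used when constructing the checker: custody subnets times columns per subnet, with a saturating multiply. A worked example using illustrative values (the real constants come from `ChainSpec`):

```rust
fn main() {
    // Illustrative values, not the actual spec constants.
    let custody_subnet_count: usize = 4;
    let data_columns_per_subnet: usize = 4;

    // Same saturating multiply as in the diff above.
    let custody_column_count = custody_subnet_count.saturating_mul(data_columns_per_subnet);
    assert_eq!(custody_column_count, 16);
}
```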
@@ -9,7 +9,6 @@ use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::RwLock;
-use ssz_derive::{Decode, Encode};
use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize;
use std::sync::Arc;
@@ -20,7 +19,7 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
///
/// The blobs are all gossip and kzg verified.
/// The block has completed all verifications except the availability check.
-#[derive(Encode, Decode, Clone)]
+#[derive(Clone)]
pub struct PendingComponents<E: EthSpec> {
    pub block_root: Hash256,
    pub verified_blobs: FixedVector<Option<KzgVerifiedBlob<E>>, E::MaxBlobsPerBlock>,
@@ -303,6 +302,15 @@ impl<E: EthSpec> PendingComponents<E> {
                        });
                    }
                }

+                if let Some(kzg_verified_data_column) = self.verified_data_columns.first() {
+                    let epoch = kzg_verified_data_column
+                        .as_data_column()
+                        .slot()
+                        .epoch(E::slots_per_epoch());
+                    return Some(epoch);
+                }
+
                None
            })
    }
@@ -336,6 +344,10 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
        })
    }

+    pub fn custody_subnet_count(&self) -> usize {
+        self.custody_column_count
+    }
+
    /// Returns true if the block root is known, without altering the LRU ordering
    pub fn get_execution_valid_block(
        &self,
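The new `PendingComponents` fallback derives the epoch from the first verified data column's slot; slot-to-epoch conversion is plain integer division. A minimal sketch of that conversion, assuming mainnet's 32 slots per epoch:

```rust
fn main() {
    // Mainnet preset; other presets use different values.
    let slots_per_epoch: u64 = 32;

    // A data column sidecar at slot 100 falls in epoch 3 (100 / 32 = 3).
    let slot: u64 = 100;
    let epoch = slot / slots_per_epoch;
    assert_eq!(epoch, 3);
}
```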
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/data_column_verification.rs
@@ -250,6 +250,10 @@ impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
    pub fn into_inner(self) -> Arc<DataColumnSidecar<E>> {
        self.data
    }
+
+    pub fn as_data_column(&self) -> &DataColumnSidecar<E> {
+        &self.data
+    }
}

/// Complete kzg verification for a `DataColumnSidecar`.
41 changes: 32 additions & 9 deletions beacon_node/beacon_chain/src/historical_blocks.rs
@@ -9,6 +9,7 @@ use state_processing::{
use std::borrow::Cow;
use std::iter;
use std::time::Duration;
+use store::metadata::DataColumnInfo;
use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore};
use types::{Hash256, Slot};

@@ -66,6 +67,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
            .get_anchor_info()
            .ok_or(HistoricalBlockError::NoAnchorInfo)?;
        let blob_info = self.store.get_blob_info();
+        let data_column_info = self.store.get_data_column_info();

        // Take all blocks with slots less than the oldest block slot.
        let num_relevant = blocks.partition_point(|available_block| {
@@ -90,18 +92,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
            return Ok(0);
        }

-        let n_blobs_lists_to_import = blocks_to_import
+        // Blobs are stored per block, and data columns are each stored individually
+        let n_blob_ops_per_block = if self.spec.is_peer_das_scheduled() {
+            self.data_availability_checker.get_custody_columns_count()
+        } else {
+            1
+        };
+
+        let blob_batch_size = blocks_to_import
            .iter()
            .filter(|available_block| available_block.blobs().is_some())
-            .count();
+            .count()
+            .saturating_mul(n_blob_ops_per_block);

        let mut expected_block_root = anchor_info.oldest_block_parent;
        let mut prev_block_slot = anchor_info.oldest_block_slot;
        let mut chunk_writer =
            ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
        let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
+        let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot;

-        let mut blob_batch = Vec::with_capacity(n_blobs_lists_to_import);
+        let mut blob_batch = Vec::with_capacity(blob_batch_size);
        let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
        let mut hot_batch = Vec::with_capacity(blocks_to_import.len());
        let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
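The capacity arithmetic above: before PeerDAS every blob-carrying block contributes a single key-value op, while with PeerDAS scheduled each such block contributes one op per custody column. An illustrative calculation with assumed counts:

```rust
fn main() {
    // Assumed inputs for illustration.
    let peer_das_scheduled = true;
    let custody_column_count: usize = 16;
    let blocks_with_blobs: usize = 80;

    let n_blob_ops_per_block = if peer_das_scheduled {
        custody_column_count
    } else {
        1
    };

    // 80 blob-carrying blocks * 16 custody columns => 1280 ops pre-allocated.
    let blob_batch_size = blocks_with_blobs.saturating_mul(n_blob_ops_per_block);
    assert_eq!(blob_batch_size, 1280);
}
```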
@@ -129,11 +140,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                    .blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch);
            }
            // Store the data columns too
-            if let Some(_data_columns) = maybe_data_columns {
-                // TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073
-                // new_oldest_data_column_slot = Some(block.slot());
-                // self.store
-                //     .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
+            if let Some(data_columns) = maybe_data_columns {
+                new_oldest_data_column_slot = Some(block.slot());
+                self.store
+                    .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
            }

            // Store block roots, including at all skip slots in the freezer DB.
@@ -212,7 +222,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
        self.store.hot_db.do_atomically(hot_batch)?;
        self.store.cold_db.do_atomically(cold_batch)?;

-        let mut anchor_and_blob_batch = Vec::with_capacity(2);
+        let mut anchor_and_blob_batch = Vec::with_capacity(3);

        // Update the blob info.
        if new_oldest_blob_slot != blob_info.oldest_blob_slot {
@@ -228,6 +238,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
            }
        }

+        // Update the data column info.
+        if new_oldest_data_column_slot != data_column_info.oldest_data_column_slot {
+            if let Some(oldest_data_column_slot) = new_oldest_data_column_slot {
+                let new_data_column_info = DataColumnInfo {
+                    oldest_data_column_slot: Some(oldest_data_column_slot),
+                };
+                anchor_and_blob_batch.push(
+                    self.store
+                        .compare_and_set_data_column_info(data_column_info, new_data_column_info)?,
+                );
+            }
+        }
+
        // Update the anchor.
        let new_anchor = AnchorInfo {
            oldest_block_slot: prev_block_slot,
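The data column info update follows the same compare-and-set pattern as the blob info just above it: the op only takes effect if the stored value still matches the snapshot read at the start of the function. A generic sketch of the idea (not the store's actual implementation):

```rust
/// Apply `new` only if `current` still equals `expected`; otherwise report
/// a conflict so the caller can re-read and retry.
fn compare_and_set<T: PartialEq>(current: &mut T, expected: &T, new: T) -> Result<(), ()> {
    if *current == *expected {
        *current = new;
        Ok(())
    } else {
        Err(())
    }
}

fn main() {
    let mut oldest_data_column_slot: Option<u64> = None;
    // Succeeds: the snapshot (None) still matches the stored value.
    assert!(compare_and_set(&mut oldest_data_column_slot, &None, Some(42)).is_ok());
    // Fails: the stored value has moved on since the stale snapshot.
    assert!(compare_and_set(&mut oldest_data_column_slot, &None, Some(7)).is_err());
}
```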