Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MVP impl of import-blobs #6570

Open
wants to merge 7 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions database_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ types = { workspace = true }
slog = { workspace = true }
strum = { workspace = true }
serde = { workspace = true }
ethereum_ssz = { workspace = true }
30 changes: 30 additions & 0 deletions database_manager/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use clap_utils::get_color_style;
use clap_utils::FLAG_HEADER;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use types::Slot;

use crate::InspectTarget;

Expand Down Expand Up @@ -79,6 +80,9 @@ pub enum DatabaseManagerSubcommand {
PruneBlobs(PruneBlobs),
PruneStates(PruneStates),
Compact(Compact),
SetOldestBlobSlot(SetOldestBlobSlot),
InspectBlobs(InspectBlobs),
ImportBlobs(ImportBlobs),
}

#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
Expand Down Expand Up @@ -227,3 +231,29 @@ pub struct Compact {
)]
pub output_dir: Option<PathBuf>,
}

#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
#[clap(about = "Manually override the database's view of the oldest blob known.")]
pub struct SetOldestBlobSlot {
#[clap(
long,
value_name = "SLOT",
help = "Slot of the oldest blob in the database.",
display_order = 0
)]
pub slot: Slot,
}

#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
#[clap(about = "Produce a summary of blob availability in the database.")]
pub struct InspectBlobs {
#[clap(long, help = "Verify blob data integrity.", display_order = 0)]
pub verify: bool,
}

#[derive(Parser, Clone, Deserialize, Serialize, Debug)]
#[clap(about = "Import blobs from another node's blob database.")]
pub struct ImportBlobs {
#[clap(long, help = "Path to the database to import", display_order = 0)]
pub source_db: PathBuf,
}
215 changes: 211 additions & 4 deletions database_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ use clap::ValueEnum;
use cli::{Compact, Inspect};
use environment::{Environment, RuntimeContext};
use serde::{Deserialize, Serialize};
use slog::{info, warn, Logger};
use slog::{debug, info, warn, Logger};
use ssz::Decode;
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use store::metadata::STATE_UPPER_LIMIT_NO_RETAIN;
use store::{
errors::Error,
hot_cold_store::HotColdDBError,
metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION},
DBColumn, HotColdDB, KeyValueStore, LevelDB,
DBColumn, HotColdDB, KeyValueStore, KeyValueStoreOp, LevelDB,
};
use strum::{EnumString, EnumVariantNames};
use types::{BeaconState, EthSpec, Slot};
use types::{BeaconState, BlobSidecarList, EthSpec, Hash256, Slot};

fn parse_client_config<E: EthSpec>(
cli_args: &ArgMatches,
Expand Down Expand Up @@ -473,6 +475,201 @@ pub fn prune_states<E: EthSpec>(
Ok(())
}

fn set_oldest_blob_slot<E: EthSpec>(
slot: Slot,
client_config: ClientConfig,
runtime_context: &RuntimeContext<E>,
log: Logger,
) -> Result<(), Error> {
let spec = &runtime_context.eth2_config.spec;
let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();

let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path,
&cold_path,
&blobs_path,
|_, _, _| Ok(()),
client_config.store,
spec.clone(),
log.clone(),
)?;

let old_blob_info = db.get_blob_info();
let mut new_blob_info = old_blob_info.clone();
new_blob_info.oldest_blob_slot = Some(slot);

info!(
log,
"Updating oldest blob slot";
"previous" => ?old_blob_info.oldest_blob_slot,
"new" => slot,
);

db.compare_and_set_blob_info_with_write(old_blob_info, new_blob_info)
}

fn inspect_blobs<E: EthSpec>(
_verify: bool,
client_config: ClientConfig,
runtime_context: &RuntimeContext<E>,
log: Logger,
) -> Result<(), Error> {
let spec = &runtime_context.eth2_config.spec;
let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();

let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path,
&cold_path,
&blobs_path,
|_, _, _| Ok(()),
client_config.store,
spec.clone(),
log.clone(),
)?;

let split = db.get_split_info();
let oldest_block_slot = db.get_oldest_block_slot();
let deneb_start_slot = spec
.deneb_fork_epoch
.map_or(Slot::new(0), |epoch| epoch.start_slot(E::slots_per_epoch()));
let start_slot = oldest_block_slot.max(deneb_start_slot);

if oldest_block_slot > deneb_start_slot {
info!(
log,
"Missing blobs AND blocks";
"start" => deneb_start_slot,
"end" => oldest_block_slot - 1,
);
}

let mut last_block_root = Hash256::ZERO;

for res in db.forwards_block_roots_iterator_until(
start_slot,
split.slot,
|| {
db.get_advanced_hot_state(split.block_root, split.slot, split.state_root)?
.ok_or(HotColdDBError::MissingSplitState(split.state_root, split.slot).into())
.map(|(_, split_state)| (split_state, split.block_root))
},
spec,
)? {
let (block_root, slot) = res?;

if last_block_root == block_root {
info!(log, "Slot {}: no block", slot);
} else if let Some(blobs) = db.get_blobs(&block_root)? {
// FIXME(sproul): do verification here
info!(log, "Slot {}: {} blobs stored", slot, blobs.len());
} else {
// Check whether blobs are expected.
let block = db
.get_blinded_block(&block_root)?
.ok_or(Error::BlockNotFound(block_root))?;

let num_expected_blobs = block
.message()
.body()
.blob_kzg_commitments()
.map_or(0, |blobs| blobs.len());
if num_expected_blobs > 0 {
warn!(
log,
"Slot {}: {} blobs missing ({:?})", slot, num_expected_blobs, block_root
);
} else {
info!(log, "Slot {}: block with 0 blobs", slot);
}
}
last_block_root = block_root;
}

Ok(())
}

fn import_blobs<E: EthSpec>(
source_path: &Path,
client_config: ClientConfig,
runtime_context: &RuntimeContext<E>,
log: Logger,
) -> Result<(), Error> {
let spec = &runtime_context.eth2_config.spec;
let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();

let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path,
&cold_path,
&blobs_path,
|_, _, _| Ok(()),
client_config.store,
spec.clone(),
log.clone(),
)?;

let source_db = LevelDB::<E>::open(source_path)?;

let prev_blob_info = db.get_blob_info();
let mut oldest_blob_slot = prev_blob_info
.oldest_blob_slot
.unwrap_or(Slot::new(u64::MAX));

let mut num_already_known = 0;
let mut num_imported = 0;

let mut ops = vec![];
let batch_size = 1024;

for res in source_db.iter_column(DBColumn::BeaconBlob) {
let (block_root, blob_bytes) = res?;

if db.get_blobs(&block_root)?.is_some() {
num_already_known += 1;
} else {
let blobs = BlobSidecarList::<E>::from_ssz_bytes(&blob_bytes)?;
ops.push(KeyValueStoreOp::PutKeyValue(
store::get_key_for_col(DBColumn::BeaconBlob.as_str(), block_root.as_slice()),
blob_bytes,
));

if let Some(blob) = blobs.first() {
oldest_blob_slot = oldest_blob_slot.min(blob.slot());
debug!(
log,
"Imported blobs for slot {} ({:?})",
blob.slot(),
block_root
);
}
num_imported += 1;

if ops.len() >= batch_size {
db.blobs_db.do_atomically(std::mem::take(&mut ops))?;
}
}
}
db.blobs_db.do_atomically(ops)?;

let mut new_blob_info = prev_blob_info.clone();
new_blob_info.oldest_blob_slot = Some(oldest_blob_slot);
db.compare_and_set_blob_info_with_write(prev_blob_info, new_blob_info)?;

info!(
log,
"Blobs imported";
"imported" => num_imported,
"already_known" => num_already_known
);

Ok(())
}

/// Run the database manager, returning an error string if the operation did not succeed.
pub fn run<E: EthSpec>(
cli_args: &ArgMatches,
Expand Down Expand Up @@ -530,5 +727,15 @@ pub fn run<E: EthSpec>(
let compact_config = parse_compact_config(compact_config)?;
compact_db::<E>(compact_config, client_config, log).map_err(format_err)
}
cli::DatabaseManagerSubcommand::SetOldestBlobSlot(blob_slot_config) => {
set_oldest_blob_slot(blob_slot_config.slot, client_config, &context, log)
.map_err(format_err)
}
cli::DatabaseManagerSubcommand::InspectBlobs(_) => {
inspect_blobs(false, client_config, &context, log).map_err(format_err)
}
cli::DatabaseManagerSubcommand::ImportBlobs(config) => {
import_blobs(&config.source_db, client_config, &context, log).map_err(format_err)
}
}
}
Loading