From 027f3ff26e12670bc1d3d077658ec91806ed95e3 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 5 Nov 2024 10:21:11 +1100 Subject: [PATCH 1/7] Implement set-oldest-blob-slot --- database_manager/src/cli.rs | 14 +++++++++++++ database_manager/src/lib.rs | 39 +++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/database_manager/src/cli.rs b/database_manager/src/cli.rs index 5521b97805f..36e0caa9f03 100644 --- a/database_manager/src/cli.rs +++ b/database_manager/src/cli.rs @@ -3,6 +3,7 @@ use clap_utils::get_color_style; use clap_utils::FLAG_HEADER; use serde::{Deserialize, Serialize}; use std::path::PathBuf; +use types::Slot; use crate::InspectTarget; @@ -79,6 +80,7 @@ pub enum DatabaseManagerSubcommand { PruneBlobs(PruneBlobs), PruneStates(PruneStates), Compact(Compact), + SetOldestBlobSlot(SetOldestBlobSlot), } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] @@ -227,3 +229,15 @@ pub struct Compact { )] pub output_dir: Option, } + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Manually override the database's view of the oldest blob known.")] +pub struct SetOldestBlobSlot { + #[clap( + long, + value_name = "SLOT", + help = "Slot of the oldest blob in the database.", + display_order = 0 + )] + pub slot: Slot, +} diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 3d556318486..47d543c75ab 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -473,6 +473,41 @@ pub fn prune_states( Ok(()) } +fn set_oldest_blob_slot( + slot: Slot, + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + log.clone(), + )?; + + let old_blob_info = db.get_blob_info(); + let mut new_blob_info = old_blob_info.clone(); + new_blob_info.oldest_blob_slot = Some(slot); + + info!( + log, + "Updating oldest blob slot"; + "previous" => ?old_blob_info.oldest_blob_slot, + "new" => slot, + ); + + db.compare_and_set_blob_info_with_write(old_blob_info, new_blob_info) +} + /// Run the database manager, returning an error string if the operation did not succeed. pub fn run( cli_args: &ArgMatches, @@ -530,5 +565,9 @@ pub fn run( let compact_config = parse_compact_config(compact_config)?; compact_db::(compact_config, client_config, log).map_err(format_err) } + cli::DatabaseManagerSubcommand::SetOldestBlobSlot(blob_slot_config) => { + set_oldest_blob_slot(blob_slot_config.slot, client_config, &context, log) + .map_err(format_err) + } } } From 57f58b8f1a884292d089acdeea77c81c7c5f61a8 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 5 Nov 2024 14:01:43 +1100 Subject: [PATCH 2/7] Inspect blobs --- database_manager/src/cli.rs | 8 ++++ database_manager/src/lib.rs | 80 ++++++++++++++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/database_manager/src/cli.rs b/database_manager/src/cli.rs index 36e0caa9f03..864fa244eb6 100644 --- a/database_manager/src/cli.rs +++ b/database_manager/src/cli.rs @@ -81,6 +81,7 @@ pub enum DatabaseManagerSubcommand { PruneStates(PruneStates), Compact(Compact), SetOldestBlobSlot(SetOldestBlobSlot), + InspectBlobs(InspectBlobs), } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] @@ -241,3 +242,10 @@ pub struct SetOldestBlobSlot { )] pub slot: Slot, } + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Produce a summary of blob availability in the databasue.")] +pub struct InspectBlobs { + #[clap(long, help = "Verify blob data integrity.", display_order = 0)] + pub verify: bool, +} diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 47d543c75ab..003862d723d 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -23,7 +23,7 @@ use store::{ DBColumn, HotColdDB, KeyValueStore, LevelDB, }; use strum::{EnumString, EnumVariantNames}; -use types::{BeaconState, EthSpec, Slot}; +use types::{BeaconState, EthSpec, Hash256, Slot}; fn parse_client_config( cli_args: &ArgMatches, @@ -508,6 +508,81 @@ fn set_oldest_blob_slot( db.compare_and_set_blob_info_with_write(old_blob_info, new_blob_info) } +fn inspect_blobs( + verify: bool, + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + log.clone(), + )?; + + let split = db.get_split_info(); + let oldest_block_slot = db.get_oldest_block_slot(); + let deneb_start_slot = spec + .deneb_fork_epoch + .map_or(Slot::new(0), |epoch| epoch.start_slot(E::slots_per_epoch())); + let start_slot = oldest_block_slot.max(deneb_start_slot); + + if oldest_block_slot > deneb_start_slot { + info!( + log, + "Missing blobs AND blocks"; + "start" => deneb_start_slot, + "end" => oldest_block_slot - 1, + ); + } + + let mut last_block_root = Hash256::ZERO; + + for res in db.forwards_block_roots_iterator_until( + start_slot, + split.slot, + || panic!("not required"), + spec, + )? { + let (block_root, slot) = res?; + + if last_block_root == block_root { + info!(log, "Slot {}: no block", slot); + } else if let Some(blobs) = db.get_blobs(&block_root)? { + // FIXME(sproul): do verification here + info!(log, "Slot {}: {} blobs stored", slot, blobs.len()); + } else { + // Check whether blobs are expected. + let block = db + .get_blinded_block(&block_root)? + .ok_or(Error::BlockNotFound(block_root))?; + + let num_expected_blobs = block + .message() + .body() + .blob_kzg_commitments() + .map_or(0, |blobs| blobs.len()); + if num_expected_blobs > 0 { + info!(log, "Slot {}: {} blobs missing", slot, num_expected_blobs); + } else { + info!(log, "Slot {}: block with no blobs", slot); + } + } + last_block_root = block_root; + } + + Ok(()) +} + /// Run the database manager, returning an error string if the operation did not succeed. pub fn run( cli_args: &ArgMatches, @@ -569,5 +644,8 @@ pub fn run( set_oldest_blob_slot(blob_slot_config.slot, client_config, &context, log) .map_err(format_err) } + cli::DatabaseManagerSubcommand::InspectBlobs(_) => { + inspect_blobs(false, client_config, &context, log).map_err(format_err) + } } } From 299c0b2d17d1393cc8bfb9a35fb796aae2380387 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 7 Nov 2024 12:10:43 +1100 Subject: [PATCH 3/7] Implement import-blobs --- database_manager/Cargo.toml | 1 + database_manager/src/cli.rs | 10 ++++- database_manager/src/lib.rs | 82 +++++++++++++++++++++++++++++++++++-- 3 files changed, 88 insertions(+), 5 deletions(-) diff --git a/database_manager/Cargo.toml b/database_manager/Cargo.toml index 96176f3fba5..f97aa50ee51 100644 --- a/database_manager/Cargo.toml +++ b/database_manager/Cargo.toml @@ -15,3 +15,4 @@ types = { workspace = true } slog = { workspace = true } strum = { workspace = true } serde = { workspace = true } +ethereum_ssz = { workspace = true } diff --git a/database_manager/src/cli.rs b/database_manager/src/cli.rs index 864fa244eb6..7ef978fd3e5 100644 --- a/database_manager/src/cli.rs +++ b/database_manager/src/cli.rs @@ -82,6 +82,7 @@ pub enum DatabaseManagerSubcommand { Compact(Compact), SetOldestBlobSlot(SetOldestBlobSlot), InspectBlobs(InspectBlobs), + ImportBlobs(ImportBlobs), } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] @@ -244,8 +245,15 @@ pub struct SetOldestBlobSlot { } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] -#[clap(about = "Produce a summary of blob availability in the databasue.")] +#[clap(about = "Produce a summary of blob availability in the database.")] pub struct InspectBlobs { #[clap(long, help = "Verify blob data integrity.", display_order = 0)] pub verify: bool, } + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Import blobs from another node's blob database.")] +pub struct ImportBlobs { + #[clap(long, help = "Path to the database to import", display_order = 0)] + pub source_db: PathBuf, +} diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 003862d723d..4e25fb1523b 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -12,18 +12,19 @@ use clap::ValueEnum; use cli::{Compact, Inspect}; use environment::{Environment, RuntimeContext}; use serde::{Deserialize, Serialize}; -use slog::{info, warn, Logger}; +use slog::{debug, info, warn, Logger}; +use ssz::Decode; use std::fs; use std::io::Write; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use store::metadata::STATE_UPPER_LIMIT_NO_RETAIN; use store::{ errors::Error, metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, - DBColumn, HotColdDB, KeyValueStore, LevelDB, + DBColumn, HotColdDB, KeyValueStore, LevelDB, StoreItem, }; use strum::{EnumString, EnumVariantNames}; -use types::{BeaconState, EthSpec, Hash256, Slot}; +use types::{BeaconState, BlobSidecarList, EthSpec, Hash256, Slot}; fn parse_client_config( cli_args: &ArgMatches, @@ -583,6 +584,76 @@ fn inspect_blobs( Ok(()) } +fn import_blobs( + source_path: &Path, + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + log.clone(), + )?; + + let source_db = LevelDB::::open(source_path)?; + + let prev_blob_info = db.get_blob_info(); + let mut oldest_blob_slot = prev_blob_info + .oldest_blob_slot + .unwrap_or(Slot::new(u64::MAX)); + + let mut num_already_known = 0; + let mut num_imported = 0; + + let mut ops = vec![]; + let batch_size = 1024; + + for res in source_db.iter_column(DBColumn::BeaconBlob) { + let (block_root, blob_bytes) = res?; + + if db.get_blobs(&block_root)?.is_some() { + num_already_known += 1; + } else { + let blobs = BlobSidecarList::::from_ssz_bytes(&blob_bytes)?; + ops.push(blobs.as_kv_store_op(block_root)); + + if let Some(blob) = blobs.first() { + oldest_blob_slot = oldest_blob_slot.min(blob.slot()); + debug!(log, "Imported blobs for slot {}", blob.slot()); + } + num_imported += 1; + + if ops.len() >= batch_size { + db.blobs_db.do_atomically(std::mem::take(&mut ops))?; + } + } + } + db.blobs_db.do_atomically(ops)?; + + let mut new_blob_info = prev_blob_info.clone(); + new_blob_info.oldest_blob_slot = Some(oldest_blob_slot); + db.compare_and_set_blob_info_with_write(prev_blob_info, new_blob_info)?; + + info!( + log, + "Blobs imported"; + "imported" => num_imported, + "already_known" => num_already_known + ); + + Ok(()) +} + /// Run the database manager, returning an error string if the operation did not succeed. pub fn run( cli_args: &ArgMatches, @@ -647,5 +718,8 @@ pub fn run( cli::DatabaseManagerSubcommand::InspectBlobs(_) => { inspect_blobs(false, client_config, &context, log).map_err(format_err) } + cli::DatabaseManagerSubcommand::ImportBlobs(config) => { + import_blobs(&config.source_db, client_config, &context, log).map_err(format_err) + } } } From 8647a3e8ff6553747f12a1d2481bccaff78904fe Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 7 Nov 2024 14:22:59 +1100 Subject: [PATCH 4/7] Update cargo.lock --- Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.lock b/Cargo.lock index 0d9da0c7fed..026024cee62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1908,6 +1908,7 @@ dependencies = [ "clap", "clap_utils", "environment", + "ethereum_ssz", "hex", "serde", "slog", From 7c905cb891e60ea20d12bc3c7e17645c987f368e Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 8 Nov 2024 10:53:23 +1100 Subject: [PATCH 5/7] Fix inspect --- database_manager/src/lib.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 4e25fb1523b..eb0c5070e39 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -20,6 +20,7 @@ use std::path::{Path, PathBuf}; use store::metadata::STATE_UPPER_LIMIT_NO_RETAIN; use store::{ errors::Error, + hot_cold_store::HotColdDBError, metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, DBColumn, HotColdDB, KeyValueStore, LevelDB, StoreItem, }; @@ -510,7 +511,7 @@ fn set_oldest_blob_slot( } fn inspect_blobs( - verify: bool, + _verify: bool, client_config: ClientConfig, runtime_context: &RuntimeContext, log: Logger, @@ -551,7 +552,11 @@ fn inspect_blobs( for res in db.forwards_block_roots_iterator_until( start_slot, split.slot, - || panic!("not required"), + || { + db.get_advanced_hot_state(split.block_root, split.slot, split.state_root)? + .ok_or(HotColdDBError::MissingSplitState(split.state_root, split.slot).into()) + .map(|(_, split_state)| (split_state, split.block_root)) + }, spec, )? { let (block_root, slot) = res?; From 582e389e74ae8390c8b788fdadec412acb54f5e8 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 8 Nov 2024 11:11:39 +1100 Subject: [PATCH 6/7] Optimise import by skipping reserialisation --- database_manager/src/lib.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index eb0c5070e39..73949b71805 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -22,7 +22,7 @@ use store::{ errors::Error, hot_cold_store::HotColdDBError, metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, - DBColumn, HotColdDB, KeyValueStore, LevelDB, StoreItem, + DBColumn, HotColdDB, KeyValueStore, KeyValueStoreOp, LevelDB, }; use strum::{EnumString, EnumVariantNames}; use types::{BeaconState, BlobSidecarList, EthSpec, Hash256, Slot}; @@ -630,7 +630,10 @@ fn import_blobs( num_already_known += 1; } else { let blobs = BlobSidecarList::::from_ssz_bytes(&blob_bytes)?; - ops.push(blobs.as_kv_store_op(block_root)); + ops.push(KeyValueStoreOp::PutKeyValue( + block_root.to_vec(), + blob_bytes, + )); if let Some(blob) = blobs.first() { oldest_blob_slot = oldest_blob_slot.min(blob.slot()); From 95df7ea443f2a6d76164dd35b8be962f25669d0d Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 8 Nov 2024 11:59:00 +1100 Subject: [PATCH 7/7] Fix attempted optimisation and add more logs --- database_manager/src/lib.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 73949b71805..b9f9aed22d9 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -578,9 +578,12 @@ fn inspect_blobs( .blob_kzg_commitments() .map_or(0, |blobs| blobs.len()); if num_expected_blobs > 0 { - info!(log, "Slot {}: {} blobs missing", slot, num_expected_blobs); + warn!( + log, + "Slot {}: {} blobs missing ({:?})", slot, num_expected_blobs, block_root + ); } else { - info!(log, "Slot {}: block with no blobs", slot); + info!(log, "Slot {}: block with 0 blobs", slot); } } last_block_root = block_root; @@ -631,13 +634,18 @@ fn import_blobs( } else { let blobs = BlobSidecarList::::from_ssz_bytes(&blob_bytes)?; ops.push(KeyValueStoreOp::PutKeyValue( - block_root.to_vec(), + store::get_key_for_col(DBColumn::BeaconBlob.as_str(), block_root.as_slice()), blob_bytes, )); if let Some(blob) = blobs.first() { oldest_blob_slot = oldest_blob_slot.min(blob.slot()); - debug!(log, "Imported blobs for slot {}", blob.slot()); + debug!( + log, + "Imported blobs for slot {} ({:?})", + blob.slot(), + block_root + ); } num_imported += 1;