Skip to content

Commit

Permalink
[formal-snapshots] Introduce strict verification (#16918)
Browse files Browse the repository at this point in the history
## Description 

`--verify strict` adds an additional level of verification by verifying
the live object set of the db after restore, rather than simply the
contents of the snapshot itself. These two are not equivalent in cases
where the restore process inadvertently adds objects to the live object
set from a source other than the downloaded snapshot itself. One such
example could be inserting genesis objects.

## Test Plan 

```
target/release/sui-tool download-formal-snapshot --network mainnet --latest --path /opt/sui/db/authorities_db --genesis /opt/sui/mainnet/config/genesis.blob --no-sign-request --verify strict
Beginning formal snapshot restore to end of epoch 419, network: Mainnet with verification mode Strict
[00:25:02] ██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████ 35942088/35942088(Checkpoint summary sync is complete)
[00:07:42] ██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████ 453 out of 453 .ref files done
(ref files download complete)
[00:00:42] ████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████ 453 out of 453 ref files checksummed (Checksumming complete)
[00:02:17] ██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████ 453 out of 453 ref files accumulated from snapshot (Accumulation complete)
[00:30:33] ██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████ 453 out of 453 .obj files done
(Objects download complete)
[00:40:30] ██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████ 35942088/35942088(Checkpoint summary verification is complete)
[00:40:02] ██████████████████████████████████████████████████████████████████████████████████████░ 257689825 out of 257775627 ref files accumulated from db (live obj accumulations per sec: 107250.80399722935)DB live object state verification completed successfully!
```

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
williampsmith authored Jun 12, 2024
1 parent 101f1ff commit 3f67d9d
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 20 deletions.
96 changes: 96 additions & 0 deletions crates/sui-snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,36 @@ pub mod uploader;
mod writer;

use anyhow::Result;
use fastcrypto::hash::MultisetHash;
use indicatif::MultiProgress;
use indicatif::ProgressBar;
use indicatif::ProgressStyle;
use num_enum::IntoPrimitive;
use num_enum::TryFromPrimitive;
use object_store::path::Path;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::authority_store_tables::LiveObject;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::epoch_start_configuration::EpochStartConfiguration;
use sui_core::checkpoints::CheckpointStore;
use sui_core::epoch::committee_store::CommitteeStore;
use sui_core::state_accumulator::WrappedObject;
use sui_protocol_config::Chain;
use sui_storage::object_store::util::path_to_filesystem;
use sui_storage::{compute_sha3_checksum, FileCompression, SHA3_BYTES};
use sui_types::accumulator::Accumulator;
use sui_types::base_types::ObjectID;
use sui_types::messages_checkpoint::ECMHLiveObjectSetDigest;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use sui_types::sui_system_state::get_sui_system_state;
use sui_types::sui_system_state::SuiSystemStateTrait;
use tokio::time::Instant;

/// The following describes the format of an object file (*.obj) used for persisting live sui objects.
/// The maximum size per .obj file is 128MB. State snapshot will be taken at the end of every epoch.
Expand Down Expand Up @@ -218,12 +230,17 @@ pub async fn setup_db_state(
perpetual_db: Arc<AuthorityPerpetualTables>,
checkpoint_store: Arc<CheckpointStore>,
committee_store: Arc<CommitteeStore>,
chain: Chain,
verify: bool,
num_live_objects: u64,
m: MultiProgress,
) -> Result<()> {
// This function should be called once state accumulator based hash verification
// is complete and live object set state is downloaded to local store
let system_state_object = get_sui_system_state(&perpetual_db)?;
let new_epoch_start_state = system_state_object.into_epoch_start_state();
let next_epoch_committee = new_epoch_start_state.get_sui_committee();
let root_digest: ECMHLiveObjectSetDigest = accumulator.digest().into();
let last_checkpoint = checkpoint_store
.get_epoch_last_checkpoint(epoch)
.expect("Error loading last checkpoint for current epoch")
Expand All @@ -242,5 +259,84 @@ pub async fn setup_db_state(
committee_store.insert_new_committee(&next_epoch_committee)?;
checkpoint_store.update_highest_executed_checkpoint(&last_checkpoint)?;

if verify {
eprintln!(
"Beginning DB live object state verification. This may take a while, \
and currently does not provide progress updates..."
);
let simplified_unwrap_then_delete = match (chain, epoch) {
(Chain::Mainnet, ep) if ep >= 87 => true,
(Chain::Mainnet, ep) if ep < 87 => false,
(Chain::Testnet, ep) if ep >= 50 => true,
(Chain::Testnet, ep) if ep < 50 => false,
_ => panic!("Unsupported chain"),
};
let include_tombstones = !simplified_unwrap_then_delete;
let iter = perpetual_db.iter_live_object_set(include_tombstones);
let local_digest = ECMHLiveObjectSetDigest::from(
accumulate_live_object_iter(Box::new(iter), m, num_live_objects).digest(),
);
assert_eq!(
root_digest, local_digest,
"End of epoch {} root state digest {} does not match \
local root state hash {} after restoring db from formal snapshot",
epoch, root_digest.digest, local_digest.digest,
);
eprintln!("DB live object state verification completed successfully!");
}

Ok(())
}

/// Folds every entry yielded by `iter` into a fresh [`Accumulator`] and returns it,
/// while reporting progress on a bar registered with `m`.
///
/// * `iter` - the live object set to accumulate. `LiveObject::Normal` entries contribute
///   the third element of their computed object reference; `LiveObject::Wrapped` entries
///   contribute the BCS serialization of a `WrappedObject` built from the key.
/// * `m` - the `MultiProgress` the progress bar is attached to.
/// * `num_live_objects` - the expected number of entries, used as the bar length.
///
/// A background task is started with `tokio::spawn` to refresh the bar once per second,
/// so this function has to be called from within a Tokio runtime.
///
/// # Panics
///
/// Panics if a `WrappedObject` cannot be BCS-serialized.
pub fn accumulate_live_object_iter(
    iter: Box<dyn Iterator<Item = LiveObject> + '_>,
    m: MultiProgress,
    num_live_objects: u64,
) -> Accumulator {
    // Monitor progress of live object accumulation. The counter tracks live objects
    // (not .ref files), so the label says so.
    let accum_progress_bar = m.add(ProgressBar::new(num_live_objects).with_style(
        ProgressStyle::with_template(
            "[{elapsed_precise}] {wide_bar} {pos} out of {len} live objects accumulated from db ({msg})",
        )
        .unwrap(),
    ));
    // `ProgressBar` is a cheap, shared handle: the clone goes to the monitor task while
    // this function keeps the original so it can finish the bar once accumulation is done.
    let monitor_progress_bar = accum_progress_bar.clone();
    let accum_counter = Arc::new(AtomicU64::new(0));
    let cloned_accum_counter = accum_counter.clone();
    tokio::spawn(async move {
        let a_instant = Instant::now();
        loop {
            // The bar is finished by the accumulating side below; that is the signal
            // for this task to stop instead of polling forever.
            if monitor_progress_bar.is_finished() {
                break;
            }
            let num_accumulated = cloned_accum_counter.load(Ordering::Relaxed);
            // NOTE: a failure here only aborts this monitor task, not the caller.
            assert!(
                num_accumulated <= num_live_objects,
                "Accumulated more objects (at least {num_accumulated}) than expected ({num_live_objects})"
            );
            let accumulations_per_sec = num_accumulated as f64 / a_instant.elapsed().as_secs_f64();
            monitor_progress_bar.set_position(num_accumulated);
            monitor_progress_bar.set_message(format!(
                "live obj accumulations per sec: {}",
                accumulations_per_sec
            ));
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });

    // Accumulate live objects
    let mut acc = Accumulator::default();
    for live_object in iter {
        match live_object {
            LiveObject::Normal(object) => {
                acc.insert(object.compute_object_reference().2);
            }
            LiveObject::Wrapped(key) => {
                acc.insert(
                    bcs::to_bytes(&WrappedObject::new(key.0, key.1))
                        .expect("Failed to serialize WrappedObject"),
                );
            }
        }
        accum_counter.fetch_add(1, Ordering::Relaxed);
    }

    // Publish the final count (the monitor only samples once per second) and finish the
    // bar so the monitor task terminates on its next wake-up.
    accum_progress_bar.set_position(accum_counter.load(Ordering::Relaxed));
    accum_progress_bar.finish_with_message("Accumulation from db complete");
    acc
}
9 changes: 5 additions & 4 deletions crates/sui-snapshot/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl StateSnapshotReaderV1 {
&mut self,
perpetual_db: &AuthorityPerpetualTables,
abort_registration: AbortRegistration,
sender: Option<tokio::sync::mpsc::Sender<Accumulator>>,
sender: Option<tokio::sync::mpsc::Sender<(Accumulator, u64)>>,
) -> Result<()> {
// This computes and stores the sha3 digest of object references in REFERENCE file for each
// bucket partition. When downloading objects, we will match sha3 digest of object references
Expand Down Expand Up @@ -239,7 +239,7 @@ impl StateSnapshotReaderV1 {

fn spawn_accumulation_tasks(
&self,
sender: tokio::sync::mpsc::Sender<Accumulator>,
sender: tokio::sync::mpsc::Sender<(Accumulator, u64)>,
num_part_files: usize,
) -> JoinHandle<()> {
// Spawn accumulation progress bar
Expand All @@ -249,7 +249,7 @@ impl StateSnapshotReaderV1 {
let accum_progress_bar = self.m.add(
ProgressBar::new(num_part_files as u64).with_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {wide_bar} {pos} out of {len} ref files accumulated ({msg})",
"[{elapsed_precise}] {wide_bar} {pos} out of {len} ref files accumulated from snapshot ({msg})",
)
.unwrap(),
),
Expand Down Expand Up @@ -306,9 +306,10 @@ impl StateSnapshotReaderV1 {
let sender_clone = sender.clone();
tokio::spawn(async move {
let mut partial_acc = Accumulator::default();
let num_objects = obj_digests.len();
partial_acc.insert_all(obj_digests);
sender_clone
.send(partial_acc)
.send((partial_acc, num_objects as u64))
.await
.expect("Unable to send accumulator from snapshot reader");
})
Expand Down
11 changes: 5 additions & 6 deletions crates/sui-tool/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
download_db_snapshot, download_formal_snapshot, dump_checkpoints_from_archive,
get_latest_available_epoch, get_object, get_transaction_block, make_clients, pkg_dump,
restore_from_db_checkpoint, verify_archive, verify_archive_by_checksum, ConciseObjectOutput,
GroupedObjectOutput, VerboseObjectOutput,
GroupedObjectOutput, SnapshotVerifyMode, VerboseObjectOutput,
};
use anyhow::Result;
use futures::{future::join_all, StreamExt};
Expand Down Expand Up @@ -323,10 +323,9 @@ pub enum ToolCommand {
/// value based on number of available logical cores.
#[clap(long = "num-parallel-downloads")]
num_parallel_downloads: Option<usize>,
/// If true, perform snapshot and checkpoint summary verification.
/// Defaults to true.
#[clap(long = "verify")]
verify: Option<bool>,
/// Verification mode to employ.
#[clap(long = "verify", default_value = "normal")]
verify: Option<SnapshotVerifyMode>,
/// Network to download snapshot for. Defaults to "mainnet".
/// If `--snapshot-bucket` or `--archive-bucket` is not specified,
/// the value of this flag is used to construct default bucket names.
Expand Down Expand Up @@ -908,7 +907,7 @@ impl ToolCommand {
);
}

let verify = verify.unwrap_or(true);
let verify = verify.unwrap_or_default();
download_formal_snapshot(
&path,
epoch_to_download,
Expand Down
42 changes: 32 additions & 10 deletions crates/sui-tool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ use tokio::task::JoinHandle;
use tokio::time::Instant;

use anyhow::anyhow;
use clap::ValueEnum;
use eyre::ContextCompat;
use fastcrypto::hash::MultisetHash;
use futures::{StreamExt, TryStreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use prometheus::Registry;
use serde::{Deserialize, Serialize};
use sui_archival::reader::{ArchiveReader, ArchiveReaderMetrics};
use sui_archival::{verify_archive_with_checksums, verify_archive_with_genesis_config};
use sui_config::node::ArchiveReaderConfig;
Expand Down Expand Up @@ -71,6 +73,19 @@ pub mod commands;
pub mod db_tool;
pub mod pkg_dump;

/// Level of verification selected through the `--verify` flag and passed to
/// `download_formal_snapshot`.
///
/// Variants are declared from least to most thorough; the derived ordering follows
/// that declaration order. `Normal` is the default.
#[derive(
    Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, ValueEnum,
)]
pub enum SnapshotVerifyMode {
    /// verification of both db state and checkpoint chain is skipped.
    None,
    /// verify snapshot state during download, but no post-restore db verification.
    #[default]
    Normal,
    /// verify db state post-restore against the end of epoch state root commitment.
    Strict,
}

// This functions requires at least one of genesis or fullnode_rpc to be `Some`.
async fn make_clients(
sui_client: &Arc<SuiClient>,
Expand Down Expand Up @@ -843,11 +858,11 @@ pub async fn download_formal_snapshot(
archive_store_config: ObjectStoreConfig,
num_parallel_downloads: usize,
network: Chain,
verify: bool,
verify: SnapshotVerifyMode,
) -> Result<(), anyhow::Error> {
eprintln!(
"Beginning formal snapshot restore to end of epoch {}, network: {:?}",
epoch, network,
"Beginning formal snapshot restore to end of epoch {}, network: {:?}, verification mode: {:?}",
epoch, network, verify,
);
let path = path.join("staging").to_path_buf();
if path.exists() {
Expand Down Expand Up @@ -878,7 +893,7 @@ pub async fn download_formal_snapshot(
archive_store_config.clone(),
epoch,
num_parallel_downloads,
verify,
verify != SnapshotVerifyMode::None,
);
let (_abort_handle, abort_registration) = AbortHandle::new_pair();
let perpetual_db_clone = perpetual_db.clone();
Expand All @@ -891,6 +906,7 @@ pub async fn download_formal_snapshot(
// TODO if verify is false, we should skip generating these and
// not pass in a channel to the reader
let (sender, mut receiver) = mpsc::channel(num_parallel_downloads);
let m_clone = m.clone();

let snapshot_handle = tokio::spawn(async move {
let local_store_config = ObjectStoreConfig {
Expand All @@ -904,7 +920,7 @@ pub async fn download_formal_snapshot(
&local_store_config,
usize::MAX,
NonZeroUsize::new(num_parallel_downloads).unwrap(),
m,
m_clone,
)
.await
.unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
Expand All @@ -915,7 +931,9 @@ pub async fn download_formal_snapshot(
Ok::<(), anyhow::Error>(())
});
let mut root_accumulator = Accumulator::default();
while let Some(partial_acc) = receiver.recv().await {
let mut num_live_objects = 0;
while let Some((partial_acc, num_objects)) = receiver.recv().await {
num_live_objects += num_objects;
root_accumulator.union(&partial_acc);
}
summaries_handle
Expand All @@ -928,7 +946,7 @@ pub async fn download_formal_snapshot(
.expect("Expected nonempty checkpoint store");

// Perform snapshot state verification
if verify {
if verify != SnapshotVerifyMode::None {
assert_eq!(
last_checkpoint.epoch(),
epoch,
Expand All @@ -955,7 +973,7 @@ pub async fn download_formal_snapshot(
assert_eq!(
*consensus_digest, local_digest,
"End of epoch {} root state digest {} does not match \
local root state hash {} after restoring from formal snapshot",
local root state hash {} computed from snapshot data",
epoch, consensus_digest.digest, local_digest.digest,
);
eprintln!("Formal snapshot state verification completed successfully!");
Expand All @@ -981,10 +999,14 @@ pub async fn download_formal_snapshot(

setup_db_state(
epoch,
root_accumulator,
perpetual_db,
root_accumulator.clone(),
perpetual_db.clone(),
checkpoint_store,
committee_store,
network,
verify == SnapshotVerifyMode::Strict,
num_live_objects,
m,
)
.await?;

Expand Down

0 comments on commit 3f67d9d

Please sign in to comment.