From 2354bc6abd19ff6a9a6d33a59dd3fdc01c67bb91 Mon Sep 17 00:00:00 2001 From: Alexey Pashinov Date: Wed, 19 Jun 2024 12:51:44 +0200 Subject: [PATCH] feat(cli): finish cold boot --- cli/src/node/boot.rs | 162 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 152 insertions(+), 10 deletions(-) diff --git a/cli/src/node/boot.rs b/cli/src/node/boot.rs index 24fa7a084..48dba75cc 100644 --- a/cli/src/node/boot.rs +++ b/cli/src/node/boot.rs @@ -6,8 +6,9 @@ use everscale_types::models::BlockId; use futures_util::StreamExt; use tokio::sync::mpsc; use tycho_block_util::archive::WithArchiveData; -use tycho_block_util::block::BlockProofStuff; +use tycho_block_util::block::{BlockProofStuff, BlockStuff}; use tycho_block_util::state::ShardStateStuff; +use tycho_core::proto::blockchain::BlockFull; use tycho_storage::{BlockHandle, BriefBlockInfo, KeyBlocksDirection, KEY_BLOCK_UTIME_STEP}; use tycho_util::futures::JoinTask; use tycho_util::time::now_sec; @@ -36,7 +37,7 @@ pub async fn cold_boot( if last_key_block.id().seqno != 0 { // If the last suitable key block is not zerostate, we must download all blocks // with their states from shards for that - // TODO + download_start_blocks_and_states(node, last_key_block.id()).await?; }; tracing::info!("finished cold boot"); @@ -151,15 +152,15 @@ async fn download_key_blocks( let block_storage = storage.block_storage(); let block_handle_storage = storage.block_handle_storage(); - // TODO: add retry count to interrupt infinite loop - loop { - // Check whether block proof is already stored locally - if let Some(handle) = block_handle_storage.load_handle(&block_id) { - if let Ok(proof) = block_storage.load_block_proof(&handle, false).await { - return WithArchiveData::loaded(proof); - } + // Check whether block proof is already stored locally + if let Some(handle) = block_handle_storage.load_handle(&block_id) { + if let Ok(proof) = block_storage.load_block_proof(&handle, false).await { + return WithArchiveData::loaded(proof); } + } + // TODO: add retry count to interrupt infinite loop + loop { let res = blockchain_rpc_client .get_key_block_proof(&block_id) .await; @@ -242,7 +243,7 @@ async fn download_key_blocks( } } - let now_utime = tycho_util::time::now_sec(); + let now_utime = now_sec(); let last_utime = prev_key_block.handle().meta().gen_utime(); tracing::debug!( @@ -314,6 +315,145 @@ fn choose_key_block(node: &Node) -> anyhow::Result { Err(BootError::PersistentShardStateNotFound.into()) } +async fn download_start_blocks_and_states( + node: &Arc, + mc_block_id: &BlockId, +) -> anyhow::Result<()> { + // Download and save masterchain block and state + let (_, init_mc_block) = download_block_with_state(node, *mc_block_id, *mc_block_id).await?; + + tracing::info!( + block_id = %init_mc_block.id(), + "downloaded init mc block state" + ); + + // Download and save blocks and states from other shards + for (_, block_id) in init_mc_block.shard_blocks()? { + download_block_with_state(node, *mc_block_id, block_id).await?; + } + + Ok(()) +} + +async fn download_block_with_state( + node: &Arc, + mc_block_id: BlockId, + block_id: BlockId, +) -> anyhow::Result<(BlockHandle, BlockStuff)> { + let block_storage = node.storage.block_storage(); + let block_handle_storage = node.storage.block_handle_storage(); + + let mc_seqno = mc_block_id.seqno; + + let handle = block_handle_storage + .load_handle(&block_id) + .filter(|handle| handle.meta().has_data()); + + // Download block data and proof + let (block, handle) = match handle { + Some(handle) => (block_storage.load_block_data(&handle).await?, handle), + None => { + let blockchain_rpc_client = &node.blockchain_rpc_client; + + // TODO: add retry count to interrupt infinite loop + let (block, proof, meta_data) = loop { + let res = blockchain_rpc_client.get_block_full(&block_id).await; + + match res { + Ok(res) => { + let (handle, block_full) = res.split(); + + match block_full { + BlockFull::Found { + block_id, + block: block_data, + proof: proof_data, + is_link, + } => { + let block = match BlockStuff::deserialize_checked( + block_id, + &block_data, + ) { + Ok(block) => WithArchiveData::new(block, block_data), + Err(e) => { + tracing::error!(%block_id, "failed to deserialize block: {e}"); + handle.reject(); + continue; + } + }; + + let proof = match BlockProofStuff::deserialize( + block_id, + &proof_data, + is_link, + ) { + Ok(proof) => WithArchiveData::new(proof, proof_data), + Err(e) => { + tracing::error!(%block_id, "failed to deserialize block proof: {e}"); + handle.reject(); + continue; + } + }; + + match proof.pre_check_block_proof() { + Ok((_, block_info)) => { + let meta_data = BriefBlockInfo::from(&block_info) + .with_mc_seq_no(mc_seqno); + + break (block, proof, meta_data); + } + Err(e) => { + tracing::error!("received invalid block: {e:?}"); + } + } + } + BlockFull::Empty => { + tracing::warn!(%block_id, "block not found"); + handle.reject(); + } + } + } + Err(e) => { + tracing::warn!(%block_id, "failed to download block: {e:?}"); + } + } + }; + + tracing::info!(%block_id, "downloaded block data"); + + let mut handle = block_storage + .store_block_data(&block, &block.archive_data, meta_data) + .await? + .handle; + + if !handle.meta().has_proof() { + handle = block_storage + .store_block_proof(&proof, handle.into()) + .await? + .handle; + } + + (block.data, handle) + } + }; + + // Download block state + if !handle.meta().has_state() { + let state_update = block.block().load_state_update()?; + + tracing::info!(block_id = %handle.id(), "downloading state"); + let (_, shard_state) = node.load_or_download_state(&block_id).await?; + tracing::info!(block_id = %handle.id(), "downloaded state"); + + let state_hash = *shard_state.root_cell().repr_hash(); + if state_update.new_hash != state_hash { + return Err(BootError::ShardStateHashMismatch.into()); + } + } + + Ok((handle, block)) +} + #[derive(Clone)] enum PrevKeyBlock { ZeroState { @@ -372,4 +512,6 @@ enum BootError { FailedToLoadKeyBlock, #[error("Persistent shard state not found")] PersistentShardStateNotFound, + #[error("Downloaded shard state hash mismatch")] + ShardStateHashMismatch, }