Skip to content

Commit

Permalink
feat(cli): finish cold boot
Browse files Browse the repository at this point in the history
  • Loading branch information
pashinov committed Jun 19, 2024
1 parent facb397 commit be6a868
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 14 deletions.
162 changes: 152 additions & 10 deletions cli/src/node/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use everscale_types::models::BlockId;
use futures_util::StreamExt;
use tokio::sync::mpsc;
use tycho_block_util::archive::WithArchiveData;
use tycho_block_util::block::BlockProofStuff;
use tycho_block_util::block::{BlockProofStuff, BlockStuff};
use tycho_block_util::state::ShardStateStuff;
use tycho_core::proto::blockchain::BlockFull;
use tycho_storage::{BlockHandle, BriefBlockInfo, KeyBlocksDirection, KEY_BLOCK_UTIME_STEP};
use tycho_util::futures::JoinTask;
use tycho_util::time::now_sec;
Expand Down Expand Up @@ -36,7 +37,7 @@ pub async fn cold_boot(
if last_key_block.id().seqno != 0 {
// If the last suitable key block is not zerostate, we must download all blocks
// with their states from shards for that
// TODO
download_start_blocks_and_states(node, last_key_block.id()).await?;
};

tracing::info!("finished cold boot");
Expand Down Expand Up @@ -151,15 +152,15 @@ async fn download_key_blocks(
let block_storage = storage.block_storage();
let block_handle_storage = storage.block_handle_storage();

// TODO: add retry count to interrupt infinite loop
loop {
// Check whether block proof is already stored locally
if let Some(handle) = block_handle_storage.load_handle(&block_id) {
if let Ok(proof) = block_storage.load_block_proof(&handle, false).await {
return WithArchiveData::loaded(proof);
}
// Check whether block proof is already stored locally
if let Some(handle) = block_handle_storage.load_handle(&block_id) {
if let Ok(proof) = block_storage.load_block_proof(&handle, false).await {
return WithArchiveData::loaded(proof);
}
}

// TODO: add retry count to interrupt infinite loop
loop {
let res = blockchain_rpc_client
.get_key_block_proof(&block_id)
.await;
Expand Down Expand Up @@ -242,7 +243,7 @@ async fn download_key_blocks(
}
}

let now_utime = tycho_util::time::now_sec();
let now_utime = now_sec();
let last_utime = prev_key_block.handle().meta().gen_utime();

tracing::debug!(
Expand Down Expand Up @@ -314,6 +315,145 @@ fn choose_key_block(node: &Node) -> anyhow::Result<BlockHandle> {
Err(BootError::PersistentShardStateNotFound.into())
}

async fn download_start_blocks_and_states(
node: &Arc<Node>,
mc_block_id: &BlockId,
) -> anyhow::Result<()> {
// Download and save masterchain block and state
let (_, init_mc_block) = download_block_with_state(node, *mc_block_id, *mc_block_id).await?;

tracing::info!(
block_id = %init_mc_block.id(),
"downloaded init mc block state"
);

// Download and save blocks and states from other shards
for (_, block_id) in init_mc_block.shard_blocks()? {
download_block_with_state(node, *mc_block_id, block_id).await?;
}

Ok(())
}

async fn download_block_with_state(
node: &Arc<Node>,
mc_block_id: BlockId,
block_id: BlockId,
) -> anyhow::Result<(BlockHandle, BlockStuff)> {
let block_storage = node.storage.block_storage();
let block_handle_storage = node.storage.block_handle_storage();

let mc_seqno = mc_block_id.seqno;

let handle = block_handle_storage
.load_handle(&block_id)
.filter(|handle| handle.meta().has_data());

// Download block data and proof
let (block, handle) = match handle {
Some(handle) => (block_storage.load_block_data(&handle).await?, handle),
None => {
let blockchain_rpc_client = &node.blockchain_rpc_client;

// TODO: add retry count to interrupt infinite loop
let (block, proof, meta_data) = loop {
let res = blockchain_rpc_client.get_block_full(&block_id).await;

match res {
Ok(res) => {
let (handle, block_full) = res.split();

match block_full {
BlockFull::Found {
block_id,
block: block_data,
proof: proof_data,
is_link,
} => {
let block = match BlockStuff::deserialize_checked(
block_id,
&block_data,
) {
Ok(block) => WithArchiveData::new(block, block_data),
Err(e) => {
tracing::error!(%block_id, "failed to deserialize block: {e}");
handle.reject();
continue;
}
};

let proof = match BlockProofStuff::deserialize(
block_id,
&proof_data,
is_link,
) {
Ok(proof) => WithArchiveData::new(proof, proof_data),
Err(e) => {
tracing::error!(%block_id, "failed to deserialize block proof: {e}");
handle.reject();
continue;
}
};

match proof.pre_check_block_proof() {
Ok((_, block_info)) => {
let meta_data = BriefBlockInfo::from(&block_info)
.with_mc_seq_no(mc_seqno);

break (block, proof, meta_data);
}
Err(e) => {
tracing::error!("received invalid block: {e:?}");
}
}
}
BlockFull::Empty => {
tracing::warn!(%block_id, "block not found");
handle.reject();
}
}
}
Err(e) => {
tracing::warn!(%block_id, "failed to download block: {e:?}");
}
}
};

tracing::info!(%block_id, "downloaded block data");

let mut handle = block_storage
.store_block_data(&block, &block.archive_data, meta_data)
.await?
.handle;

if !handle.meta().has_proof() {
handle = block_storage
.store_block_proof(&proof, handle.into())
.await?
.handle;
}

(block.data, handle)
}
};

// Download block state
if !handle.meta().has_state() {
let state_update = block.block().load_state_update()?;

tracing::info!(block_id = %handle.id(), "downloading state");
let (_, shard_state) = node.load_or_download_state(&block_id).await?;
tracing::info!(block_id = %handle.id(), "downloaded state");

let state_hash = *shard_state.root_cell().repr_hash();
if state_update.new_hash != state_hash {
return Err(BootError::ShardStateHashMismatch.into());
}
}

Ok((handle, block))
}

#[derive(Clone)]
enum PrevKeyBlock {
ZeroState {
Expand Down Expand Up @@ -372,4 +512,6 @@ enum BootError {
FailedToLoadKeyBlock,
#[error("Persistent shard state not found")]
PersistentShardStateNotFound,
#[error("Downloaded shard state hash mismatch")]
ShardStateHashMismatch,
}
10 changes: 6 additions & 4 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,10 @@ impl Node {
async fn try_init(self: &Arc<Self>, zerostates: Option<Vec<PathBuf>>) -> Result<BlockId> {
let node_state = self.storage.node_state();

match node_state.load_last_mc_block_id() {
let last_key_block_id = match node_state.load_last_mc_block_id() {
Some(block_id) => {
tracing::info!("warm init");
Ok(block_id)
block_id
}
None => {
tracing::info!("cold init");
Expand All @@ -421,9 +421,11 @@ impl Node {
// node_state.store_init_mc_block_id(&zerostate_id);
// node_state.store_last_mc_block_id(&zerostate_id);

todo!()
last_mc_block_id
}
}
};

Ok(last_key_block_id)
}

async fn import_zerostates(
Expand Down

0 comments on commit be6a868

Please sign in to comment.