Skip to content

Commit

Permalink
feat(cli): download zerostates when cold init; fix clippy warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
pashinov committed Jun 11, 2024
1 parent a012a6f commit 2cecf8b
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 94 deletions.
183 changes: 115 additions & 68 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ use tycho_core::block_strider::{
StateSubscriberExt, StorageBlockProvider,
};
use tycho_core::blockchain_rpc::{
BlockchainRpcClient, BlockchainRpcService, BlockchainRpcServiceConfig, BroadcastListener,
SelfBroadcastListener,
BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
};
use tycho_core::global_config::{GlobalConfig, ZerostateId};
use tycho_core::overlay_client::{PublicOverlayClient, PublicOverlayClientConfig};
use tycho_core::overlay_client::PublicOverlayClient;
use tycho_network::{
DhtClient, DhtService, InboundRequestMeta, Network, OverlayService, PeerId, PeerResolver,
PublicOverlay, Router,
Expand Down Expand Up @@ -129,15 +128,24 @@ impl CmdRun {
let global_config = GlobalConfig::from_file(self.global_config.unwrap())
.wrap_err("failed to load global config")?;

let keys = config::NodeKeys::from_file(&self.keys.unwrap())
.wrap_err("failed to load node keys")?;
let keys =
NodeKeys::from_file(self.keys.unwrap()).wrap_err("failed to load node keys")?;

let public_ip = resolve_public_ip(node_config.public_ip).await?;
let socket_addr = SocketAddr::new(public_ip.into(), node_config.port);
let socket_addr = SocketAddr::new(public_ip, node_config.port);

Node::new(socket_addr, keys, node_config, global_config)?
};

// Ensure that there are some neighbours
tracing::info!("waiting for initial neighbours");
node.blockchain_rpc_client
.overlay_client()
.neighbours()
.wait_for_peers(1)
.await;
tracing::info!("found initial neighbours");

let init_block_id = node
.try_init(self.import_zerostate)
.await
Expand Down Expand Up @@ -222,10 +230,9 @@ fn init_logger(logger_config: Option<PathBuf>) -> Result<()> {
std::panic::set_hook(Box::new(|info| {
use std::io::Write;

tracing::error!("PANIC: {}", info);
std::io::stderr().flush().ok();
std::io::stdout().flush().ok();
std::process::exit(1);
panic!("PANIC: {}", info);
}));

Ok(())
Expand Down Expand Up @@ -255,22 +262,21 @@ async fn resolve_public_ip(ip: Option<IpAddr>) -> Result<IpAddr> {
}

pub struct Node {
pub keypair: Arc<ed25519::KeyPair>,
keypair: Arc<ed25519::KeyPair>,

pub zerostate: ZerostateId,
zerostate: ZerostateId,

pub network: Network,
pub dht_client: DhtClient,
pub peer_resolver: PeerResolver,
pub overlay_service: OverlayService,
pub storage: Storage,
dht_client: DhtClient,
peer_resolver: PeerResolver,
overlay_service: OverlayService,
storage: Storage,
rpc_mempool_adapter: RpcMempoolAdapter,
blockchain_rpc_client: BlockchainRpcClient,

pub state_tracker: MinRefMcStateTracker,
state_tracker: MinRefMcStateTracker,

pub rpc_config: Option<RpcConfig>,
pub public_overlay_client_config: PublicOverlayClientConfig,
pub blockchain_rpc_service_config: BlockchainRpcServiceConfig,
pub blockchain_block_provider_config: BlockchainBlockProviderConfig,
rpc_config: Option<RpcConfig>,
blockchain_block_provider_config: BlockchainBlockProviderConfig,
}

impl Node {
Expand Down Expand Up @@ -342,21 +348,52 @@ impl Node {
"initialized storage"
);

// Setup blockchain rpc
let zerostate = global_config.zerostate;

let rpc_mempool_adapter = RpcMempoolAdapter {
inner: MempoolAdapterStdImpl::new(),
};

let blockchain_rpc_service = BlockchainRpcService::builder()
.with_config(node_config.blockchain_rpc_service)
.with_storage(storage.clone())
.with_broadcast_listener(rpc_mempool_adapter.clone())
.build();

let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
.with_peer_resolver(peer_resolver.clone())
.build(blockchain_rpc_service);
overlay_service.add_public_overlay(&public_overlay);

let blockchain_rpc_client = BlockchainRpcClient::builder()
.with_public_overlay_client(PublicOverlayClient::new(
network,
public_overlay,
node_config.public_overlay_client,
))
.with_self_broadcast_listener(rpc_mempool_adapter.clone())
.build();

tracing::info!(
overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
"initialized blockchain rpc"
);

// Setup block strider
let state_tracker = MinRefMcStateTracker::default();

Ok(Self {
keypair,
zerostate: global_config.zerostate,
network,
zerostate,
dht_client,
peer_resolver,
overlay_service,
storage,
rpc_mempool_adapter,
blockchain_rpc_client,
state_tracker,
rpc_config: node_config.rpc,
public_overlay_client_config: node_config.public_overlay_client,
blockchain_rpc_service_config: node_config.blockchain_rpc_service,
blockchain_block_provider_config: node_config.blockchain_block_provider,
})
}
Expand All @@ -379,9 +416,9 @@ impl Node {
node_state.store_last_mc_block_id(&zerostate_id);
zerostate_id
} else {
// TODO: Download zerostates
anyhow::bail!("zerostates not provided (STUB)");
self.download_zerostates().await?
};

Ok(zerostate_id)
}
}
Expand Down Expand Up @@ -457,6 +494,7 @@ impl Node {
// Import all zerostates
let handle_storage = self.storage.block_handle_storage();
let state_storage = self.storage.shard_state_storage();
let persistent_state_storage = self.storage.persistent_state_storage();

for state in to_import {
let (handle, status) =
Expand All @@ -479,12 +517,57 @@ impl Node {
stored,
"importing zerostate"
);

persistent_state_storage
.store_state(
state.state().seqno,
state.block_id(),
state.root_cell().repr_hash(),
)
.await?;
}

tracing::info!("imported zerostates");
Ok(zerostate_id)
}

async fn download_zerostates(&self) -> Result<BlockId> {
let zerostate_id = self.zerostate.as_block_id();

let state = self
.blockchain_rpc_client
.download_and_store_state(&zerostate_id, self.storage.clone())
.await?;

let persistent_state_storage = self.storage.persistent_state_storage();
persistent_state_storage
.store_state(
state.state().seqno,
state.block_id(),
state.root_cell().repr_hash(),
)
.await?;

for item in state.shards()?.latest_blocks() {
let block_id = item?;

let state = self
.blockchain_rpc_client
.download_and_store_state(&block_id, self.storage.clone())
.await?;

persistent_state_storage
.store_state(
state.state().seqno,
state.block_id(),
state.root_cell().repr_hash(),
)
.await?;
}

Ok(zerostate_id)
}

async fn run(&self, last_block_id: &BlockId) -> Result<()> {
// Force load last applied state
let mc_state = self
Expand All @@ -493,48 +576,21 @@ impl Node {
.load_state(last_block_id)
.await?;

let mempool_adapter = MempoolAdapterStdImpl::new(
// Run mempool adapter
let mempool_adapter = self.rpc_mempool_adapter.inner.clone();
mempool_adapter.run(
self.keypair.clone(),
self.dht_client.clone(),
self.overlay_service.clone(),
get_validator_peer_ids(&mc_state)?,
);
let rpc_mempool_adapter = RpcMempoolAdapter {
inner: mempool_adapter.clone(),
};

// Setup blockchain rpc
let blockchain_rpc_service = BlockchainRpcService::builder()
.with_config(self.blockchain_rpc_service_config.clone())
.with_storage(self.storage.clone())
.with_broadcast_listener(rpc_mempool_adapter.clone())
.build();

let public_overlay = PublicOverlay::builder(self.zerostate.compute_public_overlay_id())
.with_peer_resolver(self.peer_resolver.clone())
.build(blockchain_rpc_service);
self.overlay_service.add_public_overlay(&public_overlay);

let blockchain_rpc_client = BlockchainRpcClient::builder()
.with_public_overlay_client(PublicOverlayClient::new(
self.network.clone(),
public_overlay,
self.public_overlay_client_config.clone(),
))
.with_self_broadcast_listener(rpc_mempool_adapter)
.build();

tracing::info!(
overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
"initialized blockchain rpc"
);

// Create RPC
let rpc_state = if let Some(config) = &self.rpc_config {
let rpc_state = RpcState::builder()
.with_config(config.clone())
.with_storage(self.storage.clone())
.with_blockchain_rpc_client(blockchain_rpc_client.clone())
.with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
.build();

rpc_state.init(last_block_id).await?;
Expand All @@ -557,15 +613,6 @@ impl Node {
None
};

// Ensure that there are some neighbours
tracing::info!("waiting for initial neighbours");
blockchain_rpc_client
.overlay_client()
.neighbours()
.wait_for_peers(1)
.await;
tracing::info!("found initial neighbours");

// Create collator
tracing::info!("starting collator");

Expand Down Expand Up @@ -643,7 +690,7 @@ impl Node {

// Create block strider
let blockchain_block_provider = BlockchainBlockProvider::new(
blockchain_rpc_client,
self.blockchain_rpc_client.clone(),
self.storage.clone(),
self.blockchain_block_provider_config.clone(),
);
Expand Down Expand Up @@ -791,7 +838,7 @@ fn make_shard_state(
file_hash,
};

ShardStateStuff::from_root(&block_id, root, &tracker)
ShardStateStuff::from_root(&block_id, root, tracker)
}

fn supported_capabilities() -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion cli/src/tools/gen_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct MultisigCmd {
#[clap(short, long)]
lifetime: Option<u32>,

/// Use SetcodeMultisig instead of SafeMultisig
/// Use `SetcodeMultisig` instead of `SafeMultisig`
#[clap(short, long)]
updatable: bool,
}
Expand Down
6 changes: 3 additions & 3 deletions cli/src/tools/gen_zerostate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl ZerostateConfig {
);
self.accounts.insert(
minter_address,
build_minter_account(&public_key, &minter_address)?.into(),
build_minter_account(public_key, &minter_address)?.into(),
);
}
(None, Some(_)) => anyhow::bail!("minter_public_key is required"),
Expand Down Expand Up @@ -652,7 +652,7 @@ fn update_config_account(accounts: &mut ShardAccounts, config: &BlockchainConfig
anyhow::bail!("cannot set empty config account");
};

let Some((depth_balance, mut shard_account)) = accounts.get(&config.address)? else {
let Some((depth_balance, mut shard_account)) = accounts.get(config.address)? else {
anyhow::bail!("config account not found");
};

Expand Down Expand Up @@ -682,7 +682,7 @@ fn update_config_account(accounts: &mut ShardAccounts, config: &BlockchainConfig
shard_account.account = Lazy::new(&OptionalAccount(Some(account)))?;

// Update the account entry in the dict
accounts.set(&config.address, depth_balance, shard_account)?;
accounts.set(config.address, depth_balance, shard_account)?;

// Done
Ok(())
Expand Down
Loading

0 comments on commit 2cecf8b

Please sign in to comment.