Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Donwload and store zerostates #101

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 115 additions & 68 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ use tycho_core::block_strider::{
StateSubscriberExt, StorageBlockProvider,
};
use tycho_core::blockchain_rpc::{
BlockchainRpcClient, BlockchainRpcService, BlockchainRpcServiceConfig, BroadcastListener,
SelfBroadcastListener,
BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
};
use tycho_core::global_config::{GlobalConfig, ZerostateId};
use tycho_core::overlay_client::{PublicOverlayClient, PublicOverlayClientConfig};
use tycho_core::overlay_client::PublicOverlayClient;
use tycho_network::{
DhtClient, DhtService, InboundRequestMeta, Network, OverlayService, PeerId, PeerResolver,
PublicOverlay, Router,
Expand Down Expand Up @@ -129,15 +128,24 @@ impl CmdRun {
let global_config = GlobalConfig::from_file(self.global_config.unwrap())
.wrap_err("failed to load global config")?;

let keys = config::NodeKeys::from_file(&self.keys.unwrap())
.wrap_err("failed to load node keys")?;
let keys =
NodeKeys::from_file(self.keys.unwrap()).wrap_err("failed to load node keys")?;

let public_ip = resolve_public_ip(node_config.public_ip).await?;
let socket_addr = SocketAddr::new(public_ip.into(), node_config.port);
let socket_addr = SocketAddr::new(public_ip, node_config.port);

Node::new(socket_addr, keys, node_config, global_config)?
};

// Ensure that there are some neighbours
tracing::info!("waiting for initial neighbours");
node.blockchain_rpc_client
.overlay_client()
.neighbours()
.wait_for_peers(1)
.await;
tracing::info!("found initial neighbours");

let init_block_id = node
.try_init(self.import_zerostate)
.await
Expand Down Expand Up @@ -222,10 +230,9 @@ fn init_logger(logger_config: Option<PathBuf>) -> Result<()> {
std::panic::set_hook(Box::new(|info| {
use std::io::Write;

tracing::error!("PANIC: {}", info);
std::io::stderr().flush().ok();
std::io::stdout().flush().ok();
std::process::exit(1);
panic!("PANIC: {}", info);
}));

Ok(())
Expand Down Expand Up @@ -255,22 +262,21 @@ async fn resolve_public_ip(ip: Option<IpAddr>) -> Result<IpAddr> {
}

pub struct Node {
pub keypair: Arc<ed25519::KeyPair>,
keypair: Arc<ed25519::KeyPair>,

pub zerostate: ZerostateId,
zerostate: ZerostateId,

pub network: Network,
pub dht_client: DhtClient,
pub peer_resolver: PeerResolver,
pub overlay_service: OverlayService,
pub storage: Storage,
dht_client: DhtClient,
peer_resolver: PeerResolver,
overlay_service: OverlayService,
storage: Storage,
rpc_mempool_adapter: RpcMempoolAdapter,
blockchain_rpc_client: BlockchainRpcClient,

pub state_tracker: MinRefMcStateTracker,
state_tracker: MinRefMcStateTracker,

pub rpc_config: Option<RpcConfig>,
pub public_overlay_client_config: PublicOverlayClientConfig,
pub blockchain_rpc_service_config: BlockchainRpcServiceConfig,
pub blockchain_block_provider_config: BlockchainBlockProviderConfig,
rpc_config: Option<RpcConfig>,
blockchain_block_provider_config: BlockchainBlockProviderConfig,
}

impl Node {
Expand Down Expand Up @@ -342,21 +348,52 @@ impl Node {
"initialized storage"
);

// Setup blockchain rpc
let zerostate = global_config.zerostate;

let rpc_mempool_adapter = RpcMempoolAdapter {
inner: MempoolAdapterStdImpl::new(),
};

let blockchain_rpc_service = BlockchainRpcService::builder()
.with_config(node_config.blockchain_rpc_service)
.with_storage(storage.clone())
.with_broadcast_listener(rpc_mempool_adapter.clone())
.build();

let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
.with_peer_resolver(peer_resolver.clone())
.build(blockchain_rpc_service);
overlay_service.add_public_overlay(&public_overlay);

let blockchain_rpc_client = BlockchainRpcClient::builder()
.with_public_overlay_client(PublicOverlayClient::new(
network,
public_overlay,
node_config.public_overlay_client,
))
.with_self_broadcast_listener(rpc_mempool_adapter.clone())
.build();

tracing::info!(
overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
"initialized blockchain rpc"
);

// Setup block strider
let state_tracker = MinRefMcStateTracker::default();

Ok(Self {
keypair,
zerostate: global_config.zerostate,
network,
zerostate,
dht_client,
peer_resolver,
overlay_service,
storage,
rpc_mempool_adapter,
blockchain_rpc_client,
state_tracker,
rpc_config: node_config.rpc,
public_overlay_client_config: node_config.public_overlay_client,
blockchain_rpc_service_config: node_config.blockchain_rpc_service,
blockchain_block_provider_config: node_config.blockchain_block_provider,
})
}
Expand All @@ -379,9 +416,9 @@ impl Node {
node_state.store_last_mc_block_id(&zerostate_id);
zerostate_id
} else {
// TODO: Download zerostates
anyhow::bail!("zerostates not provided (STUB)");
self.download_zerostates().await?
};

Ok(zerostate_id)
}
}
Expand Down Expand Up @@ -457,6 +494,7 @@ impl Node {
// Import all zerostates
let handle_storage = self.storage.block_handle_storage();
let state_storage = self.storage.shard_state_storage();
let persistent_state_storage = self.storage.persistent_state_storage();

for state in to_import {
let (handle, status) =
Expand All @@ -479,12 +517,57 @@ impl Node {
stored,
"importing zerostate"
);

persistent_state_storage
.store_state(
state.state().seqno,
state.block_id(),
state.root_cell().repr_hash(),
)
.await?;
}

tracing::info!("imported zerostates");
Ok(zerostate_id)
}

async fn download_zerostates(&self) -> Result<BlockId> {
let zerostate_id = self.zerostate.as_block_id();

let state = self
.blockchain_rpc_client
.download_and_store_state(&zerostate_id, self.storage.clone())
.await?;

let persistent_state_storage = self.storage.persistent_state_storage();
persistent_state_storage
.store_state(
state.state().seqno,
state.block_id(),
state.root_cell().repr_hash(),
)
.await?;

for item in state.shards()?.latest_blocks() {
let block_id = item?;

let state = self
.blockchain_rpc_client
.download_and_store_state(&block_id, self.storage.clone())
.await?;

persistent_state_storage
.store_state(
state.state().seqno,
state.block_id(),
state.root_cell().repr_hash(),
)
.await?;
}

Ok(zerostate_id)
}

async fn run(&self, last_block_id: &BlockId) -> Result<()> {
// Force load last applied state
let mc_state = self
Expand All @@ -493,48 +576,21 @@ impl Node {
.load_state(last_block_id)
.await?;

let mempool_adapter = MempoolAdapterStdImpl::new(
// Run mempool adapter
let mempool_adapter = self.rpc_mempool_adapter.inner.clone();
mempool_adapter.run(
self.keypair.clone(),
self.dht_client.clone(),
self.overlay_service.clone(),
get_validator_peer_ids(&mc_state)?,
);
let rpc_mempool_adapter = RpcMempoolAdapter {
inner: mempool_adapter.clone(),
};

// Setup blockchain rpc
let blockchain_rpc_service = BlockchainRpcService::builder()
.with_config(self.blockchain_rpc_service_config.clone())
.with_storage(self.storage.clone())
.with_broadcast_listener(rpc_mempool_adapter.clone())
.build();

let public_overlay = PublicOverlay::builder(self.zerostate.compute_public_overlay_id())
.with_peer_resolver(self.peer_resolver.clone())
.build(blockchain_rpc_service);
self.overlay_service.add_public_overlay(&public_overlay);

let blockchain_rpc_client = BlockchainRpcClient::builder()
.with_public_overlay_client(PublicOverlayClient::new(
self.network.clone(),
public_overlay,
self.public_overlay_client_config.clone(),
))
.with_self_broadcast_listener(rpc_mempool_adapter)
.build();

tracing::info!(
overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
"initialized blockchain rpc"
);

// Create RPC
let rpc_state = if let Some(config) = &self.rpc_config {
let rpc_state = RpcState::builder()
.with_config(config.clone())
.with_storage(self.storage.clone())
.with_blockchain_rpc_client(blockchain_rpc_client.clone())
.with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
.build();

rpc_state.init(last_block_id).await?;
Expand All @@ -557,15 +613,6 @@ impl Node {
None
};

// Ensure that there are some neighbours
tracing::info!("waiting for initial neighbours");
blockchain_rpc_client
.overlay_client()
.neighbours()
.wait_for_peers(1)
.await;
tracing::info!("found initial neighbours");

// Create collator
tracing::info!("starting collator");

Expand Down Expand Up @@ -643,7 +690,7 @@ impl Node {

// Create block strider
let blockchain_block_provider = BlockchainBlockProvider::new(
blockchain_rpc_client,
self.blockchain_rpc_client.clone(),
self.storage.clone(),
self.blockchain_block_provider_config.clone(),
);
Expand Down Expand Up @@ -791,7 +838,7 @@ fn make_shard_state(
file_hash,
};

ShardStateStuff::from_root(&block_id, root, &tracker)
ShardStateStuff::from_root(&block_id, root, tracker)
}

fn supported_capabilities() -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion cli/src/tools/gen_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct MultisigCmd {
#[clap(short, long)]
lifetime: Option<u32>,

/// Use SetcodeMultisig instead of SafeMultisig
/// Use `SetcodeMultisig` instead of `SafeMultisig`
#[clap(short, long)]
updatable: bool,
}
Expand Down
6 changes: 3 additions & 3 deletions cli/src/tools/gen_zerostate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl ZerostateConfig {
);
self.accounts.insert(
minter_address,
build_minter_account(&public_key, &minter_address)?.into(),
build_minter_account(public_key, &minter_address)?.into(),
);
}
(None, Some(_)) => anyhow::bail!("minter_public_key is required"),
Expand Down Expand Up @@ -652,7 +652,7 @@ fn update_config_account(accounts: &mut ShardAccounts, config: &BlockchainConfig
anyhow::bail!("cannot set empty config account");
};

let Some((depth_balance, mut shard_account)) = accounts.get(&config.address)? else {
let Some((depth_balance, mut shard_account)) = accounts.get(config.address)? else {
anyhow::bail!("config account not found");
};

Expand Down Expand Up @@ -682,7 +682,7 @@ fn update_config_account(accounts: &mut ShardAccounts, config: &BlockchainConfig
shard_account.account = Lazy::new(&OptionalAccount(Some(account)))?;

// Update the account entry in the dict
accounts.set(&config.address, depth_balance, shard_account)?;
accounts.set(config.address, depth_balance, shard_account)?;

// Done
Ok(())
Expand Down
Loading
Loading