Skip to content

Commit

Permalink
feat(cli): init collator
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed May 3, 2024
1 parent b05ff37 commit f81f0ef
Show file tree
Hide file tree
Showing 11 changed files with 225 additions and 20 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ base64 = { workspace = true }
clap = { workspace = true }
everscale-crypto = { workspace = true }
everscale-types = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
public-ip = { workspace = true }
rand = { workspace = true }
Expand Down
146 changes: 140 additions & 6 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,20 @@ use clap::Parser;
use everscale_crypto::ed25519;
use everscale_types::models::*;
use everscale_types::prelude::*;
use futures_util::future::BoxFuture;
use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
use tycho_collator::collator::CollatorStdImplFactory;
use tycho_collator::manager::CollationManager;
use tycho_collator::mempool::MempoolAdapterStdImpl;
use tycho_collator::msg_queue::MessageQueueAdapterStdImpl;
use tycho_collator::state_node::{StateNodeAdapter, StateNodeAdapterStdImpl};
use tycho_collator::types::{CollationConfig, ValidatorNetwork};
use tycho_collator::validator::config::ValidatorConfig;
use tycho_collator::validator::validator::ValidatorStdImplFactory;
use tycho_core::block_strider::{
BlockStrider, BlockchainBlockProvider, BlockchainBlockProviderConfig, NoopSubscriber,
PersistentBlockStriderState, StorageBlockProvider,
BlockProvider, BlockStrider, BlockchainBlockProvider, BlockchainBlockProviderConfig,
OptionalBlockStuff, PersistentBlockStriderState, StateSubscriber, StateSubscriberContext,
StorageBlockProvider,
};
use tycho_core::blockchain_rpc::{BlockchainRpcClient, BlockchainRpcService};
use tycho_core::global_config::{GlobalConfig, ZerostateId};
Expand Down Expand Up @@ -200,6 +210,8 @@ async fn resolve_public_ip(ip: Option<Ipv4Addr>) -> Result<Ipv4Addr> {
}

pub struct Node {
pub keypair: Arc<ed25519::KeyPair>,

pub zerostate: ZerostateId,

pub network: Network,
Expand Down Expand Up @@ -303,6 +315,7 @@ impl Node {
let state_tracker = MinRefMcStateTracker::default();

Ok(Self {
keypair,
zerostate: global_config.zerostate,
network,
dht_client,
Expand Down Expand Up @@ -335,6 +348,10 @@ impl Node {
};

node_state.store_init_mc_block_id(&zerostate_id);

// TODO: Only store this if it is a zerostate
node_state.store_last_mc_block_id(&zerostate_id);

Ok(zerostate_id)
}
}
Expand Down Expand Up @@ -441,6 +458,7 @@ impl Node {
}

async fn run(&self, _init_block_id: &BlockId) -> Result<()> {
// Ensure that there are some neighbours
tracing::info!("waiting for initial neighbours");
self.blockchain_rpc_client
.overlay_client()
Expand All @@ -449,6 +467,61 @@ impl Node {
.await;
tracing::info!("found initial neighbours");

// Create collator
tracing::info!("starting collator");

// TODO: move into config
let collation_config = CollationConfig {
key_pair: self.keypair.clone(),
mc_block_min_interval_ms: 10000,
max_mc_block_delta_from_bc_to_await_own: 2,
supported_block_version: 50,
supported_capabilities: supported_capabilities(),
max_collate_threads: 1,
// test_validators_keypairs: vec![],
};

let collation_manager = CollationManager::start(
collation_config,
Arc::new(MessageQueueAdapterStdImpl::new()),
|listener| StateNodeAdapterStdImpl::new(listener, self.storage.clone()),
MempoolAdapterStdImpl::new,
ValidatorStdImplFactory {
network: ValidatorNetwork {
overlay_service: self.overlay_service.clone(),
peer_resolver: self.peer_resolver.clone(),
dht_client: self.dht_client.clone(),
},
// TODO: Move into node config
config: ValidatorConfig {
base_loop_delay: Duration::from_millis(50),
max_loop_delay: Duration::from_secs(10),
},
},
CollatorStdImplFactory,
);

let collator_state_subscriber = CollatorStateSubscriber {
adapter: collation_manager.state_node_adapter().clone(),
};

// TEMP
{
let masterchain_zerostate = self
.storage
.shard_state_storage()
.load_state(&self.zerostate.as_block_id())
.await?;

collator_state_subscriber
.adapter
.handle_state(&masterchain_zerostate)
.await?;
}

tracing::info!("collator started");

// Create block strider
let blockchain_block_provider = BlockchainBlockProvider::new(
self.blockchain_rpc_client.clone(),
self.storage.clone(),
Expand All @@ -457,28 +530,64 @@ impl Node {

let storage_block_provider = StorageBlockProvider::new(self.storage.clone());

let collator_block_provider = CollatorBlockProvider {
adapter: collation_manager.state_node_adapter().clone(),
};

let strider_state =
PersistentBlockStriderState::new(self.zerostate.as_block_id(), self.storage.clone());

let block_strider = BlockStrider::builder()
.with_provider((blockchain_block_provider, storage_block_provider))
.with_provider((
(blockchain_block_provider, storage_block_provider),
collator_block_provider,
))
.with_state(strider_state)
.with_state_subscriber(
self.state_tracker.clone(),
self.storage.clone(),
NoopSubscriber,
collator_state_subscriber,
)
.build();

// Run block strider
tracing::info!("block strider started");

block_strider.run().await?;

tracing::info!("block strider finished");

Ok(())
}
}

struct CollatorStateSubscriber {
adapter: Arc<dyn StateNodeAdapter>,
}

impl StateSubscriber for CollatorStateSubscriber {
type HandleStateFut<'a> = BoxFuture<'a, Result<()>>;

fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
self.adapter.handle_state(&cx.state)
}
}

struct CollatorBlockProvider {
adapter: Arc<dyn StateNodeAdapter>,
}

impl BlockProvider for CollatorBlockProvider {
type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
self.adapter.wait_for_block(prev_block_id)
}

fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
self.adapter.wait_for_block(block_id)
}
}

fn load_zerostate(tracker: &MinRefMcStateTracker, path: &PathBuf) -> Result<ShardStateStuff> {
let data = std::fs::read(path).wrap_err("failed to read file")?;
let file_hash = Boc::file_hash(&data);
Expand Down Expand Up @@ -529,3 +638,28 @@ fn make_shard_state(

ShardStateStuff::new(block_id, root, &tracker)
}

fn supported_capabilities() -> u64 {
GlobalCapabilities::from([
GlobalCapability::CapCreateStatsEnabled,
GlobalCapability::CapBounceMsgBody,
GlobalCapability::CapReportVersion,
GlobalCapability::CapShortDequeue,
GlobalCapability::CapInitCodeHash,
GlobalCapability::CapOffHypercube,
GlobalCapability::CapFixTupleIndexBug,
GlobalCapability::CapFastStorageStat,
GlobalCapability::CapMyCode,
GlobalCapability::CapFullBodyInBounced,
GlobalCapability::CapStorageFeeToTvm,
GlobalCapability::CapWorkchains,
GlobalCapability::CapStcontNewFormat,
GlobalCapability::CapFastStorageStatBugfix,
GlobalCapability::CapResolveMerkleCell,
GlobalCapability::CapFeeInGasUnits,
GlobalCapability::CapBounceAfterFailedAction,
GlobalCapability::CapSuspendedList,
GlobalCapability::CapsTvmBugfixes2022,
])
.into_inner()
}
33 changes: 32 additions & 1 deletion cli/src/tools/gen_zerostate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,39 @@ impl ZerostateConfig {
}
}

let workchains = self.params.get::<ConfigParam12>()?.unwrap();
let mut shards = Vec::new();
for entry in workchains.iter() {
let (workchain, descr) = entry?;
shards.push((
ShardIdent::new_full(workchain),
ShardDescription {
seqno: 0,
reg_mc_seqno: 0,
start_lt: 0,
end_lt: 0,
root_hash: descr.zerostate_root_hash,
file_hash: descr.zerostate_file_hash,
before_split: false,
before_merge: false,
want_split: false,
want_merge: false,
nx_cc_updated: true,
next_catchain_seqno: 0,
next_validator_shard: ShardIdent::PREFIX_FULL,
min_ref_mc_seqno: u32::MAX,
gen_utime: now,
split_merge_at: None,
fees_collected: CurrencyCollection::ZERO,
funds_created: CurrencyCollection::ZERO,
copyleft_rewards: Dict::new(),
proof_chain: None,
},
));
}

state.custom = Some(Lazy::new(&McStateExtra {
shards: Default::default(),
shards: ShardHashes::from_shards(shards.iter().map(|(ident, descr)| (ident, descr)))?,
config: BlockchainConfig {
address: self.params.get::<ConfigParam0>()?.unwrap(),
params: self.params.clone(),
Expand Down
7 changes: 6 additions & 1 deletion collator/src/collator/build_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,12 @@ impl CollatorStdImpl {

new_state
.total_validator_fees
.checked_sub(&value_flow.recovered)?;
.try_add_assign(&value_flow.fees_collected)?;

// TODO:
// new_state
// .total_validator_fees
// .try_sub_assign(&value_flow.recovered)?;

if self.shard_id.is_masterchain() {
new_state.libraries =
Expand Down
3 changes: 3 additions & 0 deletions collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,8 @@ where
new_session_seqno,
))?;

tracing::warn!("SUBSET: {subset:?}");

//TEST: override with test subset with test keypairs defined on test run
#[cfg(feature = "test")]
let subset = if self.config.test_validators_keypairs.is_empty() {
Expand Down Expand Up @@ -697,6 +699,7 @@ where
let local_pubkey_opt = find_us_in_collators_set(&self.config, &subset);

let new_session_info = Arc::new(CollationSessionInfo::new(
shard_id.workchain(),
new_session_seqno,
ValidatorSubsetInfo {
validators: subset,
Expand Down
6 changes: 6 additions & 0 deletions collator/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,22 +178,28 @@ pub(crate) type CollationSessionId = (ShardIdent, u32);
#[derive(Clone)]
pub struct CollationSessionInfo {
/// Sequence number of the collation session
workchain: i32,
seqno: u32,
collators: ValidatorSubsetInfo,
current_collator_keypair: Option<Arc<KeyPair>>,
}
impl CollationSessionInfo {
pub fn new(
workchain: i32,
seqno: u32,
collators: ValidatorSubsetInfo,
current_collator_keypair: Option<Arc<KeyPair>>,
) -> Self {
Self {
workchain,
seqno,
collators,
current_collator_keypair,
}
}
pub fn workchain(&self) -> i32 {
self.workchain
}
pub fn seqno(&self) -> u32 {
self.seqno
}
Expand Down
4 changes: 3 additions & 1 deletion collator/src/validator/network/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ impl Service<ServiceRequest> for NetworkService {
signatures,
} = query;
{
let session = state.get_session(session_seqno).await;
let session = state
.get_session(block_id_short.shard.workchain(), session_seqno)
.await;
match handle_signatures_query(
session,
session_seqno,
Expand Down
Loading

0 comments on commit f81f0ef

Please sign in to comment.