diff --git a/cli/src/main.rs b/cli/src/main.rs index b60082dc6..d18b11d1e 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -24,12 +24,6 @@ fn main() -> ExitCode { std::env::set_var("RUST_BACKTRACE", "1"); } - rayon::ThreadPoolBuilder::new() - .stack_size(8 * 1024 * 1024) - .thread_name(|_| "rayon_worker".to_string()) - .build_global() - .unwrap(); - match App::parse().run() { Ok(()) => ExitCode::SUCCESS, Err(err) => { diff --git a/cli/src/node/config.rs b/cli/src/node/config.rs index cb7abd60a..6b5fce27e 100644 --- a/cli/src/node/config.rs +++ b/cli/src/node/config.rs @@ -65,6 +65,8 @@ pub struct NodeConfig { pub rpc: Option, pub metrics: Option, + + pub threads: ThreadPoolConfig, } impl Default for NodeConfig { @@ -84,6 +86,7 @@ impl Default for NodeConfig { collator: CollationConfig::default(), rpc: Some(RpcConfig::default()), metrics: Some(MetricsConfig::default()), + threads: ThreadPoolConfig::default(), } } } @@ -116,3 +119,22 @@ impl Default for MetricsConfig { } } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct ThreadPoolConfig { + pub rayon_threads: usize, + pub tokio_workers: usize, +} + +impl Default for ThreadPoolConfig { + fn default() -> Self { + let total_threads = std::thread::available_parallelism() + .expect("failed to get total threads") + .get(); + Self { + rayon_threads: total_threads, + tokio_workers: total_threads, + } + } +} diff --git a/cli/src/node/mod.rs b/cli/src/node/mod.rs index 74c4e0ebd..17be65627 100644 --- a/cli/src/node/mod.rs +++ b/cli/src/node/mod.rs @@ -92,11 +92,28 @@ pub struct CmdRun { impl CmdRun { pub fn run(self) -> Result<()> { + if let Some(init_config_path) = self.init_config { + return NodeConfig::default() + .save_to_file(init_config_path) + .wrap_err("failed to save node config"); + } + + let node_config = NodeConfig::from_file(self.config.as_ref().unwrap()) + .wrap_err("failed to load node config")?; + + rayon::ThreadPoolBuilder::new() + .stack_size(8 * 1024 * 1024) + .thread_name(|_| "rayon_worker".to_string()) + .num_threads(node_config.threads.rayon_threads) + .build_global() + .unwrap(); + tokio::runtime::Builder::new_multi_thread() .enable_all() + .worker_threads(node_config.threads.tokio_workers) .build()? .block_on(async move { - let run_fut = tokio::spawn(self.run_impl()); + let run_fut = tokio::spawn(self.run_impl(node_config)); let stop_fut = signal::any_signal(signal::TERMINATION_SIGNALS); tokio::select! { res = run_fut => res.unwrap(), @@ -111,19 +128,10 @@ impl CmdRun { }) } - async fn run_impl(self) -> Result<()> { - if let Some(init_config_path) = self.init_config { - return NodeConfig::default() - .save_to_file(init_config_path) - .wrap_err("failed to save node config"); - } - + async fn run_impl(self, node_config: NodeConfig) -> Result<()> { init_logger(self.logger_config)?; let node = { - let node_config = NodeConfig::from_file(self.config.unwrap()) - .wrap_err("failed to load node config")?; - if let Some(metrics_config) = &node_config.metrics { init_metrics(metrics_config)?; } @@ -131,11 +139,11 @@ impl CmdRun { let global_config = GlobalConfig::from_file(self.global_config.unwrap()) .wrap_err("failed to load global config")?; - let keys = config::NodeKeys::from_file(&self.keys.unwrap()) - .wrap_err("failed to load node keys")?; + let keys = + NodeKeys::from_file(self.keys.unwrap()).wrap_err("failed to load node keys")?; let public_ip = resolve_public_ip(node_config.public_ip).await?; - let socket_addr = SocketAddr::new(public_ip.into(), node_config.port); + let socket_addr = SocketAddr::new(public_ip, node_config.port); Node::new(socket_addr, keys, node_config, global_config)? }; @@ -790,5 +798,5 @@ fn make_shard_state( file_hash, }; - ShardStateStuff::from_root(&block_id, root, &tracker) + ShardStateStuff::from_root(&block_id, root, tracker) } diff --git a/cli/src/tools/gen_zerostate.rs b/cli/src/tools/gen_zerostate.rs index e67182623..ed850e26f 100644 --- a/cli/src/tools/gen_zerostate.rs +++ b/cli/src/tools/gen_zerostate.rs @@ -285,7 +285,7 @@ impl ZerostateConfig { ); self.accounts.insert( minter_address, - build_minter_account(&public_key, &minter_address)?.into(), + build_minter_account(public_key, &minter_address)?.into(), ); } (None, Some(_)) => anyhow::bail!("minter_public_key is required"), @@ -652,7 +652,7 @@ fn update_config_account(accounts: &mut ShardAccounts, config: &BlockchainConfig anyhow::bail!("cannot set empty config account"); }; - let Some((depth_balance, mut shard_account)) = accounts.get(&config.address)? else { + let Some((depth_balance, mut shard_account)) = accounts.get(config.address)? else { anyhow::bail!("config account not found"); }; @@ -682,7 +682,7 @@ fn update_config_account(accounts: &mut ShardAccounts, config: &BlockchainConfig shard_account.account = Lazy::new(&OptionalAccount(Some(account)))?; // Update the account entry in the dict - accounts.set(&config.address, depth_balance, shard_account)?; + accounts.set(config.address, depth_balance, shard_account)?; // Done Ok(())