Skip to content

Commit

Permalink
feat(cli): add configurable thread pool settings
Browse files Browse the repository at this point in the history
- Minor refactoring and cleanup
  • Loading branch information
0xdeafbeef committed Jun 20, 2024
1 parent fce2bae commit 4bf3b4a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 24 deletions.
6 changes: 0 additions & 6 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ fn main() -> ExitCode {
std::env::set_var("RUST_BACKTRACE", "1");
}

rayon::ThreadPoolBuilder::new()
.stack_size(8 * 1024 * 1024)
.thread_name(|_| "rayon_worker".to_string())
.build_global()
.unwrap();

match App::parse().run() {
Ok(()) => ExitCode::SUCCESS,
Err(err) => {
Expand Down
22 changes: 22 additions & 0 deletions cli/src/node/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub struct NodeConfig {
pub rpc: Option<RpcConfig>,

pub metrics: Option<MetricsConfig>,

pub threads: ThreadPoolConfig,
}

impl Default for NodeConfig {
Expand All @@ -84,6 +86,7 @@ impl Default for NodeConfig {
collator: CollationConfig::default(),
rpc: Some(RpcConfig::default()),
metrics: Some(MetricsConfig::default()),
threads: ThreadPoolConfig::default(),
}
}
}
Expand Down Expand Up @@ -116,3 +119,22 @@ impl Default for MetricsConfig {
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ThreadPoolConfig {
pub rayon_threads: usize,
pub tokio_workers: usize,
}

impl Default for ThreadPoolConfig {
fn default() -> Self {
let total_threads = std::thread::available_parallelism()
.expect("failed to get total threads")
.get();
Self {
rayon_threads: total_threads,
tokio_workers: total_threads,
}
}
}
38 changes: 23 additions & 15 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,28 @@ pub struct CmdRun {

impl CmdRun {
pub fn run(self) -> Result<()> {
if let Some(init_config_path) = self.init_config {
return NodeConfig::default()
.save_to_file(init_config_path)
.wrap_err("failed to save node config");
}

let node_config = NodeConfig::from_file(self.config.as_ref().unwrap())
.wrap_err("failed to load node config")?;

rayon::ThreadPoolBuilder::new()
.stack_size(8 * 1024 * 1024)
.thread_name(|_| "rayon_worker".to_string())
.num_threads(node_config.threads.rayon_threads)
.build_global()
.unwrap();

tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(node_config.threads.tokio_workers)
.build()?
.block_on(async move {
let run_fut = tokio::spawn(self.run_impl());
let run_fut = tokio::spawn(self.run_impl(node_config));
let stop_fut = signal::any_signal(signal::TERMINATION_SIGNALS);
tokio::select! {
res = run_fut => res.unwrap(),
Expand All @@ -111,31 +128,22 @@ impl CmdRun {
})
}

async fn run_impl(self) -> Result<()> {
if let Some(init_config_path) = self.init_config {
return NodeConfig::default()
.save_to_file(init_config_path)
.wrap_err("failed to save node config");
}

async fn run_impl(self, node_config: NodeConfig) -> Result<()> {
init_logger(self.logger_config)?;

let node = {
let node_config = NodeConfig::from_file(self.config.unwrap())
.wrap_err("failed to load node config")?;

if let Some(metrics_config) = &node_config.metrics {
init_metrics(metrics_config)?;
}

let global_config = GlobalConfig::from_file(self.global_config.unwrap())
.wrap_err("failed to load global config")?;

let keys = config::NodeKeys::from_file(&self.keys.unwrap())
.wrap_err("failed to load node keys")?;
let keys =
NodeKeys::from_file(self.keys.unwrap()).wrap_err("failed to load node keys")?;

let public_ip = resolve_public_ip(node_config.public_ip).await?;
let socket_addr = SocketAddr::new(public_ip.into(), node_config.port);
let socket_addr = SocketAddr::new(public_ip, node_config.port);

Node::new(socket_addr, keys, node_config, global_config)?
};
Expand Down Expand Up @@ -790,5 +798,5 @@ fn make_shard_state(
file_hash,
};

ShardStateStuff::from_root(&block_id, root, &tracker)
ShardStateStuff::from_root(&block_id, root, tracker)
}
6 changes: 3 additions & 3 deletions cli/src/tools/gen_zerostate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl ZerostateConfig {
);
self.accounts.insert(
minter_address,
build_minter_account(&public_key, &minter_address)?.into(),
build_minter_account(public_key, &minter_address)?.into(),
);
}
(None, Some(_)) => anyhow::bail!("minter_public_key is required"),
Expand Down Expand Up @@ -652,7 +652,7 @@ fn update_config_account(accounts: &mut ShardAccounts, config: &BlockchainConfig
anyhow::bail!("cannot set empty config account");
};

let Some((depth_balance, mut shard_account)) = accounts.get(&config.address)? else {
let Some((depth_balance, mut shard_account)) = accounts.get(config.address)? else {
anyhow::bail!("config account not found");
};

Expand Down Expand Up @@ -682,7 +682,7 @@ fn update_config_account(accounts: &mut ShardAccounts, config: &BlockchainConfig
shard_account.account = Lazy::new(&OptionalAccount(Some(account)))?;

// Update the account entry in the dict
accounts.set(&config.address, depth_balance, shard_account)?;
accounts.set(config.address, depth_balance, shard_account)?;

// Done
Ok(())
Expand Down

0 comments on commit 4bf3b4a

Please sign in to comment.