From f5f3860db55245025e1684e8cc9e159b6f46d398 Mon Sep 17 00:00:00 2001
From: dkf
Date: Mon, 26 Aug 2024 06:48:24 +0100
Subject: [PATCH] feat(watcher): add monitored events to be used by notifier (#1122)

Description
---
Creates the following events, to be used by the notifier subsequently:
- `Exited`: manual cancellation of the child process, returning its status code
- `Crashed`: the child process finished for some reason but was not cancelled manually
- `InternalError`: the watcher received an error when attempting to check the child process
- `Submitted`: the watcher has submitted a registration transaction for the node (tracks `tx_id` and `block`)
- `WarnExpiration`: the watcher emits a warning when the current block is near the expiration block of a registration
- `Running`: the child process is running as expected; nothing is done

Picks up the events from the backend and logs them in `monitoring.rs`. In
addition, whenever it spawns a new child process it also saves its process ID
to the file `validator.pid`. The file is updated automatically and can be read
when we want to check whether a node process already exists. Currently, the
lifetime of the child process is tied to the parent process.

How Has This Been Tested?
---
Ran `tari_watcher` together with `tari_swarm_daemon`. The logs showed the node
being registered and later a warning when the registration was near expiration
(less than 100 blocks from the expiration block, defined as
`registered_at_block + 1000`).
---
 Cargo.lock                                    |   4 +-
 applications/tari_watcher/src/cli.rs          |  20 +--
 applications/tari_watcher/src/config.rs       |  32 ++--
 applications/tari_watcher/src/constants.rs    |  17 ++
 applications/tari_watcher/src/forker.rs       |  70 --------
 applications/tari_watcher/src/helpers.rs      |   6 +-
 applications/tari_watcher/src/logger.rs       |  76 +++++++++
 applications/tari_watcher/src/main.rs         | 110 ++----------
 applications/tari_watcher/src/manager.rs      |  70 +++++---
 applications/tari_watcher/src/minotari.rs     |  57 +++++--
 applications/tari_watcher/src/monitoring.rs   | 110 ++++++++++++
 applications/tari_watcher/src/process.rs      | 158 ++++++++++++++++++
 applications/tari_watcher/src/registration.rs |  85 ++++++++++
 13 files changed, 581 insertions(+), 234 deletions(-)
 create mode 100644 applications/tari_watcher/src/constants.rs
 delete mode 100644 applications/tari_watcher/src/forker.rs
 create mode 100644 applications/tari_watcher/src/logger.rs
 create mode 100644 applications/tari_watcher/src/monitoring.rs
 create mode 100644 applications/tari_watcher/src/process.rs
 create mode 100644 applications/tari_watcher/src/registration.rs

diff --git a/Cargo.lock b/Cargo.lock
index 590bb748c..9138f2568 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4337,9 +4337,9 @@ checksum = "db13adb97ab515a3691f56e4dbab09283d0b86cb45abd991d8634a9d6f501760"
 
 [[package]]
 name = "libc"
-version = "0.2.155"
+version = "0.2.158"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
+checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
 
 [[package]]
 name = "libgit2-sys"
diff --git a/applications/tari_watcher/src/cli.rs b/applications/tari_watcher/src/cli.rs
index 485be8cc1..04cc622b1 100644
--- a/applications/tari_watcher/src/cli.rs
+++ b/applications/tari_watcher/src/cli.rs
@@ -5,7 +5,10 @@ use std::path::PathBuf;
 
 use clap::Parser;
 
-use crate::config::{Config, InstanceType};
+use crate::{
+    config::{Config, InstanceType},
+    constants::{DEFAULT_PROJECT_ROOT, DEFAULT_WATCHER_CONFIG_PATH},
+};
 
 #[derive(Clone, Debug, Parser)]
 pub struct Cli {
@@ -21,22 +24,15 @@ impl Cli {
     }
 
     pub fn get_config_path(&self) -> PathBuf {
-        let Some(ref base_dir) = self.common.base_dir else {
-            return self.common.config_path.clone();
-        };
-        if self.common.config_path.is_relative() {
-            base_dir.join(&self.common.config_path)
-        } else {
-            self.common.config_path.clone()
-        }
+        self.common.config_path.clone()
     }
 }
 
 #[derive(Debug, Clone, clap::Args)]
 pub struct CommonCli {
-    #[clap(short = 'b', long, parse(from_os_str))]
-    pub base_dir: Option<PathBuf>,
-    #[clap(short = 'c', long, parse(from_os_str), default_value = "./data/watcher/config.toml")]
+    #[clap(short = 'b', long, parse(from_os_str), default_value = DEFAULT_PROJECT_ROOT)]
+    pub base_dir: PathBuf,
+    #[clap(short = 'c', long, parse(from_os_str), default_value = DEFAULT_WATCHER_CONFIG_PATH)]
     pub config_path: PathBuf,
 }
diff --git a/applications/tari_watcher/src/config.rs b/applications/tari_watcher/src/config.rs
index 7304680e7..05f5a0ef1 100644
--- a/applications/tari_watcher/src/config.rs
+++ b/applications/tari_watcher/src/config.rs
@@ -9,7 +9,15 @@ use std::{
 
 use tokio::io::{self, AsyncWriteExt};
 
-use crate::Cli;
+use crate::{
+    constants::{
+        DEFAULT_BASE_NODE_GRPC_ADDRESS,
+        DEFAULT_BASE_WALLET_GRPC_ADDRESS,
+        DEFAULT_MINOTARI_MINER_BINARY_PATH,
+        DEFAULT_VALIDATOR_NODE_BINARY_PATH,
+    },
+    Cli,
+};
 
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 pub struct Config {
@@ -126,12 +134,12 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
     let executables = vec![
         ExecutableConfig {
             instance_type: InstanceType::TariValidatorNode,
-            executable_path: Some("target/release/tari_validator_node".into()),
+            executable_path: Some(DEFAULT_VALIDATOR_NODE_BINARY_PATH.into()),
             env: vec![],
         },
         ExecutableConfig {
             instance_type: InstanceType::MinoTariConsoleWallet,
-            executable_path: Some("target/release/minotari_wallet".into()),
+            executable_path: Some(DEFAULT_MINOTARI_MINER_BINARY_PATH.into()),
             env: vec![],
         },
     ];
@@ -144,18 +152,7 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
             .with_num_instances(1),
     ];
 
-    let base_dir = cli
-        .common
-        .base_dir
-        .clone()
-        .or_else(|| {
-            cli.get_config_path()
-                .canonicalize()
-                .ok()
-                .and_then(|p| p.parent().map(|p| p.to_path_buf()))
-        })
-        .unwrap_or_else(|| std::env::current_dir().unwrap());
-
+    let base_dir = cli.common.base_dir.clone();
     let vn_registration_file = base_dir
         .join("data")
         .join("vn1")
@@ -164,8 +161,9 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
 
     Ok(Config {
         auto_register: true,
-        base_node_grpc_address: "".to_string(),
-        base_wallet_grpc_address: "".to_string(),
+        // must contain protocol and port
+        base_node_grpc_address: DEFAULT_BASE_NODE_GRPC_ADDRESS.to_string(),
+        base_wallet_grpc_address: DEFAULT_BASE_WALLET_GRPC_ADDRESS.to_string(),
         base_dir: base_dir.clone(),
         sidechain_id: None,
         vn_registration_file,
diff --git a/applications/tari_watcher/src/constants.rs b/applications/tari_watcher/src/constants.rs
new file mode 100644
index 000000000..dcce8a027
--- /dev/null
+++ b/applications/tari_watcher/src/constants.rs
@@ -0,0 +1,17 @@
+// Copyright 2024 The Tari Project
+// SPDX-License-Identifier: BSD-3-Clause
+
+use tokio::time::Duration;
+
+pub const CONSENSUS_CONSTANT_REGISTRATION_DURATION: u64 = 1000; // in blocks: 100 epochs * 10 blocks/epoch
+
+pub const DEFAULT_PROJECT_ROOT: &str = env!("CARGO_MANIFEST_DIR");
+pub const DEFAULT_WATCHER_CONFIG_PATH: &str = "data/watcher/config.toml";
+pub const DEFAULT_VALIDATOR_PID_PATH: &str = "data/watcher/validator.pid";
+pub const DEFAULT_VALIDATOR_NODE_BINARY_PATH: &str = "target/release/tari_validator_node";
+pub const DEFAULT_MINOTARI_MINER_BINARY_PATH: &str = "target/release/minotari_miner";
+pub const DEFAULT_BASE_NODE_GRPC_ADDRESS: &str = "http://127.0.0.1:12001"; // note: protocol
+pub const DEFAULT_BASE_WALLET_GRPC_ADDRESS: &str = "http://127.0.0.1:12003"; // note: protocol
+
+pub const DEFAULT_PROCESS_MONITORING_INTERVAL: Duration = Duration::from_secs(20); // time to sleep before checking VN process status
+pub const DEFAULT_THRESHOLD_WARN_EXPIRATION: u64 = 100; // warn at this many blocks before the registration expires
diff --git a/applications/tari_watcher/src/forker.rs b/applications/tari_watcher/src/forker.rs
deleted file mode 100644
index 08e3cfea2..000000000
--- a/applications/tari_watcher/src/forker.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-// Copyright 2024 The Tari Project
-// SPDX-License-Identifier: BSD-3-Clause
-
-use std::{env, net::IpAddr, path::Path, process::Stdio};
-
-use tokio::process::{Child, Command};
-
-use crate::config::{ExecutableConfig, InstanceType};
-
-pub struct Forker {
-    // The Tari L2 validator instance
-    validator: Option<Instance>,
-    // Child process of the forked validator instance.
-    // Includes PID and a handle to the process.
-    child: Option<Child>,
-}
-
-impl Forker {
-    pub fn new() -> Self {
-        Self {
-            validator: None,
-            child: None,
-        }
-    }
-
-    pub async fn start_validator(&mut self, config: ExecutableConfig) -> anyhow::Result<()> {
-        let instance = Instance::new(InstanceType::TariValidatorNode, config.clone());
-        self.validator = Some(instance.clone());
-
-        let mut cmd = Command::new(
-            config
-                .executable_path
-                .unwrap_or_else(|| Path::new("tari_validator_node").to_path_buf()),
-        );
-
-        // TODO: stdout logs
-        // let process_dir = self.base_dir.join("processes").join("TariValidatorNode");
-        // let stdout_log_path = process_dir.join("stdout.log");
-        // let stderr_log_path = process_dir.join("stderr.log");
-        cmd.envs(env::vars())
-            //.arg(format!("--config={validator_node_config_path}"))
-            .kill_on_drop(true)
-            .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .stdin(Stdio::null());
-
-        let child = cmd.spawn()?;
-        self.child = Some(child);
-
-        Ok(())
-    }
-}
-
-#[allow(dead_code)]
-#[derive(Clone)]
-struct Instance {
-    app: InstanceType,
-    config: ExecutableConfig,
-    listen_ip: Option<IpAddr>,
-}
-
-impl Instance {
-    pub fn new(app: InstanceType, config: ExecutableConfig) -> Self {
-        Self {
-            app,
-            config,
-            listen_ip: None,
-        }
-    }
-}
diff --git a/applications/tari_watcher/src/helpers.rs b/applications/tari_watcher/src/helpers.rs
index a60a39732..93b33b3ad 100644
--- a/applications/tari_watcher/src/helpers.rs
+++ b/applications/tari_watcher/src/helpers.rs
@@ -3,7 +3,7 @@
 
 use std::path::PathBuf;
 
-use minotari_app_grpc::tari_rpc::{GetActiveValidatorNodesResponse, TipInfoResponse};
+use minotari_app_grpc::tari_rpc::GetActiveValidatorNodesResponse;
 use tari_common_types::types::PublicKey;
 use tari_core::transactions::transaction_components::ValidatorNodeSignature;
 use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::ByteArray};
@@ -48,10 +48,6 @@ pub fn to_vn_public_keys(vns: Vec<GetActiveValidatorNodesResponse>) -> Vec<PublicKey>
 
-pub fn to_block_height(tip_info: TipInfoResponse) -> u64 {
-    tip_info.metadata.unwrap().best_block_height
-}
-
 pub fn contains_key(vns: Vec<PublicKey>, needle: PublicKey) -> bool {
     vns.iter().any(|vn| vn.eq(&needle))
 }
diff --git a/applications/tari_watcher/src/logger.rs b/applications/tari_watcher/src/logger.rs
new file mode 100644
index 000000000..db4044998
--- /dev/null
+++ b/applications/tari_watcher/src/logger.rs
@@ -0,0 +1,76 @@
+// Copyright 2024 The Tari Project
+// SPDX-License-Identifier: BSD-3-Clause
+
+use fern::FormatCallback;
+
+pub fn init_logger() -> Result<(), log::SetLoggerError> {
+    fn should_skip(target: &str) -> bool {
+        const SKIP: [&str; 3] = ["hyper::", "h2::", "tower::"];
+        if SKIP.iter().any(|s| target.starts_with(s)) {
+            return true;
+        }
+
+        false
+    }
+
+    let colors = fern::colors::ColoredLevelConfig::new()
+        .info(fern::colors::Color::Green)
+        .debug(fern::colors::Color::Yellow)
+        .error(fern::colors::Color::Red);
+    fern::Dispatch::new()
+        .format(move |out, message, record| {
+            if should_skip(record.target()) {
+                return;
+            }
+
+            let fallback = |out: FormatCallback<'_>| {
+                out.finish(format_args!(
+                    "[{} {} {}] {}",
+                    humantime::format_rfc3339(std::time::SystemTime::now()),
+                    record.metadata().target(),
+                    colors.color(record.level()),
+                    message
+                ))
+            };
+
+            // Example: [Validator node-#1] 12:55 INFO Received vote for block #NodeHeight(88) d9abc7b1bb66fd912848f5bc4e5a69376571237e3243dc7f6a91db02bb5cf37c from a08cf5038e8e3cda8e3716c79f769cd42fad05f7110628efb5be6a40e28bc94c (4 of 3)
+            // Implement a naive parsing of the log message to extract the target, level and the log message from each running process
+            let message_str = message.to_string();
+            let Some((target, rest)) = message_str.split_once(']') else {
+                fallback(out);
+                return;
+            };
+
+            let mut parts = rest.trim().splitn(3, ' ');
+
+            // Skip the time
+            if parts.next().is_none() {
+                fallback(out);
+                return;
+            }
+
+            let Some(level) = parts.next().and_then(|s| s.parse().ok()).map(|l| colors.color(l)) else {
+                fallback(out);
+                return;
+            };
+
+            let Some(log) = parts.next() else {
+                fallback(out);
+                return;
+            };
+
+            out.finish(format_args!(
+                "{} {} {}] {} {}",
+                humantime::format_rfc3339(std::time::SystemTime::now()),
+                record.metadata().target(),
+                target,
+                level,
+                log
+            ))
+        })
+        .filter(|record_metadata| record_metadata.target().starts_with("tari_watcher")) // skip tokio frame prints
+        .level(log::LevelFilter::Debug)
+        .chain(std::io::stdout())
+        // .chain(fern::log_file("output.log").unwrap())
+        .apply()
+}
diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs
index b79a33a1a..95db2292d 100644
--- a/applications/tari_watcher/src/main.rs
+++ b/applications/tari_watcher/src/main.rs
@@ -1,32 +1,30 @@
 // Copyright 2024 The Tari Project
 // SPDX-License-Identifier: BSD-3-Clause
 
-use std::time::SystemTime;
-
 use anyhow::{anyhow, bail, Context};
-use helpers::read_registration_file;
-use log::*;
+use registration::registration_loop;
 use tari_shutdown::{Shutdown, ShutdownSignal};
-use tokio::{
-    fs,
-    task,
-    time::{self, Duration},
-};
+use tokio::{fs, task};
 
 use crate::{
     cli::{Cli, Commands},
     config::{get_base_config, Config},
-    helpers::{contains_key, read_config_file, to_block_height, to_vn_public_keys},
+    helpers::read_config_file,
+    logger::init_logger,
     manager::{ManagerHandle, ProcessManager},
     shutdown::exit_signal,
 };
 
 mod cli;
 mod config;
-mod forker;
+mod constants;
 mod helpers;
+mod logger;
 mod manager;
 mod minotari;
+mod monitoring;
+mod process;
+mod registration;
 mod shutdown;
 
 #[tokio::main]
@@ -34,7 +32,7 @@ async fn main() -> anyhow::Result<()> {
     let cli = Cli::init();
     let config_path = cli.get_config_path();
 
-    setup_logger()?;
+    init_logger()?;
 
     match cli.command {
         Commands::Init(ref args) => {
@@ -72,22 +70,10 @@ async fn main() -> anyhow::Result<()> {
     Ok(())
 }
 
-async fn start(config: Config) -> anyhow::Result<ManagerHandle> {
+async fn start(config: Config) -> anyhow::Result<()> {
     let shutdown = Shutdown::new();
     let signal = shutdown.to_signal().select(exit_signal()?);
-    let (task_handle, mut manager_handle) = spawn(config.clone(), shutdown.to_signal()).await;
-
-    let mut interval = time::interval(Duration::from_secs(10));
-    let constants = manager_handle.get_consensus_constants(0).await;
-    let validity_period = constants.as_ref().unwrap().validator_node_validity_period;
-    let epoch_length = constants.unwrap().epoch_length;
-    debug!("Registrations are currently valid for {} epochs", validity_period);
-    debug!("Every epoch has {} blocks", epoch_length);
-    let registration_valid_for = validity_period * epoch_length;
-    let mut registered_at_block = 0;
-    let local_node = read_registration_file(config.vn_registration_file).await?;
-    let local_key = local_node.public_key; // 76fd45c0816f7bd78d33e1b9358a48e8c68b97bfd20d9c80f3934afbde848343
-    debug!("Local public key: {}", local_key.clone());
+    let (task_handle, manager_handle) = spawn(config.clone(), shutdown.to_signal()).await;
 
     tokio::select! {
         _ = signal => {
@@ -98,54 +84,11 @@ async fn start(config: Config) -> anyhow::Result<ManagerHandle> {
             log::info!("Process manager exited");
         },
         _ = async {
-            loop {
-                interval.tick().await;
-
-                let tip_info = manager_handle.get_tip_info().await;
-                if let Err(e) = tip_info {
-                    error!("Failed to get tip info: {}", e);
-                    continue;
-                }
-                let curr_height = to_block_height(tip_info.unwrap());
-                debug!("Current block height: {}", curr_height);
-
-                let vn_status = manager_handle.get_active_validator_nodes().await;
-                if let Err(e) = vn_status {
-                    error!("Failed to get active validators: {}", e);
-                    continue;
-                }
-                let active_keys = to_vn_public_keys(vn_status.unwrap());
-                info!("Amount of active validator node keys: {}", active_keys.len());
-                for key in &active_keys {
-                    info!("{}", key);
-                }
-
-                // if the node is already registered and still valid, skip registration
-                if contains_key(active_keys.clone(), local_key.clone()) {
-                    info!("Local node is active and still before expiration, skipping registration");
-                    continue;
-                }
-
-                // need to be more refined but proves the concept
-                if curr_height < registered_at_block + registration_valid_for {
-                    info!("Local node still within registration validity period, skipping registration");
-                    continue;
-                }
-
-                info!("Local node not active, attempting to register..");
-                let tx = manager_handle.register_validator_node().await.unwrap();
-                if !tx.is_success {
-                    error!("Failed to register node: {}", tx.failure_message);
-                    continue;
-                }
-                info!("Registered node at height {} with transaction id: {}", curr_height, tx.transaction_id);
-                registered_at_block = curr_height;
-
-            }
+            drop(registration_loop(config, manager_handle).await);
         } => {},
     }
 
-    Ok(manager_handle)
+    Ok(())
 }
 
 async fn spawn(config: Config, shutdown: ShutdownSignal) -> (task::JoinHandle<anyhow::Result<()>>, ManagerHandle) {
@@ -153,28 +96,3 @@ async fn spawn(config: Config, shutdown: ShutdownSignal) -> (task::JoinHandle<anyhow::Result<()>>, ManagerHandle) {
     let (manager, manager_handle) = ProcessManager::new(config, shutdown);
     let task_handle = tokio::spawn(manager.start());
     (task_handle, manager_handle)
 }
-
-fn setup_logger() -> Result<(), fern::InitError> {
-    let colors = fern::colors::ColoredLevelConfig::new()
-        .info(fern::colors::Color::Green)
-        .debug(fern::colors::Color::Cyan)
-        .warn(fern::colors::Color::Yellow)
-        .error(fern::colors::Color::Red);
-
-    fern::Dispatch::new()
-        .format(move |out, message, record| {
-            out.finish(format_args!(
-                "[{} {} {}] {}",
-                humantime::format_rfc3339_seconds(SystemTime::now()),
-                colors.color(record.level()),
-                record.target(),
-                message
-            ))
-        })
-        .level(log::LevelFilter::Debug)
-        .chain(std::io::stdout())
-        .chain(fern::log_file("output.log")?)
-        .apply()?;
-
-    Ok(())
-}
diff --git a/applications/tari_watcher/src/manager.rs b/applications/tari_watcher/src/manager.rs
index 7f384396a..533cbaaf2 100644
--- a/applications/tari_watcher/src/manager.rs
+++ b/applications/tari_watcher/src/manager.rs
@@ -1,27 +1,31 @@
 // Copyright 2024 The Tari Project
 // SPDX-License-Identifier: BSD-3-Clause
 
+use std::path::PathBuf;
+
 use log::*;
 use minotari_app_grpc::tari_rpc::{
     self as grpc,
     ConsensusConstants,
     GetActiveValidatorNodesResponse,
     RegisterValidatorNodeResponse,
-    TipInfoResponse,
 };
 use tari_shutdown::ShutdownSignal;
 use tokio::sync::{mpsc, oneshot};
 
 use crate::{
     config::{Config, ExecutableConfig},
-    forker::Forker,
-    minotari::Minotari,
+    constants::DEFAULT_VALIDATOR_NODE_BINARY_PATH,
+    minotari::{Minotari, TipStatus},
+    monitoring::{read_status, ProcessStatus, Transaction},
+    process::Process,
 };
 
 pub struct ProcessManager {
+    pub base_dir: PathBuf,
     pub validator_config: ExecutableConfig,
     pub wallet_config: ExecutableConfig,
-    pub forker: Forker,
+    pub process: Process,
     pub shutdown_signal: ShutdownSignal,
     pub rx_request: mpsc::Receiver<ManagerRequest>,
     pub chain: Minotari,
 }
 
@@ -31,9 +35,10 @@ impl ProcessManager {
     pub fn new(config: Config, shutdown_signal: ShutdownSignal) -> (Self, ManagerHandle) {
         let (tx_request, rx_request) = mpsc::channel(1);
         let this = Self {
+            base_dir: config.base_dir.clone(),
             validator_config: config.executable_config[0].clone(),
             wallet_config: config.executable_config[1].clone(),
-            forker: Forker::new(),
+            process: Process::new(),
             shutdown_signal,
             rx_request,
             chain: Minotari::new(
@@ -48,27 +53,56 @@ impl ProcessManager {
 
     pub async fn start(mut self) -> anyhow::Result<()> {
         info!("Starting validator node process");
 
-        self.forker.start_validator(self.validator_config.clone()).await?;
+        // clean_stale_pid_file(self.base_dir.clone().join(DEFAULT_VALIDATOR_PID_PATH)).await?;
+
+        let cc = self
+            .process
+            .start_validator(
+                self.validator_config
+                    .clone()
+                    .executable_path
+                    .unwrap_or(PathBuf::from(DEFAULT_VALIDATOR_NODE_BINARY_PATH)),
+                self.base_dir,
+            )
+            .await;
+        if cc.is_none() {
+            todo!("Create new validator node process event listener for fetched existing PID from OS");
+        }
+        let cc = cc.unwrap();
+        tokio::spawn(async move {
+            read_status(cc.rx).await;
+        });
+
         self.chain.bootstrap().await?;
 
+        info!("Setup completed: connected to base node and wallet, ready to receive requests");
+
         loop {
             tokio::select! {
                 Some(req) = self.rx_request.recv() => {
                     match req {
                         ManagerRequest::GetTipInfo { reply } => {
                             let response = self.chain.get_tip_status().await?;
+
+                            // send latest block height to monitoring, to potentially warn of upcoming node expiration
+                            if let Err(e) = cc.tx.send(ProcessStatus::WarnExpiration(response.height())).await {
+                                error!("Failed to send tip status update to monitoring: {}", e);
+                            }
+
                             drop(reply.send(Ok(response)));
                         }
                         ManagerRequest::GetActiveValidatorNodes { reply } => {
                             let response = self.chain.get_active_validator_nodes().await?;
                             drop(reply.send(Ok(response)));
                         }
-                        ManagerRequest::RegisterValidatorNode { reply } => {
+                        ManagerRequest::RegisterValidatorNode { block, reply } => {
                             let response = self.chain.register_validator_node().await?;
+
+                            // send response to monitoring
+                            if let Err(e) = cc.tx.send(ProcessStatus::Submitted(Transaction::new(response.clone(), block))).await {
+                                error!("Failed to send node registration update to monitoring: {}", e);
+                            }
+
+                            // send response to backend
                             drop(reply.send(Ok(response)));
                         },
-                        ManagerRequest::GetConsensusConstants { reply, block_height } => {
-                            let response = self.chain.get_consensus_constants(block_height).await?;
+                        ManagerRequest::GetConsensusConstants { block, reply } => {
+                            let response = self.chain.get_consensus_constants(block).await?;
                             drop(reply.send(Ok(response)));
                         }
                     }
 
@@ -89,16 +123,17 @@ type Reply<T> = oneshot::Sender<anyhow::Result<T>>;
 
 pub enum ManagerRequest {
     GetTipInfo {
-        reply: Reply<TipInfoResponse>,
+        reply: Reply<TipStatus>,
     },
     GetActiveValidatorNodes {
         reply: Reply<Vec<GetActiveValidatorNodesResponse>>,
     },
     GetConsensusConstants {
-        block_height: u64,
+        block: u64,
         reply: Reply<ConsensusConstants>,
     },
     RegisterValidatorNode {
+        block: u64,
         reply: Reply<RegisterValidatorNodeResponse>,
     },
 }
 
@@ -120,26 +155,23 @@ impl ManagerHandle {
         rx.await?
     }
 
-    pub async fn get_consensus_constants(&mut self, block_height: u64) -> anyhow::Result<ConsensusConstants> {
+    pub async fn get_consensus_constants(&mut self, block: u64) -> anyhow::Result<ConsensusConstants> {
         let (tx, rx) = oneshot::channel();
         self.tx_request
-            .send(ManagerRequest::GetConsensusConstants {
-                block_height,
-                reply: tx,
-            })
+            .send(ManagerRequest::GetConsensusConstants { block, reply: tx })
             .await?;
         rx.await?
     }
 
-    pub async fn register_validator_node(&mut self) -> anyhow::Result<RegisterValidatorNodeResponse> {
+    pub async fn register_validator_node(&mut self, block: u64) -> anyhow::Result<RegisterValidatorNodeResponse> {
         let (tx, rx) = oneshot::channel();
         self.tx_request
-            .send(ManagerRequest::RegisterValidatorNode { reply: tx })
+            .send(ManagerRequest::RegisterValidatorNode { block, reply: tx })
             .await?;
         rx.await?
    }
 
-    pub async fn get_tip_info(&mut self) -> anyhow::Result<TipInfoResponse> {
+    pub async fn get_tip_info(&mut self) -> anyhow::Result<TipStatus> {
         let (tx, rx) = oneshot::channel();
         self.tx_request.send(ManagerRequest::GetTipInfo { reply: tx }).await?;
         rx.await?
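
The `ManagerHandle` methods above all follow the same request/reply convention: the caller pushes an enum variant onto a bounded `mpsc` channel together with a `oneshot` sender, and the manager task answers through that sender. A minimal, self-contained sketch of the pattern is below; the names (`Request`, `Ping`) are illustrative only and are not code from this patch:

```rust
use tokio::sync::{mpsc, oneshot};

enum Request {
    // the oneshot sender carries the answer back to the caller
    Ping { reply: oneshot::Sender<String> },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Request>(1);

    // the "manager" side: serve requests until every handle is dropped
    let server = tokio::spawn(async move {
        while let Some(Request::Ping { reply }) = rx.recv().await {
            // ignore the send result: the caller may have gone away,
            // mirroring the drop(reply.send(..)) calls in manager.rs
            drop(reply.send("pong".to_string()));
        }
    });

    // the "handle" side: send a request, then await the oneshot reply
    let (reply, rx_reply) = oneshot::channel();
    tx.send(Request::Ping { reply }).await.unwrap();
    assert_eq!(rx_reply.await.unwrap(), "pong");

    drop(tx); // closing the channel lets the server loop end
    server.await.unwrap();
}
```
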
diff --git a/applications/tari_watcher/src/minotari.rs b/applications/tari_watcher/src/minotari.rs
index e984dae2e..5a8a5af12 100644
--- a/applications/tari_watcher/src/minotari.rs
+++ b/applications/tari_watcher/src/minotari.rs
@@ -3,20 +3,17 @@
 
 use std::path::PathBuf;
 
-use anyhow::bail;
-use minotari_app_grpc::tari_rpc::{
-    self as grpc,
-    GetActiveValidatorNodesResponse,
-    RegisterValidatorNodeResponse,
-    TipInfoResponse,
-};
+use anyhow::{anyhow, bail};
+use log::*;
+use minotari_app_grpc::tari_rpc::{self as grpc, GetActiveValidatorNodesResponse, RegisterValidatorNodeResponse};
 use minotari_node_grpc_client::BaseNodeGrpcClient;
 use minotari_wallet_grpc_client::WalletGrpcClient;
 use tari_common::exit_codes::{ExitCode, ExitError};
+use tari_common_types::types::FixedHash;
 use tari_crypto::tari_utilities::ByteArray;
 use tonic::transport::Channel;
 
-use crate::helpers::{read_registration_file, to_block_height};
+use crate::helpers::read_registration_file;
 
 #[derive(Clone)]
 pub struct Minotari {
@@ -24,10 +21,27 @@ pub struct Minotari {
     node_grpc_address: String,
     wallet_grpc_address: String,
     node_registration_file: PathBuf,
+    current_height: u64,
     node: Option<BaseNodeGrpcClient<Channel>>,
     wallet: Option<WalletGrpcClient<Channel>>,
 }
 
+#[derive(Debug, Clone)]
+pub struct TipStatus {
+    block_height: u64,
+    block_hash: FixedHash,
+}
+
+impl TipStatus {
+    pub fn hash(&self) -> FixedHash {
+        self.block_hash
+    }
+
+    pub fn height(&self) -> u64 {
+        self.block_height
+    }
+}
+
 impl Minotari {
     pub fn new(node_grpc_address: String, wallet_grpc_address: String, node_registration_file: PathBuf) -> Self {
         Self {
@@ -35,6 +49,7 @@ impl Minotari {
             node_grpc_address,
             wallet_grpc_address,
             node_registration_file,
+            current_height: 0,
             node: None,
             wallet: None,
         }
     }
@@ -70,18 +85,31 @@ impl Minotari {
         Ok(())
     }
 
-    pub async fn get_tip_status(&self) -> anyhow::Result<TipInfoResponse> {
+    pub async fn get_tip_status(&mut self) -> anyhow::Result<TipStatus> {
         if !self.bootstrapped {
             bail!("Node client not connected");
         }
 
-        Ok(self
+        log::debug!("Requesting tip status from base node");
+
+        let inner = self
             .node
             .clone()
             .unwrap()
             .get_tip_info(grpc::Empty {})
             .await?
-            .into_inner())
+            .into_inner();
+
+        let metadata = inner
+            .metadata
+            .ok_or_else(|| anyhow!("Base node returned no metadata"))?;
+
+        self.current_height = metadata.best_block_height;
+
+        Ok(TipStatus {
+            block_height: metadata.best_block_height,
+            block_hash: metadata
+                .best_block_hash
+                .try_into()
+                .map_err(|_| anyhow!("Base node returned an invalid block hash"))?,
+        })
     }
 
     pub async fn get_active_validator_nodes(&self) -> anyhow::Result<Vec<GetActiveValidatorNodesResponse>> {
@@ -89,8 +117,7 @@ impl Minotari {
             bail!("Node client not connected");
         }
 
-        let tip_info = self.get_tip_status().await?;
-        let height = to_block_height(tip_info);
+        let height = self.current_height;
         let mut stream = self
             .node
             .clone()
@@ -129,6 +156,8 @@ impl Minotari {
             bail!("Node client not connected");
         }
 
+        info!("Preparing to send a node registration request");
+
         let info = read_registration_file(self.node_registration_file.clone()).await?;
         let sig = info.signature.signature();
         let resp = self
@@ -152,6 +181,8 @@ impl Minotari {
             bail!("Failed to register validator node: {}", resp.failure_message);
         }
 
+        info!("Node registration request sent successfully");
+
         Ok(resp)
     }
 
diff --git a/applications/tari_watcher/src/monitoring.rs b/applications/tari_watcher/src/monitoring.rs
new file mode 100644
index 000000000..584fb7533
--- /dev/null
+++ b/applications/tari_watcher/src/monitoring.rs
@@ -0,0 +1,110 @@
+// Copyright 2024 The Tari Project
+// SPDX-License-Identifier: BSD-3-Clause
+
+use anyhow::Error;
+use log::*;
+use minotari_app_grpc::tari_rpc::RegisterValidatorNodeResponse;
+use tokio::{process::Child, sync::mpsc, time::sleep};
+
+use crate::constants::{
+    CONSENSUS_CONSTANT_REGISTRATION_DURATION,
+    DEFAULT_PROCESS_MONITORING_INTERVAL,
+    DEFAULT_THRESHOLD_WARN_EXPIRATION,
+};
+
+#[derive(Debug)]
+pub struct Transaction {
+    id: u64,
+    block: u64,
+}
+
+impl Transaction {
+    pub fn new(response: RegisterValidatorNodeResponse, block: u64) -> Self {
+        Self {
+            id: response.transaction_id,
+            block,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub enum ProcessStatus {
+    Running,
+    Exited(i32), // status code
+    Crashed,
+    InternalError(Error),
+    Submitted(Transaction),
+    WarnExpiration(u64),
+}
+
+pub async fn monitor_child(mut child: Child, tx: mpsc::Sender<ProcessStatus>) {
+    loop {
+        sleep(DEFAULT_PROCESS_MONITORING_INTERVAL).await;
+
+        // poll exactly once per iteration so we act on a single consistent result
+        match child.try_wait() {
+            // the watcher encountered an unexpected error when checking, not related to the process itself
+            Err(err) => {
+                tx.send(ProcessStatus::InternalError(err.into()))
+                    .await
+                    .expect("Failed to send internal error status");
+                break;
+            },
+            // process has finished, intentional or not, if it has some status
+            Ok(Some(status)) => {
+                if !status.success() {
+                    tx.send(ProcessStatus::Crashed).await.expect("Failed to send status");
+                    break;
+                }
+                tx.send(ProcessStatus::Exited(status.code().unwrap_or(0)))
+                    .await
+                    .expect("Failed to send process exit status");
+                break;
+            },
+            // process is still running
+            Ok(None) => {
+                tx.send(ProcessStatus::Running)
+                    .await
+                    .expect("Failed to send process running status");
+            },
+        }
+    }
+}
+
+pub async fn read_status(mut rx: mpsc::Receiver<ProcessStatus>) {
+    let mut last_registered_at_block = 0;
+    while let Some(status) = rx.recv().await {
+        match status {
+            ProcessStatus::Exited(code) => {
+                error!("Validator node process exited with code {}", code);
+                break;
+            },
+            ProcessStatus::InternalError(err) => {
+                error!("Validator node process exited with error: {}", err);
+                break;
+            },
+            ProcessStatus::Crashed => {
+                error!("Validator node process crashed");
+                break;
+            },
+            ProcessStatus::Running => {
+                // all good, process is still running
+            },
+            ProcessStatus::Submitted(tx) => {
+                info!(
+                    "Validator node registration submitted (tx: {}, block: {})",
+                    tx.id, tx.block
+                );
+                last_registered_at_block = tx.block;
+            },
+            ProcessStatus::WarnExpiration(block) => {
+                if last_registered_at_block != 0 &&
+                    block + DEFAULT_THRESHOLD_WARN_EXPIRATION >=
+                        last_registered_at_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION
+                {
+                    warn!(
+                        "Validator node registration expires at block {} ({} blocks remaining)",
+                        last_registered_at_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION,
+                        last_registered_at_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION - block
+                    );
+                }
+            },
+        }
+    }
+}
diff --git a/applications/tari_watcher/src/process.rs b/applications/tari_watcher/src/process.rs
new file mode 100644
index 000000000..c85b69447
--- /dev/null
+++ b/applications/tari_watcher/src/process.rs
@@ -0,0 +1,158 @@
+// Copyright 2024 The Tari Project
+// SPDX-License-Identifier: BSD-3-Clause
+
+use std::{path::PathBuf, process::Stdio};
+
+use anyhow::bail;
+use log::*;
+use tokio::{
+    fs::{self, OpenOptions},
+    io::AsyncWriteExt,
+    process::Command as TokioCommand,
+    sync::mpsc::{self},
+};
+
+use crate::{
+    constants::DEFAULT_VALIDATOR_PID_PATH,
+    monitoring::{monitor_child, ProcessStatus},
+};
+
+#[allow(unused)]
+pub async fn clean_stale_pid_file(pid_file_path: PathBuf) -> anyhow::Result<()> {
+    log::info!("Checking for stale PID file at {}", pid_file_path.display());
+    if !pid_file_path.exists() {
+        info!("PID file for validator does not exist, creating a new one");
+        return Ok(());
+    }
+
+    if let Ok(pid_str) = fs::read_to_string(&pid_file_path).await {
+        if let Ok(pid) = pid_str.trim().parse::<u32>() {
+            // check if the process is still running
+            let status = TokioCommand::new("kill").arg("-0").arg(pid.to_string()).status().await;
+            if status.map(|s| !s.success()).unwrap_or(true) {
+                log::info!("Removing stale PID file");
+                fs::remove_file(&pid_file_path).await?;
+                return Ok(());
+            }
+
+            log::info!("Process with PID {} is still running", pid);
+            bail!("PID file is locked by an active process");
+        }
+    }
+
+    Ok(())
+}
+
+async fn create_pid_file(path: PathBuf) -> anyhow::Result<()> {
+    let mut file = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(&path)
+        .await?;
+
+    file.write_all(std::process::id().to_string().as_bytes()).await?;
+
+    Ok(())
+}
+
+pub struct ChildChannel {
+    pub pid: u32,
+    pub rx: mpsc::Receiver<ProcessStatus>,
+    pub tx: mpsc::Sender<ProcessStatus>,
+}
+
+pub async fn spawn_validator_node_os(validator_node_path: PathBuf, base_dir: PathBuf) -> anyhow::Result<ChildChannel> {
+    let node_binary_path = base_dir.join(validator_node_path.clone());
+    debug!("Using validator node binary at {}", node_binary_path.display());
+
+    let child = TokioCommand::new(node_binary_path.clone().into_os_string())
+        .arg("-b")
+        .arg("data/vn1")
+        .stdin(Stdio::null())
+        .stdout(Stdio::null())
+        .stderr(Stdio::null())
+        .kill_on_drop(false)
+        .spawn()?;
+
+    let pid = child.id().expect("Failed to get PID for child process");
+    info!("Spawned validator child process with id {}", pid);
+
+    if let Err(e) = create_pid_file(PathBuf::from(DEFAULT_VALIDATOR_PID_PATH)).await {
+        log::error!("Failed to create PID file when spawning node: {}", e);
+    }
+
+    let path = base_dir.join(DEFAULT_VALIDATOR_PID_PATH);
+    debug!(
+        "Spawning validator node with process persisted at file: {}",
+        path.display()
+    );
+
+    create_pid_file(path.clone()).await?;
+
+    let mut file = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(path)
+        .await?;
+    file.write_all(pid.to_string().as_bytes()).await?;
+
+    let (tx, rx) = mpsc::channel(16);
+    tokio::spawn(monitor_child(child, tx.clone()));
+
+    Ok(ChildChannel { pid, rx, tx })
+}
+
+async fn check_existing_node_os(base_dir: PathBuf) -> Option<u32> {
+    let process_dir = base_dir.join("processes");
+    if !process_dir.exists() {
+        debug!("Validator node process directory does not exist");
+        return None;
+    }
+
+    if let Ok(pid_str) = fs::read_to_string(DEFAULT_VALIDATOR_PID_PATH).await {
+        debug!("Found PID file: {}", pid_str);
+
+        if let Ok(pid) = pid_str.trim().parse::<u32>() {
+            // `ps -p` exits successfully only if the PID exists
+            let status = TokioCommand::new("ps").arg("-p").arg(pid.to_string()).status().await;
+            if status.map(|s| s.success()).unwrap_or(false) {
+                info!("Found existing running validator process with PID: {}", pid);
+                return Some(pid);
+            }
+            error!("Failed to find process with PID: {}", pid);
+        } else {
+            error!("Unable to parse PID file to number, this should not happen");
+        }
+    }
+
+    None
+}
+
+pub struct Process {
+    // Child process ID of the forked validator instance.
+    pid: Option<u32>,
+}
+
+impl Process {
+    pub fn new() -> Self {
+        Self { pid: None }
+    }
+
+    pub async fn start_validator(&mut self, validator_path: PathBuf, base_dir: PathBuf) -> Option<ChildChannel> {
+        let opt = check_existing_node_os(base_dir.clone()).await;
+        if let Some(pid) = opt {
+            info!("Picking up existing validator node process with id: {}", pid);
+
+            self.pid = Some(pid);
+            // todo: create new process status channel for picked up process
+            return None;
+        } else {
+            debug!("No existing validator node process found, spawning a new one");
+        }
+
+        let cc = spawn_validator_node_os(validator_path, base_dir).await.ok()?;
+        self.pid = Some(cc.pid);
+
+        Some(cc)
+    }
+}
diff --git a/applications/tari_watcher/src/registration.rs b/applications/tari_watcher/src/registration.rs
new file mode 100644
index 000000000..6cb3d09ae
--- /dev/null
+++ b/applications/tari_watcher/src/registration.rs
@@ -0,0 +1,85 @@
+// Copyright 2024 The Tari Project
+// SPDX-License-Identifier: BSD-3-Clause
+
+use log::*;
+use tari_common_types::types::FixedHash;
+use tokio::time::{self, Duration};
+
+use crate::{
+    config::Config,
+    helpers::{contains_key, read_registration_file, to_vn_public_keys},
+    manager::ManagerHandle,
+};
+
+// TODO: make configurable
+// Amount of time to wait before the watcher runs a check again
+const REGISTRATION_LOOP_INTERVAL: Duration = Duration::from_secs(30);
+
+// `registration_loop` periodically checks that the local node is still registered on the network.
+// If it is no longer registered, it will attempt to re-register. It will do nothing if it is registered already.
+// Currently, it does not keep track of when the registration was sent or register just in time before it expires.
+// It is possible to add a threshold, such as sending a registration request every 500 blocks (for example), to
+// make sure it is always registered.
+pub async fn registration_loop(config: Config, mut manager_handle: ManagerHandle) -> anyhow::Result<ManagerHandle> {
+    let mut interval = time::interval(REGISTRATION_LOOP_INTERVAL);
+    let constants = manager_handle.get_consensus_constants(0).await?;
+    let total_blocks_duration = constants.validator_node_validity_period * constants.epoch_length;
+    info!(
+        "Registrations are currently valid for {} blocks ({} epochs)",
+        total_blocks_duration, constants.validator_node_validity_period
+    );
+    let local_node = read_registration_file(config.vn_registration_file).await?;
+    let local_key = local_node.public_key;
+    debug!("Local public key: {}", local_key.clone());
+    let mut last_block_hash: Option<FixedHash> = None;
+
+    loop {
+        interval.tick().await;
+
+        let tip_info = manager_handle.get_tip_info().await;
+        if let Err(e) = tip_info {
+            error!("Failed to get tip info: {}", e);
+            continue;
+        }
+        let curr_height = tip_info.as_ref().unwrap().height();
+        if last_block_hash.is_none() || last_block_hash.unwrap() != tip_info.as_ref().unwrap().hash() {
+            last_block_hash = Some(tip_info.unwrap().hash());
+            debug!("New block hash at height {}: {}", curr_height, last_block_hash.unwrap());
+        } else {
+            debug!("Same block as previous tick");
+        }
+
+        let vn_status = manager_handle.get_active_validator_nodes().await;
+        if let Err(e) = vn_status {
+            error!("Failed to get active validators: {}", e);
+            continue;
+        }
+        let active_keys = to_vn_public_keys(vn_status.unwrap());
+        info!("Number of active validator node keys: {}", active_keys.len());
+        for key in &active_keys {
+            info!("{}", key);
+        }
+
+        // if the node is already registered and still valid, skip registration
+        if contains_key(active_keys.clone(), local_key.clone()) {
+            info!("Node has an active registration, skipping");
+            continue;
+        }
+
+        info!("Local node not active or about to expire, attempting to register..");
+        let tx = manager_handle.register_validator_node(curr_height).await;
+        if let Err(e) = tx {
+            error!("Failed to register node: {}", e);
+            continue;
+        }
+        let tx = tx.unwrap();
+        if !tx.is_success {
+            error!("Failed to register node: {}", tx.failure_message);
+            continue;
+        }
+        info!(
+            "Registered node at block {} with transaction id: {}",
+            curr_height, tx.transaction_id
+        );
+    }
+}
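
For now the events are only logged by `read_status`; the notifier mentioned in the description would consume the same `ProcessStatus` stream later. A rough sketch of what such a consumer could look like, not part of this patch: the `notify` sink is a hypothetical placeholder for a real channel (webhook, chat bot, email, ...), and the enum here is a trimmed-down copy kept local so the example is self-contained:

```rust
use tokio::sync::mpsc;

// trimmed-down copy of the watcher's event enum, for a self-contained example
#[derive(Debug)]
enum ProcessStatus {
    Running,
    Exited(i32),
    Crashed,
    WarnExpiration(u64),
}

// hypothetical sink: prints instead of calling a real notification service
async fn notify(message: &str) {
    println!("NOTIFY: {message}");
}

async fn notifier_loop(mut rx: mpsc::Receiver<ProcessStatus>) {
    while let Some(status) = rx.recv().await {
        match status {
            // assumption: only abnormal events are worth a notification
            ProcessStatus::Crashed => notify("validator node crashed").await,
            ProcessStatus::Exited(code) => notify(&format!("validator node exited with code {code}")).await,
            ProcessStatus::WarnExpiration(block) => {
                notify(&format!("registration close to expiry at block {block}")).await
            },
            ProcessStatus::Running => {}, // no-op, as in read_status()
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    tokio::spawn(async move {
        // simulate the watcher reporting a crash
        tx.send(ProcessStatus::Crashed).await.unwrap();
    });
    notifier_loop(rx).await;
}
```
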