diff --git a/Cargo.lock b/Cargo.lock index df1c430ba..3dc29255d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3651,6 +3651,20 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http", + "hyper", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-timeout" version = "0.4.1" @@ -7381,6 +7395,7 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-rustls", "hyper-tls", "ipnet", "js-sys", @@ -7391,6 +7406,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls 0.21.12", "rustls-pemfile 1.0.4", "serde", "serde_json", @@ -7399,11 +7415,13 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-rustls 0.24.1", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", "winreg", ] @@ -10167,7 +10185,9 @@ dependencies = [ "minotari_app_grpc", "minotari_node_grpc_client", "minotari_wallet_grpc_client", + "reqwest", "serde", + "serde_json", "tari_common", "tari_common_types", "tari_core", @@ -10437,6 +10457,16 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" @@ -10594,7 +10624,7 @@ dependencies = [ "rustls-native-certs", "rustls-pemfile 1.0.4", "tokio", - "tokio-rustls", + "tokio-rustls 0.23.4", "tokio-stream", "tokio-util 0.7.11", "tower", @@ -10862,7 +10892,7 @@ dependencies = [ "thiserror", "tinyvec", "tokio", - "tokio-rustls", + "tokio-rustls 0.23.4", "url", "webpki", ] @@ -11640,6 +11670,12 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "webrtc" version = "0.9.0" diff --git a/applications/tari_watcher/Cargo.toml b/applications/tari_watcher/Cargo.toml index 0e6c035cc..441ee01c8 100644 --- a/applications/tari_watcher/Cargo.toml +++ b/applications/tari_watcher/Cargo.toml @@ -34,6 +34,8 @@ log = { workspace = true } fern = { workspace = true, features = ["colored"] } tonic = { workspace = true } json5 = { workspace = true } +reqwest = { workspace = true, features = ["json", "blocking", "rustls-tls"] } +serde_json = { workspace = true } toml = "0.8.12" humantime = "2.1.0" diff --git a/applications/tari_watcher/README.md b/applications/tari_watcher/README.md new file mode 100644 index 000000000..667f6a799 --- /dev/null +++ b/applications/tari_watcher/README.md @@ -0,0 +1,43 @@ +# Tari Watcher + +**Features**: +* Registers the validator node on L2 by sending a transaction on L1 +* Monitors the chain and warns when registration is near expiration +* Automatically re-registers the node +* Alerts on Mattermost and Telegram + +### Quickstart + +Initialize the project with `tari_watcher init` and start it with `tari_watcher run`. 
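+For example, assuming the watcher has been built in release mode (the binary path below is illustrative):
+```
+./target/release/tari_watcher init
+./target/release/tari_watcher run
+```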
Edit the newly generated `config.toml` to enable notifications on channels such as Mattermost and Telegram.Make sure to have started up `tari_validator_node` once previously to have a node directory set up, default is `tari_validator_node -- -b data/vn1`. + +### Setup + +The default values used (see `constants.rs`) when running the project without any flags: +``` +- DEFAULT_MAIN_PROJECT_PATH: base directory, the same level as the repository `tari-dan` +- DEFAULT_WATCHER_CONFIG_PATH: relative to the base directory, main configuration file +- DEFAULT_VALIDATOR_KEY_PATH: relative to the base directory, validator node registration file +- DEFAULT_VALIDATOR_NODE_BINARY_PATH: relative to the base directory, default is Rust build directory `target/release` +- DEFAULT_VALIDATOR_DIR: relative to the project base directory, home directory for everything validator node +- DEFAULT_MINOTARI_MINER_BINARY_PATH: relative to the base directory, default is Rust build directory `target/release` +- DEFAULT_BASE_NODE_GRPC_ADDRESS: default is Tari swarm localhost and port +- DEFAULT_BASE_WALLET_GRPC_ADDRESS: default is Tari swarm localhost and port +``` + +### Project + +``` +├── alerting.rs # channel notifier implementations +├── cli.rs # cli and flags passed during bootup +├── config.rs # main config file creation +├── constants.rs # various constants used as default values +├── helpers.rs # common helper functions +├── logger.rs +├── main.rs +├── manager.rs # manages the spawn validator node process and receives requests +├── minotari.rs # communicates with the base node (L1) +├── monitoring.rs # outputs logs and sends the alerts +├── process.rs # spawns the validator node process and sets up the channels +├── registration.rs # handles the logic for sending a node registration transaction +└── shutdown.rs +``` diff --git a/applications/tari_watcher/src/alerting.rs b/applications/tari_watcher/src/alerting.rs new file mode 100644 index 000000000..89c00f3f4 --- /dev/null +++ b/applications/tari_watcher/src/alerting.rs @@ -0,0 +1,97 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use anyhow::{bail, Result}; +use reqwest::StatusCode; +use serde_json::json; + +pub trait Alerting { + fn new(url: String, channel_id: String, credentials: String) -> Self; + + // Sends an alert message to the service + async fn alert(&mut self, message: &str) -> Result<()>; + + // Checks that the service is reachable + async fn ping(&self) -> Result<()>; + + // Statistics on the alerts sent + // todo: expand granularity and types of stats + fn stats(&self) -> Result; +} + +pub struct MatterMostNotifier { + // Mattermost server URL + server_url: String, + // Mattermost channel ID used for alerts + channel_id: String, + // User token (retrieved after login) + credentials: String, + // Alerts sent since last reset + alerts_sent: u64, + // HTTP client + client: reqwest::Client, +} + +impl Alerting for MatterMostNotifier { + fn new(server_url: String, channel_id: String, credentials: String) -> Self { + Self { + server_url, + channel_id, + credentials, + alerts_sent: 0, + client: reqwest::Client::new(), + } + } + + async fn alert(&mut self, message: &str) -> Result<()> { + const LOGIN_ENDPOINT: &str = "/api/v4/posts"; + let url = format!("{}{}", self.server_url, LOGIN_ENDPOINT); + let req = json!({ + "channel_id": self.channel_id, + "message": message, + }); + let resp = self + .client + .post(&url) + .json(&req) + .header("Authorization", format!("Bearer {}", self.credentials)) + .send() + .await?; + + 
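+        // a successful post creation is expected to come back as 201 Created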
if resp.status() != StatusCode::CREATED { + bail!("Failed to send alert, got response: {}", resp.status()); + } + + self.alerts_sent += 1; + + Ok(()) + } + + async fn ping(&self) -> Result<()> { + const PING_ENDPOINT: &str = "/api/v4/users/me"; + if self.server_url.is_empty() { + bail!("Server URL is empty"); + } + if self.credentials.is_empty() { + bail!("Credentials are empty"); + } + + let url = format!("{}{}", self.server_url, PING_ENDPOINT); + let resp = self + .client + .get(url.clone()) + .header("Authorization", format!("Bearer {}", self.credentials)) + .send() + .await?; + + if resp.status() != StatusCode::OK { + bail!("Failed to ping, got response: {}", resp.status()); + } + + Ok(()) + } + + fn stats(&self) -> Result { + Ok(self.alerts_sent) + } +} diff --git a/applications/tari_watcher/src/cli.rs b/applications/tari_watcher/src/cli.rs index 04cc622b1..0534e08cd 100644 --- a/applications/tari_watcher/src/cli.rs +++ b/applications/tari_watcher/src/cli.rs @@ -7,7 +7,12 @@ use clap::Parser; use crate::{ config::{Config, InstanceType}, - constants::{DEFAULT_PROJECT_ROOT, DEFAULT_WATCHER_CONFIG_PATH}, + constants::{ + DEFAULT_MAIN_PROJECT_PATH, + DEFAULT_VALIDATOR_DIR, + DEFAULT_VALIDATOR_KEY_PATH, + DEFAULT_WATCHER_CONFIG_PATH, + }, }; #[derive(Clone, Debug, Parser)] @@ -30,10 +35,14 @@ impl Cli { #[derive(Debug, Clone, clap::Args)] pub struct CommonCli { - #[clap(short = 'b', long, parse(from_os_str), default_value = DEFAULT_PROJECT_ROOT)] + #[clap(short = 'b', long, parse(from_os_str), default_value = DEFAULT_MAIN_PROJECT_PATH)] pub base_dir: PathBuf, #[clap(short = 'c', long, parse(from_os_str), default_value = DEFAULT_WATCHER_CONFIG_PATH)] pub config_path: PathBuf, + #[clap(short = 'k', long, parse(from_os_str), default_value = DEFAULT_VALIDATOR_KEY_PATH)] + pub key_path: PathBuf, + #[clap(short = 'v', long, parse(from_os_str), default_value = DEFAULT_VALIDATOR_DIR)] + pub validator_dir: PathBuf, } #[derive(Clone, Debug, clap::Subcommand)] diff --git a/applications/tari_watcher/src/config.rs b/applications/tari_watcher/src/config.rs index 05f5a0ef1..71bdc522f 100644 --- a/applications/tari_watcher/src/config.rs +++ b/applications/tari_watcher/src/config.rs @@ -10,13 +10,13 @@ use std::{ use tokio::io::{self, AsyncWriteExt}; use crate::{ + cli::Cli, constants::{ DEFAULT_BASE_NODE_GRPC_ADDRESS, DEFAULT_BASE_WALLET_GRPC_ADDRESS, DEFAULT_MINOTARI_MINER_BINARY_PATH, DEFAULT_VALIDATOR_NODE_BINARY_PATH, }, - Cli, }; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -38,6 +38,9 @@ pub struct Config { /// submit a registration transaction on behalf of the node pub vn_registration_file: PathBuf, + // The path of the validator node base directory, obtained when initializing a new VN on L2 + pub vn_base_dir: PathBuf, + /// The sidechain ID to use. If not provided, the default Tari sidechain ID will be used. 
pub sidechain_id: Option, @@ -47,8 +50,8 @@ pub struct Config { /// The process specific configuration for the executables pub executable_config: Vec, - /// The channel configuration for alerting and monitoring - pub channel_config: Vec, + /// The channel configurations for alerting and monitoring + pub channel_config: Channels, } impl Config { @@ -90,9 +93,17 @@ impl Display for InstanceType { pub struct ChannelConfig { pub name: String, pub enabled: bool, + pub server_url: String, + pub channel_id: String, pub credentials: String, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct Channels { + pub mattermost: ChannelConfig, + pub telegram: ChannelConfig, +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ExecutableConfig { pub instance_type: InstanceType, @@ -104,7 +115,6 @@ pub struct ExecutableConfig { pub struct InstanceConfig { pub name: String, pub instance_type: InstanceType, - pub num_instances: u32, #[serde(alias = "extra_args")] pub settings: HashMap, } @@ -114,7 +124,6 @@ impl InstanceConfig { Self { name: instance_type.to_string(), instance_type, - num_instances: 1, settings: HashMap::new(), } } @@ -123,11 +132,6 @@ impl InstanceConfig { self.name = name.into(); self } - - pub fn with_num_instances(mut self, num_instances: u32) -> Self { - self.num_instances = num_instances; - self - } } pub fn get_base_config(cli: &Cli) -> anyhow::Result { @@ -144,42 +148,39 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result { }, ]; let instances = [ - InstanceConfig::new(InstanceType::TariValidatorNode) - .with_name("tari_validator_node") - .with_num_instances(1), - InstanceConfig::new(InstanceType::MinoTariConsoleWallet) - .with_name("minotari_wallet") - .with_num_instances(1), + InstanceConfig::new(InstanceType::TariValidatorNode).with_name("tari_validator_node"), + InstanceConfig::new(InstanceType::MinoTariConsoleWallet).with_name("minotari_wallet"), ]; let base_dir = cli.common.base_dir.clone(); - let vn_registration_file = base_dir - .join("data") - .join("vn1") - .join("esmeralda") - .join("registration.json"); + let vn_registration_file = base_dir.join(cli.common.key_path.clone()); + let vn_base_dir = base_dir.join(cli.common.validator_dir.clone()); Ok(Config { auto_register: true, - // must contain protocol and port base_node_grpc_address: DEFAULT_BASE_NODE_GRPC_ADDRESS.to_string(), base_wallet_grpc_address: DEFAULT_BASE_WALLET_GRPC_ADDRESS.to_string(), - base_dir: base_dir.clone(), + base_dir: base_dir.to_path_buf(), sidechain_id: None, vn_registration_file, + vn_base_dir, instance_config: instances.to_vec(), executable_config: executables, - channel_config: vec![ - ChannelConfig { + channel_config: Channels { + mattermost: ChannelConfig { name: "mattermost".to_string(), - enabled: true, + enabled: false, + server_url: "https://some.corporation.com".to_string(), + channel_id: "".to_string(), credentials: "".to_string(), }, - ChannelConfig { + telegram: ChannelConfig { name: "telegram".to_string(), - enabled: true, + enabled: false, + server_url: "".to_string(), + channel_id: "".to_string(), credentials: "".to_string(), }, - ], + }, }) } diff --git a/applications/tari_watcher/src/constants.rs b/applications/tari_watcher/src/constants.rs index dcce8a027..243f49ed8 100644 --- a/applications/tari_watcher/src/constants.rs +++ b/applications/tari_watcher/src/constants.rs @@ -5,9 +5,11 @@ use tokio::time::Duration; pub const CONSENSUS_CONSTANT_REGISTRATION_DURATION: u64 = 1000; // in blocks: 100 epochs * 10 blocks/epoch -pub const 
DEFAULT_PROJECT_ROOT: &str = env!("CARGO_MANIFEST_DIR"); +pub const DEFAULT_MAIN_PROJECT_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../"); pub const DEFAULT_WATCHER_CONFIG_PATH: &str = "data/watcher/config.toml"; pub const DEFAULT_VALIDATOR_PID_PATH: &str = "data/watcher/validator.pid"; +pub const DEFAULT_VALIDATOR_DIR: &str = "data/vn1"; +pub const DEFAULT_VALIDATOR_KEY_PATH: &str = "data/vn1/esmeralda/registration.json"; pub const DEFAULT_VALIDATOR_NODE_BINARY_PATH: &str = "target/release/tari_validator_node"; pub const DEFAULT_MINOTARI_MINER_BINARY_PATH: &str = "target/release/minotari_miner"; pub const DEFAULT_BASE_NODE_GRPC_ADDRESS: &str = "http://127.0.0.1:12001"; // note: protocol diff --git a/applications/tari_watcher/src/helpers.rs b/applications/tari_watcher/src/helpers.rs index 93b33b3ad..a0301981e 100644 --- a/applications/tari_watcher/src/helpers.rs +++ b/applications/tari_watcher/src/helpers.rs @@ -32,10 +32,7 @@ pub struct ValidatorNodeRegistration { } pub async fn read_registration_file(vn_registration_file: PathBuf) -> anyhow::Result { - log::debug!( - "Using VN registration file at: {}", - vn_registration_file.clone().into_os_string().into_string().unwrap() - ); + log::debug!("Using VN registration file at: {}", vn_registration_file.display()); let info = fs::read_to_string(vn_registration_file).await?; let reg = json5::from_str(&info)?; diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs index 95db2292d..ff92e9c2a 100644 --- a/applications/tari_watcher/src/main.rs +++ b/applications/tari_watcher/src/main.rs @@ -15,6 +15,7 @@ use crate::{ shutdown::exit_signal, }; +mod alerting; mod cli; mod config; mod constants; diff --git a/applications/tari_watcher/src/manager.rs b/applications/tari_watcher/src/manager.rs index 533cbaaf2..96dd4fb6c 100644 --- a/applications/tari_watcher/src/manager.rs +++ b/applications/tari_watcher/src/manager.rs @@ -14,21 +14,23 @@ use tari_shutdown::ShutdownSignal; use tokio::sync::{mpsc, oneshot}; use crate::{ - config::{Config, ExecutableConfig}, + config::{Channels, Config, ExecutableConfig}, constants::DEFAULT_VALIDATOR_NODE_BINARY_PATH, minotari::{Minotari, TipStatus}, - monitoring::{read_status, ProcessStatus, Transaction}, + monitoring::{process_status_alert, process_status_log, ProcessStatus, Transaction}, process::Process, }; pub struct ProcessManager { pub base_dir: PathBuf, + pub validator_base_dir: PathBuf, pub validator_config: ExecutableConfig, pub wallet_config: ExecutableConfig, pub process: Process, pub shutdown_signal: ShutdownSignal, pub rx_request: mpsc::Receiver, pub chain: Minotari, + pub alerting_config: Channels, } impl ProcessManager { @@ -36,6 +38,7 @@ impl ProcessManager { let (tx_request, rx_request) = mpsc::channel(1); let this = Self { base_dir: config.base_dir.clone(), + validator_base_dir: config.vn_base_dir, validator_config: config.executable_config[0].clone(), wallet_config: config.executable_config[1].clone(), process: Process::new(), @@ -46,6 +49,7 @@ impl ProcessManager { config.base_wallet_grpc_address, config.vn_registration_file, ), + alerting_config: config.channel_config, }; (this, ManagerHandle::new(tx_request)) } @@ -55,26 +59,35 @@ impl ProcessManager { // clean_stale_pid_file(self.base_dir.clone().join(DEFAULT_VALIDATOR_PID_PATH)).await?; + let vn_binary_path = self + .validator_config + .clone() + .executable_path + .unwrap_or(PathBuf::from(DEFAULT_VALIDATOR_NODE_BINARY_PATH)); + + let vn_base_dir = self.base_dir.join(self.validator_base_dir); + + // 
get child channel to communicate with the validator node process let cc = self .process - .start_validator( - self.validator_config - .clone() - .executable_path - .unwrap_or(PathBuf::from(DEFAULT_VALIDATOR_NODE_BINARY_PATH)), - self.base_dir, - ) + .start_validator(vn_binary_path, vn_base_dir, self.base_dir, self.alerting_config) .await; if cc.is_none() { todo!("Create new validator node process event listener for fetched existing PID from OS"); } let cc = cc.unwrap(); + + // spawn logging and alerting tasks to process status updates tokio::spawn(async move { - read_status(cc.rx).await; + process_status_log(cc.rx_log).await; + }); + tokio::spawn(async move { + process_status_alert(cc.rx_alert, cc.cfg_alert).await; }); self.chain.bootstrap().await?; + let mut last_registered_at_block = 0; info!("Setup completed: connected to base node and wallet, ready to receive requests"); loop { tokio::select! { @@ -82,10 +95,16 @@ impl ProcessManager { match req { ManagerRequest::GetTipInfo { reply } => { let response = self.chain.get_tip_status().await?; - // send latest block height to monitoring, to potentially warn of upcoming node expiration - if let Err(e) = cc.tx.send(ProcessStatus::WarnExpiration(response.height())).await { + + // send latest block height to logging + if let Err(e) = cc.tx_log.send(ProcessStatus::WarnExpiration(response.height(), last_registered_at_block)).await { error!("Failed to send tip status update to monitoring: {}", e); } + // send latest block height to alerting + if let Err(e) = cc.tx_alert.send(ProcessStatus::WarnExpiration(response.height(), last_registered_at_block)).await { + error!("Failed to send tip status update to alerting: {}", e); + } + drop(reply.send(Ok(response))); } ManagerRequest::GetActiveValidatorNodes { reply } => { @@ -94,11 +113,17 @@ impl ProcessManager { } ManagerRequest::RegisterValidatorNode { block, reply } => { let response = self.chain.register_validator_node().await?; - // send response to monitoring - if let Err(e) = cc.tx.send(ProcessStatus::Submitted(Transaction::new(response.clone(), block))).await { + last_registered_at_block = block; + + // send registration response to logger + if let Err(e) = cc.tx_log.send(ProcessStatus::Submitted(Transaction::new(response.clone(), block))).await { error!("Failed to send node registration update to monitoring: {}", e); } - // send response to backend + // send registration response to alerting + if let Err(e) = cc.tx_alert.send(ProcessStatus::Submitted(Transaction::new(response.clone(), block))).await { + error!("Failed to send node registration update to alerting: {}", e); + } + drop(reply.send(Ok(response))); }, ManagerRequest::GetConsensusConstants { block, reply } => { diff --git a/applications/tari_watcher/src/monitoring.rs b/applications/tari_watcher/src/monitoring.rs index 584fb7533..6cd12ad1c 100644 --- a/applications/tari_watcher/src/monitoring.rs +++ b/applications/tari_watcher/src/monitoring.rs @@ -1,15 +1,18 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use anyhow::Error; use log::*; use minotari_app_grpc::tari_rpc::RegisterValidatorNodeResponse; use tokio::{process::Child, sync::mpsc, time::sleep}; -use crate::constants::{ - CONSENSUS_CONSTANT_REGISTRATION_DURATION, - DEFAULT_PROCESS_MONITORING_INTERVAL, - DEFAULT_THRESHOLD_WARN_EXPIRATION, +use crate::{ + alerting::{Alerting, MatterMostNotifier}, + config::Channels, + constants::{ + CONSENSUS_CONSTANT_REGISTRATION_DURATION, + DEFAULT_PROCESS_MONITORING_INTERVAL, + DEFAULT_THRESHOLD_WARN_EXPIRATION, + 
}, }; #[derive(Debug)] @@ -32,43 +35,75 @@ pub enum ProcessStatus { Running, Exited(i32), // status code Crashed, - InternalError(Error), + InternalError(String), Submitted(Transaction), - WarnExpiration(u64), + WarnExpiration(u64, u64), // current block and last registered block } -pub async fn monitor_child(mut child: Child, tx: mpsc::Sender) { +pub async fn monitor_child( + mut child: Child, + tx_logging: mpsc::Sender, + tx_alerting: mpsc::Sender, +) { loop { sleep(DEFAULT_PROCESS_MONITORING_INTERVAL).await; // if the child process encountered an unexpected error, not related to the process itself if child.try_wait().is_err() { let err = child.try_wait().err().unwrap(); - tx.send(ProcessStatus::InternalError(err.into())) + let err_msg = err.to_string(); + tx_logging + .send(ProcessStatus::InternalError(err_msg.clone())) .await - .expect("Failed to send internal error status"); + .expect("Failed to send internal error status to logging"); + tx_alerting + .send(ProcessStatus::InternalError(err_msg)) + .await + .expect("Failed to send internal error status to alerting"); break; } // process has finished, intentional or not, if it has some status if let Some(status) = child.try_wait().expect("Failed to poll child process") { if !status.success() { - tx.send(ProcessStatus::Crashed).await.expect("Failed to send status"); + tx_logging + .send(ProcessStatus::Crashed) + .await + .expect("Failed to send status to logging"); + tx_alerting + .send(ProcessStatus::Crashed) + .await + .expect("Failed to send status to alerting"); break; } - tx.send(ProcessStatus::Exited(status.code().unwrap_or(0))) + tx_logging + .send(ProcessStatus::Exited(status.code().unwrap_or(0))) + .await + .expect("Failed to send process exit status to logging"); + tx_alerting + .send(ProcessStatus::Exited(status.code().unwrap_or(0))) .await - .expect("Failed to send process exit status"); + .expect("Failed to send process exit status to alerting"); break; } // process is still running - tx.send(ProcessStatus::Running) + tx_logging + .send(ProcessStatus::Running) .await - .expect("Failed to send process running status"); + .expect("Failed to send process running status to logging"); + tx_alerting + .send(ProcessStatus::Running) + .await + .expect("Failed to send process running status to alerting"); } } -pub async fn read_status(mut rx: mpsc::Receiver) { - let mut last_registered_at_block = 0; +pub fn is_registration_near_expiration(curr_block: u64, last_registered_block: u64) -> bool { + last_registered_block != 0 && + curr_block + DEFAULT_THRESHOLD_WARN_EXPIRATION >= + last_registered_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION +} + +pub async fn process_status_log(mut rx: mpsc::Receiver) { while let Some(status) = rx.recv().await { match status { ProcessStatus::Exited(code) => { @@ -91,20 +126,84 @@ pub async fn read_status(mut rx: mpsc::Receiver) { "Validator node registration submitted (tx: {}, block: {})", tx.id, tx.block ); - last_registered_at_block = tx.block; }, - ProcessStatus::WarnExpiration(block) => { - if last_registered_at_block != 0 && - block + DEFAULT_THRESHOLD_WARN_EXPIRATION >= - last_registered_at_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION - { + ProcessStatus::WarnExpiration(block, last_reg_block) => { + if is_registration_near_expiration(block, last_reg_block) { + let expiration_block = last_reg_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION; warn!( - "Validator node registration expires at block {} ({} blocks remaining)", - last_registered_at_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION, 
- last_registered_at_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION - block + "Validator node registration expires at block {}, current block: {}", + expiration_block, block ); } }, } } } + +pub async fn process_status_alert(mut rx: mpsc::Receiver, cfg: Channels) { + let mut mattermost: Option = None; + if cfg.mattermost.enabled { + let cfg = cfg.mattermost.clone(); + info!("MatterMost alerting enabled"); + mattermost = Some(MatterMostNotifier::new(cfg.server_url, cfg.channel_id, cfg.credentials)); + } else { + info!("MatterMost alerting disabled"); + } + + while let Some(status) = rx.recv().await { + match status { + ProcessStatus::Exited(code) => { + if let Some(mm) = &mut mattermost { + mm.alert(&format!("Validator node process exited with code {}", code)) + .await + .expect("Failed to send alert to MatterMost"); + } + }, + ProcessStatus::InternalError(err) => { + if let Some(mm) = &mut mattermost { + mm.alert(&format!("Validator node process internal error: {}", err)) + .await + .expect("Failed to send alert to MatterMost"); + } + }, + ProcessStatus::Crashed => { + if let Some(mm) = &mut mattermost { + mm.alert("Validator node process crashed") + .await + .expect("Failed to send alert to MatterMost"); + } + }, + ProcessStatus::Running => { + // all good, process is still running, send heartbeat to channel(s) + if let Some(mm) = &mut mattermost { + if mm.ping().await.is_err() { + warn!("Failed to send heartbeat to MatterMost"); + } + } + }, + ProcessStatus::Submitted(tx) => { + if let Some(mm) = &mut mattermost { + mm.alert(&format!( + "Validator node registration submitted (tx: {}, block: {})", + tx.id, tx.block + )) + .await + .expect("Failed to send alert to MatterMost"); + } + }, + ProcessStatus::WarnExpiration(block, last_reg_block) => { + if is_registration_near_expiration(block, last_reg_block) { + if let Some(mm) = &mut mattermost { + let expiration_block = last_reg_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION; + mm.alert(&format!( + "Validator node registration expires at block {}, current block: {}", + expiration_block, block, + )) + .await + .expect("Failed to send alert to MatterMost"); + } + } + }, + } + } +} diff --git a/applications/tari_watcher/src/process.rs b/applications/tari_watcher/src/process.rs index c85b69447..2e7695aa3 100644 --- a/applications/tari_watcher/src/process.rs +++ b/applications/tari_watcher/src/process.rs @@ -13,6 +13,7 @@ use tokio::{ }; use crate::{ + config::Channels, constants::DEFAULT_VALIDATOR_PID_PATH, monitoring::{monitor_child, ProcessStatus}, }; @@ -58,17 +59,28 @@ async fn create_pid_file(path: PathBuf) -> anyhow::Result<()> { pub struct ChildChannel { pub pid: u32, - pub rx: mpsc::Receiver, - pub tx: mpsc::Sender, + pub rx_log: mpsc::Receiver, + pub tx_log: mpsc::Sender, + pub rx_alert: mpsc::Receiver, + pub tx_alert: mpsc::Sender, + pub cfg_alert: Channels, } -pub async fn spawn_validator_node_os(validator_node_path: PathBuf, base_dir: PathBuf) -> anyhow::Result { - let node_binary_path = base_dir.join(validator_node_path.clone()); - debug!("Using validator node binary at {}", node_binary_path.display()); +pub async fn spawn_validator_node_os( + validator_node_path: PathBuf, + validator_config_path: PathBuf, + base_dir: PathBuf, + cfg_alert: Channels, +) -> anyhow::Result { + let node_binary_path = base_dir.join(validator_node_path); + let mut vn_cfg_path = base_dir.join(validator_config_path); + let vn_cfg_str = vn_cfg_path.as_mut_os_str().to_str(); + debug!("Using VN binary at: {}", node_binary_path.display()); + debug!("Using 
VN config in directory: {}", vn_cfg_str.unwrap_or_default()); let child = TokioCommand::new(node_binary_path.clone().into_os_string()) .arg("-b") - .arg("data/vn1") + .arg(vn_cfg_path) .stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::null()) @@ -98,10 +110,18 @@ pub async fn spawn_validator_node_os(validator_node_path: PathBuf, base_dir: Pat .await?; file.write_all(pid.to_string().as_bytes()).await?; - let (tx, rx) = mpsc::channel(16); - tokio::spawn(monitor_child(child, tx.clone())); - - Ok(ChildChannel { pid, rx, tx }) + let (tx_log, rx_log) = mpsc::channel(16); + let (tx_alert, rx_alert) = mpsc::channel(16); + tokio::spawn(monitor_child(child, tx_log.clone(), tx_alert.clone())); + + Ok(ChildChannel { + pid, + rx_log, + tx_log, + tx_alert, + rx_alert, + cfg_alert, + }) } async fn check_existing_node_os(base_dir: PathBuf) -> Option { @@ -138,7 +158,13 @@ impl Process { Self { pid: None } } - pub async fn start_validator(&mut self, validator_path: PathBuf, base_dir: PathBuf) -> Option { + pub async fn start_validator( + &mut self, + validator_path: PathBuf, + validator_config_path: PathBuf, + base_dir: PathBuf, + alerting_config: Channels, + ) -> Option { let opt = check_existing_node_os(base_dir.clone()).await; if let Some(pid) = opt { info!("Picking up existing validator node process with id: {}", pid); @@ -150,7 +176,9 @@ impl Process { debug!("No existing validator node process found, spawn new one"); } - let cc = spawn_validator_node_os(validator_path, base_dir).await.ok()?; + let cc = spawn_validator_node_os(validator_path, validator_config_path, base_dir, alerting_config) + .await + .ok()?; self.pid = Some(cc.pid); Some(cc)
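
For reference, a sketch of the `channel_config` section that `tari_watcher init` writes to `config.toml`, based on `get_base_config` and the new `Channels`/`ChannelConfig` structs in this diff (all values are illustrative placeholders):

```
[channel_config.mattermost]
name = "mattermost"
enabled = true
server_url = "https://some.corporation.com"
channel_id = "your-channel-id"
credentials = "your-personal-access-token"

[channel_config.telegram]
name = "telegram"
enabled = false
server_url = ""
channel_id = ""
credentials = ""
```

Note that as of this diff only the `mattermost` entry is acted on (`MatterMostNotifier` in `process_status_alert`); the `telegram` entry is parsed but not yet wired to an alerter.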