From b6c8c2a725b0995763eb0b9c700cbfddeb690765 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Tue, 3 Sep 2024 17:23:53 +0400 Subject: [PATCH] fix(watcher): general fixes while testing --- Cargo.lock | 5 + .../config_presets/c_validator_node.toml | 5 +- .../config_presets/d_indexer.toml | 5 +- applications/tari_indexer/Cargo.toml | 2 +- applications/tari_indexer/src/bootstrap.rs | 11 +- applications/tari_indexer/src/config.rs | 7 +- applications/tari_indexer/src/lib.rs | 13 +- .../src/process_definitions/indexer.rs | 6 +- .../src/process_definitions/validator_node.rs | 10 +- applications/tari_validator_node/Cargo.toml | 1 + .../tari_validator_node/src/bootstrap.rs | 5 +- applications/tari_validator_node/src/cli.rs | 6 + .../tari_validator_node/src/config.rs | 7 +- applications/tari_validator_node/src/lib.rs | 18 ++- applications/tari_validator_node/src/main.rs | 12 +- applications/tari_watcher/Cargo.toml | 1 + applications/tari_watcher/src/cli.rs | 25 +--- applications/tari_watcher/src/config.rs | 112 ++-------------- applications/tari_watcher/src/constants.rs | 10 +- applications/tari_watcher/src/helpers.rs | 29 +++- applications/tari_watcher/src/main.rs | 34 +++-- applications/tari_watcher/src/manager.rs | 47 +++---- applications/tari_watcher/src/minotari.rs | 28 ++-- applications/tari_watcher/src/monitoring.rs | 72 +++++----- applications/tari_watcher/src/process.rs | 124 +++++++++--------- applications/tari_watcher/src/registration.rs | 15 ++- clients/base_node_client/Cargo.toml | 5 +- clients/base_node_client/src/grpc.rs | 9 +- integration_tests/src/base_node.rs | 2 +- integration_tests/src/indexer.rs | 2 +- integration_tests/src/validator_node.rs | 3 +- 31 files changed, 289 insertions(+), 342 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9cb814e3..b351b6039 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8706,6 +8706,7 @@ dependencies = [ "thiserror", "tonic 0.8.3", "ts-rs", + "url", ] [[package]] @@ -9496,6 +9497,7 @@ dependencies = [ "thiserror", "tokio", "tower-http", + "url", ] [[package]] @@ -10083,6 +10085,7 @@ dependencies = [ "time", "tokio", "tower-http", + "url", ] [[package]] @@ -10196,6 +10199,7 @@ dependencies = [ "tokio", "toml 0.8.15", "tonic 0.8.3", + "url", ] [[package]] @@ -11156,6 +11160,7 @@ dependencies = [ "form_urlencoded", "idna 0.5.0", "percent-encoding", + "serde", ] [[package]] diff --git a/applications/tari_dan_app_utilities/config_presets/c_validator_node.toml b/applications/tari_dan_app_utilities/config_presets/c_validator_node.toml index 038875880..b1b3eb93f 100644 --- a/applications/tari_dan_app_utilities/config_presets/c_validator_node.toml +++ b/applications/tari_dan_app_utilities/config_presets/c_validator_node.toml @@ -1,4 +1,3 @@ - ######################################################################################################################## # # # Validator Node Configuration Options (ValidatorNodeConfig) # @@ -20,8 +19,8 @@ # automatically configured (default = ) #public_address = -# The Minotari base node's GRPC address. (default = "127.0.0.1:" the value is based on network) -#base_node_grpc_address = "127.0.0.1:18142" +# The Minotari base node's GRPC url. (default = "http://127.0.0.1:" the value is based on network) +#base_node_grpc_url = "http://127.0.0.1:18142" # How often do we want to scan the base layer for changes. (default = 10) #base_layer_scanning_interval = 10 diff --git a/applications/tari_dan_app_utilities/config_presets/d_indexer.toml b/applications/tari_dan_app_utilities/config_presets/d_indexer.toml index 3c6714dc5..9f53092fd 100644 --- a/applications/tari_dan_app_utilities/config_presets/d_indexer.toml +++ b/applications/tari_dan_app_utilities/config_presets/d_indexer.toml @@ -1,4 +1,3 @@ - ######################################################################################################################## # # # Indexer Configuration Options (IndexerConfig) # @@ -20,8 +19,8 @@ # automatically configured (default = ) #public_address = -# The Minotari base node's GRPC address. (default = "127.0.0.1/" the value is based on network) -#base_node_grpc_address = "127.0.0.1/tcp/18142" +# The Minotari base node's GRPC URL. (default = "http://127.0.0.1/" the value is based on network) +#base_node_grpc_url = "http://127.0.0.1:18142" # How often do we want to scan the base layer for changes. (default = 10) #base_layer_scanning_interval = 10 diff --git a/applications/tari_indexer/Cargo.toml b/applications/tari_indexer/Cargo.toml index 94a9d22eb..6b7b3e0be 100644 --- a/applications/tari_indexer/Cargo.toml +++ b/applications/tari_indexer/Cargo.toml @@ -68,7 +68,7 @@ tokio = { workspace = true, features = [ "rt-multi-thread", ] } tower-http = { workspace = true, features = ["default", "cors"] } - +url = { workspace = true, features = ["serde"] } [package.metadata.cargo-machete] ignored = [ diff --git a/applications/tari_indexer/src/bootstrap.rs b/applications/tari_indexer/src/bootstrap.rs index d79f4c9dd..102ad53ac 100644 --- a/applications/tari_indexer/src/bootstrap.rs +++ b/applications/tari_indexer/src/bootstrap.rs @@ -62,11 +62,12 @@ pub async fn spawn_services( ensure_directories_exist(config)?; // GRPC client connection to base node - let base_node_client = - GrpcBaseNodeClient::new(config.indexer.base_node_grpc_address.clone().unwrap_or_else(|| { - let port = grpc_default_port(ApplicationType::BaseNode, config.network); - format!("127.0.0.1:{port}") - })); + let base_node_client = GrpcBaseNodeClient::new(config.indexer.base_node_grpc_url.clone().unwrap_or_else(|| { + let port = grpc_default_port(ApplicationType::BaseNode, config.network); + format!("http://127.0.0.1:{port}") + .parse() + .expect("Default base node GRPC URL is malformed") + })); // Initialize networking let identity = identity::Keypair::sr25519_from_bytes(keypair.secret_key().as_bytes().to_vec()).map_err(|e| { diff --git a/applications/tari_indexer/src/config.rs b/applications/tari_indexer/src/config.rs index ea79dc3b8..da5734267 100644 --- a/applications/tari_indexer/src/config.rs +++ b/applications/tari_indexer/src/config.rs @@ -39,6 +39,7 @@ use tari_dan_app_utilities::{ p2p_config::{P2pConfig, PeerSeedsConfig}, template_manager::implementation::TemplateConfig, }; +use url::Url; #[derive(Debug, Clone)] pub struct ApplicationConfig { @@ -70,8 +71,8 @@ pub struct IndexerConfig { pub identity_file: PathBuf, /// A path to the file that stores the tor hidden service private key, if using the tor transport pub tor_identity_file: PathBuf, - /// The Tari base node's GRPC address - pub base_node_grpc_address: Option, + /// The Tari base node's GRPC URL (e.g. http://localhost:18142) + pub base_node_grpc_url: Option, /// How often do we want to scan the base layer for changes #[serde(with = "serializers::seconds")] pub base_layer_scanning_interval: Duration, @@ -127,7 +128,7 @@ impl Default for IndexerConfig { override_from: None, identity_file: PathBuf::from("indexer_id.json"), tor_identity_file: PathBuf::from("indexer_tor_id.json"), - base_node_grpc_address: None, + base_node_grpc_url: None, base_layer_scanning_interval: Duration::from_secs(10), data_dir: PathBuf::from("data/indexer"), p2p: P2pConfig::default(), diff --git a/applications/tari_indexer/src/lib.rs b/applications/tari_indexer/src/lib.rs index 28736c90d..d45c1a020 100644 --- a/applications/tari_indexer/src/lib.rs +++ b/applications/tari_indexer/src/lib.rs @@ -240,10 +240,13 @@ async fn handle_epoch_manager_event(services: &Services, event: EpochManagerEven } async fn create_base_layer_clients(config: &ApplicationConfig) -> Result { - GrpcBaseNodeClient::connect(config.indexer.base_node_grpc_address.clone().unwrap_or_else(|| { + let url = config.indexer.base_node_grpc_url.clone().unwrap_or_else(|| { let port = grpc_default_port(ApplicationType::BaseNode, config.network); - format!("127.0.0.1:{port}") - })) - .await - .map_err(|err| ExitError::new(ExitCode::ConfigError, format!("Could not connect to base node {}", err))) + format!("http://127.0.0.1:{port}") + .parse() + .expect("Default base node GRPC URL is malformed") + }); + GrpcBaseNodeClient::connect(url) + .await + .map_err(|err| ExitError::new(ExitCode::ConfigError, format!("Could not connect to base node {}", err))) } diff --git a/applications/tari_swarm_daemon/src/process_definitions/indexer.rs b/applications/tari_swarm_daemon/src/process_definitions/indexer.rs index d81eff092..ceacfbe5a 100644 --- a/applications/tari_swarm_daemon/src/process_definitions/indexer.rs +++ b/applications/tari_swarm_daemon/src/process_definitions/indexer.rs @@ -35,11 +35,11 @@ impl ProcessDefinition for Indexer { .next() .ok_or_else(|| anyhow!("Base nodes should be started before validator nodes"))?; - let base_node_grpc_address = base_node + let base_node_grpc_url = base_node .instance() .allocated_ports() .get("grpc") - .map(|port| format!("{listen_ip}:{port}")) + .map(|port| format!("http://{listen_ip}:{port}")) .ok_or_else(|| anyhow!("grpc port not found for base node"))?; command @@ -48,7 +48,7 @@ impl ProcessDefinition for Indexer { .arg(context.base_path()) .arg("--network") .arg(context.network().to_string()) - .arg(format!("-pindexer.base_node_grpc_address={base_node_grpc_address}")) + .arg(format!("-pindexer.base_node_grpc_url={base_node_grpc_url}")) .arg(format!("-pindexer.json_rpc_address={json_rpc_address}")) .arg(format!("-pindexer.http_ui_address={web_ui_address}")) .arg(format!("-pindexer.ui_connect_address={json_rpc_public_address}")) diff --git a/applications/tari_swarm_daemon/src/process_definitions/validator_node.rs b/applications/tari_swarm_daemon/src/process_definitions/validator_node.rs index b830c4640..7dc8bdb83 100644 --- a/applications/tari_swarm_daemon/src/process_definitions/validator_node.rs +++ b/applications/tari_swarm_daemon/src/process_definitions/validator_node.rs @@ -36,17 +36,17 @@ impl ProcessDefinition for ValidatorNode { .next() .ok_or_else(|| anyhow!("Base nodes should be started before validator nodes"))?; - let base_node_grpc_address = base_node + let base_node_grpc_url = base_node .instance() .allocated_ports() .get("grpc") - .map(|port| format!("{listen_ip}:{port}")) + .map(|port| format!("http://{listen_ip}:{port}")) .ok_or_else(|| anyhow!("grpc port not found for base node"))?; debug!( "Starting validator node #{} with base node grpc address: {}", context.instance_id(), - base_node_grpc_address + base_node_grpc_url ); command @@ -56,9 +56,7 @@ impl ProcessDefinition for ValidatorNode { .arg("--network") .arg(context.network().to_string()) .arg(format!("--json-rpc-public-address={json_rpc_public_address}")) - .arg(format!( - "-pvalidator_node.base_node_grpc_address={base_node_grpc_address}" - )) + .arg(format!("-pvalidator_node.base_node_grpc_url={base_node_grpc_url}")) .arg(format!("-pvalidator_node.json_rpc_listener_address={json_rpc_address}")) .arg(format!("-pvalidator_node.http_ui_listener_address={web_ui_address}")) .arg("-pvalidator_node.base_layer_scanning_interval=1"); diff --git a/applications/tari_validator_node/Cargo.toml b/applications/tari_validator_node/Cargo.toml index b310030fa..003b3df19 100644 --- a/applications/tari_validator_node/Cargo.toml +++ b/applications/tari_validator_node/Cargo.toml @@ -80,6 +80,7 @@ tokio = { workspace = true, features = [ "rt-multi-thread", ] } tower-http = { workspace = true, features = ["default", "cors"] } +url = { workspace = true, features = ["serde"] } [build-dependencies] tari_common = { workspace = true, features = ["build"] } diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index b9a55008e..5e0ad8ffc 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -369,7 +369,8 @@ async fn create_registration_file( let fee_claim_public_key = config.validator_node.fee_claim_public_key.clone(); epoch_manager .set_fee_claim_public_key(fee_claim_public_key.clone()) - .await?; + .await + .context("set_fee_claim_public_key failed when creating registration file")?; let signature = ValidatorNodeSignature::sign(keypair.secret_key(), &fee_claim_public_key, b""); @@ -382,7 +383,7 @@ async fn create_registration_file( config.common.base_path.join("registration.json"), serde_json::to_string(®istration)?, ) - .map_err(|e| ExitError::new(ExitCode::UnknownError, e))?; + .context("failed to write registration file")?; Ok(()) } diff --git a/applications/tari_validator_node/src/cli.rs b/applications/tari_validator_node/src/cli.rs index 852c488a3..f049da09f 100644 --- a/applications/tari_validator_node/src/cli.rs +++ b/applications/tari_validator_node/src/cli.rs @@ -24,6 +24,7 @@ use std::net::SocketAddr; use clap::Parser; use minotari_app_utilities::common_cli_args::CommonCliArgs; +use reqwest::Url; use tari_common::configuration::{ConfigOverrideProvider, Network}; use tari_dan_app_utilities::p2p_config::ReachabilityMode; @@ -43,6 +44,8 @@ pub struct Cli { pub http_ui_listener_address: Option, #[clap(long, env = "TARI_VN_JSON_RPC_PUBLIC_ADDRESS")] pub json_rpc_public_address: Option, + #[clap(long, alias = "node-grpc", short = 'g', env = "TARI_VN_MINOTARI_NODE_GRPC_URL")] + pub minotari_node_grpc_url: Option, #[clap(long, short = 's')] pub peer_seeds: Vec, #[clap(long)] @@ -102,6 +105,9 @@ impl ConfigOverrideProvider for Cli { if self.disable_mdns { overrides.push(("validator_node.p2p.enable_mdns".to_string(), "false".to_string())); } + if let Some(url) = self.minotari_node_grpc_url.as_ref() { + overrides.push(("validator_node.base_node_grpc_url".to_string(), url.to_string())); + } overrides } } diff --git a/applications/tari_validator_node/src/config.rs b/applications/tari_validator_node/src/config.rs index 7682157fd..a5f4073bb 100644 --- a/applications/tari_validator_node/src/config.rs +++ b/applications/tari_validator_node/src/config.rs @@ -40,6 +40,7 @@ use tari_dan_app_utilities::{ p2p_config::{P2pConfig, PeerSeedsConfig, RpcConfig}, template_manager::implementation::TemplateConfig, }; +use url::Url; #[derive(Debug, Clone)] pub struct ApplicationConfig { @@ -72,8 +73,8 @@ pub struct ValidatorNodeConfig { pub identity_file: PathBuf, //// The node's publicly-accessible hostname // pub public_address: Option, - /// The Tari base node's GRPC address - pub base_node_grpc_address: Option, + /// The Tari base node's GRPC URL + pub base_node_grpc_url: Option, /// If set to false, there will be no base layer scanning at all pub scan_base_layer: bool, /// How often do we want to scan the base layer for changes @@ -134,7 +135,7 @@ impl Default for ValidatorNodeConfig { override_from: None, shard_key_file: PathBuf::from("shard_key.json"), identity_file: PathBuf::from("validator_node_id.json"), - base_node_grpc_address: None, + base_node_grpc_url: None, scan_base_layer: true, base_layer_scanning_interval: Duration::from_secs(10), data_dir: PathBuf::from("data/validator_node"), diff --git a/applications/tari_validator_node/src/lib.rs b/applications/tari_validator_node/src/lib.rs index 7484d2644..3554e2651 100644 --- a/applications/tari_validator_node/src/lib.rs +++ b/applications/tari_validator_node/src/lib.rs @@ -167,14 +167,24 @@ pub async fn run_validator_node( } async fn create_base_layer_client(config: &ApplicationConfig) -> Result { - let base_node_address = config.validator_node.base_node_grpc_address.clone().unwrap_or_else(|| { + let base_node_address = config.validator_node.base_node_grpc_url.clone().unwrap_or_else(|| { let port = grpc_default_port(ApplicationType::BaseNode, config.network); - format!("127.0.0.1:{port}") + format!("http://127.0.0.1:{port}") + .parse() + .expect("Default base node GRPC URL is malformed") }); info!(target: LOG_TARGET, "Connecting to base node on GRPC at {}", base_node_address); - let base_node_client = GrpcBaseNodeClient::connect(base_node_address) + let base_node_client = GrpcBaseNodeClient::connect(base_node_address.clone()) .await - .map_err(|error| ExitError::new(ExitCode::ConfigError, error))?; + .map_err(|error| { + ExitError::new( + ExitCode::ConfigError, + format!( + "Could not connect to the Minotari node at address {base_node_address}: {error}. Please ensure \ + that the Minotari node is running and configured for GRPC." + ), + ) + })?; Ok(base_node_client) } diff --git a/applications/tari_validator_node/src/main.rs b/applications/tari_validator_node/src/main.rs index a87ea3f00..5627589cd 100644 --- a/applications/tari_validator_node/src/main.rs +++ b/applications/tari_validator_node/src/main.rs @@ -83,9 +83,15 @@ async fn main_inner() -> Result<(), ExitError> { info!(target: LOG_TARGET, "Starting validator node on network {}", config.network); match run_validator_node(&config, shutdown.to_signal()).await { Ok(_) => info!(target: LOG_TARGET, "Validator node shutdown successfully"), - Err(e) => { - error!(target: LOG_TARGET, "Validator node shutdown with an error: {:?}", e); - return Err(ExitError::new(ExitCode::UnknownError, e)); + Err(e) => match e.downcast() { + Ok(exit_error) => { + error!(target: LOG_TARGET, "Validator node shutdown with an error: {:?}", exit_error); + return Err(exit_error); + }, + Err(e) => { + error!(target: LOG_TARGET, "Validator node shutdown with an error: {:?}", e); + return Err(ExitError::new(ExitCode::UnknownError, e)); + }, }, } diff --git a/applications/tari_watcher/Cargo.toml b/applications/tari_watcher/Cargo.toml index 441ee01c8..2437e9a8a 100644 --- a/applications/tari_watcher/Cargo.toml +++ b/applications/tari_watcher/Cargo.toml @@ -21,6 +21,7 @@ tari_shutdown = { workspace = true } clap = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] } anyhow = { workspace = true } +url = { workspace = true, features = ["serde"] } tokio = { workspace = true, features = [ "rt-multi-thread", "macros", diff --git a/applications/tari_watcher/src/cli.rs b/applications/tari_watcher/src/cli.rs index a08baf217..deee8e7f8 100644 --- a/applications/tari_watcher/src/cli.rs +++ b/applications/tari_watcher/src/cli.rs @@ -6,11 +6,11 @@ use std::path::PathBuf; use clap::Parser; use crate::{ - config::{Config, InstanceType}, + config::Config, constants::{ - DEFAULT_MAIN_PROJECT_PATH, DEFAULT_VALIDATOR_DIR, DEFAULT_VALIDATOR_KEY_PATH, + DEFAULT_WATCHER_BASE_PATH, DEFAULT_WATCHER_CONFIG_PATH, }, }; @@ -35,7 +35,7 @@ impl Cli { #[derive(Debug, Clone, clap::Args)] pub struct CommonCli { - #[clap(short = 'b', long, parse(from_os_str), default_value = DEFAULT_MAIN_PROJECT_PATH)] + #[clap(short = 'b', long, parse(from_os_str), default_value = DEFAULT_WATCHER_BASE_PATH)] pub base_dir: PathBuf, #[clap(short = 'c', long, parse(from_os_str), default_value = DEFAULT_WATCHER_CONFIG_PATH)] pub config_path: PathBuf, @@ -71,27 +71,16 @@ impl InitArgs { #[derive(Clone, Debug, clap::Args)] pub struct Overrides { + /// The path to the validator node binary (optional) #[clap(long)] - // The path to the validator node binary (optional) pub vn_node_path: Option, } impl Overrides { pub fn apply(&self, config: &mut Config) { - if self.vn_node_path.is_none() { - return; + if let Some(path) = self.vn_node_path.clone() { + log::info!("Overriding validator node binary path to {:?}", path); + config.validator_node_executable_path = path; } - - if let Some(exec_config) = config - .executable_config - .iter_mut() - .find(|c| c.instance_type == InstanceType::TariValidatorNode) - { - exec_config.executable_path = self.vn_node_path.clone(); - } - log::info!( - "Overriding validator node binary path to {:?}", - self.vn_node_path.as_ref().unwrap() - ); } } diff --git a/applications/tari_watcher/src/config.rs b/applications/tari_watcher/src/config.rs index d6d10bd23..466a1656f 100644 --- a/applications/tari_watcher/src/config.rs +++ b/applications/tari_watcher/src/config.rs @@ -1,22 +1,14 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{ - collections::HashMap, - fmt::{self, Display}, - path::PathBuf, -}; +use std::path::PathBuf; use tokio::io::{self, AsyncWriteExt}; +use url::Url; use crate::{ cli::Cli, - constants::{ - DEFAULT_BASE_NODE_GRPC_ADDRESS, - DEFAULT_BASE_WALLET_GRPC_ADDRESS, - DEFAULT_MINOTARI_MINER_BINARY_PATH, - DEFAULT_VALIDATOR_NODE_BINARY_PATH, - }, + constants::{DEFAULT_BASE_NODE_GRPC_URL, DEFAULT_BASE_WALLET_GRPC_URL, DEFAULT_VALIDATOR_NODE_BINARY_PATH}, }; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -28,11 +20,11 @@ pub struct Config { /// Allow watcher to restart the validator node if it crashes and stops running pub auto_restart: bool, - /// The Minotari node gRPC address - pub base_node_grpc_address: String, + /// The Minotari node gRPC URL + pub base_node_grpc_url: Url, - /// The Minotari console wallet gRPC address - pub base_wallet_grpc_address: String, + /// The Minotari console wallet gRPC URL + pub base_wallet_grpc_url: Url, /// The base directory of the watcher with configuration and data files pub base_dir: PathBuf, @@ -41,17 +33,14 @@ pub struct Config { /// submit a registration transaction on behalf of the node pub vn_registration_file: PathBuf, - // The path of the validator node base directory, obtained when initializing a new VN on L2 + /// The path of the validator node base directory. This directory is automatically created when starting a new VN. pub vn_base_dir: PathBuf, /// The sidechain ID to use. If not provided, the default Tari sidechain ID will be used. pub sidechain_id: Option, - /// The configuration for managing one or multiple processes - pub instance_config: Vec, - - /// The process specific configuration for the executables - pub executable_config: Vec, + /// Path to the executable for the validator node + pub validator_node_executable_path: PathBuf, /// The channel configurations for alerting and monitoring pub channel_config: Channels, @@ -63,33 +52,6 @@ impl Config { writer.write_all(toml.as_bytes()).await?; Ok(()) } - - pub fn missing_conf(&self) -> Option> { - let mut v: Vec<&str> = Vec::new(); - if self.base_node_grpc_address.is_empty() { - v.push("base_node_grpc_address"); - } - if self.base_wallet_grpc_address.is_empty() { - v.push("base_wallet_grpc_address"); - } - if v.is_empty() { - None - } else { - Some(v) - } - } -} - -#[derive(Debug, Clone, Copy, Eq, PartialEq, serde::Serialize, serde::Deserialize)] -pub enum InstanceType { - TariValidatorNode, - MinoTariConsoleWallet, -} - -impl Display for InstanceType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Debug::fmt(self, f) - } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -107,54 +69,7 @@ pub struct Channels { pub telegram: ChannelConfig, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct ExecutableConfig { - pub instance_type: InstanceType, - pub executable_path: Option, - pub env: Vec<(String, String)>, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct InstanceConfig { - pub name: String, - pub instance_type: InstanceType, - #[serde(alias = "extra_args")] - pub settings: HashMap, -} - -impl InstanceConfig { - pub fn new(instance_type: InstanceType) -> Self { - Self { - name: instance_type.to_string(), - instance_type, - settings: HashMap::new(), - } - } - - pub fn with_name>(mut self, name: S) -> Self { - self.name = name.into(); - self - } -} - pub fn get_base_config(cli: &Cli) -> anyhow::Result { - let executables = vec![ - ExecutableConfig { - instance_type: InstanceType::TariValidatorNode, - executable_path: Some(DEFAULT_VALIDATOR_NODE_BINARY_PATH.into()), - env: vec![], - }, - ExecutableConfig { - instance_type: InstanceType::MinoTariConsoleWallet, - executable_path: Some(DEFAULT_MINOTARI_MINER_BINARY_PATH.into()), - env: vec![], - }, - ]; - let instances = [ - InstanceConfig::new(InstanceType::TariValidatorNode).with_name("tari_validator_node"), - InstanceConfig::new(InstanceType::MinoTariConsoleWallet).with_name("minotari_wallet"), - ]; - let base_dir = cli.common.base_dir.clone(); let vn_registration_file = base_dir.join(cli.common.key_path.clone()); let vn_base_dir = base_dir.join(cli.common.validator_dir.clone()); @@ -162,14 +77,13 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result { Ok(Config { auto_register: true, auto_restart: true, - base_node_grpc_address: DEFAULT_BASE_NODE_GRPC_ADDRESS.to_string(), - base_wallet_grpc_address: DEFAULT_BASE_WALLET_GRPC_ADDRESS.to_string(), + base_node_grpc_url: DEFAULT_BASE_NODE_GRPC_URL.parse()?, + base_wallet_grpc_url: DEFAULT_BASE_WALLET_GRPC_URL.parse()?, base_dir: base_dir.to_path_buf(), sidechain_id: None, vn_registration_file, vn_base_dir, - instance_config: instances.to_vec(), - executable_config: executables, + validator_node_executable_path: DEFAULT_VALIDATOR_NODE_BINARY_PATH.into(), channel_config: Channels { mattermost: ChannelConfig { name: "mattermost".to_string(), diff --git a/applications/tari_watcher/src/constants.rs b/applications/tari_watcher/src/constants.rs index 5244a9ea3..9145816ad 100644 --- a/applications/tari_watcher/src/constants.rs +++ b/applications/tari_watcher/src/constants.rs @@ -1,17 +1,13 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use tokio::time::Duration; - -pub const DEFAULT_MAIN_PROJECT_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../"); +pub const DEFAULT_WATCHER_BASE_PATH: &str = "data/watcher/"; pub const DEFAULT_WATCHER_CONFIG_PATH: &str = "data/watcher/config.toml"; pub const DEFAULT_VALIDATOR_PID_PATH: &str = "data/watcher/validator.pid"; pub const DEFAULT_VALIDATOR_DIR: &str = "data/vn1"; pub const DEFAULT_VALIDATOR_KEY_PATH: &str = "data/vn1/esmeralda/registration.json"; pub const DEFAULT_VALIDATOR_NODE_BINARY_PATH: &str = "target/release/tari_validator_node"; -pub const DEFAULT_MINOTARI_MINER_BINARY_PATH: &str = "target/release/minotari_miner"; -pub const DEFAULT_BASE_NODE_GRPC_ADDRESS: &str = "http://127.0.0.1:12001"; // note: protocol -pub const DEFAULT_BASE_WALLET_GRPC_ADDRESS: &str = "http://127.0.0.1:12003"; // note: protocol +pub const DEFAULT_BASE_NODE_GRPC_URL: &str = "http://127.0.0.1:12001"; // note: protocol +pub const DEFAULT_BASE_WALLET_GRPC_URL: &str = "http://127.0.0.1:12003"; // note: protocol -pub const DEFAULT_PROCESS_MONITORING_INTERVAL: Duration = Duration::from_secs(20); // time to sleep before checking VN process status pub const DEFAULT_THRESHOLD_WARN_EXPIRATION: u64 = 100; // warn at this many blocks before the registration expires diff --git a/applications/tari_watcher/src/helpers.rs b/applications/tari_watcher/src/helpers.rs index c86916a42..7a23583af 100644 --- a/applications/tari_watcher/src/helpers.rs +++ b/applications/tari_watcher/src/helpers.rs @@ -1,7 +1,10 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::path::PathBuf; +use std::{ + io, + path::{Path, PathBuf}, +}; use minotari_app_grpc::tari_rpc::{ConsensusConstants, GetActiveValidatorNodesResponse}; use tari_common_types::types::PublicKey; @@ -31,12 +34,24 @@ pub struct ValidatorNodeRegistration { pub claim_fees_public_key: PublicKey, } -pub async fn read_registration_file(vn_registration_file: PathBuf) -> anyhow::Result { - log::debug!("Using VN registration file at: {}", vn_registration_file.display()); - - let info = fs::read_to_string(vn_registration_file).await?; - let reg = json5::from_str(&info)?; - Ok(reg) +pub async fn read_registration_file>( + vn_registration_file: P, +) -> anyhow::Result> { + log::debug!( + "Using VN registration file at: {}", + vn_registration_file.as_ref().display() + ); + match fs::read_to_string(vn_registration_file).await { + Ok(info) => { + let reg = json5::from_str(&info)?; + Ok(Some(reg)) + }, + Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), + Err(e) => { + log::error!("Failed to read VN registration file: {}", e); + Err(e.into()) + }, + } } pub fn to_vn_public_keys(vns: Vec) -> Vec { diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs index 79f3752ba..e0541da1a 100644 --- a/applications/tari_watcher/src/main.rs +++ b/applications/tari_watcher/src/main.rs @@ -1,7 +1,7 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use anyhow::{anyhow, bail, Context}; +use anyhow::{anyhow, Context}; use registration::registration_loop; use tari_shutdown::{Shutdown, ShutdownSignal}; use tokio::{fs, task::JoinHandle}; @@ -9,9 +9,11 @@ use tokio::{fs, task::JoinHandle}; use crate::{ cli::{Cli, Commands}, config::{get_base_config, Config}, + constants::DEFAULT_WATCHER_BASE_PATH, helpers::read_config_file, logger::init_logger, manager::{start_receivers, ManagerHandle, ProcessManager}, + process::create_pid_file, shutdown::exit_signal, }; @@ -33,12 +35,16 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::init(); let config_path = cli.get_config_path(); + let config_path = config_path + .canonicalize() + .context("Failed to canonicalize config path")?; + init_logger()?; match cli.command { Commands::Init(ref args) => { // set by default in CommonCli - let parent = config_path.parent().unwrap(); + let parent = config_path.parent().context("parent path")?; fs::create_dir_all(parent).await?; let mut config = get_base_config(&cli)?; @@ -50,17 +56,11 @@ async fn main() -> anyhow::Result<()> { .with_context(|| anyhow!("Failed to open config path {}", config_path.display()))?; config.write(file).await.context("Writing config failed")?; - let config_path = config_path - .canonicalize() - .context("Failed to canonicalize config path")?; - log::info!("Config file created at {}", config_path.display()); }, Commands::Start(ref args) => { - let mut cfg = read_config_file(cli.get_config_path()).await?; - if let Some(conf) = cfg.missing_conf() { - bail!("Missing configuration values: {:?}", conf); - } + log::info!("Starting watcher using config {}", config_path.display()); + let mut cfg = read_config_file(config_path).await.context("read config file")?; // optionally override config values args.apply(&mut cfg); @@ -74,6 +74,14 @@ async fn main() -> anyhow::Result<()> { async fn start(config: Config) -> anyhow::Result<()> { let shutdown = Shutdown::new(); let signal = shutdown.to_signal().select(exit_signal()?); + fs::create_dir_all(config.base_dir.join(DEFAULT_WATCHER_BASE_PATH)) + .await + .context("create watcher base path")?; + create_pid_file( + config.base_dir.join(DEFAULT_WATCHER_BASE_PATH).join("watcher.pid"), + std::process::id(), + ) + .await?; let handlers = spawn_manager(config.clone(), shutdown.to_signal(), shutdown).await?; let manager_handle = handlers.manager; let task_handle = handlers.task; @@ -86,9 +94,9 @@ async fn start(config: Config) -> anyhow::Result<()> { result?; log::info!("Process manager exited"); }, - _ = async { - drop(registration_loop(config, manager_handle).await); - } => {}, + Err(err) = registration_loop(config, manager_handle) => { + log::error!("Registration loop exited with error {err}"); + }, } Ok(()) diff --git a/applications/tari_watcher/src/manager.rs b/applications/tari_watcher/src/manager.rs index 479b61e6b..1c40e432c 100644 --- a/applications/tari_watcher/src/manager.rs +++ b/applications/tari_watcher/src/manager.rs @@ -1,8 +1,6 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::path::PathBuf; - use log::*; use minotari_app_grpc::tari_rpc::{ self as grpc, @@ -20,24 +18,18 @@ use tokio::{ }; use crate::{ - config::{Channels, Config, ExecutableConfig}, - constants::DEFAULT_VALIDATOR_NODE_BINARY_PATH, - minotari::{Minotari, TipStatus}, + config::{Channels, Config}, + minotari::{MinotariNodes, TipStatus}, monitoring::{process_status_alert, process_status_log, ProcessStatus, Transaction}, process::{start_validator, ChildChannel}, }; pub struct ProcessManager { - pub base_dir: PathBuf, - pub validator_base_dir: PathBuf, - pub validator_config: ExecutableConfig, - pub wallet_config: ExecutableConfig, + pub config: Config, pub shutdown_signal: ShutdownSignal, // listen for keyboard exit signal pub trigger_signal: Shutdown, // triggered when validator auto-restart is disabled pub rx_request: mpsc::Receiver, - pub chain: Minotari, - pub alerting_config: Channels, - pub auto_restart: bool, + pub chain: MinotariNodes, } pub struct ChannelReceivers { @@ -51,20 +43,15 @@ impl ProcessManager { pub fn new(config: Config, shutdown_signal: ShutdownSignal, trigger_signal: Shutdown) -> (Self, ManagerHandle) { let (tx_request, rx_request) = mpsc::channel(1); let this = Self { - base_dir: config.base_dir.clone(), - validator_base_dir: config.vn_base_dir, - validator_config: config.executable_config[0].clone(), - wallet_config: config.executable_config[1].clone(), shutdown_signal, trigger_signal, rx_request, - chain: Minotari::new( - config.base_node_grpc_address, - config.base_wallet_grpc_address, - config.vn_registration_file, + chain: MinotariNodes::new( + config.base_node_grpc_url.clone(), + config.base_wallet_grpc_url.clone(), + config.vn_registration_file.clone(), ), - alerting_config: config.channel_config, - auto_restart: config.auto_restart, + config, }; (this, ManagerHandle::new(tx_request)) } @@ -166,21 +153,17 @@ impl ProcessManager { } async fn start_child_process(&self) -> ChildChannel { - let vn_binary_path = self - .validator_config - .clone() - .executable_path - .unwrap_or(PathBuf::from(DEFAULT_VALIDATOR_NODE_BINARY_PATH)); - - let vn_base_dir = self.base_dir.join(self.validator_base_dir.clone()); + let vn_binary_path = self.config.validator_node_executable_path.clone(); + let vn_base_dir = self.config.base_dir.join(self.config.vn_base_dir.clone()); // get child channel to communicate with the validator node process let cc = start_validator( vn_binary_path, vn_base_dir, - self.base_dir.clone(), - self.alerting_config.clone(), - self.auto_restart, + // TODO: just pass in config + self.config.base_node_grpc_url.clone(), + self.config.channel_config.clone(), + self.config.auto_restart, self.trigger_signal.clone(), ) .await; diff --git a/applications/tari_watcher/src/minotari.rs b/applications/tari_watcher/src/minotari.rs index fac684e1c..ed29ac055 100644 --- a/applications/tari_watcher/src/minotari.rs +++ b/applications/tari_watcher/src/minotari.rs @@ -12,14 +12,15 @@ use tari_common::exit_codes::{ExitCode, ExitError}; use tari_common_types::types::FixedHash; use tari_crypto::tari_utilities::ByteArray; use tonic::transport::Channel; +use url::Url; use crate::helpers::read_registration_file; #[derive(Clone)] -pub struct Minotari { +pub struct MinotariNodes { bootstrapped: bool, - node_grpc_address: String, - wallet_grpc_address: String, + node_grpc_address: Url, + wallet_grpc_address: Url, node_registration_file: PathBuf, current_height: u64, node: Option>, @@ -42,8 +43,8 @@ impl TipStatus { } } -impl Minotari { - pub fn new(node_grpc_address: String, wallet_grpc_address: String, node_registration_file: PathBuf) -> Self { +impl MinotariNodes { + pub fn new(node_grpc_address: Url, wallet_grpc_address: Url, node_registration_file: PathBuf) -> Self { Self { bootstrapped: false, node_grpc_address, @@ -67,16 +68,16 @@ impl Minotari { } async fn connect_wallet(&mut self) -> anyhow::Result<()> { - log::info!("Connecting to wallet on gRPC {}", self.wallet_grpc_address.clone()); - let client = WalletGrpcClient::connect(&self.wallet_grpc_address).await?; + log::info!("Connecting to wallet on gRPC {}", self.wallet_grpc_address); + let client = WalletGrpcClient::connect(self.wallet_grpc_address.as_str()).await?; self.wallet = Some(client); Ok(()) } async fn connect_node(&mut self) -> anyhow::Result<()> { - log::info!("Connecting to base node on gRPC {}", self.node_grpc_address.clone()); - let client = BaseNodeGrpcClient::connect(self.node_grpc_address.clone()) + log::info!("Connecting to base node on gRPC {}", self.node_grpc_address); + let client = BaseNodeGrpcClient::connect(self.node_grpc_address.to_string()) .await .map_err(|e| ExitError::new(ExitCode::ConfigError, e))?; @@ -156,7 +157,14 @@ impl Minotari { info!("Preparing to send a VN registration request"); - let info = read_registration_file(self.node_registration_file.clone()).await?; + let info = read_registration_file(self.node_registration_file.clone()) + .await? + .ok_or_else(|| { + anyhow!( + "No registration data found in file: {}", + self.node_registration_file.display() + ) + })?; let sig = info.signature.signature(); let resp = self .wallet diff --git a/applications/tari_watcher/src/monitoring.rs b/applications/tari_watcher/src/monitoring.rs index 1d12a5bf9..801d921b3 100644 --- a/applications/tari_watcher/src/monitoring.rs +++ b/applications/tari_watcher/src/monitoring.rs @@ -12,7 +12,6 @@ use tokio::{ use crate::{ alerting::{Alerting, MatterMostNotifier, TelegramNotifier}, config::Channels, - constants::DEFAULT_PROCESS_MONITORING_INTERVAL, helpers::is_warning_close_to_expiry, }; @@ -47,27 +46,32 @@ pub async fn monitor_child( tx_alerting: mpsc::Sender, tx_restart: mpsc::Sender<()>, ) { - loop { - sleep(DEFAULT_PROCESS_MONITORING_INTERVAL).await; + // process is still running + tx_logging + .send(ProcessStatus::Running) + .await + .expect("Failed to send process running status to logging"); + tx_alerting + .send(ProcessStatus::Running) + .await + .expect("Failed to send process running status to alerting"); + let exit = child.wait().await; - // if the child process encountered an unexpected error, not related to the process itself - if child.try_wait().is_err() { - let err = child.try_wait().err().unwrap(); - let err_msg = err.to_string(); - tx_logging - .send(ProcessStatus::InternalError(err_msg.clone())) - .await - .expect("Failed to send internal error status to logging"); - tx_alerting - .send(ProcessStatus::InternalError(err_msg)) - .await - .expect("Failed to send internal error status to alerting"); - tx_restart.send(()).await.expect("Failed to send restart node signal"); - break; - } - // process has finished, intentional or not, if it has some status - if let Some(status) = child.try_wait().expect("Failed to poll child process") { - if !status.success() { + match exit { + Ok(status) => { + if status.success() { + info!("Child process exited with status: {}", status); + tx_logging + .send(ProcessStatus::Exited(status.code().unwrap_or(0))) + .await + .expect("Failed to send process exit status to logging"); + tx_alerting + .send(ProcessStatus::Exited(status.code().unwrap_or(0))) + .await + .expect("Failed to send process exit status to alerting"); + tx_restart.send(()).await.expect("Failed to send restart node signal"); + } else { + warn!("Child process CRASHED with status: {}", status); tx_logging .send(ProcessStatus::Crashed) .await @@ -77,28 +81,22 @@ pub async fn monitor_child( .await .expect("Failed to send status to alerting"); tx_restart.send(()).await.expect("Failed to send restart node signal"); - break; } + }, + // if the child process encountered an unexpected error, not related to the process itself + Err(err) => { + error!("Child process encountered an error: {}", err); + let err_msg = err.to_string(); tx_logging - .send(ProcessStatus::Exited(status.code().unwrap_or(0))) + .send(ProcessStatus::InternalError(err_msg.clone())) .await - .expect("Failed to send process exit status to logging"); + .expect("Failed to send internal error status to logging"); tx_alerting - .send(ProcessStatus::Exited(status.code().unwrap_or(0))) + .send(ProcessStatus::InternalError(err_msg)) .await - .expect("Failed to send process exit status to alerting"); + .expect("Failed to send internal error status to alerting"); tx_restart.send(()).await.expect("Failed to send restart node signal"); - break; - } - // process is still running - tx_logging - .send(ProcessStatus::Running) - .await - .expect("Failed to send process running status to logging"); - tx_alerting - .send(ProcessStatus::Running) - .await - .expect("Failed to send process running status to alerting"); + }, } } diff --git a/applications/tari_watcher/src/process.rs b/applications/tari_watcher/src/process.rs index 43706cfa8..241de05ac 100644 --- a/applications/tari_watcher/src/process.rs +++ b/applications/tari_watcher/src/process.rs @@ -1,7 +1,10 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{path::PathBuf, process::Stdio}; +use std::{ + path::{Path, PathBuf}, + process::Stdio, +}; use anyhow::bail; use log::*; @@ -11,7 +14,9 @@ use tokio::{ io::AsyncWriteExt, process::{Child, Command as TokioCommand}, sync::mpsc::{self}, + time::sleep, }; +use url::Url; use crate::{ config::Channels, @@ -45,19 +50,6 @@ pub async fn clean_stale_pid_file(pid_file_path: PathBuf) -> anyhow::Result<()> Ok(()) } -async fn create_pid_file(path: PathBuf) -> anyhow::Result<()> { - let mut file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&path) - .await?; - - file.write_all(std::process::id().to_string().as_bytes()).await?; - - Ok(()) -} - pub struct ChildChannel { pub rx_log: mpsc::Receiver, pub tx_log: mpsc::Sender, @@ -66,52 +58,35 @@ pub struct ChildChannel { pub cfg_alert: Channels, } -async fn spawn_child( - validator_node_path: PathBuf, - validator_config_path: PathBuf, +async fn spawn_validator_node( + binary_path: PathBuf, base_dir: PathBuf, + minotari_node_grpc_url: &Url, ) -> anyhow::Result { - let node_binary_path = base_dir.join(validator_node_path); - let vn_cfg_path = base_dir.join(validator_config_path); - debug!("Using VN binary at: {}", node_binary_path.display()); - debug!("Using VN config in directory: {}", vn_cfg_path.display()); - - let child = TokioCommand::new(node_binary_path.clone().into_os_string()) - .arg("-b") - .arg(vn_cfg_path) + debug!("Using VN binary at: {}", binary_path.display()); + debug!("Using VN base dir in directory: {}", base_dir.display()); + // Needed to ensure the base dir exists before we create the pid file + fs::create_dir_all(&base_dir).await?; + + let child = TokioCommand::new(binary_path) + .arg(format!("-b{}", base_dir.display())) + .arg(format!("--node-grpc={minotari_node_grpc_url}")) .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) + // TODO: redirect these to a file and optionally stdout + // .stdout(Stdio::null()) + // .stderr(Stdio::null()) .kill_on_drop(false) .spawn()?; - let pid = child.id().expect("Failed to get PID for child process"); - info!("Spawned validator child process with id {}", pid); - - let path = base_dir.join(DEFAULT_VALIDATOR_PID_PATH); - if let Err(e) = create_pid_file(path.clone()).await { - log::error!("Failed to create PID file when spawning node: {}", e); - } - - create_pid_file(path.clone()).await?; - - let mut file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(path) - .await?; - file.write_all(pid.to_string().as_bytes()).await?; - Ok(child) } pub async fn spawn_validator_node_os( - validator_node_path: PathBuf, - validator_config_path: PathBuf, - base_dir: PathBuf, + binary_path: PathBuf, + vn_base_dir: PathBuf, cfg_alert: Channels, auto_restart: bool, + minotari_node_grpc_url: Url, mut trigger_signal: Shutdown, ) -> anyhow::Result { let (tx_log, rx_log) = mpsc::channel(16); @@ -123,25 +98,35 @@ pub async fn spawn_validator_node_os( let tx_restart_clone_main = tx_restart.clone(); tokio::spawn(async move { loop { - let child_res = spawn_child( - validator_node_path.clone(), - validator_config_path.clone(), - base_dir.clone(), - ) - .await; + let child_res = + spawn_validator_node(binary_path.clone(), vn_base_dir.clone(), &minotari_node_grpc_url).await; match child_res { Ok(child) => { + let pid = child.id().unwrap_or(0); + info!("Spawned validator child process with id {}", pid); + + // TODO: the VN should create a PID file in its base dir + let path = vn_base_dir.join(DEFAULT_VALIDATOR_PID_PATH); + if let Err(err) = create_pid_file(path, pid).await { + error!("Failed to create VN PID file: {}", err); + } + let tx_log_monitor = tx_log_clone_main.clone(); let tx_alert_monitor = tx_alert_clone_main.clone(); let tx_restart_monitor = tx_restart_clone_main.clone(); // spawn monitoring and handle logs and alerts - tokio::spawn(async move { - monitor_child(child, tx_log_monitor, tx_alert_monitor, tx_restart_monitor).await; - }); + tokio::spawn(monitor_child( + child, + tx_log_monitor, + tx_alert_monitor, + tx_restart_monitor, + )); }, Err(e) => { - error!("Failed to spawn child process: {:?}", e); + error!("Failed to spawn child process: {}. Retrying in 5s", e); + sleep(std::time::Duration::from_secs(5)).await; + continue; }, } @@ -173,6 +158,19 @@ pub async fn spawn_validator_node_os( }) } +pub async fn create_pid_file>(path: P, pid: u32) -> anyhow::Result<()> { + let mut file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .await?; + + file.write_all(pid.to_string().as_bytes()).await?; + + Ok(()) +} + async fn check_existing_node_os(base_dir: PathBuf) -> Option { let process_dir = base_dir.join("processes"); if !process_dir.exists() { @@ -199,13 +197,13 @@ async fn check_existing_node_os(base_dir: PathBuf) -> Option { pub async fn start_validator( validator_path: PathBuf, - validator_config_path: PathBuf, - base_dir: PathBuf, + vn_base_dir: PathBuf, + minotari_node_grpc_url: Url, alerting_config: Channels, auto_restart: bool, trigger_signal: Shutdown, ) -> Option { - let opt = check_existing_node_os(base_dir.clone()).await; + let opt = check_existing_node_os(vn_base_dir.clone()).await; if let Some(pid) = opt { info!("Picking up existing VN process with id: {}", pid); // todo: create new process status channel for picked up process @@ -216,10 +214,10 @@ pub async fn start_validator( let cc = spawn_validator_node_os( validator_path, - validator_config_path, - base_dir, + vn_base_dir, alerting_config, auto_restart, + minotari_node_grpc_url, trigger_signal, ) .await diff --git a/applications/tari_watcher/src/registration.rs b/applications/tari_watcher/src/registration.rs index b4bb4e45a..825b24643 100644 --- a/applications/tari_watcher/src/registration.rs +++ b/applications/tari_watcher/src/registration.rs @@ -3,7 +3,7 @@ use log::*; use tari_common_types::types::FixedHash; -use tokio::time::{self, Duration}; +use tokio::time::{self, Duration, MissedTickBehavior}; use crate::{ config::Config, @@ -20,9 +20,7 @@ const REGISTRATION_LOOP_INTERVAL: Duration = Duration::from_secs(30); // It will do nothing if it is registered already and not close to expiry. pub async fn registration_loop(config: Config, mut handle: ManagerHandle) -> anyhow::Result { let mut interval = time::interval(REGISTRATION_LOOP_INTERVAL); - let local_node = read_registration_file(config.vn_registration_file).await?; - let local_key = local_node.public_key; - debug!("Local public key: {}", local_key.clone()); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); let mut last_block_hash: Option = None; let mut last_registered: Option = None; let mut recently_registered = false; @@ -30,6 +28,13 @@ pub async fn registration_loop(config: Config, mut handle: ManagerHandle) -> any loop { interval.tick().await; + let Some(vn_reg_data) = read_registration_file(&config.vn_registration_file).await? else { + info!("No registration data found, will try again in 30s"); + continue; + }; + let public_key = vn_reg_data.public_key; + debug!("Local public key: {}", public_key.clone()); + let tip_info = handle.get_tip_info().await; if let Err(e) = tip_info { error!("Failed to get tip info: {}", e); @@ -66,7 +71,7 @@ pub async fn registration_loop(config: Config, mut handle: ManagerHandle) -> any } // if the node is already registered and not close to expiring in the next epoch, skip registration - if contains_key(active_keys.clone(), local_key.clone()) && + if contains_key(active_keys.clone(), public_key.clone()) && !is_close_to_expiry(constants.unwrap(), current_block, last_registered) || recently_registered { diff --git a/clients/base_node_client/Cargo.toml b/clients/base_node_client/Cargo.toml index 1b0d4e13c..fd348d50c 100644 --- a/clients/base_node_client/Cargo.toml +++ b/clients/base_node_client/Cargo.toml @@ -10,9 +10,7 @@ license.workspace = true minotari_app_grpc = { workspace = true } minotari_node_grpc_client = { workspace = true } tari_common_types = { workspace = true } -tari_core = { workspace = true, default-features = false, features = [ - "transactions", -] } +tari_core = { workspace = true, default-features = false, features = ["transactions"] } tari_utilities = { workspace = true } tari_dan_common_types = { workspace = true } @@ -23,6 +21,7 @@ serde = { workspace = true, default-features = true } thiserror = { workspace = true } tonic = { workspace = true } ts-rs = { workspace = true, optional = true } +url = { workspace = true } [features] ts = ["ts-rs"] diff --git a/clients/base_node_client/src/grpc.rs b/clients/base_node_client/src/grpc.rs index 065b9b575..c08d89428 100644 --- a/clients/base_node_client/src/grpc.rs +++ b/clients/base_node_client/src/grpc.rs @@ -32,6 +32,7 @@ use tari_common_types::types::{FixedHash, PublicKey}; use tari_core::{blocks::BlockHeader, transactions::transaction_components::CodeTemplateRegistration}; use tari_dan_common_types::SubstateAddress; use tari_utilities::ByteArray; +use url::Url; use crate::{ types::{BaseLayerConsensusConstants, BaseLayerMetadata, BaseLayerValidatorNode, BlockInfo, SideChainUtxos}, @@ -45,16 +46,16 @@ type Client = BaseNodeGrpcClient; #[derive(Clone)] pub struct GrpcBaseNodeClient { - endpoint: String, + endpoint: Url, client: Option, } impl GrpcBaseNodeClient { - pub fn new(endpoint: String) -> Self { + pub fn new(endpoint: Url) -> Self { Self { endpoint, client: None } } - pub async fn connect(endpoint: String) -> Result { + pub async fn connect(endpoint: Url) -> Result { let mut client = Self { endpoint, client: None }; client.test_connection().await?; Ok(client) @@ -62,7 +63,7 @@ impl GrpcBaseNodeClient { async fn connection(&mut self) -> Result<&mut Client, BaseNodeClientError> { if self.client.is_none() { - let inner = Client::connect(format!("http://{}", self.endpoint)).await?; + let inner = Client::connect(self.endpoint.to_string()).await?; self.client = Some(inner); } self.client.as_mut().ok_or(BaseNodeClientError::ConnectionError) diff --git a/integration_tests/src/base_node.rs b/integration_tests/src/base_node.rs index 201cff413..516a6eacf 100644 --- a/integration_tests/src/base_node.rs +++ b/integration_tests/src/base_node.rs @@ -162,5 +162,5 @@ pub async fn spawn_base_node(world: &mut TariWorld, bn_name: String) { } pub fn get_base_node_client(port: u16) -> GrpcBaseNodeClient { - GrpcBaseNodeClient::new(format!("127.0.0.1:{}", port)) + GrpcBaseNodeClient::new(format!("http://127.0.0.1:{}", port).parse().unwrap()) } diff --git a/integration_tests/src/indexer.rs b/integration_tests/src/indexer.rs index 53e75d138..7b3166fb4 100644 --- a/integration_tests/src/indexer.rs +++ b/integration_tests/src/indexer.rs @@ -192,7 +192,7 @@ pub async fn spawn_indexer(world: &mut TariWorld, indexer_name: String, base_nod config.indexer.data_dir = base_dir.to_path_buf(); config.indexer.identity_file = base_dir.join("indexer_id.json"); config.indexer.tor_identity_file = base_dir.join("indexer_tor_id.json"); - config.indexer.base_node_grpc_address = Some(format!("127.0.0.1:{}", base_node_grpc_port)); + config.indexer.base_node_grpc_url = Some(format!("http://127.0.0.1:{}", base_node_grpc_port).parse().unwrap()); config.indexer.dan_layer_scanning_internal = Duration::from_secs(5); config.indexer.p2p.listener_port = port; diff --git a/integration_tests/src/validator_node.rs b/integration_tests/src/validator_node.rs index b904ad508..26f4b3f4d 100644 --- a/integration_tests/src/validator_node.rs +++ b/integration_tests/src/validator_node.rs @@ -135,7 +135,8 @@ pub async fn spawn_validator_node( config.validator_node.data_dir = temp_dir.to_path_buf(); config.validator_node.shard_key_file = temp_dir.join("shard_key.json"); config.validator_node.identity_file = temp_dir.join("validator_node_id.json"); - config.validator_node.base_node_grpc_address = Some(format!("127.0.0.1:{}", base_node_grpc_port)); + config.validator_node.base_node_grpc_url = + Some(format!("http://127.0.0.1:{}", base_node_grpc_port).parse().unwrap()); // config.validator_node.public_address = // Some(config.validator_node.p2p.transport.tcp.listener_address.clone());