From f91bcd91ba4adc853afa3cc98c4d3df745f6e395 Mon Sep 17 00:00:00 2001 From: dkf Date: Tue, 3 Sep 2024 06:59:45 +0100 Subject: [PATCH] fix(watcher): register before expiry and use network consts (#1128) Description --- * Send a VN registration transaction one epoch (10 blocks on `esmeralda`) before it is due to expire, rather than reacting to not being in the validator set * Use consensus constants received from L1 as part of the check rather than using hardcoded ones for the `esmeralda` network to generalize the solution for any network How Has This Been Tested? --- 1. Run `tari_swarm_daemon` and `tari_watcher` 2. Observe the process sending a registration tx 3. If the registration expires at block $B_E$, observe warning(s) from $B_E - 100$ (default value) and up 4. Mine until any block in $\left[B_E - 10, B_E - 1\right]$ is reached, and observe the registration tx sent --- applications/tari_watcher/README.md | 10 +- applications/tari_watcher/src/constants.rs | 2 - applications/tari_watcher/src/helpers.rs | 30 ++- applications/tari_watcher/src/main.rs | 35 +-- applications/tari_watcher/src/manager.rs | 203 +++++++++++------- applications/tari_watcher/src/minotari.rs | 14 +- applications/tari_watcher/src/monitoring.rs | 33 ++- applications/tari_watcher/src/process.rs | 10 +- applications/tari_watcher/src/registration.rs | 67 +++--- 9 files changed, 254 insertions(+), 150 deletions(-) diff --git a/applications/tari_watcher/README.md b/applications/tari_watcher/README.md index 6e2d38e35..61337d17b 100644 --- a/applications/tari_watcher/README.md +++ b/applications/tari_watcher/README.md @@ -8,9 +8,9 @@ ### Quickstart -Initialize the project with `tari_watcher init` and start it with `tari_watcher run`. Edit the newly generated `config.toml` to enable notifications on Mattermost and Telegram. Make sure to have started up `tari_validator_node` once previously to have a node directory set up, default is `tari_validator_node -- -b data/vn1`. +Initialize the project with `tari_watcher init` and start it with `tari_watcher start`. Edit the newly generated `config.toml` to enable notifications on Mattermost and Telegram. Make sure to have started up `tari_validator_node` once previously to have a node directory set up, default is `tari_validator_node -- -b data/vn1`. 
-### Setup
+### Config and Setup
 
 The default values used (see `constants.rs`) when running the project without any flags:
 ```
@@ -24,6 +24,12 @@ The default values used when running the project without any flags:
 - DEFAULT_BASE_WALLET_GRPC_ADDRESS: default is Tari swarm localhost and port
 ```
 
+The two main configuration settings for the watcher (default `true`):
+```
+- auto_register: automatically re-register the node
+- auto_restart: automatically restart the node if it goes down
+```
+
 ### Project
 
 ```
diff --git a/applications/tari_watcher/src/constants.rs b/applications/tari_watcher/src/constants.rs
index 243f49ed8..5244a9ea3 100644
--- a/applications/tari_watcher/src/constants.rs
+++ b/applications/tari_watcher/src/constants.rs
@@ -3,8 +3,6 @@
 
 use tokio::time::Duration;
 
-pub const CONSENSUS_CONSTANT_REGISTRATION_DURATION: u64 = 1000; // in blocks: 100 epochs * 10 blocks/epoch
-
 pub const DEFAULT_MAIN_PROJECT_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../");
 pub const DEFAULT_WATCHER_CONFIG_PATH: &str = "data/watcher/config.toml";
 pub const DEFAULT_VALIDATOR_PID_PATH: &str = "data/watcher/validator.pid";
diff --git a/applications/tari_watcher/src/helpers.rs b/applications/tari_watcher/src/helpers.rs
index a0301981e..c86916a42 100644
--- a/applications/tari_watcher/src/helpers.rs
+++ b/applications/tari_watcher/src/helpers.rs
@@ -3,13 +3,13 @@
 
 use std::path::PathBuf;
 
-use minotari_app_grpc::tari_rpc::GetActiveValidatorNodesResponse;
+use minotari_app_grpc::tari_rpc::{ConsensusConstants, GetActiveValidatorNodesResponse};
 use tari_common_types::types::PublicKey;
 use tari_core::transactions::transaction_components::ValidatorNodeSignature;
 use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::ByteArray};
 use tokio::fs;
 
-use crate::config::Config;
+use crate::{config::Config, constants::DEFAULT_THRESHOLD_WARN_EXPIRATION};
 
 pub async fn read_config_file(path: PathBuf) -> anyhow::Result<Config> {
     let content = fs::read_to_string(&path).await.map_err(|_| {
@@ -48,3 +48,29 @@ pub fn to_vn_public_keys(vns: Vec<GetActiveValidatorNodesResponse>) -> Vec<PublicKey>
 pub fn contains_key(vns: Vec<PublicKey>, needle: PublicKey) -> bool {
     vns.iter().any(|vn| vn.eq(&needle))
 }
+
+pub fn is_close_to_expiry(
+    constants: ConsensusConstants,
+    current_block: u64,
+    last_registered_block: Option<u64>,
+) -> bool {
+    // if we haven't registered yet in this session, return false
+    if last_registered_block.is_none() {
+        return false;
+    }
+    let epoch_length = constants.epoch_length;
+    let validity_period = constants.validator_node_validity_period;
+    let registration_duration = validity_period * epoch_length;
+    // check if the current block is an epoch or less away from expiring
+    current_block + epoch_length >= last_registered_block.unwrap() + registration_duration
+}
+
+pub fn is_warning_close_to_expiry(
+    constants: ConsensusConstants,
+    current_block: u64,
+    last_registered_block: u64,
+) -> bool {
+    let registration_duration = constants.epoch_length * constants.validator_node_validity_period;
+    // if we have approached the expiration threshold
+    current_block + DEFAULT_THRESHOLD_WARN_EXPIRATION >= last_registered_block + registration_duration
+}
diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs
index 296c4acd8..79f3752ba 100644
--- a/applications/tari_watcher/src/main.rs
+++ b/applications/tari_watcher/src/main.rs
@@ -4,14 +4,14 @@
 
 use anyhow::{anyhow, bail, Context};
 use registration::registration_loop;
 use tari_shutdown::{Shutdown, ShutdownSignal};
-use tokio::{fs, task};
+use tokio::{fs, task::JoinHandle};
 
 use crate::{
    cli::{Cli, Commands},
     config::{get_base_config, Config},
     helpers::read_config_file,
     logger::init_logger,
-    manager::{ManagerHandle, ProcessManager},
+    manager::{start_receivers, ManagerHandle, ProcessManager},
     shutdown::exit_signal,
 };
 
@@ -74,14 +74,16 @@ async fn main() -> anyhow::Result<()> {
 async fn start(config: Config) -> anyhow::Result<()> {
     let shutdown = Shutdown::new();
     let signal = shutdown.to_signal().select(exit_signal()?);
-    let (task_handle, manager_handle) = spawn(config.clone(), shutdown.to_signal(), shutdown).await;
+    let handlers = spawn_manager(config.clone(), shutdown.to_signal(), shutdown).await?;
+    let manager_handle = handlers.manager;
+    let task_handle = handlers.task;
 
     tokio::select! {
         _ = signal => {
             log::info!("Shutting down");
         },
         result = task_handle => {
-            result??;
+            result?;
             log::info!("Process manager exited");
         },
         _ = async {
@@ -92,12 +94,21 @@ async fn start(config: Config) -> anyhow::Result<()> {
     Ok(())
 }
 
-async fn spawn(
-    config: Config,
-    shutdown: ShutdownSignal,
-    trigger: Shutdown,
-) -> (task::JoinHandle<anyhow::Result<()>>, ManagerHandle) {
-    let (manager, manager_handle) = ProcessManager::new(config, shutdown, trigger);
-    let task_handle = tokio::spawn(manager.start());
-    (task_handle, manager_handle)
+struct Handlers {
+    manager: ManagerHandle,
+    task: JoinHandle<()>,
+}
+
+async fn spawn_manager(config: Config, shutdown: ShutdownSignal, trigger: Shutdown) -> anyhow::Result<Handlers> {
+    let (manager, mut manager_handle) = ProcessManager::new(config, shutdown, trigger);
+    let cr = manager.start_request_handler().await?;
+    let status = manager_handle.get_tip_info().await?;
+    // in the case the consensus constants have changed since the genesis block, use the latest ones
+    let constants = manager_handle.get_consensus_constants(status.height()).await?;
+    start_receivers(cr.rx_log, cr.rx_alert, cr.cfg_alert, constants).await;
+
+    Ok(Handlers {
+        manager: manager_handle,
+        task: cr.task,
+    })
 }
diff --git a/applications/tari_watcher/src/manager.rs b/applications/tari_watcher/src/manager.rs
index a3070c01e..479b61e6b 100644
--- a/applications/tari_watcher/src/manager.rs
+++ b/applications/tari_watcher/src/manager.rs
@@ -11,14 +11,20 @@ use minotari_app_grpc::tari_rpc::{
     RegisterValidatorNodeResponse,
 };
 use tari_shutdown::{Shutdown, ShutdownSignal};
-use tokio::sync::{mpsc, oneshot};
+use tokio::{
+    sync::{
+        mpsc::{self, Receiver},
+        oneshot,
+    },
+    task::JoinHandle,
+};
 
 use crate::{
     config::{Channels, Config, ExecutableConfig},
     constants::DEFAULT_VALIDATOR_NODE_BINARY_PATH,
     minotari::{Minotari, TipStatus},
     monitoring::{process_status_alert, process_status_log, ProcessStatus, Transaction},
-    process::start_validator,
+    process::{start_validator, ChildChannel},
 };
 
 pub struct ProcessManager {
@@ -34,6 +40,13 @@ pub struct ProcessManager {
     pub auto_restart: bool,
 }
 
+pub struct ChannelReceivers {
+    pub rx_log: Receiver<ProcessStatus>,
+    pub rx_alert: Receiver<ProcessStatus>,
+    pub cfg_alert: Channels,
+    pub task: JoinHandle<()>,
+}
+
 impl ProcessManager {
     pub fn new(config: Config, shutdown_signal: ShutdownSignal, trigger_signal: Shutdown) -> (Self, ManagerHandle) {
         let (tx_request, rx_request) = mpsc::channel(1);
@@ -56,25 +69,117 @@ impl ProcessManager {
         (this, ManagerHandle::new(tx_request))
     }
 
-    pub async fn start(mut self) -> anyhow::Result<()> {
+    pub async fn start_request_handler(mut self) -> anyhow::Result<ChannelReceivers> {
         info!("Starting validator node process");
         // clean_stale_pid_file(self.base_dir.clone().join(DEFAULT_VALIDATOR_PID_PATH)).await?;
+        self.chain.bootstrap().await?;
+
+        let cc =
self.start_child_process().await; + + let mut last_registered_at_block = 0; + info!("Setup completed: connected to base node and wallet, ready to receive requests"); + let task_handle = tokio::spawn(async move { + loop { + tokio::select! { + Some(req) = self.rx_request.recv() => { + match req { + ManagerRequest::GetTipInfo { reply } => { + let response = match self.chain.get_tip_status().await { + Ok(resp) => resp, + Err(e) => { + error!("Failed to get tip status: {}", e); + continue; + } + }; + + // send latest block height to logging + if let Err(e) = cc.tx_log.send(ProcessStatus::WarnExpiration(response.height(), last_registered_at_block)).await { + error!("Failed to send tip status update to monitoring: {}", e); + } + // send latest block height to alerting + if let Err(e) = cc.tx_alert.send(ProcessStatus::WarnExpiration(response.height(), last_registered_at_block)).await { + error!("Failed to send tip status update to alerting: {}", e); + } + + drop(reply.send(Ok(response))); + } + ManagerRequest::GetActiveValidatorNodes { reply } => { + let response = match self.chain.get_active_validator_nodes().await { + Ok(resp) => resp, + Err(e) => { + error!("Failed to get active validator nodes: {}", e); + continue; + } + }; + drop(reply.send(Ok(response))); + } + ManagerRequest::RegisterValidatorNode { block, reply } => { + let response = match self.chain.register_validator_node().await { + Ok(resp) => resp, + Err(e) => { + error!("Failed to register validator node: {}", e); + continue; + } + }; + last_registered_at_block = block; + + // send registration response to logger + if let Err(e) = cc.tx_log.send(ProcessStatus::Submitted(Transaction::new(response.clone(), block))).await { + error!("Failed to send node registration update to monitoring: {}", e); + } + // send registration response to alerting + if let Err(e) = cc.tx_alert.send(ProcessStatus::Submitted(Transaction::new(response.clone(), block))).await { + error!("Failed to send node registration update to alerting: {}", e); + } + + drop(reply.send(Ok(response))); + }, + ManagerRequest::GetConsensusConstants { block, reply } => { + let response = match self.chain.get_consensus_constants(block).await { + Ok(resp) => resp, + Err(e) => { + error!("Failed to get consensus constants: {}", e); + continue; + } + }; + drop(reply.send(Ok(response))); + } + } + } + + _ = self.shutdown_signal.wait() => { + info!("Shutting down process manager"); + break; + } + } + } + }); + + Ok(ChannelReceivers { + rx_log: cc.rx_log, + rx_alert: cc.rx_alert, + cfg_alert: cc.cfg_alert, + task: task_handle, + }) + } + + async fn start_child_process(&self) -> ChildChannel { let vn_binary_path = self .validator_config .clone() .executable_path .unwrap_or(PathBuf::from(DEFAULT_VALIDATOR_NODE_BINARY_PATH)); - let vn_base_dir = self.base_dir.join(self.validator_base_dir); + let vn_base_dir = self.base_dir.join(self.validator_base_dir.clone()); // get child channel to communicate with the validator node process let cc = start_validator( vn_binary_path, vn_base_dir, - self.base_dir, - self.alerting_config, + self.base_dir.clone(), + self.alerting_config.clone(), self.auto_restart, self.trigger_signal.clone(), ) @@ -82,77 +187,29 @@ impl ProcessManager { if cc.is_none() { todo!("Create new validator node process event listener for fetched existing PID from OS"); } - let cc = cc.unwrap(); - // spawn logging and alerting tasks to process status updates - tokio::spawn(async move { - process_status_log(cc.rx_log).await; - warn!("Logging task has exited"); - }); - 
tokio::spawn(async move { - process_status_alert(cc.rx_alert, cc.cfg_alert).await; - warn!("Alerting task has exited"); - }); - - self.chain.bootstrap().await?; - - let mut last_registered_at_block = 0; - info!("Setup completed: connected to base node and wallet, ready to receive requests"); - loop { - tokio::select! { - Some(req) = self.rx_request.recv() => { - match req { - ManagerRequest::GetTipInfo { reply } => { - let response = self.chain.get_tip_status().await?; - - // send latest block height to logging - if let Err(e) = cc.tx_log.send(ProcessStatus::WarnExpiration(response.height(), last_registered_at_block)).await { - error!("Failed to send tip status update to monitoring: {}", e); - } - // send latest block height to alerting - if let Err(e) = cc.tx_alert.send(ProcessStatus::WarnExpiration(response.height(), last_registered_at_block)).await { - error!("Failed to send tip status update to alerting: {}", e); - } - - drop(reply.send(Ok(response))); - } - ManagerRequest::GetActiveValidatorNodes { reply } => { - let response = self.chain.get_active_validator_nodes().await?; - drop(reply.send(Ok(response))); - } - ManagerRequest::RegisterValidatorNode { block, reply } => { - let response = self.chain.register_validator_node().await?; - last_registered_at_block = block; - - // send registration response to logger - if let Err(e) = cc.tx_log.send(ProcessStatus::Submitted(Transaction::new(response.clone(), block))).await { - error!("Failed to send node registration update to monitoring: {}", e); - } - // send registration response to alerting - if let Err(e) = cc.tx_alert.send(ProcessStatus::Submitted(Transaction::new(response.clone(), block))).await { - error!("Failed to send node registration update to alerting: {}", e); - } - - drop(reply.send(Ok(response))); - }, - ManagerRequest::GetConsensusConstants { block, reply } => { - let response = self.chain.get_consensus_constants(block).await?; - drop(reply.send(Ok(response))); - } - } - } - - _ = self.shutdown_signal.wait() => { - info!("Shutting down process manager"); - break; - } - } - } - - Ok(()) + cc.unwrap() } } +pub async fn start_receivers( + rx_log: mpsc::Receiver, + rx_alert: mpsc::Receiver, + cfg_alert: Channels, + constants: ConsensusConstants, +) { + let const_copy = constants.clone(); + // spawn logging and alerting tasks to process status updates + tokio::spawn(async move { + process_status_log(rx_log, const_copy).await; + warn!("Logging task has exited"); + }); + tokio::spawn(async move { + process_status_alert(rx_alert, cfg_alert, constants).await; + warn!("Alerting task has exited"); + }); +} + type Reply = oneshot::Sender>; pub enum ManagerRequest { diff --git a/applications/tari_watcher/src/minotari.rs b/applications/tari_watcher/src/minotari.rs index 5a8a5af12..fac684e1c 100644 --- a/applications/tari_watcher/src/minotari.rs +++ b/applications/tari_watcher/src/minotari.rs @@ -90,8 +90,6 @@ impl Minotari { bail!("Node client not connected"); } - log::debug!("Requesting tip status from base node"); - let inner = self .node .clone() @@ -139,13 +137,13 @@ impl Minotari { break; }, Err(e) => { - bail!("Error getting active validator nodes: {}", e); + bail!("Error getting active VN: {}", e); }, } } if vns.is_empty() { - log::debug!("No active validator nodes found at height: {}", height); + log::debug!("No active VNs found at height: {}", height); } Ok(vns) @@ -156,7 +154,7 @@ impl Minotari { bail!("Node client not connected"); } - info!("Preparing to send a node registration request"); + info!("Preparing to send a VN 
registration request"); let info = read_registration_file(self.node_registration_file.clone()).await?; let sig = info.signature.signature(); @@ -172,16 +170,16 @@ impl Minotari { }), validator_node_claim_public_key: info.claim_fees_public_key.to_vec(), fee_per_gram: 10, - message: format!("Validator node registration: {}", info.public_key), + message: format!("VN registration: {}", info.public_key), sidechain_deployment_key: vec![], }) .await? .into_inner(); if !resp.is_success { - bail!("Failed to register validator node: {}", resp.failure_message); + bail!("Failed to register VN: {}", resp.failure_message); } - info!("Node registration request sent successfully"); + info!("VN registration request sent successfully"); Ok(resp) } diff --git a/applications/tari_watcher/src/monitoring.rs b/applications/tari_watcher/src/monitoring.rs index 33b6562a3..1d12a5bf9 100644 --- a/applications/tari_watcher/src/monitoring.rs +++ b/applications/tari_watcher/src/monitoring.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use log::*; -use minotari_app_grpc::tari_rpc::RegisterValidatorNodeResponse; +use minotari_app_grpc::tari_rpc::{ConsensusConstants, RegisterValidatorNodeResponse}; use tokio::{ process::Child, sync::mpsc, @@ -12,11 +12,8 @@ use tokio::{ use crate::{ alerting::{Alerting, MatterMostNotifier, TelegramNotifier}, config::Channels, - constants::{ - CONSENSUS_CONSTANT_REGISTRATION_DURATION, - DEFAULT_PROCESS_MONITORING_INTERVAL, - DEFAULT_THRESHOLD_WARN_EXPIRATION, - }, + constants::DEFAULT_PROCESS_MONITORING_INTERVAL, + helpers::is_warning_close_to_expiry, }; #[derive(Copy, Clone, Debug)] @@ -105,13 +102,7 @@ pub async fn monitor_child( } } -pub fn is_registration_near_expiration(curr_block: u64, last_registered_block: u64) -> bool { - last_registered_block != 0 && - curr_block + DEFAULT_THRESHOLD_WARN_EXPIRATION >= - last_registered_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION -} - -pub async fn process_status_log(mut rx: mpsc::Receiver) { +pub async fn process_status_log(mut rx: mpsc::Receiver, constants: ConsensusConstants) { loop { if let Some(status) = rx.recv().await { match status { @@ -140,8 +131,9 @@ pub async fn process_status_log(mut rx: mpsc::Receiver) { ); }, ProcessStatus::WarnExpiration(block, last_reg_block) => { - if is_registration_near_expiration(block, last_reg_block) { - let expiration_block = last_reg_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION; + if is_warning_close_to_expiry(constants.clone(), block, last_reg_block) { + let expiration_block = + last_reg_block + constants.validator_node_validity_period * constants.epoch_length; warn!( "Validator node registration expires at block {}, current block: {}", expiration_block, block @@ -157,7 +149,7 @@ fn setup_alerting_clients(cfg: Channels) -> (Option, Option< let mut mattermost: Option = None; if cfg.mattermost.enabled { let cfg = cfg.mattermost.clone(); - info!("MatterMost alerting enabled"); + info!("Mattermost alerting enabled"); mattermost = Some(MatterMostNotifier { server_url: cfg.server_url, channel_id: cfg.channel_id, @@ -166,7 +158,7 @@ fn setup_alerting_clients(cfg: Channels) -> (Option, Option< client: reqwest::Client::new(), }); } else { - info!("MatterMost alerting disabled"); + info!("Mattermost alerting disabled"); } let mut telegram: Option = None; @@ -186,7 +178,7 @@ fn setup_alerting_clients(cfg: Channels) -> (Option, Option< (mattermost, telegram) } -pub async fn process_status_alert(mut rx: mpsc::Receiver, cfg: Channels) { +pub async fn process_status_alert(mut rx: mpsc::Receiver, 
cfg: Channels, constants: ConsensusConstants) { let (mut mattermost, mut telegram) = setup_alerting_clients(cfg); loop { @@ -260,8 +252,9 @@ pub async fn process_status_alert(mut rx: mpsc::Receiver, cfg: Ch } }, ProcessStatus::WarnExpiration(block, last_reg_block) => { - if is_registration_near_expiration(block, last_reg_block) { - let expiration_block = last_reg_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION; + if is_warning_close_to_expiry(constants.clone(), block, last_reg_block) { + let expiration_block = + last_reg_block + constants.validator_node_validity_period * constants.epoch_length; if let Some(mm) = &mut mattermost { mm.alert(&format!( "Validator node registration expires at block {}, current block: {}", diff --git a/applications/tari_watcher/src/process.rs b/applications/tari_watcher/src/process.rs index 704e86aa4..43706cfa8 100644 --- a/applications/tari_watcher/src/process.rs +++ b/applications/tari_watcher/src/process.rs @@ -23,7 +23,7 @@ use crate::{ pub async fn clean_stale_pid_file(pid_file_path: PathBuf) -> anyhow::Result<()> { log::info!("Checking for stale PID file at {}", pid_file_path.display()); if !pid_file_path.exists() { - info!("PID file for validator does not exist, create new one"); + info!("PID file for VN does not exist, create new one"); return Ok(()); } @@ -154,7 +154,7 @@ pub async fn spawn_validator_node_os( break; } - info!("Received signal, preparing to restart validator node"); + info!("Received signal, preparing to restart VN process"); }, None => { error!("Failed to receive restart signal, exiting"); @@ -176,7 +176,7 @@ pub async fn spawn_validator_node_os( async fn check_existing_node_os(base_dir: PathBuf) -> Option { let process_dir = base_dir.join("processes"); if !process_dir.exists() { - debug!("Validator node process directory does not exist"); + debug!("VN process directory does not exist"); return None; } @@ -207,11 +207,11 @@ pub async fn start_validator( ) -> Option { let opt = check_existing_node_os(base_dir.clone()).await; if let Some(pid) = opt { - info!("Picking up existing validator node process with id: {}", pid); + info!("Picking up existing VN process with id: {}", pid); // todo: create new process status channel for picked up process return None; } else { - debug!("No existing validator node process found, spawn new one"); + debug!("No existing VN process found, spawn new one"); } let cc = spawn_validator_node_os( diff --git a/applications/tari_watcher/src/registration.rs b/applications/tari_watcher/src/registration.rs index 6cb3d09ae..b4bb4e45a 100644 --- a/applications/tari_watcher/src/registration.rs +++ b/applications/tari_watcher/src/registration.rs @@ -7,7 +7,7 @@ use tokio::time::{self, Duration}; use crate::{ config::Config, - helpers::{contains_key, read_registration_file, to_vn_public_keys}, + helpers::{contains_key, is_close_to_expiry, read_registration_file, to_vn_public_keys}, manager::ManagerHandle, }; @@ -15,41 +15,40 @@ use crate::{ // Amount of time to wait before the watcher runs a check again const REGISTRATION_LOOP_INTERVAL: Duration = Duration::from_secs(30); -// `registration_loop` periodically checks that the local node is still registered on the network. -// If it is no longer registered, it will attempt to re-register. It will do nothing if it is registered already. -// Currently, it will not keep track of when the registration was sent or register just in time before it expires. -// It is possible to add a threshold such as sending a registration request every (e.g.) 
500 blocks to make sure it it -// always registered. -pub async fn registration_loop(config: Config, mut manager_handle: ManagerHandle) -> anyhow::Result { +// Periodically checks that the local node is still registered on the network. +// If it is no longer registered or close to expiry (1 epoch of blocks or less), it will attempt to re-register. +// It will do nothing if it is registered already and not close to expiry. +pub async fn registration_loop(config: Config, mut handle: ManagerHandle) -> anyhow::Result { let mut interval = time::interval(REGISTRATION_LOOP_INTERVAL); - let constants = manager_handle.get_consensus_constants(0).await?; - let total_blocks_duration = constants.validator_node_validity_period * constants.epoch_length; - info!( - "Registrations are currently valid for {} blocks ({} epochs)", - total_blocks_duration, constants.validator_node_validity_period - ); let local_node = read_registration_file(config.vn_registration_file).await?; let local_key = local_node.public_key; debug!("Local public key: {}", local_key.clone()); let mut last_block_hash: Option = None; + let mut last_registered: Option = None; + let mut recently_registered = false; loop { interval.tick().await; - let tip_info = manager_handle.get_tip_info().await; + let tip_info = handle.get_tip_info().await; if let Err(e) = tip_info { error!("Failed to get tip info: {}", e); continue; } - let curr_height = tip_info.as_ref().unwrap().height(); + + let current_block = tip_info.as_ref().unwrap().height(); if last_block_hash.is_none() || last_block_hash.unwrap() != tip_info.as_ref().unwrap().hash() { last_block_hash = Some(tip_info.unwrap().hash()); - debug!("New block hash at height {}: {}", curr_height, last_block_hash.unwrap()); + debug!( + "New block hash at height {}: {}", + current_block, + last_block_hash.unwrap() + ); } else { debug!("Same block as previous tick"); } - let vn_status = manager_handle.get_active_validator_nodes().await; + let vn_status = handle.get_active_validator_nodes().await; if let Err(e) = vn_status { error!("Failed to get active validators: {}", e); continue; @@ -60,26 +59,42 @@ pub async fn registration_loop(config: Config, mut manager_handle: ManagerHandle info!("{}", key); } - // if the node is already registered and still valid, skip registration - if contains_key(active_keys.clone(), local_key.clone()) { - info!("Node has an active registration, skip"); + let constants = handle.get_consensus_constants(current_block).await; + if let Err(e) = constants { + error!("Failed to get consensus constants: {}", e); + continue; + } + + // if the node is already registered and not close to expiring in the next epoch, skip registration + if contains_key(active_keys.clone(), local_key.clone()) && + !is_close_to_expiry(constants.unwrap(), current_block, last_registered) || + recently_registered + { + info!("VN has an active registration and will not expire in the next epoch, skip"); + recently_registered = false; continue; } - info!("Local node not active or about to expire, attempting to register.."); - let tx = manager_handle.register_validator_node(curr_height).await; + // if we are not currently registered or close to expiring, attempt to register + + info!("VN not active or about to expire, attempting to register.."); + let tx = handle.register_validator_node(current_block).await; if let Err(e) = tx { - error!("Failed to register node: {}", e); + error!("Failed to register VN: {}", e); continue; } let tx = tx.unwrap(); if !tx.is_success { - error!("Failed to register node: {}", 
tx.failure_message); + error!("Failed to register VN: {}", tx.failure_message); continue; } info!( - "Registered node at block {} with transaction id: {}", - curr_height, tx.transaction_id + "Registered VN at block {} with transaction id: {}", + current_block, tx.transaction_id ); + + last_registered = Some(current_block); + // give the network another tick to process the registration + recently_registered = true; } }
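
For reference, the arithmetic behind the two new checks can be sanity-checked in isolation. The sketch below is illustrative only and is not part of the patch: it hardcodes the `esmeralda` values quoted above (10 blocks per epoch, a 100-epoch validity period, i.e. 1000 blocks, and the default 100-block warning threshold), whereas the watcher itself now takes `epoch_length` and `validator_node_validity_period` from the `ConsensusConstants` returned by the base node.

```rust
// Illustrative constants only; the real watcher reads these from the
// ConsensusConstants returned by the base node rather than hardcoding them.
const EPOCH_LENGTH: u64 = 10; // blocks per epoch on esmeralda
const VALIDITY_PERIOD_EPOCHS: u64 = 100; // registration lasts 100 epochs (1000 blocks)
const WARN_THRESHOLD_BLOCKS: u64 = 100; // DEFAULT_THRESHOLD_WARN_EXPIRATION

/// Mirrors helpers::is_close_to_expiry: re-register once the tip is within
/// one epoch of the registration's expiry height.
fn is_close_to_expiry(current_block: u64, last_registered_block: Option<u64>) -> bool {
    let registration_duration = VALIDITY_PERIOD_EPOCHS * EPOCH_LENGTH;
    match last_registered_block {
        None => false, // nothing registered in this session yet
        Some(registered) => current_block + EPOCH_LENGTH >= registered + registration_duration,
    }
}

/// Mirrors helpers::is_warning_close_to_expiry: start warning once the tip is
/// within the warning threshold of the expiry height.
fn is_warning_close_to_expiry(current_block: u64, last_registered_block: u64) -> bool {
    let registration_duration = VALIDITY_PERIOD_EPOCHS * EPOCH_LENGTH;
    current_block + WARN_THRESHOLD_BLOCKS >= last_registered_block + registration_duration
}

fn main() {
    // Registered at block 500 => expiry at block B_E = 1500.
    assert!(is_warning_close_to_expiry(1400, 500)); // warnings start at B_E - 100
    assert!(!is_warning_close_to_expiry(1399, 500));
    assert!(is_close_to_expiry(1490, Some(500))); // re-registration kicks in from B_E - 10
    assert!(!is_close_to_expiry(1489, Some(500)));
    assert!(!is_close_to_expiry(1490, None)); // no registration sent yet this session
    println!("expiry thresholds match the description");
}
```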