From f561d839ab3bc505495eac2f4f7966db1d5f6eec Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Mon, 10 Feb 2025 10:31:19 +0100 Subject: [PATCH 1/2] refactor(rust): move the notification sender logic to ockam_core --- .../rust/ockam/ockam_api/Cargo.toml | 2 +- .../ockam_api/src/cli_state/cli_state.rs | 46 +-------- .../ockam_api/src/cli_state/identities.rs | 9 +- .../ockam/ockam_api/src/cli_state/nodes.rs | 22 +++-- .../ockam/ockam_api/src/cli_state/vaults.rs | 15 +-- .../ockam_api/src/nodes/service/relay.rs | 48 +++++----- .../service/tcp_inlets/session_replacer.rs | 29 +++--- .../ockam/ockam_api/src/ui/terminal/mod.rs | 18 ++-- .../ockam_api/src/ui/terminal/notification.rs | 73 ++++++-------- .../rust/ockam/ockam_command/Cargo.toml | 2 +- .../ockam/ockam_command/src/enroll/command.rs | 3 +- .../ockam_command/src/identity/create.rs | 2 +- .../ockam/ockam_command/src/node/create.rs | 2 +- .../src/node/create/foreground.rs | 5 +- .../ockam/ockam_command/src/node/delete.rs | 2 +- .../rust/ockam/ockam_command/src/reset/mod.rs | 3 +- .../rust/ockam/ockam_core/Cargo.toml | 3 +- .../rust/ockam/ockam_core/src/lib.rs | 1 + .../rust/ockam/ockam_core/src/notifier.rs | 96 +++++++++++++++++++ .../src/portal/portal_worker.rs | 19 +++- 20 files changed, 225 insertions(+), 175 deletions(-) create mode 100644 implementations/rust/ockam/ockam_core/src/notifier.rs diff --git a/implementations/rust/ockam/ockam_api/Cargo.toml b/implementations/rust/ockam/ockam_api/Cargo.toml index e00bece9185..9760702eee4 100644 --- a/implementations/rust/ockam/ockam_api/Cargo.toml +++ b/implementations/rust/ockam/ockam_api/Cargo.toml @@ -78,7 +78,7 @@ miette = { version = "7.2.0", features = ["fancy-no-backtrace"] } minicbor = { version = "0.25.1", default-features = false, features = ["alloc", "derive"] } nix = { version = "0.29", features = ["signal"] } nu-ansi-term = "0.50" -once_cell = { version = "1", default-features = false } +once_cell = { version = "1.19.0", default-features = false } open = "5.3.0" opentelemetry = { version = "0.26.0", features = ["logs", "metrics", "trace"] } opentelemetry-appender-tracing = { version = "0.26.0" } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs index 8cc70d47a74..421aaaa6553 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs @@ -1,21 +1,14 @@ -use rand::random; -use std::path::{Path, PathBuf}; -use tokio::sync::broadcast::{channel, Receiver, Sender}; - -use ockam::SqlxDatabase; -use ockam_core::env::get_env_with_default; -use ockam_node::database::{DatabaseConfiguration, DatabaseType}; - use crate::cli_state::error::Result; use crate::cli_state::CliStateError; use crate::logs::ExportingEnabled; -use crate::terminal::notification::Notification; +use ockam::SqlxDatabase; +use ockam_core::env::get_env_with_default; +use ockam_node::database::{DatabaseConfiguration, DatabaseType}; +use rand::random; +use std::path::{Path, PathBuf}; pub const OCKAM_HOME: &str = "OCKAM_HOME"; -/// Maximum number of notifications present in the channel -const NOTIFICATIONS_CHANNEL_CAPACITY: usize = 16; - /// The CliState struct manages all the data persisted locally. /// /// The data is saved to several files: @@ -45,8 +38,6 @@ pub struct CliState { database: SqlxDatabase, application_database: SqlxDatabase, exporting_enabled: ExportingEnabled, - /// Broadcast channel to be notified of major events during a process supported by the CliState API - notifications: Sender, } impl CliState { @@ -91,30 +82,6 @@ impl CliState { pub fn application_database_configuration(&self) -> Result { Self::make_application_database_configuration(&self.mode) } - - pub fn subscribe_to_notifications(&self) -> Receiver { - self.notifications.subscribe() - } - - pub fn notify_message(&self, message: impl Into) { - self.notify(Notification::message(message)); - } - - pub fn notify_progress(&self, message: impl Into) { - self.notify(Notification::progress(message)); - } - - pub fn notify_progress_finish(&self, message: impl Into) { - self.notify(Notification::progress_finish(Some(message.into()))); - } - - pub fn notify_progress_finish_and_clear(&self) { - self.notify(Notification::progress_finish(None)); - } - - fn notify(&self, notification: Notification) { - let _ = self.notifications.send(notification); - } } /// These functions allow to create and reset the local state @@ -240,8 +207,6 @@ impl CliState { application_database ); - let (notifications, _) = channel::(NOTIFICATIONS_CHANNEL_CAPACITY); - let state = Self { mode, database, @@ -251,7 +216,6 @@ impl CliState { // the function set_tracing_enabled can be used to enable tracing, which // is eventually used to trace user journeys. exporting_enabled: ExportingEnabled::Off, - notifications, }; Ok(state) diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/identities.rs b/implementations/rust/ockam/ockam_api/src/cli_state/identities.rs index 695f2d3df33..73c8d257da0 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/identities.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/identities.rs @@ -2,6 +2,7 @@ use colorful::Colorful; use ockam::identity::models::ChangeHistory; use ockam::identity::{Identifier, Identity}; use ockam_core::errcode::{Kind, Origin}; +use ockam_core::notifier::notify; use ockam_core::Error; use ockam_vault::{HandleToSecret, SigningSecretKeyHandle}; @@ -44,17 +45,17 @@ impl CliState { let named_identity = self .store_named_identity(&identity, name, vault_name) .await?; - self.notify_message(fmt_ok!( + notify(fmt_ok!( "Generated a new Identity named {}.", color_primary(named_identity.name()) )); - self.notify_message(fmt_log!( + notify(fmt_log!( "{} has Identifier {}", color_primary(named_identity.name()), color_primary(named_identity.identifier().to_string()) )); if named_identity.is_default() { - self.notify_message(fmt_ok!( + notify(fmt_ok!( "Marked {} as your default Identity, on this machine.\n", color_primary(named_identity.name()) )); @@ -260,7 +261,7 @@ impl CliState { Some(named_identity) => Ok(named_identity), // Create a new default identity. None => { - self.notify_message(fmt_log!( + notify(fmt_log!( "There is no default Identity on this machine, generating one...\n" )); self.create_identity_with_name(&random_name()).await diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs b/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs index b66170641b8..2499bf3e1d6 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs @@ -1,3 +1,7 @@ +use crate::cli_state::{random_name, NamedVault, Result}; +use crate::cli_state::{CliState, CliStateError}; +use crate::colors::color_primary; +use crate::config::lookup::InternetAddress; use colorful::Colorful; use minicbor::{CborLen, Decode, Encode}; use nix::errno::Errno; @@ -6,6 +10,9 @@ use ockam::identity::utils::now; use ockam::identity::Identifier; use ockam::tcp::TcpListener; use ockam_core::errcode::{Kind, Origin}; +use ockam_core::notifier::{ + notify, notify_end_spinner, notify_end_spinner_and_clear, notify_with_spinner, +}; use ockam_core::Error; use ockam_multiaddr::proto::{DnsAddr, Node, Tcp}; use ockam_multiaddr::MultiAddr; @@ -16,11 +23,6 @@ use std::process; use std::time::Duration; use sysinfo::{Pid, ProcessStatus, ProcessesToUpdate, System}; -use crate::cli_state::{random_name, NamedVault, Result}; -use crate::cli_state::{CliState, CliStateError}; -use crate::colors::color_primary; -use crate::config::lookup::InternetAddress; - use crate::{fmt_warn, ConnectionStatus}; /// The methods below support the creation and update of local nodes @@ -116,7 +118,7 @@ impl CliState { let nodes = self.nodes_repository().get_nodes().await?; for node in nodes { if let Err(err) = self.delete_node(&node.name()).await { - self.notify_message(fmt_warn!( + notify(fmt_warn!( "Failed to delete the node {}: {err}", color_primary(node.name()) )); @@ -177,7 +179,7 @@ impl CliState { error!(name=%node_name, %pid, %e, "failed to stop node process with SIGKILL"); return Err(e); } else { - self.notify_progress_finish(format!( + notify_end_spinner(format!( "The node {} has been stopped", color_primary(node_name), )); @@ -214,12 +216,12 @@ impl CliState { // Return if max attempts have been reached if attempts > max_attempts { warn!(name = %node_name, %pid, %signal, "node process did not exit"); - self.notify_progress_finish_and_clear(); + notify_end_spinner_and_clear(); return Err(err); } // Notify the user that the node is stopping if it takes too long if attempts == show_message_at_attempt { - self.notify_progress(format!( + notify_with_spinner(format!( "Waiting for node's {} process {} to stop", color_primary(node_name), color_primary(pid) @@ -230,7 +232,7 @@ impl CliState { } } } - self.notify_progress_finish_and_clear(); + notify_end_spinner_and_clear(); Ok(()) } diff --git a/implementations/rust/ockam/ockam_api/src/cli_state/vaults.rs b/implementations/rust/ockam/ockam_api/src/cli_state/vaults.rs index bd624f34233..b8ab42168e4 100644 --- a/implementations/rust/ockam/ockam_api/src/cli_state/vaults.rs +++ b/implementations/rust/ockam/ockam_api/src/cli_state/vaults.rs @@ -5,6 +5,7 @@ use crate::{fmt_log, fmt_ok, fmt_warn}; use colorful::Colorful; use ockam::identity::{Identities, Vault}; use ockam_core::errcode::{Kind, Origin}; +use ockam_core::notifier::notify; use ockam_node::database::SqlxDatabase; use ockam_vault_aws::AwsSigningVault; use std::fmt::Write; @@ -124,7 +125,7 @@ impl CliState { .delete_identity(&identity.name()) .await { - self.notify_message(fmt_warn!( + notify(fmt_warn!( "Failed to delete the identity {}: {err}", color_primary(identity.name()) )); @@ -139,7 +140,7 @@ impl CliState { let vaults = self.vaults_repository().get_named_vaults().await?; for vault in vaults { if let Err(err) = self.delete_named_vault(&vault.name()).await { - self.notify_message(fmt_warn!( + notify(fmt_warn!( "Failed to delete the vault {}: {err}", color_primary(vault.name()) )); @@ -182,7 +183,7 @@ impl CliState { return Ok(existing_vault); } - self.notify_message(fmt_log!( + notify(fmt_log!( "This Identity needs a Vault to store its secrets." )); let named_vault = if self @@ -191,13 +192,13 @@ impl CliState { .await? .is_none() { - self.notify_message(fmt_log!( + notify(fmt_log!( "There is no default Vault on this machine, creating one..." )); let vault = self .create_database_vault(vault_name.to_string(), UseAwsKms::No) .await?; - self.notify_message(fmt_ok!( + notify(fmt_ok!( "Created a new Vault named {}.", color_primary(vault_name) )); @@ -210,7 +211,7 @@ impl CliState { UseAwsKms::No, ) .await?; - self.notify_message(fmt_ok!( + notify(fmt_ok!( "Created a new Vault named {} on your disk.", color_primary(vault_name) )); @@ -218,7 +219,7 @@ impl CliState { }; if named_vault.is_default() { - self.notify_message(fmt_ok!( + notify(fmt_ok!( "Marked this new Vault as your default Vault, on this machine.\n" )); } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs index a57f7fc89d9..0b90abc85cd 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs @@ -4,18 +4,6 @@ use std::time::Duration; use colorful::Colorful; use miette::IntoDiagnostic; -use ockam::identity::models::CredentialAndPurposeKey; -use ockam::identity::Identifier; -use ockam::remote::{RemoteRelay, RemoteRelayOptions}; -use ockam::Result; -use ockam_core::api::{Error, Request, RequestHeader, Response}; -use ockam_core::errcode::{Kind, Origin}; -use ockam_core::{async_trait, Address, TryClone}; -use ockam_multiaddr::MultiAddr; -use ockam_node::compat::asynchronous::Mutex as AsyncMutex; -use ockam_node::compat::asynchronous::Mutex; -use ockam_node::Context; - use super::{NodeManager, NodeManagerWorker}; use crate::colors::color_primary; use crate::nodes::connection::Connection; @@ -30,6 +18,18 @@ use crate::nodes::BackgroundNodeClient; use crate::session::replacer::{ReplacerOutcome, ReplacerOutputKind, SessionReplacer}; use crate::session::session::Session; use crate::{fmt_info, fmt_ok, fmt_warn}; +use ockam::identity::models::CredentialAndPurposeKey; +use ockam::identity::Identifier; +use ockam::remote::{RemoteRelay, RemoteRelayOptions}; +use ockam::Result; +use ockam_core::api::{Error, Request, RequestHeader, Response}; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::notifier::notify; +use ockam_core::{async_trait, Address, TryClone}; +use ockam_multiaddr::MultiAddr; +use ockam_node::compat::asynchronous::Mutex as AsyncMutex; +use ockam_node::compat::asynchronous::Mutex; +use ockam_node::Context; impl NodeManagerWorker { pub async fn create_relay( @@ -377,23 +377,19 @@ impl SessionReplacer for RelaySessionReplacer { } async fn on_session_down(&self) { - if let Some(node_manager) = self.node_manager.upgrade() { - node_manager.cli_state.notify_message( - fmt_warn!( - "The Node lost the connection to the Relay at {}\n", - color_primary(&self.addr) - ) + &fmt_info!("Attempting to reconnect...\n"), - ); - } + notify( + fmt_warn!( + "The Node lost the connection to the Relay at {}\n", + color_primary(&self.addr) + ) + &fmt_info!("Attempting to reconnect...\n"), + ); } async fn on_session_replaced(&self) { - if let Some(node_manager) = self.node_manager.upgrade() { - node_manager.cli_state.notify_message(fmt_ok!( - "The Node has restored the connection to the Relay at {}\n", - color_primary(&self.addr) - )); - } + notify(fmt_ok!( + "The Node has restored the connection to the Relay at {}\n", + color_primary(&self.addr) + )); } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs index 90f12f370bf..347844be272 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs @@ -11,6 +11,7 @@ use ockam::udp::{UdpPuncture, UdpPunctureNegotiation, UdpTransport}; use ockam::Result; use ockam_abac::{Action, PolicyExpression, Resource}; use ockam_core::errcode::{Kind, Origin}; +use ockam_core::notifier::notify; use ockam_core::{async_trait, route, Error, IncomingAccessControl, OutgoingAccessControl, Route}; use ockam_multiaddr::proto::Project as ProjectProto; use ockam_multiaddr::MultiAddr; @@ -299,25 +300,21 @@ impl SessionReplacer for InletSessionReplacer { } async fn on_session_down(&self) { - if let Some(node_manager) = self.node_manager.upgrade() { - node_manager.cli_state.notify_message( - fmt_warn!( - "The TCP Inlet at {} lost the connection to the TCP Outlet at {}\n", - color_primary(&self.listen_addr), - color_primary(&self.outlet_addr) - ) + &fmt_info!("Attempting to reconnect...\n"), - ); - } + notify( + fmt_warn!( + "The TCP Inlet at {} lost the connection to the TCP Outlet at {}\n", + color_primary(&self.listen_addr), + color_primary(&self.outlet_addr) + ) + &fmt_info!("Attempting to reconnect...\n"), + ); } async fn on_session_replaced(&self) { - if let Some(node_manager) = self.node_manager.upgrade() { - node_manager.cli_state.notify_message(fmt_ok!( - "The TCP Inlet at {} has restored the connection to the TCP Outlet at {}\n", - color_primary(&self.listen_addr), - color_primary(&self.outlet_addr) - )); - } + notify(fmt_ok!( + "The TCP Inlet at {} has restored the connection to the TCP Outlet at {}\n", + color_primary(&self.listen_addr), + color_primary(&self.outlet_addr) + )); } } diff --git a/implementations/rust/ockam/ockam_api/src/ui/terminal/mod.rs b/implementations/rust/ockam/ockam_api/src/ui/terminal/mod.rs index 619c463b00a..8fad2f5a83b 100644 --- a/implementations/rust/ockam/ockam_api/src/ui/terminal/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/ui/terminal/mod.rs @@ -28,7 +28,7 @@ use tracing::warn; /// A terminal abstraction to handle commands' output and messages styling. #[derive(Clone, Debug)] -pub struct Terminal { +pub struct Terminal { stdout: T, stderr: T, logging_enabled: bool, @@ -41,7 +41,7 @@ pub struct Terminal { max_height_row_count: usize, } -impl Terminal { +impl Terminal { pub fn is_quiet(&self) -> bool { self.quiet } @@ -86,7 +86,7 @@ impl TerminalStream { } /// Trait defining the main methods to write messages to a terminal stream. -pub trait TerminalWriter: Clone { +pub trait TerminalWriter: Clone + Debug { fn stdout(no_color: bool, branding: OutputBranding) -> Self; fn stderr(no_color: bool, branding: OutputBranding) -> Self; fn is_tty(&self) -> bool; @@ -98,7 +98,7 @@ pub trait TerminalWriter: Clone { } // Core functions -impl Terminal { +impl Terminal { #[allow(clippy::too_many_arguments)] pub fn new( logging_enabled: bool, @@ -231,7 +231,7 @@ impl Terminal { } // Logging mode -impl Terminal { +impl Terminal { pub fn is_tty(&self) -> bool { self.stderr.is_tty() } @@ -319,7 +319,7 @@ impl Terminal { } // Finished mode -impl Terminal { +impl Terminal { pub fn is_tty(&self) -> bool { self.stdout.is_tty() } @@ -396,13 +396,13 @@ impl Terminal { } // Extensions -impl Terminal { - pub fn can_use_progress_bar(&self) -> bool { +impl Terminal { + pub fn can_use_interactive_elements(&self) -> bool { self.stderr.is_tty() && self.can_write_to_stderr() } pub fn spinner(&self) -> Option { - if !self.can_use_progress_bar() { + if !self.can_use_interactive_elements() { return None; } diff --git a/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs b/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs index 968151b9e56..178bfcdaac8 100644 --- a/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs +++ b/implementations/rust/ockam/ockam_api/src/ui/terminal/notification.rs @@ -1,52 +1,18 @@ +use crate::fmt_log; use crate::terminal::{Terminal, TerminalWriter}; -use crate::{fmt_log, CliState}; use core::sync::atomic::AtomicBool; use core::sync::atomic::Ordering::{Acquire, Release}; use indicatif::ProgressBar; +use ockam_core::notifier::Notification; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; use tokio::select; - use tokio::sync::broadcast::Receiver; use tokio::time::sleep; const REPORTING_CHANNEL_POLL_DELAY: Duration = Duration::from_millis(20); -#[derive(Debug, Clone, PartialEq)] -pub enum Notification { - Message(String), - Progress(String), - ProgressFinishWithMessage(String), - ProgressFinishAndClear(), -} - -impl Notification { - pub fn contents(&self) -> Option<&str> { - match self { - Notification::Message(contents) => Some(contents), - Notification::Progress(contents) => Some(contents), - Notification::ProgressFinishWithMessage(contents) => Some(contents), - Notification::ProgressFinishAndClear() => None, - } - } - - pub fn message(contents: impl Into) -> Self { - Self::Message(contents.into()) - } - - pub fn progress(contents: impl Into) -> Self { - Self::Progress(contents.into()) - } - - pub fn progress_finish(contents: impl Into>) -> Self { - match contents.into() { - Some(contents) => Self::ProgressFinishWithMessage(contents), - None => Self::ProgressFinishAndClear(), - } - } -} - pub struct NotificationHandle { stop: Arc, } @@ -59,7 +25,7 @@ impl Drop for NotificationHandle { /// This struct displays notifications coming from the CliState when commands are executed #[derive(Debug)] -pub struct NotificationHandler { +pub struct NotificationHandler { /// Channel to receive notifications rx: Receiver, /// If there is a progress bar, it is used to display messages as they arrive with a spinner @@ -71,13 +37,13 @@ pub struct NotificationHandler { stop: Arc, } -impl NotificationHandler { +impl NotificationHandler { /// Create a new NotificationsProgress without progress bar. /// The notifications are printed as they arrive and stay on screen - pub fn start(cli_state: &CliState, terminal: Terminal) -> NotificationHandle { + pub fn start(terminal: Terminal) -> NotificationHandle { let stop = Arc::new(AtomicBool::new(false)); let _self = NotificationHandler { - rx: cli_state.subscribe_to_notifications(), + rx: ockam_core::notifier::receiver(), terminal: terminal.clone(), progress_bar: None, stop: stop.clone(), @@ -92,6 +58,7 @@ impl NotificationHandler { select! { _ = sleep(REPORTING_CHANNEL_POLL_DELAY) => { if self.stop.load(Acquire) { + debug!("stopping notification handler"); // Drain the channel while let Ok(notification) = self.rx.try_recv() { self.handle_notification(notification); @@ -101,10 +68,12 @@ impl NotificationHandler { } notification = self.rx.recv() => { if let Ok(notification) = notification { + trace!(?notification, "received notification"); self.handle_notification(notification); } // The channel was closed else { + debug!("notification channel closed"); break; } } @@ -116,26 +85,25 @@ impl NotificationHandler { fn handle_notification(&mut self, notification: Notification) { match notification { Notification::Message(contents) => { - let _ = self.terminal.write_line(contents); + let _ = self.terminal.write_line(self.process_contents(contents)); } Notification::Progress(contents) => { - if self.terminal.can_use_progress_bar() { + if self.terminal.can_use_interactive_elements() { if self.progress_bar.is_none() { self.progress_bar = self.terminal.spinner(); } if let Some(pb) = self.progress_bar.as_ref() { - pb.set_message(contents); + pb.set_message(self.process_contents(contents)); } } // If the progress bar can't be used (non-tty), handle as a regular message else { - // Since progress bar messages are not formatted, apply the log formatting here - let _ = self.terminal.write_line(fmt_log!("{}", contents)); + let _ = self.terminal.write_line(self.process_contents(contents)); } } Notification::ProgressFinishWithMessage(contents) => { if let Some(pb) = self.progress_bar.take() { - pb.finish_with_message(contents); + pb.finish_with_message(self.process_contents(contents)); } } Notification::ProgressFinishAndClear() => { @@ -145,4 +113,17 @@ impl NotificationHandler { } } } + + fn process_contents(&self, contents: String) -> String { + // if has an expected padding, return as is + if contents.starts_with(crate::terminal::PADDING) + || contents.starts_with(crate::terminal::ICON_PADDING) + { + contents + } + // if not, format as a log message + else { + fmt_log!("{}", contents) + } + } } diff --git a/implementations/rust/ockam/ockam_command/Cargo.toml b/implementations/rust/ockam/ockam_command/Cargo.toml index 63904b33a2c..695f12eeace 100644 --- a/implementations/rust/ockam/ockam_command/Cargo.toml +++ b/implementations/rust/ockam/ockam_command/Cargo.toml @@ -73,7 +73,7 @@ ockam_core = { path = "../ockam_core", version = "^0.124.0" } ockam_multiaddr = { path = "../ockam_multiaddr", version = "0.69.0", features = ["std"] } ockam_node = { path = "../ockam_node", version = "^0.137.0" } ockam_vault = { path = "../ockam_vault", version = "^0.130.0", default-features = false, features = ["storage", "std"] } -once_cell = "1.19" +once_cell = { version = "1.19.0", default-features = false } open = "5.3.0" opentelemetry = { version = "0.26.0", features = ["metrics", "trace"] } pem-rfc7468 = { version = "0.7.0", features = ["std"] } diff --git a/implementations/rust/ockam/ockam_command/src/enroll/command.rs b/implementations/rust/ockam/ockam_command/src/enroll/command.rs index 8057efa9e9f..a6d7a1fabdf 100644 --- a/implementations/rust/ockam/ockam_command/src/enroll/command.rs +++ b/implementations/rust/ockam/ockam_command/src/enroll/command.rs @@ -113,8 +113,7 @@ impl EnrollCommand { display_header(&opts); let identity = { - let _notification_handler = - NotificationHandler::start(&opts.state, opts.terminal.clone()); + let _notification_handler = NotificationHandler::start(opts.terminal.clone()); opts.state .get_named_identity_or_default(&self.identity) .await? diff --git a/implementations/rust/ockam/ockam_command/src/identity/create.rs b/implementations/rust/ockam/ockam_command/src/identity/create.rs index dee06ee32ac..6c067b686bc 100644 --- a/implementations/rust/ockam/ockam_command/src/identity/create.rs +++ b/implementations/rust/ockam/ockam_command/src/identity/create.rs @@ -46,7 +46,7 @@ impl Command for CreateCommand { const NAME: &'static str = "identity create"; async fn run(self, _ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { - let _notification_handler = NotificationHandler::start(&opts.state, opts.terminal.clone()); + let _notification_handler = NotificationHandler::start(opts.terminal.clone()); let vault = match &self.vault { Some(vault_name) => opts.state.get_or_create_named_vault(vault_name).await?, None => opts.state.get_or_create_default_named_vault().await?, diff --git a/implementations/rust/ockam/ockam_command/src/node/create.rs b/implementations/rust/ockam/ockam_command/src/node/create.rs index 4073eb1f2da..f487c0c34fb 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create.rs @@ -374,7 +374,7 @@ impl CreateCommand { opts: &CommandGlobalOpts, identity_name: &Option, ) -> Result { - let _notification_handler = NotificationHandler::start(&opts.state, opts.terminal.clone()); + let _notification_handler = NotificationHandler::start(opts.terminal.clone()); Ok(match identity_name { Some(name) => { if let Ok(identity) = opts.state.get_named_identity(name).await { diff --git a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs index 65346b17bed..d3720b28d13 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs @@ -59,10 +59,7 @@ impl CreateCommand { None } else { // Enable the notifications only on explicit foreground nodes. - Some(NotificationHandler::start( - &opts.state, - opts.terminal.clone(), - )) + Some(NotificationHandler::start(opts.terminal.clone())) }; let node_info = opts .state diff --git a/implementations/rust/ockam/ockam_command/src/node/delete.rs b/implementations/rust/ockam/ockam_command/src/node/delete.rs index 6f9e62d4fe0..25aebf33958 100644 --- a/implementations/rust/ockam/ockam_command/src/node/delete.rs +++ b/implementations/rust/ockam/ockam_command/src/node/delete.rs @@ -58,7 +58,7 @@ pub struct DeleteTui { impl DeleteTui { pub async fn run(opts: CommandGlobalOpts, cmd: DeleteCommand) -> miette::Result<()> { - let _notification_handler = NotificationHandler::start(&opts.state, opts.terminal.clone()); + let _notification_handler = NotificationHandler::start(opts.terminal.clone()); let tui = Self { opts, cmd }; tui.delete().await } diff --git a/implementations/rust/ockam/ockam_command/src/reset/mod.rs b/implementations/rust/ockam/ockam_command/src/reset/mod.rs index 169d305f583..10e3f617e95 100644 --- a/implementations/rust/ockam/ockam_command/src/reset/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/reset/mod.rs @@ -80,8 +80,7 @@ impl ResetCommand { } } { - let _notification_handler = - NotificationHandler::start(&opts.state, opts.terminal.clone()); + let _notification_handler = NotificationHandler::start(opts.terminal.clone()); opts.state.reset().await?; } diff --git a/implementations/rust/ockam/ockam_core/Cargo.toml b/implementations/rust/ockam/ockam_core/Cargo.toml index b346319dc7a..d653efbb48b 100644 --- a/implementations/rust/ockam/ockam_core/Cargo.toml +++ b/implementations/rust/ockam/ockam_core/Cargo.toml @@ -81,7 +81,7 @@ hex = { version = "0.4", default-features = false, optional = true } miette = { version = "7.2.0", features = ["fancy-no-backtrace"], optional = true } minicbor = { version = "0.25.1", default-features = false, features = ["derive"] } ockam_macros = { path = "../ockam_macros", version = "^0.37.0", default-features = false } -once_cell = { version = "1", optional = true, default-features = false } +once_cell = { version = "1.19.0", optional = true, default-features = false } opentelemetry = { version = "0.26.0", features = ["logs", "metrics", "trace"], optional = true } rand = { version = "0.8", default-features = false } rand_pcg = { version = "0.3.1", default-features = false, optional = true } @@ -92,6 +92,7 @@ serde_json = { version = "1", optional = true } spin = { version = "0.9.8", default-features = false, features = ["mutex", "rwlock", "spin_mutex"], optional = true } strum = { version = "0.26.3", default-features = false, features = ["derive"] } tinyvec = { version = "1.8.0", features = ["rustc_1_57"] } +tokio = { version = "1.41", default-features = false, features = ["sync"] } tracing = { version = "0.1", default-features = false } tracing-error = { version = "0.2", default-features = false, optional = true } tracing-opentelemetry = { version = "0.27.0", optional = true } diff --git a/implementations/rust/ockam/ockam_core/src/lib.rs b/implementations/rust/ockam/ockam_core/src/lib.rs index 8b9aa00cf4d..97a4955c85e 100644 --- a/implementations/rust/ockam/ockam_core/src/lib.rs +++ b/implementations/rust/ockam/ockam_core/src/lib.rs @@ -82,6 +82,7 @@ mod cbor; mod error; mod identity; mod message; +pub mod notifier; mod processor; mod routing; mod uint; diff --git a/implementations/rust/ockam/ockam_core/src/notifier.rs b/implementations/rust/ockam/ockam_core/src/notifier.rs new file mode 100644 index 00000000000..d6c293578bc --- /dev/null +++ b/implementations/rust/ockam/ockam_core/src/notifier.rs @@ -0,0 +1,96 @@ +//! This module provides a way to send high-level notifications meant to be +//! consumed by a user interface, such as the Ockam Command. + +use once_cell::sync::Lazy; +use tokio::sync::broadcast::{channel, Receiver, Sender}; + +/// Maximum number of notifications present in the channel +const NOTIFICATIONS_CHANNEL_CAPACITY: usize = 32; + +/// Global notifier channel to send notifications. +/// We use a broadcast channel to allow multiple consumers to receive notifications. +static NOTIFIER: Lazy> = Lazy::new(|| { + let (tx, _) = channel::(NOTIFICATIONS_CHANNEL_CAPACITY); + tx +}); + +/// Get a sender to send notifications +pub fn sender() -> Sender { + NOTIFIER.clone() +} + +/// Get a receiver to receive notifications +pub fn receiver() -> Receiver { + NOTIFIER.subscribe() +} + +/// Send a simple message +pub fn notify(message: impl Into) { + if let Err(err) = NOTIFIER.send(Notification::message(message)) { + warn!(%err, "couldn't send notification"); + } +} + +/// Send a message that will be attached to a spinner +pub fn notify_with_spinner(message: impl Into) { + if let Err(err) = NOTIFIER.send(Notification::progress(message)) { + warn!(%err, "couldn't to send notification"); + } +} + +/// Finish a previously set spinner with a message +pub fn notify_end_spinner(message: impl Into>) { + if let Err(err) = NOTIFIER.send(Notification::progress_finish(message)) { + warn!(%err, "couldn't to send notification"); + } +} + +/// Finish a previously set spinner and clears it +pub fn notify_end_spinner_and_clear() { + if let Err(err) = NOTIFIER.send(Notification::ProgressFinishAndClear()) { + warn!(%err, "failed to send notification"); + } +} + +/// Notification types that can be sent +#[derive(Debug, Clone, PartialEq)] +pub enum Notification { + /// A simple message + Message(String), + /// A message that will be attached to a spinner + Progress(String), + /// Finish a previously set spinner with a message + ProgressFinishWithMessage(String), + /// Finish a previously set spinner and clears it + ProgressFinishAndClear(), +} + +impl Notification { + /// Get the contents of the notification + pub fn contents(&self) -> Option<&str> { + match self { + Notification::Message(contents) => Some(contents), + Notification::Progress(contents) => Some(contents), + Notification::ProgressFinishWithMessage(contents) => Some(contents), + Notification::ProgressFinishAndClear() => None, + } + } + + /// Create a message notification + pub fn message(contents: impl Into) -> Self { + Self::Message(contents.into()) + } + + /// Create a progress notification + pub fn progress(contents: impl Into) -> Self { + Self::Progress(contents.into()) + } + + /// Create a progress finish notification + pub fn progress_finish(contents: impl Into>) -> Self { + match contents.into() { + Some(contents) => Self::ProgressFinishWithMessage(contents), + None => Self::ProgressFinishAndClear(), + } + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs index 0b0272e0694..f3837d3ae91 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs @@ -6,8 +6,8 @@ use crate::transport::{connect, connect_tls}; use crate::{portal::TcpPortalRecvProcessor, PortalInternalMessage, PortalMessage, TcpRegistry}; use ockam_core::compat::{boxed::Box, sync::Arc}; use ockam_core::{ - async_trait, AllowAll, AllowOnwardAddress, AllowSourceAddress, Decodable, DenyAll, - IncomingAccessControl, LocalInfoIdentifier, Mailbox, Mailboxes, OutgoingAccessControl, + async_trait, notifier::notify, AllowAll, AllowOnwardAddress, AllowSourceAddress, Decodable, + DenyAll, IncomingAccessControl, LocalInfoIdentifier, Mailbox, Mailboxes, OutgoingAccessControl, SecureChannelLocalInfo, }; use ockam_core::{Any, Result, Route, Routed, Worker}; @@ -529,6 +529,14 @@ impl Worker for TcpPortalWorker { self.registry .add_portal_worker(&self.addresses.sender_remote); + if self.portal_type == PortalType::Outlet { + notify(format!( + "Outlet at {} received connection from {}", + &self.hostname_port, + &self.remote_route.clone().unwrap_or(Route::new().build()), + )); + } + info!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "tcp portal worker initialized" ); @@ -647,6 +655,13 @@ impl TcpPortalWorker { #[instrument(skip_all)] async fn handle_disconnect(&mut self, ctx: &Context) -> Result<()> { + if self.portal_type == PortalType::Outlet { + notify(format!( + "Outlet at {} was disconnected from {}", + &self.hostname_port, + &self.remote_route.clone().unwrap_or(Route::new().build()), + )); + } info!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "tcp stream was dropped"); self.start_disconnection(ctx, DisconnectionReason::FailedRx) From 4007dd6b6732c8443621c523f7da3ffe8a6da7df Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Mon, 10 Feb 2025 15:26:59 +0100 Subject: [PATCH 2/2] fix(rust): session loop updates the initial_connect_was_called var after reconnecting --- .../rust/ockam/ockam_api/src/session/session.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/implementations/rust/ockam/ockam_api/src/session/session.rs b/implementations/rust/ockam/ockam_api/src/session/session.rs index a0457e51ab5..8b9dccc444e 100644 --- a/implementations/rust/ockam/ockam_api/src/session/session.rs +++ b/implementations/rust/ockam/ockam_api/src/session/session.rs @@ -439,7 +439,7 @@ impl Session { async fn run_loop( ctx: Context, key: String, - initial_connect_was_called: bool, + mut initial_connect_was_called: bool, collector_address: Address, shared_state: SharedState, ping_interval: Duration, @@ -479,6 +479,7 @@ impl Session { // The session is down, or we reached the maximum number of failures _ => { let mut replacer = shared_state.replacer.lock().await; + debug!(key = %key, %initial_connect_was_called, %first_creation, pings = %pings.len(), "session state"); if first_creation && !initial_connect_was_called { debug!(key = %key, "session is down. starting"); @@ -487,7 +488,7 @@ impl Session { warn!(key = %key, "session unresponsive. replacing"); } - if !first_creation && pings.len() > 0 { + if initial_connect_was_called && pings.len() > 0 { replacer.on_session_down().await; } @@ -502,9 +503,9 @@ impl Session { match replacer.create().await { Ok(replacer_outcome) => { info!(key = %key, ping_route = %replacer_outcome.ping_route, "replacement is up"); - if !first_creation { - replacer.on_session_replaced().await; - } + + initial_connect_was_called = true; + replacer.on_session_replaced().await; shared_state.status.set_up(replacer_outcome.ping_route); shared_state