Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): move the notification sender logic to ockam_core #8795

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ miette = { version = "7.2.0", features = ["fancy-no-backtrace"] }
minicbor = { version = "0.25.1", default-features = false, features = ["alloc", "derive"] }
nix = { version = "0.29", features = ["signal"] }
nu-ansi-term = "0.50"
once_cell = { version = "1", default-features = false }
once_cell = { version = "1.19.0", default-features = false }
open = "5.3.0"
opentelemetry = { version = "0.26.0", features = ["logs", "metrics", "trace"] }
opentelemetry-appender-tracing = { version = "0.26.0" }
Expand Down
46 changes: 5 additions & 41 deletions implementations/rust/ockam/ockam_api/src/cli_state/cli_state.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
use rand::random;
use std::path::{Path, PathBuf};
use tokio::sync::broadcast::{channel, Receiver, Sender};

use ockam::SqlxDatabase;
use ockam_core::env::get_env_with_default;
use ockam_node::database::{DatabaseConfiguration, DatabaseType};

use crate::cli_state::error::Result;
use crate::cli_state::CliStateError;
use crate::logs::ExportingEnabled;
use crate::terminal::notification::Notification;
use ockam::SqlxDatabase;
use ockam_core::env::get_env_with_default;
use ockam_node::database::{DatabaseConfiguration, DatabaseType};
use rand::random;
use std::path::{Path, PathBuf};

pub const OCKAM_HOME: &str = "OCKAM_HOME";

/// Maximum number of notifications present in the channel
const NOTIFICATIONS_CHANNEL_CAPACITY: usize = 16;

/// The CliState struct manages all the data persisted locally.
///
/// The data is saved to several files:
Expand Down Expand Up @@ -45,8 +38,6 @@ pub struct CliState {
database: SqlxDatabase,
application_database: SqlxDatabase,
exporting_enabled: ExportingEnabled,
/// Broadcast channel to be notified of major events during a process supported by the CliState API
notifications: Sender<Notification>,
}

impl CliState {
Expand Down Expand Up @@ -91,30 +82,6 @@ impl CliState {
pub fn application_database_configuration(&self) -> Result<DatabaseConfiguration> {
Self::make_application_database_configuration(&self.mode)
}

pub fn subscribe_to_notifications(&self) -> Receiver<Notification> {
self.notifications.subscribe()
}

pub fn notify_message(&self, message: impl Into<String>) {
self.notify(Notification::message(message));
}

pub fn notify_progress(&self, message: impl Into<String>) {
self.notify(Notification::progress(message));
}

pub fn notify_progress_finish(&self, message: impl Into<String>) {
self.notify(Notification::progress_finish(Some(message.into())));
}

pub fn notify_progress_finish_and_clear(&self) {
self.notify(Notification::progress_finish(None));
}

fn notify(&self, notification: Notification) {
let _ = self.notifications.send(notification);
}
}

/// These functions allow to create and reset the local state
Expand Down Expand Up @@ -240,8 +207,6 @@ impl CliState {
application_database
);

let (notifications, _) = channel::<Notification>(NOTIFICATIONS_CHANNEL_CAPACITY);

let state = Self {
mode,
database,
Expand All @@ -251,7 +216,6 @@ impl CliState {
// the function set_tracing_enabled can be used to enable tracing, which
// is eventually used to trace user journeys.
exporting_enabled: ExportingEnabled::Off,
notifications,
};

Ok(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use colorful::Colorful;
use ockam::identity::models::ChangeHistory;
use ockam::identity::{Identifier, Identity};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::notifier::notify;
use ockam_core::Error;
use ockam_vault::{HandleToSecret, SigningSecretKeyHandle};

Expand Down Expand Up @@ -44,17 +45,17 @@ impl CliState {
let named_identity = self
.store_named_identity(&identity, name, vault_name)
.await?;
self.notify_message(fmt_ok!(
notify(fmt_ok!(
"Generated a new Identity named {}.",
color_primary(named_identity.name())
));
self.notify_message(fmt_log!(
notify(fmt_log!(
"{} has Identifier {}",
color_primary(named_identity.name()),
color_primary(named_identity.identifier().to_string())
));
if named_identity.is_default() {
self.notify_message(fmt_ok!(
notify(fmt_ok!(
"Marked {} as your default Identity, on this machine.\n",
color_primary(named_identity.name())
));
Expand Down Expand Up @@ -260,7 +261,7 @@ impl CliState {
Some(named_identity) => Ok(named_identity),
// Create a new default identity.
None => {
self.notify_message(fmt_log!(
notify(fmt_log!(
"There is no default Identity on this machine, generating one...\n"
));
self.create_identity_with_name(&random_name()).await
Expand Down
22 changes: 12 additions & 10 deletions implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use crate::cli_state::{random_name, NamedVault, Result};
use crate::cli_state::{CliState, CliStateError};
use crate::colors::color_primary;
use crate::config::lookup::InternetAddress;
use colorful::Colorful;
use minicbor::{CborLen, Decode, Encode};
use nix::errno::Errno;
Expand All @@ -6,6 +10,9 @@ use ockam::identity::utils::now;
use ockam::identity::Identifier;
use ockam::tcp::TcpListener;
use ockam_core::errcode::{Kind, Origin};
use ockam_core::notifier::{
notify, notify_end_spinner, notify_end_spinner_and_clear, notify_with_spinner,
};
use ockam_core::Error;
use ockam_multiaddr::proto::{DnsAddr, Node, Tcp};
use ockam_multiaddr::MultiAddr;
Expand All @@ -16,11 +23,6 @@ use std::process;
use std::time::Duration;
use sysinfo::{Pid, ProcessStatus, ProcessesToUpdate, System};

use crate::cli_state::{random_name, NamedVault, Result};
use crate::cli_state::{CliState, CliStateError};
use crate::colors::color_primary;
use crate::config::lookup::InternetAddress;

use crate::{fmt_warn, ConnectionStatus};

/// The methods below support the creation and update of local nodes
Expand Down Expand Up @@ -116,7 +118,7 @@ impl CliState {
let nodes = self.nodes_repository().get_nodes().await?;
for node in nodes {
if let Err(err) = self.delete_node(&node.name()).await {
self.notify_message(fmt_warn!(
notify(fmt_warn!(
"Failed to delete the node {}: {err}",
color_primary(node.name())
));
Expand Down Expand Up @@ -177,7 +179,7 @@ impl CliState {
error!(name=%node_name, %pid, %e, "failed to stop node process with SIGKILL");
return Err(e);
} else {
self.notify_progress_finish(format!(
notify_end_spinner(format!(
"The node {} has been stopped",
color_primary(node_name),
));
Expand Down Expand Up @@ -214,12 +216,12 @@ impl CliState {
// Return if max attempts have been reached
if attempts > max_attempts {
warn!(name = %node_name, %pid, %signal, "node process did not exit");
self.notify_progress_finish_and_clear();
notify_end_spinner_and_clear();
return Err(err);
}
// Notify the user that the node is stopping if it takes too long
if attempts == show_message_at_attempt {
self.notify_progress(format!(
notify_with_spinner(format!(
"Waiting for node's {} process {} to stop",
color_primary(node_name),
color_primary(pid)
Expand All @@ -230,7 +232,7 @@ impl CliState {
}
}
}
self.notify_progress_finish_and_clear();
notify_end_spinner_and_clear();
Ok(())
}

Expand Down
15 changes: 8 additions & 7 deletions implementations/rust/ockam/ockam_api/src/cli_state/vaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{fmt_log, fmt_ok, fmt_warn};
use colorful::Colorful;
use ockam::identity::{Identities, Vault};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::notifier::notify;
use ockam_node::database::SqlxDatabase;
use ockam_vault_aws::AwsSigningVault;
use std::fmt::Write;
Expand Down Expand Up @@ -124,7 +125,7 @@ impl CliState {
.delete_identity(&identity.name())
.await
{
self.notify_message(fmt_warn!(
notify(fmt_warn!(
"Failed to delete the identity {}: {err}",
color_primary(identity.name())
));
Expand All @@ -139,7 +140,7 @@ impl CliState {
let vaults = self.vaults_repository().get_named_vaults().await?;
for vault in vaults {
if let Err(err) = self.delete_named_vault(&vault.name()).await {
self.notify_message(fmt_warn!(
notify(fmt_warn!(
"Failed to delete the vault {}: {err}",
color_primary(vault.name())
));
Expand Down Expand Up @@ -182,7 +183,7 @@ impl CliState {
return Ok(existing_vault);
}

self.notify_message(fmt_log!(
notify(fmt_log!(
"This Identity needs a Vault to store its secrets."
));
let named_vault = if self
Expand All @@ -191,13 +192,13 @@ impl CliState {
.await?
.is_none()
{
self.notify_message(fmt_log!(
notify(fmt_log!(
"There is no default Vault on this machine, creating one..."
));
let vault = self
.create_database_vault(vault_name.to_string(), UseAwsKms::No)
.await?;
self.notify_message(fmt_ok!(
notify(fmt_ok!(
"Created a new Vault named {}.",
color_primary(vault_name)
));
Expand All @@ -210,15 +211,15 @@ impl CliState {
UseAwsKms::No,
)
.await?;
self.notify_message(fmt_ok!(
notify(fmt_ok!(
"Created a new Vault named {} on your disk.",
color_primary(vault_name)
));
vault
};

if named_vault.is_default() {
self.notify_message(fmt_ok!(
notify(fmt_ok!(
"Marked this new Vault as your default Vault, on this machine.\n"
));
}
Expand Down
48 changes: 22 additions & 26 deletions implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,6 @@ use std::time::Duration;
use colorful::Colorful;
use miette::IntoDiagnostic;

use ockam::identity::models::CredentialAndPurposeKey;
use ockam::identity::Identifier;
use ockam::remote::{RemoteRelay, RemoteRelayOptions};
use ockam::Result;
use ockam_core::api::{Error, Request, RequestHeader, Response};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{async_trait, Address, TryClone};
use ockam_multiaddr::MultiAddr;
use ockam_node::compat::asynchronous::Mutex as AsyncMutex;
use ockam_node::compat::asynchronous::Mutex;
use ockam_node::Context;

use super::{NodeManager, NodeManagerWorker};
use crate::colors::color_primary;
use crate::nodes::connection::Connection;
Expand All @@ -30,6 +18,18 @@ use crate::nodes::BackgroundNodeClient;
use crate::session::replacer::{ReplacerOutcome, ReplacerOutputKind, SessionReplacer};
use crate::session::session::Session;
use crate::{fmt_info, fmt_ok, fmt_warn};
use ockam::identity::models::CredentialAndPurposeKey;
use ockam::identity::Identifier;
use ockam::remote::{RemoteRelay, RemoteRelayOptions};
use ockam::Result;
use ockam_core::api::{Error, Request, RequestHeader, Response};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::notifier::notify;
use ockam_core::{async_trait, Address, TryClone};
use ockam_multiaddr::MultiAddr;
use ockam_node::compat::asynchronous::Mutex as AsyncMutex;
use ockam_node::compat::asynchronous::Mutex;
use ockam_node::Context;

impl NodeManagerWorker {
pub async fn create_relay(
Expand Down Expand Up @@ -377,23 +377,19 @@ impl SessionReplacer for RelaySessionReplacer {
}

async fn on_session_down(&self) {
if let Some(node_manager) = self.node_manager.upgrade() {
node_manager.cli_state.notify_message(
fmt_warn!(
"The Node lost the connection to the Relay at {}\n",
color_primary(&self.addr)
) + &fmt_info!("Attempting to reconnect...\n"),
);
}
notify(
fmt_warn!(
"The Node lost the connection to the Relay at {}\n",
color_primary(&self.addr)
) + &fmt_info!("Attempting to reconnect...\n"),
);
}

async fn on_session_replaced(&self) {
if let Some(node_manager) = self.node_manager.upgrade() {
node_manager.cli_state.notify_message(fmt_ok!(
"The Node has restored the connection to the Relay at {}\n",
color_primary(&self.addr)
));
}
notify(fmt_ok!(
"The Node has restored the connection to the Relay at {}\n",
color_primary(&self.addr)
));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use ockam::udp::{UdpPuncture, UdpPunctureNegotiation, UdpTransport};
use ockam::Result;
use ockam_abac::{Action, PolicyExpression, Resource};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::notifier::notify;
use ockam_core::{async_trait, route, Error, IncomingAccessControl, OutgoingAccessControl, Route};
use ockam_multiaddr::proto::Project as ProjectProto;
use ockam_multiaddr::MultiAddr;
Expand Down Expand Up @@ -299,25 +300,21 @@ impl SessionReplacer for InletSessionReplacer {
}

async fn on_session_down(&self) {
if let Some(node_manager) = self.node_manager.upgrade() {
node_manager.cli_state.notify_message(
fmt_warn!(
"The TCP Inlet at {} lost the connection to the TCP Outlet at {}\n",
color_primary(&self.listen_addr),
color_primary(&self.outlet_addr)
) + &fmt_info!("Attempting to reconnect...\n"),
);
}
notify(
fmt_warn!(
"The TCP Inlet at {} lost the connection to the TCP Outlet at {}\n",
color_primary(&self.listen_addr),
color_primary(&self.outlet_addr)
) + &fmt_info!("Attempting to reconnect...\n"),
);
}

async fn on_session_replaced(&self) {
if let Some(node_manager) = self.node_manager.upgrade() {
node_manager.cli_state.notify_message(fmt_ok!(
"The TCP Inlet at {} has restored the connection to the TCP Outlet at {}\n",
color_primary(&self.listen_addr),
color_primary(&self.outlet_addr)
));
}
notify(fmt_ok!(
"The TCP Inlet at {} has restored the connection to the TCP Outlet at {}\n",
color_primary(&self.listen_addr),
color_primary(&self.outlet_addr)
));
}
}

Expand Down
Loading
Loading