Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
chore: refactor logs and fix typo (#465)
Browse files Browse the repository at this point in the history
  • Loading branch information
hadjiszs authored Mar 5, 2024
1 parent 2c73f0b commit 8044310
Show file tree
Hide file tree
Showing 14 changed files with 86 additions and 94 deletions.
14 changes: 4 additions & 10 deletions crates/topos-certificate-spammer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async fn open_target_node_connection(
Ok(value) => value,
Err(e) => {
error!(
"Unable to create TCE client for node {}, error details: {}",
"Unable to create TCE client for node {}: {}",
&tce_address, e
);
return Err(Error::TCENodeConnection(e));
Expand All @@ -90,10 +90,7 @@ async fn open_target_node_connection(
match tce_client.open_stream(Vec::new()).await {
Ok(_) => {}
Err(e) => {
error!(
"Unable to connect to node {}, error details: {}",
&tce_address, e
);
error!("Unable to connect to node {}: {}", &tce_address, e);
return Err(Error::TCENodeConnection(e));
}
}
Expand Down Expand Up @@ -150,7 +147,7 @@ async fn close_target_node_connections(
{
info!("Closing connection to target node {}", target_node.address);
if let Err(e) = target_node.shutdown().await {
error!("Error shutting down connection {e}");
error!("Failed to close stream with {}: {e}", target_node.address);
}
}
}
Expand All @@ -176,10 +173,7 @@ async fn send_new_certificate(tce_client: &mut TceClient, cert: Certificate) {
.instrument(Span::current())
.await
{
error!(
"failed to pass certificate to tce client, error details: {}",
e
);
error!("Failed to send the Certificate to the TCE client: {}", e);
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/topos-core/src/api/graphql/subnet.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_graphql::NewType;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use tracing::{error, warn};
use tracing::error;

use super::errors::GraphQLServerError;

Expand Down Expand Up @@ -30,7 +30,7 @@ impl PartialEq<crate::uci::SubnetId> for SubnetId {
if let Ok(current) = crate::uci::SubnetId::from_str(&self.0) {
other.as_array().eq(current.as_array())
} else {
warn!("Unexpected parsing error for subnet id during comparaison");
error!("Failed to parse the subnet id {} during comparison", self.0);
false
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-p2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub enum P2PError {
#[error(transparent)]
CommandError(#[from] CommandExecutionError),

#[error("An error occured on the Transport layer: {0}")]
#[error("An error occurred on the Transport layer: {0}")]
TransportError(#[from] TransportError<io::Error>),

#[error("Unable to receive expected response of a oneshot channel")]
Expand Down
8 changes: 4 additions & 4 deletions crates/topos-sequencer/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ impl AppContext {
match tce_evt {
TceProxyEvent::TceServiceFailure | TceProxyEvent::WatchCertificatesChannelFailed => {
// Unrecoverable failure in interaction with the TCE. Sequencer needs to be restarted
warn!(
error!(
"Unrecoverable failure in sequencer <-> tce interaction. Shutting down sequencer \
sequencer..."
);
if let Err(e) = self.shutdown().await {
warn!("Error happened during shutdown: {e:?}");
warn!("Failed to shutdown: {e:?}");
}
warn!("Shutdown finished, restarting sequencer...");
info!("Shutdown finished, restarting sequencer...");
return AppContextStatus::Restarting;
},
_ => self.on_tce_proxy_event(tce_evt).await,
Expand All @@ -82,7 +82,7 @@ impl AppContext {
_ = shutdown.0.cancelled() => {
info!("Shutting down Sequencer app context...");
if let Err(e) = self.shutdown().await {
error!("Error shutting down Sequencer app context: {e}");
error!("Failed to shutdown the Sequencer app context: {e}");
}
// Drop the sender to notify the Sequencer termination
drop(shutdown.1);
Expand Down
54 changes: 29 additions & 25 deletions crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use topos_core::{
};
use topos_crypto::messages::MessageSigner;
use topos_metrics::DOUBLE_ECHO_BROADCAST_FINISHED_TOTAL;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, trace};
mod status;

pub use status::Status;
Expand Down Expand Up @@ -66,7 +66,10 @@ impl BroadcastState {
});

if need_gossip {
warn!("📣 Gossiping the Certificate {}", &state.certificate.id);
debug!(
"📣 Gossiping the Certificate {} from the source subnet {}",
&state.certificate.id, &state.certificate.source_subnet_id
);
let _ = state.event_sender.try_send(ProtocolEvents::Gossip {
cert: state.certificate.clone(),
});
Expand Down Expand Up @@ -118,7 +121,7 @@ impl BroadcastState {
}

fn update_status(&mut self) -> Option<Status> {
// Nothing happened yet, we're in the initial state and didn't Procced
// Nothing happened yet, we're in the initial state and didn't process
// any Echo or Ready messages
// Sending our Echo message
if let Status::Pending = self.status {
Expand All @@ -133,9 +136,10 @@ impl BroadcastState {
});

self.status = Status::EchoSent;
debug!(
"📝 Certificate {} is now {}",
&self.certificate.id, self.status
trace!(
"Certificate {} is now {}",
&self.certificate.id,
self.status
);
return Some(self.status);
}
Expand All @@ -156,14 +160,15 @@ impl BroadcastState {
validator_id: self.validator_id,
};
if let Err(e) = self.event_sender.try_send(event) {
warn!("Error sending Ready message: {}", e);
error!("Failed to send the Ready message: {}", e);
}

self.status = self.status.ready_sent();

debug!(
"📝 Certificate {} is now {}",
&self.certificate.id, self.status
trace!(
"Certificate {} is now {}",
&self.certificate.id,
self.status
);
return Some(self.status);
}
Expand All @@ -173,25 +178,21 @@ impl BroadcastState {
if !self.status.is_delivered() && self.reached_delivery_threshold() {
self.status = self.status.delivered();

debug!(
"📝 Certificate {} is now {}",
&self.certificate.id, self.status
trace!(
"Certificate {} is now {}",
&self.certificate.id,
self.status
);
// Calculate delivery time
let from = self.delivery_time;
let duration = from.elapsed();
let d = duration;

info!(
"Certificate {} got delivered in {:?}",
"📝 Certificate delivered {} with broadcast duration: {:?}",
self.certificate.id, d
);

debug!(
"📝 Accepted[{}]\t Delivery time: {:?}",
&self.certificate.id, d
);

DOUBLE_ECHO_BROADCAST_FINISHED_TOTAL.inc();

return Some(self.status);
Expand Down Expand Up @@ -220,9 +221,11 @@ impl BroadcastState {
None => false,
};

debug!(
"📝 Certificate {} reached Echo threshold: {} and Ready threshold: {}",
&self.certificate.id, reached_echo_threshold, reached_ready_threshold
trace!(
"Certificate {} reached Echo threshold: {} and Ready threshold: {}",
&self.certificate.id,
reached_echo_threshold,
reached_ready_threshold
);
// If reached any of the Echo or Ready thresholds, I send the Ready
reached_echo_threshold || reached_ready_threshold
Expand All @@ -239,9 +242,10 @@ impl BroadcastState {
None => false,
};

debug!(
"📝 Certificate {} reached Delivery threshold: {}",
&self.certificate.id, delivery_threshold
trace!(
"Certificate {} reached Delivery threshold: {}",
&self.certificate.id,
delivery_threshold
);

delivery_threshold
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl DoubleEcho {
}

else => {
warn!("Break the tokio loop for the double echo");
debug!("Break the tokio loop for the double echo");
break None;
}
}
Expand All @@ -194,7 +194,7 @@ impl DoubleEcho {
info!("Shutting down p2p double echo...");
_ = sender.send(());
} else {
warn!("Shutting down p2p double echo due to error...");
debug!("Shutting down p2p double echo due to error...");
}
}
}
Expand Down
19 changes: 8 additions & 11 deletions crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use topos_tce_storage::store::ReadStore;
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::validator::ValidatorStore;
use topos_tce_storage::PendingCertificateId;
use tracing::debug;
use tracing::error;
use tracing::warn;
use tracing::{debug, error, info, trace, warn};

pub mod task;

Expand Down Expand Up @@ -55,7 +53,6 @@ pub struct TaskManager {
pub validator_id: ValidatorId,
pub validator_store: Arc<ValidatorStore>,
pub broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,

pub latest_pending_id: PendingCertificateId,
}

Expand Down Expand Up @@ -108,7 +105,7 @@ impl TaskManager {
}
}
Err(error) => {
error!("Error while fetching pending certificates: {:?}", error);
error!("Failed to fetch the pending certificates: {:?}", error);
}
}
}
Expand Down Expand Up @@ -136,7 +133,7 @@ impl TaskManager {
};
}
DoubleEchoCommand::Broadcast { ref cert, need_gossip } => {
debug!("Received broadcast message for certificate {} ", cert.id);
trace!("Received broadcast message for certificate {} ", cert.id);

self.create_task(cert, need_gossip)
}
Expand All @@ -146,25 +143,25 @@ impl TaskManager {

Some((certificate_id, status)) = self.running_tasks.next() => {
if let TaskStatus::Success = status {
debug!("Task for certificate {} finished successfully", certificate_id);
trace!("Task for certificate {} finished successfully", certificate_id);
self.tasks.remove(&certificate_id);
DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec();

} else {
debug!("Task for certificate {} finished unsuccessfully", certificate_id);
error!("Task for certificate {} finished unsuccessfully", certificate_id);
}

self.next_pending_certificate();
}

_ = shutdown_receiver.cancelled() => {
warn!("Task Manager shutting down");
info!("Task Manager shutting down");

warn!("There are still {} active tasks", self.tasks.len());
debug!("Remaining active tasks: {:?}", self.tasks.len());
if !self.tasks.is_empty() {
debug!("Certificates still in broadcast: {:?}", self.tasks.keys());
}
warn!("There are still {} buffered messages", self.buffered_messages.len());
warn!("Remaining buffered messages: {}", self.buffered_messages.len());
for task in self.tasks.iter() {
task.1.shutdown_sender.send(()).await.unwrap();
}
Expand Down
16 changes: 9 additions & 7 deletions crates/topos-tce-broadcast/src/task_manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use topos_tce_storage::errors::StorageError;
use topos_tce_storage::store::{ReadStore, WriteStore};
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::validator::ValidatorStore;
use tracing::warn;
use tracing::{debug, error};

use crate::double_echo::broadcast_state::{BroadcastState, Status};
use crate::{DoubleEchoCommand, TaskStatus};
Expand Down Expand Up @@ -88,9 +88,11 @@ impl IntoFuture for Task {
Err(_) => return (self.certificate_id, TaskStatus::Failure),
};

warn!(
"Expected position for {} is {:?}",
self.certificate_id, expected_position
debug!(
"Expected position for Certificate {} is {:?} for the subnet {}",
self.certificate_id,
expected_position,
self.broadcast_state.certificate.source_subnet_id
);
self.broadcast_state.expected_position = Some(expected_position);

Expand All @@ -107,7 +109,7 @@ impl IntoFuture for Task {
return (self.certificate_id, TaskStatus::Success);
}
Err(error) => {
tracing::error!("Unable to persist one delivered certificate: {:?}", error);
error!("Unable to persist one delivered certificate: {:?}", error);
return (self.certificate_id, TaskStatus::Failure);
}
}
Expand All @@ -123,7 +125,7 @@ impl IntoFuture for Task {
return (self.certificate_id, TaskStatus::Success);
}
Err(error) => {
tracing::error!("Unable to persist one delivered certificate: {:?}", error);
error!("Unable to persist one delivered certificate: {:?}", error);
return (self.certificate_id, TaskStatus::Failure);
}
}
Expand All @@ -133,7 +135,7 @@ impl IntoFuture for Task {
}
}
_ = self.shutdown_receiver.recv() => {
warn!("Received shutdown, shutting down task {:?}", self.certificate_id);
debug!("Received shutdown, shutting down task {:?}", self.certificate_id);
return (self.certificate_id, TaskStatus::Failure)
}
}
Expand Down
Loading

0 comments on commit 8044310

Please sign in to comment.