Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
chore: adding metric channels
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Apr 19, 2024
1 parent 4baa901 commit 42bde65
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 42 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/topos-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ edition = "2021"
workspace = true

[dependencies]
futures.workspace = true
lazy_static.workspace = true
prometheus.workspace = true
tokio = { workspace = true, features = ["full"] }
89 changes: 89 additions & 0 deletions crates/topos-metrics/src/channels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
pub mod mpsc {
use futures::{FutureExt, TryFutureExt};
use prometheus::IntGauge;
use tokio::sync::mpsc::error::TrySendError;
pub mod error {
pub use tokio::sync::mpsc::error::*;
}
#[derive(Debug)]
pub struct Sender<T> {
inner: tokio::sync::mpsc::Sender<T>,
gauge: IntGauge,
}

impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
gauge: self.gauge.clone(),
}
}
}

impl<T> Sender<T> {
pub async fn send(&self, value: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
self.inner
.send(value)
.inspect_ok(|_| self.gauge.inc())
.await
}

pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
self.inner
.try_send(message)
// remove this unsightly hack once https://github.com/rust-lang/rust/issues/91345 is resolved
.map(|val| {
self.gauge.inc();
val
})
}
}

pub struct Receiver<T> {
inner: tokio::sync::mpsc::Receiver<T>,
gauge: IntGauge,
}

impl<T> Receiver<T> {
pub fn inner(self) -> tokio::sync::mpsc::Receiver<T> {
self.inner
}

pub async fn recv(&mut self) -> Option<T> {
self.inner
.recv()
.inspect(|opt| {
if opt.is_some() {
self.gauge.dec();
}
})
.await
}

pub fn try_recv(&mut self) -> Result<T, tokio::sync::mpsc::error::TryRecvError> {
self.inner.try_recv().map(|val| {
self.gauge.dec();
val
})
}
}

pub fn channel<T>(size: usize, gauge: &IntGauge) -> (Sender<T>, Receiver<T>) {
// Reset the gauge
gauge.set(0);

let (sender, receiver) = tokio::sync::mpsc::channel(size);

(
Sender {
inner: sender,
gauge: gauge.clone(),
},
Receiver {
inner: receiver,
gauge: gauge.clone(),
},
)
}
}

20 changes: 20 additions & 0 deletions crates/topos-metrics/src/double_echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,26 @@ use lazy_static::lazy_static;
use crate::TOPOS_METRIC_REGISTRY;

lazy_static! {
pub static ref DOUBLE_ECHO_EVENT_CHANNEL: IntGauge = register_int_gauge_with_registry!(
"double_echo_event_channel",
"Gauge for the double echo event channel.",
TOPOS_METRIC_REGISTRY
)
.unwrap();
pub static ref DOUBLE_CLIENT_TO_DOUBLE_ECHO_CHANNEL: IntGauge =
register_int_gauge_with_registry!(
"double_client_to_double_echo_channel",
"Gauge for the double echo command channel between client and double echo.",
TOPOS_METRIC_REGISTRY
)
.unwrap();
pub static ref DOUBLE_ECHO_TO_TASK_MANAGER_CHANNEL: IntGauge =
register_int_gauge_with_registry!(
"double_echo_to_task_manager_channel",
"Gauge for the double echo command channel between double echo and task manager.",
TOPOS_METRIC_REGISTRY
)
.unwrap();
pub static ref DOUBLE_ECHO_ACTIVE_TASKS_COUNT: IntGauge = register_int_gauge_with_registry!(
"double_echo_active_tasks_count",
"Number of active tasks in the double echo.",
Expand Down
1 change: 1 addition & 0 deletions crates/topos-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use lazy_static::lazy_static;
use std::collections::hash_map::HashMap;

mod api;
pub mod channels;
mod double_echo;
mod p2p;
mod storage;
Expand Down
13 changes: 10 additions & 3 deletions crates/topos-tce-broadcast/benches/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use tokio::sync::{broadcast, mpsc, oneshot};
use topos_config::tce::broadcast::ReliableBroadcastParams;
use topos_core::types::ValidatorId;
use topos_crypto::messages::MessageSigner;
use topos_metrics::{
DOUBLE_CLIENT_TO_DOUBLE_ECHO_CHANNEL, DOUBLE_ECHO_EVENT_CHANNEL,
DOUBLE_ECHO_TO_TASK_MANAGER_CHANNEL,
};
use topos_tce_broadcast::double_echo::DoubleEcho;
use topos_tce_storage::validator::ValidatorStore;
use topos_test_sdk::certificates::create_certificate_chain;
Expand All @@ -19,12 +23,15 @@ struct TceParams {
}

pub async fn processing_double_echo(n: u64, validator_store: Arc<ValidatorStore>) {
let (_cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE);
let (event_sender, _event_receiver) = mpsc::channel(CHANNEL_SIZE);
let (_cmd_sender, cmd_receiver) =
topos_metrics::channels::mpsc::channel(CHANNEL_SIZE, &DOUBLE_CLIENT_TO_DOUBLE_ECHO_CHANNEL);
let (event_sender, _event_receiver) =
topos_metrics::channels::mpsc::channel(CHANNEL_SIZE, &DOUBLE_ECHO_EVENT_CHANNEL);
let (broadcast_sender, mut broadcast_receiver) = broadcast::channel(CHANNEL_SIZE);
let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) =
mpsc::channel::<oneshot::Sender<()>>(1);
let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(CHANNEL_SIZE);
let (task_manager_message_sender, task_manager_message_receiver) =
topos_metrics::channels::mpsc::channel(CHANNEL_SIZE, &DOUBLE_ECHO_TO_TASK_MANAGER_CHANNEL);

let params = TceParams {
nb_peers: 10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::event::ProtocolEvents;
use crate::sampler::SubscriptionsView;
use std::sync::Arc;
use std::{collections::HashSet, time};
use tokio::sync::mpsc;
use topos_core::{
types::{
stream::{CertificateSourceStreamPosition, Position},
Expand All @@ -11,6 +10,7 @@ use topos_core::{
uci::Certificate,
};
use topos_crypto::messages::MessageSigner;
use topos_metrics::channels::mpsc;
use topos_metrics::DOUBLE_ECHO_BROADCAST_FINISHED_TOTAL;
use tracing::{debug, error, info, trace};
mod status;
Expand Down
7 changes: 4 additions & 3 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ use crate::event::ProtocolEvents;
use crate::{DoubleEchoCommand, SubscriptionsView};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::sync::{broadcast, oneshot};
use tokio_util::sync::CancellationToken;
use topos_config::tce::broadcast::ReliableBroadcastParams;
use topos_core::{types::ValidatorId, uci::CertificateId};
use topos_crypto::messages::{MessageSigner, Signature};
use topos_metrics::channels::mpsc;
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::validator::ValidatorStore;
use tracing::{debug, info, warn};
Expand All @@ -42,7 +43,7 @@ pub struct DoubleEcho {
/// Channel to send events
event_sender: mpsc::Sender<ProtocolEvents>,
/// Channel to receive shutdown signal
pub(crate) shutdown: mpsc::Receiver<oneshot::Sender<()>>,
pub(crate) shutdown: tokio::sync::mpsc::Receiver<oneshot::Sender<()>>,
/// The threshold parameters for the double echo
pub params: ReliableBroadcastParams,
/// The connection to the TaskManager to forward DoubleEchoCommand messages
Expand Down Expand Up @@ -73,7 +74,7 @@ impl DoubleEcho {
task_manager_message_sender: mpsc::Sender<DoubleEchoCommand>,
command_receiver: mpsc::Receiver<DoubleEchoCommand>,
event_sender: mpsc::Sender<ProtocolEvents>,
shutdown: mpsc::Receiver<oneshot::Sender<()>>,
shutdown: tokio::sync::mpsc::Receiver<oneshot::Sender<()>>,
validator_store: Arc<ValidatorStore>,
broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,
) -> Self {
Expand Down
31 changes: 22 additions & 9 deletions crates/topos-tce-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ use std::collections::HashSet;
use std::sync::Arc;
use thiserror::Error;
use tokio::spawn;
use tokio::sync::mpsc::Sender;
use tokio::sync::{broadcast, mpsc, oneshot};
use topos_metrics::{
DOUBLE_CLIENT_TO_DOUBLE_ECHO_CHANNEL, DOUBLE_ECHO_EVENT_CHANNEL,
DOUBLE_ECHO_TO_TASK_MANAGER_CHANNEL,
};
// use tokio::sync::mpsc::Sender;
use topos_metrics::channels::mpsc;
use topos_metrics::channels::mpsc::Sender;

use tokio::sync::{broadcast, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use topos_config::tce::broadcast::ReliableBroadcastParams;
use topos_core::types::ValidatorId;
Expand Down Expand Up @@ -110,7 +117,7 @@ pub enum DoubleEchoCommand {
#[derive(Clone, Debug)]
pub struct ReliableBroadcastClient {
command_sender: Sender<DoubleEchoCommand>,
pub(crate) double_echo_shutdown_channel: Sender<oneshot::Sender<()>>,
pub(crate) double_echo_shutdown_channel: tokio::sync::mpsc::Sender<oneshot::Sender<()>>,
}

impl ReliableBroadcastClient {
Expand All @@ -123,13 +130,19 @@ impl ReliableBroadcastClient {
validator_store: Arc<ValidatorStore>,
broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,
) -> (Self, impl Stream<Item = ProtocolEvents>) {
let (event_sender, event_receiver) = mpsc::channel(*constant::PROTOCOL_CHANNEL_SIZE);
let (command_sender, command_receiver) = mpsc::channel(*constant::COMMAND_CHANNEL_SIZE);
let (event_sender, event_receiver) =
mpsc::channel(*constant::PROTOCOL_CHANNEL_SIZE, &DOUBLE_ECHO_EVENT_CHANNEL);
let (command_sender, command_receiver) = mpsc::channel(
*constant::COMMAND_CHANNEL_SIZE,
&DOUBLE_CLIENT_TO_DOUBLE_ECHO_CHANNEL,
);
let (double_echo_shutdown_channel, double_echo_shutdown_receiver) =
mpsc::channel::<oneshot::Sender<()>>(1);
tokio::sync::mpsc::channel::<oneshot::Sender<()>>(1);

let (task_manager_message_sender, task_manager_message_receiver) =
mpsc::channel(*constant::BROADCAST_TASK_MANAGER_CHANNEL_SIZE);
let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(
*constant::BROADCAST_TASK_MANAGER_CHANNEL_SIZE,
&DOUBLE_ECHO_TO_TASK_MANAGER_CHANNEL,
);

let double_echo = DoubleEcho::new(
config.tce_params,
Expand All @@ -155,7 +168,7 @@ impl ReliableBroadcastClient {
command_sender,
double_echo_shutdown_channel,
},
ReceiverStream::new(event_receiver),
ReceiverStream::new(event_receiver.inner()),
)
}

Expand Down
5 changes: 3 additions & 2 deletions crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::spawn;
use tokio::sync::broadcast;
use tokio::{spawn, sync::mpsc};
use tokio_util::sync::CancellationToken;
use topos_config::tce::broadcast::ReliableBroadcastParams;
use topos_core::types::ValidatorId;
use topos_core::uci::Certificate;
use topos_core::uci::CertificateId;
use topos_metrics::channels::mpsc;
use topos_metrics::CERTIFICATE_PROCESSING_FROM_API_TOTAL;
use topos_metrics::CERTIFICATE_PROCESSING_FROM_GOSSIP_TOTAL;
use topos_metrics::CERTIFICATE_PROCESSING_TOTAL;
Expand Down Expand Up @@ -218,7 +219,7 @@ impl TaskManager {
fn start_task(
running_tasks: &RunningTasks,
task: Task,
sink: mpsc::Sender<DoubleEchoCommand>,
sink: tokio::sync::mpsc::Sender<DoubleEchoCommand>,
messages: Option<Vec<DoubleEchoCommand>>,
need_gossip: bool,
) {
Expand Down
14 changes: 7 additions & 7 deletions crates/topos-tce-broadcast/src/task_manager/task.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::broadcast;

use topos_core::types::stream::Position;
use topos_core::uci::CertificateId;
Expand All @@ -16,16 +16,16 @@ use crate::{DoubleEchoCommand, TaskStatus};

#[derive(Debug)]
pub struct TaskContext {
pub sink: mpsc::Sender<DoubleEchoCommand>,
pub shutdown_sender: mpsc::Sender<()>,
pub sink: tokio::sync::mpsc::Sender<DoubleEchoCommand>,
pub shutdown_sender: tokio::sync::mpsc::Sender<()>,
}

pub struct Task {
pub validator_store: Arc<ValidatorStore>,
pub message_receiver: mpsc::Receiver<DoubleEchoCommand>,
pub message_receiver: tokio::sync::mpsc::Receiver<DoubleEchoCommand>,
pub certificate_id: CertificateId,
pub broadcast_state: BroadcastState,
pub shutdown_receiver: mpsc::Receiver<()>,
pub shutdown_receiver: tokio::sync::mpsc::Receiver<()>,
broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,
}

Expand All @@ -36,8 +36,8 @@ impl Task {
validator_store: Arc<ValidatorStore>,
broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,
) -> (Task, TaskContext) {
let (message_sender, message_receiver) = mpsc::channel(10_024);
let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);
let (message_sender, message_receiver) = tokio::sync::mpsc::channel(10_024);
let (shutdown_sender, shutdown_receiver) = tokio::sync::mpsc::channel(1);

let task_context = TaskContext {
sink: message_sender,
Expand Down
16 changes: 11 additions & 5 deletions crates/topos-tce-broadcast/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{broadcast, mpsc, oneshot};
use topos_config::tce::broadcast::ReliableBroadcastParams;
use topos_core::uci::Certificate;
use topos_crypto::messages::MessageSigner;
use topos_crypto::validator_id::ValidatorId;
use topos_metrics::{
DOUBLE_CLIENT_TO_DOUBLE_ECHO_CHANNEL, DOUBLE_ECHO_EVENT_CHANNEL,
DOUBLE_ECHO_TO_TASK_MANAGER_CHANNEL,
};
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::validator::ValidatorStore;
use topos_test_sdk::constants::*;
Expand Down Expand Up @@ -53,18 +56,21 @@ struct TceParams {
}

struct Context {
event_receiver: Receiver<ProtocolEvents>,
event_receiver: topos_metrics::channels::mpsc::Receiver<ProtocolEvents>,
broadcast_receiver: broadcast::Receiver<CertificateDeliveredWithPositions>,
validator_store: Arc<ValidatorStore>,
}

async fn create_context(params: TceParams) -> (DoubleEcho, Context) {
let validator_store = create_validator_store::default().await;
let (_cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE);
let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE);
let (_cmd_sender, cmd_receiver) =
topos_metrics::channels::mpsc::channel(CHANNEL_SIZE, &DOUBLE_CLIENT_TO_DOUBLE_ECHO_CHANNEL);
let (event_sender, event_receiver) =
topos_metrics::channels::mpsc::channel(CHANNEL_SIZE, &DOUBLE_ECHO_EVENT_CHANNEL);
let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) =
mpsc::channel::<oneshot::Sender<()>>(1);
let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(CHANNEL_SIZE);
let (task_manager_message_sender, task_manager_message_receiver) =
topos_metrics::channels::mpsc::channel(CHANNEL_SIZE, &DOUBLE_ECHO_TO_TASK_MANAGER_CHANNEL);

let message_signer = Arc::new(MessageSigner::from_str(PRIVATE_KEY).unwrap());

Expand Down
Loading

0 comments on commit 42bde65

Please sign in to comment.