Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
chore: bootstrapping task-manager through feature flags (#266)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
Co-authored-by: Simon Paitrault <[email protected]>
  • Loading branch information
gruberb and Freyskeyd authored Jul 26, 2023
1 parent 0fa742a commit 6ea6db6
Show file tree
Hide file tree
Showing 16 changed files with 583 additions and 820 deletions.
2 changes: 1 addition & 1 deletion crates/topos-tce-broadcast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ rand.workspace = true
topos-test-sdk = { path = "../topos-test-sdk/" }

[features]
default = []
task-manager-channels = []

[[bench]]
name = "double_echo"
Expand Down
20 changes: 4 additions & 16 deletions crates/topos-tce-broadcast/benches/double_echo.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,17 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};

mod task_manager_channels;
mod task_manager_futures;
mod task_manager;

pub fn criterion_benchmark(c: &mut Criterion) {
let echo_messages = 10;
let certificates = 10_000;

let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

c.bench_function("double_echo with channels", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async {
task_manager_channels::processing_double_echo(echo_messages).await
})
})
});

c.bench_function("double_echo with futures", |b| {
c.bench_function("double_echo", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async {
task_manager_futures::processing_double_echo(echo_messages).await
})
runtime.block_on(async { task_manager::processing_double_echo(certificates).await })
})
});
}
Expand Down
118 changes: 118 additions & 0 deletions crates/topos-tce-broadcast/benches/task_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::collections::HashSet;
use tce_transport::{ProtocolEvents, ReliableBroadcastParams};
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use topos_tce_broadcast::double_echo::DoubleEcho;
use topos_tce_broadcast::sampler::SubscriptionsView;
use topos_test_sdk::certificates::create_certificate_chain;
use topos_test_sdk::constants::{SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1};

const CHANNEL_SIZE: usize = 256_000;

struct TceParams {
nb_peers: usize,
broadcast_params: ReliableBroadcastParams,
}

struct Context {
event_receiver: Receiver<ProtocolEvents>,
}

pub async fn processing_double_echo(n: u64) {
let (subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel(CHANNEL_SIZE);

let (_cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE);
let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE);
let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) =
mpsc::channel::<oneshot::Sender<()>>(1);
let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(CHANNEL_SIZE);

let params = TceParams {
nb_peers: 10,
broadcast_params: ReliableBroadcastParams {
echo_threshold: 8,
ready_threshold: 5,
delivery_threshold: 8,
},
};

let mut ctx = Context { event_receiver };

let mut double_echo = DoubleEcho::new(
params.broadcast_params,
task_manager_message_sender.clone(),
cmd_receiver,
event_sender,
double_echo_shutdown_receiver,
0,
);

// List of peers
let mut peers = HashSet::new();
for i in 0..params.nb_peers {
let peer = topos_p2p::utils::local_key_pair(Some(i as u8))
.public()
.to_peer_id();
peers.insert(peer);
}

// Subscriptions
double_echo.subscriptions.echo = peers.clone();
double_echo.subscriptions.ready = peers.clone();
double_echo.subscriptions.network_size = params.nb_peers;

let msg = SubscriptionsView {
echo: peers.clone(),
ready: peers.clone(),
network_size: params.nb_peers,
};

subscriptions_view_sender.send(msg).await.unwrap();

double_echo.spawn_task_manager(subscriptions_view_receiver, task_manager_message_receiver);

let certificates =
create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], n as usize);

let double_echo_selected_echo = double_echo
.subscriptions
.echo
.iter()
.take(double_echo.params.echo_threshold)
.cloned()
.collect::<Vec<_>>();

let double_echo_selected_ready = double_echo
.subscriptions
.ready
.iter()
.take(double_echo.params.delivery_threshold)
.cloned()
.collect::<Vec<_>>();

for cert in &certificates {
double_echo.broadcast(cert.clone(), true).await;
}

for cert in &certificates {
for p in &double_echo_selected_echo {
double_echo.handle_echo(*p, cert.id).await;
}

for p in &double_echo_selected_ready {
double_echo.handle_ready(*p, cert.id).await;
}
}

let mut count = 0;

while let Some(event) = ctx.event_receiver.recv().await {
if let ProtocolEvents::CertificateDelivered { .. } = event {
count += 1;

if count == n {
break;
}
}
}
}
61 changes: 0 additions & 61 deletions crates/topos-tce-broadcast/benches/task_manager_channels.rs

This file was deleted.

62 changes: 0 additions & 62 deletions crates/topos-tce-broadcast/benches/task_manager_futures.rs

This file was deleted.

24 changes: 24 additions & 0 deletions crates/topos-tce-broadcast/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@ lazy_static! {
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel between double echo and the task manager
pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize =
std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(20_480);
/// Size of the channel to send protocol events from the double echo
pub static ref PROTOCOL_CHANNEL_SIZE: usize =
std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel to send updated subscriptions views to the double echo
pub static ref SUBSCRIPTION_VIEW_CHANNEL_SIZE: usize =
std::env::var("TOPOS_SUBSCRIPTION_VIEW_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel to send updated subscriptions views to the double echo
pub static ref BROADCAST_TASK_COMPLETION_CHANNEL_SIZE: usize =
std::env::var("BROADCAST_TASK_COMPLETION_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Capacity alert threshold for the double echo command channel
pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE
.checked_mul(10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod status;

pub use status::Status;

#[derive(Debug)]
pub struct BroadcastState {
subscriptions_view: SubscriptionsView,
status: Status,
Expand Down Expand Up @@ -97,7 +98,9 @@ impl BroadcastState {
let event = ProtocolEvents::Ready {
certificate_id: self.certificate.id,
};
self.event_sender.try_send(event).unwrap();
if let Err(e) = self.event_sender.try_send(event) {
warn!("Error sending Ready message: {}", e);
}

self.status = self.status.ready_sent();

Expand Down
Loading

0 comments on commit 6ea6db6

Please sign in to comment.