Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

chore: bootstrapping task-manager through feature flags #266

Merged
merged 24 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1373de3
chore: adding task-manager feature flags
gruberb Jul 21, 2023
25b4983
fix: remove reduntant feature flag
gruberb Jul 21, 2023
5ee8db3
fix: make the taskmanager new function coherent
gruberb Jul 21, 2023
4fd6ff0
fix: cleanup task_managers and tasks
gruberb Jul 21, 2023
7fec500
chore: intatiate and start the task_manager
gruberb Jul 21, 2023
53986bf
Merge branch 'main' into feat/TP-644
gruberb Jul 21, 2023
cdda231
chore: add echo and ready workflow, wip
gruberb Jul 24, 2023
71c33b2
chore: move handle_x functions to taskmanager, wip
gruberb Jul 24, 2023
07f8678
chore: move event_sender to taskmanager
gruberb Jul 24, 2023
5d7ad60
chore: add workflow to futures task manager, wip
gruberb Jul 24, 2023
23f854f
fix: adjust task returns
gruberb Jul 24, 2023
e370f3a
fix: tests, wip
gruberb Jul 24, 2023
88550ea
fix: test errors, wip
gruberb Jul 24, 2023
6668fab
fix: debugging tests
gruberb Jul 25, 2023
a1d9b74
fix: add hint to restructure the broadcast, wip
gruberb Jul 25, 2023
757b3c1
chore: refactor task_manager tests
Freyskeyd Jul 25, 2023
4a8bbb7
fix: pr review, cleaning up tests and clippy messages
gruberb Jul 25, 2023
da16ac7
chore: add task-manager-channels to double-echo
gruberb Jul 25, 2023
d471220
fix: add constants for channel sizes
gruberb Jul 25, 2023
bfffea1
fix: add doc comments for double echo
gruberb Jul 25, 2023
e2c0b0e
chore: adapt task-manager benches to faeture flag
gruberb Jul 26, 2023
2b31655
fix: fix cargo xclippy
gruberb Jul 26, 2023
cc829c3
fix: cert-delivery test, forward subscription view
gruberb Jul 26, 2023
55b0530
fix: pr review
gruberb Jul 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/topos-tce-broadcast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ rand.workspace = true
topos-test-sdk = { path = "../topos-test-sdk/" }

[features]
default = []
task-manager-channels = []

[[bench]]
name = "double_echo"
Expand Down
20 changes: 4 additions & 16 deletions crates/topos-tce-broadcast/benches/double_echo.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,17 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};

mod task_manager_channels;
mod task_manager_futures;
mod task_manager;

pub fn criterion_benchmark(c: &mut Criterion) {
let echo_messages = 10;
let certificates = 10_000;

let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

c.bench_function("double_echo with channels", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async {
task_manager_channels::processing_double_echo(echo_messages).await
})
})
});

c.bench_function("double_echo with futures", |b| {
c.bench_function("double_echo", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async {
task_manager_futures::processing_double_echo(echo_messages).await
})
runtime.block_on(async { task_manager::processing_double_echo(certificates).await })
})
});
}
Expand Down
118 changes: 118 additions & 0 deletions crates/topos-tce-broadcast/benches/task_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::collections::HashSet;
use tce_transport::{ProtocolEvents, ReliableBroadcastParams};
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use topos_tce_broadcast::double_echo::DoubleEcho;
use topos_tce_broadcast::sampler::SubscriptionsView;
use topos_test_sdk::certificates::create_certificate_chain;
use topos_test_sdk::constants::{SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1};

const CHANNEL_SIZE: usize = 256_000;

struct TceParams {
nb_peers: usize,
broadcast_params: ReliableBroadcastParams,
}

struct Context {
event_receiver: Receiver<ProtocolEvents>,
}

pub async fn processing_double_echo(n: u64) {
let (subscriptions_view_sender, subscriptions_view_receiver) = mpsc::channel(CHANNEL_SIZE);

let (_cmd_sender, cmd_receiver) = mpsc::channel(CHANNEL_SIZE);
let (event_sender, event_receiver) = mpsc::channel(CHANNEL_SIZE);
let (_double_echo_shutdown_sender, double_echo_shutdown_receiver) =
mpsc::channel::<oneshot::Sender<()>>(1);
let (task_manager_message_sender, task_manager_message_receiver) = mpsc::channel(CHANNEL_SIZE);

let params = TceParams {
nb_peers: 10,
broadcast_params: ReliableBroadcastParams {
echo_threshold: 8,
ready_threshold: 5,
delivery_threshold: 8,
},
};

let mut ctx = Context { event_receiver };

let mut double_echo = DoubleEcho::new(
params.broadcast_params,
task_manager_message_sender.clone(),
cmd_receiver,
event_sender,
double_echo_shutdown_receiver,
0,
);

// List of peers
let mut peers = HashSet::new();
for i in 0..params.nb_peers {
let peer = topos_p2p::utils::local_key_pair(Some(i as u8))
.public()
.to_peer_id();
peers.insert(peer);
}

// Subscriptions
double_echo.subscriptions.echo = peers.clone();
double_echo.subscriptions.ready = peers.clone();
double_echo.subscriptions.network_size = params.nb_peers;

let msg = SubscriptionsView {
echo: peers.clone(),
ready: peers.clone(),
network_size: params.nb_peers,
};

subscriptions_view_sender.send(msg).await.unwrap();

double_echo.spawn_task_manager(subscriptions_view_receiver, task_manager_message_receiver);

let certificates =
create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], n as usize);

let double_echo_selected_echo = double_echo
.subscriptions
.echo
.iter()
.take(double_echo.params.echo_threshold)
.cloned()
.collect::<Vec<_>>();

let double_echo_selected_ready = double_echo
.subscriptions
.ready
.iter()
.take(double_echo.params.delivery_threshold)
.cloned()
.collect::<Vec<_>>();

for cert in &certificates {
double_echo.broadcast(cert.clone(), true).await;
}

for cert in &certificates {
for p in &double_echo_selected_echo {
double_echo.handle_echo(*p, cert.id).await;
}

for p in &double_echo_selected_ready {
double_echo.handle_ready(*p, cert.id).await;
}
}

let mut count = 0;

while let Some(event) = ctx.event_receiver.recv().await {
if let ProtocolEvents::CertificateDelivered { .. } = event {
count += 1;

if count == n {
break;
}
}
}
}
61 changes: 0 additions & 61 deletions crates/topos-tce-broadcast/benches/task_manager_channels.rs

This file was deleted.

62 changes: 0 additions & 62 deletions crates/topos-tce-broadcast/benches/task_manager_futures.rs

This file was deleted.

24 changes: 24 additions & 0 deletions crates/topos-tce-broadcast/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@ lazy_static! {
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel between double echo and the task manager
pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize =
std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(20_480);
/// Size of the channel to send protocol events from the double echo
pub static ref PROTOCOL_CHANNEL_SIZE: usize =
std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel to send updated subscriptions views to the double echo
pub static ref SUBSCRIPTION_VIEW_CHANNEL_SIZE: usize =
std::env::var("TOPOS_SUBSCRIPTION_VIEW_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel to send updated subscriptions views to the double echo
pub static ref BROADCAST_TASK_COMPLETION_CHANNEL_SIZE: usize =
std::env::var("BROADCAST_TASK_COMPLETION_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Capacity alert threshold for the double echo command channel
pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE
.checked_mul(10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod status;

pub use status::Status;

#[derive(Debug)]
pub struct BroadcastState {
subscriptions_view: SubscriptionsView,
status: Status,
Expand Down Expand Up @@ -97,7 +98,9 @@ impl BroadcastState {
let event = ProtocolEvents::Ready {
certificate_id: self.certificate.id,
};
self.event_sender.try_send(event).unwrap();
if let Err(e) = self.event_sender.try_send(event) {
warn!("Error sending Ready message: {}", e);
}

self.status = self.status.ready_sent();

Expand Down
Loading
Loading