Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

chore: bootstrapping task-manager through feature flags #266

Merged
merged 24 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1373de3
chore: adding task-manager feature flags
gruberb Jul 21, 2023
25b4983
fix: remove reduntant feature flag
gruberb Jul 21, 2023
5ee8db3
fix: make the taskmanager new function coherent
gruberb Jul 21, 2023
4fd6ff0
fix: cleanup task_managers and tasks
gruberb Jul 21, 2023
7fec500
chore: intatiate and start the task_manager
gruberb Jul 21, 2023
53986bf
Merge branch 'main' into feat/TP-644
gruberb Jul 21, 2023
cdda231
chore: add echo and ready workflow, wip
gruberb Jul 24, 2023
71c33b2
chore: move handle_x functions to taskmanager, wip
gruberb Jul 24, 2023
07f8678
chore: move event_sender to taskmanager
gruberb Jul 24, 2023
5d7ad60
chore: add workflow to futures task manager, wip
gruberb Jul 24, 2023
23f854f
fix: adjust task returns
gruberb Jul 24, 2023
e370f3a
fix: tests, wip
gruberb Jul 24, 2023
88550ea
fix: test errors, wip
gruberb Jul 24, 2023
6668fab
fix: debugging tests
gruberb Jul 25, 2023
a1d9b74
fix: add hint to restructure the broadcast, wip
gruberb Jul 25, 2023
757b3c1
chore: refactor task_manager tests
Freyskeyd Jul 25, 2023
4a8bbb7
fix: pr review, cleaning up tests and clippy messages
gruberb Jul 25, 2023
da16ac7
chore: add task-manager-channels to double-echo
gruberb Jul 25, 2023
d471220
fix: add constants for channel sizes
gruberb Jul 25, 2023
bfffea1
fix: add doc comments for double echo
gruberb Jul 25, 2023
e2c0b0e
chore: adapt task-manager benches to faeture flag
gruberb Jul 26, 2023
2b31655
fix: fix cargo xclippy
gruberb Jul 26, 2023
cc829c3
fix: cert-delivery test, forward subscription view
gruberb Jul 26, 2023
55b0530
fix: pr review
gruberb Jul 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/topos-tce-broadcast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ rand.workspace = true
topos-test-sdk = { path = "../topos-test-sdk/" }

[features]
default = []
task-manager-channels = []

[[bench]]
name = "double_echo"
Expand Down
4 changes: 4 additions & 0 deletions crates/topos-tce-broadcast/benches/double_echo.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};

#[cfg(feature = "task-manager-channels")]
mod task_manager_channels;
#[cfg(not(feature = "task-manager-channels"))]
mod task_manager_futures;

pub fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -11,6 +13,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
.build()
.unwrap();

#[cfg(feature = "task-manager-channels")]
c.bench_function("double_echo with channels", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async {
Expand All @@ -19,6 +22,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
})
});

#[cfg(not(feature = "task-manager-channels"))]
c.bench_function("double_echo with futures", |b| {
b.to_async(FuturesExecutor).iter(|| async {
runtime.block_on(async {
Expand Down
23 changes: 9 additions & 14 deletions crates/topos-tce-broadcast/benches/task_manager_channels.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,26 @@
use std::collections::HashMap;

use rand::Rng;
use tokio::spawn;
use tokio::sync::mpsc;

use tce_transport::ReliableBroadcastParams;
use topos_core::uci::CertificateId;
use topos_p2p::PeerId;
use topos_tce_broadcast::task_manager_channels::{TaskManager, Thresholds};
use topos_tce_broadcast::task_manager_channels::TaskManager;
use topos_tce_broadcast::DoubleEchoCommand;

pub async fn processing_double_echo(n: u64) {
let (message_sender, message_receiver) = mpsc::channel(1024);
let (task_completion_sender, task_completion_receiver) = mpsc::channel(1024);
let (event_sender, mut event_receiver) = mpsc::channel(1024);

let task_manager = TaskManager {
message_receiver,
task_completion: task_completion_receiver,
task_context: HashMap::new(),
thresholds: Thresholds {
echo: n as usize,
ready: n as usize,
delivery: n as usize,
},
let threshold = ReliableBroadcastParams {
echo_threshold: n as usize,
ready_threshold: n as usize,
delivery_threshold: n as usize,
};

spawn(task_manager.run(task_completion_sender, event_sender));
let (task_manager, shutdown_receiver) = TaskManager::new(message_receiver, threshold);

spawn(task_manager.run(event_sender, shutdown_receiver));

let mut certificates = vec![];

Expand Down
27 changes: 11 additions & 16 deletions crates/topos-tce-broadcast/benches/task_manager_futures.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,26 @@
use futures::stream::FuturesUnordered;
use rand::Rng;
use tce_transport::ReliableBroadcastParams;
use tokio::spawn;
use tokio::sync::mpsc;
use topos_core::uci::CertificateId;
use topos_p2p::PeerId;
use topos_tce_broadcast::DoubleEchoCommand;

use topos_tce_broadcast::task_manager_futures::{TaskManager, Thresholds};
use topos_tce_broadcast::task_manager_futures::TaskManager;

pub async fn processing_double_echo(n: u64) {
let (message_sender, message_receiver) = mpsc::channel(1024);
let (task_completion_sender, mut task_completion_receiver) = mpsc::channel(48_000);
let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);

let task_manager = TaskManager {
message_receiver,
task_completion_sender,
tasks: Default::default(),
running_tasks: FuturesUnordered::new(),
thresholds: Thresholds {
echo: n as usize,
ready: n as usize,
delivery: n as usize,
},
shutdown_sender,
let (task_completion_sender, mut task_completion_receiver) = mpsc::channel(40_000);

let thresholds = ReliableBroadcastParams {
echo_threshold: n as usize,
ready_threshold: n as usize,
delivery_threshold: n as usize,
};

let (task_manager, shutdown_receiver) =
TaskManager::new(message_receiver, task_completion_sender, thresholds);

spawn(task_manager.run(shutdown_receiver));

let mut certificates = vec![];
Expand Down
18 changes: 18 additions & 0 deletions crates/topos-tce-broadcast/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ lazy_static! {
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel between double echo and the task manager
pub static ref TASK_MANAGER_CHANNEL_SIZE: usize =
std::env::var("TOPOS_TASK_MANAGER_CHANNEL_SIZE")
gruberb marked this conversation as resolved.
Show resolved Hide resolved
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(20_480);
/// Size of the channel to send protocol events from the double echo
pub static ref PROTOCOL_CHANNEL_SIZE: usize =
std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel to send updated subscriptions views to the double echo
pub static ref SUBSCRIPTION_VIEW_CHANNEL_SIZE: usize =
std::env::var("TOPOS_SUBSCRIPTION_VIEW_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Capacity alert threshold for the double echo command channel
pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE
.checked_mul(10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod status;

pub use status::Status;

#[derive(Debug)]
pub struct BroadcastState {
subscriptions_view: SubscriptionsView,
status: Status,
Expand Down
Loading