Skip to content

Commit

Permalink
feat: update double echo to use pending CF (#418)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Jan 12, 2024
1 parent 5b0257b commit 8fb4003
Show file tree
Hide file tree
Showing 25 changed files with 334 additions and 176 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/topos-p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub struct NetworkConfig {
pub discovery: DiscoveryConfig,
pub yamux_max_buffer_size: usize,
pub yamux_window_size: Option<u32>,
pub is_bootnode: bool,
}

impl Default for NetworkConfig {
Expand All @@ -18,6 +19,7 @@ impl Default for NetworkConfig {
discovery: Default::default(),
yamux_max_buffer_size: usize::MAX,
yamux_window_size: None,
is_bootnode: false,
}
}
}
Expand Down
15 changes: 10 additions & 5 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ impl<'a> NetworkBuilder<'a> {
self
}

pub fn public_addresses(mut self, addresses: Vec<Multiaddr>) -> Self {
self.public_addresses = Some(addresses);
pub fn public_addresses<M: Into<Vec<Multiaddr>>>(mut self, addresses: M) -> Self {
self.public_addresses = Some(addresses.into());

self
}

pub fn listen_addresses(mut self, addresses: Vec<Multiaddr>) -> Self {
self.listen_addresses = Some(addresses);
pub fn listen_addresses<M: Into<Vec<Multiaddr>>>(mut self, addresses: M) -> Self {
self.listen_addresses = Some(addresses.into());

self
}
Expand Down Expand Up @@ -200,7 +200,6 @@ impl<'a> NetworkBuilder<'a> {
swarm,
config: self.config,
peer_set: self.known_peers.iter().map(|(p, _)| *p).collect(),
is_boot_node: self.known_peers.is_empty(),
command_receiver,
event_sender,
local_peer_id: peer_id,
Expand All @@ -213,4 +212,10 @@ impl<'a> NetworkBuilder<'a> {
},
))
}

pub fn is_bootnode(mut self, is_bootnode: bool) -> Self {
self.config.is_bootnode = is_bootnode;

self
}
}
15 changes: 4 additions & 11 deletions crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub struct Runtime {
pub(crate) listening_on: Vec<Multiaddr>,
pub(crate) public_addresses: Vec<Multiaddr>,
pub(crate) bootstrapped: bool,
pub(crate) is_boot_node: bool,

/// Contains current listenerId of the swarm
pub active_listeners: HashSet<ListenerId>,
Expand Down Expand Up @@ -74,22 +73,16 @@ impl Runtime {
}
}

debug!("Starting a boot node ? {:?}", self.is_boot_node);
if !self.is_boot_node {
debug!("Starting a boot node ? {:?}", self.config.is_bootnode);
if !self.config.is_bootnode {
// First we need to be known and known some peers before publishing our addresses to
// the network.
let mut publish_retry = self.config.publish_retry;

// We were able to send the DHT query, starting the bootstrap
// We may want to remove the bootstrap at some point
if self
.swarm
.behaviour_mut()
.discovery
.inner
.bootstrap()
.is_err()
{
if let Err(error) = self.swarm.behaviour_mut().discovery.inner.bootstrap() {
error!("Unable to start kademlia bootstrap: {error:?}");
return Err(Box::new(P2PError::BootstrapError(
"Unable to start kademlia bootstrap",
)));
Expand Down
5 changes: 5 additions & 0 deletions crates/topos-p2p/src/tests/command/random_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ async fn no_random_peer() {

let (client, _, runtime) = crate::network::builder()
.peer_key(local.keypair.clone())
.public_addresses(&[local.addr.clone()])
.listen_addresses(&[local.addr.clone()])
.public_addresses(vec![local.addr.clone()])
.listen_addresses(vec![local.addr.clone()])
.is_bootnode(true)
.build()
.await
.expect("Unable to create p2p network");
Expand Down Expand Up @@ -48,6 +51,7 @@ async fn return_a_peer() {
.peer_key(local.keypair.clone())
.public_addresses(vec![local.addr.clone()])
.listen_addresses(vec![local.addr.clone()])
.is_bootnode(true)
.build()
.await
.expect("Unable to create p2p network");
Expand Down Expand Up @@ -77,6 +81,7 @@ async fn return_a_random_peer_among_100() {
.peer_key(local.keypair.clone())
.public_addresses(vec![local.addr.clone()])
.listen_addresses(vec![local.addr.clone()])
.is_bootnode(true)
.build()
.await
.expect("Unable to create p2p network");
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-broadcast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ serde.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true, features = ["sync"] }
tokio-util.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
tracing.workspace = true
tce_transport = { package = "topos-tce-transport", path = "../topos-tce-transport"}
Expand Down
24 changes: 6 additions & 18 deletions crates/topos-tce-broadcast/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,6 @@ lazy_static! {
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel to send updated subscriptions views to the double echo
pub static ref SUBSCRIPTION_VIEW_CHANNEL_SIZE: usize =
std::env::var("TOPOS_SUBSCRIPTION_VIEW_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Size of the channel to send updated subscriptions views to the double echo
pub static ref BROADCAST_TASK_COMPLETION_CHANNEL_SIZE: usize =
std::env::var("BROADCAST_TASK_COMPLETION_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
/// Capacity alert threshold for the double echo command channel
pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE
.checked_mul(10)
Expand All @@ -39,10 +27,10 @@ lazy_static! {
r
})
.unwrap_or(*COMMAND_CHANNEL_SIZE);
/// Size of the double echo buffer
pub static ref TOPOS_DOUBLE_ECHO_MAX_BUFFER_SIZE: usize =
std::env::var("TOPOS_BROADCAST_MAX_BUFFER_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(crate::double_echo::DoubleEcho::MAX_BUFFER_SIZE);
///
pub static ref PENDING_LIMIT_PER_REQUEST_TO_STORAGE: usize =
std::env::var("TOPOS_PENDING_LIMIT_PER_REQUEST_TO_STORAGE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1000);
}
13 changes: 8 additions & 5 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use tce_transport::{ProtocolEvents, ReliableBroadcastParams};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use topos_core::{
types::ValidatorId,
uci::{Certificate, CertificateId},
Expand Down Expand Up @@ -52,6 +53,8 @@ pub struct DoubleEcho {
pub validators: HashSet<ValidatorId>,
pub validator_store: Arc<ValidatorStore>,
pub broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,

pub task_manager_cancellation: CancellationToken,
}

impl DoubleEcho {
Expand Down Expand Up @@ -86,6 +89,7 @@ impl DoubleEcho {
shutdown,
validator_store,
broadcast_sender,
task_manager_cancellation: CancellationToken::new(),
}
}

Expand All @@ -95,7 +99,7 @@ impl DoubleEcho {
) -> mpsc::Receiver<(CertificateId, TaskStatus)> {
let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048);

let (task_manager, shutdown_receiver) = crate::task_manager::TaskManager::new(
let task_manager = crate::task_manager::TaskManager::new(
task_manager_message_receiver,
task_completion_sender,
self.subscriptions.clone(),
Expand All @@ -107,7 +111,7 @@ impl DoubleEcho {
self.broadcast_sender.clone(),
);

tokio::spawn(task_manager.run(shutdown_receiver));
tokio::spawn(task_manager.run(self.task_manager_cancellation.child_token()));

task_completion_receiver
}
Expand All @@ -133,14 +137,13 @@ impl DoubleEcho {

shutdown = self.shutdown.recv() => {
warn!("Double echo shutdown signal received {:?}", shutdown);
self.task_manager_cancellation.cancel();
break shutdown;
},
Some(command) = self.command_receiver.recv() => {
match command {

DoubleEchoCommand::Broadcast { need_gossip, cert } => {
_ = self.broadcast(cert, need_gossip).await;
},
DoubleEchoCommand::Broadcast { need_gossip, cert } => self.broadcast(cert, need_gossip).await,

command if self.subscriptions.is_some() => {
match command {
Expand Down
Loading

0 comments on commit 8fb4003

Please sign in to comment.