Skip to content

Commit

Permalink
add gossipsub
Browse files Browse the repository at this point in the history
  • Loading branch information
magick93 authored and diegomrsantos committed Dec 12, 2024
1 parent 04176ae commit 943c20d
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 9 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ perf.data*

# VSCode
/.vscode

# cross
/zcross
6 changes: 3 additions & 3 deletions anchor/network/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use libp2p::swarm::NetworkBehaviour;
use libp2p::{identify, ping};
use libp2p::{gossipsub, identify, ping};

#[derive(NetworkBehaviour)]
pub struct AnchorBehaviour {
/// Provides IP addresses and peer information.
pub identify: identify::Behaviour,
/// Used for connection health checks.
pub ping: ping::Behaviour,
// /// The routing pub-sub mechanism for Anchor.
// pub gossipsub: gossipsub::Behaviour,
/// The routing pub-sub mechanism for Anchor.
pub gossipsub: gossipsub::Behaviour,
}
51 changes: 45 additions & 6 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use crate::behaviour::AnchorBehaviour;
use crate::behaviour::AnchorBehaviourEvent;
use crate::keypair_utils::load_private_key;
use crate::transport::build_transport;
use crate::Config;
use futures::StreamExt;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::gossipsub::{MessageAuthenticity, ValidationMode};
use libp2p::identity::Keypair;
use libp2p::multiaddr::Protocol;
use libp2p::{futures, identify, ping, PeerId, Swarm, SwarmBuilder};
use libp2p::swarm::SwarmEvent;
use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder};
use std::num::{NonZeroU8, NonZeroUsize};
use std::pin::Pin;
use std::time::Duration;
use task_executor::TaskExecutor;
use tracing::info;

Expand Down Expand Up @@ -74,8 +78,18 @@ impl Network {
pub async fn run(mut self) {
loop {
tokio::select! {
_swarm_message = self.swarm.select_next_some() => {
// TODO handle and match swarm messages
swarm_message = self.swarm.select_next_some() => {
match swarm_message {
SwarmEvent::Behaviour(behaviour_event) => match behaviour_event {
AnchorBehaviourEvent::Gossipsub(_ge) => {
// TODO handle gossipsub events
},
// TODO handle other behaviour events
_ => todo!()
},
// TODO handle other swarm events
_ => todo!()
}
}
// TODO match input channels
}
Expand All @@ -84,8 +98,7 @@ impl Network {
}

fn build_anchor_behaviour(local_keypair: Keypair) -> AnchorBehaviour {
// setup gossipsub
// discv5
// TODO setup discv5
let identify = {
let local_public_key = local_keypair.public();
let identify_config = identify::Config::new("anchor".into(), local_public_key)
Expand All @@ -94,10 +107,36 @@ fn build_anchor_behaviour(local_keypair: Keypair) -> AnchorBehaviour {
identify::Behaviour::new(identify_config)
};

// TODO those values might need to be parameterized based on the network
let slots_per_epoch = 32;
let seconds_per_slot = 12;
let duplicate_cache_time = Duration::from_secs(slots_per_epoch * seconds_per_slot); // 6.4 min

// TODO revisit gossipsub config
let config = gossipsub::ConfigBuilder::default()
.duplicate_cache_time(duplicate_cache_time)
.flood_publish(false)
.mesh_n(8) //D
.mesh_n_low(6) // Dlo
.mesh_n_high(12) // Dhi
.mesh_outbound_min(4) // Dout
.heartbeat_interval(Duration::from_millis(700))
.history_length(6)
.history_gossip(4)
.max_ihave_length(1500)
.max_ihave_messages(32)
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();

let peer_id = local_keypair.public().to_peer_id();
let gossipsub =
gossipsub::Behaviour::new(MessageAuthenticity::Author(peer_id), config).unwrap();

AnchorBehaviour {
identify,
ping: ping::Behaviour::default(),
// gossipsub: gossipsub::Behaviour::default(),
gossipsub,
}
}

Expand Down
23 changes: 23 additions & 0 deletions anchor/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ use {
tokio::signal::unix::{signal, Signal, SignalKind},
};

#[cfg(target_family = "windows")]
#[path = "environment_windows.rs"]
mod environment_windows;

/// The maximum time in seconds the client will wait for all internal tasks to shutdown.
const MAXIMUM_SHUTDOWN_TIME: u64 = 15;

Expand Down Expand Up @@ -141,6 +145,25 @@ impl Environment {
}
}

#[cfg(target_family = "windows")]
pub fn block_until_shutdown_requested(&mut self) -> Result<ShutdownReason, String> {
let signal_rx = self
.signal_rx
.take()
.ok_or("Inner shutdown already received")?;

match self
.runtime()
.block_on(environment_windows::handle_shutdown_signals(signal_rx))
{
Ok(reason) => {
info!(reason = reason.message(), "Internal shutdown received");
Ok(reason)
}
Err(e) => Err(e),
}
}

/// Shutdown the `tokio` runtime when all tasks are idle.
pub fn shutdown_on_idle(self) {
match Arc::try_unwrap(self.runtime) {
Expand Down
64 changes: 64 additions & 0 deletions anchor/src/environment_windows.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use futures::channel::mpsc::Receiver;
use futures::{future, Future, StreamExt};
use std::{pin::Pin, task::Context, task::Poll};
use task_executor::ShutdownReason;
use tokio::signal::windows::{ctrl_c, CtrlC};
use tracing::error;

pub(crate) async fn handle_shutdown_signals(
mut signal_rx: Receiver<ShutdownReason>,
) -> Result<ShutdownReason, String> {
let inner_shutdown = async move {
signal_rx
.next()
.await
.ok_or("Internal shutdown channel exhausted")
};
futures::pin_mut!(inner_shutdown);

let register_handlers = async {
let mut handles = vec![];

// Setup for handling Ctrl+C
match ctrl_c() {
Ok(ctrl_c) => {
let ctrl_c = SignalFuture::new(ctrl_c, "Received Ctrl+C");
handles.push(ctrl_c);
}
Err(e) => error!(error = ?e, "Could not register Ctrl+C handler"),
}

future::select(inner_shutdown, future::select_all(handles.into_iter())).await
};

match register_handlers.await {
future::Either::Left((Ok(reason), _)) => Ok(reason),
future::Either::Left((Err(e), _)) => Err(e.into()),
future::Either::Right(((res, _, _), _)) => {
res.ok_or_else(|| "Handler channel closed".to_string())
}
}
}

struct SignalFuture {
signal: CtrlC,
message: &'static str,
}

impl SignalFuture {
pub fn new(signal: CtrlC, message: &'static str) -> SignalFuture {
SignalFuture { signal, message }
}
}

impl Future for SignalFuture {
type Output = Option<ShutdownReason>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.signal.poll_recv(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(_)) => Poll::Ready(Some(ShutdownReason::Success(self.message))),
Poll::Ready(None) => Poll::Ready(None),
}
}
}

0 comments on commit 943c20d

Please sign in to comment.