diff --git a/.gitignore b/.gitignore index b627be9..4240a36 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ perf.data* # VSCode /.vscode + +# cross +/zcross \ No newline at end of file diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 23e88fe..f52459a 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -1,5 +1,5 @@ use libp2p::swarm::NetworkBehaviour; -use libp2p::{identify, ping}; +use libp2p::{gossipsub, identify, ping}; #[derive(NetworkBehaviour)] pub struct AnchorBehaviour { @@ -7,6 +7,6 @@ pub struct AnchorBehaviour { pub identify: identify::Behaviour, /// Used for connection health checks. pub ping: ping::Behaviour, - // /// The routing pub-sub mechanism for Anchor. - // pub gossipsub: gossipsub::Behaviour, + /// The routing pub-sub mechanism for Anchor. + pub gossipsub: gossipsub::Behaviour, } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 1b4192b..0b0c25b 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -1,15 +1,19 @@ use crate::behaviour::AnchorBehaviour; +use crate::behaviour::AnchorBehaviourEvent; use crate::keypair_utils::load_private_key; use crate::transport::build_transport; use crate::Config; use futures::StreamExt; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; +use libp2p::gossipsub::{MessageAuthenticity, ValidationMode}; use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; -use libp2p::{futures, identify, ping, PeerId, Swarm, SwarmBuilder}; +use libp2p::swarm::SwarmEvent; +use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder}; use std::num::{NonZeroU8, NonZeroUsize}; use std::pin::Pin; +use std::time::Duration; use task_executor::TaskExecutor; use tracing::info; @@ -74,8 +78,18 @@ impl Network { pub async fn run(mut self) { loop { tokio::select! { - _swarm_message = self.swarm.select_next_some() => { - // TODO handle and match swarm messages + swarm_message = self.swarm.select_next_some() => { + match swarm_message { + SwarmEvent::Behaviour(behaviour_event) => match behaviour_event { + AnchorBehaviourEvent::Gossipsub(_ge) => { + // TODO handle gossipsub events + }, + // TODO handle other behaviour events + _ => todo!() + }, + // TODO handle other swarm events + _ => todo!() + } } // TODO match input channels } @@ -84,8 +98,7 @@ impl Network { } fn build_anchor_behaviour(local_keypair: Keypair) -> AnchorBehaviour { - // setup gossipsub - // discv5 + // TODO setup discv5 let identify = { let local_public_key = local_keypair.public(); let identify_config = identify::Config::new("anchor".into(), local_public_key) @@ -94,10 +107,36 @@ fn build_anchor_behaviour(local_keypair: Keypair) -> AnchorBehaviour { identify::Behaviour::new(identify_config) }; + // TODO those values might need to be parameterized based on the network + let slots_per_epoch = 32; + let seconds_per_slot = 12; + let duplicate_cache_time = Duration::from_secs(slots_per_epoch * seconds_per_slot); // 6.4 min + + // TODO revisit gossipsub config + let config = gossipsub::ConfigBuilder::default() + .duplicate_cache_time(duplicate_cache_time) + .flood_publish(false) + .mesh_n(8) //D + .mesh_n_low(6) // Dlo + .mesh_n_high(12) // Dhi + .mesh_outbound_min(4) // Dout + .heartbeat_interval(Duration::from_millis(700)) + .history_length(6) + .history_gossip(4) + .max_ihave_length(1500) + .max_ihave_messages(32) + .validation_mode(ValidationMode::Permissive) + .build() + .unwrap(); + + let peer_id = local_keypair.public().to_peer_id(); + let gossipsub = + gossipsub::Behaviour::new(MessageAuthenticity::Author(peer_id), config).unwrap(); + AnchorBehaviour { identify, ping: ping::Behaviour::default(), - // gossipsub: gossipsub::Behaviour::default(), + gossipsub, } } diff --git a/anchor/src/environment.rs b/anchor/src/environment.rs index 716b131..32eedcc 100644 --- a/anchor/src/environment.rs +++ b/anchor/src/environment.rs @@ -14,6 +14,10 @@ use { tokio::signal::unix::{signal, Signal, SignalKind}, }; +#[cfg(target_family = "windows")] +#[path = "environment_windows.rs"] +mod environment_windows; + /// The maximum time in seconds the client will wait for all internal tasks to shutdown. const MAXIMUM_SHUTDOWN_TIME: u64 = 15; @@ -141,6 +145,25 @@ impl Environment { } } + #[cfg(target_family = "windows")] + pub fn block_until_shutdown_requested(&mut self) -> Result { + let signal_rx = self + .signal_rx + .take() + .ok_or("Inner shutdown already received")?; + + match self + .runtime() + .block_on(environment_windows::handle_shutdown_signals(signal_rx)) + { + Ok(reason) => { + info!(reason = reason.message(), "Internal shutdown received"); + Ok(reason) + } + Err(e) => Err(e), + } + } + /// Shutdown the `tokio` runtime when all tasks are idle. pub fn shutdown_on_idle(self) { match Arc::try_unwrap(self.runtime) { diff --git a/anchor/src/environment_windows.rs b/anchor/src/environment_windows.rs new file mode 100644 index 0000000..25ddfbe --- /dev/null +++ b/anchor/src/environment_windows.rs @@ -0,0 +1,64 @@ +use futures::channel::mpsc::Receiver; +use futures::{future, Future, StreamExt}; +use std::{pin::Pin, task::Context, task::Poll}; +use task_executor::ShutdownReason; +use tokio::signal::windows::{ctrl_c, CtrlC}; +use tracing::error; + +pub(crate) async fn handle_shutdown_signals( + mut signal_rx: Receiver, +) -> Result { + let inner_shutdown = async move { + signal_rx + .next() + .await + .ok_or("Internal shutdown channel exhausted") + }; + futures::pin_mut!(inner_shutdown); + + let register_handlers = async { + let mut handles = vec![]; + + // Setup for handling Ctrl+C + match ctrl_c() { + Ok(ctrl_c) => { + let ctrl_c = SignalFuture::new(ctrl_c, "Received Ctrl+C"); + handles.push(ctrl_c); + } + Err(e) => error!(error = ?e, "Could not register Ctrl+C handler"), + } + + future::select(inner_shutdown, future::select_all(handles.into_iter())).await + }; + + match register_handlers.await { + future::Either::Left((Ok(reason), _)) => Ok(reason), + future::Either::Left((Err(e), _)) => Err(e.into()), + future::Either::Right(((res, _, _), _)) => { + res.ok_or_else(|| "Handler channel closed".to_string()) + } + } +} + +struct SignalFuture { + signal: CtrlC, + message: &'static str, +} + +impl SignalFuture { + pub fn new(signal: CtrlC, message: &'static str) -> SignalFuture { + SignalFuture { signal, message } + } +} + +impl Future for SignalFuture { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.signal.poll_recv(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(_)) => Poll::Ready(Some(ShutdownReason::Success(self.message))), + Poll::Ready(None) => Poll::Ready(None), + } + } +}