Skip to content

Commit

Permalink
output events from event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
momosh-ethernal committed Aug 20, 2024
1 parent 5d33920 commit 17fc73e
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 128 deletions.
50 changes: 12 additions & 38 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use color_eyre::{
use kate_recovery::com::AppData;
use kate_recovery::matrix::Partition;
use std::{fs, path::Path, sync::Arc};
use tokio::sync::{broadcast, mpsc};
use tokio::sync::broadcast;
use tracing::trace;
use tracing::{error, info, span, warn, Level};

Expand Down Expand Up @@ -96,33 +96,19 @@ async fn run(
.wrap_err("Unable to initialize OpenTelemetry service")?,
);

// Create sender channel for P2P event loop commands
let (p2p_event_loop_sender, p2p_event_loop_receiver) = mpsc::unbounded_channel();

let p2p_event_loop = p2p::EventLoop::new(
let (p2p_client, p2p_event_loop, _) = p2p::init(
cfg.libp2p.clone(),
id_keys,
version,
&cfg.genesis_hash,
&id_keys,
cfg.is_fat_client(),
shutdown.clone(),
#[cfg(feature = "kademlia-rocksdb")]
db.inner(),
);
)
.await?;

spawn_in_span(
shutdown.with_cancel(
p2p_event_loop
.await
.run(ot_metrics.clone(), p2p_event_loop_receiver),
),
);

let p2p_client = p2p::Client::new(
p2p_event_loop_sender,
cfg.libp2p.dht_parallelization_limit,
cfg.libp2p.kademlia.kad_record_ttl,
);
spawn_in_span(shutdown.with_cancel(p2p_event_loop.run(ot_metrics.clone())));

// Start listening on provided port
p2p_client
Expand Down Expand Up @@ -381,33 +367,21 @@ async fn run_fat(
.wrap_err("Unable to initialize OpenTelemetry service")?,
);

// Create sender channel for P2P event loop commands
let (p2p_event_loop_sender, p2p_event_loop_receiver) = mpsc::unbounded_channel();

let p2p_event_loop = p2p::EventLoop::new(
let (p2p_client, p2p_event_loop, _) = p2p::init(
cfg.libp2p.clone(),
id_keys,
version,
&cfg.genesis_hash,
&id_keys,
cfg.is_fat_client(),
shutdown.clone(),
#[cfg(feature = "kademlia-rocksdb")]
db.inner(),
);
)
.await?;

spawn_in_span(
shutdown.with_cancel(
p2p_event_loop
.await
.run(ot_metrics.clone(), p2p_event_loop_receiver),
),
);
// Create sender channel for P2P event loop commands

let p2p_client = p2p::Client::new(
p2p_event_loop_sender,
cfg.libp2p.dht_parallelization_limit,
cfg.libp2p.kademlia.kad_record_ttl,
);
spawn_in_span(shutdown.with_cancel(p2p_event_loop.run(ot_metrics.clone())));

// Start listening on provided port
p2p_client
Expand Down
73 changes: 68 additions & 5 deletions core/src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ use color_eyre::{eyre::WrapErr, Report, Result};
use configuration::LibP2PConfig;
use libp2p::{
autonat, dcutr, identify,
identity::{self, ed25519},
kad::{self, PeerRecord},
identity::{self, ed25519, Keypair},
kad::{self, Mode, PeerRecord, QueryStats},
mdns, noise, ping, relay,
swarm::NetworkBehaviour,
tcp, upnp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder,
};
use multihash::{self, Hasher};
use semver::Version;
use serde::{Deserialize, Serialize};
use std::{fmt, net::Ipv4Addr, str::FromStr};
use tokio::sync::oneshot;
use std::{fmt, net::Ipv4Addr, str::FromStr, sync::Arc, time::Duration};
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::info;
#[cfg(feature = "network-analysis")]
pub mod analyzer;
Expand All @@ -26,6 +26,7 @@ mod kad_mem_store;
mod kad_rocksdb_store;
use crate::{
data::{Database, P2PKeypairKey, RocksDB},
shutdown::Controller,
types::SecretKey,
};
pub use client::Client;
Expand All @@ -49,6 +50,33 @@ Bootstrap node list must not be empty.
Either use a '--network' flag or add a list of bootstrap nodes in the configuration file.
"#;

#[derive(Clone)]
pub enum OutputEvent {
IncomingGetRecord,
IncomingPutRecord,
KadModeChange(Mode),
PutRecordOk {
key: kad::RecordKey,
stats: QueryStats,
},
PutRecordQuorumFailed {
key: kad::RecordKey,
stats: QueryStats,
},
PutRecordTimeout {
key: kad::RecordKey,
stats: QueryStats,
},
NatStatusPrivate,
Ping(Duration),
IncomingConnection,
IncomingConnectionError,
MultiaddressUpdate(Multiaddr),
EstablishedConnection,
OutgoingConnectionError,
Count,
}

#[derive(Clone)]
struct AgentVersion {
pub base_version: String,
Expand Down Expand Up @@ -175,11 +203,46 @@ fn protocol_name(genesis_hash: &str) -> libp2p::StreamProtocol {
.expect("Invalid Kademlia protocol name")
}

pub async fn init(
cfg: LibP2PConfig,
id_keys: Keypair,
version: &str,
genesis_hash: &str,
is_fat: bool,
shutdown: Controller<String>,
#[cfg(feature = "kademlia-rocksdb")] db: Arc<rocksdb::DB>,
) -> Result<(Client, EventLoop, broadcast::Receiver<OutputEvent>)> {
// create sender channel for P2P event loop commands
let (command_sender, command_receiver) = mpsc::unbounded_channel();
// create P2P Client
let client = Client::new(
command_sender,
cfg.dht_parallelization_limit,
cfg.kademlia.kad_record_ttl,
);
// create Store
let store = Store::with_config(
id_keys.public().to_peer_id(),
(&cfg).into(),
#[cfg(feature = "kademlia-rocksdb")]
db,
);
// create Swarm
let swarm = build_swarm(&cfg, version, genesis_hash, &id_keys, store)
.await
.expect("Unable to build swarm.");
let (event_sender, event_receiver) = broadcast::channel(1000);
// create EventLoop
let event_loop = EventLoop::new(cfg, swarm, is_fat, command_receiver, event_sender, shutdown);

Ok((client, event_loop, event_receiver))
}

async fn build_swarm(
cfg: &LibP2PConfig,
version: &str,
genesis_hash: &str,
id_keys: &libp2p::identity::Keypair,
id_keys: &Keypair,
kad_store: Store,
) -> Result<Swarm<Behaviour>> {
// create Identify Protocol Config
Expand Down
Loading

0 comments on commit 17fc73e

Please sign in to comment.