Skip to content

Commit

Permalink
output events from event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
momosh-ethernal committed Aug 20, 2024
1 parent 4f5427b commit 6b727c2
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 106 deletions.
26 changes: 6 additions & 20 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use color_eyre::{
};
use kate_recovery::com::AppData;
use std::{fs, path::Path, sync::Arc};
use tokio::sync::{broadcast, mpsc};
use tokio::sync::broadcast;
use tracing::trace;
use tracing::{error, info, span, warn, Level};

Expand Down Expand Up @@ -89,33 +89,19 @@ async fn run(
.wrap_err("Unable to initialize OpenTelemetry service")?,
);

// Create sender channel for P2P event loop commands
let (p2p_event_loop_sender, p2p_event_loop_receiver) = mpsc::unbounded_channel();

let p2p_event_loop = p2p::EventLoop::new(
let (p2p_client, p2p_event_loop, _) = p2p::init(
cfg.libp2p.clone(),
id_keys,
version,
&cfg.genesis_hash,
&id_keys,
false,
shutdown.clone(),
#[cfg(feature = "kademlia-rocksdb")]
db.inner(),
);

spawn_in_span(
shutdown.with_cancel(
p2p_event_loop
.await
.run(ot_metrics.clone(), p2p_event_loop_receiver),
),
);
)
.await?;

let p2p_client = p2p::Client::new(
p2p_event_loop_sender,
cfg.libp2p.dht_parallelization_limit,
cfg.libp2p.kademlia.kad_record_ttl,
);
spawn_in_span(shutdown.with_cancel(p2p_event_loop.run(ot_metrics.clone())));

let addrs = vec![
cfg.libp2p.tcp_multiaddress(),
Expand Down
73 changes: 68 additions & 5 deletions core/src/network/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use color_eyre::{eyre::WrapErr, Report, Result};
use configuration::LibP2PConfig;
use libp2p::{
autonat, dcutr, identify,
identity::{self, ed25519},
kad::{self, PeerRecord},
identity::{self, ed25519, Keypair},
kad::{self, Mode, PeerRecord, QueryStats},
mdns, noise, ping, relay,
swarm::NetworkBehaviour,
tcp, upnp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder,
Expand All @@ -14,8 +14,8 @@ use multihash::{self, Hasher};
use rand::thread_rng;
use semver::Version;
use serde::{Deserialize, Serialize};
use std::{fmt, net::Ipv4Addr, str::FromStr};
use tokio::sync::oneshot;
use std::{fmt, net::Ipv4Addr, str::FromStr, sync::Arc, time::Duration};
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::info;
#[cfg(feature = "network-analysis")]
pub mod analyzer;
Expand All @@ -28,6 +28,7 @@ mod kad_mem_store;
mod kad_rocksdb_store;
use crate::{
data::{Database, P2PKeypairKey, RocksDB},
shutdown::Controller,
types::SecretKey,
};
pub use client::Client;
Expand All @@ -51,6 +52,33 @@ Bootstrap node list must not be empty.
Either use a '--network' flag or add a list of bootstrap nodes in the configuration file.
"#;

#[derive(Clone)]
pub enum OutputEvent {
IncomingGetRecord,
IncomingPutRecord,
KadModeChange(Mode),
PutRecordOk {
key: kad::RecordKey,
stats: QueryStats,
},
PutRecordQuorumFailed {
key: kad::RecordKey,
stats: QueryStats,
},
PutRecordTimeout {
key: kad::RecordKey,
stats: QueryStats,
},
NatStatusPrivate,
Ping(Duration),
IncomingConnection,
IncomingConnectionError,
MultiaddressUpdate(Multiaddr),
EstablishedConnection,
OutgoingConnectionError,
Count,
}

#[derive(Clone)]
struct AgentVersion {
pub base_version: String,
Expand Down Expand Up @@ -177,11 +205,46 @@ fn protocol_name(genesis_hash: &str) -> libp2p::StreamProtocol {
.expect("Invalid Kademlia protocol name")
}

pub async fn init(
cfg: LibP2PConfig,
id_keys: Keypair,
version: &str,
genesis_hash: &str,
is_fat: bool,
shutdown: Controller<String>,
#[cfg(feature = "kademlia-rocksdb")] db: Arc<rocksdb::DB>,
) -> Result<(Client, EventLoop, broadcast::Receiver<OutputEvent>)> {
// create sender channel for P2P event loop commands
let (command_sender, command_receiver) = mpsc::unbounded_channel();
// create P2P Client
let client = Client::new(
command_sender,
cfg.dht_parallelization_limit,
cfg.kademlia.kad_record_ttl,
);
// create Store
let store = Store::with_config(
id_keys.public().to_peer_id(),
(&cfg).into(),
#[cfg(feature = "kademlia-rocksdb")]
db,
);
// create Swarm
let swarm = build_swarm(&cfg, version, genesis_hash, &id_keys, store)
.await
.expect("Unable to build swarm.");
let (event_sender, event_receiver) = broadcast::channel(1000);
// create EventLoop
let event_loop = EventLoop::new(cfg, swarm, is_fat, command_receiver, event_sender, shutdown);

Ok((client, event_loop, event_receiver))
}

async fn build_swarm(
cfg: &LibP2PConfig,
version: &str,
genesis_hash: &str,
id_keys: &libp2p::identity::Keypair,
id_keys: &Keypair,
kad_store: Store,
) -> Result<Swarm<Behaviour>> {
// create Identify Protocol Config
Expand Down
Loading

0 comments on commit 6b727c2

Please sign in to comment.