Skip to content

Commit

Permalink
net: implement ping_node() in OutboundSession and ping self before se…
Browse files Browse the repository at this point in the history
…nding own address in ProtocolAddr, ProtocolSeed

also clean up GreylistRefinery accordingly
  • Loading branch information
lunar-mining committed Dec 15, 2023
1 parent 30bd6dd commit 54951e5
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 87 deletions.
57 changes: 34 additions & 23 deletions src/net/protocol/protocol_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use std::sync::Arc;
use std::{sync::Arc, time::UNIX_EPOCH};

use async_trait::async_trait;
use log::debug;
Expand All @@ -29,6 +29,7 @@ use super::{
message::{AddrsMessage, GetAddrsMessage},
message_subscriber::MessageSubscription,
p2p::P2pPtr,
session::{OutboundSessionPtr, SESSION_OUTBOUND},
settings::SettingsPtr,
},
protocol_base::{ProtocolBase, ProtocolBasePtr},
Expand All @@ -48,6 +49,8 @@ pub struct ProtocolAddress {
get_addrs_sub: MessageSubscription<GetAddrsMessage>,
hosts: HostsPtr,
settings: SettingsPtr,
// We require this to access ping_self() method.
session: OutboundSessionPtr,
jobsman: ProtocolJobsManagerPtr,
}

Expand All @@ -60,6 +63,7 @@ impl ProtocolAddress {
pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
let settings = p2p.settings();
let hosts = p2p.hosts();
let session = p2p.session_outbound();

// Creates a subscription to address message
let addrs_sub =
Expand All @@ -75,6 +79,7 @@ impl ProtocolAddress {
get_addrs_sub,
hosts,
jobsman: ProtocolJobsManager::new(PROTO_NAME, channel),
session,
settings,
})
}
Expand All @@ -84,14 +89,14 @@ impl ProtocolAddress {
/// received addresses to the greylist.
async fn handle_receive_addrs(self: Arc<Self>) -> Result<()> {
debug!(
target: "net::protocol_address::handle_receive_addrs2()",
target: "net::protocol_address::handle_receive_addrs()",
"[START] address={}", self.channel.address(),
);

loop {
let addrs_msg = self.addrs_sub.receive().await?;
debug!(
target: "net::protocol_address::handle_receive_addrs2()",
target: "net::protocol_address::handle_receive_addrs()",
"Received {} addrs from {}", addrs_msg.addrs.len(), self.channel.address(),
);

Expand Down Expand Up @@ -151,21 +156,23 @@ impl ProtocolAddress {
}
}

// We ignore this method for now as it's not part of the new protocol.
// TODO: evaluate whether we need to reimplement this.
//async fn send_my_addrs(self: Arc<Self>) -> Result<()> {
// debug!(
// target: "net::protocol_address::send_my_addrs()",
// "[START] address={}", self.channel.address(),
// );

// // FIXME: Revisit this. Why do we keep sending it?
// loop {
// let ext_addr_msg = AddrsMessage { addrs: self.settings.external_addrs.clone() };
// self.channel.send(&ext_addr_msg).await?;
// sleep(900).await;
// }
//}
async fn send_my_addrs(self: Arc<Self>) -> Result<()> {
debug!(
target: "net::protocol_address::send_my_addrs()",
"[START] address={}", self.channel.address(),
);

// See if we can do a version exchange with ourself.
if self.session.ping_node(self.channel.address()).await {
// We're online. Broadcast our address.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
let addrs = vec![(self.channel.address().clone(), last_seen)];
let addrs_msg = AddrsMessage { addrs };
self.channel.send(&addrs_msg).await?;
}

Ok(())
}
}

#[async_trait]
Expand All @@ -175,14 +182,18 @@ impl ProtocolBase for ProtocolAddress {
async fn start(self: Arc<Self>, ex: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net::protocol_address::start()", "START => address={}", self.channel.address());

//let type_id = self.channel.session_type_id();
let type_id = self.channel.session_type_id();

self.jobsman.clone().start(ex.clone());

//// If it's an outbound session + has an extern_addr, send our address.
//if type_id == SESSION_OUTBOUND && !self.settings.external_addrs.is_empty() {
// self.jobsman.clone().spawn(self.clone().send_my_addrs(), ex.clone()).await;
//}
// If it's an outbound session, we have an extern_addr, and address advertising
// is enabled, send our address.
if type_id == SESSION_OUTBOUND &&
!self.settings.external_addrs.is_empty() &&
self.settings.advertise
{
self.jobsman.clone().spawn(self.clone().send_my_addrs(), ex.clone()).await;
}

self.jobsman.clone().spawn(self.clone().handle_receive_addrs(), ex.clone()).await;
self.jobsman.spawn(self.clone().handle_receive_get_addrs(), ex).await;
Expand Down
23 changes: 17 additions & 6 deletions src/net/protocol/protocol_seed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use super::{
message::{AddrsMessage, GetAddrsMessage},
message_subscriber::MessageSubscription,
p2p::P2pPtr,
session::OutboundSessionPtr,
settings::SettingsPtr,
},
protocol_base::{ProtocolBase, ProtocolBasePtr},
Expand All @@ -41,6 +42,8 @@ pub struct ProtocolSeed {
hosts: HostsPtr,
settings: SettingsPtr,
addr_sub: MessageSubscription<AddrsMessage>,
// We require this to access ping_self() method.
session: OutboundSessionPtr,
}

const PROTO_NAME: &str = "ProtocolSeed";
Expand All @@ -50,12 +53,13 @@ impl ProtocolSeed {
pub async fn init(channel: ChannelPtr, p2p: P2pPtr) -> ProtocolBasePtr {
let hosts = p2p.hosts();
let settings = p2p.settings();
let session = p2p.session_outbound();

// Create a subscription to address message
let addr_sub =
channel.subscribe_msg::<AddrsMessage>().await.expect("Missing addr dispatcher!");

Arc::new(Self { channel, hosts, settings, addr_sub })
Arc::new(Self { channel, hosts, settings, addr_sub, session })
}

/// Sends own external addresses over a channel. Imports own external addresses
Expand All @@ -68,13 +72,19 @@ impl ProtocolSeed {
return Ok(())
}

// We set last_seen to now. TODO: ponder this a bit more. We're just reading external addrs
// from settings- that doesn't mean they will be reachable.
// Perhaps this should just send a standard Addr...
// Do nothing if advertise is set to false
if self.settings.advertise {
return Ok(())
}

let mut addrs = vec![];
for addr in self.settings.external_addrs.clone() {
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
addrs.push((addr, last_seen));
// See if we can do a version exchange with ourself.
if self.session.ping_node(&addr).await {
// We're online. Update last_seen and broadcast our address.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
addrs.push((addr, last_seen));
}
}

debug!(
Expand All @@ -97,6 +107,7 @@ impl ProtocolBase for ProtocolSeed {
async fn start(self: Arc<Self>, _ex: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "net::protocol_seed::start()", "START => address={}", self.channel.address());

// TODO: only do this if "advertise" is true
// Send own address to the seed server
self.send_self_address().await?;

Expand Down
123 changes: 65 additions & 58 deletions src/net/session/outbound_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,52 @@ impl OutboundSession {
fn wakeup_peer_discovery(&self) {
self.peer_discovery.notify()
}

async fn wakeup_slots(&self) {
let slots = &*self.slots.lock().await;
for slot in slots {
slot.notify();
}
}

// Ping a node to check it's online.
// TODO: make this an actual ping-pong method, rather than a version exchange.
pub async fn ping_node(&self, addr: &Url) -> bool {
let p2p = self.p2p();
let session = &self.p2p().session_outbound();
let parent = Arc::downgrade(session);
let connector = Connector::new(p2p.settings(), parent);

debug!(target: "net::outbound_session::ping_node()", "Attempting to connect to {}", addr);
match connector.connect(addr).await {
Ok((_url, channel)) => {
debug!(target: "net::outbound_session::ping_node()", "Connected successfully!");
let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;

let handshake_task =
session.perform_handshake_protocols(proto_ver, channel.clone(), p2p.executor());

channel.clone().start(p2p.executor());

match handshake_task.await {
Ok(()) => {
debug!(target: "net::outbound_sesion::ping_node()", "Handshake success! Stopping channel.");
channel.stop().await;
true
}
Err(e) => {
debug!(target: "net::outbound_session::ping_node()", "Handshake failure! {}", e);
false
}
}
}

Err(e) => {
debug!(target: "net::outbound_sesion::ping_node()", "Failed to connect to {}, ({})", addr, e);
false
}
}
}
}

#[async_trait]
Expand Down Expand Up @@ -605,70 +645,37 @@ impl GreylistRefinery {
let entry = &greylist[position];
let url = &entry.0;

let parent = Arc::downgrade(&self.session());
//let parent = Arc::downgrade(&self.session());

let mut greylist = hosts.greylist.write().await;
let mut whitelist = hosts.whitelist.write().await;

let connector = Connector::new(p2p.settings(), parent);
debug!(target: "net::greylist_refinery::run()", "Connecting to {}", url);
match connector.connect(url).await {
Ok((_url, channel)) => {
debug!(target: "net::greylist_refinery::run()", "Connected successfully!");
let proto_ver = ProtocolVersion::new(channel.clone(), p2p.settings()).await;

let handshake_task = session.perform_handshake_protocols(
proto_ver,
channel.clone(),
p2p.executor(),
);

channel.clone().start(p2p.executor());

match handshake_task.await {
Ok(()) => {
debug!(target: "net::greylist_refinery::run()", "Handshake success! Stopping channel.");
channel.stop().await;

// Peer is responsive. Update last_seen and add it to the whitelist.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();

// Remove oldest element if the whitelist reaches max size.
if whitelist.len() == 1000 {
// Last element in vector should have the oldest timestamp.
// This should never crash as only returns None when whitelist len() == 0.
let entry = whitelist.pop().unwrap();
debug!(target: "net::greylist_refinery::run()", "Whitelist reached max size. Removed host {}", entry.0);
}

// Append to the whitelist.
debug!(target: "net::greylist_refinery::run()", "Adding peer {} to whitelist", url);
whitelist.push((url.clone(), last_seen));

// Sort whitelist by last_seen.
whitelist.sort_unstable_by_key(|entry| entry.1);

debug!(target: "net::greylist_refinery::run()", "Sorted whitelist: {:?}", whitelist);

// Remove whitelisted peer from the greylist.
debug!(target: "net::greylist_refinery::run()", "Removing whitelisted peer {} from greylist", url);
greylist.remove(position);
}
Err(e) => {
debug!(target: "net::hosts::probe_node()", "Handshake failure! {}", e);
// Peer is not responsive. Remove it from the greylist.
greylist.remove(position);
debug!(target: "net::hosts::refresh_greylist()", "Peer {} is not response. Removed from greylist", url);
}
}
if session.ping_node(url).await {
// Peer is responsive. Update last_seen and add it to the whitelist.
let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
// Remove oldest element if the whitelist reaches max size.
if whitelist.len() == 1000 {
// Last element in vector should have the oldest timestamp.
// This should never crash as only returns None when whitelist len() == 0.
let entry = whitelist.pop().unwrap();
debug!(target: "net::greylist_refinery::run()", "Whitelist reached max size. Removed host {}", entry.0);
}

Err(e) => {
debug!(target: "net::hosts::probe_node()", "Failed to connect to {}, ({})", url, e);
// Peer is not responsive. Remove it from the greylist.
greylist.remove(position);
debug!(target: "net::hosts::refresh_greylist()", "Peer {} is not response. Removed from greylist", url);
}
// Append to the whitelist.
debug!(target: "net::greylist_refinery::run()", "Adding peer {} to whitelist", url);
whitelist.push((url.clone(), last_seen));

// Sort whitelist by last_seen.
whitelist.sort_unstable_by_key(|entry| entry.1);

debug!(target: "net::greylist_refinery::run()", "Sorted whitelist: {:?}", whitelist);

// Remove whitelisted peer from the greylist.
debug!(target: "net::greylist_refinery::run()", "Removing whitelisted peer {} from greylist", url);
greylist.remove(position);
} else {
greylist.remove(position);
debug!(target: "net::hosts::refresh_greylist()", "Peer {} is not response. Removed from greylist", url);
}
}

Expand Down

0 comments on commit 54951e5

Please sign in to comment.