Skip to content

Commit

Permalink
Scanner: Send requests to IP neighbors
Browse files Browse the repository at this point in the history
  • Loading branch information
seamlik committed Mar 16, 2024
1 parent 4d51691 commit f4ec391
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 9 deletions.
2 changes: 1 addition & 1 deletion main/src/network/ip_neighbor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::net::Ipv6Addr;
use std::net::SocketAddrV6;
use thiserror::Error;

pub async fn ip_neighbor_scanner() -> Box<dyn IpNeighborScanner> {
pub async fn ip_neighbor_scanner() -> Box<dyn IpNeighborScanner + Send> {
match crate::os::detect_operating_system().await {
Ok(OperatingSystem::Windows) => Box::new(PowerShellIpNeighborScanner),
Ok(_) => {
Expand Down
76 changes: 68 additions & 8 deletions main/src/scanner.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::network::ip_neighbor::IpNeighborScanError;
use crate::network::ip_neighbor::IpNeighborScanner;
use crate::network::udp_receiver::TokioUdpReceiver;
use crate::network::udp_sender::TokioUdpSender;
use crate::network::udp_sender::UdpSender;
Expand All @@ -6,6 +8,7 @@ use crate::response_collector::GrpcResponseCollector;
use crate::response_collector::ResponseCollector;
use anyhow::Context;
use futures_util::stream::BoxStream;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryFutureExt;
Expand All @@ -26,9 +29,10 @@ pub struct Service {

/// Scans for all services being published to `discovery_port`.
pub fn scan(discovery_port: u16) -> impl Stream<Item = Result<Service, ScanError>> {
GrpcResponseCollector::new()
.err_into()
.map_ok(move |c| scan_internal(discovery_port, c, TokioUdpSender, TokioUdpReceiver))
let response_collector = GrpcResponseCollector::new().err_into();
let ip_neighbor_scanner = crate::network::ip_neighbor::ip_neighbor_scanner().map(Ok);
futures_util::future::try_join(response_collector, ip_neighbor_scanner)
.map_ok(move |(c, s)| scan_internal(discovery_port, c, TokioUdpSender, TokioUdpReceiver, s))
.try_flatten_stream()
}

Expand All @@ -37,6 +41,7 @@ fn scan_internal(
response_collector: impl ResponseCollector,
udp_sender: impl UdpSender + Send + 'static,
udp_receiver: impl DiscoveryPacketReceiver + Send + 'static,
ip_neighbor_scanner: Box<dyn IpNeighborScanner + Send>,
) -> BoxStream<'static, Result<Service, ScanError>> {
if discovery_port == 0 {
return futures_util::stream::once(async { Err(ScanError::InvalidDiscoveryPort) }).boxed();
Expand All @@ -50,25 +55,44 @@ fn scan_internal(
response_collector.collect().map_err(Into::into),
receive_announcements(udp_receiver, discovery_ip, discovery_port),
);
crate::stream::join(send_requests(udp_sender, request, discovery_port), services).boxed()
crate::stream::join(
send_requests(udp_sender, ip_neighbor_scanner, request, discovery_port),
services,
)
.boxed()
}

async fn send_requests(
udp_sender: impl UdpSender,
ip_neighbor_scanner: Box<dyn IpNeighborScanner + Send>,
request: Request,
discovery_port: u16,
) -> Result<(), ScanError> {
let multicast_address = SocketAddrV6::new(crate::get_discovery_ip(), discovery_port, 0, 0);
let packet: DiscoveryPacket = request.into();
let packet_bytes: Arc<[u8]> = packet.encode_to_vec().into();
let multicast_ip = crate::get_discovery_ip();

let multicast_address = SocketAddrV6::new(multicast_ip, discovery_port, 0, 0);
log::debug!(
"Sending {:?} to multicast address {}",
packet,
multicast_address
);
let packet_bytes: Arc<[u8]> = packet.encode_to_vec().into();
udp_sender
.send_multicast(multicast_address, packet_bytes.clone())
.await?;

for ip_neighbor in ip_neighbor_scanner.scan().await?.into_iter() {
let link_local_address = ip_neighbor.get_socket_address(discovery_port);
log::debug!(
"Sending {:?} to link-local address {}",
packet,
link_local_address
);
udp_sender
.send_unicast(link_local_address, packet_bytes.clone())
.await?;
}
Ok(())
}

Expand Down Expand Up @@ -107,15 +131,20 @@ pub enum ScanError {
InvalidDiscoveryPort,

#[error("Multicast network error")]
Multicast(#[from] std::io::Error),
SendRequest(#[from] std::io::Error),

#[error("gRPC error")]
Grpc(#[from] tonic::transport::Error),
CollectResponse(#[from] tonic::transport::Error),

#[error("IP neighbor scan error")]
ScanIpNeighbor(#[from] IpNeighborScanError),
}

#[cfg(test)]
mod test {
use super::*;
use crate::network::ip_neighbor::IpNeighbor;
use crate::network::ip_neighbor::MockIpNeighborScanner;
use crate::network::udp_sender::MockUdpSender;
use crate::packet::MockDiscoveryPacketReceiver;
use crate::response_collector::MockResponseCollector;
Expand Down Expand Up @@ -155,6 +184,17 @@ mod test {
let packet: DiscoveryPacket = request.clone().into();
let packet_bytes: Arc<[u8]> = packet.encode_to_vec().into();

let ip_neighbors = vec![
IpNeighbor {
network_interface_index: 1,
address: "fe80::1:abcd".parse().unwrap(),
},
IpNeighbor {
network_interface_index: 2,
address: "fe80::2:abcd".parse().unwrap(),
},
];

let mut response_collector = MockResponseCollector::new();
response_collector
.expect_get_port()
Expand All @@ -170,19 +210,39 @@ mod test {
.expect_send_multicast()
.with(eq(multicast_address), eq(packet_bytes.clone()))
.return_once(|_, _| async { Ok(()) }.boxed());
udp_sender
.expect_send_unicast()
.with(
eq("[fe80::1:abcd%1]:10".parse::<SocketAddrV6>().unwrap()),
eq(packet_bytes.clone()),
)
.return_once(|_, _| async { Ok(()) }.boxed());
udp_sender
.expect_send_unicast()
.with(
eq("[fe80::2:abcd%2]:10".parse::<SocketAddrV6>().unwrap()),
eq(packet_bytes.clone()),
)
.return_once(|_, _| async { Ok(()) }.boxed());

let mut udp_receiver = MockDiscoveryPacketReceiver::default();
udp_receiver
.expect_receive()
.with(eq(discovery_ip), eq(discovery_port))
.return_once_st(move |_, _| discovery_packets);

let mut ip_neighbor_scanner = MockIpNeighborScanner::default();
ip_neighbor_scanner
.expect_scan()
.return_once(|| async { Ok(ip_neighbors) }.boxed());

// When
let actual_services: Vec<_> = scan_internal(
multicast_address.port(),
response_collector,
udp_sender,
udp_receiver,
Box::new(ip_neighbor_scanner),
)
.take(3)
.try_collect()
Expand Down

0 comments on commit f4ec391

Please sign in to comment.